多线程与高并发(六) 单机压测工具JMH,Disruptor原理解析

多线程与高并发(六) 单机压测工具JMH,Disruptor原理解析JMHDisruptor

大家好,欢迎来到IT知识分享网。

一,JMH

Java Microbenchmark Harness,是Java用来做基准测试的工具,测试结果可信度高,用于测试某个方法的性能。13年首发,由JIT开发人员开发,后归于OpenJDK。

官网:OpenJDK: jmh

多线程与高并发(六) 单机压测工具JMH,Disruptor原理解析

官方样例: code-tools/jmh: 2be2df7dbaf8 /jmh-samples/src/main/java/org/openjdk/jmh/samples/

(一)创建JMH测试

需要准确的效果,建议是用命令行的方式,并独一代码单独打包进行测试。

如果想边开发边测试(我是懒得重弄项目了),可以使用JMH的插件。

1,idea安装JMH插件   idea18  版本,装的这个

多线程与高并发(六) 单机压测工具JMH,Disruptor原理解析

2,MAVEN

 <!--jmh-->
        <!-- https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-core -->
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-core</artifactId>
            <version>1.21</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-generator-annprocess -->
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-generator-annprocess</artifactId>
            <version>1.21</version>
            <scope>test</scope>
        </dependency>

3,由于用到了注解,打开运行程序注解配置

compiler -> Annotation Processors -> Enable Annotation Processing

多线程与高并发(六) 单机压测工具JMH,Disruptor原理解析

5,测试类

package com.example.demo.jmh;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class PS {

    static List<Integer> nums = new ArrayList<>();
    static {
        Random r = new Random();
        for (int i = 0; i < 10000; i++) nums.add(1000000 + r.nextInt(1000000));
    }

    static void foreach() {
        nums.forEach(v->isPrime(v));
    }

    static void parallel() {
        nums.parallelStream().forEach(PS::isPrime);
    }

    static boolean isPrime(int num) {
        for(int i=2; i<=num/2; i++) {
            if(num % i == 0) return false;
        }
        return true;
    }
}

6,单元测试

package com.example.demo.jmh;

import org.openjdk.jmh.annotations.Benchmark;

public class PSTest {
    @Benchmark
    public void testForEach() {
        PS.foreach();
    }
}

7,运行测试类,如果遇到下面的错误:

 ERROR: org.openjdk.jmh.runner.RunnerException: ERROR: Exception while trying to acquire the JMH lock (C:\WINDOWS\/jmh.lock): C:\WINDOWS\jmh.lock (拒绝访问。), exiting. Use -Djmh.ignoreLock=true to forcefully continue.
     at org.openjdk.jmh.runner.Runner.run(Runner.java:216)
     at org.openjdk.jmh.Main.main(Main.java:71)

多线程与高并发(六) 单机压测工具JMH,Disruptor原理解析

这个错误是因为JMH运行需要访问系统的TMP目录,解决办法是:

打开RunConfiguration -> Environment Variables -> include system environment viables

多线程与高并发(六) 单机压测工具JMH,Disruptor原理解析

 多线程与高并发(六) 单机压测工具JMH,Disruptor原理解析

多线程与高并发(六) 单机压测工具JMH,Disruptor原理解析 多线程与高并发(六) 单机压测工具JMH,Disruptor原理解析

8,阅读测试报告

"C:\Program Files\Java\jdk1.8.0_171\bin\java.exe" -Dfile.encoding=UTF-8 -classpath "D:\Program Files\JetBrains\idea18\work\learnRecords\target\test-classes;D:\Program Files\JetBrains\idea18\work\learnRecords\target\classes;C:\Users\jinyh\.m2\repository\org\springframework\boot\spring-boot-starter\2.5.3\spring-boot-starter-2.5.3.jar;C:\Users\jinyh\.m2\repository\org\springframework\boot\spring-boot\2.5.3\spring-boot-2.5.3.jar;C:\Users\jinyh\.m2\repository\org\springframework\spring-context\5.3.9\spring-context-5.3.9.jar;C:\Users\jinyh\.m2\repository\org\springframework\spring-aop\5.3.9\spring-aop-5.3.9.jar;C:\Users\jinyh\.m2\repository\org\springframework\spring-beans\5.3.9\spring-beans-5.3.9.jar;C:\Users\jinyh\.m2\repository\org\springframework\spring-expression\5.3.9\spring-expression-5.3.9.jar;C:\Users\jinyh\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.5.3\spring-boot-autoconfigure-2.5.3.jar;C:\Users\jinyh\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.5.3\spring-boot-starter-logging-2.5.3.jar;C:\Users\jinyh\.m2\repository\ch\qos\logback\logback-classic\1.2.4\logback-classic-1.2.4.jar;C:\Users\jinyh\.m2\repository\ch\qos\logback\logback-core\1.2.4\logback-core-1.2.4.jar;C:\Users\jinyh\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.14.1\log4j-to-slf4j-2.14.1.jar;C:\Users\jinyh\.m2\repository\org\apache\logging\log4j\log4j-api\2.14.1\log4j-api-2.14.1.jar;C:\Users\jinyh\.m2\repository\org\slf4j\jul-to-slf4j\1.7.32\jul-to-slf4j-1.7.32.jar;C:\Users\jinyh\.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;C:\Users\jinyh\.m2\repository\org\springframework\spring-core\5.3.9\spring-core-5.3.9.jar;C:\Users\jinyh\.m2\repository\org\springframework\spring-jcl\5.3.9\spring-jcl-5.3.9.jar;C:\Users\jinyh\.m2\repository\org\yaml\snakeyaml\1.28\snakeyaml-1.28.jar;C:\Users\jinyh\.m2\repository\org\springframework\boot\spring-boot-starter-test\2.5.3\spring-boot-starter-test-2.5.3.jar;C:\Users\jinyh\.m2\repository\org\springframework\boot\spring-boot-test\2.5.3\spring-boot-test-2.5.3.jar;C:\Users\jinyh\.m2\repository\org\springframework\boot\spring-boot-test-autoconfigure\2.5.3\spring-boot-test-autoconfigure-2.5.3.jar;C:\Users\jinyh\.m2\repository\com\jayway\jsonpath\json-path\2.5.0\json-path-2.5.0.jar;C:\Users\jinyh\.m2\repository\net\minidev\json-smart\2.4.7\json-smart-2.4.7.jar;C:\Users\jinyh\.m2\repository\net\minidev\accessors-smart\2.4.7\accessors-smart-2.4.7.jar;C:\Users\jinyh\.m2\repository\org\ow2\asm\asm\9.1\asm-9.1.jar;C:\Users\jinyh\.m2\repository\jakarta\xml\bind\jakarta.xml.bind-api\2.3.3\jakarta.xml.bind-api-2.3.3.jar;C:\Users\jinyh\.m2\repository\jakarta\activation\jakarta.activation-api\1.2.2\jakarta.activation-api-1.2.2.jar;C:\Users\jinyh\.m2\repository\org\assertj\assertj-core\3.19.0\assertj-core-3.19.0.jar;C:\Users\jinyh\.m2\repository\org\hamcrest\hamcrest\2.2\hamcrest-2.2.jar;C:\Users\jinyh\.m2\repository\org\junit\jupiter\junit-jupiter\5.7.2\junit-jupiter-5.7.2.jar;C:\Users\jinyh\.m2\repository\org\junit\jupiter\junit-jupiter-api\5.7.2\junit-jupiter-api-5.7.2.jar;C:\Users\jinyh\.m2\repository\org\apiguardian\apiguardian-api\1.1.0\apiguardian-api-1.1.0.jar;C:\Users\jinyh\.m2\repository\org\opentest4j\opentest4j\1.2.0\opentest4j-1.2.0.jar;C:\Users\jinyh\.m2\repository\org\junit\platform\junit-platform-commons\1.7.2\junit-platform-commons-1.7.2.jar;C:\Users\jinyh\.m2\repository\org\junit\jupiter\junit-jupiter-params\5.7.2\junit-jupiter-params-5.7.2.jar;C:\Users\jinyh\.m2\repository\org\junit\jupiter\junit-jupiter-engine\5.7.2\junit-jupiter-engine-5.7.2.jar;C:\Users\jinyh\.m2\repository\org\junit\platform\junit-platform-engine\1.7.2\junit-platform-engine-1.7.2.jar;C:\Users\jinyh\.m2\repository\org\mockito\mockito-core\3.9.0\mockito-core-3.9.0.jar;C:\Users\jinyh\.m2\repository\net\bytebuddy\byte-buddy\1.10.22\byte-buddy-1.10.22.jar;C:\Users\jinyh\.m2\repository\net\bytebuddy\byte-buddy-agent\1.10.22\byte-buddy-agent-1.10.22.jar;C:\Users\jinyh\.m2\repository\org\objenesis\objenesis\3.2\objenesis-3.2.jar;C:\Users\jinyh\.m2\repository\org\mockito\mockito-junit-jupiter\3.9.0\mockito-junit-jupiter-3.9.0.jar;C:\Users\jinyh\.m2\repository\org\skyscreamer\jsonassert\1.5.0\jsonassert-1.5.0.jar;C:\Users\jinyh\.m2\repository\com\vaadin\external\google\android-json\0.0.20131108.vaadin1\android-json-0.0.20131108.vaadin1.jar;C:\Users\jinyh\.m2\repository\org\springframework\spring-test\5.3.9\spring-test-5.3.9.jar;C:\Users\jinyh\.m2\repository\org\xmlunit\xmlunit-core\2.8.2\xmlunit-core-2.8.2.jar;C:\Users\jinyh\.m2\repository\net\sourceforge\jexcelapi\jxl\2.6.12\jxl-2.6.12.jar;C:\Users\jinyh\.m2\repository\log4j\log4j\1.2.14\log4j-1.2.14.jar;C:\Users\jinyh\.m2\repository\org\apache\zookeeper\zookeeper\3.4.6\zookeeper-3.4.6.jar;C:\Users\jinyh\.m2\repository\org\slf4j\slf4j-api\1.7.32\slf4j-api-1.7.32.jar;C:\Users\jinyh\.m2\repository\org\slf4j\slf4j-log4j12\1.7.32\slf4j-log4j12-1.7.32.jar;C:\Users\jinyh\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\jinyh\.m2\repository\junit\junit\4.13.2\junit-4.13.2.jar;C:\Users\jinyh\.m2\repository\org\hamcrest\hamcrest-core\2.2\hamcrest-core-2.2.jar;C:\Users\jinyh\.m2\repository\io\netty\netty\3.7.0.Final\netty-3.7.0.Final.jar;C:\Users\jinyh\.m2\repository\javax\servlet\servlet-api\2.5\servlet-api-2.5.jar;C:\Users\jinyh\.m2\repository\org\openjdk\jmh\jmh-core\1.21\jmh-core-1.21.jar;C:\Users\jinyh\.m2\repository\net\sf\jopt-simple\jopt-simple\4.6\jopt-simple-4.6.jar;C:\Users\jinyh\.m2\repository\org\apache\commons\commons-math3\3.2\commons-math3-3.2.jar;C:\Users\jinyh\.m2\repository\org\openjdk\jmh\jmh-generator-annprocess\1.21\jmh-generator-annprocess-1.21.jar" org.openjdk.jmh.Main com.example.demo.jmh.PSTest.testForEach
# JMH version: 1.21
# VM version: JDK 1.8.0_171, Java HotSpot(TM) 64-Bit Server VM, 25.171-b11
# VM invoker: C:\Program Files\Java\jdk1.8.0_171\jre\bin\java.exe
# VM options: -Dfile.encoding=UTF-8
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: com.example.demo.jmh.PSTest.testForEach

# Run progress: 0.00% complete, ETA 00:08:20
# Fork: 1 of 5
# Warmup Iteration   1: 0.754 ops/s
# Warmup Iteration   2: 0.751 ops/s
# Warmup Iteration   3: 0.759 ops/s
# Warmup Iteration   4: 0.762 ops/s
# Warmup Iteration   5: 0.756 ops/s
Iteration   1: 0.754 ops/s
Iteration   2: 0.758 ops/s
Iteration   3: 0.760 ops/s
Iteration   4: 0.757 ops/s
Iteration   5: 0.751 ops/s

# Run progress: 20.00% complete, ETA 00:07:06
# Fork: 2 of 5
# Warmup Iteration   1: 0.761 ops/s
# Warmup Iteration   2: 0.773 ops/s
# Warmup Iteration   3: 0.764 ops/s
# Warmup Iteration   4: 0.768 ops/s
# Warmup Iteration   5: 0.772 ops/s
Iteration   1: 0.775 ops/s
Iteration   2: 0.773 ops/s
Iteration   3: 0.760 ops/s
Iteration   4: 0.737 ops/s
Iteration   5: 0.748 ops/s

# Run progress: 40.00% complete, ETA 00:05:18
# Fork: 3 of 5
# Warmup Iteration   1: 0.765 ops/s
# Warmup Iteration   2: 0.705 ops/s
# Warmup Iteration   3: 0.723 ops/s
# Warmup Iteration   4: 0.763 ops/s
# Warmup Iteration   5: 0.753 ops/s
Iteration   1: 0.751 ops/s
Iteration   2: 0.752 ops/s
Iteration   3: 0.759 ops/s
Iteration   4: 0.753 ops/s
Iteration   5: 0.761 ops/s

# Run progress: 60.00% complete, ETA 00:03:33
# Fork: 4 of 5
# Warmup Iteration   1: 0.748 ops/s
# Warmup Iteration   2: 0.743 ops/s
# Warmup Iteration   3: 0.746 ops/s
# Warmup Iteration   4: 0.742 ops/s
# Warmup Iteration   5: 0.739 ops/s
Iteration   1: 0.744 ops/s
Iteration   2: 0.734 ops/s
Iteration   3: 0.739 ops/s
Iteration   4: 0.729 ops/s
Iteration   5: 0.749 ops/s

# Run progress: 80.00% complete, ETA 00:01:47
# Fork: 5 of 5
# Warmup Iteration   1: 0.757 ops/s
# Warmup Iteration   2: 0.737 ops/s
# Warmup Iteration   3: 0.726 ops/s
# Warmup Iteration   4: 0.730 ops/s
# Warmup Iteration   5: 0.717 ops/s
Iteration   1: 0.720 ops/s
Iteration   2: 0.721 ops/s
Iteration   3: 0.737 ops/s
Iteration   4: 0.747 ops/s
Iteration   5: 0.714 ops/s


Result "com.example.demo.jmh.PSTest.testForEach":
  0.747 ±(99.9%) 0.012 ops/s [Average]
  (min, avg, max) = (0.714, 0.747, 0.775), stdev = 0.016
  CI (99.9%): [0.736, 0.759] (assumes normal distribution)


# Run complete. Total time: 00:08:58

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark            Mode  Cnt  Score   Error  Units
PSTest.testForEach  thrpt   25  0.747 ± 0.012  ops/s

Process finished with exit code 0

(二)JMH中的基本概念

  1. Warmup 预热,由于JVM中对于特定代码会存在优化(本地化),预热对于测试结果很重要

  2. Mesurement 总共执行多少次测试

  3. Timeout

  4. Threads 线程数,由fork指定

  5. Benchmark mode 基准测试的模式

  6. Benchmark 测试哪一段代码

二,Disruptor

(一)介绍

主页:LMAX Disruptor

源码:https://github.com/LMAX-Exchange/disruptor

GettingStarted: Getting Started · LMAX-Exchange/disruptor Wiki · GitHub

api: http://lmax-exchange.github.io/disruptor/docs/index.html

maven: https://mvnrepository.com/artifact/com.lmax/disruptor

一个线程中每秒处理600万订单,是金融机构开发的用于交易的。

2011年获得Duke奖项

单机速度最快的MQ

性能极高,无锁CAS,单机支持高并发

内存中用于存储的高效队列。

(二)Disruptor的特点

对比ConcurrentLinkedQueue : 链表实现

JDK中没有ConcurrentArrayQueue ,是因为数组是固定长度的,那么每次扩展都需要复制+1,效率低

多线程与高并发(六) 单机压测工具JMH,Disruptor原理解析

Disruptor是数组实现的

数组是实现的ConcurrentArrayQueue 且头尾相连,

为什么效率比链表快,第一,链表没有数组遍历快;第二,链表需要维护头尾两个指针,数组只需维护一个sequence序列(下个元素的位置);

怎么计算加在环形数组的位置? 添加元素的第几个元素  /  数组容量   求余 

多线程与高并发(六) 单机压测工具JMH,Disruptor原理解析

无锁,高并发,使用环形Buffer,直接覆盖(不用清除)旧的数据,降低GC频率

实现了基于事件的生产者消费者模式(观察者模式)

(三)RingBuffer

环形队列

RingBuffer的序号,指向下一个可用的元素

采用数组实现,没有首尾指针

对比ConcurrentLinkedQueue,用数组实现的速度更快

假如长度为8,当添加到第12个元素的时候在哪个序号上呢?用12%8决定

当Buffer被填满的时候到底是覆盖还是等待,由Producer决定

长度设为2的n次幂,利于二进制计算,例如:12%8 = 12 & (8 – 1) pos = num & (size -1)

01:14“09

(四)Disruptor开发步骤

  1. 定义Event – 队列中需要处理的元素

  2. 定义Event工厂,用于填充队列

    这里牵扯到效率问题:disruptor初始化的时候,会调用Event工厂,对ringBuffer进行内存的提前分配

    GC产频率会降低

  3. 定义EventHandler(消费者),处理容器中的元素

1,事件发布模板

 long sequence = ringBuffer.next();  // Grab the next sequence
 try {
     LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
     // for the sequence
     event.set(8888L);  // Fill with data
 } finally {
     ringBuffer.publish(sequence);
 }

2,使用EventTranslator发布事件

 //===============================================================
         EventTranslator<LongEvent> translator1 = new EventTranslator<LongEvent>() {
             @Override
             public void translateTo(LongEvent event, long sequence) {
                 event.set(8888L);
             }
         };
 ​
         ringBuffer.publishEvent(translator1);
 ​
         //===============================================================
         EventTranslatorOneArg<LongEvent, Long> translator2 = new EventTranslatorOneArg<LongEvent, Long>() {
             @Override
             public void translateTo(LongEvent event, long sequence, Long l) {
                 event.set(l);
             }
         };
 ​
         ringBuffer.publishEvent(translator2, 7777L);
 ​
         //===============================================================
         EventTranslatorTwoArg<LongEvent, Long, Long> translator3 = new EventTranslatorTwoArg<LongEvent, Long, Long>() {
             @Override
             public void translateTo(LongEvent event, long sequence, Long l1, Long l2) {
                 event.set(l1 + l2);
             }
         };
 ​
         ringBuffer.publishEvent(translator3, 10000L, 10000L);
 ​
         //===============================================================
         EventTranslatorThreeArg<LongEvent, Long, Long, Long> translator4 = new EventTranslatorThreeArg<LongEvent, Long, Long, Long>() {
             @Override
             public void translateTo(LongEvent event, long sequence, Long l1, Long l2, Long l3) {
                 event.set(l1 + l2 + l3);
             }
         };
 ​
         ringBuffer.publishEvent(translator4, 10000L, 10000L, 1000L);
 ​
         //===============================================================
         EventTranslatorVararg<LongEvent> translator5 = new EventTranslatorVararg<LongEvent>() {
 ​
             @Override
             public void translateTo(LongEvent event, long sequence, Object... objects) {
                 long result = 0;
                 for(Object o : objects) {
                     long l = (Long)o;
                     result += l;
                 }
                 event.set(result);
             }
         };
 ​
         ringBuffer.publishEvent(translator5, 10000L, 10000L, 10000L, 10000L);

3,使用Lamda表达式

 package com.mashibing.disruptor;
 ​
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.util.DaemonThreadFactory;
 ​
 public class Main03
 {
     public static void main(String[] args) throws Exception
     {
         // Specify the size of the ring buffer, must be power of 2.
         int bufferSize = 1024;
 ​
         // Construct the Disruptor
         Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
 ​
         // Connect the handler
         disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));
 ​
         // Start the Disruptor, starts all threads running
         disruptor.start();
 ​
         // Get the ring buffer from the Disruptor to be used for publishing.
         RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
 ​
 ​
         ringBuffer.publishEvent((event, sequence) -> event.set(10000L));
 ​
         System.in.read();
     }
 }

4,ProducerType生产者线程模式

ProducerType有两种模式 Producer.MULTI和Producer.SINGLE

默认是MULTI,表示在多线程模式下产生sequence

如果确认是单线程生产者,那么可以指定SINGLE,效率会提升

如果是多个生产者(多线程),但模式指定为SINGLE,会出什么问题呢?

5,等待策略

如果来不及消费,会被覆盖怎么办?这就引出了等待策略

1,(常用)BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。

2,BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu

3,LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可以减少lock加锁次数.

4,LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛异常。

5,PhasedBackoffWaitStrategy:根据时间参数和传入的等待策略来决定使用哪种等待策略

6,TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛异常

7,(常用)YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu

8,(常用)SleepingWaitStrategy : sleep

6,消费者异常处理

默认:disruptor.setDefaultExceptionHandler()

覆盖:disruptor.handleExceptionFor().with()

7,依赖处理

    <!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/24135.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信