Disruptor系列(⼆)使⽤场景
Disruptor 系列(⼆)使⽤场景
今天⽤⼀个订单问题来加深对 Disruptor 的理解。当系统中有订单产⽣时,系统⾸先会记录订单信息。同时也会发送消息到其他系统处理相关业务,最后才是订单的处理。 代码包含以下内容:
1)事件对象 Event
3)⼀个⽣产者 Producer
4)执⾏ Main ⽅法
⼀、订单处理系统代码
(1) Event
public class Trade {
private String id;//ID
private String name;
private double price;//⾦额
private AtomicInteger count = new AtomicInteger(0);
// 省略getter/setter
}
(2) Handler 类
⼀个负责存储订单信息,⼀个负责发送 kafka 信息到其他系统中,最后⼀个负责处理订单信息。
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
/**
* 第⼀个 Handler1,存储到数据库中
*/
public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
}
声音定位
@Override
public void onEvent(Trade event) throws Exception {
long threadId = Thread.currentThread().getId(); // 获取当前线程id
String id = Id(); // 获取订单号
System.out.println(String.format("%s:Thread Id %s 订单信息保存 %s 到数据库中 ....",
}
}
import com.lmax.disruptor.EventHandler;
/**
* 第⼆个 Handler2,订单信息发送到其它系统中
*/
public class Handler2 implements EventHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
long threadId = Thread.currentThread().getId(); // 获取当前线程id
String id = Id(); // 获取订单号
System.out.println(String.format("%s:Thread Id %s 订单信息 %s 发送到 karaf 系统中 ....",
}
}
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
/**
* 第三个 Handler2,处理订单信息
*/
public class Handler3 implements EventHandler<Trade>, WorkHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { onEvent(event);
}
@Override
public void onEvent(Trade event) throws Exception {
long threadId = Thread.currentThread().getId(); // 获取当前线程id
String id = Id(); // 获取订单号
System.out.println(String.format("%s:Thread Id %s 订单信息 %s 处理中 ....",
}
}
(3) Producer 类
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.UUID;
import urrent.CountDownLatch;
public class TradePublisher implements Runnable {
Disruptor<Trade> disruptor;
private CountDownLatch latch;
private static int LOOP = 1; // 模拟百万次交易的发⽣
public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
this.disruptor=disruptor;
this.latch=latch;
}
@Override
public void run() {
TradeEventTranslator tradeTransloator = new TradeEventTranslator();
for(int i = 0; i < LOOP; i++) {
disruptor.publishEvent(tradeTransloator);
}
gps信号转发器
}
}
class TradeEventTranslator implements EventTranslator<Trade>{
@Override
public void translateTo(Trade event, long sequence) {
event.setId(UUID.randomUUID().toString());
}
}
(4) 执⾏的 Main ⽅法
package com.github.binarylei.disruptor.demo3;
闪蒸温度import urrent.CountDownLatch;
import urrent.ExecutorService;
import urrent.Executors;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
public class Main {
public static void main(String[] args) throws InterruptedException {
long beginTime=System.currentTimeMillis();
int bufferSize=1024;
ExecutorService wFixedThreadPool(8);
Disruptor<Trade> disruptor = new Disruptor<>(new EventFactory<Trade>() {
@Override
public Trade newInstance() {
return new Trade();
}
}, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());
//菱形操作
//使⽤disruptor创建消费者组C1,C2
EventHandlerGroup<Trade> handlerGroup =
disruptor.handleEventsWith(new Handler1(), new Handler2());
//声明在C1,C2完事之后执⾏JMS消息发送操作也就是流程⾛到C3
告警系统
handlerGroup.then(new Handler3());
disruptor.start();//启动
CountDownLatch latch=new CountDownLatch(1);
//⽣产者准备
executor.submit(new TradePublisher(latch, disruptor));
latch.await();//等待⽣产者完事.
disruptor.shutdown();
executor.shutdown();
System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));
}
}
测试结果如下:
Handler1:Thread Id 10 订单信息保存 a097c77d-08f1-430a-8342-2143963f268f 到数据库中 ....
Handler2:Thread Id 11 订单信息 a097c77d-08f1-430a-8342-2143963f268f 发送到 karaf 系统中 ....
Handler3:Thread Id 13 订单信息 a097c77d-08f1-430a-8342-2143963f268f 处理中 ....
总耗时:1631
可以看到 Handler3 在 Handler1 和 Handler2 执⾏完成后才执⾏。
⼆、Disruptor DSL
虽然 disruptor 模式使⽤起来很简单,但是建⽴多个消费者以及它们之间的依赖关系需要的样板代码太多了。为了能快速⼜简单适⽤于99%的场景,我为 Disruptor 模式准备了⼀个简单的领域特定语⾔(DSL),定义了消费顺序。
在讲解 Disruptor DSL 之前先看⼀下多个消费者不重复消费的问题。
2.1 多个消费者不重复消费
默认⼀个消费者⼀个线程,如果想要实现 C3 多个消费者共同不重复消费数据,可以使⽤
handlerGroup.thenHandleEventsWithWorkerPool(customers)
//使⽤disruptor创建消费者组C1, C2
EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());
// 多个消费者不重复消费
Handler3[] customers = new Handler3[]{new Handler3(), new Handler3(), new Handler3()};
handlerGroup.thenHandleEventsWithWorkerPool(customers);
2.2 消费者的“四边形模式”
在这种情况下,只要⽣产者(P1)将元素放到ring buffer上,消费者C1和C2就可以并⾏处理这些元素。但是消费者C3必须⼀直等到C1和C2处理完之后,才可以处理。在现实世界中的对应的案例就像:
在处理实际的业务逻辑(C3)之前,需要校验数据(C1),以及将数据写⼊磁盘(C2)。
//1. 使⽤disruptor创建消费者组C1,C2
EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());
//2. 声明在C1,C2完事之后执⾏JMS消息发送操作也就是流程⾛到C3
handlerGroup.then(new Handler3());
2.3 消费者的“顺序执⾏模式”
disruptor.handleEventsWith(new Handler1()).
handleEventsWith(new Handler2()).
handleEventsWith(new Handler3());
电梯应急装置2.4 消费者的“六边形模式”
我们甚⾄可以在⼀个更复杂的六边形模式中构建⼀个并⾏消费者链:
srteHandler1 h1 = new Handler1();
Handler2 h2 = new Handler2();
Handler3 h3 = new Handler3();
Handler4 h4 = new Handler4();
Handler5 h5 = new Handler5();
disruptor.handleEventsWith(h1, h2);
disruptor.after(h1).handleEventsWith(h4);
disruptor.after(h2).handleEventsWith(h5);
disruptor.after(h4, h5).handleEventsWith(h3);
每天⽤⼼记录⼀点点。内容也许不重要,但习惯很重要!