一、并发框架disruptor介绍
1、概念:同一个jvm进程中线程间异步通信的框架
2、环形数组RingBuffer:disruptor的核心存储容器
2.1、环形数组中的元素采用覆盖方式,避免了jvm的GC 2.2、数组的大小为2的n次方,这样元素定位可以通过位运算效率会更高,其实和hashmap的index运算一样,不一样的是hashmap会扩容,而这个RingBuffer不扩容而去覆盖原来的数据
3、SequenceBarrier:
是起屏障作用的类,因为在往RingBuffer放的过程中,生产者和消费者的存取速度不一致会造成错误。这时用SequenceBarrier可以来限制过快的存或者取,来达到速度的一致,保证不出错。原理是每次消费者取的时候会把取到的数据的位置返给生产者,生产者通过这个位置来判断什么时候往RingBuffer中放数据
4、工作流程:
生产者往RingBuffer中放数据,disruptor把数据推给消费者
5、工作模式:
统一消费、分组消费、顺序消费、多支线顺序消费
详细介绍: https://blog.csdn.net/zhouzhenyong/article/details/81303011
二、springboot整合disruptor
1、消息体
package com.example.demo.disruptor;
import lombok.Data;
@Data
public class MessageModel {
private String message;
}
2、事件工厂
package com.example.demo.disruptor;
import com.lmax.disruptor.EventFactory;
public class MessageEventFactory implements EventFactory<MessageModel> {
@Override
public MessageModel newInstance() {
return new MessageModel();
}
}
3、消费者
package com.example.demo.disruptor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MsgConsumer implements EventHandler<MessageModel> , WorkHandler<MessageModel> {
private String name;
public MsgConsumer(String name){
this.name = name;
}
@Override
public void onEvent(MessageModel msgEvent, long l, boolean b) throws Exception {
try {
log.info("消费者"+name+"处理消息开始");
if (msgEvent != null) {
log.info("消费者"+name+"消费的信息是:{}",msgEvent.getMessage());
}
} catch (Exception e) {
log.info("消费者"+name+"处理消息失败");
}
log.info("消费者"+name+"处理消息结束");
}
@Override
public void onEvent(MessageModel messageModel) throws Exception {
try {
Thread.sleep(1000);
if (messageModel != null) {
log.info("消费者"+name+"消费的信息是:{}",messageModel.getMessage());
}
} catch (Exception e) {
log.info("消费者"+name+"处理消息失败");
}
log.info("消费者"+name+"处理消息结束");
}
}
4、定义RingBuffer
package com.example.demo.disruptor;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@Configuration
public class MQManager {
@Bean()
public RingBuffer<MessageModel> messageModelRingBuffer() {
ThreadFactory executor = Executors.defaultThreadFactory();
MessageEventFactory factory = new MessageEventFactory();
int bufferSize = 1024 * 256;
Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
ProducerType.SINGLE, new BlockingWaitStrategy());
MsgConsumer msg1 = new MsgConsumer("1");
MsgConsumer msg2 = new MsgConsumer("2");
MsgConsumer msg3 = new MsgConsumer("3");
MsgConsumer msg4 = new MsgConsumer("4");
MsgConsumer msg5 = new MsgConsumer("5");
disruptor.handleEventsWith(msg1, msg3);
disruptor.handleEventsWith(msg2, msg4);
disruptor.after(msg3, msg4).handleEventsWith(msg5);
disruptor.start();
RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
return ringBuffer;
}
}
5、生产者
package com.example.demo.disruptor;
import com.lmax.disruptor.RingBuffer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MsgProducer {
@Autowired
private RingBuffer<MessageModel> messageModelRingBuffer;
public void send(String message) {
long sequence = messageModelRingBuffer.next();
try {
MessageModel event = messageModelRingBuffer.get(sequence);
event.setMessage(message);
log.info("往消息队列中添加消息:{}", event);
} catch (Exception e) {
log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
} finally {
messageModelRingBuffer.publish(sequence);
}
}
}
6、测试
package com.example.demo.disruptor;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("disruptor")
public class Test {
@Autowired
private MsgProducer msgProducer;
@PostMapping("/test")
public String test() {
msgProducer.send("test");
return "ok";
}
}
|