前言
最近做项目,有许多业务需要处理,放到了kafka中,为了提高消费kafka效率,引入了线程池,不同的业务处理使用不同的线程池。其他暂且不论,直接上配置。
1.注解使用
提前说明下使用地方,有个印象,后需还会讲到。
- @EnableAsync 这里会配置在2个地方:启动类和线程池配置类
- @Async 可以写在类上或者方法上
- @Component 注册类到ioc
- @ConfigurationProperties 读取yml配置
- @Autowired
- @Qualifier
2.yml配置
配置线程池基本信息
async-pool:
consumer-kafka:
corePoolSize: 2
maxPoolSize: 4
keepAliveSeconds: 120
queueCapacity: 10
normal-collection:
corePoolSize: 2
maxPoolSize: 4
keepAliveSeconds: 120
queueCapacity: 10
fail-file:
corePoolSize: 2
maxPoolSize: 4
keepAliveSeconds: 120
queueCapacity: 10
3.获取yml配置信息
创建父类抽象类 AbstractExecutorPool 分别创建子类继承父类 ConsumerKafkaPool 、NormalCollectionPool、FailFilePool
@Data
public abstract class AbstractExecutorPool {
private int corePoolSize;
private int maxPoolSize;
private int keepAliveSeconds;
private int queueCapacity;
}
@Component
@ConfigurationProperties(prefix = "async-pool.consumer-kafka")
@Data
public class ConsumerKafkaPool extends AbstractExecutorPool {
private String threadNamePrefix = "handler consumer kafka executor-";
}
@Component
@ConfigurationProperties(prefix = "async-pool.normal-collection")
@Data
public class NormalCollectionPool extends AbstractExecutorPool {
private String threadNamePrefix = "handler norma collection executor-";
}
@Component
@ConfigurationProperties(prefix = "async-pool.fail-file")
@Data
public class FailFilePool extends AbstractExecutorPool {
private String threadNamePrefix = "handler move fail file executor-";
}
4.线程池配置
创建类 ThreadPoolConfig 线程池具体执行步骤,这里不提,说下拒绝策略 ThreadPoolExecutor 类中提供的前4个拒绝策略,也可以自定义策略。
- AbortPolicy 默认策略,队列满时抛出异常RejectedExecutionException
- DiscardOldestPolicy 去除队列中最早的任务,将新任务放入队列
- DiscardPolicy 直接丢掉任务
- CallerRunsPolicy 队列满时,主线程执行任务
- 自定义处理策略
@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig {
private final ConsumerKafkaPool consumerKafkaPool;
private final NormalCollectionPool normalCollectionPool;
private final FailFilePool failFilePool;
@Autowired
public ThreadPoolConfig(ConsumerKafkaPool consumerKafkaPool, NormalCollectionPool normalCollectionPool, FailFilePool failFilePool){
this.consumerKafkaPool = consumerKafkaPool;
this.normalCollectionPool = normalCollectionPool;
this.failFilePool = failFilePool;
}
@Bean(name = "asyncExecutorConsumerKafka")
public Executor asyncExecutorConsumerKafka() {
return initExcutor(consumerKafkaPool, consumerKafkaPool.getThreadNamePrefix(), (r, executor) -> {
log.info("队列已满,根据业务自行处理。。。");
});
}
@Bean(name = "asyncExecutorNormalCollection")
public Executor asyncExecutorNormalCollection() {
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
return initExcutor(normalCollectionPool, normalCollectionPool.getThreadNamePrefix(), callerRunsPolicy);
}
@Bean(name = "asyncExecutorFailFile")
public Executor asyncExecutorFailFile() {
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
return initExcutor(failFilePool, failFilePool.getThreadNamePrefix(), callerRunsPolicy);
}
private Executor initExcutor(AbstractExecutorPool abstractExecutorPool,String threadName, RejectedExecutionHandler rejectedExecutionHandler){
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(abstractExecutorPool.getCorePoolSize());
threadPool.setMaxPoolSize(abstractExecutorPool.getMaxPoolSize());
threadPool.setKeepAliveSeconds(abstractExecutorPool.getKeepAliveSeconds());
threadPool.setQueueCapacity(abstractExecutorPool.getQueueCapacity());
threadPool.setThreadNamePrefix(threadName);
threadPool.setRejectedExecutionHandler(rejectedExecutionHandler);
return threadPool;
}
}
5.启动类
启动类上 加注解 @EnableAsync ,这里就不贴代码了
6.调用方法2种方式
创建接口
public interface TestService {
void asynctest(String value);
}
使用注解@Async
该注解可以使用在方法或者类上,使用在方法上生命该方法可异步执行,在类上,该类所有方法可异步执行。
@Slf4j
@Service
public class TestServiceImpl implements TestService {
@Override
@Async
public void asynctest(String value) {
String threadName = Thread.currentThread().getName();
log.info(">>线程{},正在处理:{}", threadName, value);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(">>线程test{},处理完成:{}", threadName, value);
}
}
调用
@Component
@Slf4j
public class KafkaCustomTest {
@Autowired
private TestService testService ;
@Autowired
@Qualifier("asyncExecutorConsumerKafka")
private Executor asyncExecutorConsumerKafka;
@KafkaListener(topics = { "test" }, autoStartup = "true")
public void normal(ConsumerRecord<String, String> record, Acknowledgment ack) throws IOException, InterruptedException {
String value = record.value();
testService.asynctest(value);
long offset = record.offset();
log.info(">>>该消息offset:{},消息:{},已提交异步处理。", offset, value);
ack.acknowledge();
}
}
实践过程中的坑和经验包
ThreadPoolTaskExecutor 和 ThreadPoolExecutor
顺便提一句 ThreadPoolTaskExecutor 和 ThreadPoolExecutor 区别
- ThreadPoolTaskExecutor 是spring提供的
- ThreadPoolExecutor 是jdk提供的
ThreadPoolExecutor 可以在代码中写成静态的增长调用, 如:
private static ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 20L,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1),
new ThreadPoolExecutor.AbortPolicy());
public static ThreadPoolExecutor getPool() {
return pool;
}
但是 ThreadPoolTaskExecutor 写成静态的就会失效,具体区别了解不深。
LinkedBlockingQueue 和 ArrayBlockingQueue
- LinkedBlockingQueue中的锁是分离的,生产者的锁PutLock,消费者的锁takeLock
- ArrayBlockingQueue生产者和消费者使用的是同一把锁
ThreadPoolExecutor 可以自己指定使用那个队列。
ThreadPoolTaskExecutor 使用的LinkedBlockingQueue
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
if (queueCapacity > 0) {
return new LinkedBlockingQueue<>(queueCapacity);
}
else {
return new SynchronousQueue<>();
}
}
好好学习,天天努力
|