最近看了李艳鹏的书,其中有一个kafka的客户端的手写组件kclient。研读之后有以下心得:
1、prouduce
这个比较简单就是包装了一下kafka的生产这,直接调用即可,没什么说的。自己觉得可以优化点为:kafka的链接可以使用common-pool进行池化。减少new对象
public class KafkaProducer {
public void send2Topic(String topicName, String message) {
...
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message);producer.send(km);
}
}
2、consumer
封装kafka 的consumer客户端,创建线程池进行轮询卡夫卡,有消息就调用子类的handler进行消费。这部分代码同样池化做的不是很好,或者作者根本没有考虑要池化。
abstract class AbstractMessageTask implements Runnable {
protected KafkaStream<String, String> stream;
protected MessageHandler messageHandler;
AbstractMessageTask(KafkaStream<String, String> stream,
MessageHandler messageHandler) {
this.stream = stream;
this.messageHandler = messageHandler;
}
public void run() {
ConsumerIterator<String, String> it = stream.iterator();
while (status == Status.RUNNING) {
boolean hasNext = false;
try {
hasNext = it.hasNext();
} catch (Exception e) {
if (e instanceof InterruptedException) {
if (status != Status.RUNNING) {
it.clearCurrentChunk();
shutdown();
break;
}
} else {
log.error("Retrieve Error: ", e);
continue;
}
}
if (hasNext) {
MessageAndMetadata<String, String> item = it.next();
handleMessage(item.message());
if (!isAutoCommitOffset) {
consumerConnector.commitOffsets();
}
}
}
}
protected abstract void handleMessage(String message);
}
使用Abstract类上层对handler进行封装,对子类暴露消息处理的handler方法,通用的消息接收的run方法在Abstract类中,这样子类只需要关心业务处理即可。这种设计在大公司很常见,就是业务无关的封装到Abstract父类,子类只关心本业务。 以上两个是API的方式执行业务。
3 反射执行业务
利用注解标注处理方法。当spring启动的时候将注解和要执行的方法放到对应关系里面。当收到kafka消息的时候进行反射执行即可,这也是为什么像rabbitmq rocketmq。只要在方法和类撒类上注解就能实现接收消息的原理。 主要在KClientBoot.java。KClientBoot中包含了生产者消费者的创建,以及消费kafka消息后的invoke注解的方法
public class KClientBoot implements ApplicationContextAware {
protected List<KafkaHandlerMeta> getKafkaHandlerMeta() {
List<KafkaHandlerMeta> meta = new ArrayList<KafkaHandlerMeta>();
String[] kafkaHandlerBeanNames = applicationContext
.getBeanNamesForAnnotation(KafkaHandlers.class);
for (String kafkaHandlerBeanName : kafkaHandlerBeanNames) {
Object kafkaHandlerBean = applicationContext
.getBean(kafkaHandlerBeanName);
Class<? extends Object> kafkaHandlerBeanClazz = kafkaHandlerBean
.getClass();
Map<Class<? extends Annotation>, Map<Method, Annotation>> mapData = extractAnnotationMaps(kafkaHandlerBeanClazz);
meta.addAll(convertAnnotationMaps2Meta(mapData, kafkaHandlerBean));
}
return meta;
}
|