<!-- Maven dependencies -->
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.3.1</version>
</dependency>
<!-- storm kafka -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>2.3.0</version><!-- must match the storm-core version above; 1.2.4 is binary-incompatible with Storm 2.x -->
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version><!-- keep aligned with the kafka_2.12 broker library version above -->
</dependency>
<!-- JSON -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.38</version><!-- WARNING: fastjson versions below 1.2.83 have known remote-code-execution vulnerabilities; upgrade before production use -->
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
</dependencies>
Main topology class:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
/**
 * Storm topology that consumes messages from a Kafka topic via
 * storm-kafka-client and forwards them to a printing bolt.
 *
 * <p>Usage: run with a topology name as the first argument to submit to a
 * Storm cluster; run with no arguments to execute in an in-process
 * {@link LocalCluster} for local testing.
 *
 * @author duhai
 * @date 2022-04-27
 */
public class MyTopology {

    /**
     * Builds and submits the topology.
     *
     * @param args {@code args[0]} = topology name for cluster submission;
     *             empty/absent for local mode
     */
    public static void main(final String[] args) {
        // Spout configuration.
        // NOTE: Kafka auto-commit is intentionally NOT enabled here. With
        // enable.auto.commit=true the spout's own offset management
        // (setOffsetCommitPeriodMs / setMaxUncommittedOffsets) is bypassed and
        // failed tuples are never replayed, silently losing at-least-once
        // semantics; storm-kafka-client 2.x rejects that property outright.
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig
                .builder("192.168.1.17:9092", "fault_recovery_test")
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafka-group1")
                .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000)
                .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName())
                .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName())
                .setOffsetCommitPeriodMs(5000)   // how often the spout commits processed offsets to Kafka
                .setMaxUncommittedOffsets(250)   // back-pressure: max offsets awaiting commit before polling pauses
                // .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                .build();

        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<String, String>(kafkaSpoutConfig);

        // Wire the topology: kafkaSpout -> printBolt (shuffle grouping).
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout, 1).setNumTasks(1);
        builder.setBolt("printBolt", new PrintBolt(), 1).shuffleGrouping("kafkaSpout");

        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
        conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
        conf.setMaxSpoutPending(100000);
        conf.setMessageTimeoutSecs(1000);
        conf.setDebug(false);

        if (args != null && args.length > 0) {
            // Cluster submission: the topology name is taken from args[0].
            System.out.println("【run on cluster】");
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                // Success message only after a successful submit — previously
                // it printed even when submitTopology threw.
                System.out.println("提交完成");
            } catch (Exception e) {
                e.printStackTrace();
                // Signal failure to the caller instead of exiting 0.
                System.exit(1);
            }
        } else {
            // Local mode: run an in-process cluster for testing.
            System.out.println("【run on local】");
            try {
                // NOTE(review): LocalCluster is AutoCloseable in Storm 2.x but is
                // deliberately left open here so the topology keeps running until
                // the JVM is killed — confirm this is the intended lifecycle.
                LocalCluster lc = new LocalCluster();
                lc.submitTopology("storm_kafka", conf, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }
}
|