Spring Boot with Kafka: HTTP-triggered data exchange
1. Requirement. Recent work requirement: the exchange center is implemented as a Kafka-based distributed messaging service. The provincial department runs the exchange center and each industrial park runs an exchange node; a node formats its data according to the message sample, encrypts it, and sends it to the corresponding topic on the provincial Kafka cluster to complete the exchange.
2. Preparation
1. IP whitelist (the local public IP must be added to the provincial whitelist before the exchange center can be reached)
2. Provincial exchange center Kafka cluster addresses (123.12.34.134:1010,123.12.34.134:1011,123.12.34.134:1012 are test addresses)
3. SSL credential package
3.1 third.keystore.jks
3.2 ssl.keystore.password=test123
3.3 ssl.key.password=test1234
3.4 third.truststore.jks
3.5 ssl.truststore.password=test12
- hostname mapping
Add the broker aliases to C:\Windows\System32\drivers\etc\hosts:
123.12.34.134 kafka01
123.12.34.134 kafka02
123.12.34.134 kafka03
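Before wiring up Spring, it can save time to sanity-check these prerequisites. Below is a minimal sketch (plain JDK, no Kafka) that confirms the hosts aliases resolve and the keystore opens with the supplied password. PreflightCheck is a made-up helper class, and the sketch assumes the SSL package is unpacked to D:/kafka_ssl_properties, the path used in KafkaConfig later:
package com.test_kafka.demo;
import java.io.FileInputStream;
import java.net.InetAddress;
import java.security.KeyStore;
public class PreflightCheck {
    public static void main(String[] args) throws Exception {
        // the broker alias must resolve via the hosts file entries above
        System.out.println(InetAddress.getByName("kafka01"));
        // the keystore must open with ssl.keystore.password from the SSL package
        KeyStore keyStore = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream("D:/kafka_ssl_properties/third.keystore.jks")) {
            keyStore.load(in, "test123".toCharArray());
        }
        System.out.println("keystore entries: " + keyStore.size());
    }
}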
3. Create the Spring Boot project
1. Basic setup: ticking "Spring for Apache Kafka" in Spring Initializr is optional, since the exact versions are configured by hand below.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.test_kafka</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>test_kafka</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!-- match the version to the Kafka broker package, e.g. kafka_2.12-2.0.0 -->
<version>2.4.1</version>
</dependency>
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.8.0</version>
</dependency>
<dependency><!-- auto-generates getters/setters -->
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.8</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<!-- version is managed by the Spring Boot parent (8.x, which provides the com.mysql.cj.jdbc.Driver used below) -->
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.1.41</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2. Project structure (screenshot of the package layout omitted)
3. Create KafkaConfig and KafkaProducer (the producer callback confirms whether a message was sent successfully). KafkaConfig:
package com.test_kafka.demo.config;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value("${kafka.inner.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.inner.security-protocol}")
private String kafkaSecurityProtocol;
@Value("${kafka.producer.retries}")
private String producerRetries;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
// SSL, injected from kafka.inner.security-protocol
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "D:/kafka_ssl_properties/third.keystore.jks");
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test123");
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "D:/kafka_ssl_properties/third.truststore.jks");
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test12");
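// an empty value disables hostname verification of the broker certificate (the brokers are reached via hosts-file aliases)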
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put(ProducerConfig.ACKS_CONFIG, "1");
properties.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
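For end-to-end verification it helps to consume the topic back. Here is a sketch of consumer-side beans that could be added to the same KafkaConfig (reusing its fields and the same SSL files; the extra imports needed, ConsumerConfig and StringDeserializer, already appear above). The group id test-group is illustrative:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
    // same SSL material as the producer
    properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "D:/kafka_ssl_properties/third.keystore.jks");
    properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test123");
    properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
    properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "D:/kafka_ssl_properties/third.truststore.jks");
    properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test12");
    properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    // group.id belongs on the consumer side, not the producer
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
A @KafkaListener(topics = "test_topic") method on any @Component would then receive whatever the producer writes.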
KafkaProducer
package com.test_kafka.demo.config;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Component
@Slf4j
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic,Object obj){
String obj2String = JSONObject.toJSONString(obj);
System.out.println("准备发送消息为:"+ obj2String);
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, obj2String);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>(){
@Override
public void onFailure(Throwable throwable) {
log.info(topic + " - 生产者 发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> stringObjectSendResult) {
log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
}
});
}
}
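When the HTTP caller needs a definitive answer rather than an asynchronous callback, the send can block until the broker acknowledges. A sketch of a hypothetical companion method for KafkaProducer (the 10-second timeout is an arbitrary choice; java.util.concurrent.TimeUnit must be imported):
public boolean sendSync(String topic, Object obj) {
    String obj2String = JSONObject.toJSONString(obj);
    try {
        // block until the broker acknowledges the record (acks=1 in KafkaConfig)
        SendResult<String, String> result = kafkaTemplate.send(topic, obj2String).get(10, TimeUnit.SECONDS);
        log.info(topic + " - delivered to partition " + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset());
        return true;
    } catch (Exception e) {
        log.error(topic + " - synchronous send failed: " + e.getMessage());
        return false;
    }
}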
4. Configure application.properties
server.port=3023
## datasource
spring.datasource.url=jdbc:mysql://13.23.33.43:3306/mysql_ku?useUnicode=true&characterEncoding=UTF-8&useSSL=false&zeroDateTimeBehavior=convertToNull
spring.datasource.username=root
spring.datasource.password=test_mysql_pwd
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
#kafka
kafka.inner.bootstrap-servers=123.12.34.134:1010,123.12.34.134:1011,123.12.34.134:1012
kafka.inner.security-protocol=SSL
## SASL mechanism (not used while the protocol is plain SSL)
kafka.inner.sasl-mechanism=PLAIN
##=============== producer =======================
## Number of retries on a failed write. When a leader broker goes down, a replica is promoted
## to leader, and writes during the transition can fail. With retries=0 the producer does not
## resend (such records are lost); with retries>0 it resends once the new leader is in place,
## avoiding message loss.
kafka.producer.retries=0
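The SSL paths and passwords are hardcoded in KafkaConfig above; they could instead live here and be injected with @Value. A sketch of what that might look like (the key names are my own invention, not a Spring convention):
## SSL material (hypothetical keys, read via @Value in KafkaConfig)
kafka.inner.ssl.keystore-location=D:/kafka_ssl_properties/third.keystore.jks
kafka.inner.ssl.keystore-password=test123
kafka.inner.ssl.key-password=test1234
kafka.inner.ssl.truststore-location=D:/kafka_ssl_properties/third.truststore.jks
kafka.inner.ssl.truststore-password=test12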
5. Create the ProducerController controller
package com.test_kafka.demo.controller;
import com.alibaba.fastjson.JSON;
import com.test_kafka.demo.config.KafkaProducer;
import com.test_kafka.demo.entity.*;
import com.test_kafka.demo.mapper.Sfes_yueyangMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
@Controller
public class ProducerController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
KafkaProducer kafkaProducer;
@Autowired
Sfes_yueyangMapper sfes_yueyangMapper;
private SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@RequestMapping("/yq_baseinfo_send")
public @ResponseBody
Map<String, Object> yq_baseinfo_send() {
Map<String,Object> map=new HashMap<>();
Map<String,Object> map1=new HashMap<>();
// fetch the rows to exchange and wrap them in a schema/payload envelope
Object company_data=JSON.toJSON(sfes_yueyangMapper.company_select());
map1.put("type","struct");
map1.put("optional",false);
map1.put("name","postgre.public.yq_baseinfo.Value");
map.put("schema",map1);
// attach the fetched rows as the envelope's payload
map.put("payload",company_data);
kafkaProducer.send("test_topic", JSON.toJSONString(map));
return map;
}
public static String dateToStamp(String s) throws ParseException {
String res;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = simpleDateFormat.parse(s);
long ts = date.getTime();
res = String.valueOf(ts);
return res;
}
}
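The schema name postgre.public.yq_baseinfo.Value suggests the provincial side expects a Kafka Connect / Debezium-style JSON envelope. Under that assumption (the authoritative message sample is defined by the exchange center and not reproduced here), what lands on test_topic looks roughly like:
{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "postgre.public.yq_baseinfo.Value"
  },
  "payload": [ ... rows returned by sfes_yueyangMapper.company_select() ... ]
}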
6. Application launcher
package com.test_kafka.demo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.scheduling.annotation.EnableScheduling;
@EntityScan(basePackages = "com.test_kafka.demo.entity")
@MapperScan("com.test_kafka.demo.mapper")
@EnableScheduling
@SpringBootApplication
public class TestKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(TestKafkaApplication.class, args);
System.out.println("Server Started");
}
}
The remaining Mapper code that pulls the data from MySQL is omitted here.
4. Finally, once the connection succeeds and a message is sent, you will see the success log from the producer callback.
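The output will look roughly like the following (exact SendResult formatting varies by spring-kafka version):
test_topic - producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=test_topic, ...), recordMetadata=test_topic-0@42]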