Here is a quick write-up on sending Kafka messages with Scala.
Preparation:
- A code editor / IDE
- Offset Explorer 2.1 (connect it to your cluster and use it to check whether your Kafka messages were sent successfully)
Add the kafka-clients dependency to pom.xml, for example as shown below.
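A minimal sketch of the dependency entry; the version number is only an example, so pick whatever matches your cluster and build:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- example version only; align it with your Kafka cluster -->
    <version>3.4.0</version>
</dependency>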
Create a new file and write the script:
package utils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.serialization.StringSerializer
import java.util
import java.util.{Date, Properties, UUID}
import scala.io.Source
class KafkaMessage(properties: Properties) {

  def writeBatchToKafka(server: String, topic: String, tenant: String, filepath: String): Unit = {
    // Read the message body from the file, closing the handle when done
    val source = Source.fromFile(filepath)
    val value = try source.mkString finally source.close()

    // Basic producer configuration: bootstrap servers plus String serializers for key and value
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    // Security settings are supplied through the Properties passed to the constructor
    props.put("security.protocol", properties.getProperty("security.protocol"))
    props.put("sasl.mechanism", properties.getProperty("sasl.mechanism"))
    props.put("sasl.jaas.config", properties.getProperty("sasl.jaas.config"))

    val producer = new KafkaProducer[String, String](props)

    // Custom headers attached to the record
    val headers: util.List[Header] = util.Arrays.asList(
      new RecordHeader("TenantID", tenant.getBytes()),
      new RecordHeader("X-Creation-Time", new Date().toString.getBytes()),
      new RecordHeader("X-Tenant-ID", tenant.getBytes()),
      new RecordHeader("X-Topic", topic.getBytes()),
      new RecordHeader("X-Message-ID", UUID.randomUUID().toString.getBytes())
    )

    // Send to partition 0 of the topic, using the tenant as the record key
    val message = new ProducerRecord[String, String](topic, 0, tenant, value, headers)
    producer.send(message)
    producer.flush()
    producer.close()
  }
}
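For reference, a call site might look roughly like the sketch below. The broker address, topic, tenant, file path, and SASL values are all placeholder assumptions; substitute your own environment's settings:

object SendDemo {
  def main(args: Array[String]): Unit = {
    // Security settings consumed by KafkaMessage; every value here is a placeholder
    val securityProps = new java.util.Properties()
    securityProps.put("security.protocol", "SASL_SSL")
    securityProps.put("sasl.mechanism", "PLAIN")
    securityProps.put("sasl.jaas.config",
      "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pwd\";")

    val sender = new utils.KafkaMessage(securityProps)

    // server / topic / tenant / filepath are example values
    sender.writeBatchToKafka(
      server = "localhost:9092",
      topic = "demo-topic",
      tenant = "tenant-001",
      filepath = "src/main/resources/message.json")
  }
}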
[Diagram: the relationship between Offset Explorer and the code]
Check the result:
After sending, open your topic in Offset Explorer and run the Data tab; there you can see whether your Kafka message was sent successfully.