一.编写 MqttClient类
class MqttClient {
var url:String="tcp://192.168.174.206:1883"
var flag:Boolean=true
private val topicArr: Array[Topic] = List(new Topic("topic_test",QoS.AT_LEAST_ONCE),new Topic("topic_test",QoS.AT_MOST_ONCE),new Topic("topic_test",QoS.EXACTLY_ONCE)).toArray
//开始连接
def start():FutureConnection={
val mqtt = new MQTT()
mqtt.setHost(url) //设置连接地址
mqtt.setCleanSession(true)//是否清空会话信息
mqtt.setReconnectAttemptsMax(10) //设置最大的重新连接次数
mqtt.setReconnectDelay(10) //设置重新连接的时长
mqtt.setKeepAlive(30) //设置心跳时间
mqtt.setSendBufferSize(64) //设置缓冲区的大小
mqtt.setClientId("clientid_0001")
val connection: FutureConnection = mqtt.futureConnection()
connection.connect()
connection.subscribe(topicArr)
return connection
}
}
?2.编辑MqttSource
object MqttSource extends RichSourceFunction[String] {
var flag:Boolean=true
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
val client = new MqttClient()
val connection: FutureConnection = client.start()
var msg:String=""
while (true){
val futureMessage: Future[Message] = connection.receive()
val message: Message = futureMessage.await()
msg = String.valueOf(message.getPayloadBuffer()).substring(6)
sourceContext.collect(msg)
}
}
override def cancel(): Unit = {
flag=false
}
}
?三,编辑flink主类?flinkMQtt
object flinkMQtt {
def main(args: Array[String]): Unit = {
//创建执行的环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度
env.setParallelism(1)
val mqttDS: DataStreamSource[String] = env.addSource(MqttSource)
mqttDS.print()
//执行job
env.execute()
}
}
?存在一个问题:如果发送汉字字符串接,接收端收到一段数字?还没解决!!!知道的留下解决方式......
|