关于长连接的一些介绍
长连接的应用场景非常广泛,比如监控系统,IM 系统,即时报价系统,推送服务等等。这些场景都比较注重实时性,如果每次发送数据都要重新进行一次 DNS 解析并建立连接,肯定会极大地影响体验。
长连接的维护必然需要一套机制来控制。比如 HTTP/1.0 通过在 header 头中添加 Connection:Keep-Alive参数,如果当前请求需要保活则添加该参数作为标识,否则服务端就不会保持该连接的状态,发送完数据之后就关闭连接。HTTP/1.1以后 Keep-Alive 是默认打开的。
Netty 是基于 TCP 协议开发的,作为四层协议的 TCP 在其实现中也提供了 keepalive 报文,用来探测对端是否可用:TCP 层会在定时时间到达后发送相应的 KeepAlive 探针,以确定连接的可用性。
Netty状态回调代码
/**
 * Client-side inbound handler.
 *
 * Sends a heartbeat whenever the channel reports a writer-idle event and notifies
 * [NettyObserverManager] when this handler is removed from its pipeline.
 */
class ClientHandler : ChannelInboundHandlerAdapter() {
    // Single heartbeat payload wrapped as unreleasable; each send writes a duplicate view of it.
    private val heartBeat =
        Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("heartbeat", CharsetUtil.UTF_8))

    /** Inbound messages are dropped here: nothing is processed and nothing is forwarded. */
    override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
    }

    /**
     * Writes a heartbeat when [evt] is a writer-idle [IdleStateEvent]; the channel is closed
     * if that write fails. The event is always passed on to the next handler.
     */
    override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
        // Compare the enum directly: the former `?.equals(...)` produced a nullable Boolean condition.
        if (evt is IdleStateEvent && evt.state() == IdleState.WRITER_IDLE) {
            ctx.writeAndFlush(heartBeat.duplicate())
                .addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
        }
        super.userEventTriggered(ctx, evt)
    }

    override fun channelActive(ctx: ChannelHandlerContext) {
        ctx.fireChannelActive()
    }

    override fun channelInactive(ctx: ChannelHandlerContext) {
        super.channelInactive(ctx)
    }

    /** Prints the failure and closes the channel. */
    override fun exceptionCaught(
        ctx: ChannelHandlerContext,
        cause: Throwable
    ) {
        cause.printStackTrace()
        ctx.close()
    }

    /** Tells every registered observer that this handler has left the pipeline. */
    override fun handlerRemoved(ctx: ChannelHandlerContext) {
        super.handlerRemoved(ctx)
        NettyObserverManager.instance.notifyObserver(ctx)
    }
}
添加回调的管理类
/**
 * Builds the pipeline for every new client [SocketChannel]: an idle-state detector
 * (4 seconds for each of its three timeouts), string decoder/encoder, and a fresh
 * [ClientHandler].
 */
class SimpleChatClientInitializer : ChannelInitializer<SocketChannel>() {
    @Throws(Exception::class)
    override fun initChannel(ch: SocketChannel) {
        // Handlers are appended in the order they should sit in the pipeline.
        with(ch.pipeline()) {
            addLast(IdleStateHandler(4, 4, 4, TimeUnit.SECONDS))
            addLast("decoder", StringDecoder())
            addLast("encoder", StringEncoder())
            addLast("handler", ClientHandler())
        }
    }
}
使用服务开启长连接
/**
 * Foreground [Service] that owns the Netty client connection.
 *
 * It connects when created, sends the stored user id once connected, and schedules a
 * reconnect after a connect failure, a write failure, or a handler-removed notification
 * from [NettyObserverManager]. Clients talk to it through [ClientBinder].
 */
class NotifyService : Service(), ChannelFutureListener, NettyObserverListener {
    // Assigned on a Netty event-loop thread, read by binder/main-thread callers.
    @Volatile
    private var channel: Channel? = null
    private var host = "xxx.xxx.xxx"
    private val port = 0

    // Null until onCreate and again after onDestroy; also read from event-loop threads.
    @Volatile
    private var nio: NioEventLoopGroup? = null
    private val clientInitializer = SimpleChatClientInitializer()
    private val mBinder = ClientBinder()

    // Collapses reconnect requests that arrive from several paths into one scheduled attempt.
    private val reconnectPending = java.util.concurrent.atomic.AtomicBoolean(false)

    override fun onBind(intent: Intent): IBinder = mBinder

    override fun onCreate() {
        super.onCreate()
        NettyObserverManager.instance.add(this)
        init()
    }

    /** Promotes the service to the foreground; the notification setup differs before/after API 26. */
    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
        if (Build.VERSION.SDK_INT < Build.VERSION_CODES.O) {
            startService(Intent(this, GrayInnerService::class.java))
            startForeground(GRAY_SERVICE_ID, Notification())
        } else {
            // Named so it no longer shadows the Netty `channel` property.
            val notificationChannel = NotificationChannel(
                NOTIFICATION_CHANNEL_ID, "notify",
                NotificationManager.IMPORTANCE_HIGH
            )
            val manager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
            manager.createNotificationChannel(notificationChannel)
            val notification = Notification.Builder(
                applicationContext,
                NOTIFICATION_CHANNEL_ID
            ).build()
            startForeground(GRAY_SERVICE_ID, notification)
        }
        return START_STICKY
    }

    /**
     * Helper service that enters the foreground with the same notification id and
     * immediately leaves it and stops itself.
     *
     * Declared as a nested (non-inner) class: the framework creates services through a
     * no-argument constructor, which an `inner class` does not have.
     */
    class GrayInnerService : Service() {
        override fun onBind(intent: Intent?): IBinder? = null

        override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
            startForeground(GRAY_SERVICE_ID, Notification())
            stopForeground(true)
            stopSelf()
            return super.onStartCommand(intent, flags, startId)
        }
    }

    /** Creates the event-loop group if needed and starts the first connection attempt. */
    private fun init() {
        if (nio == null) {
            nio = NioEventLoopGroup()
        }
        // The connect is asynchronous, so no dedicated thread is needed to avoid blocking the caller.
        doConnect()
    }

    /**
     * Starts a non-blocking connection attempt.
     *
     * On success the new channel is stored and the login payload is sent; on failure a
     * reconnect is scheduled. Nothing here waits on the future, so it is safe to call from
     * an event-loop thread.
     */
    private fun doConnect() {
        val group = nio ?: return // not initialised yet, or the service was destroyed
        try {
            Bootstrap()
                .group(group)
                .channel(NioSocketChannel::class.java)
                .handler(clientInitializer)
                .connect(host, port)
                .addListener(ChannelFutureListener { future ->
                    if (future.isSuccess) {
                        channel = future.channel()
                        sendMessage(loginPayload())
                    } else {
                        future.cause()?.printStackTrace()
                        scheduleReconnect()
                    }
                })
        } catch (e: Exception) {
            // Keep the original best-effort behaviour, but actually report the failure.
            e.printStackTrace()
        }
    }

    /** Builds the JSON sent right after connecting: `{"id": <stored userId>}`. */
    private fun loginPayload(): String {
        val fields = mapOf("id" to SPUtils.getInstance().getLong("userId"))
        return JSONObject(fields as Map<*, *>).toString()
    }

    /** Schedules a single [doConnect] after the reconnect delay unless one is already pending. */
    private fun scheduleReconnect() {
        val group = nio ?: return
        if (group.isShuttingDown) return
        if (!reconnectPending.compareAndSet(false, true)) return
        try {
            group.schedule(
                Runnable {
                    reconnectPending.set(false)
                    doConnect()
                },
                RECONNECT_DELAY_SECONDS,
                TimeUnit.SECONDS
            )
        } catch (e: java.util.concurrent.RejectedExecutionException) {
            // The group began shutting down after the check above; drop the attempt.
            reconnectPending.set(false)
        }
    }

    /**
     * Writes [msg] to the current channel and registers this service as the write listener.
     * The message is dropped (and logged) when there is no channel yet.
     */
    fun sendMessage(msg: String) {
        val current = channel
        if (current == null) {
            LogUtils.a("sendMessage skipped: channel is not connected")
            return
        }
        current.writeAndFlush(msg).addListener(this)
    }

    /** Write-completion callback: a failed write closes that channel and schedules a reconnect. */
    override fun operationComplete(future: ChannelFuture) {
        if (!future.isSuccess) {
            // Close the failed channel so a reconnect does not leave it open alongside the new one.
            future.channel().close()
            scheduleReconnect()
        } else {
            LogUtils.a("连接成功")
        }
    }

    /** Called via [NettyObserverManager] when a [ClientHandler] was removed from its pipeline. */
    override fun nettyObserverUpData(v: ChannelHandlerContext) {
        scheduleReconnect()
    }

    /** Unregisters the observer first so that closing the channel does not trigger a reconnect. */
    override fun onDestroy() {
        super.onDestroy()
        NettyObserverManager.instance.remove(this)
        channel?.close()
        channel = null
        nio?.shutdownGracefully()
        nio = null
    }

    /** Binder handed to bound clients. */
    inner class ClientBinder : Binder() {
        fun getService(): NotifyService {
            return this@NotifyService
        }

        fun sendMsg(msg: String) {
            sendMessage(msg)
        }
    }

    private companion object {
        const val GRAY_SERVICE_ID = 1001
        const val NOTIFICATION_CHANNEL_ID = "com.guanwei.pddemo"
        const val RECONNECT_DELAY_SECONDS = 3L
    }
}
封装观察者进行状态监听与通知
/**
 * Observer side of the Netty status notification: implementations are registered with a
 * [NettySubjectListener] and called back when it notifies its observers.
 */
interface NettyObserverListener {
/** Invoked with the [ChannelHandlerContext] that was passed to `notifyObserver`. */
fun nettyObserverUpData(v: ChannelHandlerContext)
}
/**
 * Subject side of the Netty status notification: keeps a set of [NettyObserverListener]s
 * and forwards a [ChannelHandlerContext] to them.
 */
interface NettySubjectListener {
/** Registers [nettyObserverListener] for future notifications. */
fun add(nettyObserverListener: NettyObserverListener)
/** Delivers [t] to the registered listeners. */
fun notifyObserver(t: ChannelHandlerContext)
/** Unregisters [nettyObserverListener]. */
fun remove(nettyObserverListener: NettyObserverListener)
}
/**
 * Process-wide registry of [NettyObserverListener]s, reached through [instance].
 *
 * Listeners are added/removed from component lifecycle callbacks while notifications are
 * fired from Netty handler callbacks, so the backing list is a copy-on-write list:
 * iteration works on a snapshot and cannot fail when the list is modified concurrently
 * (including a listener removing itself while being notified).
 */
class NettyObserverManager : NettySubjectListener {
    private val list = java.util.concurrent.CopyOnWriteArrayList<NettyObserverListener>()

    /** Registers [nettyObserverListener]; registering twice means it is notified twice. */
    override fun add(nettyObserverListener: NettyObserverListener) {
        list.add(nettyObserverListener)
    }

    /** Calls [NettyObserverListener.nettyObserverUpData] with [t] on every registered listener. */
    override fun notifyObserver(t: ChannelHandlerContext) {
        for (listener in list) {
            listener.nettyObserverUpData(t)
        }
    }

    /** Removes one registration of [nettyObserverListener], if present. */
    override fun remove(nettyObserverListener: NettyObserverListener) {
        list.remove(nettyObserverListener)
    }

    companion object {
        /** Lazily created shared instance. */
        val instance: NettyObserverManager by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
            NettyObserverManager()
        }
    }
}
在Activity中开启、绑定服务
服务开启与绑定所需要的参数如何获取就不多讲了,不熟悉的建议回去重新学习一下 Service 的相关知识
// Start the service explicitly, then bind to it with the same intent.
// `iIntent` and `myConnection` are defined elsewhere in the Activity (not shown here).
startService(iIntent)
bindService(
iIntent,
myConnection,
Context.BIND_AUTO_CREATE
)
// Mirrors the start/bind calls above: unbind first, then stop the started service.
override fun onDestroy() {
super.onDestroy()
unbindService(myConnection)
stopService(iIntent)
}
|