这是我在windows上用spring famework框架+Compose Multiplatform+swing开发的一个桌面pc项目抽出来的一小块。 Compose Multiplatform官方示例用的rxjava,基于我引入了spring(主要是比较熟悉)所以我用了reactor ,大家看一下思路,应该是很容易改写成kotlin flow和rxjava的。
全局单观察者
reactor in spring 建立流
@Bean
fun serialFlow(serial: ISerialport): Flux<ByteArray> = Flux.create { sink ->
serial.readByStrategy(
SerialPortImpl.Strategy(0xEE, 2, 3, SerialPortImpl.CS.SUM_VALUE, 1, 2, 0xBB, 5),
object : ISerialPortDataListener {
override fun onDataReceived(bytes: ByteArray) {
sink.next(bytes)
}
}
)
sink.onDispose { serial.close() }
}
消费,省略其他无关代码:
@Service
class xxxServiceImpl(@Autowired val serialFlow: Flux<ByteArray>){
init {
CoroutineScope(Dispatchers.Default).launch {
serialFlow.map { bytes ->
when (bytes[CMD_INDEX]) {
CMD_EXAMINING -> {
dosomethings()
}
CMD_UPLOAD -> handleData(bytes)
else -> {}
}
}.subscribe()
}
}
}
多观察者
当多个viewmodel(我用spring的service模拟)去观察时,这种是比较好的方式,特别pc端应用是多窗口的。
@Component
class LazySerialObservable(
@Autowired val serialFlow: Flux<ByteArray>
) : ISerialObservable {
private val subscribers: MutableSet<FluxSink<ByteArray>> = CopyOnWriteArraySet()
private var serialFlowJob: Disposable? = null
override fun isRunning() = !(serialFlowJob?.isDisposed ?: true)
@Synchronized
override fun register(subscriber: FluxSink<ByteArray>) {
if (subscribers.isEmpty()) {
serialFlowJob = serialFlow
.map { bytes -> subscribers.forEach { it.next(bytes) } }
.subscribeOn(Schedulers.newSingle("serialFlow"))
.subscribe()
}
subscribers.add(subscriber)
}
@Synchronized
override fun deregister(subscriber: FluxSink<ByteArray>) {
subscribers.remove(subscriber)
if (subscribers.isEmpty()) {
serialFlowJob?.dispose()
}
}
}
|