Flow 非常适合实时数据更新和无休止的数据流,Flow 已经被集成到许多 Jetpack 库中
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
创建 Flow 数据流
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while (true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}.map { it }
.flowOn(Dispatchers.IO)
.catch { exception -> notifyError(exception) }
}
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
订阅 Flow 数据流
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
在 默认的 Flow Builder 中,我们通过 emit 发射数据,而 callbackFlow / channelFlow 允许我们从不同的 CoroutineContext 中或者在协程之外通过 trySend 发射数据。callbackFlow / channelFlow 内部通过 Channel 实现,默认的缓冲区大小为64,如果超过该大小,则 trySend 返回 false
val latestNews: Flow<List<Int>> = channelFlow {
newsApi.fetchLatestNews(object : NewsApiCallback {
override fun onSuccess(list: List<Int>) {
if (trySend(listOf(i)).isSuccess) {
Log.e("scrutiny", "source trySend success")
} else {
Log.e("scrutiny", "source trySend failed")
}
}
override fun onError() {}
})
delay(refreshIntervalMs)
withContext(Dispatchers.IO) {
trySend(listOf<Int>())
}
awaitClose {
Log.e("scrutiny", "channelFlow close")
}
}
冷数据流 VS 热数据流
相比于热数据流(StateFlow、SharedFlow),通过 flow / callbackFlow / channelFlow 创建的数据流是冷数据流,Consumer 每次 collect 都会重新执行发射逻辑
StateFlow
持有状态的 Flow,它向 Consumer 发出当前和新状态更新。 当前状态值也可以通过其 value 属性读取。 要更新状态并将其发送到流,请为 MutableStateFlow 类的 value 属性分配一个新值
通过 repeatOnlifecycle 可以用来替换 LiveData,用于解决复杂场景的线程切换问题
override fun onCreate(savedInstanceState: Bundle?) {
// Start a coroutine in the lifecycle scope
lifecycleScope.launch {
// repeatOnLifecycle launches the block in a new coroutine every time the
// lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
repeatOnLifecycle(Lifecycle.State.STARTED) {
// Trigger the flow and start listening for values.
// Note that this happens when lifecycle is STARTED and stops
// collecting when the lifecycle is STOPPED
latestNewsViewModel.uiState.collect { uiState ->
// New value received
when (uiState) {
is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
is LatestNewsUiState.Error -> showError(uiState.exception)
}
}
}
}
}
To convert any flow to a StateFlow, use the stateIn intermediate operator.
StateFlow vs LiveData
StateFlow and LiveData have similarities. Both are observable data holder classes, and both follow a similar pattern when used in your app architecture. Note, however, that StateFlow and LiveData do behave differently:
- StateFlow requires an initial state to be passed in to the constructor, while LiveData does not.
- LiveData.observe automatically unregisters the consumer when the view goes to the STOPPED state, whereas collecting from a StateFlow or any other flow does not stop collecting automatically. To achieve the same behavior,you need to collect the flow from a Lifecycle.repeatOnLifecycle block.
- StateFlow 配合协程可以完成复杂场景的线程切换
?SharedFlow
通过 shareIn 函数可以将冷流 Flow 转换为热流 SharedFlow, SharedFlow 是 StateFlow 的高度可配置的泛化,具体参数有:
- A CoroutineScope that is used to share the flow. This scope should live longer than any consumer to keep the shared flow alive as long as needed.
- The number of items to replay to each new collector.
- The start behavior policy.
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope,
replay = 1,
started = SharingStarted.WhileSubscribed()
)
SharingStarted 参数
public companion object {
/**
* Sharing is started immediately and never stops.
*/
public val Eagerly: SharingStarted = StartedEagerly()
/**
* Sharing is started when the first subscriber appears and never stops.
*/
public val Lazily: SharingStarted = StartedLazily()
/**
* Sharing is started when the first subscriber appears, immediately stops when the last
* subscriber disappears (by default), keeping the replay cache forever (by default).
*
* It has the following optional parameters:
*
* * [stopTimeoutMillis] configures a delay (in milliseconds) between the disappearance of the last
* subscriber and the stopping of the sharing coroutine. It defaults to zero (stop immediately).
* * [replayExpirationMillis] configures a delay (in milliseconds) between the stopping of
* the sharing coroutine and the resetting of the replay cache
* (which makes the cache empty for the [shareIn] operator and resets the cached value
* to the original `initialValue` for the [stateIn] operator).
* It defaults to `Long.MAX_VALUE` (keep replay cache forever, never reset buffer).
* Use zero value to expire the cache immediately.
*/
@Suppress("FunctionName")
public fun WhileSubscribed(
stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted = StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
}
|