JetpackでもRoom, Paging 3, DataStore等様々なライブラリがkotlin coroutines flowを使い始め、もはやAndroid開発にはflowが必要不可欠になってきました。
そんな中、以前紹介したStateFlowに加えて、SharedFlowが1.4.0-M1から登場しました。
少し複雑に感じますが、実はかなり整理されており、以前より使いやすくなっていると思っています。
今回は、Flow、SharedFlow、StateFlowの概要に関して紹介し、各々の詳細に関しては別途まとめたいと思います。
まず、Flowの定義はこのようになっています。
今回は型のみを重視するため、一旦メンバに関しては考慮しません。
public interface Flow<out T>
次にSharedFlowです。
public interface SharedFlow<out T> : Flow<T>
Flowを継承していることがわかると思います。
また、書き換え可能なSharedFlowである、MutableSharedFlowの定義はこうです。
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T>
SharedFlowを継承しており、またFlowCollectorという型も継承しています。
FlowCollectorはこのような定義になっています。
public interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
FlowのBuilderの中にも使われているので、実はよく使っていると思います。
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
次に、以前も紹介したStateFlowの定義です。
public interface StateFlow<out T> : SharedFlow<T>
以前紹介した際はFlowを継承していましたが、SharedFlowの継承に変わっています。
そして、こちらもMutableなMutableStateFlowがあります。
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T>
StateFlowだけでなく、MutableSharedFlowも継承していますね。
これらを、ベン図で表すとこのような関係性になります。
SharedFlowもStateFlowもFlowの一種であり、StateFlowはSharedFlowに内包されています。
これらの関係性を理解した上で、それぞれの機能についてみていきましょう。
まず最初に、StateFlowでもSharedFlowでもない、通常のflowの挙動について確認したいと思います。
flowにはflowOf, asFlow, flow, channelFlow等を使って作ることができます。
いずれを使っても、作成されるのはcold streamと言われるものです。
cold streamはsubscribeするまで動作せず、subscribeした際に毎回動作します。
例えば、以下のようなflowがあり、複数回launchInを行うと
val flow = flow {
println("emit!")
emit(1)
}
flow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
flow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
このように複数回、emit!
と表示され、最初に渡したlambda式がsubscribeの回数分呼ばれていることがわかります。
emit!
emit!
onEach1:1
onEach2:1
この仕様は、ときに便利ですが、builderやoperator内で重たい処理をしたいときに不便になることもあります。
val flow = flow {
// 重たい処理(API call等)
}.map {
// 重たい処理(API call等)
}
// 重たい処理が毎回呼ばれる
flow.launchIn(GlobalScope)
また、cold streamのため、flowに対して直接値を流すことはできません。
val flow = flowOf(1)
flow.emit(2) // compile error!!
上記のFlowに対して、hot streamとして登場したのがSharedFlowです。
SharedFlowの作り方として、MutableSharedFlowとshareInがあります。
MutableSharedFlowは、このように使うことができます。
val mutableSharedFlow = MutableSharedFlow<Int>()
mutableSharedFlow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
mutableSharedFlow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking {
mutableSharedFlow.emit(1)
mutableSharedFlow.emit(2)
delay(100) // 処理が終わるまでちょっと待つ
}
先程のflowとは違い、flowに対して直接emitすることができます。
emitはsuspend functionになっており、coroutines scopeが必要なことに注意してください。
実行すると、以下のような結果が得られます。
onEach2:1
onEach1:1
onEach2:2
onEach1:2
簡単に複数箇所でsubscribe可能なstreamを作成することができました。
MutableSharedFlowには、このように3つの引数がありますが、それぞれの内容に関しては今回は省略させてください。
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
SharedFlowを作る、もう一つの方法、shareInに関して紹介します。
shareInは通常のFlowをSharedFlowに変換するoperatorです。
fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T>
shareInを使うことで、flowの説明のときに話した、重たい処理の問題を解決することができます。
val flow = flow {
println("emit!")
emit(1)
}
val sharedFlow = flow.shareIn(GlobalScope, SharingStarted.Eagerly)
sharedFlow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
sharedFlow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach2:1
onEach1:1
Flowを直接subscribeするのとは異なり、 emit!
が一度しか呼ばれてないことがわかるでしょうか。
これは、SharedFlowが下流の複数subscriber間で値を共有しているためです。
started, replayの引数に関する説明は今回は省略します。
続いてStateFlowです。
こちらは、状態保持のための特別なSharedFlowとして認識してもらえれば大丈夫です。
同じく、MutableStateFlowかstateInで作成ができます。
今回は、MutableStateFlowの方を紹介し、stateInの方は省略します。
このように使うことができます。
val mutableStateFlow = MutableStateFlow(0)
mutableStateFlow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 1
mutableStateFlow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 2
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onEach1:0
onEach1:1
onEach2:1
onEach2:2
onEach1:2
ほとんどSharedFlowと同じように扱えますが、いくつか異なる点があります。
まず、初期値が必須です。
そして、launchInしたタイミングで直近の値が1件流れてきます。
値の設定がvalueで行え、coroutines scopeがなくても問題ありません。
また、同じ値は流れない、連続して値が変更されると、最後の値のみ流れてくる、といった特徴があります。
val mutableStateFlow = MutableStateFlow(0)
mutableStateFlow.onEach {
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 1
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 1 // 同じ値を設定
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onEach:0
onEach:1 // 一度しか流れてこない
val mutableStateFlow = MutableStateFlow(0)
mutableStateFlow.onEach {
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 1
mutableStateFlow.value = 2 // 連続して値を変更
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onEach:0
onEach:2 // 1は流れてこない
あくまでも状態保持のflowとして設計されているのがわかると思います。
以前まで、hot streamとしてchannelを使っていたと思います。
今回追加された変更により、BroadcastChannelはSharedFlowに、ConflatedBroadcastChannelはStateFlowに置き換えが可能です。
また、公式にSharedFlowとStateFlowがstableになり次第、BroadcastChannelとConflatedBroadcastChannelはdeprecatedになると告知されています。
この方針は変わらないと思われるので、そろそろ乗り換えを検討したほうが良さそうです。
表にまとめると、このようになります
-- | Flow | SharedFlow | StateFlow |
---|---|---|---|
hot or cold | cold | hot | hot |
作成方法 | flowOf, asFlow, flow, channelFlow | MutableSharedFlow, shareIn | MutableStateFlow, stateIn |
値の設定 | 不可 | emit(suspend), tryEmit | value |
初期値 | 不要 | 不要 | 必要 |
その他特徴 | subscribeするまで動かない | 複数subscriber間で値を共有する | 複数subscriber間で値を共有する, subscribe時に直前の値が流れる, 同じ値を流さない, 連続した値は最後だけ流す |
使い分けとしては、イベントを扱いたいならSharedFlow、状態を扱いたいならStateFlowを使い、streamの変換等はFlowを使って行うことになると思います。
また、flowで流れてきたものを、shareInを使ってSharedFlowに変換することで、ストリームの共通化を行うことができます。
- 2021/01/31追記-
Kotlin Coroutinesの解説本をZennにて販売しています。より詳しく学びたい方は、こちらも合わせて確認してみて下さい。