StateFlowはkotlin corouteinsの1.3.6で追加された状態管理用の特別なFlowです。
以前、「StateFlowのドキュメントを読み込む」という記事を書きましたが、その後SharedFlowが追加され、若干実装に変更がありました。
また、新たにstateInというoperatorも追加されています。
今回はそれらを含めたStateFlowの詳細な仕様に関して深堀りしていきたいと思います。
Flow, SharedFlow, StateFlowの比較はこちらの記事を、SharedFlowの深堀りはこちらの記事を参考にしてください。
StateFlowは状態管理に特化したFlowです。
StateFlowはMutableStateFlowを使うことで作成することができます。
val mutableStateFlow = MutableStateFlow(0)
mutableStateFlow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 1
mutableStateFlow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 2
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onEach1:0
onEach1:1
onEach2:1
onEach2:2
onEach1:2
StateFlowには必ず初期値があり、最新の1つの値を新しいサブスクライバーにreplayする特徴があります。
また、同じ値は流れない、連続して値が変更されると、最後の値のみ流れてくる、といった特徴があります。
StateFlowはSharedFlowを継承しており、SharedFlowの一部という立ち位置です。
interface StateFlow<out T> : SharedFlow<T>
SharedFlowを以下のように使うことで、StateFlowと同じ動作をします。
val shared = MutableSharedFlow(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue)
val state = shared.distinctUntilChanged()
SharedFlowを継承しているため、SharedFlowと同じ操作が行えますが、 resetReplayCache はサポートしていないことに注意してください。
override fun resetReplayCache() {
throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}
また、emitやtryEmitも行うことができますが、内部でvalueが呼ばれるだけなので、使う意味はなさそうです。
わかりにくいので、必ず value
で操作するようにしたほうが良いと思います。
override suspend fun emit(value: T) {
this.value = value
}
通常のFlowをStateFlowにするために、 stateInというoperatorを使うことができます。
stateIn
にはsuspend functionのものと、通常関数のものがあります。
通常関数の定義はこうなっています。
fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T> (source)
このように使うことができます。
val flow = flow {
delay(10)
emit(1)
delay(10)
emit(2)
}
val stateFlow = flow.stateIn(
scope = GlobalScope,
started = SharingStarted.Eagerly,
initialValue = 0
)
stateFlow.onEach {
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onEach:0
onEach:1
onEach:2
coroutine scopeと初期値が必須で、startedによってhot stream化するタイミングを変更することができます。
Eagerly, Lazily, WhileSubscribedから選ぶことができます。
もう一つのsuspend functionのstateInの定義は、このようになっています。
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T>
このように使うことができます。
val flow = flow {
delay(10)
emit(1)
delay(10)
emit(2)
}
runBlocking {
val stateFlow = flow.stateIn(GlobalScope)
stateFlow.onEach {
println("onEach:$it")
}.launchIn(GlobalScope)
delay(100) // 処理が終わるまでちょっと待つ
}
onEach:1
onEach:2
初期値もstartedもいらないのが特徴的ですが、suspend functionのため、若干使いづらいと思います。
StateFlowが登場するまで利用していた ConflatedBroadcastChannel は、StateFlowがstableになるタイミングでdeprecatedになることが決定しています。
StateFlowになったことで、シンプルで使いやすく変わっていると思います。
一方で、初期値が必須になったり、closeできなくなってる点には注意してください。
以下のoperatorはStateFlowで使えません。
Flowにキャストしなおせばコンパイルは通りますが、streamに変化を与えません。
今回はStateFlowに関して深堀りしてみました。
SharedFlowを継承することで、若干挙動が変わったところがあるので、注意が必要です。
また、 stateIn
等は結構頻繁に使うのではないかなと思っています。
個人的にはこのような拡張関数を用意して使おうかなと考えています。
fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T?> {
return stateIn(scope, SharingStarted.Eagerly, null)
}
ぜひ、SharedFlow、StateFlowを使ったアプリケーション開発にチャレンジしてみてください。
- 2021/01/31追記-
Kotlin Coroutinesの解説本をZennにて販売しています。より詳しく学びたい方は、こちらも合わせて確認してみて下さい。