Blog

StateFlowの深堀り、SharedFlowとの違いとstateIn【kotlin coroutines flow】

StateFlowkotlin corouteinsの1.3.6で追加された状態管理用の特別なFlowです。

以前、「StateFlowのドキュメントを読み込む」という記事を書きましたが、その後SharedFlowが追加され、若干実装に変更がありました。

また、新たにstateInというoperatorも追加されています。

今回はそれらを含めたStateFlowの詳細な仕様に関して深堀りしていきたいと思います。

Flow, SharedFlow, StateFlowの比較はこちらの記事を、SharedFlowの深堀りはこちらの記事を参考にしてください。

StateFlowの基本的な使い方

StateFlowは状態管理に特化したFlowです。

StateFlowはMutableStateFlowを使うことで作成することができます。

val mutableStateFlow = MutableStateFlow(0)

mutableStateFlow.onEach {
    println("onEach1:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 1

mutableStateFlow.onEach {
    println("onEach2:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 2
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onEach1:0
onEach1:1
onEach2:1
onEach2:2
onEach1:2

StateFlowには必ず初期値があり、最新の1つの値を新しいサブスクライバーにreplayする特徴があります。

また、同じ値は流れない、連続して値が変更されると、最後の値のみ流れてくる、といった特徴があります。

SharedFlowとの違い

StateFlowはSharedFlowを継承しており、SharedFlowの一部という立ち位置です。

interface StateFlow<out T> : SharedFlow<T>

SharedFlowを以下のように使うことで、StateFlowと同じ動作をします。

val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue)
val state = shared.distinctUntilChanged()

SharedFlowを継承しているため、SharedFlowと同じ操作が行えますが、 resetReplayCache はサポートしていないことに注意してください。

override fun resetReplayCache() {
    throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}

また、emittryEmitも行うことができますが、内部でvalueが呼ばれるだけなので、使う意味はなさそうです。

わかりにくいので、必ず value で操作するようにしたほうが良いと思います。

override suspend fun emit(value: T) {
    this.value = value
}

stateIn

通常のFlowをStateFlowにするために、 stateInというoperatorを使うことができます。

stateIn にはsuspend functionのものと、通常関数のものがあります。

通常関数の定義はこうなっています。

fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T> (source)

このように使うことができます。

val flow = flow {
    delay(10)
    emit(1)
    delay(10)
    emit(2)
}
val stateFlow = flow.stateIn(
    scope = GlobalScope,
    started = SharingStarted.Eagerly,
    initialValue = 0
)

stateFlow.onEach {
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onEach:0
onEach:1
onEach:2

coroutine scopeと初期値が必須で、startedによってhot stream化するタイミングを変更することができます。

Eagerly, Lazily, WhileSubscribedから選ぶことができます。

もう一つのsuspend functionのstateInの定義は、このようになっています。

public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T>

このように使うことができます。

val flow = flow {
    delay(10)
    emit(1)
    delay(10)
    emit(2)
}

runBlocking {
    val stateFlow = flow.stateIn(GlobalScope)

    stateFlow.onEach {
        println("onEach:$it")
    }.launchIn(GlobalScope)

    delay(100) // 処理が終わるまでちょっと待つ
}
onEach:1
onEach:2

初期値もstartedもいらないのが特徴的ですが、suspend functionのため、若干使いづらいと思います。

ConflatedBroadcastChannel

StateFlowが登場するまで利用していた ConflatedBroadcastChannel は、StateFlowがstableになるタイミングでdeprecatedになることが決定しています。

StateFlowになったことで、シンプルで使いやすく変わっていると思います。

一方で、初期値が必須になったり、closeできなくなってる点には注意してください。

その他注意点

以下のoperatorはStateFlowで使えません。

Flowにキャストしなおせばコンパイルは通りますが、streamに変化を与えません。

まとめ

今回はStateFlowに関して深堀りしてみました。

SharedFlowを継承することで、若干挙動が変わったところがあるので、注意が必要です。

また、 stateIn 等は結構頻繁に使うのではないかなと思っています。

個人的にはこのような拡張関数を用意して使おうかなと考えています。

fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T?> {
    return stateIn(scope, SharingStarted.Eagerly, null)
}

ぜひ、SharedFlow、StateFlowを使ったアプリケーション開発にチャレンジしてみてください。

- 2021/01/31追記-

Kotlin Coroutinesの解説本をZennにて販売しています。より詳しく学びたい方は、こちらも合わせて確認してみて下さい。

詳解 Kotlin Coroutines [2021] | Zenn

人気の記事

kotlin coroutinesのFlow, SharedFlow, StateFlowを整理する

Jetpack ComposeとKotlin Coroutinesを連携させる

Layout Composableを使って複雑なレイアウトを組む【Jetpack Compose】

テスト用Dispatcherの使い分け【Kotlin Coroutines】

Flow.combineの内部実装がすごい話

Jetpack ComposeのRippleエフェクトを深堀り、カスタマイズも