Blog

kotlin coroutinesのFlow, SharedFlow, StateFlowを整理する

JetpackでもRoom, Paging 3, DataStore等様々なライブラリがkotlin coroutines flowを使い始め、もはやAndroid開発にはflowが必要不可欠になってきました。

そんな中、以前紹介したStateFlowに加えて、SharedFlow1.4.0-M1から登場しました。

少し複雑に感じますが、実はかなり整理されており、以前より使いやすくなっていると思っています。

今回は、Flow、SharedFlow、StateFlowの概要に関して紹介し、各々の詳細に関しては別途まとめたいと思います。

型の関係を確認する

Flow

まず、Flowの定義はこのようになっています。

今回は型のみを重視するため、一旦メンバに関しては考慮しません。

public interface Flow<out T>

SharedFlow

次にSharedFlowです。

public interface SharedFlow<out T> : Flow<T> 

Flowを継承していることがわかると思います。

また、書き換え可能なSharedFlowである、MutableSharedFlowの定義はこうです。

public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T>

SharedFlowを継承しており、またFlowCollectorという型も継承しています。

FlowCollectorはこのような定義になっています。

public interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

FlowのBuilderの中にも使われているので、実はよく使っていると思います。

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>

StateFlow

次に、以前も紹介したStateFlowの定義です。

public interface StateFlow<out T> : SharedFlow<T>

以前紹介した際はFlowを継承していましたが、SharedFlowの継承に変わっています。

そして、こちらもMutableなMutableStateFlowがあります。

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> 

StateFlowだけでなく、MutableSharedFlowも継承していますね。

これらを、ベン図で表すとこのような関係性になります。

SharedFlowもStateFlowもFlowの一種であり、StateFlowはSharedFlowに内包されています。

これらの関係性を理解した上で、それぞれの機能についてみていきましょう。

それぞれの機能について

Flow

まず最初に、StateFlowでもSharedFlowでもない、通常のflowの挙動について確認したいと思います。

flowにはflowOf, asFlow, flow, channelFlow等を使って作ることができます。

いずれを使っても、作成されるのはcold streamと言われるものです。

cold streamはsubscribeするまで動作せず、subscribeした際に毎回動作します。

例えば、以下のようなflowがあり、複数回launchInを行うと

val flow = flow {
    println("emit!")
    emit(1)
}

flow.onEach {
    println("onEach1:$it")
}.launchIn(GlobalScope)

flow.onEach {
    println("onEach2:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ

このように複数回、emit! と表示され、最初に渡したlambda式がsubscribeの回数分呼ばれていることがわかります。

emit!
emit!
onEach1:1
onEach2:1

この仕様は、ときに便利ですが、builderやoperator内で重たい処理をしたいときに不便になることもあります。

val flow = flow {
    // 重たい処理(API call等)
}.map {
    // 重たい処理(API call等)
}

// 重たい処理が毎回呼ばれる
flow.launchIn(GlobalScope)

また、cold streamのため、flowに対して直接値を流すことはできません。

val flow = flowOf(1)
flow.emit(2) // compile error!!

SharedFlow

上記のFlowに対して、hot streamとして登場したのがSharedFlowです。

SharedFlowの作り方として、MutableSharedFlowとshareInがあります。

MutableSharedFlow

MutableSharedFlowは、このように使うことができます。

val mutableSharedFlow = MutableSharedFlow<Int>()

mutableSharedFlow.onEach {
    println("onEach1:$it")
}.launchIn(GlobalScope)

mutableSharedFlow.onEach {
    println("onEach2:$it")
}.launchIn(GlobalScope)

runBlocking {
    mutableSharedFlow.emit(1)
    mutableSharedFlow.emit(2)
    delay(100) // 処理が終わるまでちょっと待つ
}

先程のflowとは違い、flowに対して直接emitすることができます。

emitはsuspend functionになっており、coroutines scopeが必要なことに注意してください。

実行すると、以下のような結果が得られます。

onEach2:1
onEach1:1
onEach2:2
onEach1:2

簡単に複数箇所でsubscribe可能なstreamを作成することができました。

MutableSharedFlowには、このように3つの引数がありますが、それぞれの内容に関しては今回は省略させてください。

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

shareIn

SharedFlowを作る、もう一つの方法、shareInに関して紹介します。

shareInは通常のFlowをSharedFlowに変換するoperatorです。

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

shareInを使うことで、flowの説明のときに話した、重たい処理の問題を解決することができます。

val flow = flow {
    println("emit!")
    emit(1)
}
val sharedFlow = flow.shareIn(GlobalScope, SharingStarted.Eagerly)

sharedFlow.onEach {
    println("onEach1:$it")
}.launchIn(GlobalScope)

sharedFlow.onEach {
    println("onEach2:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach2:1
onEach1:1

Flowを直接subscribeするのとは異なり、 emit! が一度しか呼ばれてないことがわかるでしょうか。

これは、SharedFlowが下流の複数subscriber間で値を共有しているためです。

started, replayの引数に関する説明は今回は省略します。

StateFlow

続いてStateFlowです。

こちらは、状態保持のための特別なSharedFlowとして認識してもらえれば大丈夫です。

同じく、MutableStateFlowかstateInで作成ができます。

今回は、MutableStateFlowの方を紹介し、stateInの方は省略します。

MutableStateFlow

このように使うことができます。

val mutableStateFlow = MutableStateFlow(0)

mutableStateFlow.onEach {
    println("onEach1:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 1

mutableStateFlow.onEach {
    println("onEach2:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 2
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onEach1:0
onEach1:1
onEach2:1
onEach2:2
onEach1:2

ほとんどSharedFlowと同じように扱えますが、いくつか異なる点があります。

まず、初期値が必須です。

そして、launchInしたタイミングで直近の値が1件流れてきます。

値の設定がvalueで行え、coroutines scopeがなくても問題ありません。

また、同じ値は流れない、連続して値が変更されると、最後の値のみ流れてくる、といった特徴があります。

val mutableStateFlow = MutableStateFlow(0)

mutableStateFlow.onEach {
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 1
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 1 // 同じ値を設定
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onEach:0
onEach:1 // 一度しか流れてこない
val mutableStateFlow = MutableStateFlow(0)

mutableStateFlow.onEach {
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
mutableStateFlow.value = 1
mutableStateFlow.value = 2 // 連続して値を変更
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onEach:0
onEach:2 // 1は流れてこない

あくまでも状態保持のflowとして設計されているのがわかると思います。

BroadcastChannel / ConflatedBroadcastChannel

以前まで、hot streamとしてchannelを使っていたと思います。

今回追加された変更により、BroadcastChannelはSharedFlowに、ConflatedBroadcastChannelはStateFlowに置き換えが可能です。

また、公式にSharedFlowとStateFlowがstableになり次第、BroadcastChannelとConflatedBroadcastChannelはdeprecatedになると告知されています。

この方針は変わらないと思われるので、そろそろ乗り換えを検討したほうが良さそうです。

まとめ

表にまとめると、このようになります

--FlowSharedFlowStateFlow
hot or coldcoldhothot
作成方法flowOf, asFlow, flow, channelFlowMutableSharedFlow, shareInMutableStateFlow, stateIn
値の設定不可emit(suspend), tryEmitvalue
初期値不要不要必要
その他特徴subscribeするまで動かない複数subscriber間で値を共有する複数subscriber間で値を共有する,
subscribe時に直前の値が流れる,
同じ値を流さない,
連続した値は最後だけ流す

使い分けとしては、イベントを扱いたいならSharedFlow、状態を扱いたいならStateFlowを使い、streamの変換等はFlowを使って行うことになると思います。

また、flowで流れてきたものを、shareInを使ってSharedFlowに変換することで、ストリームの共通化を行うことができます。

- 2020/11/15追記 -

- 2021/01/31追記-

Kotlin Coroutinesの解説本をZennにて販売しています。より詳しく学びたい方は、こちらも合わせて確認してみて下さい。

詳解 Kotlin Coroutines [2021] | Zenn

人気の記事

Jetpack ComposeとKotlin Coroutinesを連携させる

Layout Composableを使って複雑なレイアウトを組む【Jetpack Compose】

テスト用Dispatcherの使い分け【Kotlin Coroutines】

Flow.combineの内部実装がすごい話

Jetpack ComposeのRippleエフェクトを深堀り、カスタマイズも

Kotlin Coroutinesで共有リソースを扱う