Blog

SharedFlowの深堀り、replay, bufferって何【kotlin coroutines flow】

SharedFlow はkotlin corouteinsの1.4.0-M1 で追加された新しいFlowです。

以前、Flow, SharedFlow, StateFlowの比較を行いました。

今回はSharedFlowの詳細な仕様に関して深堀りしていきたいと思います。

MutableSharedFlow

SharedFlowを作成する方法の一つに、MutableSharedFlowを使う方法があります。

基本的には、以下のように使うことができます。

val mutableSharedFlow = MutableSharedFlow<Int>()

mutableSharedFlow.onEach {
    println("onEach1:$it")
}.launchIn(GlobalScope)

mutableSharedFlow.onEach {
    println("onEach2:$it")
}.launchIn(GlobalScope)

runBlocking {
    mutableSharedFlow.emit(1)
    mutableSharedFlow.emit(2)
    delay(100) // 処理が終わるまでちょっと待つ
}

flowに対して直接emitできることが特徴です。

実行結果はこのようになります。

onEach2:1
onEach1:1
onEach2:2
onEach1:2

簡単に複数箇所でsubscribe可能なstreamを作成することができました。

MutableStateFlowには、replayとbufferの機能があり、このような3つの引数で操作します。

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

それぞれ挙動について説明をしていきます。

replay

新規のsubscriberがsubscribeする際に流れてくるデータの数です。

デフォルトは0になっているので、subscribe時にデータは流れてきません。

例えば3みたいに指定すると、このようなことが発生します。

val mutableSharedFlow = MutableSharedFlow<Int>(replay = 3)

runBlocking {
    mutableSharedFlow.emit(1)
    mutableSharedFlow.emit(2)
    mutableSharedFlow.emit(3)
    mutableSharedFlow.emit(4)
}

mutableSharedFlow.onEach {
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking {
    delay(100)
    mutableSharedFlow.emit(5)
}

予め1~4までのデータを流した後、subscribeし、その後また5を流しています。

この結果は、このようになります。

onEach:2
onEach:3
onEach:4
onEach:5

launchInしたタイミングで、直近の3件がreplayされます。

3件に満たない場合は、それまでに流れた数のデータだけがreplayされます。

replayされる内容はreplayCacheで取得することができ、resetReplayCacheでクリアすることができます。

val mutableSharedFlow = MutableSharedFlow<Int>(replay = 3)

runBlocking {
    mutableSharedFlow.emit(1)
    mutableSharedFlow.emit(2)
    mutableSharedFlow.emit(3)
    mutableSharedFlow.emit(4)
}

println(mutableSharedFlow.replayCache)
mutableSharedFlow.resetReplayCache()
println(mutableSharedFlow.replayCache)
[2, 3, 4]
[]

一方、Androidアプリ開発において、このreplayを使うことはあまりないかな、というのが正直な感想です。

buffer

また、SharedFlowにはsubscribe時に時間がかかったときにバッファリングしてくれる、bufferという機能があります。

先程のreplyとextraBufferCapacityの合計がbufferの数になり、onBufferOverflowはbufferサイズを超えたときの挙動を設定します。

BufferOverflowにはSUSPEND, DROP_OLDEST, DROP_LATESTの3つがあり、デフォルトはSUSPENDです。

BufferOverflow.SUSPEND

まず、デフォルトの挙動を確認してみましょう。

今回は、onEachの中で delay(500) を行っています。

val mutableSharedFlow = MutableSharedFlow<Int>()

mutableSharedFlow.onEach {
    delay(500)
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking {
    val duration = measureTimeMillis {
        mutableSharedFlow.emit(1)
        mutableSharedFlow.emit(2)
    }
    delay(1500) // 処理が終わるまで待つ
    println("duration:${duration}ms")
}

この場合、 mutableSharedFlow.emit(2) がbufferが開くまでsuspend状態になり、2回emitするのに515msかかりました。

onEach:1
onEach:2
duration:515ms

次に、buffer sizeを増やしてみます。

val mutableSharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 2
)

mutableSharedFlow.onEach {
    delay(500)
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking {
    val duration = measureTimeMillis {
        mutableSharedFlow.emit(1)
        mutableSharedFlow.emit(2)
    }
    delay(1500) // 処理が終わるまで待つ
    println("duration:${duration}ms")
}
onEach:1
onEach:2
duration:0ms

バッファサイズ内であればSharedFlow内でバッファリングしてくれるため、emitに時間はかかりません(0ms)。

これらの挙動は、複数箇所でsubscribeした際に特に違いが出ます。

capacityが0のとき、emitは時間のかかるsubscriberを待つため、2つのsubscriberはほとんど同時に実行されます。

val mutableSharedFlow = MutableSharedFlow<Int>()

mutableSharedFlow.onEach {
    println("onEach1:$it")
    delay(500)
}.launchIn(GlobalScope)

mutableSharedFlow.onEach {
    println("onEach2:$it")
    delay(100)
}.launchIn(GlobalScope)

runBlocking {
    mutableSharedFlow.emit(1)
    mutableSharedFlow.emit(2)
    mutableSharedFlow.emit(3)
    delay(1500)
}
onEach1:1
onEach2:1
onEach2:2
onEach1:2
onEach2:3
onEach1:3

capacityが多いとき、emitは先に行われて、SharedFlow内でバッファリングするため、時間のかからないsubscriberのほうが早く処理を終えます。

val mutableSharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 3
)

mutableSharedFlow.onEach {
    println("onEach1:$it")
    delay(500)
}.launchIn(GlobalScope)

mutableSharedFlow.onEach {
    println("onEach2:$it")
    delay(100)
}.launchIn(GlobalScope)

runBlocking {
    mutableSharedFlow.emit(1)
    mutableSharedFlow.emit(2)
    mutableSharedFlow.emit(3)
    delay(1500)
}
onEach2:1
onEach1:1
onEach2:2
onEach2:3
onEach1:2
onEach1:3

どれくらいの頻度でデータが流れてくるのか、時間のかかるsubscriberはないか等をもとに、適切なbuffer sizeを設定していく必要がありそうです。

BufferOverflow.DROP_OLDEST, BufferOverflow.DROP_LATEST

バッファサイズがいっぱいだったとき、デフォルトのBufferOverflow.SUSPENDでは、emitがsuspend状態になることがわかりました。

では、残りのDROP_OLDEST、DROP_LATESTを指定するとどうなるのでしょう?

DROP_OLDESTは、バッファがいっぱいだったとき、古いものを削除します。

DROP_OLDEST、DROP_LATESTではbuffer size(extraBufferCapacityかreplay)を1以上にする必要があるので注意してください。

val mutableSharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

mutableSharedFlow.onEach {
    delay(500)
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking {
    mutableSharedFlow.emit(1)
    delay(10)
    mutableSharedFlow.emit(2)
    delay(10)
    mutableSharedFlow.emit(3)
    delay(1500) // 処理が終わるまで待つ
}

この場合、1を処理中に流れてきた2は、1度bufferに貯められますが、その後3が流れてきたタイミングで2は削除されるため、結果的に1と3のみが処理されます。

onEach:1
onEach:3

DROP_LATESTはその逆で、bufferに入らない新しい値が破棄されます。

val mutableSharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 1,
    onBufferOverflow = BufferOverflow.DROP_LATEST
)

mutableSharedFlow.onEach {
    delay(500)
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking {
    mutableSharedFlow.emit(1)
    delay(10)
    mutableSharedFlow.emit(2)
    delay(10)
    mutableSharedFlow.emit(3)
    delay(1500) // 処理が終わるまで待つ
}

今回のケースでは、1の処理中に2がバッファに入り、3が流れてきたときはバッファがいっぱいのため、3が破棄されます。

onEach:1
onEach:2

必要に応じて使い分けていく必要がありそうです。

個人的には、subscribeする側でconflatedやbufferしてあげたほうがミスが起こりにくいかなとも思っています。

tryEmit

emitはsuspend functionですが、tryEmitという通常関数もあります。

こちらは、emit可能ならemitしtrueを返しますが、suspendの必要があればemitせずにfalseを返します。

val mutableSharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 1
)

mutableSharedFlow.onEach {
    delay(500)
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking {
    val result1 = mutableSharedFlow.tryEmit(1)
    println("emit1:$result1")
    delay(10)
    val result2 = mutableSharedFlow.tryEmit(2)
    println("emit2:$result2")
    delay(10)
    val result3 = mutableSharedFlow.tryEmit(3)
    println("emit3:$result3")
    delay(1500) // 処理が終わるまで待つ
}

この場合は1回目と2回目のemitは成功しますが、3回目は失敗し、emitされません。

emit1:true
emit2:true
emit3:false
onEach:1
onEach:2

ちなみにbuffer capacityが0だと毎回falseになり

val mutableSharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 0
)

mutableSharedFlow.onEach {
    delay(500)
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking {
    val result1 = mutableSharedFlow.tryEmit(1)
    println("emit1:$result1")
    delay(10)
    val result2 = mutableSharedFlow.tryEmit(2)
    println("emit2:$result2")
    delay(10)
    val result3 = mutableSharedFlow.tryEmit(3)
    println("emit3:$result3")
    delay(1500) // 処理が終わるまで待つ
}
emit1:false
emit2:false
emit3:false

DROP_OLDEST, DROP_LATESTを指定した場合は、trueが返ってきますが、bufferがいっぱいの場合は普通に破棄されます。

val mutableSharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

mutableSharedFlow.onEach {
    delay(500)
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking {
    val result1 = mutableSharedFlow.tryEmit(1)
    println("emit1:$result1")
    delay(10)
    val result2 = mutableSharedFlow.tryEmit(2)
    println("emit2:$result2")
    delay(10)
    val result3 = mutableSharedFlow.tryEmit(3)
    println("emit3:$result3")
    delay(1500) // 処理が終わるまで待つ
}
emit1:true
emit2:true
emit3:true
onEach:1
onEach:2

shareIn

SharedFlowを作るもう一つの方法がsharedInです。

通常のFlowをSharedFlowに変換することができ、基本的な使い方としてはこのような感じになると思います。

val flow = flow {
    println("emit!")
    emit(1)
}
val sharedFlow = flow.shareIn(GlobalScope, SharingStarted.Eagerly)

sharedFlow.onEach {
    println("onEach1:$it")
}.launchIn(GlobalScope)

sharedFlow.onEach {
    println("onEach2:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach2:1
onEach1:1

Flowを直接subscribeするのとは異なり、値がSharedFlow内で共有されるため、emit! が一度しか呼ばれないことが特徴です。

こちらも、定義はこの様になっており、replayとstartedを指定することができます。

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

replay

replayに関しては、上記で述べたものと全く同じで、subscribe時にしてした個数分だけreplayしてくれます。

val flow = flowOf(1, 2, 3, 4)
val sharedFlow = flow.shareIn(
    scope = GlobalScope,
    started = SharingStarted.Eagerly,
    replay = 3
)

sharedFlow.onEach {
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onEach:2
onEach:3
onEach:4

started

sharedFlowの開始のタイミングを変更することができます。

Eagerly, Lazily, WhileSubscribedから選ぶことができます。

SharingStarted.Eagerly

これはすぐにhot stream化します。

そのため、このように作成したSharedFlowをsubscribeしなくても、 emit が実行されます。

val flow = flow {
    println("emit!")
    emit(1)
}
flow.shareIn(
    scope = GlobalScope,
    started = SharingStarted.Eagerly
)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!

SharingStarted.Lazily

Lazilyは初回subscribeされるまで待ちます。

なので、先程のように一度もsubscribeしないと、 emit は実行されません。

val flow = flow {
    println("emit!")
    emit(1)
}
flow.shareIn(
    scope = GlobalScope,
    started = SharingStarted.Lazily
)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
// 何も表示されない

このようにsubscribeすることで、初めて emit が動作します。

val flow = flow {
    println("emit!")
    emit(1)
}
val sharedFlow = flow.shareIn(
    scope = GlobalScope,
    started = SharingStarted.Lazily
)

sharedFlow.onEach {
    println("onEach:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach:1

SharingStarted.WhileSubscribed

こちらは少し複雑です。

最初のsubscribe時に起動し、1つもsubscribeされていないと、停止します。

例えばこのように、2回subscribeした場合、emit が2回呼ばれます。

val flow = flow {
    println("emit!")
    emit(1)
}
val sharedFlow = flow.shareIn(
    scope = GlobalScope,
    started = SharingStarted.WhileSubscribed()
)

// 1回目subscribe
val job = sharedFlow.onEach {
    println("onEach1:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) }

// 1回目のsubscribeをキャンセル
job.cancel()

runBlocking { delay(100) }

// 2回目subscribe
sharedFlow.onEach {
    println("onEach2:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach1:1
emit!
onEach2:1

停止するまでの時間は指定することができ ( stopTimeoutMillis )、このように設定することで、timeout時間より短い停止は無視されます。

val flow = flow {
    println("emit!")
    emit(1)
}
val sharedFlow = flow.shareIn(
    scope = GlobalScope,
    started = SharingStarted.WhileSubscribed(
        stopTimeoutMillis = 200
    )
)

// 1回目subscribe
val job = sharedFlow.onEach {
    println("onEach1:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) }

// キャンセル
job.cancel()

runBlocking { delay(100) }

// 2回目subscribe
sharedFlow.onEach {
    println("onEach2:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach1:1

また、replay countを設定している場合、停止してもreplayは共有されます。

この場合、2つめのsubscriberには2回 1 が送られます

val flow = flow {
    println("emit!")
    emit(1)
}
val sharedFlow = flow.shareIn(
    scope = GlobalScope,
    started = SharingStarted.WhileSubscribed(),
    replay = 1
)

// 1回目subscribe
val job = sharedFlow.onEach {
    println("onEach1:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) }

// キャンセル
job.cancel()

runBlocking { delay(100) }

// 2回目subscribe
sharedFlow.onEach {
    println("onEach2:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach1:1
onEach2:1
emit!
onEach2:1

replayExpirationMillis を設定することで、停止後どれくらいでreplayを破棄するか指定することができます。

このように0にすると、replayは共有されません。

val flow = flow {
    println("emit!")
    emit(1)
}
val sharedFlow = flow.shareIn(
    scope = GlobalScope,
    started = SharingStarted.WhileSubscribed(
        replayExpirationMillis = 0
    ),
    replay = 1
)

// 1回目subscribe
val job = sharedFlow.onEach {
    println("onEach1:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) }

// キャンセル
job.cancel()

runBlocking { delay(100) }

// 2回目subscribe
sharedFlow.onEach {
    println("onEach2:$it")
}.launchIn(GlobalScope)

runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach1:1
emit!
onEach2:1

onSubscription

SharedFlowには、一つだけonSubscriptionという新しいoperatorが追加されています。

fun <T> SharedFlow<T>.onSubscription(
    action: suspend FlowCollector<T>.() -> Unit
): SharedFlow<T>

新たなsubscriberが登録された際に動作し、onStartと非常によく似ていますが、内部でMutableSharedFlowを操作したときの挙動が異なります。

このように、onSubscriptionで1、onStartで2を流した場合、その後のonEachではonSubscriptionで流した1のみ受け取ることができます。(buffer sizeが0だとsuspend状態になってonEachが呼ばれないことに気をつけてください)

val mutableSharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 10
)
mutableSharedFlow
    .onSubscription {
        println("onSubscription")
        mutableSharedFlow.emit(1)
    }
    .onStart {
        println("onStart")
        mutableSharedFlow.emit(2)
    }
    .onEach {
        println("onEach:$it")
    }
    .launchIn(GlobalScope)

runBlocking { delay(100) }  // 処理が終わるまでちょっと待つ
onStart
onSubscription
onEach:1

これは、onStartはsubscriberが登録される直前に、onSubscriptionはsubscriberが登録された直後に呼ばれることに起因します。

つまり、onStartでemitしても、その後subscribeされるので、値が流れてこないわけです。

あまりこのようなケースは多くないと思いますが、若干挙動が複雑なため、注意が必要です。

BroadcastChannelについて

SharedFlowが登場するまで利用していた BroadcastChannel は、SharedFlowがstableになるタイミングでdeprecatedになることが決定しています。

SharedFlowはBroadcastChannelと比較し、シンプルになり、replayの機能が追加されています。

一方で、closeすることができなくなっているので、注意してください。

その他注意

Flowで使えていた以下のoperatorは、使用しても変化が起こらないため、使用できなくなっています。

また、buffer operatorをRENDEZVOUSに設定しても、変化が起こらないため、注意が必要です。

まとめ

今回はSharedFlowに関して深堀りしてみました。

ほとんど全ての機能に関して解説できたと思います。

SharedFlowは非常に機能が多く、また強力なため、利用は注意しながら行う必要があると思います。

個人的には、あまりストリームのみで書くことにとらわれず、可能な限り通常のメソッドやクラスに切り出すほうが健全だと感じています。

ぜひアプリ開発においてSharedFlowの効果的な使い方があれば教えて下さい。

- 2021/01/31追記-

Kotlin Coroutinesの解説本をZennにて販売しています。より詳しく学びたい方は、こちらも合わせて確認してみて下さい。

詳解 Kotlin Coroutines [2021] | Zenn

人気の記事

kotlin coroutinesのFlow, SharedFlow, StateFlowを整理する

Jetpack ComposeとKotlin Coroutinesを連携させる

Layout Composableを使って複雑なレイアウトを組む【Jetpack Compose】

テスト用Dispatcherの使い分け【Kotlin Coroutines】

Flow.combineの内部実装がすごい話

Jetpack ComposeのRippleエフェクトを深堀り、カスタマイズも