SharedFlow はkotlin corouteinsの1.4.0-M1 で追加された新しいFlowです。
以前、Flow, SharedFlow, StateFlowの比較を行いました。
今回はSharedFlowの詳細な仕様に関して深堀りしていきたいと思います。
SharedFlowを作成する方法の一つに、MutableSharedFlowを使う方法があります。
基本的には、以下のように使うことができます。
val mutableSharedFlow = MutableSharedFlow<Int>()
mutableSharedFlow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
mutableSharedFlow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking {
mutableSharedFlow.emit(1)
mutableSharedFlow.emit(2)
delay(100) // 処理が終わるまでちょっと待つ
}
flowに対して直接emitできることが特徴です。
実行結果はこのようになります。
onEach2:1
onEach1:1
onEach2:2
onEach1:2
簡単に複数箇所でsubscribe可能なstreamを作成することができました。
MutableStateFlowには、replayとbufferの機能があり、このような3つの引数で操作します。
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
それぞれ挙動について説明をしていきます。
新規のsubscriberがsubscribeする際に流れてくるデータの数です。
デフォルトは0になっているので、subscribe時にデータは流れてきません。
例えば3みたいに指定すると、このようなことが発生します。
val mutableSharedFlow = MutableSharedFlow<Int>(replay = 3)
runBlocking {
mutableSharedFlow.emit(1)
mutableSharedFlow.emit(2)
mutableSharedFlow.emit(3)
mutableSharedFlow.emit(4)
}
mutableSharedFlow.onEach {
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking {
delay(100)
mutableSharedFlow.emit(5)
}
予め1~4までのデータを流した後、subscribeし、その後また5を流しています。
この結果は、このようになります。
onEach:2
onEach:3
onEach:4
onEach:5
launchInしたタイミングで、直近の3件がreplayされます。
3件に満たない場合は、それまでに流れた数のデータだけがreplayされます。
replayされる内容はreplayCacheで取得することができ、resetReplayCacheでクリアすることができます。
val mutableSharedFlow = MutableSharedFlow<Int>(replay = 3)
runBlocking {
mutableSharedFlow.emit(1)
mutableSharedFlow.emit(2)
mutableSharedFlow.emit(3)
mutableSharedFlow.emit(4)
}
println(mutableSharedFlow.replayCache)
mutableSharedFlow.resetReplayCache()
println(mutableSharedFlow.replayCache)
[2, 3, 4]
[]
一方、Androidアプリ開発において、このreplayを使うことはあまりないかな、というのが正直な感想です。
また、SharedFlowにはsubscribe時に時間がかかったときにバッファリングしてくれる、bufferという機能があります。
先程のreplyとextraBufferCapacityの合計がbufferの数になり、onBufferOverflowはbufferサイズを超えたときの挙動を設定します。
BufferOverflowにはSUSPEND, DROP_OLDEST, DROP_LATESTの3つがあり、デフォルトはSUSPENDです。
まず、デフォルトの挙動を確認してみましょう。
今回は、onEachの中で delay(500)
を行っています。
val mutableSharedFlow = MutableSharedFlow<Int>()
mutableSharedFlow.onEach {
delay(500)
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking {
val duration = measureTimeMillis {
mutableSharedFlow.emit(1)
mutableSharedFlow.emit(2)
}
delay(1500) // 処理が終わるまで待つ
println("duration:${duration}ms")
}
この場合、 mutableSharedFlow.emit(2)
がbufferが開くまでsuspend状態になり、2回emitするのに515msかかりました。
onEach:1
onEach:2
duration:515ms
次に、buffer sizeを増やしてみます。
val mutableSharedFlow = MutableSharedFlow<Int>(
extraBufferCapacity = 2
)
mutableSharedFlow.onEach {
delay(500)
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking {
val duration = measureTimeMillis {
mutableSharedFlow.emit(1)
mutableSharedFlow.emit(2)
}
delay(1500) // 処理が終わるまで待つ
println("duration:${duration}ms")
}
onEach:1
onEach:2
duration:0ms
バッファサイズ内であればSharedFlow内でバッファリングしてくれるため、emitに時間はかかりません(0ms)。
これらの挙動は、複数箇所でsubscribeした際に特に違いが出ます。
capacityが0のとき、emitは時間のかかるsubscriberを待つため、2つのsubscriberはほとんど同時に実行されます。
val mutableSharedFlow = MutableSharedFlow<Int>()
mutableSharedFlow.onEach {
println("onEach1:$it")
delay(500)
}.launchIn(GlobalScope)
mutableSharedFlow.onEach {
println("onEach2:$it")
delay(100)
}.launchIn(GlobalScope)
runBlocking {
mutableSharedFlow.emit(1)
mutableSharedFlow.emit(2)
mutableSharedFlow.emit(3)
delay(1500)
}
onEach1:1
onEach2:1
onEach2:2
onEach1:2
onEach2:3
onEach1:3
capacityが多いとき、emitは先に行われて、SharedFlow内でバッファリングするため、時間のかからないsubscriberのほうが早く処理を終えます。
val mutableSharedFlow = MutableSharedFlow<Int>(
extraBufferCapacity = 3
)
mutableSharedFlow.onEach {
println("onEach1:$it")
delay(500)
}.launchIn(GlobalScope)
mutableSharedFlow.onEach {
println("onEach2:$it")
delay(100)
}.launchIn(GlobalScope)
runBlocking {
mutableSharedFlow.emit(1)
mutableSharedFlow.emit(2)
mutableSharedFlow.emit(3)
delay(1500)
}
onEach2:1
onEach1:1
onEach2:2
onEach2:3
onEach1:2
onEach1:3
どれくらいの頻度でデータが流れてくるのか、時間のかかるsubscriberはないか等をもとに、適切なbuffer sizeを設定していく必要がありそうです。
バッファサイズがいっぱいだったとき、デフォルトのBufferOverflow.SUSPENDでは、emitがsuspend状態になることがわかりました。
では、残りのDROP_OLDEST、DROP_LATESTを指定するとどうなるのでしょう?
DROP_OLDESTは、バッファがいっぱいだったとき、古いものを削除します。
DROP_OLDEST、DROP_LATESTではbuffer size(extraBufferCapacityかreplay)を1以上にする必要があるので注意してください。
val mutableSharedFlow = MutableSharedFlow<Int>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
mutableSharedFlow.onEach {
delay(500)
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking {
mutableSharedFlow.emit(1)
delay(10)
mutableSharedFlow.emit(2)
delay(10)
mutableSharedFlow.emit(3)
delay(1500) // 処理が終わるまで待つ
}
この場合、1を処理中に流れてきた2は、1度bufferに貯められますが、その後3が流れてきたタイミングで2は削除されるため、結果的に1と3のみが処理されます。
onEach:1
onEach:3
DROP_LATESTはその逆で、bufferに入らない新しい値が破棄されます。
val mutableSharedFlow = MutableSharedFlow<Int>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_LATEST
)
mutableSharedFlow.onEach {
delay(500)
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking {
mutableSharedFlow.emit(1)
delay(10)
mutableSharedFlow.emit(2)
delay(10)
mutableSharedFlow.emit(3)
delay(1500) // 処理が終わるまで待つ
}
今回のケースでは、1の処理中に2がバッファに入り、3が流れてきたときはバッファがいっぱいのため、3が破棄されます。
onEach:1
onEach:2
必要に応じて使い分けていく必要がありそうです。
個人的には、subscribeする側でconflatedやbufferしてあげたほうがミスが起こりにくいかなとも思っています。
emitはsuspend functionですが、tryEmitという通常関数もあります。
こちらは、emit可能ならemitしtrueを返しますが、suspendの必要があればemitせずにfalseを返します。
val mutableSharedFlow = MutableSharedFlow<Int>(
extraBufferCapacity = 1
)
mutableSharedFlow.onEach {
delay(500)
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking {
val result1 = mutableSharedFlow.tryEmit(1)
println("emit1:$result1")
delay(10)
val result2 = mutableSharedFlow.tryEmit(2)
println("emit2:$result2")
delay(10)
val result3 = mutableSharedFlow.tryEmit(3)
println("emit3:$result3")
delay(1500) // 処理が終わるまで待つ
}
この場合は1回目と2回目のemitは成功しますが、3回目は失敗し、emitされません。
emit1:true
emit2:true
emit3:false
onEach:1
onEach:2
ちなみにbuffer capacityが0だと毎回falseになり
val mutableSharedFlow = MutableSharedFlow<Int>(
extraBufferCapacity = 0
)
mutableSharedFlow.onEach {
delay(500)
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking {
val result1 = mutableSharedFlow.tryEmit(1)
println("emit1:$result1")
delay(10)
val result2 = mutableSharedFlow.tryEmit(2)
println("emit2:$result2")
delay(10)
val result3 = mutableSharedFlow.tryEmit(3)
println("emit3:$result3")
delay(1500) // 処理が終わるまで待つ
}
emit1:false
emit2:false
emit3:false
DROP_OLDEST, DROP_LATESTを指定した場合は、trueが返ってきますが、bufferがいっぱいの場合は普通に破棄されます。
val mutableSharedFlow = MutableSharedFlow<Int>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
mutableSharedFlow.onEach {
delay(500)
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking {
val result1 = mutableSharedFlow.tryEmit(1)
println("emit1:$result1")
delay(10)
val result2 = mutableSharedFlow.tryEmit(2)
println("emit2:$result2")
delay(10)
val result3 = mutableSharedFlow.tryEmit(3)
println("emit3:$result3")
delay(1500) // 処理が終わるまで待つ
}
emit1:true
emit2:true
emit3:true
onEach:1
onEach:2
SharedFlowを作るもう一つの方法がsharedInです。
通常のFlowをSharedFlowに変換することができ、基本的な使い方としてはこのような感じになると思います。
val flow = flow {
println("emit!")
emit(1)
}
val sharedFlow = flow.shareIn(GlobalScope, SharingStarted.Eagerly)
sharedFlow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
sharedFlow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach2:1
onEach1:1
Flowを直接subscribeするのとは異なり、値がSharedFlow内で共有されるため、emit!
が一度しか呼ばれないことが特徴です。
こちらも、定義はこの様になっており、replayとstartedを指定することができます。
fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T>
replayに関しては、上記で述べたものと全く同じで、subscribe時にしてした個数分だけreplayしてくれます。
val flow = flowOf(1, 2, 3, 4)
val sharedFlow = flow.shareIn(
scope = GlobalScope,
started = SharingStarted.Eagerly,
replay = 3
)
sharedFlow.onEach {
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onEach:2
onEach:3
onEach:4
sharedFlowの開始のタイミングを変更することができます。
Eagerly, Lazily, WhileSubscribedから選ぶことができます。
これはすぐにhot stream化します。
そのため、このように作成したSharedFlowをsubscribeしなくても、 emit
が実行されます。
val flow = flow {
println("emit!")
emit(1)
}
flow.shareIn(
scope = GlobalScope,
started = SharingStarted.Eagerly
)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
Lazilyは初回subscribeされるまで待ちます。
なので、先程のように一度もsubscribeしないと、 emit
は実行されません。
val flow = flow {
println("emit!")
emit(1)
}
flow.shareIn(
scope = GlobalScope,
started = SharingStarted.Lazily
)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
// 何も表示されない
このようにsubscribeすることで、初めて emit
が動作します。
val flow = flow {
println("emit!")
emit(1)
}
val sharedFlow = flow.shareIn(
scope = GlobalScope,
started = SharingStarted.Lazily
)
sharedFlow.onEach {
println("onEach:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach:1
こちらは少し複雑です。
最初のsubscribe時に起動し、1つもsubscribeされていないと、停止します。
例えばこのように、2回subscribeした場合、emit
が2回呼ばれます。
val flow = flow {
println("emit!")
emit(1)
}
val sharedFlow = flow.shareIn(
scope = GlobalScope,
started = SharingStarted.WhileSubscribed()
)
// 1回目subscribe
val job = sharedFlow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) }
// 1回目のsubscribeをキャンセル
job.cancel()
runBlocking { delay(100) }
// 2回目subscribe
sharedFlow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach1:1
emit!
onEach2:1
停止するまでの時間は指定することができ ( stopTimeoutMillis
)、このように設定することで、timeout時間より短い停止は無視されます。
val flow = flow {
println("emit!")
emit(1)
}
val sharedFlow = flow.shareIn(
scope = GlobalScope,
started = SharingStarted.WhileSubscribed(
stopTimeoutMillis = 200
)
)
// 1回目subscribe
val job = sharedFlow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) }
// キャンセル
job.cancel()
runBlocking { delay(100) }
// 2回目subscribe
sharedFlow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach1:1
また、replay countを設定している場合、停止してもreplayは共有されます。
この場合、2つめのsubscriberには2回 1
が送られます
val flow = flow {
println("emit!")
emit(1)
}
val sharedFlow = flow.shareIn(
scope = GlobalScope,
started = SharingStarted.WhileSubscribed(),
replay = 1
)
// 1回目subscribe
val job = sharedFlow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) }
// キャンセル
job.cancel()
runBlocking { delay(100) }
// 2回目subscribe
sharedFlow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach1:1
onEach2:1
emit!
onEach2:1
replayExpirationMillis
を設定することで、停止後どれくらいでreplayを破棄するか指定することができます。
このように0にすると、replayは共有されません。
val flow = flow {
println("emit!")
emit(1)
}
val sharedFlow = flow.shareIn(
scope = GlobalScope,
started = SharingStarted.WhileSubscribed(
replayExpirationMillis = 0
),
replay = 1
)
// 1回目subscribe
val job = sharedFlow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) }
// キャンセル
job.cancel()
runBlocking { delay(100) }
// 2回目subscribe
sharedFlow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
onEach1:1
emit!
onEach2:1
SharedFlowには、一つだけonSubscriptionという新しいoperatorが追加されています。
fun <T> SharedFlow<T>.onSubscription(
action: suspend FlowCollector<T>.() -> Unit
): SharedFlow<T>
新たなsubscriberが登録された際に動作し、onStartと非常によく似ていますが、内部でMutableSharedFlowを操作したときの挙動が異なります。
このように、onSubscriptionで1、onStartで2を流した場合、その後のonEachではonSubscriptionで流した1のみ受け取ることができます。(buffer sizeが0だとsuspend状態になってonEachが呼ばれないことに気をつけてください)
val mutableSharedFlow = MutableSharedFlow<Int>(
extraBufferCapacity = 10
)
mutableSharedFlow
.onSubscription {
println("onSubscription")
mutableSharedFlow.emit(1)
}
.onStart {
println("onStart")
mutableSharedFlow.emit(2)
}
.onEach {
println("onEach:$it")
}
.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
onStart
onSubscription
onEach:1
これは、onStartはsubscriberが登録される直前に、onSubscriptionはsubscriberが登録された直後に呼ばれることに起因します。
つまり、onStartでemitしても、その後subscribeされるので、値が流れてこないわけです。
あまりこのようなケースは多くないと思いますが、若干挙動が複雑なため、注意が必要です。
SharedFlowが登場するまで利用していた BroadcastChannel は、SharedFlowがstableになるタイミングでdeprecatedになることが決定しています。
SharedFlowはBroadcastChannelと比較し、シンプルになり、replayの機能が追加されています。
一方で、closeすることができなくなっているので、注意してください。
Flowで使えていた以下のoperatorは、使用しても変化が起こらないため、使用できなくなっています。
また、buffer operatorをRENDEZVOUSに設定しても、変化が起こらないため、注意が必要です。
今回はSharedFlowに関して深堀りしてみました。
ほとんど全ての機能に関して解説できたと思います。
SharedFlowは非常に機能が多く、また強力なため、利用は注意しながら行う必要があると思います。
個人的には、あまりストリームのみで書くことにとらわれず、可能な限り通常のメソッドやクラスに切り出すほうが健全だと感じています。
ぜひアプリ開発においてSharedFlowの効果的な使い方があれば教えて下さい。
- 2021/01/31追記-
Kotlin Coroutinesの解説本をZennにて販売しています。より詳しく学びたい方は、こちらも合わせて確認してみて下さい。