Blog

kotlin coroutines flowのテストを快適に書く

StateFlowもリリースされ、kotlin coroutines flowがますます存在感を増してきています。

一方、単体テスト等を書こうとした際、こういったストリームをテストするのは比較的難しいです。

今回は、僕が普段実際に使ってるテスト用utilとその使い方について紹介したいと思います。

toList()

一番シンプルなのはtoListでlistにして、assertする形です。

シンプルなオペレータ等をテストする際はこれで十分だと思います。

coroutines scopeをmockする際はrunBlockingTestを使うと良いと思います。

// テストしたい関数
fun mapToString(flow: Flow<Int>): Flow<String> {
    return flow.map { it.toString() }
}
// テストコード
@Test
fun mapToStringTest() = runBlockingTest {
    val flow = flowOf(1, 2, 3)
    val history = mapToString(flow).toList()
    assertThat(history).isEqualTo(listOf("1", "2", "3"))
}

toList { }

上記の toList() では、flowがcloseされないとlistが作成されません。

また、操作を加えたときにどのようなflowが流れるか、みたいなtestはかけません。

そのため、以下のようなutilを作成しました。

// テスト用util
suspend fun <T> Flow<T>.toList(
    block: suspend CoroutineScope.() -> Unit
): List<T> {
    val list = mutableListOf<T>()
    coroutineScope {
        val job = launch {
            this@toList.collect {
                list.add(it)
            }
        }
        block.invoke(this)
        job.cancel()
    }
    return list
}

少し複雑ですが、blockを渡し、そのblockを実行中に流れたflowをlist化してくれます。

例えば、以下のようなコードをテストする際に使えます。

// テストしたいクラス
class IntRepository {
    private val channel = BroadcastChannel<Int>(Channel.BUFFERED)
    val flow: Flow<Int> = channel.asFlow()

    suspend fun fetch() {
        channel.send(1)
    }
}
// テストコード
@Test
fun fetchTest() = runBlockingTest {
    val target = IntRepository()
    val history = target.flow.toList {
        target.fetch()
    }
    assertThat(history).isEqualTo(listOf(1))
}

fetch() をしたことにより、flowにデータが流れることを確認することができます。

TestFlow

上記のtoListでほとんどのケースをカバーすることはできると思いますが、各stepで流れてるflowを確認したい、複数のflowを同時に確認したい、という複雑なテストを書きたい場合もあるとおもいます。

そのため、以下のようなtest用クラスを作成しました。

// テスト用util
class TestFlow<T>(
    flow: Flow<T>,
    scope: CoroutineScope
) {
    private val _history = mutableListOf<T>()
    val history: List<T> = _history
    private val job: Job

    init {
        job = flow.onEach { _history.add(it) }
            .launchIn(scope)
    }

    fun close() {
        job.cancel()
    }

    fun getHistoryWithClose(): List<T> {
        close()
        return history
    }
}

fun <T> Flow<T>.toTest(scope: CoroutineScope): TestFlow<T> =
    TestFlow(this, scope)

これにより、任意のタイミングで各flowのhistoryを確認することができます。

各stepでflowの状態を確認する

以下のようなクラスをtestしたいとします。

// テストしたいクラス
class IntRepository {
    private val channel = BroadcastChannel<Int>(Channel.BUFFERED)
    val flow: Flow<Int> = channel.asFlow()

    suspend fun step1() {
        channel.send(1)
    }

    suspend fun step2() {
        channel.send(2)
    }

    suspend fun step3() {
        channel.send(3)
    }
}

このように各stepでどのようなflowが流れてくるのか確認することができます。

// テストしたいクラス
@Test
fun stepTest() = runBlockingTest {
    val target = IntRepository()
    val testFlow = target.flow.toTest(this)

    assertThat(testFlow.history).isEqualTo(emptyList<Int>())
    target.step1()
    assertThat(testFlow.history).isEqualTo(listOf(1))
    target.step2()
    assertThat(testFlow.history).isEqualTo(listOf(1, 2))
    target.step3()
    assertThat(testFlow.history).isEqualTo(listOf(1, 2, 3))

    testFlow.close()
}

最後にcloseするのを忘れないでください。

Test finished with active jobs と怒られます。

複数のflowを同時に確認する。

以下のようにflowが2つ出力されるクラスをテストしたいとします。

// テストしたいクラス
class IntRepository {
    private val channel = BroadcastChannel<Int>(Channel.BUFFERED)
    val flowInt: Flow<Int> = channel.asFlow()
    val flowSum: Flow<Int> = flowInt.scan(0) { acc, value ->
        acc + value
    }

    suspend fun fetch() {
        channel.send(1)
    }
}

このように2回 toTest() を使うことで同時にテストすることができます。

@Test
fun fetchTest() = runBlockingTest {
    val target = IntRepository()
    val intTest = target.flowInt.toTest(this)
    val sumTest = target.flowSum.toTest(this)

    target.fetch()
    target.fetch()

    val intHistory = intTest.getHistoryWithClose()
    assertThat(intHistory).isEqualTo(listOf(1, 1))

    val sumHistory = sumTest.getHistoryWithClose()
    assertThat(sumHistory).isEqualTo(listOf(0, 1, 2))
}

history の代わりに getHistoryWithClose() を使うことで、即座にcloseしてくれます。

エラーが流れてくるflowのテスト

flowはrxと同じく、エラー(Throwable) を流すこともできます。

僕自身はあまり使ってませんが、以下のようにThrowableも返す toList() を用意しています。

// テスト用util
suspend fun <T> Flow<T>.toListAndThrowable(): Pair<List<T>, Throwable?> {
    var error: Throwable? = null
    val list = catch { e ->
        error = e
    }.toList()
    return list to error
}

以下のように使うことができます。

@Test
fun errorTest() = runBlockingTest {
    val flow = flow {
        emit(1)
        emit(2)
        throw Exception("error!")
    }
    val (history, error) = flow.toListAndThrowable()

    assertThat(history).isEqualTo(listOf(1, 2))
    assertThat(error).isNotNull()
}

上記の toList {}TestFlow でも、必要であればerrorをcheckできるようにするのが良いと思います。

まとめ

今回、普段僕が行っているcoroutines flowのtestの書き方について紹介しました。

かなり自己流みたいなところがありますが、概ね問題なく機能していると思います。

今後、公式からもテスト用のutilが提供されていく可能性が高いので、ぜひそちらも確認してください。

- 2021/01/31追記-

Kotlin Coroutinesの解説本をZennにて販売しています。より詳しく学びたい方は、こちらも合わせて確認してみて下さい。

詳解 Kotlin Coroutines [2021] | Zenn

人気の記事

kotlin coroutinesのFlow, SharedFlow, StateFlowを整理する

Jetpack ComposeとKotlin Coroutinesを連携させる

Layout Composableを使って複雑なレイアウトを組む【Jetpack Compose】

テスト用Dispatcherの使い分け【Kotlin Coroutines】

Flow.combineの内部実装がすごい話

Jetpack ComposeのRippleエフェクトを深堀り、カスタマイズも