Skip to content

SharedFlow random lose message when emit in cancelled coroutine #3774

Open
@wxbsocial

Description

Hello everyone, I encountered this problem in the project, in order to highlight the problem, the following is the simplified code

Kotlin: 1.8.21
excepted: recv:100,emit:100
but: recv:76,emit:100

@Test
fun shared_flow_random_lose_message_when_emit_in_cancelled_coroutine(): Unit = runBlocking {
    val signalBus = MutableSharedFlow<Int>()

    val scope = CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher())

    var oneTimes = 0
    var emitOneTimes = 0

    signalBus.onEach {
        when (it) {
            1 -> oneTimes++
            else -> {}
        }

    }.stateIn(scope, SharingStarted.Eagerly, 0)

    delay(2_000) // wait stateIn inited


    repeat(100) {
        val job = foo(scope, signalBus)
        delay(100) // wait a moment
        job.cancelAndJoin()
        signalBus.emit(1)
        emitOneTimes++
    }

    delay(1_000)
    Log.d(TAG, "recv:$oneTimes,emit:$emitOneTimes")

    assertEquals(100, oneTimes) // failed < 100
    assertEquals(100, emitOneTimes)
}


private suspend fun foo(scope: CoroutineScope, shared: MutableSharedFlow<Int>): Job {

    return scope.launch {
        try {
            delay(10_000)

        } catch (_: CancellationException) {
            shared.emit(0) // throw ex, (0) not emit,and cause random loss of messages (1) that emit by signalBus.emit(1) after called foo()
        }
    }
}

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions