Open
Description
Hello everyone, I encountered this problem in my project. To highlight it, here is a simplified reproduction:
Kotlin: 1.8.21
expected: recv:100,emit:100
actual: recv:76,emit:100
/**
 * Reproduction: values emitted into a [MutableSharedFlow] are sometimes not received
 * after another coroutine attempted to emit into the same flow while being cancelled.
 *
 * A collector counting every `1` is started eagerly on a single-thread dispatcher.
 * Each of the 100 rounds launches [foo], cancels it (which makes it attempt `emit(0)`
 * from its cancelled coroutine), and then emits `1` from the test coroutine.
 *
 * Expected: 100 received and 100 emitted. Reported: the received count is below 100.
 *
 * The executor-backed dispatcher and the scope are released when the test exits,
 * whether it passes or fails, so no worker thread or collector outlives the test.
 */
@Test
fun shared_flow_random_lose_message_when_emit_in_cancelled_coroutine(): Unit = runBlocking {
    val signalBus = MutableSharedFlow<Int>()
    // `use` closes the dispatcher (shutting down its executor thread) on every exit path.
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { dispatcher ->
        val scope = CoroutineScope(dispatcher)
        try {
            var oneTimes = 0
            var emitOneTimes = 0
            // Count every `1` that reaches the collector; any other value is ignored.
            signalBus.onEach {
                when (it) {
                    1 -> oneTimes++
                    else -> {}
                }
            }.stateIn(scope, SharingStarted.Eagerly, 0)
            delay(2_000) // give the eagerly started collector time to subscribe
            repeat(100) {
                val job = foo(scope, signalBus)
                delay(100) // let the launched coroutine reach its delay
                job.cancelAndJoin()
                signalBus.emit(1)
                emitOneTimes++
            }
            delay(1_000) // let the collector drain before checking the counters
            Log.d(TAG, "recv:$oneTimes,emit:$emitOneTimes")
            assertEquals(100, oneTimes) // reported to fail with a value < 100
            assertEquals(100, emitOneTimes)
        } finally {
            // Stop the sharing coroutine (and any leftover child) before the dispatcher closes.
            scope.coroutineContext[Job]?.cancel()
        }
    }
}
/**
 * Launches a coroutine in [scope] that waits in a long `delay` and, when that wait is
 * interrupted by cancellation, tries to emit `0` into [shared] from the cancelled coroutine.
 *
 * The emit-after-cancellation is deliberate: it is the scenario this reproduction exercises.
 *
 * @param scope scope in which the coroutine is launched
 * @param shared flow that receives the `0` emission attempt on cancellation
 * @return the [Job] of the launched coroutine
 */
private suspend fun foo(scope: CoroutineScope, shared: MutableSharedFlow<Int>): Job = scope.launch {
    try {
        delay(10_000)
    } catch (_: CancellationException) {
        // Reported observation: this emit throws, so `0` is never delivered, and afterwards
        // some of the `1` values emitted by the caller (signalBus.emit(1)) are randomly lost.
        shared.emit(0)
    }
}