Open
Description
Nothing in the docs says that channel elements can be lost (and the resources can be leaked) if collection gets cancelled, but it can happen.
val channel = Channel<Int>(onUndeliveredElement = {
println("Undelivered element: $it")
})
val totalElements = 100
val expectedElements = AtomicInteger(totalElements)
launch(Dispatchers.Default) {
repeat(totalElements) {
channel.send(it)
}
channel.close()
}
while(!channel.isClosedForReceive) {
coroutineScope {
val job = launch {
channel.receiveAsFlow().collect {
expectedElements.decrementAndGet()
}
}
launch(Dispatchers.Default) {
job.cancel()
}
}
}
assertEquals(0, expectedElements.get())
One of the runs prints:
Undelivered element: 38
Undelivered element: 44
Undelivered element: 46
Undelivered element: 49
Undelivered element: 51
Undelivered element: 53
Undelivered element: 55
Undelivered element: 62
Undelivered element: 66
Undelivered element: 68
Undelivered element: 72
Undelivered element: 76
Undelivered element: 78
Undelivered element: 82
Undelivered element: 85
Undelivered element: 87
Undelivered element: 89
Undelivered element: 91
Undelivered element: 93
Undelivered element: 95
Undelivered element: 97
Expected :0
Actual :21