Description
Describe the bug
The Flow.conflate() documentation states that:
Adjacent applications of conflate/buffer, channelFlow, flowOn and produceIn are always fused so that only one properly configured channel is used for execution. Conflation takes precedence over buffer() calls with any other capacity.
This is not the behavior I've observed when using a callbackFlow within my library.
What happened? What should have happened instead?
While working on a library I publish, I wanted to ensure that a flow I return from my public API is always conflated. Even though the flow is built with callbackFlow, its items describe state rather than events, so buffering them does not make sense. I thought I had achieved this by applying the Flow.conflate operator to the flow I return, but some unit tests showed that this does not unconditionally configure the underlying channel to be conflated. In my experience, an adjacent application of the Flow.buffer operator reconfigures the underlying channel's capacity, but not its buffer overflow strategy.
Instead, I would expect calls to Flow.conflate to always take precedence over other fused operators, as the documentation leads me to believe.
I am hoping that this isn't just treated as unclear documentation. From my perspective, I don't see much utility in allowing Flow consumers to buffer a flow which has previously been conflated.
Provide a Reproducer
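import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
// The flow that's returned from my public API (explicit type argument so the snippet compiles on its own).
val original = channelFlow<Unit> { awaitCancellation() }.conflate()
// The flow that's altered by the API consumer.
val altered = original.buffer(3, onBufferOverflow = BufferOverflow.SUSPEND)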
The above snippet will configure the underlying channel to have a capacity of three and a buffer overflow strategy of DROP_OLDEST. The conflation policy appears to be partially retained, since DROP_OLDEST is kept, but the final configured capacity is inconsistent with what's documented.
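To make the difference observable without inspecting internals, here is a minimal sketch that collects a burst of items through both configurations. The burst() and surviving() helpers are hypothetical names introduced for illustration and are not part of my library. The sketch assumes the producer runs on the single-threaded runBlocking event loop, so it sends all ten items before the collector resumes.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the library's public flow: sends 1..10 in a burst, then completes.
fun burst(): Flow<Int> = channelFlow {
    for (i in 1..10) send(i)
}

// Collects every item that survives the fused channel configuration.
fun surviving(flow: Flow<Int>): List<Int> = runBlocking { flow.toList() }

fun main() {
    // The flow as returned from the public API.
    val original = burst().conflate()
    // The same flow after an API consumer applies buffer() to it.
    val altered = original.buffer(3, onBufferOverflow = BufferOverflow.SUSPEND)

    println("conflate():           ${surviving(original)}")
    println("conflate().buffer(3): ${surviving(altered)}")
}
```

If the fusion behaves as described in this report, the second list should retain more of the trailing items than the first, even though both flows drop the oldest items on overflow.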