diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt index 488331fc37..47cb0ccf48 100644 --- a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -65,7 +65,15 @@ internal class LimitedDispatcher( // `runningWorkers` when they observed an empty queue. if (!tryAllocateWorker()) return val task = obtainTaskOrDeallocateWorker() ?: return - startWorker(Worker(task)) + try { + startWorker(Worker(task)) + } catch (e: Throwable) { + // If we failed to start a worker, we should deallocate the worker slot + synchronized(workerAllocationLock) { + runningWorkers.decrementAndGet() + } + throw e + } } /** @@ -107,21 +115,29 @@ internal class LimitedDispatcher( */ private inner class Worker(private var currentTask: Runnable) : Runnable { override fun run() { - var fairnessCounter = 0 - while (true) { - try { - currentTask.run() - } catch (e: Throwable) { - handleCoroutineException(EmptyCoroutineContext, e) + try { + var fairnessCounter = 0 + while (true) { + try { + currentTask.run() + } catch (e: Throwable) { + handleCoroutineException(EmptyCoroutineContext, e) + } + currentTask = obtainTaskOrDeallocateWorker() ?: return + // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well + if (++fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this@LimitedDispatcher)) { + // Do "yield" to let other views execute their runnable as well + // Note that we do not decrement 'runningWorkers' as we are still committed to our part of work + dispatcher.safeDispatch(this@LimitedDispatcher, this) + return + } } - currentTask = obtainTaskOrDeallocateWorker() ?: return - // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well - if (++fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this@LimitedDispatcher)) { - // Do "yield" to let other views execute their runnable as well - // Note that we do not decrement 'runningWorkers' as we are still committed to our part of work - dispatcher.safeDispatch(this@LimitedDispatcher, this) - return + } catch (e: Throwable) { + // If the worker failed, we should deallocate its slot + synchronized(workerAllocationLock) { + runningWorkers.decrementAndGet() } + throw e } } } @@ -132,4 +148,4 @@ internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive pa internal fun CoroutineDispatcher.namedOrThis(name: String?): CoroutineDispatcher { if (name != null) return NamedDispatcher(this, name) return this -} \ No newline at end of file +}