diff --git a/docs/topics/coroutine-context-and-dispatchers.md b/docs/topics/coroutine-context-and-dispatchers.md index 89498af00e..13f0cfd9ab 100644 --- a/docs/topics/coroutine-context-and-dispatchers.md +++ b/docs/topics/coroutine-context-and-dispatchers.md @@ -115,7 +115,7 @@ Produces the output: ```text Unconfined : I'm working in thread main main runBlocking: I'm working in thread main -Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor +Unconfined : After delay in thread DefaultDispatcher oroutine#2 main runBlocking: After delay in thread main ``` diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt index 67d3d16bb1..f3ec6c3433 100644 --- a/kotlinx-coroutines-core/common/src/Delay.kt +++ b/kotlinx-coroutines-core/common/src/Delay.kt @@ -48,6 +48,8 @@ public interface Delay { * Schedules invocation of a specified [block] after a specified delay [timeMillis]. * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] of this invocation * request if it is not needed anymore. + * + * [block] must execute quickly, be non-blocking, and must not throw any exceptions. */ public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = DefaultDelay.invokeOnTimeout(timeMillis, block, context) diff --git a/kotlinx-coroutines-core/common/src/Dispatchers.common.kt b/kotlinx-coroutines-core/common/src/Dispatchers.common.kt index c499a47f92..ae51283a95 100644 --- a/kotlinx-coroutines-core/common/src/Dispatchers.common.kt +++ b/kotlinx-coroutines-core/common/src/Dispatchers.common.kt @@ -71,3 +71,27 @@ public expect object Dispatchers { */ public val Unconfined: CoroutineDispatcher } + +/** + * If a task can no longer run because its dispatcher is closed, it is rescheduled to another dispatcher. + * + * This is required to avoid a situation where some finalizers do not run: + * ``` + * val dispatcher = newSingleThreadContext("test") + * launch(dispatcher) { + * val resource = Resource() + * try { + * // do something `suspending` with resource + * } finally { + * resource.close() + * } + * } + * dispatcher.close() + * ``` + * + * `close` needs to run somewhere, but it can't run on the closed dispatcher. + * + * On the JVM and Native, we reschedule to the thread pool backing `Dispatchers.IO`, + * because an arbitrary task may well have blocking behavior. + */ +internal expect fun rescheduleTaskFromClosedDispatcher(task: Runnable) diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 84291a1b69..e17870392d 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -6,6 +6,48 @@ import kotlin.concurrent.Volatile import kotlin.coroutines.* import kotlin.jvm.* +internal interface UnconfinedEventLoop { + /** + * Returns `true` if calling [yield] in a coroutine in this event loop can avoid yielding and continue executing + * due to there being no other tasks in the queue. + * + * This can only be called from the thread that owns this event loop. + */ + val thisLoopsTaskCanAvoidYielding: Boolean + + /** + * Returns `true` if someone (typically a call to [runUnconfinedEventLoop]) is currently processing the tasks, + * so calling [dispatchUnconfined] is guaranteed to be processed eventually. + * + * This can only be called from the thread that owns this event loop. + */ + val isUnconfinedLoopActive: Boolean + + /** + * Executes [initialBlock] and then processes unconfined tasks until there are no more, blocking the current thread. + * + * This can only be called when no other [runUnconfinedEventLoop] is currently active on this event loop. + * + * This can only be called from the thread that owns this event loop. + */ + fun runUnconfinedEventLoop(initialBlock: () -> Unit) + + /** + * Sends the [task] to this event loop for execution. + * + * This method should only be called while [isUnconfinedLoopActive] is `true`. + * Otherwise, the task may be left unprocessed. + * + * This can only be called from the thread that owns this event loop. + */ + fun dispatchUnconfined(task: DispatchedTask<*>) + + /** + * Tries to interpret this event loop for unconfined tasks as a proper event loop and returns it if successful. + */ + fun tryUseAsEventLoop(): EventLoop? +} + /** * Extended by [CoroutineDispatcher] implementations that have event loop inside and can * be asked to process next event from their event queue. @@ -16,7 +58,7 @@ import kotlin.jvm.* * * @suppress **This an internal API and should not be used from general code.** */ -internal abstract class EventLoop : CoroutineDispatcher() { +internal abstract class EventLoop : CoroutineDispatcher(), UnconfinedEventLoop { /** * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop. */ @@ -51,8 +93,6 @@ internal abstract class EventLoop : CoroutineDispatcher() { return 0 } - protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty - protected open val nextTime: Long get() { val queue = unconfinedQueue ?: return Long.MAX_VALUE @@ -65,6 +105,7 @@ internal abstract class EventLoop : CoroutineDispatcher() { task.run() return true } + /** * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one). @@ -77,28 +118,26 @@ internal abstract class EventLoop : CoroutineDispatcher() { * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded] * into the current event loop. */ - fun dispatchUnconfined(task: DispatchedTask<*>) { - val queue = unconfinedQueue ?: - ArrayDeque>().also { unconfinedQueue = it } + override fun dispatchUnconfined(task: DispatchedTask<*>) { + val queue = unconfinedQueue ?: ArrayDeque>().also { unconfinedQueue = it } queue.addLast(task) } val isActive: Boolean get() = useCount > 0 - val isUnconfinedLoopActive: Boolean + override val isUnconfinedLoopActive: Boolean get() = useCount >= delta(unconfined = true) - // May only be used from the event loop's thread - val isUnconfinedQueueEmpty: Boolean - get() = unconfinedQueue?.isEmpty() ?: true + override val thisLoopsTaskCanAvoidYielding: Boolean + get() = unconfinedQueue?.isEmpty() != false private fun delta(unconfined: Boolean) = if (unconfined) (1L shl 32) else 1L fun incrementUseCount(unconfined: Boolean = false) { useCount += delta(unconfined) - if (!unconfined) shared = true + if (!unconfined) shared = true } fun decrementUseCount(unconfined: Boolean = false) { @@ -117,22 +156,37 @@ internal abstract class EventLoop : CoroutineDispatcher() { } open fun shutdown() {} + + override fun runUnconfinedEventLoop(initialBlock: () -> Unit) { + incrementUseCount(unconfined = true) + try { + initialBlock() + while (true) { + // break when all unconfined continuations where executed + if (!processUnconfinedEvent()) break + } + } finally { + decrementUseCount(unconfined = true) + } + } + + override fun tryUseAsEventLoop(): EventLoop? = this } internal object ThreadLocalEventLoop { - private val ref = commonThreadLocal(Symbol("ThreadLocalEventLoop")) + private val ref = commonThreadLocal(Symbol("ThreadLocalEventLoop")) - internal val eventLoop: EventLoop + internal val unconfinedEventLoop: UnconfinedEventLoop get() = ref.get() ?: createEventLoop().also { ref.set(it) } - internal fun currentOrNull(): EventLoop? = + internal fun currentOrNull(): UnconfinedEventLoop? = ref.get() internal fun resetEventLoop() { ref.set(null) } - internal fun setEventLoop(eventLoop: EventLoop) { + internal fun setEventLoop(eventLoop: UnconfinedEventLoop) { ref.set(eventLoop) } } @@ -169,9 +223,6 @@ private typealias Queue = LockFreeTaskQueueCore internal expect abstract class EventLoopImplPlatform() : EventLoop { // Called to unpark this event loop's thread protected fun unpark() - - // Called to reschedule to DefaultExecutor when this event loop is complete - protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) } internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { @@ -186,8 +237,10 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { get() = _isCompleted.value set(value) { _isCompleted.value = value } - override val isEmpty: Boolean get() { - if (!isUnconfinedQueueEmpty) return false + /** + * Checks that at the moment this method is called, there are no tasks in the delayed tasks queue. + */ + protected val delayedQueueIsEmpty: Boolean get() { val delayed = _delayed.value if (delayed != null && !delayed.isEmpty) return false return when (val queue = _queue.value) { @@ -275,7 +328,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { // todo: we should unpark only when this delayed task became first in the queue unpark() } else { - DefaultExecutor.enqueue(task) + rescheduleTaskFromClosedDispatcher(task) } } @@ -386,12 +439,6 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { return delayedTask.scheduleTask(now, delayedQueue, this) } - // It performs "hard" shutdown for test cleanup purposes - protected fun resetAll() { - _queue.value = null - _delayed.value = null - } - // This is a "soft" (normal) shutdown private fun rescheduleAllDelayed() { val now = nanoTime() @@ -408,6 +455,14 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { } } + // Called to reschedule when this event loop is complete + protected open fun reschedule(now: Long, delayedTask: DelayedTask) { + val delayTimeMillis = delayNanosToMillis(delayedTask.nanoTime - now) + DefaultDelay.invokeOnTimeout(delayTimeMillis, Runnable { + rescheduleTaskFromClosedDispatcher(delayedTask) + }, EmptyCoroutineContext) + } + internal abstract class DelayedTask( /** * This field can be only modified in [scheduleTask] before putting this DelayedTask @@ -530,10 +585,6 @@ internal expect fun createEventLoop(): EventLoop internal expect fun nanoTime(): Long -internal expect object DefaultExecutor { - fun enqueue(task: Runnable) -} - /** * Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and * non-Darwin native targets. diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index 4c8f54e877..9891794d2d 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -292,12 +292,12 @@ internal fun DispatchedContinuation.yieldUndispatched(): Boolean = */ private inline fun DispatchedContinuation<*>.executeUnconfined( contState: Any?, mode: Int, doYield: Boolean = false, - block: () -> Unit + noinline block: () -> Unit ): Boolean { assert { mode != MODE_UNINITIALIZED } // invalid execution mode - val eventLoop = ThreadLocalEventLoop.eventLoop + val eventLoop = ThreadLocalEventLoop.unconfinedEventLoop // If we are yielding and unconfined queue is empty, we can bail out as part of fast path - if (doYield && eventLoop.isUnconfinedQueueEmpty) return false + if (doYield && eventLoop.thisLoopsTaskCanAvoidYielding) return false return if (eventLoop.isUnconfinedLoopActive) { // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow _state = contState diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt index ad5fed1205..a619827a96 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt @@ -165,7 +165,7 @@ internal fun DispatchedTask.resume(delegate: Continuation, undispatche } private fun DispatchedTask<*>.resumeUnconfined() { - val eventLoop = ThreadLocalEventLoop.eventLoop + val eventLoop = ThreadLocalEventLoop.unconfinedEventLoop if (eventLoop.isUnconfinedLoopActive) { // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow eventLoop.dispatchUnconfined(this) @@ -177,25 +177,18 @@ private fun DispatchedTask<*>.resumeUnconfined() { } } -internal inline fun DispatchedTask<*>.runUnconfinedEventLoop( - eventLoop: EventLoop, +internal fun DispatchedTask<*>.runUnconfinedEventLoop( + eventLoop: UnconfinedEventLoop, block: () -> Unit ) { - eventLoop.incrementUseCount(unconfined = true) try { - block() - while (true) { - // break when all unconfined continuations where executed - if (!eventLoop.processUnconfinedEvent()) break - } + eventLoop.runUnconfinedEventLoop(block) } catch (e: Throwable) { /* * This exception doesn't happen normally, only if we have a bug in implementation. * Report it as a fatal exception. */ handleFatalException(e) - } finally { - eventLoop.decrementUseCount(unconfined = true) } } diff --git a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt index 771768e008..d89685796a 100644 --- a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt +++ b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt @@ -19,7 +19,7 @@ internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : Coroutine */ enclosingScope.launch { while (true) { - val delayNanos = ThreadLocalEventLoop.currentOrNull()?.processNextEvent() + val delayNanos = ThreadLocalEventLoop.currentOrNull()?.tryUseAsEventLoop()?.processNextEvent() ?: error("Event loop is missing, virtual time source works only as part of event loop") if (delayNanos <= 0) continue if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) { diff --git a/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt b/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt index 7c0581b9d9..c35aa0b199 100644 --- a/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt +++ b/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt @@ -20,5 +20,11 @@ import kotlin.coroutines.* * * Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will * block, potentially leading to thread starvation issues. + * + * If new tasks are submitted to the dispatcher created by [runBlocking] after this function returns, + * they are resubmitted to [Dispatchers.IO]. */ public expect fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T + +internal fun UnconfinedEventLoop.useAsEventLoopForRunBlockingOrFail(): EventLoop = + tryUseAsEventLoop() ?: throw IllegalStateException("runBlocking can not be run in direct dispatchers") diff --git a/kotlinx-coroutines-core/concurrent/src/Dispatchers.kt b/kotlinx-coroutines-core/concurrent/src/Dispatchers.concurrent.kt similarity index 56% rename from kotlinx-coroutines-core/concurrent/src/Dispatchers.kt rename to kotlinx-coroutines-core/concurrent/src/Dispatchers.concurrent.kt index d18efdc35f..b32264b02f 100644 --- a/kotlinx-coroutines-core/concurrent/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/concurrent/src/Dispatchers.concurrent.kt @@ -39,4 +39,26 @@ package kotlinx.coroutines @Suppress("EXTENSION_SHADOWED_BY_MEMBER") public expect val Dispatchers.IO: CoroutineDispatcher - +internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) { + /** + * We do not create a separate view of [Dispatchers.IO] for the cleanup needs. + * + * If [Dispatchers.IO] is not flooded with other tasks + the cleanup view does not have more threads than + * [Dispatchers.IO], there is no difference between sending cleanup tasks to [Dispatchers.IO] and creating + * a separate view of [Dispatchers.IO] for cleanup. + * + * If [Dispatchers.IO] is flooded with other tasks, we are at a crossroads: + * - On the one hand, we do not want to create too many threads. + * Some clients are carefully monitoring the number of threads and are relying on it not being larger than + * the system property + the sum of explicit `limitedParallelism` arguments in the system. + * - On the other hand, we don't want to delay productive tasks in favor of cleanup tasks. + * + * The first consideration wins on two accounts: + * - As of writing this, [Dispatchers.IO] has been in use as the cleanup dispatcher for dispatchers obtained + * from JVM executors for years, and this has not caused any issues that we know of. + * - On the other hand, some people likely rely on the number of threads not exceeding the number they control. + * If we were to create a separate view of [Dispatchers.IO] for cleanup, this number would be exceeded, which + * is a regression. + */ + Dispatchers.IO.dispatch(Dispatchers.IO, task) +} diff --git a/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt b/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt index 90549eecf4..d62f96557e 100644 --- a/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt @@ -2,24 +2,23 @@ package kotlinx.coroutines import kotlin.coroutines.* -internal actual fun createEventLoop(): EventLoop = UnconfinedEventLoop() +internal actual fun createEventLoop(): EventLoop = UnconfinedEventLoopImpl() internal actual fun nanoTime(): Long = unsupported() -internal class UnconfinedEventLoop : EventLoop() { +private class UnconfinedEventLoopImpl : EventLoop() { override fun dispatch(context: CoroutineContext, block: Runnable): Unit = unsupported() } internal actual abstract class EventLoopImplPlatform : EventLoop() { protected actual fun unpark(): Unit = unsupported() - protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit = unsupported() -} - -internal actual object DefaultExecutor { - public actual fun enqueue(task: Runnable): Unit = unsupported() } private fun unsupported(): Nothing = throw UnsupportedOperationException("runBlocking event loop is not supported") internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block() + +internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) { + Dispatchers.Default.dispatch(Dispatchers.Default, task) +} diff --git a/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt index f497dc803c..619d7d6809 100644 --- a/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt @@ -10,8 +10,8 @@ internal abstract class AbstractTimeSource { abstract fun currentTimeMillis(): Long abstract fun nanoTime(): Long abstract fun wrapTask(block: Runnable): Runnable - abstract fun trackTask() - abstract fun unTrackTask() + abstract fun trackTask(obj: Any) + abstract fun unTrackTask(obj: Any) abstract fun registerTimeLoopThread() abstract fun unregisterTimeLoopThread() abstract fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0 @@ -27,43 +27,109 @@ internal inline fun mockTimeSource(source: AbstractTimeSource?) { timeSource = source } +/** + * The current system time in milliseconds. + * + * This is only used for automatically-generated tests in place of [System.currentTimeMillis], + * it is not used in production code. + */ @InlineOnly internal inline fun currentTimeMillis(): Long = timeSource?.currentTimeMillis() ?: System.currentTimeMillis() +/** + * The reading from a high-precision monotonic clock used for measuring time intervals. + * Logically equivalent to [kotlin.time.TimeSource.Monotonic.markNow]. + */ @InlineOnly internal actual inline fun nanoTime(): Long = timeSource?.nanoTime() ?: System.nanoTime() +/** + * Calls [trackTask] and returns a wrapped version of the given [block] that calls [unTrackTask] after it completes. + * Is optimized to just returning [block] if [trackTask] and [unTrackTask] are no-ops. + */ @InlineOnly internal inline fun wrapTask(block: Runnable): Runnable = timeSource?.wrapTask(block) ?: block +/** + * Increments the number of tasks not under our control. + * + * Virtual time source uses this to decide whether to jump to the moment of virtual time when the next sleeping thread + * should wake up. + * If there are some uncontrollable tasks, it will not jump to the moment of the next sleeping thread, + * because the uncontrollable tasks may change the shared state in a way that affects the sleeping thread. + * + * Example: + * + * ``` + * thread { // controlled thread + * while (true) { + * if (sharedState == 42) { + * break + * } + * Thread.sleep(1000) + * } + * } + * + * thread { // uncontrolled thread + * sharedState = 42 + * } + * ``` + * + * If the second thread is not tracked, the first thread effectively enters a spin loop until the second thread + * physically changes the shared state. + * + * Every call to [trackTask] must be accompanied by a call to [unTrackTask] with the same [obj], + * but [unTrackTask] can be called even if the corresponding [trackTask] wasn't called. + */ @InlineOnly -internal inline fun trackTask() { - timeSource?.trackTask() +internal inline fun trackTask(obj: Any) { + timeSource?.trackTask(obj) } +/** + * Marks the task [obj] as complete. If [obj] wasn't tracked, does nothing. See [trackTask] for more details. + */ @InlineOnly -internal inline fun unTrackTask() { - timeSource?.unTrackTask() +internal inline fun unTrackTask(obj: Any) { + timeSource?.unTrackTask(obj) } +/** + * Increases the registered number of nested loops of the form + * `while (nanoTime() < deadline) { parkNanos(deadline - nanoTime()) }` corresponding to the object [key] + * and signals that the current thread is in such a loop. + * + * While at least one such loop is running, other threads are allowed to call [unpark] on the current thread + * and wake it up. Before [registerTimeLoopThread] is called, [unpark] is not guaranteed to have any effect. + */ @InlineOnly internal inline fun registerTimeLoopThread() { timeSource?.registerTimeLoopThread() } +/** + * The opposite of [registerTimeLoopThread]. + */ @InlineOnly internal inline fun unregisterTimeLoopThread() { timeSource?.unregisterTimeLoopThread() } +/** + * Waits for either the specified number of nanoseconds to pass or until [unpark] is called. + */ @InlineOnly internal inline fun parkNanos(blocker: Any, nanos: Long) { timeSource?.parkNanos(blocker, nanos) ?: LockSupport.parkNanos(blocker, nanos) } +/** + * Preliminarily unparks the specified thread that's currently parked in [parkNanos]. + * Can be called even before the thread is parked. + */ @InlineOnly internal inline fun unpark(thread: Thread) { timeSource?.unpark(thread) ?: LockSupport.unpark(thread) diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index d2249bfdd0..17765d2b8b 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -4,7 +4,6 @@ package kotlinx.coroutines -import java.util.concurrent.locks.* import kotlin.contracts.* import kotlin.coroutines.* @@ -38,6 +37,9 @@ import kotlin.coroutines.* * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and * this `runBlocking` invocation throws [InterruptedException]. * + * If new tasks are submitted to the dispatcher created by [runBlocking] after this function returns, + * they are resubmitted to [Dispatchers.IO]. + * * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available * for a newly created coroutine. * @@ -55,13 +57,13 @@ public actual fun runBlocking(context: CoroutineContext, block: suspend Coro val newContext: CoroutineContext if (contextInterceptor == null) { // create or use private event loop if no dispatcher is specified - eventLoop = ThreadLocalEventLoop.eventLoop + eventLoop = ThreadLocalEventLoop.unconfinedEventLoop.useAsEventLoopForRunBlockingOrFail() newContext = GlobalScope.newCoroutineContext(context + eventLoop) } else { // See if context's interceptor is an event loop that we shall use (to support TestContext) // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } - ?: ThreadLocalEventLoop.currentOrNull() + ?: ThreadLocalEventLoop.currentOrNull()?.useAsEventLoopForRunBlockingOrFail() newContext = GlobalScope.newCoroutineContext(context) } val coroutine = BlockingCoroutine(newContext, currentThread, eventLoop) diff --git a/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt new file mode 100644 index 0000000000..80d878707b --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt @@ -0,0 +1,174 @@ +@file:JvmName("DefaultExecutorKt") +package kotlinx.coroutines + +import kotlinx.atomicfu.* +import kotlinx.coroutines.internal.* +import kotlinx.coroutines.scheduling.scheduleBackgroundIoTask +import kotlin.coroutines.* +import kotlin.time.Duration + +private val defaultMainDelayOptIn = systemProp("kotlinx.coroutines.main.delay", false) + +@PublishedApi +internal actual val DefaultDelay: Delay = initializeDefaultDelay() + +private fun initializeDefaultDelay(): Delay { + // Opt-out flag + if (!defaultMainDelayOptIn) return DefaultDelayImpl + val main = Dispatchers.Main + /* + * When we already are working with UI and Main threads, it makes + * no sense to create a separate thread with timer that cannot be controller + * by the UI runtime. + */ + return if (main.isMissing() || main !is Delay) DefaultDelayImpl else main +} + +/** + * This method can be invoked after all coroutines are completed to wait for the default delay executor to shut down + * in response to the lack of tasks. + * + * This is only useful in tests to ensure that setting a fresh virtual time source will not confuse the default delay + * still running the previous test. + * + * Does nothing if the default delay executor is not in use. + * + * @throws IllegalStateException if the shutdown process notices new tasks entering the system + * @throws IllegalStateException if the shutdown process times out + */ +internal fun ensureDefaultDelayDeinitialized(timeout: Duration) { + (DefaultDelay as? DefaultDelayImpl)?.shutdownForTests(timeout) +} + +private object DefaultDelayImpl : EventLoopImplBase(), Runnable { + const val THREAD_NAME = "kotlinx.coroutines.DefaultDelay" + + init { + incrementUseCount() // this event loop is never completed + } + + private val _thread = atomic(null) + + /** Can only happen when tests close the default executor */ + override fun reschedule(now: Long, delayedTask: DelayedTask) { + throw IllegalStateException("Attempted to schedule $delayedTask at $now after shutdown") + } + + /** + * All event loops are using DefaultDelay#invokeOnTimeout to avoid livelock on + * ``` + * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } } + * ``` + * + * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), + * but it's not exposed as public API. + */ + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduleInvokeOnTimeout(timeMillis, block) + + override fun run() { + val currentThread = Thread.currentThread() + if (!_thread.compareAndSet(null, currentThread)) return // some other thread won the race to start the thread + val oldName = currentThread.name + currentThread.name = THREAD_NAME + try { + ThreadLocalEventLoop.setEventLoop(DelegatingUnconfinedEventLoop) + registerTimeLoopThread() + unTrackTask(this) /** see the comment in [startThreadOrObtainSleepingThread] */ + try { + while (true) { + Thread.interrupted() // just reset interruption flag + val parkNanos = processNextEvent() + if (parkNanos == Long.MAX_VALUE) break // no more events + if (parkNanos > 0) parkNanos(this@DefaultDelayImpl, parkNanos) + } + } finally { + _thread.value = null + unregisterTimeLoopThread() + ThreadLocalEventLoop.resetEventLoop() + // recheck if queues are empty after _thread reference was set to null (!!!) + if (delayedQueueIsEmpty) { + notifyAboutThreadExiting() + } else { + /* recreate the thread, as there is still work to do, + and `unpark` could have awoken the thread we're currently running on */ + startThreadOrObtainSleepingThread() + } + } + } finally { + currentThread.name = oldName + } + } + + override fun startThreadOrObtainSleepingThread(): Thread? { + // Check if the thread is already running + _thread.value?.let { return it } + /* Now we know that at the moment of this call the thread was not initially running. + This means that whatever thread is going to be running by the end of this function, + it's going to notice the tasks it's supposed to run. + We can return `null` unconditionally. */ + /** If this function is called from a thread that's already registered as a time loop thread, + because a time loop thread is not parked right now, the time source will not advance time *currently*, + but it may do that as soon as the thread calling this is parked, which may happen earlier than the default + delay thread has a chance to run. + Because of that, we notify the time source that something is actually happening right now. + This would work automatically if instead of [scheduleBackgroundIoTask] we used [CoroutineDispatcher.dispatch] on + [Dispatchers.IO], but then, none of the delays would be skipped, as the whole time a [DefaultDelay] thread runs + would be considered as a task. + Therefore, we register a task right now and mark it as completed as soon as a [DefaultDelay] time loop gets + registered. */ + trackTask(this) + scheduleBackgroundIoTask(this) + return null + } + + fun shutdownForTests(timeout: Duration) { + if (_thread.value != null) { + val end = System.currentTimeMillis() + timeout.inWholeMilliseconds + while (true) { + synchronized(this) { + unpark(_thread.value ?: return) + val toWait = end - System.currentTimeMillis() + check(toWait > 0) { "Timeout waiting for DefaultExecutor to shutdown" } + (this as Object).wait(toWait) + } + } + } + } + + private fun notifyAboutThreadExiting() { + synchronized(this) { (this as Object).notifyAll() } + } + + override fun toString(): String = "DefaultDelay" +} + +private object DelegatingUnconfinedEventLoop: UnconfinedEventLoop { + override val thisLoopsTaskCanAvoidYielding: Boolean + get() = defaultDelayRunningUnconfinedLoop() + + override val isUnconfinedLoopActive: Boolean get() = false + + override fun runUnconfinedEventLoop(initialBlock: () -> Unit) { + ioView.dispatch(ioView, Runnable { + ThreadLocalEventLoop.unconfinedEventLoop.runUnconfinedEventLoop(initialBlock) + }) + } + + override fun dispatchUnconfined(task: DispatchedTask<*>) = + defaultDelayRunningUnconfinedLoop() + + override fun tryUseAsEventLoop(): EventLoop? = null +} + +private fun defaultDelayRunningUnconfinedLoop(): Nothing { + throw UnsupportedOperationException( + "This method can only be called from the thread where an unconfined event loop is running, " + + "but no tasks can run on this thread." + ) +} + + +/** A view separate from [Dispatchers.IO]. + * [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */ +private val ioView = Dispatchers.IO.limitedParallelism(Int.MAX_VALUE) diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt deleted file mode 100644 index 3ce7e0d333..0000000000 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ /dev/null @@ -1,194 +0,0 @@ -package kotlinx.coroutines - -import kotlinx.coroutines.internal.* -import java.util.concurrent.* -import kotlin.coroutines.* - -private val defaultMainDelayOptIn = systemProp("kotlinx.coroutines.main.delay", false) - -@PublishedApi -internal actual val DefaultDelay: Delay = initializeDefaultDelay() - -private fun initializeDefaultDelay(): Delay { - // Opt-out flag - if (!defaultMainDelayOptIn) return DefaultExecutor - val main = Dispatchers.Main - /* - * When we already are working with UI and Main threads, it makes - * no sense to create a separate thread with timer that cannot be controller - * by the UI runtime. - */ - return if (main.isMissing() || main !is Delay) DefaultExecutor else main -} - -@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") -internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { - const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor" - - init { - incrementUseCount() // this event loop is never completed - } - - private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds - - private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos( - try { - java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS) - } catch (e: SecurityException) { - DEFAULT_KEEP_ALIVE_MS - }) - - @Suppress("ObjectPropertyName") - @Volatile - private var _thread: Thread? = null - - override val thread: Thread - get() = _thread ?: createThreadSync() - - private const val FRESH = 0 - private const val ACTIVE = 1 - private const val SHUTDOWN_REQ = 2 - private const val SHUTDOWN_ACK = 3 - private const val SHUTDOWN = 4 - - @Volatile - private var debugStatus: Int = FRESH - - private val isShutDown: Boolean get() = debugStatus == SHUTDOWN - - private val isShutdownRequested: Boolean get() { - val debugStatus = debugStatus - return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK - } - - actual override fun enqueue(task: Runnable) { - if (isShutDown) shutdownError() - super.enqueue(task) - } - - override fun reschedule(now: Long, delayedTask: DelayedTask) { - // Reschedule on default executor can only be invoked after Dispatchers.shutdown - shutdownError() - } - - private fun shutdownError() { - throw RejectedExecutionException("DefaultExecutor was shut down. " + - "This error indicates that Dispatchers.shutdown() was invoked prior to completion of exiting coroutines, leaving coroutines in incomplete state. " + - "Please refer to Dispatchers.shutdown documentation for more details") - } - - override fun shutdown() { - debugStatus = SHUTDOWN - super.shutdown() - } - - /** - * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on - * ``` - * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } } - * ``` - * - * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), - * but it's not exposed as public API. - */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - scheduleInvokeOnTimeout(timeMillis, block) - - override fun run() { - ThreadLocalEventLoop.setEventLoop(this) - registerTimeLoopThread() - try { - var shutdownNanos = Long.MAX_VALUE - if (!notifyStartup()) return - while (true) { - Thread.interrupted() // just reset interruption flag - var parkNanos = processNextEvent() - if (parkNanos == Long.MAX_VALUE) { - // nothing to do, initialize shutdown timeout - val now = nanoTime() - if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS - val tillShutdown = shutdownNanos - now - if (tillShutdown <= 0) return // shut thread down - parkNanos = parkNanos.coerceAtMost(tillShutdown) - } else - shutdownNanos = Long.MAX_VALUE - if (parkNanos > 0) { - // check if shutdown was requested and bail out in this case - if (isShutdownRequested) return - parkNanos(this, parkNanos) - } - } - } finally { - _thread = null // this thread is dead - acknowledgeShutdownIfNeeded() - unregisterTimeLoopThread() - // recheck if queues are empty after _thread reference was set to null (!!!) - if (!isEmpty) thread // recreate thread if it is needed - } - } - - @Synchronized - private fun createThreadSync(): Thread { - return _thread ?: Thread(this, THREAD_NAME).apply { - _thread = this - /* - * `DefaultExecutor` is a global singleton that creates its thread lazily. - * To isolate the classloaders properly, we are inherting the context classloader from - * the singleton itself instead of using parent' thread one - * in order not to accidentally capture temporary application classloader. - */ - contextClassLoader = this@DefaultExecutor.javaClass.classLoader - isDaemon = true - start() - } - } - - // used for tests - @Synchronized - internal fun ensureStarted() { - assert { _thread == null } // ensure we are at a clean state - assert { debugStatus == FRESH || debugStatus == SHUTDOWN_ACK } - debugStatus = FRESH - createThreadSync() // create fresh thread - while (debugStatus == FRESH) (this as Object).wait() - } - - @Synchronized - private fun notifyStartup(): Boolean { - if (isShutdownRequested) return false - debugStatus = ACTIVE - (this as Object).notifyAll() - return true - } - - @Synchronized // used _only_ for tests - fun shutdownForTests(timeout: Long) { - val deadline = System.currentTimeMillis() + timeout - if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ - // loop while there is anything to do immediately or deadline passes - while (debugStatus != SHUTDOWN_ACK && _thread != null) { - _thread?.let { unpark(it) } // wake up thread if present - val remaining = deadline - System.currentTimeMillis() - if (remaining <= 0) break - (this as Object).wait(timeout) - } - // restore fresh status - debugStatus = FRESH - } - - @Synchronized - private fun acknowledgeShutdownIfNeeded() { - if (!isShutdownRequested) return - debugStatus = SHUTDOWN_ACK - resetAll() // clear queues - (this as Object).notifyAll() - } - - // User only for testing and nothing else - internal val isThreadPresent - get() = _thread != null - - override fun toString(): String { - return "DefaultExecutor" - } -} diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt index a5e8da4c40..04aa1addea 100644 --- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt @@ -2,7 +2,6 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlinx.coroutines.scheduling.* -import kotlin.coroutines.* /** * Name of the property that defines the maximal number of threads that are used by [Dispatchers.IO] coroutines dispatcher. @@ -20,7 +19,8 @@ public actual object Dispatchers { public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher @JvmStatic - public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined + public actual val Unconfined: CoroutineDispatcher get() = + kotlinx.coroutines.Unconfined /** * The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads. @@ -67,8 +67,8 @@ public actual object Dispatchers { /** * Shuts down built-in dispatchers, such as [Default] and [IO], * stopping all the threads associated with them and making them reject all new tasks. - * Dispatcher used as a fallback for time-related operations (`delay`, `withTimeout`) - * and to handle rejected tasks from other dispatchers is also shut down. + * Dispatchers used as fallbacks for time-related operations (`delay`, `withTimeout`) + * and to handle rejected tasks from other dispatchers are also shut down. * * This is a **delicate** API. It is not supposed to be called from a general * application-level code and its invocation is irreversible. @@ -86,7 +86,6 @@ public actual object Dispatchers { */ @DelicateCoroutinesApi public fun shutdown() { - DefaultExecutor.shutdown() // Also shuts down Dispatchers.IO DefaultScheduler.shutdown() } diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index 15d4ab5c85..9a2125ba0e 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -1,27 +1,25 @@ package kotlinx.coroutines -import kotlinx.coroutines.Runnable -import kotlinx.coroutines.scheduling.* import kotlinx.coroutines.scheduling.CoroutineScheduler internal actual abstract class EventLoopImplPlatform: EventLoop() { - - protected abstract val thread: Thread + /** Returns `null` if a thread was created and doesn't need to be awoken. + * Returns a thread to awaken if the thread already existed when this method was called. */ + protected abstract fun startThreadOrObtainSleepingThread(): Thread? protected actual fun unpark() { - val thread = thread // atomic read - if (Thread.currentThread() !== thread) - unpark(thread) + startThreadOrObtainSleepingThread()?.let(::unpark) } - protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { - DefaultExecutor.schedule(now, delayedTask) - } } internal class BlockingEventLoop( - override val thread: Thread -) : EventLoopImplBase() + private val thread: Thread +) : EventLoopImplBase() { + override fun startThreadOrObtainSleepingThread(): Thread? = + if (Thread.currentThread() !== thread) thread else null + +} internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread()) @@ -48,7 +46,7 @@ internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.curr public fun processNextEventInCurrentThread(): Long = // This API is used in Ktor for serverless integration where a single thread awaits a blocking call // (and, to avoid actual blocking, does something via this call), see #850 - ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE + ThreadLocalEventLoop.currentOrNull()?.tryUseAsEventLoop()?.processNextEvent() ?: Long.MAX_VALUE internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block() @@ -122,4 +120,3 @@ internal fun Thread.isIoDispatcherThread(): Boolean { if (this !is CoroutineScheduler.Worker) return false return isIo() } - diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index bdfbe6dbbc..7430b8c6de 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -129,9 +129,9 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) try { executor.execute(wrapTask(block)) } catch (e: RejectedExecutionException) { - unTrackTask() + unTrackTask(block) cancelJobOnRejection(context, e) - Dispatchers.IO.dispatch(context, block) + rescheduleTaskFromClosedDispatcher(block) } } @@ -146,15 +146,15 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) continuation.invokeOnCancellation(CancelFutureOnCancel(future)) return } - // Otherwise fallback to default executor - DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation) + // Otherwise fallback to default delay + DefaultDelay.scheduleResumeAfterDelay(timeMillis, continuation) } override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) return when { future != null -> DisposableFutureHandle(future) - else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) + else -> DefaultDelay.invokeOnTimeout(timeMillis, block, context) } } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 3430ebadec..5a4dcf9316 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -391,9 +391,11 @@ internal class CoroutineScheduler( * - Concurrent [close] that effectively shutdowns the worker thread. * Used for [yield]. */ - fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) { - trackTask() // this is needed for virtual time support + fun dispatch( + block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false, track: Boolean = true + ) { val task = createTask(block, taskContext) + if (track) trackTask(task) // this is needed for virtual time support val isBlockingTask = task.isBlocking // Invariant: we increment counter **before** publishing the task // so executing thread can safely decrement the number of blocking tasks @@ -588,7 +590,7 @@ internal class CoroutineScheduler( val thread = Thread.currentThread() thread.uncaughtExceptionHandler.uncaughtException(thread, e) } finally { - unTrackTask() + unTrackTask(task) } } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index 28d5537108..a510f02f46 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -37,11 +37,11 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() { @InternalCoroutinesApi override fun dispatchYield(context: CoroutineContext, block: Runnable) { - DefaultScheduler.dispatchWithContext(block, BlockingContext, true) + DefaultScheduler.dispatchWithContext(block, BlockingContext, fair = true, track = true) } override fun dispatch(context: CoroutineContext, block: Runnable) { - DefaultScheduler.dispatchWithContext(block, BlockingContext, false) + DefaultScheduler.dispatchWithContext(block, BlockingContext, fair = false, track = true) } override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { @@ -58,6 +58,10 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() { } } +internal fun scheduleBackgroundIoTask(block: Runnable) { + DefaultScheduler.dispatchWithContext(block, BlockingContext, fair = false, track = false) +} + // Dispatchers.IO internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor { @@ -126,8 +130,8 @@ internal open class SchedulerCoroutineDispatcher( coroutineScheduler.dispatch(block, fair = true) } - internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) { - coroutineScheduler.dispatch(block, context, fair) + internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean, track: Boolean) { + coroutineScheduler.dispatch(block, context, fair = fair, track = track) } override fun close() { diff --git a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt index 58b8024547..773b518b8b 100644 --- a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt @@ -8,7 +8,7 @@ class DefaultExecutorStressTest : TestBase() { @Test fun testDelay() = runTest { val iterations = 100_000 * stressTestMultiplier - withContext(DefaultExecutor) { + withContext(DefaultDelay as CoroutineDispatcher) { expect(1) var expected = 1 repeat(iterations) { @@ -33,30 +33,4 @@ class DefaultExecutorStressTest : TestBase() { } finish(2 + iterations * 4) } - - @Test - fun testWorkerShutdown() = withVirtualTimeSource { - val iterations = 1_000 * stressTestMultiplier - // wait for the worker to shut down - suspend fun awaitWorkerShutdown() { - val executorTimeoutMs = 1000L - delay(executorTimeoutMs) - while (DefaultExecutor.isThreadPresent) { delay(10) } // hangs if the thread refuses to stop - assertFalse(DefaultExecutor.isThreadPresent) // just to make sure - } - runTest { - awaitWorkerShutdown() // so that the worker shuts down after the initial launch - repeat (iterations) { - val job = launch(Dispatchers.Unconfined) { - // this line runs in the main thread - delay(1) - // this line runs in the DefaultExecutor worker - } - delay(100) // yield the execution, allow the worker to spawn - assertTrue(DefaultExecutor.isThreadPresent) // the worker spawned - job.join() - awaitWorkerShutdown() - } - } - } } diff --git a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt index 32573ca1f6..06560391c1 100644 --- a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt @@ -29,12 +29,12 @@ class DispatchersToStringTest { ) } // Not overridden at all, limited parallelism returns `this` - assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) + assertEquals("DefaultDelay", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) assertEquals("filesDispatcher", Dispatchers.IO.limitedParallelism(1, "filesDispatcher").toString()) assertEquals("json", Dispatchers.Default.limitedParallelism(2, "json").toString()) assertEquals("\uD80C\uDE11", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "\uD80C\uDE11").toString()) - assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) + assertEquals("DefaultDelay", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) val limitedNamed = Dispatchers.IO.limitedParallelism(10, "limited") assertEquals("limited.limitedParallelism(2)", limitedNamed.limitedParallelism(2).toString()) @@ -53,4 +53,4 @@ class DispatchersToStringTest { assertEquals("Named", named.toString()) } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt b/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt index 551d1977c0..c31f6e67bc 100644 --- a/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt @@ -49,25 +49,6 @@ class EventLoopsTest : TestBase() { finish(5) } - @Test - fun testEventLoopInDefaultExecutor() = runTest { - expect(1) - withContext(Dispatchers.Unconfined) { - delay(1) - assertTrue(Thread.currentThread().name.startsWith(DefaultExecutor.THREAD_NAME)) - expect(2) - // now runBlocking inside default executor thread --> should use outer event loop - DefaultExecutor.enqueue(Runnable { - expect(4) // will execute when runBlocking runs loop - }) - expect(3) - runBlocking { - expect(5) - } - } - finish(6) - } - /** * Simple test for [processNextEventInCurrentThread] API use-case. */ @@ -159,4 +140,4 @@ class EventLoopsTest : TestBase() { waitingThread.value = null } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt index cc2291e6c1..6535fabb6e 100644 --- a/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt @@ -13,4 +13,3 @@ class RunBlockingJvmTest : TestBase() { rb.hashCode() // unused } } - diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt index 8a461087c3..1c139841a9 100644 --- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt @@ -3,18 +3,18 @@ package kotlinx.coroutines import java.io.* import java.util.concurrent.* import java.util.concurrent.locks.* +import kotlin.time.Duration.Companion.seconds -private const val SHUTDOWN_TIMEOUT = 1000L +private val SHUTDOWN_TIMEOUT = 1.seconds internal inline fun withVirtualTimeSource(log: PrintStream? = null, block: () -> Unit) { - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working) + ensureDefaultDelayDeinitialized(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working) val testTimeSource = VirtualTimeSource(log) mockTimeSource(testTimeSource) - DefaultExecutor.ensureStarted() // should start with new time source try { block() } finally { - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) + ensureDefaultDelayDeinitialized(SHUTDOWN_TIMEOUT) testTimeSource.shutdown() mockTimeSource(null) // restore time source } @@ -48,7 +48,7 @@ internal class VirtualTimeSource( @Volatile private var time: Long = 0 - private var trackedTasks = 0 + private val trackedTasks = HashSet() private val threads = ConcurrentHashMap() @@ -56,22 +56,21 @@ internal class VirtualTimeSource( override fun nanoTime(): Long = time override fun wrapTask(block: Runnable): Runnable { - trackTask() + trackTask(block) return Runnable { try { block.run() } - finally { unTrackTask() } + finally { unTrackTask(block) } } } @Synchronized - override fun trackTask() { - trackedTasks++ + override fun trackTask(obj: Any) { + trackedTasks.add(obj) } @Synchronized - override fun unTrackTask() { - assert(trackedTasks > 0) - trackedTasks-- + override fun unTrackTask(obj: Any) { + trackedTasks.remove(obj) } @Synchronized @@ -125,7 +124,7 @@ internal class VirtualTimeSource( return } if (threads[mainThread] == null) return - if (trackedTasks != 0) return + if (trackedTasks.isNotEmpty()) return val minParkedTill = minParkedTill() if (minParkedTill <= time) return time = minParkedTill diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt index 65102095b1..f60a32505b 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt @@ -14,7 +14,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() @Parameterized.Parameters(name = "{0}") @JvmStatic fun params(): Collection> = - Channel.values().map { arrayOf(it) } + Channel.entries.map { arrayOf(it) } } enum class Channel { @@ -35,13 +35,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() fun testDelay() = withVirtualTimeSource { runTest { val delayChannel = channelFactory(delay = 10000) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.checkEmpty() delay(5000) delayChannel.checkEmpty() delay(5100) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() delay(5100) @@ -57,13 +57,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() delay(500) delayChannel.checkEmpty() delay(300) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() // Regular delay delay(750) delayChannel.checkEmpty() delay(260) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() } } @@ -72,7 +72,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() fun testReceive() = withVirtualTimeSource { runTest { val delayChannel = channelFactory(delay = 1000) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() var value = withTimeoutOrNull(750) { delayChannel.receive() 1 @@ -93,6 +93,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() fun testComplexOperator() = withVirtualTimeSource { runTest { val producer = GlobalScope.produce { + delay(1) // ensure that the ordering of dispatches doesn't affect the result for (i in 1..7) { send(i) delay(1000) @@ -158,7 +159,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() fun ReceiveChannel.checkEmpty() = assertNull(tryReceive().getOrNull()) -fun ReceiveChannel.checkNotEmpty() { - assertNotNull(tryReceive().getOrNull()) +suspend fun ReceiveChannel.receiveSingle() { + receive() assertNull(tryReceive().getOrNull()) } diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt index 051d670743..0f7e6b1bf9 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt @@ -9,15 +9,15 @@ class TickerChannelTest : TestBase() { fun testFixedDelayChannelBackpressure() = withVirtualTimeSource { runTest { val delayChannel = ticker(delayMillis = 1000, initialDelayMillis = 0, mode = TickerMode.FIXED_DELAY) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.checkEmpty() delay(1500) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(500) delayChannel.checkEmpty() delay(520) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() } } @@ -26,17 +26,17 @@ class TickerChannelTest : TestBase() { fun testDelayChannelBackpressure() = withVirtualTimeSource { runTest { val delayChannel = ticker(delayMillis = 1000, initialDelayMillis = 0) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.checkEmpty() delay(1500) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(520) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(500) delayChannel.checkEmpty() delay(520) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() } } @@ -45,17 +45,17 @@ class TickerChannelTest : TestBase() { fun testDelayChannelBackpressure2() = withVirtualTimeSource { runTest { val delayChannel = ticker(delayMillis = 200, initialDelayMillis = 0) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.checkEmpty() delay(500) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(110) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(110) delayChannel.checkEmpty() delay(110) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() } } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt index 1cf6c2bd4d..a8c99dd9d8 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt @@ -20,7 +20,7 @@ class DispatcherGuideTest { test("ExampleContext02") { kotlinx.coroutines.guide.exampleContext02.main() }.verifyLinesStart( "Unconfined : I'm working in thread main", "main runBlocking: I'm working in thread main", - "Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor", + "Unconfined : After delay in thread DefaultDispatcher oroutine#2", "main runBlocking: After delay in thread main" ) } diff --git a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt index d0a5551567..3821cb6824 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt @@ -24,7 +24,6 @@ fun test(name: String, block: () -> R): List = outputException(name) try { captureOutput(name, stdoutEnabled = OUT_ENABLED) { log -> DefaultScheduler.usePrivateScheduler() - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) resetCoroutineId() val threadsBefore = currentThreads() try { @@ -35,9 +34,8 @@ fun test(name: String, block: () -> R): List = outputException(name) } finally { // the shutdown log.println("--- shutting down") - DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) shutdownDispatcherPools(SHUTDOWN_TIMEOUT) - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks + DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks } checkTestThreads(threadsBefore) // check thread if the main completed successfully } @@ -55,7 +53,7 @@ private fun shutdownDispatcherPools(timeout: Long) { (thread.dispatcher.executor as ExecutorService).apply { shutdown() awaitTermination(timeout, TimeUnit.MILLISECONDS) - shutdownNow().forEach { DefaultExecutor.enqueue(it) } + shutdownNow().forEach { rescheduleTaskFromClosedDispatcher(it) } } } } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt index ee21be23fc..398d1150f7 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt @@ -1,6 +1,5 @@ package kotlinx.coroutines.scheduling -import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt index 33e32838da..81d66422ba 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt @@ -13,33 +13,6 @@ abstract class SchedulerTestBase : TestBase() { companion object { val CORES_COUNT = AVAILABLE_PROCESSORS - /** - * Asserts that [expectedThreadsCount] pool worker threads were created. - * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking - */ - fun checkPoolThreadsCreated(expectedThreadsCount: Int = CORES_COUNT) { - val threadsCount = maxSequenceNumber()!! - assertEquals(expectedThreadsCount, threadsCount, "Expected $expectedThreadsCount pool threads, but has $threadsCount") - } - - /** - * Asserts that any number of pool worker threads in [range] were created. - * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking - */ - fun checkPoolThreadsCreated(range: IntRange, base: Int = CORES_COUNT) { - val maxSequenceNumber = maxSequenceNumber()!! - val r = (range.first)..(range.last + base) - assertTrue( - maxSequenceNumber in r, - "Expected pool threads to be in interval $r, but has $maxSequenceNumber" - ) - } - - private fun maxSequenceNumber(): Int? { - return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker } - .map { sequenceNumber(it.name) }.maxOrNull() - } - private fun sequenceNumber(threadName: String): Int { val suffix = threadName.substring(threadName.lastIndexOf("-") + 1) val separatorIndex = suffix.indexOf(' ') @@ -49,8 +22,6 @@ abstract class SchedulerTestBase : TestBase() { return suffix.substring(0, separatorIndex).toInt() } - - suspend fun Iterable.joinAll() = forEach { it.join() } } protected var corePoolSize = CORES_COUNT @@ -85,19 +56,43 @@ abstract class SchedulerTestBase : TestBase() { return _dispatcher!!.limitedParallelism(parallelism) } + /** + * Asserts that [expectedThreadsCount] pool worker threads were created. + * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking + */ + fun checkPoolThreadsCreated(expectedThreadsCount: Int = CORES_COUNT) { + val threadsCount = maxSequenceNumber()!! + assertEquals(expectedThreadsCount, threadsCount, "Expected $expectedThreadsCount pool threads, but has $threadsCount") + } + + /** + * Asserts that any number of pool worker threads in [range] were created. + * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking + */ + fun checkPoolThreadsCreated(range: IntRange, base: Int = CORES_COUNT) { + val maxSequenceNumber = maxSequenceNumber()!! + val r = (range.first)..(range.last + base) + assertTrue( + maxSequenceNumber in r, + "Expected pool threads to be in interval $r, but has $maxSequenceNumber" + ) + } + + private fun maxSequenceNumber(): Int? { + return Thread.getAllStackTraces().keys.asSequence().filter { + it is CoroutineScheduler.Worker && it.scheduler === _dispatcher?.executor + }.map { sequenceNumber(it.name) }.maxOrNull() + } + @After fun after() { - runBlocking { - withTimeout(5_000) { - _dispatcher?.close() - } - } + _dispatcher?.close() } } /** * Implementation note: - * Our [Dispatcher.IO] is a [limitedParallelism][CoroutineDispatcher.limitedParallelism] dispatcher + * Our [Dispatchers.IO] is a [limitedParallelism][CoroutineDispatcher.limitedParallelism] dispatcher * on top of unbounded scheduler. We want to test this scenario, but on top of non-singleton * scheduler so we can control the number of threads, thus this method. */ @@ -106,11 +101,11 @@ internal fun SchedulerCoroutineDispatcher.blocking(parallelism: Int = 16): Corou @InternalCoroutinesApi override fun dispatchYield(context: CoroutineContext, block: Runnable) { - this@blocking.dispatchWithContext(block, BlockingContext, true) + this@blocking.dispatchWithContext(block, BlockingContext, fair = true, track = true) } override fun dispatch(context: CoroutineContext, block: Runnable) { - this@blocking.dispatchWithContext(block, BlockingContext, false) + this@blocking.dispatchWithContext(block, BlockingContext, fair = false, track = true) } }.limitedParallelism(parallelism) } diff --git a/kotlinx-coroutines-core/native/src/Builders.kt b/kotlinx-coroutines-core/native/src/Builders.kt index 77b7cacebb..7a318e86f1 100644 --- a/kotlinx-coroutines-core/native/src/Builders.kt +++ b/kotlinx-coroutines-core/native/src/Builders.kt @@ -1,7 +1,6 @@ @file:OptIn(ExperimentalContracts::class, ObsoleteWorkersApi::class) package kotlinx.coroutines -import kotlinx.cinterop.* import kotlin.contracts.* import kotlin.coroutines.* import kotlin.native.concurrent.* @@ -33,8 +32,8 @@ import kotlin.native.concurrent.* * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`, * then this invocation uses the outer event loop. * - * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and - * this `runBlocking` invocation throws [InterruptedException]. + * If new tasks are submitted to the dispatcher created by [runBlocking] after this function returns, + * they are resubmitted to [Dispatchers.IO]. * * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available * for a newly created coroutine. @@ -51,13 +50,13 @@ public actual fun runBlocking(context: CoroutineContext, block: suspend Coro val newContext: CoroutineContext if (contextInterceptor == null) { // create or use private event loop if no dispatcher is specified - eventLoop = ThreadLocalEventLoop.eventLoop + eventLoop = ThreadLocalEventLoop.unconfinedEventLoop.useAsEventLoopForRunBlockingOrFail() newContext = GlobalScope.newCoroutineContext(context + eventLoop) } else { // See if context's interceptor is an event loop that we shall use (to support TestContext) // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } - ?: ThreadLocalEventLoop.currentOrNull() + ?: ThreadLocalEventLoop.currentOrNull()?.useAsEventLoopForRunBlockingOrFail() newContext = GlobalScope.newCoroutineContext(context) } val coroutine = BlockingCoroutine(newContext, eventLoop) diff --git a/kotlinx-coroutines-core/native/src/CoroutineContext.kt b/kotlinx-coroutines-core/native/src/CoroutineContext.kt index 3f4c8d9a01..2334ab7164 100644 --- a/kotlinx-coroutines-core/native/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/native/src/CoroutineContext.kt @@ -3,32 +3,8 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlin.coroutines.* -internal actual object DefaultExecutor : CoroutineDispatcher(), Delay { - - private val delegate = WorkerDispatcher(name = "DefaultExecutor") - - override fun dispatch(context: CoroutineContext, block: Runnable) { - delegate.dispatch(context, block) - } - - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - delegate.scheduleResumeAfterDelay(timeMillis, continuation) - } - - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - return delegate.invokeOnTimeout(timeMillis, block, context) - } - - actual fun enqueue(task: Runnable): Unit { - delegate.dispatch(EmptyCoroutineContext, task) - } -} - internal expect fun createDefaultDispatcher(): CoroutineDispatcher -@PublishedApi -internal actual val DefaultDelay: Delay = DefaultExecutor - public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { val combined = coroutineContext + context return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) diff --git a/kotlinx-coroutines-core/native/src/DefaultDelay.kt b/kotlinx-coroutines-core/native/src/DefaultDelay.kt new file mode 100644 index 0000000000..b4570744e8 --- /dev/null +++ b/kotlinx-coroutines-core/native/src/DefaultDelay.kt @@ -0,0 +1,102 @@ +package kotlinx.coroutines + +import kotlinx.atomicfu.* +import kotlinx.coroutines.internal.* +import kotlin.coroutines.* +import kotlin.native.concurrent.ObsoleteWorkersApi +import kotlin.native.concurrent.Worker + +@PublishedApi +internal actual val DefaultDelay: Delay get() = DefaultDelayImpl + +@OptIn(ObsoleteWorkersApi::class) +private object DefaultDelayImpl : EventLoopImplBase(), Runnable { + init { + incrementUseCount() // this event loop is never completed + } + + private val _thread = atomic(null) + + /** Can only happen when tests close the default executor */ + override fun reschedule(now: Long, delayedTask: DelayedTask) { + throw IllegalStateException("Attempted to schedule $delayedTask at $now after shutdown") + } + + /** + * All event loops are using DefaultDelay#invokeOnTimeout to avoid livelock on + * ``` + * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } } + * ``` + * + * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), + * but it's not exposed as public API. + */ + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduleInvokeOnTimeout(timeMillis, block) + + override fun run() { + val currentThread = Worker.current + // Identity comparisons do not work for value classes, but comparing `null` with non-null should still work + if (!_thread.compareAndSet(null, currentThread)) return // some other thread won the race to start the thread + ThreadLocalEventLoop.setEventLoop(DelegatingUnconfinedEventLoop) + try { + while (true) { + val parkNanos = processNextEvent() + if (parkNanos == Long.MAX_VALUE) break // no more events + if (parkNanos > 0) currentThread.park(parkNanos / 1000L, true) + } + } finally { + _thread.value = null + ThreadLocalEventLoop.resetEventLoop() + // recheck if queues are empty after _thread reference was set to null (!!!) + if (!delayedQueueIsEmpty) { + /* recreate the thread, as there is still work to do, + and `unpark` could have awoken the thread we're currently running on */ + startThreadOrObtainSleepingThread() + } + } + } + + override fun startThreadOrObtainSleepingThread(): Worker? { + // Check if the thread is already running + _thread.value?.let { return it } + /* Now we know that at the moment of this call the thread was not initially running. + This means that whatever thread is going to be running by the end of this function, + it's going to notice the tasks it's supposed to run. + We can return `null` unconditionally. */ + scheduleBackgroundIoTask(this) + return null + } + + override fun toString(): String = "DefaultDelay" +} + +private object DelegatingUnconfinedEventLoop: UnconfinedEventLoop { + override val thisLoopsTaskCanAvoidYielding: Boolean + get() = defaultDelayRunningUnconfinedLoop() + + override val isUnconfinedLoopActive: Boolean get() = false + + override fun runUnconfinedEventLoop(initialBlock: () -> Unit) { + ioView.dispatch(ioView, Runnable { + ThreadLocalEventLoop.unconfinedEventLoop.runUnconfinedEventLoop(initialBlock) + }) + } + + override fun dispatchUnconfined(task: DispatchedTask<*>) = + defaultDelayRunningUnconfinedLoop() + + override fun tryUseAsEventLoop(): EventLoop? = null +} + +private fun defaultDelayRunningUnconfinedLoop(): Nothing { + throw UnsupportedOperationException( + "This method can only be called from the thread where an unconfined event loop is running, " + + "but no tasks can run on this thread." + ) +} + +/** A view separate from [Dispatchers.IO]. + * [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */ +private val ioView by lazy { Dispatchers.IO.limitedParallelism(Int.MAX_VALUE) } +// Without `lazy`, there is a cycle between `ioView` and `DefaultDelay` initialization, leading to a segfault. diff --git a/kotlinx-coroutines-core/native/src/Dispatchers.kt b/kotlinx-coroutines-core/native/src/Dispatchers.kt index e66c05f61d..4ad5db289e 100644 --- a/kotlinx-coroutines-core/native/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/native/src/Dispatchers.kt @@ -1,6 +1,5 @@ package kotlinx.coroutines -import kotlinx.coroutines.internal.* import kotlin.coroutines.* @@ -41,9 +40,14 @@ internal object DefaultIoScheduler : CoroutineDispatcher() { io.dispatchYield(context, block) } + internal fun dispatchToUnlimitedPool(block: Runnable) { + unlimitedPool.dispatch(EmptyCoroutineContext, block) + } + override fun toString(): String = "Dispatchers.IO" } +internal inline fun scheduleBackgroundIoTask(block: Runnable) = DefaultIoScheduler.dispatchToUnlimitedPool(block) @Suppress("EXTENSION_SHADOWED_BY_MEMBER") public actual val Dispatchers.IO: CoroutineDispatcher get() = IO diff --git a/kotlinx-coroutines-core/native/src/EventLoop.kt b/kotlinx-coroutines-core/native/src/EventLoop.kt index 58128d52fd..9cc87a48a7 100644 --- a/kotlinx-coroutines-core/native/src/EventLoop.kt +++ b/kotlinx-coroutines-core/native/src/EventLoop.kt @@ -2,30 +2,29 @@ package kotlinx.coroutines -import kotlin.coroutines.* import kotlin.native.concurrent.* import kotlin.time.* internal actual abstract class EventLoopImplPlatform : EventLoop() { - - private val current = Worker.current + /** Returns `null` if a thread was created and doesn't need to be awoken. + * Returns a thread to awaken if the thread already existed when this method was called. */ + protected abstract fun startThreadOrObtainSleepingThread(): Worker? protected actual fun unpark() { - current.executeAfter(0L, {})// send an empty task to unpark the waiting event loop - } - - protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { - val delayTimeMillis = delayNanosToMillis(delayedTask.nanoTime - now) - DefaultExecutor.invokeOnTimeout(delayTimeMillis, delayedTask, EmptyCoroutineContext) + startThreadOrObtainSleepingThread()?.let { + it.executeAfter(0L, {}) + } } } -internal class EventLoopImpl: EventLoopImplBase() { - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - DefaultDelay.invokeOnTimeout(timeMillis, block, context) +internal class BlockingEventLoop( + private val worker: Worker +) : EventLoopImplBase() { + override fun startThreadOrObtainSleepingThread(): Worker? = + if (Worker.current.id != worker.id) worker else null } -internal actual fun createEventLoop(): EventLoop = EventLoopImpl() +internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Worker.current) private val startingPoint = TimeSource.Monotonic.markNow() diff --git a/kotlinx-coroutines-core/nativeOther/src/Dispatchers.kt b/kotlinx-coroutines-core/nativeOther/src/Dispatchers.kt index 5d200d328a..bc9fe4a019 100644 --- a/kotlinx-coroutines-core/nativeOther/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/nativeOther/src/Dispatchers.kt @@ -8,9 +8,13 @@ internal actual fun createMainDispatcher(default: CoroutineDispatcher): MainCoro internal actual fun createDefaultDispatcher(): CoroutineDispatcher = DefaultDispatcher +/** + * This is not just `private val DefaultDispatcher = newFixedThreadPoolContext(...)` to + * 1. Prevent casting [Dispatchers.Default] to [CloseableCoroutineDispatcher] and closing it + * 2. Make it non-[Delay] + */ private object DefaultDispatcher : CoroutineDispatcher() { // Be consistent with JVM -- at least 2 threads to provide some liveness guarantees in case of improper uses - @OptIn(ExperimentalStdlibApi::class) private val ctx = newFixedThreadPoolContext(Platform.getAvailableProcessors().coerceAtLeast(2), "Dispatchers.Default") override fun dispatch(context: CoroutineContext, block: Runnable) { diff --git a/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt b/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt index a0f392e5b0..1c7ebe44a0 100644 --- a/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt +++ b/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt @@ -13,7 +13,7 @@ private external fun wasiRawClockTimeGet(clockId: Int, precision: Long, resultPt private const val CLOCKID_MONOTONIC = 1 -internal actual fun createEventLoop(): EventLoop = DefaultExecutor +internal actual fun createEventLoop(): EventLoop = GlobalEventLoop internal actual fun nanoTime(): Long = withScopedMemoryAllocator { allocator: MemoryAllocator -> val ptrTo8Bytes = allocator.allocate(8) @@ -38,7 +38,7 @@ private fun sleep(nanos: Long, ptrTo32Bytes: Pointer, ptrTo8Bytes: Pointer, ptrT check(returnCode == 0) { "poll_oneoff failed with the return code $returnCode" } } -internal actual object DefaultExecutor : EventLoopImplBase() { +private object GlobalEventLoop : EventLoopImplBase() { init { if (kotlin.wasm.internal.onExportedFunctionExit == null) { @@ -59,12 +59,6 @@ internal actual abstract class EventLoopImplPlatform : EventLoop() { // do nothing: in WASI, no external callbacks can be invoked while `poll_oneoff` is running, // so it is both impossible and unnecessary to unpark the event loop } - - protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { - // throw; on WASI, the event loop is the default executor, we can't shut it down or reschedule tasks - // to anyone else - throw UnsupportedOperationException("runBlocking event loop is not supported") - } } internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block() @@ -74,7 +68,7 @@ internal fun runEventLoop() { val ptrToSubscription = initializeSubscriptionPtr(allocator) val ptrTo32Bytes = allocator.allocate(32) val ptrTo8Bytes = allocator.allocate(8) - val eventLoop = DefaultExecutor + val eventLoop = GlobalEventLoop eventLoop.incrementUseCount() try { while (true) { @@ -117,4 +111,7 @@ private fun initializeSubscriptionPtr(allocator: MemoryAllocator): Pointer { return ptrToSubscription } -internal actual fun createDefaultDispatcher(): CoroutineDispatcher = DefaultExecutor +internal actual fun createDefaultDispatcher(): CoroutineDispatcher = GlobalEventLoop + +internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) = + GlobalEventLoop.enqueue(task) diff --git a/kotlinx-coroutines-debug/test/DebugTestBase.kt b/kotlinx-coroutines-debug/test/DebugTestBase.kt index 97c2906e0b..f3f2097dd1 100644 --- a/kotlinx-coroutines-debug/test/DebugTestBase.kt +++ b/kotlinx-coroutines-debug/test/DebugTestBase.kt @@ -2,7 +2,6 @@ package kotlinx.coroutines.debug import kotlinx.coroutines.testing.* -import kotlinx.coroutines.* import kotlinx.coroutines.debug.junit4.* import org.junit.* diff --git a/test-utils/jvm/src/TestBase.kt b/test-utils/jvm/src/TestBase.kt index e0d4cba1da..da67b39c93 100644 --- a/test-utils/jvm/src/TestBase.kt +++ b/test-utils/jvm/src/TestBase.kt @@ -172,7 +172,6 @@ fun initPoolsBeforeTest() { @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") fun shutdownPoolsAfterTest() { DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) DefaultScheduler.restore() }