From c7d59248dc9d8b671dd213c1497b7fff16930b92 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 8 Nov 2024 13:49:51 +0100 Subject: [PATCH 01/10] Do not use DefaultExecutor for cleanup work Before this change, DefaultExecutor was occasionally used for executing the work of dispatchers that no longer function. This is no longer the case: instead, Dispatchers.IO is used for that on our multithreaded targets. --- .../common/src/Dispatchers.common.kt | 24 ++++++++++++++++++ .../common/src/EventLoop.common.kt | 17 +++++++------ .../concurrent/src/Builders.concurrent.kt | 3 +++ ...spatchers.kt => Dispatchers.concurrent.kt} | 24 +++++++++++++++++- .../jsAndWasmJsShared/src/EventLoop.kt | 9 +++---- kotlinx-coroutines-core/jvm/src/Builders.kt | 4 ++- .../jvm/src/DefaultExecutor.kt | 21 +++++++++++----- .../jvm/src/Dispatchers.kt | 1 - kotlinx-coroutines-core/jvm/src/EventLoop.kt | 5 +--- kotlinx-coroutines-core/jvm/src/Executors.kt | 16 +++++++++--- .../jvm/test/DefaultExecutorStressTest.kt | 2 +- .../jvm/test/EventLoopsTest.kt | 21 +--------------- .../jvm/test/RunBlockingJvmTest.kt | 1 - .../jvm/test/knit/TestUtil.kt | 5 ++-- .../native/src/Builders.kt | 4 +-- .../native/src/CoroutineContext.kt | 25 ++----------------- .../native/src/Dispatchers.kt | 1 - .../native/src/EventLoop.kt | 4 --- .../nativeOther/src/Dispatchers.kt | 6 ++++- .../wasmWasi/src/EventLoop.kt | 17 ++++++------- 20 files changed, 114 insertions(+), 96 deletions(-) rename kotlinx-coroutines-core/concurrent/src/{Dispatchers.kt => Dispatchers.concurrent.kt} (56%) diff --git a/kotlinx-coroutines-core/common/src/Dispatchers.common.kt b/kotlinx-coroutines-core/common/src/Dispatchers.common.kt index c499a47f92..ae51283a95 100644 --- a/kotlinx-coroutines-core/common/src/Dispatchers.common.kt +++ b/kotlinx-coroutines-core/common/src/Dispatchers.common.kt @@ -71,3 +71,27 @@ public expect object Dispatchers { */ public val Unconfined: CoroutineDispatcher } + +/** + * If a task can no longer run because its dispatcher is closed, it is rescheduled to another dispatcher. + * + * This is required to avoid a situation where some finalizers do not run: + * ``` + * val dispatcher = newSingleThreadContext("test") + * launch(dispatcher) { + * val resource = Resource() + * try { + * // do something `suspending` with resource + * } finally { + * resource.close() + * } + * } + * dispatcher.close() + * ``` + * + * `close` needs to run somewhere, but it can't run on the closed dispatcher. + * + * On the JVM and Native, we reschedule to the thread pool backing `Dispatchers.IO`, + * because an arbitrary task may well have blocking behavior. + */ +internal expect fun rescheduleTaskFromClosedDispatcher(task: Runnable) diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 84291a1b69..83dca6f0d5 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -169,9 +169,6 @@ private typealias Queue = LockFreeTaskQueueCore internal expect abstract class EventLoopImplPlatform() : EventLoop { // Called to unpark this event loop's thread protected fun unpark() - - // Called to reschedule to DefaultExecutor when this event loop is complete - protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) } internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { @@ -275,7 +272,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { // todo: we should unpark only when this delayed task became first in the queue unpark() } else { - DefaultExecutor.enqueue(task) + rescheduleTaskFromClosedDispatcher(task) } } @@ -408,6 +405,14 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { } } + // Called to reschedule when this event loop is complete + protected open fun reschedule(now: Long, delayedTask: DelayedTask) { + val delayTimeMillis = delayNanosToMillis(delayedTask.nanoTime - now) + DefaultDelay.invokeOnTimeout(delayTimeMillis, Runnable { + rescheduleTaskFromClosedDispatcher(delayedTask) + }, EmptyCoroutineContext) + } + internal abstract class DelayedTask( /** * This field can be only modified in [scheduleTask] before putting this DelayedTask @@ -530,10 +535,6 @@ internal expect fun createEventLoop(): EventLoop internal expect fun nanoTime(): Long -internal expect object DefaultExecutor { - fun enqueue(task: Runnable) -} - /** * Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and * non-Darwin native targets. diff --git a/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt b/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt index 7c0581b9d9..4309092319 100644 --- a/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt +++ b/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt @@ -20,5 +20,8 @@ import kotlin.coroutines.* * * Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will * block, potentially leading to thread starvation issues. + * + * If new tasks are submitted to the dispatcher created by [runBlocking] after this function returns, + * they are resubmitted to [Dispatchers.IO]. */ public expect fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T diff --git a/kotlinx-coroutines-core/concurrent/src/Dispatchers.kt b/kotlinx-coroutines-core/concurrent/src/Dispatchers.concurrent.kt similarity index 56% rename from kotlinx-coroutines-core/concurrent/src/Dispatchers.kt rename to kotlinx-coroutines-core/concurrent/src/Dispatchers.concurrent.kt index d18efdc35f..b32264b02f 100644 --- a/kotlinx-coroutines-core/concurrent/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/concurrent/src/Dispatchers.concurrent.kt @@ -39,4 +39,26 @@ package kotlinx.coroutines @Suppress("EXTENSION_SHADOWED_BY_MEMBER") public expect val Dispatchers.IO: CoroutineDispatcher - +internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) { + /** + * We do not create a separate view of [Dispatchers.IO] for the cleanup needs. + * + * If [Dispatchers.IO] is not flooded with other tasks + the cleanup view does not have more threads than + * [Dispatchers.IO], there is no difference between sending cleanup tasks to [Dispatchers.IO] and creating + * a separate view of [Dispatchers.IO] for cleanup. + * + * If [Dispatchers.IO] is flooded with other tasks, we are at a crossroads: + * - On the one hand, we do not want to create too many threads. + * Some clients are carefully monitoring the number of threads and are relying on it not being larger than + * the system property + the sum of explicit `limitedParallelism` arguments in the system. + * - On the other hand, we don't want to delay productive tasks in favor of cleanup tasks. + * + * The first consideration wins on two accounts: + * - As of writing this, [Dispatchers.IO] has been in use as the cleanup dispatcher for dispatchers obtained + * from JVM executors for years, and this has not caused any issues that we know of. + * - On the other hand, some people likely rely on the number of threads not exceeding the number they control. + * If we were to create a separate view of [Dispatchers.IO] for cleanup, this number would be exceeded, which + * is a regression. + */ + Dispatchers.IO.dispatch(Dispatchers.IO, task) +} diff --git a/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt b/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt index 90549eecf4..404fb498ba 100644 --- a/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt @@ -12,14 +12,13 @@ internal class UnconfinedEventLoop : EventLoop() { internal actual abstract class EventLoopImplPlatform : EventLoop() { protected actual fun unpark(): Unit = unsupported() - protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit = unsupported() -} - -internal actual object DefaultExecutor { - public actual fun enqueue(task: Runnable): Unit = unsupported() } private fun unsupported(): Nothing = throw UnsupportedOperationException("runBlocking event loop is not supported") internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block() + +internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) { + Dispatchers.Default.dispatch(Dispatchers.Default, task) +} diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index d2249bfdd0..fe9b91d996 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -4,7 +4,6 @@ package kotlinx.coroutines -import java.util.concurrent.locks.* import kotlin.contracts.* import kotlin.coroutines.* @@ -38,6 +37,9 @@ import kotlin.coroutines.* * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and * this `runBlocking` invocation throws [InterruptedException]. * + * If new tasks are submitted to the dispatcher created by [runBlocking] after this function returns, + * they are resubmitted to [Dispatchers.IO]. + * * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available * for a newly created coroutine. * diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index 3ce7e0d333..3e783a077d 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -11,18 +11,27 @@ internal actual val DefaultDelay: Delay = initializeDefaultDelay() private fun initializeDefaultDelay(): Delay { // Opt-out flag - if (!defaultMainDelayOptIn) return DefaultExecutor + if (!defaultMainDelayOptIn) return DefaultDelayImpl val main = Dispatchers.Main /* * When we already are working with UI and Main threads, it makes * no sense to create a separate thread with timer that cannot be controller * by the UI runtime. */ - return if (main.isMissing() || main !is Delay) DefaultExecutor else main + return if (main.isMissing() || main !is Delay) DefaultDelayImpl else main } -@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") -internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { +internal object DefaultExecutor { + fun shutdown() = DefaultDelayImpl.shutdown() + + fun ensureStarted() = DefaultDelayImpl.ensureStarted() + + fun shutdownForTests(timeout: Long) = DefaultDelayImpl.shutdownForTests(timeout) + + val isThreadPresent: Boolean get() = DefaultDelayImpl.isThreadPresent +} + +private object DefaultDelayImpl : EventLoopImplBase(), Runnable { const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor" init { @@ -61,7 +70,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK } - actual override fun enqueue(task: Runnable) { + override fun enqueue(task: Runnable) { if (isShutDown) shutdownError() super.enqueue(task) } @@ -137,7 +146,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { * the singleton itself instead of using parent' thread one * in order not to accidentally capture temporary application classloader. */ - contextClassLoader = this@DefaultExecutor.javaClass.classLoader + contextClassLoader = this@DefaultDelayImpl.javaClass.classLoader isDaemon = true start() } diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt index a5e8da4c40..a6acc129cc 100644 --- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt @@ -2,7 +2,6 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlinx.coroutines.scheduling.* -import kotlin.coroutines.* /** * Name of the property that defines the maximal number of threads that are used by [Dispatchers.IO] coroutines dispatcher. diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index 15d4ab5c85..b4d40aeaad 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -3,6 +3,7 @@ package kotlinx.coroutines import kotlinx.coroutines.Runnable import kotlinx.coroutines.scheduling.* import kotlinx.coroutines.scheduling.CoroutineScheduler +import kotlin.coroutines.EmptyCoroutineContext internal actual abstract class EventLoopImplPlatform: EventLoop() { @@ -14,9 +15,6 @@ internal actual abstract class EventLoopImplPlatform: EventLoop() { unpark(thread) } - protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { - DefaultExecutor.schedule(now, delayedTask) - } } internal class BlockingEventLoop( @@ -122,4 +120,3 @@ internal fun Thread.isIoDispatcherThread(): Boolean { if (this !is CoroutineScheduler.Worker) return false return isIo() } - diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index bdfbe6dbbc..5247a2a606 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -131,7 +131,7 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) } catch (e: RejectedExecutionException) { unTrackTask() cancelJobOnRejection(context, e) - Dispatchers.IO.dispatch(context, block) + rescheduleTaskFromClosedDispatcher(block) } } @@ -146,15 +146,15 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) continuation.invokeOnCancellation(CancelFutureOnCancel(future)) return } - // Otherwise fallback to default executor - DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation) + // Otherwise fallback to default delay + DefaultDelay.scheduleResumeAfterDelay(timeMillis, continuation) } override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) return when { future != null -> DisposableFutureHandle(future) - else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) + else -> DefaultDelay.invokeOnTimeout(timeMillis, block, context) } } @@ -189,6 +189,14 @@ private class ResumeUndispatchedRunnable( } } +private class ResumeDispatchedRunnable( + private val continuation: CancellableContinuation +) : Runnable { + override fun run() { + continuation.resume(Unit) + } +} + /** * An implementation of [DisposableHandle] that cancels the specified future on dispose. * @suppress **This is unstable API and it is subject to change.** diff --git a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt index 58b8024547..e0bf41508e 100644 --- a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt @@ -8,7 +8,7 @@ class DefaultExecutorStressTest : TestBase() { @Test fun testDelay() = runTest { val iterations = 100_000 * stressTestMultiplier - withContext(DefaultExecutor) { + withContext(DefaultDelay as CoroutineDispatcher) { expect(1) var expected = 1 repeat(iterations) { diff --git a/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt b/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt index 551d1977c0..c31f6e67bc 100644 --- a/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt @@ -49,25 +49,6 @@ class EventLoopsTest : TestBase() { finish(5) } - @Test - fun testEventLoopInDefaultExecutor() = runTest { - expect(1) - withContext(Dispatchers.Unconfined) { - delay(1) - assertTrue(Thread.currentThread().name.startsWith(DefaultExecutor.THREAD_NAME)) - expect(2) - // now runBlocking inside default executor thread --> should use outer event loop - DefaultExecutor.enqueue(Runnable { - expect(4) // will execute when runBlocking runs loop - }) - expect(3) - runBlocking { - expect(5) - } - } - finish(6) - } - /** * Simple test for [processNextEventInCurrentThread] API use-case. */ @@ -159,4 +140,4 @@ class EventLoopsTest : TestBase() { waitingThread.value = null } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt index cc2291e6c1..6535fabb6e 100644 --- a/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt @@ -13,4 +13,3 @@ class RunBlockingJvmTest : TestBase() { rb.hashCode() // unused } } - diff --git a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt index d0a5551567..18eb062e91 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt @@ -35,9 +35,8 @@ fun test(name: String, block: () -> R): List = outputException(name) } finally { // the shutdown log.println("--- shutting down") - DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) shutdownDispatcherPools(SHUTDOWN_TIMEOUT) - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks + DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks } checkTestThreads(threadsBefore) // check thread if the main completed successfully } @@ -55,7 +54,7 @@ private fun shutdownDispatcherPools(timeout: Long) { (thread.dispatcher.executor as ExecutorService).apply { shutdown() awaitTermination(timeout, TimeUnit.MILLISECONDS) - shutdownNow().forEach { DefaultExecutor.enqueue(it) } + shutdownNow().forEach { rescheduleTaskFromClosedDispatcher(it) } } } } diff --git a/kotlinx-coroutines-core/native/src/Builders.kt b/kotlinx-coroutines-core/native/src/Builders.kt index 77b7cacebb..6e70df9f8c 100644 --- a/kotlinx-coroutines-core/native/src/Builders.kt +++ b/kotlinx-coroutines-core/native/src/Builders.kt @@ -33,8 +33,8 @@ import kotlin.native.concurrent.* * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`, * then this invocation uses the outer event loop. * - * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and - * this `runBlocking` invocation throws [InterruptedException]. + * If new tasks are submitted to the dispatcher created by [runBlocking] after this function returns, + * they are resubmitted to [Dispatchers.IO]. * * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available * for a newly created coroutine. diff --git a/kotlinx-coroutines-core/native/src/CoroutineContext.kt b/kotlinx-coroutines-core/native/src/CoroutineContext.kt index 3f4c8d9a01..9f6b7fc0a4 100644 --- a/kotlinx-coroutines-core/native/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/native/src/CoroutineContext.kt @@ -3,32 +3,11 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlin.coroutines.* -internal actual object DefaultExecutor : CoroutineDispatcher(), Delay { - - private val delegate = WorkerDispatcher(name = "DefaultExecutor") - - override fun dispatch(context: CoroutineContext, block: Runnable) { - delegate.dispatch(context, block) - } - - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - delegate.scheduleResumeAfterDelay(timeMillis, continuation) - } - - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - return delegate.invokeOnTimeout(timeMillis, block, context) - } - - actual fun enqueue(task: Runnable): Unit { - delegate.dispatch(EmptyCoroutineContext, task) - } -} +@PublishedApi +internal actual val DefaultDelay: Delay = WorkerDispatcher(name = "DefaultDelay") internal expect fun createDefaultDispatcher(): CoroutineDispatcher -@PublishedApi -internal actual val DefaultDelay: Delay = DefaultExecutor - public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { val combined = coroutineContext + context return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) diff --git a/kotlinx-coroutines-core/native/src/Dispatchers.kt b/kotlinx-coroutines-core/native/src/Dispatchers.kt index e66c05f61d..471adb7417 100644 --- a/kotlinx-coroutines-core/native/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/native/src/Dispatchers.kt @@ -1,6 +1,5 @@ package kotlinx.coroutines -import kotlinx.coroutines.internal.* import kotlin.coroutines.* diff --git a/kotlinx-coroutines-core/native/src/EventLoop.kt b/kotlinx-coroutines-core/native/src/EventLoop.kt index 58128d52fd..fd59b09c99 100644 --- a/kotlinx-coroutines-core/native/src/EventLoop.kt +++ b/kotlinx-coroutines-core/native/src/EventLoop.kt @@ -14,10 +14,6 @@ internal actual abstract class EventLoopImplPlatform : EventLoop() { current.executeAfter(0L, {})// send an empty task to unpark the waiting event loop } - protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { - val delayTimeMillis = delayNanosToMillis(delayedTask.nanoTime - now) - DefaultExecutor.invokeOnTimeout(delayTimeMillis, delayedTask, EmptyCoroutineContext) - } } internal class EventLoopImpl: EventLoopImplBase() { diff --git a/kotlinx-coroutines-core/nativeOther/src/Dispatchers.kt b/kotlinx-coroutines-core/nativeOther/src/Dispatchers.kt index 5d200d328a..bc9fe4a019 100644 --- a/kotlinx-coroutines-core/nativeOther/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/nativeOther/src/Dispatchers.kt @@ -8,9 +8,13 @@ internal actual fun createMainDispatcher(default: CoroutineDispatcher): MainCoro internal actual fun createDefaultDispatcher(): CoroutineDispatcher = DefaultDispatcher +/** + * This is not just `private val DefaultDispatcher = newFixedThreadPoolContext(...)` to + * 1. Prevent casting [Dispatchers.Default] to [CloseableCoroutineDispatcher] and closing it + * 2. Make it non-[Delay] + */ private object DefaultDispatcher : CoroutineDispatcher() { // Be consistent with JVM -- at least 2 threads to provide some liveness guarantees in case of improper uses - @OptIn(ExperimentalStdlibApi::class) private val ctx = newFixedThreadPoolContext(Platform.getAvailableProcessors().coerceAtLeast(2), "Dispatchers.Default") override fun dispatch(context: CoroutineContext, block: Runnable) { diff --git a/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt b/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt index a0f392e5b0..1c7ebe44a0 100644 --- a/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt +++ b/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt @@ -13,7 +13,7 @@ private external fun wasiRawClockTimeGet(clockId: Int, precision: Long, resultPt private const val CLOCKID_MONOTONIC = 1 -internal actual fun createEventLoop(): EventLoop = DefaultExecutor +internal actual fun createEventLoop(): EventLoop = GlobalEventLoop internal actual fun nanoTime(): Long = withScopedMemoryAllocator { allocator: MemoryAllocator -> val ptrTo8Bytes = allocator.allocate(8) @@ -38,7 +38,7 @@ private fun sleep(nanos: Long, ptrTo32Bytes: Pointer, ptrTo8Bytes: Pointer, ptrT check(returnCode == 0) { "poll_oneoff failed with the return code $returnCode" } } -internal actual object DefaultExecutor : EventLoopImplBase() { +private object GlobalEventLoop : EventLoopImplBase() { init { if (kotlin.wasm.internal.onExportedFunctionExit == null) { @@ -59,12 +59,6 @@ internal actual abstract class EventLoopImplPlatform : EventLoop() { // do nothing: in WASI, no external callbacks can be invoked while `poll_oneoff` is running, // so it is both impossible and unnecessary to unpark the event loop } - - protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { - // throw; on WASI, the event loop is the default executor, we can't shut it down or reschedule tasks - // to anyone else - throw UnsupportedOperationException("runBlocking event loop is not supported") - } } internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block() @@ -74,7 +68,7 @@ internal fun runEventLoop() { val ptrToSubscription = initializeSubscriptionPtr(allocator) val ptrTo32Bytes = allocator.allocate(32) val ptrTo8Bytes = allocator.allocate(8) - val eventLoop = DefaultExecutor + val eventLoop = GlobalEventLoop eventLoop.incrementUseCount() try { while (true) { @@ -117,4 +111,7 @@ private fun initializeSubscriptionPtr(allocator: MemoryAllocator): Pointer { return ptrToSubscription } -internal actual fun createDefaultDispatcher(): CoroutineDispatcher = DefaultExecutor +internal actual fun createDefaultDispatcher(): CoroutineDispatcher = GlobalEventLoop + +internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) = + GlobalEventLoop.enqueue(task) From 9e08eaa95f9cae2f955a1af0f3ff63bc798018e4 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 8 Nov 2024 15:24:55 +0100 Subject: [PATCH 02/10] Add some internal documentation --- kotlinx-coroutines-core/common/src/Delay.kt | 2 + .../jvm/src/AbstractTimeSource.kt | 62 +++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt index 67d3d16bb1..f3ec6c3433 100644 --- a/kotlinx-coroutines-core/common/src/Delay.kt +++ b/kotlinx-coroutines-core/common/src/Delay.kt @@ -48,6 +48,8 @@ public interface Delay { * Schedules invocation of a specified [block] after a specified delay [timeMillis]. * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] of this invocation * request if it is not needed anymore. + * + * [block] must execute quickly, be non-blocking, and must not throw any exceptions. */ public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = DefaultDelay.invokeOnTimeout(timeMillis, block, context) diff --git a/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt index f497dc803c..3f3c1a2fba 100644 --- a/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt @@ -27,43 +27,105 @@ internal inline fun mockTimeSource(source: AbstractTimeSource?) { timeSource = source } +/** + * The current system time in milliseconds. + * + * This is only used for automatically-generated tests in place of [System.currentTimeMillis], + * it is not used in production code. + */ @InlineOnly internal inline fun currentTimeMillis(): Long = timeSource?.currentTimeMillis() ?: System.currentTimeMillis() +/** + * The reading from a high-precision monotonic clock used for measuring time intervals. + * Logically equivalent to [kotlin.time.TimeSource.Monotonic.markNow]. + */ @InlineOnly internal actual inline fun nanoTime(): Long = timeSource?.nanoTime() ?: System.nanoTime() +/** + * Calls [trackTask] and returns a wrapped version of the given [block] that calls [unTrackTask] after it completes. + * Is optimized to just returning [block] if [trackTask] and [unTrackTask] are no-ops. + */ @InlineOnly internal inline fun wrapTask(block: Runnable): Runnable = timeSource?.wrapTask(block) ?: block +/** + * Increments the number of tasks not under our control. + * + * Virtual time source uses this to decide whether to jump to the moment of virtual time when the next sleeping thread + * should wake up. + * If there are some uncontrollable tasks, it will not jump to the moment of the next sleeping thread, + * because the uncontrollable tasks may change the shared state in a way that affects the sleeping thread. + * + * Example: + * + * ``` + * thread { // controlled thread + * while (true) { + * if (sharedState == 42) { + * break + * } + * Thread.sleep(1000) + * } + * } + * + * thread { // uncontrolled thread + * sharedState = 42 + * } + * ``` + * + * If the second thread is not tracked, the first thread effectively enters a spin loop until the second thread + * physically changes the shared state. + */ @InlineOnly internal inline fun trackTask() { timeSource?.trackTask() } +/** + * Decrements the number of tasks not under our control. See [trackTask] for more details. + */ @InlineOnly internal inline fun unTrackTask() { timeSource?.unTrackTask() } +/** + * Increases the registered number of nested loops of the form + * `while (nanoTime() < deadline) { parkNanos(deadline - nanoTime()) }` running in the current thread. + * + * While at least one such loop is running, other threads are allowed to call [unpark] on the current thread + * and wake it up. Before [registerTimeLoopThread] is called, [unpark] is not guaranteed to have any effect. + */ @InlineOnly internal inline fun registerTimeLoopThread() { timeSource?.registerTimeLoopThread() } +/** + * The opposite of [registerTimeLoopThread]. + */ @InlineOnly internal inline fun unregisterTimeLoopThread() { timeSource?.unregisterTimeLoopThread() } +/** + * Waits for either the specified number of nanoseconds to pass or until [unpark] is called. + */ @InlineOnly internal inline fun parkNanos(blocker: Any, nanos: Long) { timeSource?.parkNanos(blocker, nanos) ?: LockSupport.parkNanos(blocker, nanos) } +/** + * Preliminarily unparks the specified thread that's currently parked in [parkNanos]. + * Can be called even before the thread is parked. + */ @InlineOnly internal inline fun unpark(thread: Thread) { timeSource?.unpark(thread) ?: LockSupport.unpark(thread) From 2a51586d8d02f6784905bcf849f023734fab0cfb Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 21 Nov 2024 13:44:23 +0100 Subject: [PATCH 03/10] Use a Dispatchers.IO thread for DefaultDelay on the JVM --- .../coroutine-context-and-dispatchers.md | 2 +- .../jvm/src/DefaultExecutor.kt | 217 ++++++------------ .../jvm/src/Dispatchers.kt | 5 +- kotlinx-coroutines-core/jvm/src/EventLoop.kt | 20 +- .../jvm/test/DefaultExecutorStressTest.kt | 26 --- .../jvm/test/DispatchersToStringTest.kt | 6 +- .../jvm/test/VirtualTimeSource.kt | 8 +- .../test/guide/test/DispatcherGuideTest.kt | 2 +- .../jvm/test/knit/TestUtil.kt | 1 - .../scheduling/CoroutineDispatcherTest.kt | 6 +- .../jvm/test/scheduling/SchedulerTestBase.kt | 65 +++--- .../native/src/EventLoop.kt | 2 +- .../test/DebugTestBase.kt | 1 - test-utils/jvm/src/TestBase.kt | 1 - 14 files changed, 131 insertions(+), 231 deletions(-) diff --git a/docs/topics/coroutine-context-and-dispatchers.md b/docs/topics/coroutine-context-and-dispatchers.md index 89498af00e..b99fcc4a2c 100644 --- a/docs/topics/coroutine-context-and-dispatchers.md +++ b/docs/topics/coroutine-context-and-dispatchers.md @@ -115,7 +115,7 @@ Produces the output: ```text Unconfined : I'm working in thread main main runBlocking: I'm working in thread main -Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor +Unconfined : After delay in thread kotlinx.coroutines.DefaultDelay main runBlocking: After delay in thread main ``` diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index 3e783a077d..094a026ddb 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -1,8 +1,9 @@ package kotlinx.coroutines +import kotlinx.atomicfu.* import kotlinx.coroutines.internal.* -import java.util.concurrent.* import kotlin.coroutines.* +import kotlin.time.Duration private val defaultMainDelayOptIn = systemProp("kotlinx.coroutines.main.delay", false) @@ -21,78 +22,38 @@ private fun initializeDefaultDelay(): Delay { return if (main.isMissing() || main !is Delay) DefaultDelayImpl else main } -internal object DefaultExecutor { - fun shutdown() = DefaultDelayImpl.shutdown() - - fun ensureStarted() = DefaultDelayImpl.ensureStarted() - - fun shutdownForTests(timeout: Long) = DefaultDelayImpl.shutdownForTests(timeout) - - val isThreadPresent: Boolean get() = DefaultDelayImpl.isThreadPresent +/** + * This method can be invoked after all coroutines are completed to wait for the default delay executor to shut down + * in response to the lack of tasks. + * + * This is only useful in tests to ensure that setting a fresh virtual time source will not confuse the default delay + * still running the previous test. + * + * Does nothing if the default delay executor is not in use. + * + * @throws IllegalStateException if the shutdown process notices new tasks entering the system + * @throws IllegalStateException if the shutdown process times out + */ +internal fun ensureDefaultDelayDeinitialized(timeout: Duration) { + (DefaultDelay as? DefaultDelayImpl)?.shutdownForTests(timeout) } private object DefaultDelayImpl : EventLoopImplBase(), Runnable { - const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor" + const val THREAD_NAME = "kotlinx.coroutines.DefaultDelay" init { incrementUseCount() // this event loop is never completed } - private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds - - private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos( - try { - java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS) - } catch (e: SecurityException) { - DEFAULT_KEEP_ALIVE_MS - }) - - @Suppress("ObjectPropertyName") - @Volatile - private var _thread: Thread? = null - - override val thread: Thread - get() = _thread ?: createThreadSync() - - private const val FRESH = 0 - private const val ACTIVE = 1 - private const val SHUTDOWN_REQ = 2 - private const val SHUTDOWN_ACK = 3 - private const val SHUTDOWN = 4 - - @Volatile - private var debugStatus: Int = FRESH - - private val isShutDown: Boolean get() = debugStatus == SHUTDOWN - - private val isShutdownRequested: Boolean get() { - val debugStatus = debugStatus - return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK - } - - override fun enqueue(task: Runnable) { - if (isShutDown) shutdownError() - super.enqueue(task) - } - - override fun reschedule(now: Long, delayedTask: DelayedTask) { - // Reschedule on default executor can only be invoked after Dispatchers.shutdown - shutdownError() - } + private val _thread = atomic(null) - private fun shutdownError() { - throw RejectedExecutionException("DefaultExecutor was shut down. " + - "This error indicates that Dispatchers.shutdown() was invoked prior to completion of exiting coroutines, leaving coroutines in incomplete state. " + - "Please refer to Dispatchers.shutdown documentation for more details") - } - - override fun shutdown() { - debugStatus = SHUTDOWN - super.shutdown() + /** Can only happen when tests close the default executor */ + override fun reschedule(now: Long, delayedTask: DelayedTask) { + throw IllegalStateException("Attempted to schedule $delayedTask at $now after shutdown") } /** - * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on + * All event loops are using DefaultDelay#invokeOnTimeout to avoid livelock on * ``` * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } } * ``` @@ -104,100 +65,70 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable { scheduleInvokeOnTimeout(timeMillis, block) override fun run() { - ThreadLocalEventLoop.setEventLoop(this) - registerTimeLoopThread() + val currentThread = Thread.currentThread() + if (!_thread.compareAndSet(null, currentThread)) return // some other thread won the race to start the thread + val oldName = currentThread.name + currentThread.name = THREAD_NAME try { - var shutdownNanos = Long.MAX_VALUE - if (!notifyStartup()) return - while (true) { - Thread.interrupted() // just reset interruption flag - var parkNanos = processNextEvent() - if (parkNanos == Long.MAX_VALUE) { - // nothing to do, initialize shutdown timeout - val now = nanoTime() - if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS - val tillShutdown = shutdownNanos - now - if (tillShutdown <= 0) return // shut thread down - parkNanos = parkNanos.coerceAtMost(tillShutdown) - } else - shutdownNanos = Long.MAX_VALUE - if (parkNanos > 0) { - // check if shutdown was requested and bail out in this case - if (isShutdownRequested) return - parkNanos(this, parkNanos) + ThreadLocalEventLoop.setEventLoop(DefaultDelayImpl) + registerTimeLoopThread() + try { + while (true) { + Thread.interrupted() // just reset interruption flag + val parkNanos = processNextEvent() + if (parkNanos == Long.MAX_VALUE) break // no more events + parkNanos(DefaultDelayImpl, parkNanos) + } + } finally { + _thread.value = null + unregisterTimeLoopThread() + // recheck if queues are empty after _thread reference was set to null (!!!) + if (isEmpty) { + notifyAboutThreadExiting() + } else { + /* recreate the thread, as there is still work to do, + and `unpark` could have awoken the thread we're currently running on */ + startThreadOrObtainSleepingThread() } } } finally { - _thread = null // this thread is dead - acknowledgeShutdownIfNeeded() - unregisterTimeLoopThread() - // recheck if queues are empty after _thread reference was set to null (!!!) - if (!isEmpty) thread // recreate thread if it is needed + currentThread.name = oldName } } - @Synchronized - private fun createThreadSync(): Thread { - return _thread ?: Thread(this, THREAD_NAME).apply { - _thread = this - /* - * `DefaultExecutor` is a global singleton that creates its thread lazily. - * To isolate the classloaders properly, we are inherting the context classloader from - * the singleton itself instead of using parent' thread one - * in order not to accidentally capture temporary application classloader. - */ - contextClassLoader = this@DefaultDelayImpl.javaClass.classLoader - isDaemon = true - start() - } - } - - // used for tests - @Synchronized - internal fun ensureStarted() { - assert { _thread == null } // ensure we are at a clean state - assert { debugStatus == FRESH || debugStatus == SHUTDOWN_ACK } - debugStatus = FRESH - createThreadSync() // create fresh thread - while (debugStatus == FRESH) (this as Object).wait() + override fun startThreadOrObtainSleepingThread(): Thread? { + // Check if the thread is already running + _thread.value?.let { return it } + /* Now we know that at the moment of this call the thread was not initially running. + This means that whatever thread is going to be running by the end of this function, + it's going to notice the tasks it's supposed to run. + We can return `null` unconditionally. */ + ioView.dispatch(ioView, this) + return null } - @Synchronized - private fun notifyStartup(): Boolean { - if (isShutdownRequested) return false - debugStatus = ACTIVE - (this as Object).notifyAll() - return true - } - - @Synchronized // used _only_ for tests - fun shutdownForTests(timeout: Long) { - val deadline = System.currentTimeMillis() + timeout - if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ - // loop while there is anything to do immediately or deadline passes - while (debugStatus != SHUTDOWN_ACK && _thread != null) { - _thread?.let { unpark(it) } // wake up thread if present - val remaining = deadline - System.currentTimeMillis() - if (remaining <= 0) break - (this as Object).wait(timeout) + fun shutdownForTests(timeout: Duration) { + if (_thread.value != null) { + val end = System.currentTimeMillis() + timeout.inWholeMilliseconds + while (true) { + check(isEmpty) { "There are tasks in the DefaultExecutor" } + synchronized(this) { + unpark(_thread.value ?: return) + val toWait = end - System.currentTimeMillis() + check(toWait > 0) { "Timeout waiting for DefaultExecutor to shutdown" } + (this as Object).wait(toWait) + } + } } - // restore fresh status - debugStatus = FRESH } - @Synchronized - private fun acknowledgeShutdownIfNeeded() { - if (!isShutdownRequested) return - debugStatus = SHUTDOWN_ACK - resetAll() // clear queues - (this as Object).notifyAll() + private fun notifyAboutThreadExiting() { + synchronized(this) { (this as Object).notifyAll() } } - // User only for testing and nothing else - internal val isThreadPresent - get() = _thread != null - - override fun toString(): String { - return "DefaultExecutor" - } + override fun toString(): String = "DefaultDelay" } + +/** A view separate from [Dispatchers.IO]. + * [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */ +private val ioView = Dispatchers.IO.limitedParallelism(Int.MAX_VALUE) diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt index a6acc129cc..c0c244c0b6 100644 --- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt @@ -66,8 +66,8 @@ public actual object Dispatchers { /** * Shuts down built-in dispatchers, such as [Default] and [IO], * stopping all the threads associated with them and making them reject all new tasks. - * Dispatcher used as a fallback for time-related operations (`delay`, `withTimeout`) - * and to handle rejected tasks from other dispatchers is also shut down. + * Dispatchers used as fallbacks for time-related operations (`delay`, `withTimeout`) + * and to handle rejected tasks from other dispatchers are also shut down. * * This is a **delicate** API. It is not supposed to be called from a general * application-level code and its invocation is irreversible. @@ -85,7 +85,6 @@ public actual object Dispatchers { */ @DelicateCoroutinesApi public fun shutdown() { - DefaultExecutor.shutdown() // Also shuts down Dispatchers.IO DefaultScheduler.shutdown() } diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index b4d40aeaad..3d6c15e989 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -1,25 +1,25 @@ package kotlinx.coroutines -import kotlinx.coroutines.Runnable -import kotlinx.coroutines.scheduling.* import kotlinx.coroutines.scheduling.CoroutineScheduler -import kotlin.coroutines.EmptyCoroutineContext internal actual abstract class EventLoopImplPlatform: EventLoop() { - - protected abstract val thread: Thread + /** Returns `null` if a thread was created and doesn't need to be awoken. + * Returns a thread to awaken if the thread already existed when this method was called. */ + protected abstract fun startThreadOrObtainSleepingThread(): Thread? protected actual fun unpark() { - val thread = thread // atomic read - if (Thread.currentThread() !== thread) - unpark(thread) + startThreadOrObtainSleepingThread()?.let(::unpark) } } internal class BlockingEventLoop( - override val thread: Thread -) : EventLoopImplBase() + private val thread: Thread +) : EventLoopImplBase() { + override fun startThreadOrObtainSleepingThread(): Thread? = + if (Thread.currentThread() !== thread) thread else null + +} internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread()) diff --git a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt index e0bf41508e..773b518b8b 100644 --- a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt @@ -33,30 +33,4 @@ class DefaultExecutorStressTest : TestBase() { } finish(2 + iterations * 4) } - - @Test - fun testWorkerShutdown() = withVirtualTimeSource { - val iterations = 1_000 * stressTestMultiplier - // wait for the worker to shut down - suspend fun awaitWorkerShutdown() { - val executorTimeoutMs = 1000L - delay(executorTimeoutMs) - while (DefaultExecutor.isThreadPresent) { delay(10) } // hangs if the thread refuses to stop - assertFalse(DefaultExecutor.isThreadPresent) // just to make sure - } - runTest { - awaitWorkerShutdown() // so that the worker shuts down after the initial launch - repeat (iterations) { - val job = launch(Dispatchers.Unconfined) { - // this line runs in the main thread - delay(1) - // this line runs in the DefaultExecutor worker - } - delay(100) // yield the execution, allow the worker to spawn - assertTrue(DefaultExecutor.isThreadPresent) // the worker spawned - job.join() - awaitWorkerShutdown() - } - } - } } diff --git a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt index 32573ca1f6..06560391c1 100644 --- a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt @@ -29,12 +29,12 @@ class DispatchersToStringTest { ) } // Not overridden at all, limited parallelism returns `this` - assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) + assertEquals("DefaultDelay", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) assertEquals("filesDispatcher", Dispatchers.IO.limitedParallelism(1, "filesDispatcher").toString()) assertEquals("json", Dispatchers.Default.limitedParallelism(2, "json").toString()) assertEquals("\uD80C\uDE11", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "\uD80C\uDE11").toString()) - assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) + assertEquals("DefaultDelay", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) val limitedNamed = Dispatchers.IO.limitedParallelism(10, "limited") assertEquals("limited.limitedParallelism(2)", limitedNamed.limitedParallelism(2).toString()) @@ -53,4 +53,4 @@ class DispatchersToStringTest { assertEquals("Named", named.toString()) } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt index 8a461087c3..30b3d7f162 100644 --- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt @@ -3,18 +3,18 @@ package kotlinx.coroutines import java.io.* import java.util.concurrent.* import java.util.concurrent.locks.* +import kotlin.time.Duration.Companion.seconds -private const val SHUTDOWN_TIMEOUT = 1000L +private val SHUTDOWN_TIMEOUT = 1.seconds internal inline fun withVirtualTimeSource(log: PrintStream? = null, block: () -> Unit) { - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working) + ensureDefaultDelayDeinitialized(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working) val testTimeSource = VirtualTimeSource(log) mockTimeSource(testTimeSource) - DefaultExecutor.ensureStarted() // should start with new time source try { block() } finally { - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) + ensureDefaultDelayDeinitialized(SHUTDOWN_TIMEOUT) testTimeSource.shutdown() mockTimeSource(null) // restore time source } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt index 1cf6c2bd4d..4bd131d85e 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt @@ -20,7 +20,7 @@ class DispatcherGuideTest { test("ExampleContext02") { kotlinx.coroutines.guide.exampleContext02.main() }.verifyLinesStart( "Unconfined : I'm working in thread main", "main runBlocking: I'm working in thread main", - "Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor", + "Unconfined : After delay in thread kotlinx.coroutines.DefaultDelay", "main runBlocking: After delay in thread main" ) } diff --git a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt index 18eb062e91..3821cb6824 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt @@ -24,7 +24,6 @@ fun test(name: String, block: () -> R): List = outputException(name) try { captureOutput(name, stdoutEnabled = OUT_ENABLED) { log -> DefaultScheduler.usePrivateScheduler() - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) resetCoroutineId() val threadsBefore = currentThreads() try { diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt index ee21be23fc..e4895cdc06 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt @@ -1,6 +1,5 @@ package kotlinx.coroutines.scheduling -import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test @@ -17,17 +16,22 @@ class CoroutineDispatcherTest : SchedulerTestBase() { @Test fun testSingleThread() = runBlocking { corePoolSize = 1 + println("1. Thread is ${Thread.currentThread()}") expect(1) withContext(dispatcher) { require(Thread.currentThread() is CoroutineScheduler.Worker) + println("2. Thread is ${Thread.currentThread()}") expect(2) val job = async { + println("3. Thread is ${Thread.currentThread()}") expect(3) delay(10) + println("4. Thread is ${Thread.currentThread()}") expect(4) } job.await() + println("5. Thread is ${Thread.currentThread()}") expect(5) } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt index 33e32838da..87dbe8b77b 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt @@ -13,33 +13,6 @@ abstract class SchedulerTestBase : TestBase() { companion object { val CORES_COUNT = AVAILABLE_PROCESSORS - /** - * Asserts that [expectedThreadsCount] pool worker threads were created. - * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking - */ - fun checkPoolThreadsCreated(expectedThreadsCount: Int = CORES_COUNT) { - val threadsCount = maxSequenceNumber()!! - assertEquals(expectedThreadsCount, threadsCount, "Expected $expectedThreadsCount pool threads, but has $threadsCount") - } - - /** - * Asserts that any number of pool worker threads in [range] were created. - * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking - */ - fun checkPoolThreadsCreated(range: IntRange, base: Int = CORES_COUNT) { - val maxSequenceNumber = maxSequenceNumber()!! - val r = (range.first)..(range.last + base) - assertTrue( - maxSequenceNumber in r, - "Expected pool threads to be in interval $r, but has $maxSequenceNumber" - ) - } - - private fun maxSequenceNumber(): Int? { - return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker } - .map { sequenceNumber(it.name) }.maxOrNull() - } - private fun sequenceNumber(threadName: String): Int { val suffix = threadName.substring(threadName.lastIndexOf("-") + 1) val separatorIndex = suffix.indexOf(' ') @@ -49,8 +22,6 @@ abstract class SchedulerTestBase : TestBase() { return suffix.substring(0, separatorIndex).toInt() } - - suspend fun Iterable.joinAll() = forEach { it.join() } } protected var corePoolSize = CORES_COUNT @@ -85,19 +56,43 @@ abstract class SchedulerTestBase : TestBase() { return _dispatcher!!.limitedParallelism(parallelism) } + /** + * Asserts that [expectedThreadsCount] pool worker threads were created. + * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking + */ + fun checkPoolThreadsCreated(expectedThreadsCount: Int = CORES_COUNT) { + val threadsCount = maxSequenceNumber()!! + assertEquals(expectedThreadsCount, threadsCount, "Expected $expectedThreadsCount pool threads, but has $threadsCount") + } + + /** + * Asserts that any number of pool worker threads in [range] were created. + * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking + */ + fun checkPoolThreadsCreated(range: IntRange, base: Int = CORES_COUNT) { + val maxSequenceNumber = maxSequenceNumber()!! + val r = (range.first)..(range.last + base) + assertTrue( + maxSequenceNumber in r, + "Expected pool threads to be in interval $r, but has $maxSequenceNumber" + ) + } + + private fun maxSequenceNumber(): Int? { + return Thread.getAllStackTraces().keys.asSequence().filter { + it is CoroutineScheduler.Worker && it.scheduler === _dispatcher?.executor + }.map { sequenceNumber(it.name) }.maxOrNull() + } + @After fun after() { - runBlocking { - withTimeout(5_000) { - _dispatcher?.close() - } - } + _dispatcher?.close() } } /** * Implementation note: - * Our [Dispatcher.IO] is a [limitedParallelism][CoroutineDispatcher.limitedParallelism] dispatcher + * Our [Dispatchers.IO] is a [limitedParallelism][CoroutineDispatcher.limitedParallelism] dispatcher * on top of unbounded scheduler. We want to test this scenario, but on top of non-singleton * scheduler so we can control the number of threads, thus this method. */ diff --git a/kotlinx-coroutines-core/native/src/EventLoop.kt b/kotlinx-coroutines-core/native/src/EventLoop.kt index fd59b09c99..f993ee7fb0 100644 --- a/kotlinx-coroutines-core/native/src/EventLoop.kt +++ b/kotlinx-coroutines-core/native/src/EventLoop.kt @@ -8,7 +8,7 @@ import kotlin.time.* internal actual abstract class EventLoopImplPlatform : EventLoop() { - private val current = Worker.current + private val current = Worker.current // not `get()`! We're interested in the worker at the moment of creation. protected actual fun unpark() { current.executeAfter(0L, {})// send an empty task to unpark the waiting event loop diff --git a/kotlinx-coroutines-debug/test/DebugTestBase.kt b/kotlinx-coroutines-debug/test/DebugTestBase.kt index 97c2906e0b..f3f2097dd1 100644 --- a/kotlinx-coroutines-debug/test/DebugTestBase.kt +++ b/kotlinx-coroutines-debug/test/DebugTestBase.kt @@ -2,7 +2,6 @@ package kotlinx.coroutines.debug import kotlinx.coroutines.testing.* -import kotlinx.coroutines.* import kotlinx.coroutines.debug.junit4.* import org.junit.* diff --git a/test-utils/jvm/src/TestBase.kt b/test-utils/jvm/src/TestBase.kt index e0d4cba1da..da67b39c93 100644 --- a/test-utils/jvm/src/TestBase.kt +++ b/test-utils/jvm/src/TestBase.kt @@ -172,7 +172,6 @@ fun initPoolsBeforeTest() { @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") fun shutdownPoolsAfterTest() { DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) DefaultScheduler.restore() } From 27067fcaa645e3aeb92360709bafd7ce5156ada0 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 21 Nov 2024 13:53:10 +0100 Subject: [PATCH 04/10] Cleanup --- .../jvm/src/{DefaultExecutor.kt => DefaultDelay.kt} | 0 kotlinx-coroutines-core/jvm/src/Executors.kt | 8 -------- .../jvm/test/scheduling/CoroutineDispatcherTest.kt | 5 ----- 3 files changed, 13 deletions(-) rename kotlinx-coroutines-core/jvm/src/{DefaultExecutor.kt => DefaultDelay.kt} (100%) diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt similarity index 100% rename from kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt rename to kotlinx-coroutines-core/jvm/src/DefaultDelay.kt diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 5247a2a606..e8090c1cf4 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -189,14 +189,6 @@ private class ResumeUndispatchedRunnable( } } -private class ResumeDispatchedRunnable( - private val continuation: CancellableContinuation -) : Runnable { - override fun run() { - continuation.resume(Unit) - } -} - /** * An implementation of [DisposableHandle] that cancels the specified future on dispose. * @suppress **This is unstable API and it is subject to change.** diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt index e4895cdc06..398d1150f7 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt @@ -16,22 +16,17 @@ class CoroutineDispatcherTest : SchedulerTestBase() { @Test fun testSingleThread() = runBlocking { corePoolSize = 1 - println("1. Thread is ${Thread.currentThread()}") expect(1) withContext(dispatcher) { require(Thread.currentThread() is CoroutineScheduler.Worker) - println("2. Thread is ${Thread.currentThread()}") expect(2) val job = async { - println("3. Thread is ${Thread.currentThread()}") expect(3) delay(10) - println("4. Thread is ${Thread.currentThread()}") expect(4) } job.await() - println("5. Thread is ${Thread.currentThread()}") expect(5) } From d04fea7862232eb862c6e5120d8a26c3619875d8 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Mon, 2 Dec 2024 13:13:35 +0100 Subject: [PATCH 05/10] WIP --- .../coroutine-context-and-dispatchers.md | 2 +- .../common/src/EventLoop.common.kt | 96 ++++++++++++++----- .../src/internal/DispatchedContinuation.kt | 6 +- .../common/src/internal/DispatchedTask.kt | 15 +-- .../common/test/flow/VirtualTime.kt | 2 +- .../concurrent/src/Builders.concurrent.kt | 3 + .../jsAndWasmJsShared/src/EventLoop.kt | 4 +- kotlinx-coroutines-core/jvm/src/Builders.kt | 4 +- .../jvm/src/DefaultDelay.kt | 35 ++++++- .../jvm/src/Dispatchers.kt | 3 +- kotlinx-coroutines-core/jvm/src/EventLoop.kt | 2 +- .../test/channels/TickerChannelCommonTest.kt | 2 +- .../test/guide/test/DispatcherGuideTest.kt | 2 +- .../native/src/Builders.kt | 5 +- 14 files changed, 127 insertions(+), 54 deletions(-) diff --git a/docs/topics/coroutine-context-and-dispatchers.md b/docs/topics/coroutine-context-and-dispatchers.md index b99fcc4a2c..13f0cfd9ab 100644 --- a/docs/topics/coroutine-context-and-dispatchers.md +++ b/docs/topics/coroutine-context-and-dispatchers.md @@ -115,7 +115,7 @@ Produces the output: ```text Unconfined : I'm working in thread main main runBlocking: I'm working in thread main -Unconfined : After delay in thread kotlinx.coroutines.DefaultDelay +Unconfined : After delay in thread DefaultDispatcher oroutine#2 main runBlocking: After delay in thread main ``` diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 83dca6f0d5..e17870392d 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -6,6 +6,48 @@ import kotlin.concurrent.Volatile import kotlin.coroutines.* import kotlin.jvm.* +internal interface UnconfinedEventLoop { + /** + * Returns `true` if calling [yield] in a coroutine in this event loop can avoid yielding and continue executing + * due to there being no other tasks in the queue. + * + * This can only be called from the thread that owns this event loop. + */ + val thisLoopsTaskCanAvoidYielding: Boolean + + /** + * Returns `true` if someone (typically a call to [runUnconfinedEventLoop]) is currently processing the tasks, + * so calling [dispatchUnconfined] is guaranteed to be processed eventually. + * + * This can only be called from the thread that owns this event loop. + */ + val isUnconfinedLoopActive: Boolean + + /** + * Executes [initialBlock] and then processes unconfined tasks until there are no more, blocking the current thread. + * + * This can only be called when no other [runUnconfinedEventLoop] is currently active on this event loop. + * + * This can only be called from the thread that owns this event loop. + */ + fun runUnconfinedEventLoop(initialBlock: () -> Unit) + + /** + * Sends the [task] to this event loop for execution. + * + * This method should only be called while [isUnconfinedLoopActive] is `true`. + * Otherwise, the task may be left unprocessed. + * + * This can only be called from the thread that owns this event loop. + */ + fun dispatchUnconfined(task: DispatchedTask<*>) + + /** + * Tries to interpret this event loop for unconfined tasks as a proper event loop and returns it if successful. + */ + fun tryUseAsEventLoop(): EventLoop? +} + /** * Extended by [CoroutineDispatcher] implementations that have event loop inside and can * be asked to process next event from their event queue. @@ -16,7 +58,7 @@ import kotlin.jvm.* * * @suppress **This an internal API and should not be used from general code.** */ -internal abstract class EventLoop : CoroutineDispatcher() { +internal abstract class EventLoop : CoroutineDispatcher(), UnconfinedEventLoop { /** * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop. */ @@ -51,8 +93,6 @@ internal abstract class EventLoop : CoroutineDispatcher() { return 0 } - protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty - protected open val nextTime: Long get() { val queue = unconfinedQueue ?: return Long.MAX_VALUE @@ -65,6 +105,7 @@ internal abstract class EventLoop : CoroutineDispatcher() { task.run() return true } + /** * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one). @@ -77,28 +118,26 @@ internal abstract class EventLoop : CoroutineDispatcher() { * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded] * into the current event loop. */ - fun dispatchUnconfined(task: DispatchedTask<*>) { - val queue = unconfinedQueue ?: - ArrayDeque>().also { unconfinedQueue = it } + override fun dispatchUnconfined(task: DispatchedTask<*>) { + val queue = unconfinedQueue ?: ArrayDeque>().also { unconfinedQueue = it } queue.addLast(task) } val isActive: Boolean get() = useCount > 0 - val isUnconfinedLoopActive: Boolean + override val isUnconfinedLoopActive: Boolean get() = useCount >= delta(unconfined = true) - // May only be used from the event loop's thread - val isUnconfinedQueueEmpty: Boolean - get() = unconfinedQueue?.isEmpty() ?: true + override val thisLoopsTaskCanAvoidYielding: Boolean + get() = unconfinedQueue?.isEmpty() != false private fun delta(unconfined: Boolean) = if (unconfined) (1L shl 32) else 1L fun incrementUseCount(unconfined: Boolean = false) { useCount += delta(unconfined) - if (!unconfined) shared = true + if (!unconfined) shared = true } fun decrementUseCount(unconfined: Boolean = false) { @@ -117,22 +156,37 @@ internal abstract class EventLoop : CoroutineDispatcher() { } open fun shutdown() {} + + override fun runUnconfinedEventLoop(initialBlock: () -> Unit) { + incrementUseCount(unconfined = true) + try { + initialBlock() + while (true) { + // break when all unconfined continuations where executed + if (!processUnconfinedEvent()) break + } + } finally { + decrementUseCount(unconfined = true) + } + } + + override fun tryUseAsEventLoop(): EventLoop? = this } internal object ThreadLocalEventLoop { - private val ref = commonThreadLocal(Symbol("ThreadLocalEventLoop")) + private val ref = commonThreadLocal(Symbol("ThreadLocalEventLoop")) - internal val eventLoop: EventLoop + internal val unconfinedEventLoop: UnconfinedEventLoop get() = ref.get() ?: createEventLoop().also { ref.set(it) } - internal fun currentOrNull(): EventLoop? = + internal fun currentOrNull(): UnconfinedEventLoop? = ref.get() internal fun resetEventLoop() { ref.set(null) } - internal fun setEventLoop(eventLoop: EventLoop) { + internal fun setEventLoop(eventLoop: UnconfinedEventLoop) { ref.set(eventLoop) } } @@ -183,8 +237,10 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { get() = _isCompleted.value set(value) { _isCompleted.value = value } - override val isEmpty: Boolean get() { - if (!isUnconfinedQueueEmpty) return false + /** + * Checks that at the moment this method is called, there are no tasks in the delayed tasks queue. + */ + protected val delayedQueueIsEmpty: Boolean get() { val delayed = _delayed.value if (delayed != null && !delayed.isEmpty) return false return when (val queue = _queue.value) { @@ -383,12 +439,6 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { return delayedTask.scheduleTask(now, delayedQueue, this) } - // It performs "hard" shutdown for test cleanup purposes - protected fun resetAll() { - _queue.value = null - _delayed.value = null - } - // This is a "soft" (normal) shutdown private fun rescheduleAllDelayed() { val now = nanoTime() diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index 4c8f54e877..9891794d2d 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -292,12 +292,12 @@ internal fun DispatchedContinuation.yieldUndispatched(): Boolean = */ private inline fun DispatchedContinuation<*>.executeUnconfined( contState: Any?, mode: Int, doYield: Boolean = false, - block: () -> Unit + noinline block: () -> Unit ): Boolean { assert { mode != MODE_UNINITIALIZED } // invalid execution mode - val eventLoop = ThreadLocalEventLoop.eventLoop + val eventLoop = ThreadLocalEventLoop.unconfinedEventLoop // If we are yielding and unconfined queue is empty, we can bail out as part of fast path - if (doYield && eventLoop.isUnconfinedQueueEmpty) return false + if (doYield && eventLoop.thisLoopsTaskCanAvoidYielding) return false return if (eventLoop.isUnconfinedLoopActive) { // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow _state = contState diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt index ad5fed1205..a619827a96 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt @@ -165,7 +165,7 @@ internal fun DispatchedTask.resume(delegate: Continuation, undispatche } private fun DispatchedTask<*>.resumeUnconfined() { - val eventLoop = ThreadLocalEventLoop.eventLoop + val eventLoop = ThreadLocalEventLoop.unconfinedEventLoop if (eventLoop.isUnconfinedLoopActive) { // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow eventLoop.dispatchUnconfined(this) @@ -177,25 +177,18 @@ private fun DispatchedTask<*>.resumeUnconfined() { } } -internal inline fun DispatchedTask<*>.runUnconfinedEventLoop( - eventLoop: EventLoop, +internal fun DispatchedTask<*>.runUnconfinedEventLoop( + eventLoop: UnconfinedEventLoop, block: () -> Unit ) { - eventLoop.incrementUseCount(unconfined = true) try { - block() - while (true) { - // break when all unconfined continuations where executed - if (!eventLoop.processUnconfinedEvent()) break - } + eventLoop.runUnconfinedEventLoop(block) } catch (e: Throwable) { /* * This exception doesn't happen normally, only if we have a bug in implementation. * Report it as a fatal exception. */ handleFatalException(e) - } finally { - eventLoop.decrementUseCount(unconfined = true) } } diff --git a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt index 771768e008..d89685796a 100644 --- a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt +++ b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt @@ -19,7 +19,7 @@ internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : Coroutine */ enclosingScope.launch { while (true) { - val delayNanos = ThreadLocalEventLoop.currentOrNull()?.processNextEvent() + val delayNanos = ThreadLocalEventLoop.currentOrNull()?.tryUseAsEventLoop()?.processNextEvent() ?: error("Event loop is missing, virtual time source works only as part of event loop") if (delayNanos <= 0) continue if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) { diff --git a/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt b/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt index 4309092319..c35aa0b199 100644 --- a/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt +++ b/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt @@ -25,3 +25,6 @@ import kotlin.coroutines.* * they are resubmitted to [Dispatchers.IO]. */ public expect fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T + +internal fun UnconfinedEventLoop.useAsEventLoopForRunBlockingOrFail(): EventLoop = + tryUseAsEventLoop() ?: throw IllegalStateException("runBlocking can not be run in direct dispatchers") diff --git a/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt b/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt index 404fb498ba..d62f96557e 100644 --- a/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt @@ -2,11 +2,11 @@ package kotlinx.coroutines import kotlin.coroutines.* -internal actual fun createEventLoop(): EventLoop = UnconfinedEventLoop() +internal actual fun createEventLoop(): EventLoop = UnconfinedEventLoopImpl() internal actual fun nanoTime(): Long = unsupported() -internal class UnconfinedEventLoop : EventLoop() { +private class UnconfinedEventLoopImpl : EventLoop() { override fun dispatch(context: CoroutineContext, block: Runnable): Unit = unsupported() } diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index fe9b91d996..17765d2b8b 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -57,13 +57,13 @@ public actual fun runBlocking(context: CoroutineContext, block: suspend Coro val newContext: CoroutineContext if (contextInterceptor == null) { // create or use private event loop if no dispatcher is specified - eventLoop = ThreadLocalEventLoop.eventLoop + eventLoop = ThreadLocalEventLoop.unconfinedEventLoop.useAsEventLoopForRunBlockingOrFail() newContext = GlobalScope.newCoroutineContext(context + eventLoop) } else { // See if context's interceptor is an event loop that we shall use (to support TestContext) // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } - ?: ThreadLocalEventLoop.currentOrNull() + ?: ThreadLocalEventLoop.currentOrNull()?.useAsEventLoopForRunBlockingOrFail() newContext = GlobalScope.newCoroutineContext(context) } val coroutine = BlockingCoroutine(newContext, currentThread, eventLoop) diff --git a/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt index 094a026ddb..a596ad9799 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt @@ -1,3 +1,4 @@ +@file:JvmName("DefaultExecutorKt") package kotlinx.coroutines import kotlinx.atomicfu.* @@ -70,20 +71,21 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable { val oldName = currentThread.name currentThread.name = THREAD_NAME try { - ThreadLocalEventLoop.setEventLoop(DefaultDelayImpl) + ThreadLocalEventLoop.setEventLoop(DelegatingUnconfinedEventLoop) registerTimeLoopThread() try { while (true) { Thread.interrupted() // just reset interruption flag val parkNanos = processNextEvent() if (parkNanos == Long.MAX_VALUE) break // no more events - parkNanos(DefaultDelayImpl, parkNanos) + parkNanos(this@DefaultDelayImpl, parkNanos) } } finally { _thread.value = null unregisterTimeLoopThread() + ThreadLocalEventLoop.resetEventLoop() // recheck if queues are empty after _thread reference was set to null (!!!) - if (isEmpty) { + if (delayedQueueIsEmpty) { notifyAboutThreadExiting() } else { /* recreate the thread, as there is still work to do, @@ -111,7 +113,7 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable { if (_thread.value != null) { val end = System.currentTimeMillis() + timeout.inWholeMilliseconds while (true) { - check(isEmpty) { "There are tasks in the DefaultExecutor" } + check(delayedQueueIsEmpty) { "There are tasks in the DefaultExecutor" } synchronized(this) { unpark(_thread.value ?: return) val toWait = end - System.currentTimeMillis() @@ -129,6 +131,31 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable { override fun toString(): String = "DefaultDelay" } +private object DelegatingUnconfinedEventLoop: UnconfinedEventLoop { + override val thisLoopsTaskCanAvoidYielding: Boolean + get() = defaultDelayRunningUnconfinedLoop() + + override val isUnconfinedLoopActive: Boolean get() = false + + override fun runUnconfinedEventLoop(initialBlock: () -> Unit) { + ioView.dispatch(ioView, Runnable { + ThreadLocalEventLoop.unconfinedEventLoop.runUnconfinedEventLoop(initialBlock) + }) + } + + override fun dispatchUnconfined(task: DispatchedTask<*>) = + defaultDelayRunningUnconfinedLoop() + + override fun tryUseAsEventLoop(): EventLoop? = null +} + +private fun defaultDelayRunningUnconfinedLoop(): Nothing { + throw UnsupportedOperationException( + "This method can only be called from the thread where an unconfined event loop is running, " + + "but no tasks can run on this thread." + ) +} + /** A view separate from [Dispatchers.IO]. * [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */ private val ioView = Dispatchers.IO.limitedParallelism(Int.MAX_VALUE) diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt index c0c244c0b6..04aa1addea 100644 --- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt @@ -19,7 +19,8 @@ public actual object Dispatchers { public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher @JvmStatic - public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined + public actual val Unconfined: CoroutineDispatcher get() = + kotlinx.coroutines.Unconfined /** * The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads. diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index 3d6c15e989..9a2125ba0e 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -46,7 +46,7 @@ internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.curr public fun processNextEventInCurrentThread(): Long = // This API is used in Ktor for serverless integration where a single thread awaits a blocking call // (and, to avoid actual blocking, does something via this call), see #850 - ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE + ThreadLocalEventLoop.currentOrNull()?.tryUseAsEventLoop()?.processNextEvent() ?: Long.MAX_VALUE internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block() diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt index 65102095b1..88c5bfdfe2 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt @@ -9,7 +9,7 @@ import org.junit.runners.* import kotlin.test.* @RunWith(Parameterized::class) -class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() { +class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase(disableOutCheck = true) { companion object { @Parameterized.Parameters(name = "{0}") @JvmStatic diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt index 4bd131d85e..a8c99dd9d8 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt @@ -20,7 +20,7 @@ class DispatcherGuideTest { test("ExampleContext02") { kotlinx.coroutines.guide.exampleContext02.main() }.verifyLinesStart( "Unconfined : I'm working in thread main", "main runBlocking: I'm working in thread main", - "Unconfined : After delay in thread kotlinx.coroutines.DefaultDelay", + "Unconfined : After delay in thread DefaultDispatcher oroutine#2", "main runBlocking: After delay in thread main" ) } diff --git a/kotlinx-coroutines-core/native/src/Builders.kt b/kotlinx-coroutines-core/native/src/Builders.kt index 6e70df9f8c..7a318e86f1 100644 --- a/kotlinx-coroutines-core/native/src/Builders.kt +++ b/kotlinx-coroutines-core/native/src/Builders.kt @@ -1,7 +1,6 @@ @file:OptIn(ExperimentalContracts::class, ObsoleteWorkersApi::class) package kotlinx.coroutines -import kotlinx.cinterop.* import kotlin.contracts.* import kotlin.coroutines.* import kotlin.native.concurrent.* @@ -51,13 +50,13 @@ public actual fun runBlocking(context: CoroutineContext, block: suspend Coro val newContext: CoroutineContext if (contextInterceptor == null) { // create or use private event loop if no dispatcher is specified - eventLoop = ThreadLocalEventLoop.eventLoop + eventLoop = ThreadLocalEventLoop.unconfinedEventLoop.useAsEventLoopForRunBlockingOrFail() newContext = GlobalScope.newCoroutineContext(context + eventLoop) } else { // See if context's interceptor is an event loop that we shall use (to support TestContext) // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } - ?: ThreadLocalEventLoop.currentOrNull() + ?: ThreadLocalEventLoop.currentOrNull()?.useAsEventLoopForRunBlockingOrFail() newContext = GlobalScope.newCoroutineContext(context) } val coroutine = BlockingCoroutine(newContext, eventLoop) From 8f343fbd6abe188009f3cda063d67c5d88a316cf Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Mon, 2 Dec 2024 15:01:28 +0100 Subject: [PATCH 06/10] Fix VirtualTimeSource not being able to skip delays --- .../jvm/src/AbstractTimeSource.kt | 17 ++++++++------ .../jvm/src/DefaultDelay.kt | 8 ++++--- kotlinx-coroutines-core/jvm/src/Executors.kt | 2 +- .../jvm/src/scheduling/CoroutineScheduler.kt | 8 ++++--- .../jvm/src/scheduling/Dispatcher.kt | 12 ++++++---- .../jvm/test/VirtualTimeSource.kt | 18 +++++++-------- .../test/channels/TickerChannelCommonTest.kt | 18 +++++++-------- .../jvm/test/channels/TickerChannelTest.kt | 22 +++++++++---------- .../jvm/test/scheduling/SchedulerTestBase.kt | 4 ++-- 9 files changed, 60 insertions(+), 49 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt index 3f3c1a2fba..ec44e6686b 100644 --- a/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt @@ -10,8 +10,8 @@ internal abstract class AbstractTimeSource { abstract fun currentTimeMillis(): Long abstract fun nanoTime(): Long abstract fun wrapTask(block: Runnable): Runnable - abstract fun trackTask() - abstract fun unTrackTask() + abstract fun trackTask(obj: Any) + abstract fun unTrackTask(obj: Any) abstract fun registerTimeLoopThread() abstract fun unregisterTimeLoopThread() abstract fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0 @@ -80,18 +80,21 @@ internal inline fun wrapTask(block: Runnable): Runnable = * * If the second thread is not tracked, the first thread effectively enters a spin loop until the second thread * physically changes the shared state. + * + * Every call to [trackTask] must be accompanied by a call to [unTrackTask] with the same [obj], + * but [unTrackTask] can be called even if the corresponding [trackTask] wasn't called. */ @InlineOnly -internal inline fun trackTask() { - timeSource?.trackTask() +internal inline fun trackTask(obj: Any) { + timeSource?.trackTask(obj) } /** - * Decrements the number of tasks not under our control. See [trackTask] for more details. + * Marks the task [obj] as complete. If [obj] wasn't tracked, does nothing. See [trackTask] for more details. */ @InlineOnly -internal inline fun unTrackTask() { - timeSource?.unTrackTask() +internal inline fun unTrackTask(obj: Any) { + timeSource?.unTrackTask(obj) } /** diff --git a/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt index a596ad9799..0ca013b2ac 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt @@ -3,6 +3,8 @@ package kotlinx.coroutines import kotlinx.atomicfu.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.scheduling.* +import kotlinx.coroutines.scheduling.scheduleBackgroundIoTask import kotlin.coroutines.* import kotlin.time.Duration @@ -78,7 +80,7 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable { Thread.interrupted() // just reset interruption flag val parkNanos = processNextEvent() if (parkNanos == Long.MAX_VALUE) break // no more events - parkNanos(this@DefaultDelayImpl, parkNanos) + if (parkNanos > 0) parkNanos(this@DefaultDelayImpl, parkNanos) } } finally { _thread.value = null @@ -105,7 +107,7 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable { This means that whatever thread is going to be running by the end of this function, it's going to notice the tasks it's supposed to run. We can return `null` unconditionally. */ - ioView.dispatch(ioView, this) + scheduleBackgroundIoTask(this) return null } @@ -113,7 +115,6 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable { if (_thread.value != null) { val end = System.currentTimeMillis() + timeout.inWholeMilliseconds while (true) { - check(delayedQueueIsEmpty) { "There are tasks in the DefaultExecutor" } synchronized(this) { unpark(_thread.value ?: return) val toWait = end - System.currentTimeMillis() @@ -156,6 +157,7 @@ private fun defaultDelayRunningUnconfinedLoop(): Nothing { ) } + /** A view separate from [Dispatchers.IO]. * [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */ private val ioView = Dispatchers.IO.limitedParallelism(Int.MAX_VALUE) diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index e8090c1cf4..7430b8c6de 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -129,7 +129,7 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) try { executor.execute(wrapTask(block)) } catch (e: RejectedExecutionException) { - unTrackTask() + unTrackTask(block) cancelJobOnRejection(context, e) rescheduleTaskFromClosedDispatcher(block) } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 3430ebadec..5a4dcf9316 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -391,9 +391,11 @@ internal class CoroutineScheduler( * - Concurrent [close] that effectively shutdowns the worker thread. * Used for [yield]. */ - fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) { - trackTask() // this is needed for virtual time support + fun dispatch( + block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false, track: Boolean = true + ) { val task = createTask(block, taskContext) + if (track) trackTask(task) // this is needed for virtual time support val isBlockingTask = task.isBlocking // Invariant: we increment counter **before** publishing the task // so executing thread can safely decrement the number of blocking tasks @@ -588,7 +590,7 @@ internal class CoroutineScheduler( val thread = Thread.currentThread() thread.uncaughtExceptionHandler.uncaughtException(thread, e) } finally { - unTrackTask() + unTrackTask(task) } } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index 28d5537108..a510f02f46 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -37,11 +37,11 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() { @InternalCoroutinesApi override fun dispatchYield(context: CoroutineContext, block: Runnable) { - DefaultScheduler.dispatchWithContext(block, BlockingContext, true) + DefaultScheduler.dispatchWithContext(block, BlockingContext, fair = true, track = true) } override fun dispatch(context: CoroutineContext, block: Runnable) { - DefaultScheduler.dispatchWithContext(block, BlockingContext, false) + DefaultScheduler.dispatchWithContext(block, BlockingContext, fair = false, track = true) } override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { @@ -58,6 +58,10 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() { } } +internal fun scheduleBackgroundIoTask(block: Runnable) { + DefaultScheduler.dispatchWithContext(block, BlockingContext, fair = false, track = false) +} + // Dispatchers.IO internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor { @@ -126,8 +130,8 @@ internal open class SchedulerCoroutineDispatcher( coroutineScheduler.dispatch(block, fair = true) } - internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) { - coroutineScheduler.dispatch(block, context, fair) + internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean, track: Boolean) { + coroutineScheduler.dispatch(block, context, fair = fair, track = track) } override fun close() { diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt index 30b3d7f162..d91abbd907 100644 --- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt @@ -48,7 +48,7 @@ internal class VirtualTimeSource( @Volatile private var time: Long = 0 - private var trackedTasks = 0 + private val trackedTasks = HashSet() private val threads = ConcurrentHashMap() @@ -56,22 +56,21 @@ internal class VirtualTimeSource( override fun nanoTime(): Long = time override fun wrapTask(block: Runnable): Runnable { - trackTask() + trackTask(block) return Runnable { try { block.run() } - finally { unTrackTask() } + finally { unTrackTask(block) } } } @Synchronized - override fun trackTask() { - trackedTasks++ + override fun trackTask(obj: Any) { + trackedTasks.add(obj) } @Synchronized - override fun unTrackTask() { - assert(trackedTasks > 0) - trackedTasks-- + override fun unTrackTask(obj: Any) { + trackedTasks.remove(obj) } @Synchronized @@ -125,7 +124,7 @@ internal class VirtualTimeSource( return } if (threads[mainThread] == null) return - if (trackedTasks != 0) return + if (trackedTasks.isNotEmpty()) return val minParkedTill = minParkedTill() if (minParkedTill <= time) return time = minParkedTill @@ -145,6 +144,7 @@ internal class VirtualTimeSource( isShutdown = true wakeupAll() while (!threads.isEmpty()) (this as Object).wait() + assert(trackedTasks.isEmpty()) { "There are still tracked tasks: $trackedTasks" } } private fun wakeupAll() { diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt index 88c5bfdfe2..644bdf2912 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt @@ -9,12 +9,12 @@ import org.junit.runners.* import kotlin.test.* @RunWith(Parameterized::class) -class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase(disableOutCheck = true) { +class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() { companion object { @Parameterized.Parameters(name = "{0}") @JvmStatic fun params(): Collection> = - Channel.values().map { arrayOf(it) } + Channel.entries.map { arrayOf(it) } } enum class Channel { @@ -35,13 +35,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase(di fun testDelay() = withVirtualTimeSource { runTest { val delayChannel = channelFactory(delay = 10000) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.checkEmpty() delay(5000) delayChannel.checkEmpty() delay(5100) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() delay(5100) @@ -57,13 +57,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase(di delay(500) delayChannel.checkEmpty() delay(300) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() // Regular delay delay(750) delayChannel.checkEmpty() delay(260) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() } } @@ -72,7 +72,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase(di fun testReceive() = withVirtualTimeSource { runTest { val delayChannel = channelFactory(delay = 1000) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() var value = withTimeoutOrNull(750) { delayChannel.receive() 1 @@ -158,7 +158,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase(di fun ReceiveChannel.checkEmpty() = assertNull(tryReceive().getOrNull()) -fun ReceiveChannel.checkNotEmpty() { - assertNotNull(tryReceive().getOrNull()) +suspend fun ReceiveChannel.receiveSingle() { + receive() assertNull(tryReceive().getOrNull()) } diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt index 051d670743..0f7e6b1bf9 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt @@ -9,15 +9,15 @@ class TickerChannelTest : TestBase() { fun testFixedDelayChannelBackpressure() = withVirtualTimeSource { runTest { val delayChannel = ticker(delayMillis = 1000, initialDelayMillis = 0, mode = TickerMode.FIXED_DELAY) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.checkEmpty() delay(1500) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(500) delayChannel.checkEmpty() delay(520) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() } } @@ -26,17 +26,17 @@ class TickerChannelTest : TestBase() { fun testDelayChannelBackpressure() = withVirtualTimeSource { runTest { val delayChannel = ticker(delayMillis = 1000, initialDelayMillis = 0) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.checkEmpty() delay(1500) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(520) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(500) delayChannel.checkEmpty() delay(520) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() } } @@ -45,17 +45,17 @@ class TickerChannelTest : TestBase() { fun testDelayChannelBackpressure2() = withVirtualTimeSource { runTest { val delayChannel = ticker(delayMillis = 200, initialDelayMillis = 0) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.checkEmpty() delay(500) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(110) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(110) delayChannel.checkEmpty() delay(110) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() } } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt index 87dbe8b77b..81d66422ba 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt @@ -101,11 +101,11 @@ internal fun SchedulerCoroutineDispatcher.blocking(parallelism: Int = 16): Corou @InternalCoroutinesApi override fun dispatchYield(context: CoroutineContext, block: Runnable) { - this@blocking.dispatchWithContext(block, BlockingContext, true) + this@blocking.dispatchWithContext(block, BlockingContext, fair = true, track = true) } override fun dispatch(context: CoroutineContext, block: Runnable) { - this@blocking.dispatchWithContext(block, BlockingContext, false) + this@blocking.dispatchWithContext(block, BlockingContext, fair = false, track = true) } }.limitedParallelism(parallelism) } From 3b843a81a317f003c6d2b260d9ad268f60710d2c Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 4 Dec 2024 13:49:32 +0100 Subject: [PATCH 07/10] Fix VirtualTimeSource not noticing that DefaultDelay is about to get created --- .../jvm/src/AbstractTimeSource.kt | 3 ++- kotlinx-coroutines-core/jvm/src/DefaultDelay.kt | 12 ++++++++++++ .../jvm/test/VirtualTimeSource.kt | 1 - 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt index ec44e6686b..619d7d6809 100644 --- a/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt @@ -99,7 +99,8 @@ internal inline fun unTrackTask(obj: Any) { /** * Increases the registered number of nested loops of the form - * `while (nanoTime() < deadline) { parkNanos(deadline - nanoTime()) }` running in the current thread. + * `while (nanoTime() < deadline) { parkNanos(deadline - nanoTime()) }` corresponding to the object [key] + * and signals that the current thread is in such a loop. * * While at least one such loop is running, other threads are allowed to call [unpark] on the current thread * and wake it up. Before [registerTimeLoopThread] is called, [unpark] is not guaranteed to have any effect. diff --git a/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt index 0ca013b2ac..d556221677 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt @@ -75,6 +75,7 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable { try { ThreadLocalEventLoop.setEventLoop(DelegatingUnconfinedEventLoop) registerTimeLoopThread() + unTrackTask(this) /** see the comment in [startThreadOrObtainSleepingThread] */ try { while (true) { Thread.interrupted() // just reset interruption flag @@ -107,6 +108,17 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable { This means that whatever thread is going to be running by the end of this function, it's going to notice the tasks it's supposed to run. We can return `null` unconditionally. */ + /** If this function is called from a thread that's already registered as a time loop thread, + because a time loop thread is not parked right now, the time source will not advance time *currently*, + but it may do that as soon as the thread calling this is parked, which may happen earlier than the default + delay thread has a chance to run. + Because of that, we notify the time source that something is actually happening right now. + This would work automatically if instead of [scheduleBackgroundIoTask] we used [CoroutineDispatcher.dispatch] on + [Dispatchers.IO], but then, none of the delays would be skipped, as the whole time a [DefaultDelay] thread runs + would be considered as a task. + Therefore, we register a task right now and mark it as completed as soon as a [DefaultDelay] time loop gets + registered. */ + trackTask(this) scheduleBackgroundIoTask(this) return null } diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt index d91abbd907..1c139841a9 100644 --- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt @@ -144,7 +144,6 @@ internal class VirtualTimeSource( isShutdown = true wakeupAll() while (!threads.isEmpty()) (this as Object).wait() - assert(trackedTasks.isEmpty()) { "There are still tracked tasks: $trackedTasks" } } private fun wakeupAll() { From 448ecaadb27e36d0ace19c25b93c2a716eb2b851 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 4 Dec 2024 14:25:13 +0100 Subject: [PATCH 08/10] Fix a test --- .../jvm/test/channels/TickerChannelCommonTest.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt index 644bdf2912..f60a32505b 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt @@ -93,6 +93,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() fun testComplexOperator() = withVirtualTimeSource { runTest { val producer = GlobalScope.produce { + delay(1) // ensure that the ordering of dispatches doesn't affect the result for (i in 1..7) { send(i) delay(1000) From 7ccad6df56a02e7ad8972ea40bfaea774cb47581 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 28 Jan 2025 14:10:55 +0100 Subject: [PATCH 09/10] Port the JVM implementation of DefaultDelay to Native Tests segfault on my machine with this stacktrace: 0 kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism(kotlin.Int;kotlin.String?){}kotlinx.coroutines.CoroutineDispatcher-trampoline () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:176 1 0x00000000003bed59 in kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism$default(kotlin.Int;kotlin.String?;kotlin.Int){}kotlinx.coroutines.CoroutineDispatcher (_this=0x0, parallelism=2147483647, name=0x0, $mask0=2) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:176 2 0x00000000004bfa97 in kfun:kotlinx.coroutines.$init_global#internal.18 () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/DefaultDelay.kt:101 3 0x0000000000cac194 in CallInitGlobalPossiblyLock () 4 0x00000000004bfb60 in kfun:kotlinx.coroutines#(){}kotlinx.coroutines.Delay () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/DefaultDelay.kt:1 5 0x0000000000496840 in kfun:kotlinx.coroutines.internal.LimitedDispatcher#(kotlinx.coroutines.CoroutineDispatcher;kotlin.Int;kotlin.String?){} ($this=0x7ffff64a0668, dispatcher=0x7ffff64a0620, parallelism=64, name=0x0) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt:26 6 0x00000000003bec1d in kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism(kotlin.Int;kotlin.String?){}kotlinx.coroutines.CoroutineDispatcher (_this=0x7ffff64a0620, parallelism=64, name=0x0) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:178 7 0x00000000004c53ad in kfun:kotlinx.coroutines.MultiWorkerDispatcher.limitedParallelism#internal (_this=0x7ffff64a0620, parallelism=64, name=0x0) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt:151 8 0x0000000000b3de7a in kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism(kotlin.Int;kotlin.String?){}kotlinx.coroutines.CoroutineDispatcher-trampoline () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:176 9 0x00000000003bed59 in kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism$default(kotlin.Int;kotlin.String?;kotlin.Int){}kotlinx.coroutines.CoroutineDispatcher (_this=0x7ffff64a0620, parallelism=64, name=0x0, $mask0=2) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:176 10 0x00000000004c13c0 in kfun:kotlinx.coroutines.DefaultIoScheduler.#internal ($this=0x7ffff6630700) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/Dispatchers.kt:27 11 0x00000000004c127f in kfun:kotlinx.coroutines.DefaultIoScheduler.$init_global#internal () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/Dispatchers.kt:1 --- .../jvm/src/DefaultDelay.kt | 1 - .../native/src/CoroutineContext.kt | 3 - .../native/src/DefaultDelay.kt | 101 ++++++++++++++++++ .../native/src/Dispatchers.kt | 5 + .../native/src/EventLoop.kt | 21 ++-- 5 files changed, 118 insertions(+), 13 deletions(-) create mode 100644 kotlinx-coroutines-core/native/src/DefaultDelay.kt diff --git a/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt index d556221677..80d878707b 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt @@ -3,7 +3,6 @@ package kotlinx.coroutines import kotlinx.atomicfu.* import kotlinx.coroutines.internal.* -import kotlinx.coroutines.scheduling.* import kotlinx.coroutines.scheduling.scheduleBackgroundIoTask import kotlin.coroutines.* import kotlin.time.Duration diff --git a/kotlinx-coroutines-core/native/src/CoroutineContext.kt b/kotlinx-coroutines-core/native/src/CoroutineContext.kt index 9f6b7fc0a4..2334ab7164 100644 --- a/kotlinx-coroutines-core/native/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/native/src/CoroutineContext.kt @@ -3,9 +3,6 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlin.coroutines.* -@PublishedApi -internal actual val DefaultDelay: Delay = WorkerDispatcher(name = "DefaultDelay") - internal expect fun createDefaultDispatcher(): CoroutineDispatcher public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { diff --git a/kotlinx-coroutines-core/native/src/DefaultDelay.kt b/kotlinx-coroutines-core/native/src/DefaultDelay.kt new file mode 100644 index 0000000000..cc2bc8ae87 --- /dev/null +++ b/kotlinx-coroutines-core/native/src/DefaultDelay.kt @@ -0,0 +1,101 @@ +package kotlinx.coroutines + +import kotlinx.atomicfu.* +import kotlinx.coroutines.internal.* +import kotlin.coroutines.* +import kotlin.native.concurrent.ObsoleteWorkersApi +import kotlin.native.concurrent.Worker + +@PublishedApi +internal actual val DefaultDelay: Delay get() = DefaultDelayImpl + +@OptIn(ObsoleteWorkersApi::class) +private object DefaultDelayImpl : EventLoopImplBase(), Runnable { + init { + incrementUseCount() // this event loop is never completed + } + + private val _thread = atomic(null) + + /** Can only happen when tests close the default executor */ + override fun reschedule(now: Long, delayedTask: DelayedTask) { + throw IllegalStateException("Attempted to schedule $delayedTask at $now after shutdown") + } + + /** + * All event loops are using DefaultDelay#invokeOnTimeout to avoid livelock on + * ``` + * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } } + * ``` + * + * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), + * but it's not exposed as public API. + */ + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduleInvokeOnTimeout(timeMillis, block) + + override fun run() { + val currentThread = Worker.current + // Identity comparisons do not work for value classes, but comparing `null` with non-null should still work + if (!_thread.compareAndSet(null, currentThread)) return // some other thread won the race to start the thread + ThreadLocalEventLoop.setEventLoop(DelegatingUnconfinedEventLoop) + try { + while (true) { + val parkNanos = processNextEvent() + if (parkNanos == Long.MAX_VALUE) break // no more events + if (parkNanos > 0) currentThread.park(parkNanos / 1000L, true) + } + } finally { + _thread.value = null + ThreadLocalEventLoop.resetEventLoop() + // recheck if queues are empty after _thread reference was set to null (!!!) + if (!delayedQueueIsEmpty) { + /* recreate the thread, as there is still work to do, + and `unpark` could have awoken the thread we're currently running on */ + startThreadOrObtainSleepingThread() + } + } + } + + override fun startThreadOrObtainSleepingThread(): Worker? { + // Check if the thread is already running + _thread.value?.let { return it } + /* Now we know that at the moment of this call the thread was not initially running. + This means that whatever thread is going to be running by the end of this function, + it's going to notice the tasks it's supposed to run. + We can return `null` unconditionally. */ + scheduleBackgroundIoTask(this) + return null + } + + override fun toString(): String = "DefaultDelay" +} + +private object DelegatingUnconfinedEventLoop: UnconfinedEventLoop { + override val thisLoopsTaskCanAvoidYielding: Boolean + get() = defaultDelayRunningUnconfinedLoop() + + override val isUnconfinedLoopActive: Boolean get() = false + + override fun runUnconfinedEventLoop(initialBlock: () -> Unit) { + ioView.dispatch(ioView, Runnable { + ThreadLocalEventLoop.unconfinedEventLoop.runUnconfinedEventLoop(initialBlock) + }) + } + + override fun dispatchUnconfined(task: DispatchedTask<*>) = + defaultDelayRunningUnconfinedLoop() + + override fun tryUseAsEventLoop(): EventLoop? = null +} + +private fun defaultDelayRunningUnconfinedLoop(): Nothing { + throw UnsupportedOperationException( + "This method can only be called from the thread where an unconfined event loop is running, " + + "but no tasks can run on this thread." + ) +} + +/** A view separate from [Dispatchers.IO]. + * [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */ +private val ioView = Dispatchers.IO.limitedParallelism(Int.MAX_VALUE) diff --git a/kotlinx-coroutines-core/native/src/Dispatchers.kt b/kotlinx-coroutines-core/native/src/Dispatchers.kt index 471adb7417..4ad5db289e 100644 --- a/kotlinx-coroutines-core/native/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/native/src/Dispatchers.kt @@ -40,9 +40,14 @@ internal object DefaultIoScheduler : CoroutineDispatcher() { io.dispatchYield(context, block) } + internal fun dispatchToUnlimitedPool(block: Runnable) { + unlimitedPool.dispatch(EmptyCoroutineContext, block) + } + override fun toString(): String = "Dispatchers.IO" } +internal inline fun scheduleBackgroundIoTask(block: Runnable) = DefaultIoScheduler.dispatchToUnlimitedPool(block) @Suppress("EXTENSION_SHADOWED_BY_MEMBER") public actual val Dispatchers.IO: CoroutineDispatcher get() = IO diff --git a/kotlinx-coroutines-core/native/src/EventLoop.kt b/kotlinx-coroutines-core/native/src/EventLoop.kt index f993ee7fb0..9cc87a48a7 100644 --- a/kotlinx-coroutines-core/native/src/EventLoop.kt +++ b/kotlinx-coroutines-core/native/src/EventLoop.kt @@ -2,26 +2,29 @@ package kotlinx.coroutines -import kotlin.coroutines.* import kotlin.native.concurrent.* import kotlin.time.* internal actual abstract class EventLoopImplPlatform : EventLoop() { - - private val current = Worker.current // not `get()`! We're interested in the worker at the moment of creation. + /** Returns `null` if a thread was created and doesn't need to be awoken. + * Returns a thread to awaken if the thread already existed when this method was called. */ + protected abstract fun startThreadOrObtainSleepingThread(): Worker? protected actual fun unpark() { - current.executeAfter(0L, {})// send an empty task to unpark the waiting event loop + startThreadOrObtainSleepingThread()?.let { + it.executeAfter(0L, {}) + } } - } -internal class EventLoopImpl: EventLoopImplBase() { - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - DefaultDelay.invokeOnTimeout(timeMillis, block, context) +internal class BlockingEventLoop( + private val worker: Worker +) : EventLoopImplBase() { + override fun startThreadOrObtainSleepingThread(): Worker? = + if (Worker.current.id != worker.id) worker else null } -internal actual fun createEventLoop(): EventLoop = EventLoopImpl() +internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Worker.current) private val startingPoint = TimeSource.Monotonic.markNow() From 63f72d9d76eca0d3e355a1071f5661df2f614885 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 28 Jan 2025 14:18:16 +0100 Subject: [PATCH 10/10] Fix the segfault --- kotlinx-coroutines-core/native/src/DefaultDelay.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/native/src/DefaultDelay.kt b/kotlinx-coroutines-core/native/src/DefaultDelay.kt index cc2bc8ae87..b4570744e8 100644 --- a/kotlinx-coroutines-core/native/src/DefaultDelay.kt +++ b/kotlinx-coroutines-core/native/src/DefaultDelay.kt @@ -98,4 +98,5 @@ private fun defaultDelayRunningUnconfinedLoop(): Nothing { /** A view separate from [Dispatchers.IO]. * [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */ -private val ioView = Dispatchers.IO.limitedParallelism(Int.MAX_VALUE) +private val ioView by lazy { Dispatchers.IO.limitedParallelism(Int.MAX_VALUE) } +// Without `lazy`, there is a cycle between `ioView` and `DefaultDelay` initialization, leading to a segfault.