From ee5e8d7eeb77e4201f544cd77333cabe7e48732d Mon Sep 17 00:00:00 2001 From: Stefan Haustein Date: Thu, 19 Dec 2024 13:54:39 +0100 Subject: [PATCH 1/3] Apple QoS support via donation --- atomicfu/build.gradle.kts | 24 +- .../kotlinx/atomicfu/locks/Synchronized.kt | 297 ++++++++++++++++++ .../kotlinx/atomicfu/locks/NativeMutexNode.kt | 0 .../kotlinx/atomicfu/locks/Synchronized.kt | 0 4 files changed, 312 insertions(+), 9 deletions(-) create mode 100644 atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt rename atomicfu/src/{nativeMain => nativeNonAppleMain}/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt (100%) rename atomicfu/src/{nativeMain => nativeNonAppleMain}/kotlin/kotlinx/atomicfu/locks/Synchronized.kt (100%) diff --git a/atomicfu/build.gradle.kts b/atomicfu/build.gradle.kts index 5e18fa6d..22a472a0 100644 --- a/atomicfu/build.gradle.kts +++ b/atomicfu/build.gradle.kts @@ -129,11 +129,8 @@ kotlin { @OptIn(ExperimentalKotlinGradlePluginApi::class) applyDefaultHierarchyTemplate { - group("native") { - group("nativeUnixLike") { - withLinux() - withApple() - } + group("nativeUnixLike") { + withLinux() } group("androidNative32Bit") { withAndroidNativeX86() @@ -145,23 +142,32 @@ kotlin { withAndroidNativeArm64() withAndroidNativeX64() } - } sourceSets { + val nativeNonAppleMain by creating { + kotlin.srcDir("src/nativeNonAppleMain/kotlin") + dependsOn(nativeMain.get()) + } + val nativeUnixLikeMain by getting { kotlin.srcDir("src/nativeUnixLikeMain/kotlin") - dependsOn(nativeMain.get()) + dependsOn(nativeNonAppleMain) } val androidNative32BitMain by getting { kotlin.srcDir("src/androidNative32BitMain/kotlin") - dependsOn(nativeMain.get()) + dependsOn(nativeNonAppleMain) } val androidNative64BitMain by getting { kotlin.srcDir("src/androidNative64BitMain/kotlin") - dependsOn(nativeMain.get()) + dependsOn(nativeNonAppleMain) + } + + val mingwMain by getting { + kotlin.srcDir("src/mingwMain/kotlin") + dependsOn(nativeNonAppleMain) } val androidNative32BitTest by getting { diff --git a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt new file mode 100644 index 00000000..14d19302 --- /dev/null +++ b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt @@ -0,0 +1,297 @@ +/* + * Copyright 2024 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kotlinx.atomicfu.locks + +import kotlin.native.ref.createCleaner +import kotlinx.atomicfu.* +import kotlinx.cinterop.Arena +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.alloc +import kotlinx.cinterop.memScoped +import kotlinx.cinterop.ptr +import kotlinx.cinterop.value +import kotlinx.cinterop.IntVar +import kotlinx.cinterop.UIntVar +import kotlinx.cinterop.toCPointer +import kotlinx.cinterop.toLong +import platform.posix.pthread_cond_destroy +import platform.posix.pthread_cond_init +import platform.posix.pthread_cond_signal +import platform.posix.pthread_cond_t +import platform.posix.pthread_cond_wait +import platform.posix.pthread_get_qos_class_np +import platform.posix.pthread_mutex_destroy +import platform.posix.pthread_mutex_init +import platform.posix.pthread_mutex_lock +import platform.posix.pthread_mutex_t +import platform.posix.pthread_mutex_unlock +import platform.posix.pthread_mutexattr_destroy +import platform.posix.pthread_mutexattr_init +import platform.posix.pthread_mutexattr_settype +import platform.posix.pthread_mutexattr_t +import platform.posix.pthread_override_qos_class_end_np +import platform.posix.pthread_override_qos_class_start_np +import platform.posix.pthread_override_t +import platform.posix.pthread_self +import platform.posix.qos_class_self + +import platform.posix.PTHREAD_MUTEX_ERRORCHECK + + +@ThreadLocal +var currentThreadId = 0L + +// Based on the compose-multiplatform-core implementation with added qos and the pool back-ported +// from the atomicfu implementation. +public actual open class SynchronizedObject { + + companion object { + private const val NO_OWNER = 0L + } + + private val owner: AtomicLong = atomic(NO_OWNER) + private var reEnterCount: Int = 0 + private val waiters: AtomicInt = atomic(0) + + private val monitor: DonatingMonitor by lazy { DonatingMonitor() } + + + public fun lock() { + var self = currentThreadId + if (self == 0L) { + currentThreadId = pthread_self().toLong() + self = currentThreadId + } + if (owner.value == self) { + reEnterCount += 1 + } else if (waiters.incrementAndGet() > 1) { + waitForUnlockAndLock(self) + } else { + if (!owner.compareAndSet(NO_OWNER, self)) { + waitForUnlockAndLock(self) + } + } + } + + public fun tryLock(): Boolean { + var self = currentThreadId + if (self == 0L) { + currentThreadId = pthread_self().toLong() + self = currentThreadId + } + return if (owner.value == self) { + reEnterCount += 1 + true + } else if (waiters.incrementAndGet() == 1 && owner.compareAndSet(NO_OWNER, self)) { + true + } else { + waiters.decrementAndGet() + false + } + } + + + private fun waitForUnlockAndLock(self: Long) { + withMonitor(monitor) { + while (!owner.compareAndSet(NO_OWNER, self)) { + monitor.waitWithDonation(owner.value) + } + } + } + + public fun unlock() { + require (owner.value == currentThreadId) + if (reEnterCount > 0) { + reEnterCount -= 1 + } else { + owner.value = NO_OWNER + if (waiters.decrementAndGet() > 0) { + withMonitor(monitor) { + // We expect the highest priority thread to be woken up, but this should work + // in any case. + monitor.nativeMutex.notify() + } + } + } + } + + private inline fun withMonitor(monitor: DonatingMonitor, block: () -> Unit) { + monitor.nativeMutex.enter() + return try { + block() + } finally { + monitor.nativeMutex.exit() + } + } + + @OptIn(kotlin.experimental.ExperimentalNativeApi::class) + private class DonatingMonitor { + val nativeMutex = mutexPool.allocate() + val cleaner = createCleaner(nativeMutex) { mutexPool.release(it) } + var qosOverride: pthread_override_t? = null + var qosOverrideQosClass: UInt = 0U + + fun waitWithDonation(lockOwner: Long) { + donateQos(lockOwner) + nativeMutex.wait() + clearDonation() + } + + private fun donateQos(lockOwner: Long) { + if (lockOwner == NO_OWNER) { + return + } + val ourQosClass = qos_class_self() as UInt + // Set up a new override if required: + if (qosOverride != null) { + // There is an existing override, but we need to go higher. + if (ourQosClass > qosOverrideQosClass) { + pthread_override_qos_class_end_np(qosOverride) + qosOverride = pthread_override_qos_class_start_np(lockOwner.toCPointer(), qos_class_self(), 0) + qosOverrideQosClass = ourQosClass + } + } else { + // No existing override, check if we need to set one up. + memScoped { + val lockOwnerQosClass = alloc() + val lockOwnerRelPrio = alloc() + pthread_get_qos_class_np(lockOwner.toCPointer(), lockOwnerQosClass.ptr, lockOwnerRelPrio.ptr) + if (ourQosClass > lockOwnerQosClass.value) { + qosOverride = pthread_override_qos_class_start_np(lockOwner.toCPointer(), qos_class_self(), 0) + qosOverrideQosClass = ourQosClass + } + } + } + } + + private fun clearDonation() { + if (qosOverride != null) { + pthread_override_qos_class_end_np(qosOverride) + qosOverride = null + } + } + } +} + + +@OptIn(ExperimentalForeignApi::class) +private class NativeMutexNode { + var next: NativeMutexNode? = null + + private val arena: Arena = Arena() + private val cond: pthread_cond_t = arena.alloc() + private val mutex: pthread_mutex_t = arena.alloc() + private val attr: pthread_mutexattr_t = arena.alloc() + + init { + require (pthread_cond_init(cond.ptr, null) == 0) + require(pthread_mutexattr_init(attr.ptr) == 0) + require (pthread_mutexattr_settype(attr.ptr, PTHREAD_MUTEX_ERRORCHECK) == 0) + require(pthread_mutex_init(mutex.ptr, attr.ptr) == 0) + } + + fun enter() = require(pthread_mutex_lock(mutex.ptr) == 0) + + fun exit() = require(pthread_mutex_unlock(mutex.ptr) == 0) + + fun wait() = require(pthread_cond_wait(cond.ptr, mutex.ptr) == 0) + + fun notify() = require (pthread_cond_signal(cond.ptr) == 0) + + fun dispose() { + pthread_cond_destroy(cond.ptr) + pthread_mutex_destroy(mutex.ptr) + pthread_mutexattr_destroy(attr.ptr) + arena.clear() + } +} + + +public actual fun reentrantLock() = ReentrantLock() + +public actual typealias ReentrantLock = SynchronizedObject + +public actual inline fun ReentrantLock.withLock(block: () -> T): T { + lock() + try { + return block() + } finally { + unlock() + } +} + +public actual inline fun synchronized(lock: SynchronizedObject, block: () -> T): T { + lock.lock() + try { + return block() + } finally { + lock.unlock() + } +} + + + +private const val INITIAL_POOL_CAPACITY = 64 +private const val MAX_POOL_SIZE = 1024 + +private val mutexPool by lazy { MutexPool() } + +private class MutexPool() { + private val size = atomic(0) + private val top = atomic(null) + + init { + // Immediately form a stack + for (i in 0 until INITIAL_POOL_CAPACITY) { + release(NativeMutexNode()) + } + } + + private fun allocMutexNode() = NativeMutexNode() + + fun allocate(): NativeMutexNode = pop() ?: allocMutexNode() + + fun release(mutexNode: NativeMutexNode) { + if (size.value > MAX_POOL_SIZE) { + mutexNode.dispose() + } else { + while (true) { + val oldTop = top.value + mutexNode.next = oldTop + if (top.compareAndSet(oldTop, mutexNode)) { + size.incrementAndGet() + return + } + } + } + } + + private fun pop(): NativeMutexNode? { + while (true) { + val oldTop = top.value + if (oldTop == null) + return null + val newHead = oldTop.next + if (top.compareAndSet(oldTop, newHead)) { + size.decrementAndGet() + return oldTop + } + } + } +} + + + diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/nativeNonAppleMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt similarity index 100% rename from atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt rename to atomicfu/src/nativeNonAppleMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt b/atomicfu/src/nativeNonAppleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt similarity index 100% rename from atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt rename to atomicfu/src/nativeNonAppleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt From 00cbb2d7a77a45ebb07728403d712d8cc66dd839 Mon Sep 17 00:00:00 2001 From: Stefan Haustein Date: Wed, 29 Jan 2025 17:06:18 +0100 Subject: [PATCH 2/3] Various PR feeback --- .../kotlinx/atomicfu/locks/Synchronized.kt | 46 +++++++++---------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt index 14d19302..08a69a82 100644 --- a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt +++ b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt @@ -49,38 +49,36 @@ import platform.posix.pthread_self import platform.posix.qos_class_self import platform.posix.PTHREAD_MUTEX_ERRORCHECK +import kotlin.native.concurrent.ThreadLocal +private const val NO_OWNER = 0L +private const val UNSET = 0L @ThreadLocal -var currentThreadId = 0L +internal var currentThreadId = UNSET // Based on the compose-multiplatform-core implementation with added qos and the pool back-ported // from the atomicfu implementation. public actual open class SynchronizedObject { - - companion object { - private const val NO_OWNER = 0L - } - - private val owner: AtomicLong = atomic(NO_OWNER) + private val ownerThreadId: AtomicLong = atomic(NO_OWNER) private var reEnterCount: Int = 0 - private val waiters: AtomicInt = atomic(0) + private val threadsOnLock: AtomicInt = atomic(0) private val monitor: DonatingMonitor by lazy { DonatingMonitor() } public fun lock() { var self = currentThreadId - if (self == 0L) { + if (self == UNSET) { currentThreadId = pthread_self().toLong() self = currentThreadId } - if (owner.value == self) { + if (ownerThreadId.value == self) { reEnterCount += 1 - } else if (waiters.incrementAndGet() > 1) { + } else if (threadsOnLock.incrementAndGet() > 1) { waitForUnlockAndLock(self) } else { - if (!owner.compareAndSet(NO_OWNER, self)) { + if (!ownerThreadId.compareAndSet(NO_OWNER, self)) { waitForUnlockAndLock(self) } } @@ -92,13 +90,13 @@ public actual open class SynchronizedObject { currentThreadId = pthread_self().toLong() self = currentThreadId } - return if (owner.value == self) { + return if (ownerThreadId.value == self) { reEnterCount += 1 true - } else if (waiters.incrementAndGet() == 1 && owner.compareAndSet(NO_OWNER, self)) { + } else if (threadsOnLock.incrementAndGet() == 1 && ownerThreadId.compareAndSet(NO_OWNER, self)) { true } else { - waiters.decrementAndGet() + threadsOnLock.decrementAndGet() false } } @@ -106,19 +104,19 @@ public actual open class SynchronizedObject { private fun waitForUnlockAndLock(self: Long) { withMonitor(monitor) { - while (!owner.compareAndSet(NO_OWNER, self)) { - monitor.waitWithDonation(owner.value) + while (!ownerThreadId.compareAndSet(NO_OWNER, self)) { + monitor.waitWithDonation(ownerThreadId.value) } } } public fun unlock() { - require (owner.value == currentThreadId) + require (ownerThreadId.value == currentThreadId) if (reEnterCount > 0) { reEnterCount -= 1 } else { - owner.value = NO_OWNER - if (waiters.decrementAndGet() > 0) { + ownerThreadId.value = NO_OWNER + if (threadsOnLock.decrementAndGet() > 0) { withMonitor(monitor) { // We expect the highest priority thread to be woken up, but this should work // in any case. @@ -154,7 +152,7 @@ public actual open class SynchronizedObject { if (lockOwner == NO_OWNER) { return } - val ourQosClass = qos_class_self() as UInt + val ourQosClass = qos_class_self() // Set up a new override if required: if (qosOverride != null) { // There is an existing override, but we need to go higher. @@ -170,7 +168,7 @@ public actual open class SynchronizedObject { val lockOwnerRelPrio = alloc() pthread_get_qos_class_np(lockOwner.toCPointer(), lockOwnerQosClass.ptr, lockOwnerRelPrio.ptr) if (ourQosClass > lockOwnerQosClass.value) { - qosOverride = pthread_override_qos_class_start_np(lockOwner.toCPointer(), qos_class_self(), 0) + qosOverride = pthread_override_qos_class_start_np(lockOwner.toCPointer(), ourQosClass, 0) qosOverrideQosClass = ourQosClass } } @@ -197,9 +195,9 @@ private class NativeMutexNode { private val attr: pthread_mutexattr_t = arena.alloc() init { - require (pthread_cond_init(cond.ptr, null) == 0) + require(pthread_cond_init(cond.ptr, null) == 0) require(pthread_mutexattr_init(attr.ptr) == 0) - require (pthread_mutexattr_settype(attr.ptr, PTHREAD_MUTEX_ERRORCHECK) == 0) + require(pthread_mutexattr_settype(attr.ptr, PTHREAD_MUTEX_ERRORCHECK) == 0) require(pthread_mutex_init(mutex.ptr, attr.ptr) == 0) } From 1bfd98fe2d80588b26b475db0ec508443a14106f Mon Sep 17 00:00:00 2001 From: Stefan Haustein Date: Fri, 7 Feb 2025 12:18:03 +0100 Subject: [PATCH 3/3] Return variables moved to NativeMutexNode --- .../kotlin/kotlinx/atomicfu/locks/Synchronized.kt | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt index 08a69a82..1130f46e 100644 --- a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt +++ b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt @@ -164,10 +164,9 @@ public actual open class SynchronizedObject { } else { // No existing override, check if we need to set one up. memScoped { - val lockOwnerQosClass = alloc() - val lockOwnerRelPrio = alloc() - pthread_get_qos_class_np(lockOwner.toCPointer(), lockOwnerQosClass.ptr, lockOwnerRelPrio.ptr) - if (ourQosClass > lockOwnerQosClass.value) { + + pthread_get_qos_class_np(lockOwner.toCPointer(), nativeMutex.lockOwnerQosClass.ptr, nativeMutex.lockOwnerRelPrio.ptr) + if (ourQosClass > nativeMutex.lockOwnerQosClass.value) { qosOverride = pthread_override_qos_class_start_np(lockOwner.toCPointer(), ourQosClass, 0) qosOverrideQosClass = ourQosClass } @@ -194,6 +193,10 @@ private class NativeMutexNode { private val mutex: pthread_mutex_t = arena.alloc() private val attr: pthread_mutexattr_t = arena.alloc() + // Used locally as return parameters in donateQos + val lockOwnerQosClass = arena.alloc() + val lockOwnerRelPrio = arena.alloc() + init { require(pthread_cond_init(cond.ptr, null) == 0) require(pthread_mutexattr_init(attr.ptr) == 0)