diff --git a/DXFeedFramework.xcodeproj/project.pbxproj b/DXFeedFramework.xcodeproj/project.pbxproj index f5c9357c1..e84113cd3 100644 --- a/DXFeedFramework.xcodeproj/project.pbxproj +++ b/DXFeedFramework.xcodeproj/project.pbxproj @@ -334,7 +334,6 @@ 803BAC1629BFA50700FFAB1C /* DXFeedFramework.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 803BAC0D29BFA50700FFAB1C /* DXFeedFramework.framework */; }; 803BAC1C29BFA50700FFAB1C /* DxFeedSwiftFramework.h in Headers */ = {isa = PBXBuildFile; fileRef = 803BAC1029BFA50700FFAB1C /* DxFeedSwiftFramework.h */; settings = {ATTRIBUTES = (Public, ); }; }; 8088D76529C0FBCE00F240CB /* ThreadManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8088D76329C0FBCE00F240CB /* ThreadManager.swift */; }; - 8088D76629C0FBCE00F240CB /* IsolateThread.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8088D76429C0FBCE00F240CB /* IsolateThread.swift */; }; 8088D76929C0FC5C00F240CB /* Isolate.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8088D76829C0FC5C00F240CB /* Isolate.swift */; }; 8088D77129C3A25D00F240CB /* SystemProperty.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8088D77029C3A25D00F240CB /* SystemProperty.swift */; }; 8088D77329C3A2F400F240CB /* GraalException.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8088D77229C3A2F400F240CB /* GraalException.swift */; }; @@ -795,7 +794,6 @@ 803BAC1029BFA50700FFAB1C /* DxFeedSwiftFramework.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = DxFeedSwiftFramework.h; sourceTree = ""; }; 803BAC1529BFA50700FFAB1C /* DXFeedFrameworkTests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = DXFeedFrameworkTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; }; 8088D76329C0FBCE00F240CB /* ThreadManager.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ThreadManager.swift; sourceTree = ""; }; - 8088D76429C0FBCE00F240CB /* IsolateThread.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = IsolateThread.swift; sourceTree = ""; }; 8088D76829C0FC5C00F240CB /* Isolate.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Isolate.swift; sourceTree = ""; }; 8088D76A29C0FE1700F240CB /* IsolateTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IsolateTest.swift; sourceTree = ""; }; 8088D76C29C101CF00F240CB /* ThreadsTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ThreadsTest.swift; sourceTree = ""; }; @@ -1496,7 +1494,6 @@ children = ( 8088D76829C0FC5C00F240CB /* Isolate.swift */, 8088D76329C0FBCE00F240CB /* ThreadManager.swift */, - 8088D76429C0FBCE00F240CB /* IsolateThread.swift */, ); path = Graal; sourceTree = ""; @@ -2270,7 +2267,6 @@ 64656F602A1B9EC2006A0B19 /* EnumUtil.swift in Sources */, 642BE4CA2A2E1C640052340A /* MarketEvent.swift in Sources */, 6486B95B2AD015B400D8D5FA /* PriceType.swift in Sources */, - 8088D76629C0FBCE00F240CB /* IsolateThread.swift in Sources */, 64BDDB202AD6CC8300694210 /* AnalyticOrder.swift in Sources */, 64AAF0532A8113E800E8942B /* String+Range.swift in Sources */, 642BE4D02A2F1D3C0052340A /* Quote+Ext.swift in Sources */, diff --git a/DXFeedFramework/Native/Endpoint/NativeEndpoint.swift b/DXFeedFramework/Native/Endpoint/NativeEndpoint.swift index d88c29da2..c25e87ab4 100644 --- a/DXFeedFramework/Native/Endpoint/NativeEndpoint.swift +++ b/DXFeedFramework/Native/Endpoint/NativeEndpoint.swift @@ -31,6 +31,8 @@ class NativeEndpoint { if let context = context { let endpoint: AnyObject = bridge(ptr: context) if let listener = endpoint as? WeakListener { + Isolate.updateThreads() + var old = (try? EnumUtil.valueOf(value: DXEndpointState.convert(oldState))) ?? .notConnected var new = (try? EnumUtil.valueOf(value: DXEndpointState.convert(newState))) ?? .notConnected listener.value?.changeState(old: old, new: new) @@ -73,6 +75,7 @@ class NativeEndpoint { } deinit { + try? close() removeListener() if let endpoint = self.endpoint { let thread = currentThread() diff --git a/DXFeedFramework/Native/Feed/NativeFeed.swift b/DXFeedFramework/Native/Feed/NativeFeed.swift index 9ee2d0ff4..298569eed 100644 --- a/DXFeedFramework/Native/Feed/NativeFeed.swift +++ b/DXFeedFramework/Native/Feed/NativeFeed.swift @@ -13,9 +13,10 @@ import Foundation class NativeFeed { let feed: UnsafeMutablePointer? deinit { + print("deinit feed") if let feed = feed { let thread = currentThread() - _ = try? ErrorCheck.nativeCall(thread, dxfg_JavaObjectHandler_release(thread, &(feed.pointee.handler))) +// _ = try? ErrorCheck.nativeCall(thread, dxfg_JavaObjectHandler_release(thread, &(feed.pointee.handler))) } } init(feed: UnsafeMutablePointer) { diff --git a/DXFeedFramework/Native/Feed/NativePublisher.swift b/DXFeedFramework/Native/Feed/NativePublisher.swift index 3a3a69723..6640d16d1 100644 --- a/DXFeedFramework/Native/Feed/NativePublisher.swift +++ b/DXFeedFramework/Native/Feed/NativePublisher.swift @@ -15,9 +15,13 @@ class NativePublisher { private let mapper = EventMapper() deinit { + print("deinit publisher \(Thread.current.threadName) \(Date().timeIntervalSince1970)") if let publisher = publisher { let thread = currentThread() - _ = try? ErrorCheck.nativeCall(thread, dxfg_JavaObjectHandler_release(thread, &(publisher.pointee.handler))) + + print("deinit publisher1 \(Date().timeIntervalSince1970)") +// _ = try? ErrorCheck.nativeCall(thread, dxfg_JavaObjectHandler_release(thread, &(publisher.pointee.handler))) + print("deinit publisher2 \(Date().timeIntervalSince1970)") } } init(publisher: UnsafeMutablePointer) { @@ -33,9 +37,9 @@ class NativePublisher { let listPointer = nativePointers.newList() defer { - listPointer.deinitialize(count: 1) - listPointer.deallocate() - nativeEvents.forEach { mapper.releaseNative(native: $0) } +// listPointer.deinitialize(count: 1) +// listPointer.deallocate() +// nativeEvents.forEach { mapper.releaseNative(native: $0) } } let thread = currentThread() diff --git a/DXFeedFramework/Native/Graal/Isolate.swift b/DXFeedFramework/Native/Graal/Isolate.swift index 827eefddd..4760f3f6f 100644 --- a/DXFeedFramework/Native/Graal/Isolate.swift +++ b/DXFeedFramework/Native/Graal/Isolate.swift @@ -32,8 +32,10 @@ class Isolate { internal let isolate = UnsafeMutablePointer.allocate(capacity: 1) private let params = UnsafeMutablePointer.allocate(capacity: 1) private let thread = UnsafeMutablePointer.allocate(capacity: 1) + static var map = [String: OpaquePointer]() deinit { + print("deinit isolate") self.isolate.deallocate() self.params.deallocate() self.thread.deallocate() @@ -46,6 +48,16 @@ class Isolate { graal_detach_all_threads_and_tear_down_isolate(self.thread.pointee) } } + /// Internal save threads function. + /// + /// Is save graal thread for future using. + /// Just for testing purposes + static func updateThreads() { + if let thread = graal_get_current_thread(Isolate.shared.isolate.pointee) { + let pthread = pthread_self() + Isolate.map["\(pthread)"] = thread + } + } /// Isolate should be initialized in main thread to avoid problem with overcommited queues. /// diff --git a/DXFeedFramework/Native/Graal/IsolateThread.swift b/DXFeedFramework/Native/Graal/IsolateThread.swift deleted file mode 100644 index 39b566f38..000000000 --- a/DXFeedFramework/Native/Graal/IsolateThread.swift +++ /dev/null @@ -1,48 +0,0 @@ -// -// IsolateThread.swift -// DXFeedFramework -// -// Created by Aleksey Kosylo on 14.03.2023. -// - -import Foundation -@_implementationOnly import graal_api - -/// Just wrapper around iOS thread for attaching/detaching it from GraalVM -class IsolateThread { - let threadPointer = UnsafeMutablePointer.allocate(capacity: 1) - weak var thread: Thread? - let threadName: String - deinit { - assert(thread == Thread.current, - """ -Try \(String(describing: self)).\(#function) from non-parented thread. \ -Check if an object reference is being passed to a thread other than the parent. \ -Required thread: \(threadName). Current thread: \(Thread.current.threadName) -""") - if threadPointer.pointee != nil { - graal_detach_thread(self.threadPointer.pointee) - threadPointer.deallocate() - } - } - - init() { - thread = Thread.current - threadName = Thread.current.threadName - graal_attach_thread(Isolate.shared.isolate.pointee, self.threadPointer) - } - -} - -internal extension Thread { - var threadName: String { - if let currentOperationQueue = OperationQueue.current?.name { - return "OperationQueue: \(currentOperationQueue)" - } else if let underlyingDispatchQueue = OperationQueue.current?.underlyingQueue?.label { - return "DispatchQueue: \(underlyingDispatchQueue)" - } else { - let name = __dispatch_queue_get_label(nil) - return String(cString: name, encoding: .utf8) ?? Thread.current.description - } - } -} diff --git a/DXFeedFramework/Native/Graal/ThreadManager.swift b/DXFeedFramework/Native/Graal/ThreadManager.swift index 8846e8372..d1c7c3a1f 100644 --- a/DXFeedFramework/Native/Graal/ThreadManager.swift +++ b/DXFeedFramework/Native/Graal/ThreadManager.swift @@ -16,7 +16,13 @@ import Foundation func currentThread() -> OpaquePointer! { let currentThread = graal_get_current_thread(Isolate.shared.isolate.pointee) if currentThread == nil { - return ThreadManager.shared.attachThread().threadPointer.pointee + let pthread = pthread_self() + let value = Isolate.map["\(pthread)"] + if value != nil { + return value + } else { + return ThreadManager.shared.attachThread().pointee + } } else { return currentThread } @@ -27,21 +33,40 @@ func currentThread() -> OpaquePointer! { class ThreadManager { fileprivate static let shared = ThreadManager() private static let kThreadKey = "GraalThread" - private init() { + private static let key = UnsafeMutablePointer.allocate(capacity: 1) + static var str1: String = "" + private init() { + pthread_key_create(ThreadManager.key) { pointer in + pointer.withMemoryRebound(to: OpaquePointer.self, capacity: 1) { pointer1 in + graal_detach_thread(pointer1.pointee) + pointer1.deallocate() + } + } } - fileprivate func attachThread() -> IsolateThread { + + fileprivate func attachThread() -> UnsafeMutablePointer { defer { objc_sync_exit(self) } objc_sync_enter(self) - if let thread = Thread.current.threadDictionary[ThreadManager.kThreadKey] as? IsolateThread { - return thread + let threadPointer = UnsafeMutablePointer.allocate(capacity: 1) + let value = graal_attach_thread(Isolate.shared.isolate.pointee, threadPointer) + _ = pthread_setspecific(ThreadManager.key.pointee, threadPointer) + return threadPointer + } + +} + +internal extension Thread { + var threadName: String { + if let currentOperationQueue = OperationQueue.current?.name { + return "OperationQueue: \(currentOperationQueue)" + } else if let underlyingDispatchQueue = OperationQueue.current?.underlyingQueue?.label { + return "DispatchQueue: \(underlyingDispatchQueue)" } else { - let thread = IsolateThread() - Thread.current.threadDictionary[ThreadManager.kThreadKey] = thread - return thread + let name = __dispatch_queue_get_label(nil) + return String(cString: name, encoding: .utf8) ?? Thread.current.description } } - } diff --git a/DXFeedFramework/Native/Ipf/Live/NativeInstrumentProfileCollector.swift b/DXFeedFramework/Native/Ipf/Live/NativeInstrumentProfileCollector.swift index 3f854a91e..cb55bccab 100644 --- a/DXFeedFramework/Native/Ipf/Live/NativeInstrumentProfileCollector.swift +++ b/DXFeedFramework/Native/Ipf/Live/NativeInstrumentProfileCollector.swift @@ -39,6 +39,8 @@ public class NativeInstrumentProfileCollector { var profiles = [InstrumentProfile]() let listener: AnyObject = bridge(ptr: context) if let listener = listener as? WeakListener { + Isolate.updateThreads() + let iterator = NativeProfileIterator(nativeProfiles, isDeallocated: false) while (try? iterator.hasNext()) ?? false { diff --git a/DXFeedFramework/Native/Ipf/Live/NativeInstrumentProfileConnection.swift b/DXFeedFramework/Native/Ipf/Live/NativeInstrumentProfileConnection.swift index ecf976320..7beb78792 100644 --- a/DXFeedFramework/Native/Ipf/Live/NativeInstrumentProfileConnection.swift +++ b/DXFeedFramework/Native/Ipf/Live/NativeInstrumentProfileConnection.swift @@ -37,6 +37,8 @@ class NativeInstrumentProfileConnection { if let context = context { let endpoint: AnyObject = bridge(ptr: context) if let listener = endpoint as? WeakListener { + Isolate.updateThreads() + var old = (try? EnumUtil.valueOf(value: DXInstrumentProfileConnectionState.convert(oldState))) var new = (try? EnumUtil.valueOf(value: DXInstrumentProfileConnectionState.convert(newState))) listener.value?.listener?.connectionDidChangeState(old: old ?? .notConnected, diff --git a/DXFeedFramework/Native/Subscription/NativeSubscription.swift b/DXFeedFramework/Native/Subscription/NativeSubscription.swift index 7ff83b758..f52e2dc72 100644 --- a/DXFeedFramework/Native/Subscription/NativeSubscription.swift +++ b/DXFeedFramework/Native/Subscription/NativeSubscription.swift @@ -35,6 +35,8 @@ class NativeSubscription { var events = [MarketEvent]() let listener: AnyObject = bridge(ptr: context) if let listener = listener as? WeakSubscription { + Isolate.updateThreads() + guard let subscription = listener.value else { return } diff --git a/DXFeedFrameworkTests/DXFeedAllTests.xctestplan b/DXFeedFrameworkTests/DXFeedAllTests.xctestplan index dd6573135..6ea517a45 100644 --- a/DXFeedFrameworkTests/DXFeedAllTests.xctestplan +++ b/DXFeedFrameworkTests/DXFeedAllTests.xctestplan @@ -4,11 +4,15 @@ "id" : "A4145306-F50A-45F8-AEBE-8517F9446F91", "name" : "Test Scheme Action", "options" : { - + "threadSanitizerEnabled" : false, + "undefinedBehaviorSanitizerEnabled" : false } } ], "defaultOptions" : { + "addressSanitizer" : { + "enabled" : true + }, "codeCoverage" : false, "commandLineArgumentEntries" : [ @@ -16,11 +20,28 @@ "environmentVariableEntries" : [ ], - "testRepetitionMode" : "retryOnFailure" + "testRepetitionMode" : "retryOnFailure", + "threadSanitizerEnabled" : true }, "testTargets" : [ { "parallelizable" : true, + "skippedTests" : [ + "CandleTests", + "DXConnectionStateTests", + "DateTests", + "DateTimeParserTest", + "EndpointTest", + "EventsTest", + "FeedTest", + "IPFTests", + "IsolateTest", + "OrderSourceTest", + "ScheduleTest", + "SystemPropertyTest", + "ThreadsTest", + "UtilsTest" + ], "target" : { "containerPath" : "container:DXFeedFramework.xcodeproj", "identifier" : "803BAC1429BFA50700FFAB1C", diff --git a/DXFeedFrameworkTests/Listeners/TestEndpoointStateListener.swift b/DXFeedFrameworkTests/Listeners/TestEndpoointStateListener.swift index 1bcde9448..68901d1b3 100644 --- a/DXFeedFrameworkTests/Listeners/TestEndpoointStateListener.swift +++ b/DXFeedFrameworkTests/Listeners/TestEndpoointStateListener.swift @@ -10,9 +10,12 @@ import Foundation class TestEndpoointStateListener: DXEndpointObserver, Hashable { func endpointDidChangeState(old: DXFeedFramework.DXEndpointState, new: DXFeedFramework.DXEndpointState) { - self.callback(new) + callback(new) } + deinit { + print("deinit TestEndpoointStateListener \(Thread.current.threadName)") + } static func == (lhs: TestEndpoointStateListener, rhs: TestEndpoointStateListener) -> Bool { lhs === rhs diff --git a/DXFeedFrameworkTests/PublisherTest.swift b/DXFeedFrameworkTests/PublisherTest.swift index 0fb660d18..66edc102f 100644 --- a/DXFeedFrameworkTests/PublisherTest.swift +++ b/DXFeedFrameworkTests/PublisherTest.swift @@ -8,6 +8,12 @@ import XCTest @testable import DXFeedFramework +open class DispatchQueue1: DispatchQueue { + deinit { + print("deinit quuee") + } +} + final class PublisherTest: XCTestCase { override func setUpWithError() throws { @@ -19,32 +25,38 @@ final class PublisherTest: XCTestCase { } func testCreatePublisher() throws { + try execute() + } + + func execute() throws { do { let endpoint: DXEndpoint? = try DXEndpoint.builder().withRole(.publisher).withProperty("test", "value").build() try endpoint?.connect(":7400") - let publisher = endpoint?.getPublisher() let testQuote = Quote("AAPL") testQuote.bidSize = 100 testQuote.askPrice = 666 - + try? testQuote.setSequence(10) +// print(testQuote) let feedEndpoint = try DXEndpoint.builder().withRole(.feed).withProperty("test", "value").build() + var publisher = endpoint?.getPublisher() let connectedExpectation = expectation(description: "Connected") - let stateListener = TestEndpoointStateListener { listener in - listener.callback = { state in + var stateListener: TestEndpoointStateListener? = TestEndpoointStateListener { listener in + listener.callback = { +// [weak publisher] + state in if state == .connected { - connectedExpectation.fulfill() - DispatchQueue.global(qos: .background).async { + connectedExpectation.fulfill() + DispatchQueue1.global(qos: .background).asyncAfter(deadline: .now() + 0.3) { + print(Thread.current.threadName) try? publisher?.publish(events: [testQuote]) } - - } } return listener } - feedEndpoint.add(observer: stateListener) + feedEndpoint.add(observer: stateListener!) let subscription = try feedEndpoint.getFeed()?.createSubscription(.quote) try feedEndpoint.connect("localhost:7400") let receivedEventExp = expectation(description: "Received events \(EventCode.quote)") @@ -52,19 +64,25 @@ final class PublisherTest: XCTestCase { let listener = AnonymousClass { anonymCl in anonymCl.callback = { events in + print("events: \(events)") receivedEventExp.fulfill() } return anonymCl } - + try subscription?.add(observer: listener) try subscription?.addSymbols(["AAPL"]) wait(for: [connectedExpectation], timeout: 1) - wait(for: [receivedEventExp], timeout: 30) - Isolate.shared.callGC() - wait(seconds: 10) + wait(for: [receivedEventExp], timeout: 2) +// feedEndpoint.remove(stateListener) +// publisher = nil +// Isolate.shared.callGC() +// Isolate.shared.callGC() +// try endpoint?.closeAndAWaitTermination() +// wait(seconds: 40) + stateListener = nil } catch { print("\(error)") }