Skip to content

Commit

Permalink
Publisher test
Browse files Browse the repository at this point in the history
  • Loading branch information
kosyloa committed Oct 18, 2023
1 parent 5a78f81 commit 02230e8
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 82 deletions.
4 changes: 0 additions & 4 deletions DXFeedFramework.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@
803BAC1629BFA50700FFAB1C /* DXFeedFramework.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 803BAC0D29BFA50700FFAB1C /* DXFeedFramework.framework */; };
803BAC1C29BFA50700FFAB1C /* DxFeedSwiftFramework.h in Headers */ = {isa = PBXBuildFile; fileRef = 803BAC1029BFA50700FFAB1C /* DxFeedSwiftFramework.h */; settings = {ATTRIBUTES = (Public, ); }; };
8088D76529C0FBCE00F240CB /* ThreadManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8088D76329C0FBCE00F240CB /* ThreadManager.swift */; };
8088D76629C0FBCE00F240CB /* IsolateThread.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8088D76429C0FBCE00F240CB /* IsolateThread.swift */; };
8088D76929C0FC5C00F240CB /* Isolate.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8088D76829C0FC5C00F240CB /* Isolate.swift */; };
8088D77129C3A25D00F240CB /* SystemProperty.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8088D77029C3A25D00F240CB /* SystemProperty.swift */; };
8088D77329C3A2F400F240CB /* GraalException.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8088D77229C3A2F400F240CB /* GraalException.swift */; };
Expand Down Expand Up @@ -795,7 +794,6 @@
803BAC1029BFA50700FFAB1C /* DxFeedSwiftFramework.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = DxFeedSwiftFramework.h; sourceTree = "<group>"; };
803BAC1529BFA50700FFAB1C /* DXFeedFrameworkTests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = DXFeedFrameworkTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
8088D76329C0FBCE00F240CB /* ThreadManager.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ThreadManager.swift; sourceTree = "<group>"; };
8088D76429C0FBCE00F240CB /* IsolateThread.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = IsolateThread.swift; sourceTree = "<group>"; };
8088D76829C0FC5C00F240CB /* Isolate.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Isolate.swift; sourceTree = "<group>"; };
8088D76A29C0FE1700F240CB /* IsolateTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IsolateTest.swift; sourceTree = "<group>"; };
8088D76C29C101CF00F240CB /* ThreadsTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ThreadsTest.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -1496,7 +1494,6 @@
children = (
8088D76829C0FC5C00F240CB /* Isolate.swift */,
8088D76329C0FBCE00F240CB /* ThreadManager.swift */,
8088D76429C0FBCE00F240CB /* IsolateThread.swift */,
);
path = Graal;
sourceTree = "<group>";
Expand Down Expand Up @@ -2270,7 +2267,6 @@
64656F602A1B9EC2006A0B19 /* EnumUtil.swift in Sources */,
642BE4CA2A2E1C640052340A /* MarketEvent.swift in Sources */,
6486B95B2AD015B400D8D5FA /* PriceType.swift in Sources */,
8088D76629C0FBCE00F240CB /* IsolateThread.swift in Sources */,
64BDDB202AD6CC8300694210 /* AnalyticOrder.swift in Sources */,
64AAF0532A8113E800E8942B /* String+Range.swift in Sources */,
642BE4D02A2F1D3C0052340A /* Quote+Ext.swift in Sources */,
Expand Down
3 changes: 3 additions & 0 deletions DXFeedFramework/Native/Endpoint/NativeEndpoint.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class NativeEndpoint {
if let context = context {
let endpoint: AnyObject = bridge(ptr: context)
if let listener = endpoint as? WeakListener {
Isolate.updateThreads()

var old = (try? EnumUtil.valueOf(value: DXEndpointState.convert(oldState))) ?? .notConnected
var new = (try? EnumUtil.valueOf(value: DXEndpointState.convert(newState))) ?? .notConnected
listener.value?.changeState(old: old, new: new)
Expand Down Expand Up @@ -73,6 +75,7 @@ class NativeEndpoint {
}

deinit {
try? close()
removeListener()
if let endpoint = self.endpoint {
let thread = currentThread()
Expand Down
3 changes: 2 additions & 1 deletion DXFeedFramework/Native/Feed/NativeFeed.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import Foundation
class NativeFeed {
let feed: UnsafeMutablePointer<dxfg_feed_t>?
deinit {
print("deinit feed")
if let feed = feed {
let thread = currentThread()
_ = try? ErrorCheck.nativeCall(thread, dxfg_JavaObjectHandler_release(thread, &(feed.pointee.handler)))
// _ = try? ErrorCheck.nativeCall(thread, dxfg_JavaObjectHandler_release(thread, &(feed.pointee.handler)))
}
}
init(feed: UnsafeMutablePointer<dxfg_feed_t>) {
Expand Down
12 changes: 8 additions & 4 deletions DXFeedFramework/Native/Feed/NativePublisher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ class NativePublisher {
private let mapper = EventMapper()

deinit {
print("deinit publisher \(Thread.current.threadName) \(Date().timeIntervalSince1970)")
if let publisher = publisher {
let thread = currentThread()
_ = try? ErrorCheck.nativeCall(thread, dxfg_JavaObjectHandler_release(thread, &(publisher.pointee.handler)))

print("deinit publisher1 \(Date().timeIntervalSince1970)")
// _ = try? ErrorCheck.nativeCall(thread, dxfg_JavaObjectHandler_release(thread, &(publisher.pointee.handler)))
print("deinit publisher2 \(Date().timeIntervalSince1970)")
}
}
init(publisher: UnsafeMutablePointer<dxfg_publisher_t>) {
Expand All @@ -33,9 +37,9 @@ class NativePublisher {
let listPointer = nativePointers.newList()

defer {
listPointer.deinitialize(count: 1)
listPointer.deallocate()
nativeEvents.forEach { mapper.releaseNative(native: $0) }
// listPointer.deinitialize(count: 1)
// listPointer.deallocate()
// nativeEvents.forEach { mapper.releaseNative(native: $0) }

}
let thread = currentThread()
Expand Down
12 changes: 12 additions & 0 deletions DXFeedFramework/Native/Graal/Isolate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ class Isolate {
internal let isolate = UnsafeMutablePointer<OpaquePointer?>.allocate(capacity: 1)
private let params = UnsafeMutablePointer<graal_create_isolate_params_t>.allocate(capacity: 1)
private let thread = UnsafeMutablePointer<OpaquePointer?>.allocate(capacity: 1)
static var map = [String: OpaquePointer]()

deinit {
print("deinit isolate")
self.isolate.deallocate()
self.params.deallocate()
self.thread.deallocate()
Expand All @@ -46,6 +48,16 @@ class Isolate {
graal_detach_all_threads_and_tear_down_isolate(self.thread.pointee)
}
}
/// Internal save threads function.
///
/// Is save graal thread for future using.
/// Just for testing purposes
static func updateThreads() {
if let thread = graal_get_current_thread(Isolate.shared.isolate.pointee) {
let pthread = pthread_self()
Isolate.map["\(pthread)"] = thread
}
}

/// Isolate should be initialized in main thread to avoid problem with overcommited queues.
///
Expand Down
48 changes: 0 additions & 48 deletions DXFeedFramework/Native/Graal/IsolateThread.swift

This file was deleted.

43 changes: 34 additions & 9 deletions DXFeedFramework/Native/Graal/ThreadManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@ import Foundation
func currentThread() -> OpaquePointer! {
let currentThread = graal_get_current_thread(Isolate.shared.isolate.pointee)
if currentThread == nil {
return ThreadManager.shared.attachThread().threadPointer.pointee
let pthread = pthread_self()
let value = Isolate.map["\(pthread)"]
if value != nil {
return value
} else {
return ThreadManager.shared.attachThread().pointee
}
} else {
return currentThread
}
Expand All @@ -27,21 +33,40 @@ func currentThread() -> OpaquePointer! {
class ThreadManager {
fileprivate static let shared = ThreadManager()
private static let kThreadKey = "GraalThread"
private init() {
private static let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
static var str1: String = ""

private init() {
pthread_key_create(ThreadManager.key) { pointer in
pointer.withMemoryRebound(to: OpaquePointer.self, capacity: 1) { pointer1 in
graal_detach_thread(pointer1.pointee)
pointer1.deallocate()
}
}
}
fileprivate func attachThread() -> IsolateThread {

fileprivate func attachThread() -> UnsafeMutablePointer<OpaquePointer?> {
defer {
objc_sync_exit(self)
}
objc_sync_enter(self)
if let thread = Thread.current.threadDictionary[ThreadManager.kThreadKey] as? IsolateThread {
return thread
let threadPointer = UnsafeMutablePointer<OpaquePointer?>.allocate(capacity: 1)
let value = graal_attach_thread(Isolate.shared.isolate.pointee, threadPointer)
_ = pthread_setspecific(ThreadManager.key.pointee, threadPointer)
return threadPointer
}

}

internal extension Thread {
var threadName: String {
if let currentOperationQueue = OperationQueue.current?.name {
return "OperationQueue: \(currentOperationQueue)"
} else if let underlyingDispatchQueue = OperationQueue.current?.underlyingQueue?.label {
return "DispatchQueue: \(underlyingDispatchQueue)"
} else {
let thread = IsolateThread()
Thread.current.threadDictionary[ThreadManager.kThreadKey] = thread
return thread
let name = __dispatch_queue_get_label(nil)
return String(cString: name, encoding: .utf8) ?? Thread.current.description
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class NativeInstrumentProfileCollector {
var profiles = [InstrumentProfile]()
let listener: AnyObject = bridge(ptr: context)
if let listener = listener as? WeakListener {
Isolate.updateThreads()

let iterator = NativeProfileIterator(nativeProfiles, isDeallocated: false)

while (try? iterator.hasNext()) ?? false {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class NativeInstrumentProfileConnection {
if let context = context {
let endpoint: AnyObject = bridge(ptr: context)
if let listener = endpoint as? WeakListener {
Isolate.updateThreads()

var old = (try? EnumUtil.valueOf(value: DXInstrumentProfileConnectionState.convert(oldState)))
var new = (try? EnumUtil.valueOf(value: DXInstrumentProfileConnectionState.convert(newState)))
listener.value?.listener?.connectionDidChangeState(old: old ?? .notConnected,
Expand Down
2 changes: 2 additions & 0 deletions DXFeedFramework/Native/Subscription/NativeSubscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class NativeSubscription {
var events = [MarketEvent]()
let listener: AnyObject = bridge(ptr: context)
if let listener = listener as? WeakSubscription {
Isolate.updateThreads()

guard let subscription = listener.value else {
return
}
Expand Down
25 changes: 23 additions & 2 deletions DXFeedFrameworkTests/DXFeedAllTests.xctestplan
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,44 @@
"id" : "A4145306-F50A-45F8-AEBE-8517F9446F91",
"name" : "Test Scheme Action",
"options" : {

"threadSanitizerEnabled" : false,
"undefinedBehaviorSanitizerEnabled" : false
}
}
],
"defaultOptions" : {
"addressSanitizer" : {
"enabled" : true
},
"codeCoverage" : false,
"commandLineArgumentEntries" : [

],
"environmentVariableEntries" : [

],
"testRepetitionMode" : "retryOnFailure"
"testRepetitionMode" : "retryOnFailure",
"threadSanitizerEnabled" : true
},
"testTargets" : [
{
"parallelizable" : true,
"skippedTests" : [
"CandleTests",
"DXConnectionStateTests",
"DateTests",
"DateTimeParserTest",
"EndpointTest",
"EventsTest",
"FeedTest",
"IPFTests",
"IsolateTest",
"OrderSourceTest",
"ScheduleTest",
"SystemPropertyTest",
"ThreadsTest",
"UtilsTest"
],
"target" : {
"containerPath" : "container:DXFeedFramework.xcodeproj",
"identifier" : "803BAC1429BFA50700FFAB1C",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import Foundation

class TestEndpoointStateListener: DXEndpointObserver, Hashable {
func endpointDidChangeState(old: DXFeedFramework.DXEndpointState, new: DXFeedFramework.DXEndpointState) {
self.callback(new)
callback(new)
}

deinit {
print("deinit TestEndpoointStateListener \(Thread.current.threadName)")
}

static func == (lhs: TestEndpoointStateListener, rhs: TestEndpoointStateListener) -> Bool {
lhs === rhs
Expand Down
Loading

0 comments on commit 02230e8

Please sign in to comment.