Skip to content

Commit

Permalink
Merge branch 'feature/ios_app_quotes' into swift
Browse files Browse the repository at this point in the history
  • Loading branch information
kosyloa committed Apr 8, 2024
2 parents dadfdab + 863a570 commit 7f670d6
Show file tree
Hide file tree
Showing 56 changed files with 1,149 additions and 641 deletions.
68 changes: 48 additions & 20 deletions DXFeedFramework.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
argument = "PerfTest localhost:7777 TimeAndSale YQKNT"
isEnabled = "NO">
</CommandLineArgument>
<CommandLineArgument
argument = "PerfTest demo.dxfeed.com:7300 TimeAndSale,Quote,profile ETH/USD:GDAX,AAPL -p monitoring.stat=2s"
isEnabled = "YES">
</CommandLineArgument>
<CommandLineArgument
argument = " qds post :6666"
isEnabled = "NO">
Expand All @@ -65,10 +69,10 @@
</CommandLineArgument>
<CommandLineArgument
argument = "Connect demo.dxfeed.com:7300 TRADE ETH/USD:GDAX"
isEnabled = "YES">
isEnabled = "NO">
</CommandLineArgument>
<CommandLineArgument
argument = "qds connect demo.dxfeed.com:7300 Quote AAPL,IBM"
argument = "qds connect demo.dxfeed.com:7300 TimeAndSale AAPL,AAPL2,IBM,ETH/USD:GDAX -p monitoring.stat=10s"
isEnabled = "NO">
</CommandLineArgument>
<CommandLineArgument
Expand Down
2 changes: 2 additions & 0 deletions DXFeedFramework/Native/Endpoint/NativeEndpoint.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class NativeEndpoint {

private static let listenerCallback: dxfg_endpoint_state_change_listener_func = {_, oldState, newState, context in
if let context = context {
ThreadManager.insertPthread()
let currentThread = graal_get_current_thread(Isolate.shared.isolate.pointee)
let endpoint: AnyObject = bridge(ptr: context)
if let listener = endpoint as? WeakListener {
var old = (try? EnumUtil.valueOf(value: DXEndpointState.convert(oldState))) ?? .notConnected
Expand Down
54 changes: 30 additions & 24 deletions DXFeedFramework/Native/Graal/Isolate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,32 @@ class Isolate {
internal let isolate = UnsafeMutablePointer<OpaquePointer?>.allocate(capacity: 1)
private let params = UnsafeMutablePointer<graal_create_isolate_params_t>.allocate(capacity: 1)
private let thread = UnsafeMutablePointer<OpaquePointer?>.allocate(capacity: 1)
let waiter = DispatchGroup()
lazy var osThread = {
let thread = Thread {
do {
try ErrorCheck.graalCall(graal_create_isolate(self.params, self.isolate, self.thread))
} catch GraalException.fail(let message, let className, let stack) {
let errorMessage = "!!!Isolate init failed: \(message) in \(className) with \(stack)"
fatalError(errorMessage)
} catch GraalException.isolateFail(let message) {
let errorMessage = "!!!Isolate init failed: \(message)"
fatalError(errorMessage)
} catch GraalException.undefined {
let errorMessage = "!!!Isolate init failed: undefined"
fatalError(errorMessage)
} catch {
let errorMessage = "!!!Isolate init failed: Unexpected error \(error)"
fatalError(errorMessage)
}
OrderSource.initAllValues()

self.waiter.leave()
Thread.sleep(forTimeInterval: .infinity)
}
thread.qualityOfService = .userInteractive
return thread
}()

deinit {
self.isolate.deallocate()
Expand Down Expand Up @@ -60,30 +86,10 @@ class Isolate {
#else
print("FEED SDK: Release")
#endif
print("DXFeedFramework.Isolate:init \(Thread.isMainThread) \(Thread.current) \(Thread.current.threadName)")
do {
if Thread.isMainThread {
try ErrorCheck.graalCall(graal_create_isolate(self.params, self.isolate, self.thread))
} else {
try DispatchQueue.main.sync {
try ErrorCheck.graalCall(graal_create_isolate(self.params, self.isolate, self.thread))
}
}
} catch GraalException.fail(let message, let className, let stack) {
let errorMessage = "!!!Isolate init failed: \(message) in \(className) with \(stack)"
fatalError(errorMessage)
} catch GraalException.isolateFail(let message) {
let errorMessage = "!!!Isolate init failed: \(message)"
fatalError(errorMessage)
} catch GraalException.undefined {
let errorMessage = "!!!Isolate init failed: undefined"
fatalError(errorMessage)
} catch {
let errorMessage = "!!!Isolate init failed: Unexpected error \(error)"
fatalError(errorMessage)
}

OrderSource.initAllValues()
osThread.qualityOfService = .userInteractive
waiter.enter()
osThread.start()
waiter.wait()
}

// only for testing
Expand Down
38 changes: 27 additions & 11 deletions DXFeedFramework/Native/Graal/ThreadManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@ import Foundation
/// It checks whether the current Graal thread is available; if it is, the function returns a reference to it.
/// If not, it creates a new thread and returns a reference to the newly created thread. This function is commonly used in the context of multi-threaded programming to manage and work with threads in GraalVM.
func currentThread() -> OpaquePointer! {
let currentThread = graal_get_current_thread(Isolate.shared.isolate.pointee)
if currentThread == nil {
return ThreadManager.shared.attachThread().pointee
} else {
return currentThread
}
ThreadManager.shared.currentThread()
}

/// This is a utility class that implements a thread-local variable using Thread.current.threadDictionary.
Expand All @@ -28,29 +23,50 @@ class ThreadManager {
private static let kThreadKey = "GraalThread"
private static let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
static var str1: String = ""
private static var graalThreads = ConcurrentSet<String>()

private init() {
pthread_key_create(ThreadManager.key) { pointer in
pointer.withMemoryRebound(to: OpaquePointer.self, capacity: 1) { _ in
// The call to this method has been removed.
// In some cases: an attachment to a java stream crashes when you try to use this stream (deinit in this java stream)
// graal_detach_thread(pointer1.pointee)
pointer.withMemoryRebound(to: OpaquePointer.self, capacity: 1) { threadPointer in
if !ThreadManager.containsPthread() {
graal_detach_thread(threadPointer.pointee)
}
pointer.deallocate()
}
}
}

fileprivate func attachThread() -> UnsafeMutablePointer<OpaquePointer?> {
fileprivate func currentThread() -> OpaquePointer! {
defer {
objc_sync_exit(self)
}
objc_sync_enter(self)
let currentThread = graal_get_current_thread(Isolate.shared.isolate.pointee)
if currentThread == nil {
let result = ThreadManager.shared.attachThread().pointee
return result
} else {
return currentThread
}
}

fileprivate func attachThread() -> UnsafeMutablePointer<OpaquePointer?> {
let threadPointer = UnsafeMutablePointer<OpaquePointer?>.allocate(capacity: 1)
_ = graal_attach_thread(Isolate.shared.isolate.pointee, threadPointer)
_ = pthread_setspecific(ThreadManager.key.pointee, threadPointer)
return threadPointer
}

static func insertPthread() {
let value = "\(pthread_mach_thread_np(pthread_self()))"
graalThreads.insert(value)
}

static func containsPthread() -> Bool {
let value = "\(pthread_mach_thread_np(pthread_self()))"
return ThreadManager.graalThreads.contains(value)
}

}

internal extension Thread {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class NativeInstrumentProfileCollector {
profiles.append(profile)
} catch { }
}
ThreadManager.insertPthread()
listener.value?.instrumentProfilesUpdated(profiles)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class NativeInstrumentProfileConnection {
if let listener = endpoint as? WeakListener {
var old = (try? EnumUtil.valueOf(value: DXInstrumentProfileConnectionState.convert(oldState)))
var new = (try? EnumUtil.valueOf(value: DXInstrumentProfileConnectionState.convert(newState)))
ThreadManager.insertPthread()
listener.value?.listener?.connectionDidChangeState(old: old ?? .notConnected,
new: new ?? .notConnected)
}
Expand Down
1 change: 1 addition & 0 deletions DXFeedFramework/Native/Promise/NativePromise.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class NativePromise {

static let listenerCallback: dxfg_promise_handler_function = { _, promise, context in
if let context = context {
ThreadManager.insertPthread()

let listener: AnyObject = bridge(ptr: context)
if let weakListener = listener as? WeakListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class NativeSubscription {
}
}
}
ThreadManager.insertPthread()
subscription.listener?.receiveEvents(events)
}
}
Expand Down
5 changes: 5 additions & 0 deletions DXFeedFramework/Utils/ConcurrentSet.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class ConcurrentSet<T>: CustomStringConvertible where T: Hashable {
reader { $0.count }
}

public func contains(_ member: T) -> Bool {
reader { $0.contains(member) }
}

public func insert(_ newMember: T) {
writer { $0.insert(newMember) }
}
Expand All @@ -33,6 +37,7 @@ class ConcurrentSet<T>: CustomStringConvertible where T: Hashable {
public func remove(at position: Set<T>.Index) {
writer { $0.remove(at: position) }
}

public func removeAll() {
writer { $0.removeAll() }
}
Expand Down
4 changes: 4 additions & 0 deletions DXFeedFrameworkTests/CandleTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ final class CandleTests: XCTestCase {
try subscription?.addSymbols(symbol)
wait(for: [receivedEventExp], timeout: 10)
try? endpoint?.disconnect()
try endpoint?.closeAndAwaitTermination()
endpoint = nil

}

func testParseShortSymbol() throws {
Expand Down Expand Up @@ -195,6 +197,7 @@ final class CandleTests: XCTestCase {
try subscription?.addSymbols(symbol)
wait(for: [beginEventsExp, endEventsExp], timeout: 10)
try? endpoint?.disconnect()
try? endpoint?.closeAndAwaitTermination()
endpoint = nil
let sec = 5
_ = XCTWaiter.wait(for: [expectation(description: "\(sec) seconds waiting")], timeout: TimeInterval(sec))
Expand Down Expand Up @@ -224,6 +227,7 @@ final class CandleTests: XCTestCase {
try subscription?.addSymbols(symbol)
wait(for: [snapshotExpect, updateExpect], timeout: 10)
try? endpoint?.disconnect()
try? endpoint?.closeAndAwaitTermination()
endpoint = nil
}

Expand Down
6 changes: 5 additions & 1 deletion DXFeedFrameworkTests/DXConnectionStateTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ final class DXConnectionStateTests: XCTestCase {
static var publisherEndpoint: DXEndpoint?

override class func setUp() {
// publisherEndpoint receives incoming connection
publisherEndpoint = try? DXEndpoint.builder().withRole(.publisher).withProperty("test", "value").build()
_ = try? publisherEndpoint?.connect(":\(Self.port)")
}
Expand Down Expand Up @@ -50,6 +51,7 @@ final class DXConnectionStateTests: XCTestCase {
element.key == .notConnected
}).values)
wait(for: expsNotConnected, timeout: 1)
try? endpoint?.closeAndAwaitTermination()
}

func testListenerDealloc() throws {
Expand All @@ -69,6 +71,7 @@ final class DXConnectionStateTests: XCTestCase {
_ = try endpoint?.connect(Self.endpointAddress)
try endpoint?.disconnect()
XCTAssertEqual(try endpoint?.getState(), .notConnected)
try? endpoint?.closeAndAwaitTermination()
}

func testReconnect() throws {
Expand All @@ -81,7 +84,7 @@ final class DXConnectionStateTests: XCTestCase {
}), evaluatedWith: endpoint)], timeout: 2)
XCTAssertEqual(try endpoint.getState(), .connected)
try endpoint.reconnect()
try endpoint.close()
try endpoint.closeAndAwaitTermination()
XCTAssertEqual(try endpoint.getState(), .closed)
}

Expand All @@ -107,6 +110,7 @@ final class DXConnectionStateTests: XCTestCase {
}), evaluatedWith: endpoint)], timeout: 2)
try endpoint.disconnectAndClear()
XCTAssertEqual(try endpoint.getState(), .notConnected)
try? endpoint.closeAndAwaitTermination()
}

}
8 changes: 6 additions & 2 deletions DXFeedFrameworkTests/DXConnectionTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ final class DXConnectionTest: XCTestCase {
try subscription?.add(listener: eventListener)
try subscription?.addSymbols("AAPL")
try endpoint.connect("dxlink:wss://demo.dxfeed.com/dxlink-ws")

defer {
try? endpoint.closeAndAwaitTermination()
}
wait(for: [receivedEventsExpectation], timeout: 2)
}

Expand All @@ -69,7 +71,9 @@ final class DXConnectionTest: XCTestCase {
try subscription?.add(listener: eventListener)
try subscription?.addSymbols("AAPL")
try endpoint.connect("demo.dxfeed.com:7300")

defer {
try? endpoint.closeAndAwaitTermination()
}
wait(for: [receivedEventsExpectation], timeout: 2)
}
}
Loading

0 comments on commit 7f670d6

Please sign in to comment.