Skip to content

Commit

Permalink
Merge pull request #2 from kosyloa/feature/observers_as_weak_references
Browse files Browse the repository at this point in the history
Feature/observers as weak references
  • Loading branch information
kosyloa authored Oct 6, 2023
2 parents 6cb4eae + b803d06 commit cc58211
Show file tree
Hide file tree
Showing 18 changed files with 165 additions and 89 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
run: gradle fetchDependencies

- name: Test
run: xcodebuild test -project DxFeedFramework.xcodeproj -scheme DXFeedFramework -testPlan DXExceptPublisherTests
run: xcodebuild test -project DxFeedFramework.xcodeproj -scheme DXFeedFramework -testPlan DXExceptPublisherTests -destination 'platform=macOS'

release:
if: (startsWith(github.event.ref, 'refs/tags/') && endsWith(github.event.ref, 'build'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@
<CommandLineArguments>
<CommandLineArgument
argument = "PerfTest localhost:6666 TimeAndSale YQKNT"
isEnabled = "NO">
isEnabled = "YES">
</CommandLineArgument>
<CommandLineArgument
argument = "Connect mddqa.in.devexperts.com:7400 Candle AAPL{=d} 20230201Z"
isEnabled = "YES">
isEnabled = "NO">
</CommandLineArgument>
<CommandLineArgument
argument = "Connect mddqa.in.devexperts.com:7400 Quote AAPL"
Expand Down
5 changes: 0 additions & 5 deletions DXFeedFramework/Api/DXEndpoint.swift
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,6 @@ public class DXEndpoint {
return instance
}
}

// only for testing
func callGC() throws {
try endpointNative.callGC()
}
}

/// Builder class for ``DXEndpoint`` that supports additional configuration properties.
Expand Down
19 changes: 2 additions & 17 deletions DXFeedFramework/Ipf/InstrumentProfile.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,7 @@ public class InstrumentProfile {
/// It takes precedence in conflict cases with other fields.
/// It is a mandatory field. It may not be empty.
/// Example: "STOCK", "FUTURE", "OPTION".
public var type = "" {
didSet {
if type != "" {
ipfType = InstrumentProfileType.find(type)
}
}
}
public var type = ""
/// Identifier of instrument,
public var symbol = ""
/// description of instrument
Expand Down Expand Up @@ -189,17 +183,8 @@ public class InstrumentProfile {

var customFields = [String: String]()

public var ipfType: InstrumentProfileType?

/// Creates an instrument profile with default values.
public init() {
// to activaate didSet methods
// willSet and didSet will never get called on setting the initial value of the property
updateValues()
}
private func updateValues() {
type = ""
}
public init() { }
}

extension InstrumentProfile {
Expand Down
6 changes: 6 additions & 0 deletions DXFeedFramework/Ipf/InstrumentProfileType.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,9 @@ public enum InstrumentProfileType: String, CaseIterable {
return value
}
}

public extension InstrumentProfile {
func getIpfType() -> InstrumentProfileType? {
return InstrumentProfileType.find(self.type)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ extension DXInstrumentProfileConnection: NativeIPFConnectionListener {
func connectionDidChangeState(old: DXInstrumentProfileConnectionState, new: DXInstrumentProfileConnectionState) {
observersSet.reader { items in
let enumerator = items.objectEnumerator()
while let observer = enumerator.nextObject() as? DXInstrumentProfileConnection {
while let observer = enumerator.nextObject() as? DXInstrumentProfileConnectionObserver {
observer.connectionDidChangeState(old: old, new: new)
}
}
Expand Down
21 changes: 5 additions & 16 deletions DXFeedFramework/Native/Endpoint/NativeEndpoint.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,12 @@ import Foundation

/// Native wrapper over the Java com.dxfeed.api.DXEndpoint class.
class NativeEndpoint {
class WeakListener: WeakBox<EndpointListener>, EndpointListener {
func changeState(old: DXEndpointState, new: DXEndpointState) {
guard let endpoint = self.value else {
return
}
endpoint.changeState(old: old, new: new)
}
}

private class WeakListener: WeakBox<EndpointListener> { }
private static let listeners = ConcurrentArray<WeakListener>()

let endpoint: UnsafeMutablePointer<dxfg_endpoint_t>!
var listener: UnsafeMutablePointer<dxfg_endpoint_state_change_listener_t>?
static let listeners = ConcurrentArray<WeakListener>()

private static let finalizeCallback: dxfg_finalize_function = { _, context in
if let context = context {
let endpoint: AnyObject = bridge(ptr: context)
Expand All @@ -39,7 +33,7 @@ class NativeEndpoint {
if let listener = endpoint as? WeakListener {
var old = (try? EnumUtil.valueOf(value: DXEndpointState.convert(oldState))) ?? .notConnected
var new = (try? EnumUtil.valueOf(value: DXEndpointState.convert(newState))) ?? .notConnected
listener.changeState(old: old, new: new)
listener.value?.changeState(old: old, new: new)
}
}
}
Expand Down Expand Up @@ -150,9 +144,4 @@ class NativeEndpoint {
let value = try ErrorCheck.nativeCall(thread, dxfg_DXEndpoint_getState(thread, self.endpoint))
return try EnumUtil.valueOf(value: DXEndpointState.convert(value))
}

func callGC() throws {
let thread = currentThread()
try ErrorCheck.nativeCall(thread, dxfg_gc(thread))
}
}
8 changes: 7 additions & 1 deletion DXFeedFramework/Native/Graal/Isolate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Isolate {
/// their tasks may be transferred to the overcommitted queue.
/// Within the context of GraalVM, this transfer can result in the creation of a new thread, which might have already been attached to other tasks.
/// This could lead to a fatalError, so it's crucial to carefully manage these processes and consider potential issues when working with the SDK."
init() {
private init() {
#if DEBUG
print("FEED SDK: Debug")
#else
Expand Down Expand Up @@ -84,4 +84,10 @@ class Isolate {
fatalError(errorMessage)
}
}

// only for testing
func callGC() {
let thread = currentThread()
_ = try? ErrorCheck.nativeCall(thread, dxfg_gc(thread))
}
}
5 changes: 5 additions & 0 deletions DXFeedFramework/Native/Ipf/InstrumentProfile+Ext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ extension InstrumentProfile {
}

func copy(to pointer: UnsafeMutablePointer<dxfg_instrument_profile_t>) {
pointer.pointee.type = type.toCStringRef()
pointer.pointee.symbol = symbol.toCStringRef()
pointer.pointee.description = descriptionStr.toCStringRef()
pointer.pointee.localSymbol = localSymbol.toCStringRef()
Expand Down Expand Up @@ -92,5 +93,9 @@ extension InstrumentProfile {
pointer.pointee.settlementStyle = settlementStyle.toCStringRef()
pointer.pointee.priceIncrements = priceIncrements.toCStringRef()
pointer.pointee.tradingHours = tradingHours.toCStringRef()
let list = UnsafeMutablePointer<dxfg_string_list>.allocate(capacity: 1)
list.pointee.size = 0
list.pointee.elements = nil
pointer.pointee.customFields = list
}
}
2 changes: 2 additions & 0 deletions DXFeedFramework/Native/Ipf/InstrumentProfileMapper.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class InstrumentProfileMapper {
}

func releaseNative(native: UnsafeMutablePointer<dxfg_instrument_profile_t>) {
native.pointee.customFields.deinitialize(count: 1)
native.pointee.customFields.deallocate()
native.deinitialize(count: 1)
native.deallocate()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,46 @@ import Foundation

/// Native wrapper over the Java com.dxfeed.ipf.live.InstrumentProfileCollector class.
/// The location of the imported functions is in the header files "dxfg_ipf.h".
class NativeInstrumentProfileCollector {
public class NativeInstrumentProfileCollector {
private class WeakListener: WeakBox<NativeInstrumentProfileCollector> { }
private static let listeners = ConcurrentArray<WeakListener>()

let collector: UnsafeMutablePointer<dxfg_ipf_collector_t>?
private var nativeListener: UnsafeMutablePointer<dxfg_ipf_update_listener_t>?

private weak var listener: DXInstrumentProfileUpdateListener?
private static let mapper = InstrumentProfileMapper()

static let listenerCallback: dxfg_ipf_update_listener_function = {_, nativeProfiles, context in
private static let finalizeCallback: dxfg_finalize_function = { _, context in
if let context = context {
let endpoint: AnyObject = bridge(ptr: context)
if let listener = endpoint as? WeakListener {
NativeInstrumentProfileCollector.listeners.removeAll(where: {
return $0 === listener
})
}
}
}

private static let listenerCallback: dxfg_ipf_update_listener_function = {_, nativeProfiles, context in
guard let nativeProfiles = nativeProfiles else {
return
}
if let context = context {
var profiles = [InstrumentProfile]()
let listener: AnyObject = bridge(ptr: context)
if let listener = listener as? NativeInstrumentProfileCollector {
if let listener = listener as? WeakListener {
let iterator = NativeProfileIterator(nativeProfiles)

while (try? iterator.hasNext()) ?? false {
do {
let profile = try iterator.next()
profiles.append(profile)
} catch {
print("NativeInstrumentProfileCollector: excpetion \(error)")
print("NativeInstrumentProfileCollector: exception \(error)")
}
}
listener.listener?.instrumentProfilesUpdated(profiles)

listener.value?.listener?.instrumentProfilesUpdated(profiles)
}
}
}
Expand All @@ -46,11 +59,12 @@ class NativeInstrumentProfileCollector {
let thread = currentThread()
_ = try? ErrorCheck.nativeCall(thread,
dxfg_InstrumentProfileCollector_removeUpdateListener(thread,
self.collector,
collector,
nativeListener))
_ = try? ErrorCheck.nativeCall(thread,
dxfg_JavaObjectHandler_release(thread,
&(nativeListener.pointee.handler)))
self.nativeListener = nil
self.listener = nil
}
}
Expand Down Expand Up @@ -86,10 +100,10 @@ class NativeInstrumentProfileCollector {
}
let thread = currentThread()
_ = try ErrorCheck.nativeCall(thread,
dxfg_InstrumentProfileCollector_updateInstrumentProfile(
thread,
collector,
native))
dxfg_InstrumentProfileCollector_updateInstrumentProfile(
thread,
collector,
native))
}

func view() throws -> NativeProfileIterator {
Expand All @@ -116,13 +130,23 @@ class NativeInstrumentProfileCollector {
removeListener()
let thread = currentThread()
self.listener = listener
let voidPtr = bridge(obj: self)

let weakListener = WeakListener(value: self)
NativeInstrumentProfileCollector.listeners.append(newElement: weakListener)
let voidPtr = bridge(obj: weakListener)

let callback = NativeInstrumentProfileCollector.listenerCallback
let listener = try ErrorCheck.nativeCall(thread,
let nativeListener = try ErrorCheck.nativeCall(thread,
dxfg_InstrumentProfileUpdateListener_new(thread,
callback,
voidPtr))
self.nativeListener = listener
self.nativeListener = nativeListener

try ErrorCheck.nativeCall(thread, dxfg_Object_finalize(thread,
&(nativeListener.pointee.handler),
NativeInstrumentProfileCollector.finalizeCallback,
voidPtr))

_ = try ErrorCheck.nativeCall(thread,
dxfg_InstrumentProfileCollector_addUpdateListener(thread,
self.collector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import Foundation
/// Native wrapper over the Java com.dxfeed.ipf.live.InstrumentProfileConnection class.
/// The location of the imported functions is in the header files "dxfg_ipf.h".
class NativeInstrumentProfileConnection {
private class WeakListener: WeakBox<NativeInstrumentProfileConnection> { }
private static let listeners = ConcurrentArray<WeakListener>()

private let connection: UnsafeMutablePointer<dxfg_ipf_connection_t>
private let address: String

Expand All @@ -19,14 +22,25 @@ class NativeInstrumentProfileConnection {
var nativeListener: UnsafeMutablePointer<dxfg_ipf_connection_state_change_listener_t>?
private weak var listener: NativeIPFConnectionListener?

static let listenerCallback: dxfg_ipf_connection_state_change_listener_func = {_, oldState, newState, context in
private static let finalizeCallback: dxfg_finalize_function = { _, context in
if let context = context {
let endpoint: AnyObject = bridge(ptr: context)
if let listener = endpoint as? NativeInstrumentProfileConnection {
if let listener = endpoint as? WeakListener {
NativeInstrumentProfileConnection.listeners.removeAll(where: {
return $0 === listener
})
}
}
}

private static let listenerCallback: dxfg_ipf_connection_state_change_listener_func = {_, oldState, newState, context in
if let context = context {
let endpoint: AnyObject = bridge(ptr: context)
if let listener = endpoint as? WeakListener {
var old = (try? EnumUtil.valueOf(value: DXInstrumentProfileConnectionState.convert(oldState)))
var new = (try? EnumUtil.valueOf(value: DXInstrumentProfileConnectionState.convert(newState)))
listener.listener?.connectionDidChangeState(old: old ?? .notConnected,
new: new ?? .notConnected)
listener.value?.listener?.connectionDidChangeState(old: old ?? .notConnected,
new: new ?? .notConnected)
}
}
}
Expand All @@ -39,6 +53,7 @@ class NativeInstrumentProfileConnection {
connection,
listener))
_ = try? ErrorCheck.nativeCall(thread, dxfg_JavaObjectHandler_release(thread, &(listener.pointee.handler)))
self.nativeListener = nil
self.listener = nil
}
}
Expand Down Expand Up @@ -128,13 +143,21 @@ class NativeInstrumentProfileConnection {
func addListener(_ listener: NativeIPFConnectionListener) throws {
removeListener()
self.listener = listener
let voidPtr = bridge(obj: self)
let weakListener = WeakListener(value: self)
NativeInstrumentProfileConnection.listeners.append(newElement: weakListener)
let voidPtr = bridge(obj: weakListener)
let thread = currentThread()
let listener = try ErrorCheck.nativeCall(thread,
dxfg_IpfPropertyChangeListener_new(
thread,
NativeInstrumentProfileConnection.listenerCallback,
voidPtr))

try ErrorCheck.nativeCall(thread, dxfg_Object_finalize(thread,
&(listener.pointee.handler),
NativeInstrumentProfileConnection.finalizeCallback,
voidPtr))

self.nativeListener = listener

try ErrorCheck.nativeCall(currentThread(),
Expand Down
Loading

0 comments on commit cc58211

Please sign in to comment.