Skip to content

Commit

Permalink
Support multiwallet. Returns to async synchronizer
Browse files Browse the repository at this point in the history
  • Loading branch information
ant013 authored and esen committed Apr 24, 2023
1 parent f697343 commit de86eac
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 127 deletions.
210 changes: 103 additions & 107 deletions UnstoppableWallet/UnstoppableWallet/Core/Adapters/ZcashAdapter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class ZcashAdapter {

private let saplingDownloader = DownloadService(queueLabel: "io.SaplingDownloader")

private let closureSynchronizer: ClosureSynchronizer
private let synchronizer: Synchronizer

private var address: UnifiedAddress?
private var transactionPool: ZcashTransactionPool?
Expand Down Expand Up @@ -85,9 +85,8 @@ class ZcashAdapter {
defaultFee(network: network, height: height).decimalValue.decimalValue
}


init(wallet: Wallet, restoreSettings: RestoreSettings) throws {
logger = App.shared.logger.scoped(with: "ZCashKit")//HsToolKit.Logger(minLogLevel: .debug)//
logger = /*App.shared.logger.scoped(with: "ZCashKit")*/HsToolKit.Logger(minLogLevel: .debug)//

guard let seed = wallet.account.type.mnemonicSeed else {
throw AdapterError.unsupportedAccount
Expand Down Expand Up @@ -133,25 +132,24 @@ class ZcashAdapter {
spendParamsURL: try ZcashAdapter.spendParamsURL(uniqueId: uniqueId),
outputParamsURL: try ZcashAdapter.outputParamsURL(uniqueId: uniqueId),
saplingParamsSourceURL: SaplingParamsSourceURL.default,
alias: .default,
alias: .custom(uniqueId),
logLevel: .error
)

spendingKey = unifiedSpendingKey
viewingKey = unifiedViewingKey

let synchronizer = SDKSynchronizer(initializer: initializer)
closureSynchronizer = ClosureSDKSynchronizer(synchronizer: synchronizer)
synchronizer = SDKSynchronizer(initializer: initializer)

// subscribe on sync states
closureSynchronizer
synchronizer
.stateStream
.throttle(for: .seconds(0.3), scheduler: DispatchQueue.main, latest: true)
.sink(receiveValue: { [weak self] state in self?.sync(state: state) })
.store(in: &cancellables)

// subscribe on new transactions
closureSynchronizer
synchronizer
.eventStream
.receive(on: DispatchQueue.main)
.sink(receiveValue: { [weak self] event in self?.sync(event: event) })
Expand All @@ -161,86 +159,63 @@ class ZcashAdapter {
NotificationCenter.default.addObserver(self, selector: #selector(didEnterBackground), name: UIApplication.didEnterBackgroundNotification, object: nil)
subscribe(disposeBag, saplingDownloader.stateObservable) { [weak self] in self?.downloaderStatusUpdated(state: $0) }

queue.async { [weak self] in
self?.prepare(seedData: seedData, viewingKeys: [unifiedViewingKey], walletBirthday: birthday)
}
prepare(seedData: seedData, viewingKeys: [unifiedViewingKey], walletBirthday: birthday)
}

private func prepare(seedData: [UInt8]?, viewingKeys: [UnifiedFullViewingKey], walletBirthday: BlockHeight) {
preparing = true
state = .preparing
closureSynchronizer.prepare(
with: seedData,
viewingKeys: viewingKeys,
walletBirthday: birthday
) { [weak self] result in
switch result {
case .success(let initializationResult):
self?.logger?.log(level: .debug, message: "Successful prepared! \(initializationResult)")
self?.initiateAddress(initializationResult: initializationResult)
case .failure(let error):
self?.setPreparing(error: error)
}
}
}

private func initiateAddress(initializationResult: Initializer.InitializationResult) {
switch initializationResult {
case .seedRequired:
setPreparing(error: AppError.ZcashError.seedRequired)
case .success:
closureSynchronizer.getUnifiedAddress(accountIndex: 0) { [weak self] address in
self?.logger?.log(level: .debug, message: "Successful get address for 0 account! \(address?.saplingReceiver()?.stringEncoded ?? "N/A")")
self?.initiateTransactions(address: address)
}
}
}
Task {
do {
let result = try await synchronizer.prepare(with: seedData, viewingKeys: viewingKeys, walletBirthday: walletBirthday)
if case .seedRequired = result {
throw AppError.ZcashError.seedRequired
}
logger?.log(level: .debug, message: "Successful prepared!")
guard let address = await synchronizer.getUnifiedAddress(accountIndex: 0),
let saplingAddress = address.saplingReceiver() else {
throw AppError.ZcashError.noReceiveAddress
}
self.address = address
logger?.log(level: .debug, message: "Successful get address for 0 account! \(saplingAddress.stringEncoded)")

private func initiateTransactions(address: UnifiedAddress?) {
self.address = address
guard let address, let saplingAddress = address.saplingReceiver() else {
setPreparing(error: AppError.ZcashError.noReceiveAddress)
return
}
let transactionPool = ZcashTransactionPool(receiveAddress: saplingAddress, synchronizer: synchronizer)
self.transactionPool = transactionPool

transactionPool = ZcashTransactionPool(receiveAddress: saplingAddress, closureSynchronizer: closureSynchronizer)
logger?.log(level: .debug, message: "Starting fetch transactions.")
let overviews = await synchronizer.clearedTransactions
let pending = await synchronizer.pendingTransactions
logger?.log(level: .debug, message: "Successful fetch \(overviews.count) txs and \(pending.count) pending txs")

logger?.log(level: .debug, message: "Starting fetch transactions.")
closureSynchronizer.clearedTransactions { [weak self] overviews in
self?.closureSynchronizer.pendingTransactions { pendings in
self?.logger?.log(level: .debug, message: "Successful fetch \(overviews.count) txs and \(pendings.count) pending txs")
self?.finishTransactions(overviews: overviews, pending: pendings)
}
}
}
await transactionPool.store(confirmedTransactions: overviews, pendingTransactions: pending)
let wrapped = transactionPool.all

private func finishTransactions(overviews: [ZcashTransaction.Overview], pending: [PendingTransactionEntity]) {
transactionPool?.store(confirmedTransactions: overviews, pendingTransactions: pending)
if !wrapped.isEmpty {
logger?.log(level: .debug, message: "Send to pool all transactions \(wrapped.count)")
transactionRecordsSubject.onNext(wrapped.map {
transactionRecord(fromTransaction: $0)
})
}

if let all = transactionPool?.all, !all.isEmpty {
logger?.log(level: .debug, message: "Send to pool all transactions \(all.count)")
transactionRecordsSubject.onNext(all.map {
transactionRecord(fromTransaction: $0)
})
}
let shielded = synchronizer.getShieldedBalance(accountIndex: 0).decimalValue.decimalValue
let shieldedVerified = synchronizer.getShieldedVerifiedBalance(accountIndex: 0).decimalValue.decimalValue
balanceSubject.onNext(BalanceData(
balance: shieldedVerified,
balanceLocked: shielded - shieldedVerified
))

let shielded = closureSynchronizer.getShieldedBalance(accountIndex: 0).decimalValue.decimalValue
let shieldedVerified = closureSynchronizer.getShieldedVerifiedBalance(accountIndex: 0).decimalValue.decimalValue
balanceSubject.onNext(BalanceData(
balance: shieldedVerified,
balanceLocked: shielded - shieldedVerified
))
DispatchQueue.main.async { [weak self] in
self?.finishPrepare()
finishPrepare()
} catch {
setPreparing(error: error)
}
}
}

private func setPreparing(error: Error) {
DispatchQueue.main.async { [weak self] in
self?.preparing = false
self?.state = .notSynced(error: error)
self?.logger?.log(level: .error, message: "Has preparing error! \(error)")
}
preparing = false
state = .notSynced(error: error)
logger?.log(level: .error, message: "Has preparing error! \(error)")
}

private func finishPrepare() {
Expand Down Expand Up @@ -321,20 +296,35 @@ class ZcashAdapter {
switch event {
case .foundTransactions(let transactions, let inRange):
logger?.log(level: .debug, message: "found \(transactions.count) mined txs in range: \(inRange)")
let newTxs = transactionPool?.sync(transactions: transactions) ?? []
transactionRecordsSubject.onNext(newTxs.map {
transactionRecord(fromTransaction: $0)
})
Task {
let newTxs = await transactionPool?.sync(transactions: transactions) ?? []
transactionRecordsSubject.onNext(newTxs.map {
transactionRecord(fromTransaction: $0)
})
}
case .minedTransaction(let pendingEntity):
logger?.log(level: .debug, message: "found pending tx")
update(transaction: pendingEntity)
update(transactions: [pendingEntity])
default:
logger?.log(level: .debug, message: "Event: \(event)")
}
}

private func update(transaction: PendingTransactionEntity) {
let newTxs = transactionPool?.sync(transactions: [transaction]) ?? []
private func reSyncPending() {
Task {
let pending = await synchronizer.pendingTransactions
logger?.log(level: .debug, message: "Found pending txs: \(pending.count)")
pending.forEach { entity in
logger?.log(level: .debug, message: "TX: \(entity.createTime) : \(entity.value.decimalValue.description) : \(entity.recipient.asString ?? "")")
}
if !pending.isEmpty {
update(transactions: pending)
}
}
}

private func update(transactions: [PendingTransactionEntity]) {
let newTxs = transactionPool?.sync(transactions: transactions) ?? []
transactionRecordsSubject.onNext(newTxs.map {
transactionRecord(fromTransaction: $0)
})
Expand Down Expand Up @@ -423,20 +413,21 @@ class ZcashAdapter {
return
}

closureSynchronizer.pendingTransactions { [weak self] txs in
Task {
let txs = await synchronizer.pendingTransactions
// fetch the first one that's reported to be unmined
guard let firstUnmined = txs.filter({ !$0.isMined }).first else {
App.shared.localStorage.zcashAlwaysPendingRewind = true
completion?()
return
}

self?.rewind(unmined: firstUnmined, completion: completion)
rewind(unmined: firstUnmined, completion: completion)
}
}

private func rewind(unmined: PendingTransactionEntity, completion: (() -> ())? = nil) {
closureSynchronizer
synchronizer
.rewind(.transaction(unmined.makeTransactionEntity(defaultFee: defaultFee(network: network))))
.sink(receiveCompletion: { result in
switch result {
Expand All @@ -453,7 +444,7 @@ class ZcashAdapter {
}

private func rewindQuick(completion: (() -> ())? = nil) {
closureSynchronizer
synchronizer
.rewind(.quick)
.sink(receiveCompletion: { [weak self] result in
switch result {
Expand Down Expand Up @@ -488,8 +479,9 @@ class ZcashAdapter {

deinit {
NotificationCenter.default.removeObserver(self)
closureSynchronizer.stop { [weak self] in
self?.logger?.log(level: .debug, message: "Synchronizer Was Stopped")
Task {
await synchronizer.stop()
logger?.log(level: .debug, message: "Synchronizer Was Stopped")
}
}

Expand Down Expand Up @@ -564,11 +556,7 @@ extension ZcashAdapter: IAdapter {

guard address != nil else { // else we need to try prepare library again
logger?.log(level: .debug, message: "No address, try to prepare kit again!")
queue.async { [weak self] in
if let seedData = self?.seedData, let viewingKey = self?.viewingKey, let birthday = self?.birthday {
self?.prepare(seedData: seedData, viewingKeys: [viewingKey], walletBirthday: birthday)
}
}
prepare(seedData: seedData, viewingKeys: [viewingKey], walletBirthday: birthday)
return
}

Expand All @@ -580,8 +568,9 @@ extension ZcashAdapter: IAdapter {
}

func stop() {
closureSynchronizer.stop() {
print("Synchronizer will stop")
Task {
await synchronizer.stop()
logger?.log(level: .debug, message: "Synchronizer will stop")
}
}

Expand All @@ -599,8 +588,11 @@ extension ZcashAdapter: IAdapter {
balanceSubject.onNext(_balanceData)
fixPendingTransactionsIfNeeded { [weak self] in
self?.logger?.log(level: .debug, message: "\(Date()) Try to start synchronizer : retry = \(retry), by Thread:\(Thread.current)")
self?.closureSynchronizer.start(retry: retry) { [weak self] error in
if let error {

Task { [weak self] in
do {
try await self?.synchronizer.start(retry: true)
} catch {
self?.state = .notSynced(error: error)
}
}
Expand All @@ -619,8 +611,8 @@ extension ZcashAdapter: IAdapter {
if let status = self.synchronizerState {
balanceState = """
shielded balance
total: \(closureSynchronizer.getShieldedBalance(accountIndex: 0).decimalValue.decimalValue)
verified: \(closureSynchronizer.getShieldedVerifiedBalance(accountIndex: 0).decimalValue.decimalValue)
total: \(synchronizer.getShieldedBalance(accountIndex: 0).decimalValue.decimalValue)
verified: \(synchronizer.getShieldedVerifiedBalance(accountIndex: 0).decimalValue.decimalValue)
transparent balance
total: \(String(describing: status.transparentBalance.total))
verified: \(String(describing: status.transparentBalance.verified))
Expand Down Expand Up @@ -718,7 +710,7 @@ extension ZcashAdapter: ISendZcashAdapter {
}

var availableBalance: Decimal {
max(0, closureSynchronizer.getShieldedVerifiedBalance(accountIndex: 0).decimalValue.decimalValue - fee)
max(0, synchronizer.getShieldedVerifiedBalance(accountIndex: 0).decimalValue.decimalValue - fee)
}

func validate(address: String) throws -> AddressType {
Expand All @@ -742,16 +734,20 @@ extension ZcashAdapter: ISendZcashAdapter {
func sendSingle(amount: Decimal, address: Recipient, memo: Memo?) -> Single<()> {
let spendingKey = spendingKey
return Single.create { [weak self] observer in
self?.closureSynchronizer.sendToAddress(
spendingKey: spendingKey,
zatoshi: Zatoshi.from(decimal: amount),
toAddress: address,
memo: memo) { [weak self] result in
switch result {
case .success(let entity):
self?.logger?.log(level: .debug, message: "Successful send TX: \(entity.createTime) : \(entity.value.decimalValue.description) : \(entity.recipient.asString ?? "")")
observer(.success(()))
case .failure(let error):
guard let self else {
observer(.error(AppError.unknownError))
return Disposables.create()
}
Task {
do {
let pendingEntity = try await self.synchronizer.sendToAddress(
spendingKey: spendingKey,
zatoshi: Zatoshi.from(decimal: amount),
toAddress: address,
memo: memo)
self.logger?.log(level: .debug, message: "Successful send TX: \(pendingEntity.createTime) : \(pendingEntity.value.decimalValue.description) : \(pendingEntity.recipient.asString ?? "")")
self.reSyncPending()
} catch {
observer(.error(error))
}
}
Expand Down
Loading

0 comments on commit de86eac

Please sign in to comment.