Skip to content

Commit

Permalink
Rely on event & log streams of Liquid SDK plugin (#20)
Browse files Browse the repository at this point in the history
* Create Event, Log stream & connection management for Liquid SDK
* Renamed node state to wallet info
* Applied breaking changes for
- get_info: remove with_scan argument breez-liquid-sdk#306
  - getInfo API no longer has a request param
- Receive: Switch payment to pending state when lockup is in the mempool breez-liquid-sdk#301
  - Payment.txID is now an optional field
- Receive: Add zero-conf checks and properly manage state updates breez-liquid-sdk#292
  - A new optional field, zeroConfMinFeeRate is added to Config
- Add magic routing hint support breez-liquid-sdk#265
  - Payment.feesSat is no longer an optional field
  • Loading branch information
erdemyerebasmaz authored Jun 13, 2024
1 parent e87b256 commit fdbb970
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 173 deletions.
64 changes: 22 additions & 42 deletions lib/bloc/account/account_bloc.dart
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import 'dart:async';
import 'dart:io';

import 'package:breez_sdk/breez_sdk.dart';
import 'package:breez_sdk/exceptions.dart';
import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:flutter_breez_liquid/flutter_breez_liquid.dart' as liquid_sdk;
import 'package:flutter_fgbg/flutter_fgbg.dart';
import 'package:hydrated_bloc/hydrated_bloc.dart';
import 'package:l_breez/bloc/account/account_state.dart';
import 'package:l_breez/bloc/account/account_state_assembler.dart';
import 'package:l_breez/bloc/account/breez_liquid_sdk.dart';
import 'package:l_breez/bloc/account/credentials_manager.dart';
import 'package:l_breez/bloc/account/payment_filters.dart';
import 'package:l_breez/bloc/account/payment_result.dart';
import 'package:l_breez/config.dart';
import 'package:l_breez/models/payment_minutiae.dart';
import 'package:l_breez/services/injector.dart';
import 'package:logging/logging.dart';
import 'package:path/path.dart' as p;
import 'package:rxdart/rxdart.dart';
Expand All @@ -40,11 +38,11 @@ class AccountBloc extends Cubit<AccountState> with HydratedMixin {

Stream<PaymentFilters> get paymentFiltersStream => _paymentFiltersStreamController.stream;

final BreezSDK _breezSDK;
final CredentialsManager _credentialsManager;
final BreezLiquidSDK _breezLiquidSdk;

AccountBloc(
this._breezSDK,
this._breezLiquidSdk,
this._credentialsManager,
) : super(AccountState.initial()) {
hydrate();
Expand All @@ -55,34 +53,15 @@ class AccountBloc extends Cubit<AccountState> with HydratedMixin {
_listenPaymentResultEvents();
}

Stream<liquid_sdk.GetInfoResponse?> walletInfoStream() async* {
const req = liquid_sdk.GetInfoRequest(withScan: false);
final liquidSDK = ServiceInjector().liquidSDK;
yield await liquidSDK?.getInfo(req: req);
while (true) {
await Future.delayed(const Duration(seconds: 10));
yield await liquidSDK?.getInfo(req: req);
}
}

Stream<List<liquid_sdk.Payment>?> paymentsStream() async* {
final liquidSDK = ServiceInjector().liquidSDK;
yield await liquidSDK?.listPayments();
while (true) {
await Future.delayed(const Duration(seconds: 10));
yield await liquidSDK?.listPayments();
}
}

// _watchAccountChanges listens to every change in the local storage and assemble a new account state accordingly
Stream<AccountState> _watchAccountChanges() {
return Rx.combineLatest3<List<liquid_sdk.Payment>?, PaymentFilters, liquid_sdk.GetInfoResponse?,
AccountState>(
paymentsStream().asBroadcastStream(),
_breezLiquidSdk.paymentsStream,
paymentFiltersStream,
walletInfoStream().asBroadcastStream(),
(payments, paymentFilters, nodeState) {
return assembleAccountState(payments, paymentFilters, nodeState, state) ?? state;
_breezLiquidSdk.walletInfoStream,
(payments, paymentFilters, walletInfo) {
return assembleAccountState(payments, paymentFilters, walletInfo, state) ?? state;
},
);
}
Expand Down Expand Up @@ -137,8 +116,7 @@ class AccountBloc extends Cubit<AccountState> with HydratedMixin {
config: config.sdkConfig,
mnemonic: mnemonic,
);
final liquidSDK = await liquid_sdk.connect(req: req);
ServiceInjector().setLiquidSdk(liquidSDK);
await _breezLiquidSdk.connect(req: req);
_log.info("connected to breez lib");
emit(state.copyWith(connectionStatus: ConnectionStatus.CONNECTED));
_watchAccountChanges().listen((acc) {
Expand All @@ -158,7 +136,7 @@ class AccountBloc extends Cubit<AccountState> with HydratedMixin {
var lastSync = DateTime.fromMillisecondsSinceEpoch(0);
FGBGEvents.stream.listen((event) async {
if (event == FGBGType.foreground && DateTime.now().difference(lastSync).inSeconds > nodeSyncInterval) {
await ServiceInjector().liquidSDK?.sync();
_breezLiquidSdk.wallet?.sync();
lastSync = DateTime.now();
}
});
Expand All @@ -168,7 +146,7 @@ class AccountBloc extends Cubit<AccountState> with HydratedMixin {
_log.info("prepareSendPayment: $invoice");
try {
final req = liquid_sdk.PrepareSendRequest(invoice: invoice);
return await ServiceInjector().liquidSDK!.prepareSendPayment(req: req);
return await _breezLiquidSdk.wallet!.prepareSendPayment(req: req);
} catch (e) {
_log.severe("prepareSendPayment error", e);
return Future.error(e);
Expand All @@ -178,7 +156,7 @@ class AccountBloc extends Cubit<AccountState> with HydratedMixin {
Future<liquid_sdk.SendPaymentResponse> sendPayment(liquid_sdk.PrepareSendResponse req) async {
_log.info("sendPayment: $req");
try {
return await ServiceInjector().liquidSDK!.sendPayment(req: req);
return await _breezLiquidSdk.wallet!.sendPayment(req: req);
} catch (e) {
_log.severe("sendPayment error", e);
return Future.error(e);
Expand All @@ -189,7 +167,7 @@ class AccountBloc extends Cubit<AccountState> with HydratedMixin {
_log.info("prepareReceivePayment: $payerAmountSat");
try {
final req = liquid_sdk.PrepareReceiveRequest(payerAmountSat: BigInt.from(payerAmountSat));
return ServiceInjector().liquidSDK!.prepareReceivePayment(req: req);
return _breezLiquidSdk.wallet!.prepareReceivePayment(req: req);
} catch (e) {
_log.severe("prepareSendPayment error", e);
return Future.error(e);
Expand All @@ -199,7 +177,7 @@ class AccountBloc extends Cubit<AccountState> with HydratedMixin {
Future<liquid_sdk.ReceivePaymentResponse> receivePayment(liquid_sdk.PrepareReceiveResponse req) async {
_log.info("receivePayment: ${req.payerAmountSat}, fees: ${req.feesSat}");
try {
return ServiceInjector().liquidSDK!.receivePayment(req: req);
return _breezLiquidSdk.wallet!.receivePayment(req: req);
} catch (e) {
_log.severe("prepareSendPayment error", e);
return Future.error(e);
Expand Down Expand Up @@ -300,21 +278,23 @@ class AccountBloc extends Cubit<AccountState> with HydratedMixin {
void _listenPaymentResultEvents() {
_log.info("_listenPaymentResultEvents");
// TODO: Liquid - Listen to Liquid SDK's payment result stream
_breezSDK.paymentResultStream.listen((paymentInfo) {
_breezLiquidSdk.paymentResultStream.listen((paymentInfo) {
_paymentResultStreamController.add(
PaymentResult(paymentInfo: paymentInfo),
);
}, onError: (error) {
_log.info("Error in paymentResultStream", error);
var paymentHash = "";
var swapId = "";
if (error is PaymentException) {
final invoice = error.data.invoice;
if (invoice != null) {
paymentHash = invoice.paymentHash;
if (error.details.swapId != null) {
swapId = error.details.swapId!;
}
}
_paymentResultStreamController
.add(PaymentResult(error: PaymentResultError.fromException(paymentHash, error)));
_paymentResultStreamController.add(
PaymentResult(
error: PaymentResultError.fromException(swapId, error),
),
);
});
}

Expand Down
12 changes: 6 additions & 6 deletions lib/bloc/account/account_state_assembler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ import 'account_state.dart';
AccountState? assembleAccountState(
List<Payment>? payments,
PaymentFilters paymentFilters,
GetInfoResponse? nodeState,
GetInfoResponse? walletInfo,
AccountState state,
) {
if (nodeState == null) {
if (walletInfo == null) {
return null;
}

final texts = getSystemAppLocalizations();
// return the new account state
return state.copyWith(
id: nodeState.pubkey,
id: walletInfo.pubkey,
initial: false,
balance: nodeState.balanceSat.toInt(),
pendingReceive: nodeState.pendingReceiveSat.toInt(),
pendingSend: nodeState.pendingSendSat.toInt(),
balance: walletInfo.balanceSat.toInt(),
pendingReceive: walletInfo.pendingReceiveSat.toInt(),
pendingSend: walletInfo.pendingSendSat.toInt(),
maxPaymentAmount: maxPaymentAmount,
onChainFeeRate: 0,
payments: payments?.map((e) => PaymentMinutiae.fromPayment(e, texts)).toList(),
Expand Down
149 changes: 149 additions & 0 deletions lib/bloc/account/breez_liquid_sdk.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import 'dart:async';

import 'package:flutter_breez_liquid/flutter_breez_liquid.dart' as liquid_sdk;
import 'package:rxdart/rxdart.dart';

class BreezLiquidSDK {
liquid_sdk.BindingLiquidSdk? wallet;

Future<liquid_sdk.BindingLiquidSdk> connect({
required liquid_sdk.ConnectRequest req,
}) async {
wallet = await liquid_sdk.connect(req: req);
_initializeEventsStream(wallet!);
_subscribeToSdkStreams(wallet!);
await _fetchWalletData(wallet!);
return wallet!;
}

void disconnect(liquid_sdk.BindingLiquidSdk sdk) {
sdk.disconnect();
_unsubscribeFromSdkStreams();
}

Future _fetchWalletData(liquid_sdk.BindingLiquidSdk sdk) async {
await _getInfo(sdk);
await _listPayments(sdk);
}

Future<liquid_sdk.GetInfoResponse> _getInfo(liquid_sdk.BindingLiquidSdk sdk) async {
final walletInfo = await sdk.getInfo();
_walletInfoController.add(walletInfo);
return walletInfo;
}

Future<List<liquid_sdk.Payment>> _listPayments(liquid_sdk.BindingLiquidSdk sdk) async {
final paymentsList = await sdk.listPayments();
_paymentsController.add(paymentsList);
return paymentsList;
}

StreamSubscription<liquid_sdk.LogEntry>? _breezLogSubscription;

Stream<liquid_sdk.LogEntry>? _breezLogStream;

/// Initializes SDK log stream.
///
/// Call once on your Dart entrypoint file, e.g.; `lib/main.dart`.
void initializeLogStream() {
_breezLogStream ??= liquid_sdk.breezLogStream().asBroadcastStream();
}

StreamSubscription<liquid_sdk.LiquidSdkEvent>? _breezEventsSubscription;

Stream<liquid_sdk.LiquidSdkEvent>? _breezEventsStream;

void _initializeEventsStream(liquid_sdk.BindingLiquidSdk sdk) {
_breezEventsStream ??= sdk.addEventListener().asBroadcastStream();
}

/// Subscribes to SDK's event & log streams.
void _subscribeToSdkStreams(liquid_sdk.BindingLiquidSdk sdk) {
_subscribeToEventsStream(sdk);
_subscribeToLogStream();
}

final StreamController<liquid_sdk.GetInfoResponse> _walletInfoController =
BehaviorSubject<liquid_sdk.GetInfoResponse>();

Stream<liquid_sdk.GetInfoResponse> get walletInfoStream => _walletInfoController.stream;

final StreamController<liquid_sdk.Payment> _paymentResultStream = StreamController.broadcast();

final StreamController<List<liquid_sdk.Payment>> _paymentsController =
BehaviorSubject<List<liquid_sdk.Payment>>();

Stream<List<liquid_sdk.Payment>> get paymentsStream => _paymentsController.stream;

Stream<liquid_sdk.Payment> get paymentResultStream => _paymentResultStream.stream;

/* TODO: Liquid - Log statements are added for debugging purposes, should be removed after early development stage is complete & events are behaving as expected.*/
/// Subscribes to LiquidSdkEvent's stream
void _subscribeToEventsStream(liquid_sdk.BindingLiquidSdk sdk) {
_breezEventsSubscription = _breezEventsStream?.listen(
(event) async {
if (event is liquid_sdk.LiquidSdkEvent_PaymentFailed) {
_logStreamController
.add(liquid_sdk.LogEntry(line: "Payment Failed. ${event.details.swapId}", level: "WARN"));
_paymentResultStream.addError(PaymentException(event.details));
}
if (event is liquid_sdk.LiquidSdkEvent_PaymentPending) {
_logStreamController
.add(liquid_sdk.LogEntry(line: "Payment Pending. ${event.details.swapId}", level: "INFO"));
_paymentResultStream.add(event.details);
}
if (event is liquid_sdk.LiquidSdkEvent_PaymentRefunded) {
_logStreamController
.add(liquid_sdk.LogEntry(line: "Payment Refunded. ${event.details.swapId}", level: "INFO"));
_paymentResultStream.add(event.details);
}
if (event is liquid_sdk.LiquidSdkEvent_PaymentRefundPending) {
_logStreamController.add(
liquid_sdk.LogEntry(line: "Pending Payment Refund. ${event.details.swapId}", level: "INFO"));
_paymentResultStream.add(event.details);
}
if (event is liquid_sdk.LiquidSdkEvent_PaymentSucceeded) {
_logStreamController
.add(liquid_sdk.LogEntry(line: "Payment Succeeded. ${event.details.swapId}", level: "INFO"));
_paymentResultStream.add(event.details);
await _fetchWalletData(sdk);
}
if (event is liquid_sdk.LiquidSdkEvent_PaymentWaitingConfirmation) {
_logStreamController.add(liquid_sdk.LogEntry(
line: "Payment Waiting Confirmation. ${event.details.swapId}", level: "INFO"));
_paymentResultStream.add(event.details);
}
if (event is liquid_sdk.LiquidSdkEvent_Synced) {
_logStreamController.add(const liquid_sdk.LogEntry(line: "Received Synced event.", level: "INFO"));
await _fetchWalletData(sdk);
}
},
);
}

final _logStreamController = StreamController<liquid_sdk.LogEntry>.broadcast();

Stream<liquid_sdk.LogEntry> get logStream => _logStreamController.stream;

/// Subscribes to SDK's logs stream
void _subscribeToLogStream() {
_breezLogSubscription = _breezLogStream?.listen((logEntry) {
_logStreamController.add(logEntry);
}, onError: (e) {
_logStreamController.addError(e);
});
}

/// Unsubscribes from SDK's event & log streams.
void _unsubscribeFromSdkStreams() {
_breezEventsSubscription?.cancel();
_breezLogSubscription?.cancel();
}
}

// TODO: Liquid - Return this exception from the SDK directly
class PaymentException {
final liquid_sdk.Payment details;

const PaymentException(this.details);
}
2 changes: 1 addition & 1 deletion lib/bloc/account/payment_result.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import 'package:breez_sdk/sdk.dart';
import 'package:breez_translations/breez_translations_locales.dart';
import 'package:flutter_breez_liquid/flutter_breez_liquid.dart';
import 'package:l_breez/utils/exceptions.dart';
import 'package:flutter/cupertino.dart';

Expand Down
5 changes: 3 additions & 2 deletions lib/bloc/backup/backup_bloc.dart
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import 'package:flutter_breez_liquid/flutter_breez_liquid.dart';
import 'package:l_breez/bloc/account/breez_liquid_sdk.dart';
import 'package:l_breez/bloc/backup/backup_state.dart';
import 'package:hydrated_bloc/hydrated_bloc.dart';
import 'package:logging/logging.dart';

class BackupBloc extends Cubit<BackupState?> {
final _log = Logger("BackupBloc");
final BindingLiquidSdk? _liquidSDK;
final BreezLiquidSDK _liquidSDK;

BackupBloc(this._liquidSDK) : super(null);

Expand All @@ -21,7 +22,7 @@ class BackupBloc extends Cubit<BackupState?> {
Future<void> backup() async {
try {
emit(BackupState(status: BackupStatus.INPROGRESS));
_liquidSDK?.backup(req: const BackupRequest());
_liquidSDK.wallet?.backup(req: const BackupRequest());
emit(BackupState(status: BackupStatus.SUCCESS));
} catch (e) {
_log.info("Failed to backup");
Expand Down
8 changes: 5 additions & 3 deletions lib/bloc/currency/currency_bloc.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import 'dart:async';

import 'package:breez_sdk/breez_sdk.dart';
import 'package:breez_sdk/sdk.dart';
import 'package:l_breez/bloc/currency/currency_state.dart';
import 'package:hydrated_bloc/hydrated_bloc.dart';
import 'package:l_breez/bloc/currency/currency_state.dart';
import 'package:l_breez/services/injector.dart';

class CurrencyBloc extends Cubit<CurrencyState> with HydratedMixin {
final BreezSDK _breezSDK;
Expand All @@ -16,8 +17,9 @@ class CurrencyBloc extends Cubit<CurrencyState> with HydratedMixin {
void _initializeCurrencyBloc() {
late final StreamSubscription streamSubscription;
// TODO: Liquid - Listen to Liquid SDK's invoice paid stream
streamSubscription = _breezSDK.nodeStateStream.where((nodeState) => nodeState != null).listen(
(nodeState) {
final breezLiquidSdk = ServiceInjector().liquidSDK;
streamSubscription = breezLiquidSdk.walletInfoStream.listen(
(walletInfo) {
listFiatCurrencies();
fetchExchangeRates();
streamSubscription.cancel();
Expand Down
Loading

0 comments on commit fdbb970

Please sign in to comment.