Skip to content

Commit

Permalink
Merge pull request #2411 from bitfriend/activate-integration-test
Browse files Browse the repository at this point in the history
Wrap receiver in broadcast so that receiver can be called multiple times
  • Loading branch information
bitfriend authored Dec 13, 2024
2 parents da11a5e + 0c79d2f commit e7298f4
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 147 deletions.
1 change: 1 addition & 0 deletions app/lib/features/cross_signing/widgets/cross_signing.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ final _log = Logger('a3::cross_signing::widget');

// this widget has no elements
// it just pops up stage dialogs for verification
@immutable
class CrossSigning extends ConsumerStatefulWidget {
const CrossSigning({super.key});

Expand Down
2 changes: 1 addition & 1 deletion app/lib/features/main/app_shell.dart
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class AppShellState extends ConsumerState<AppShell> {
controller: screenshotController,
child: Column(
children: [
const CrossSigning(),
CrossSigning(),
Expanded(
child: buildBody(context),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import 'package:riverpod/riverpod.dart';
final _log = Logger('a3::settings::devices_notifier');

class AsyncDevicesNotifier extends AsyncNotifier<List<DeviceRecord>> {
Stream<DeviceEvent>? _listener;
StreamSubscription<DeviceEvent>? _poller;
late Stream<DeviceEvent> _listener;
late StreamSubscription<DeviceEvent> _poller;

Future<List<DeviceRecord>> _getSessions(SessionManager manager) async {
return (await manager.allSessions()).toList();
Expand All @@ -22,7 +22,7 @@ class AsyncDevicesNotifier extends AsyncNotifier<List<DeviceRecord>> {
final manager = client.sessionManager();

_listener = client.deviceEventRx();
_poller = _listener?.listen(
_poller = _listener.listen(
(data) async {
state = await AsyncValue.guard(() async => await _getSessions(manager));
},
Expand All @@ -33,7 +33,7 @@ class AsyncDevicesNotifier extends AsyncNotifier<List<DeviceRecord>> {
_log.info('stream ended');
},
);
ref.onDispose(() => _poller?.cancel());
ref.onDispose(() => _poller.cancel());

return await _getSessions(manager);
}
Expand Down
4 changes: 2 additions & 2 deletions native/acter/api.rsh
Original file line number Diff line number Diff line change
Expand Up @@ -2847,7 +2847,7 @@ object Client {
fn logout() -> Future<Result<bool>>;

/// Get the verification event receiver
fn verification_event_rx() -> Option<Stream<VerificationEvent>>;
fn verification_event_rx() -> Stream<VerificationEvent>;

/// Get session manager that returns all/verified/unverified/inactive session list
fn session_manager() -> SessionManager;
Expand All @@ -2863,7 +2863,7 @@ object Client {
fn install_sas_event_handler(flow_id: string) -> Future<Result<bool>>;

/// Return the event handler that new device was found or existing device was changed
fn device_event_rx() -> Option<Stream<DeviceEvent>>;
fn device_event_rx() -> Stream<DeviceEvent>;

/// Return the typing event receiver
fn subscribe_to_typing_event_stream(room_id: string) -> Stream<TypingEvent>;
Expand Down
25 changes: 13 additions & 12 deletions native/acter/src/api/device.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
use anyhow::{Context, Result};
use futures::{
channel::mpsc::{channel, Receiver, Sender},
pin_mut,
stream::StreamExt,
stream::{Stream, StreamExt},
};
use matrix_sdk::{executor::JoinHandle, Client as SdkClient};
use matrix_sdk_base::ruma::{OwnedDeviceId, OwnedUserId};
use std::{
marker::Unpin,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::sync::Mutex;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio_stream::wrappers::BroadcastStream;
use tracing::{error, info};

use super::{client::Client, common::DeviceRecord, RUNTIME};

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct DeviceEvent {
new_devices: Vec<OwnedDeviceId>,
changed_devices: Vec<OwnedDeviceId>,
Expand Down Expand Up @@ -50,7 +51,7 @@ impl DeviceEvent {
#[derive(Clone, Debug)]
pub(crate) struct DeviceController {
event_tx: Sender<DeviceEvent>, // keep it resident in memory
event_rx: Arc<Mutex<Option<Receiver<DeviceEvent>>>>,
event_rx: Arc<Receiver<DeviceEvent>>,
listener: Arc<JoinHandle<()>>, // keep it resident in memory
}

Expand Down Expand Up @@ -89,7 +90,7 @@ impl DeviceController {
}
if !new_devices.is_empty() || !changed_devices.is_empty() {
let evt = DeviceEvent::new(new_devices, changed_devices);
if let Err(e) = tx.try_send(evt) {
if let Err(e) = tx.send(evt) {
error!("Dropping device event: {}", e);
}
}
Expand All @@ -98,18 +99,18 @@ impl DeviceController {

DeviceController {
event_tx,
event_rx: Arc::new(Mutex::new(Some(event_rx))),
event_rx: Arc::new(event_rx),
listener: Arc::new(listener),
}
}
}

impl Client {
pub fn device_event_rx(&self) -> Option<Receiver<DeviceEvent>> {
match self.device_controller.event_rx.try_lock() {
Ok(mut r) => r.take(),
Err(e) => None,
}
// this return value should be Unpin, because next() of this stream is called in interactive_verification_started_from_request
// this return value should be wrapped in Box::pin, to make unpin possible
pub fn device_event_rx(&self) -> impl Stream<Item = DeviceEvent> + Unpin {
let mut stream = BroadcastStream::new(self.device_controller.event_rx.resubscribe());
Box::pin(stream.filter_map(|o| async move { o.ok() }))
}

pub async fn device_records(&self, verified: bool) -> Result<Vec<DeviceRecord>> {
Expand Down
Loading

0 comments on commit e7298f4

Please sign in to comment.