Skip to content

Commit

Permalink
hotfix: update presage fixing connection to Signal servers
Browse files Browse the repository at this point in the history
See <whisperfish/presage#300>.

Also the contact sync is disabled for now:
<#349>.
  • Loading branch information
boxdot committed Jan 16, 2025
1 parent ecf8a94 commit ef316bc
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 99 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ debug = true
dev = ["prost", "base64"]

[dependencies]
presage = { git = "https://github.com/whisperfish/presage", rev = "8b9af8ee4758c73550024bea8c715a893e9e4e47" }
presage-store-sled = { git = "https://github.com/whisperfish/presage", rev = "8b9af8ee4758c73550024bea8c715a893e9e4e47" }
presage = { git = "https://github.com/whisperfish/presage", rev = "35c2c98ba782fb212ad6cd3356fd4807e5d5a8eb" }
presage-store-sled = { git = "https://github.com/whisperfish/presage", rev = "35c2c98ba782fb212ad6cd3356fd4807e5d5a8eb" }

# dev feature dependencies
prost = { version = "0.13.4", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion benches/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn bench_on_message(c: &mut Criterion) {
|| (test_app(), data.clone()),
|(mut app, data)| async move {
for content in data {
app.on_message(content).await.unwrap();
app.on_message(Box::new(content)).await.unwrap();
}
},
BatchSize::SmallInput,
Expand Down
73 changes: 23 additions & 50 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,14 @@
use crate::channels::SelectChannel;
use crate::command::{
get_keybindings, Command, DirectionVertical, ModeKeybinding, MoveAmountText, MoveAmountVisual,
MoveDirection, Widget, WindowMode,
};
use crate::config::Config;
use crate::data::{BodyRange, Channel, ChannelId, Message, TypingAction, TypingSet};
use crate::event::Event;
use crate::input::Input;
use crate::receipt::{Receipt, ReceiptEvent, ReceiptHandler};
use crate::signal::{
Attachment, GroupIdentifierBytes, GroupMasterKeyBytes, ProfileKeyBytes, ResolvedGroup,
SignalManager,
};
use crate::storage::{MessageId, Storage};
use crate::util::{self, LazyRegex, StatefulList, ATTACHMENT_REGEX, URL_REGEX};
use std::borrow::Cow;
use std::cell::Cell;
use std::cmp::Reverse;
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::future::Future;
use std::io::Cursor;
use std::path::Path;

use anyhow::{anyhow, Context as _};
use arboard::Clipboard;
use chrono::{DateTime, Utc};
use crokey::Combiner;
use crossterm::event::{KeyCode, KeyEvent};
use image::codecs::png::PngEncoder;
Expand All @@ -39,17 +28,22 @@ use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use std::borrow::Cow;
use std::cmp::Reverse;
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::future::Future;
use std::path::Path;
use std::time::Duration;

/// Amount of time to skip contacts sync after the last sync
const CONTACTS_SYNC_DEADLINE_SEC: i64 = 60 * 60 * 24; // 1 day
const CONTACTS_SYNC_TIMEOUT: Duration = Duration::from_secs(20);
use crate::channels::SelectChannel;
use crate::command::{
get_keybindings, Command, DirectionVertical, ModeKeybinding, MoveAmountText, MoveAmountVisual,
MoveDirection, Widget, WindowMode,
};
use crate::config::Config;
use crate::data::{BodyRange, Channel, ChannelId, Message, TypingAction, TypingSet};
use crate::event::Event;
use crate::input::Input;
use crate::receipt::{Receipt, ReceiptEvent, ReceiptHandler};
use crate::signal::{
Attachment, GroupIdentifierBytes, GroupMasterKeyBytes, ProfileKeyBytes, ResolvedGroup,
SignalManager,
};
use crate::storage::{MessageId, Storage};
use crate::util::{self, LazyRegex, StatefulList, ATTACHMENT_REGEX, URL_REGEX};

pub struct App {
pub config: Config,
Expand Down Expand Up @@ -582,7 +576,7 @@ impl App {
}
}

pub async fn on_message(&mut self, content: Content) -> anyhow::Result<()> {
pub async fn on_message(&mut self, content: Box<Content>) -> anyhow::Result<()> {
// tracing::info!(?content, "incoming");

#[cfg(feature = "dev")]
Expand Down Expand Up @@ -1487,27 +1481,6 @@ impl App {
self.display_help
}

pub fn request_contacts_sync(
&self,
) -> Option<impl Future<Output = anyhow::Result<DateTime<Utc>>> + 'static> {
let now = Utc::now();
let metadata = self.storage.metadata();
let do_sync = metadata
.contacts_sync_request_at
.map(|dt| dt + chrono::Duration::seconds(CONTACTS_SYNC_DEADLINE_SEC) < now)
.unwrap_or(true);
let signal_manager = self.signal_manager.clone_boxed();
do_sync.then_some(async move {
info!(timeout =? CONTACTS_SYNC_TIMEOUT, "requesting contact sync");
tokio::time::timeout(
CONTACTS_SYNC_TIMEOUT,
signal_manager.request_contacts_sync(),
)
.await??;
Ok(Utc::now())
})
}

pub fn is_select_channel_shown(&self) -> bool {
self.select_channel.is_shown
}
Expand Down
21 changes: 1 addition & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub enum Event {
Click(MouseEvent),
Input(KeyEvent),
Paste(String),
Message(Content),
Message(Box<Content>),
Resize { cols: u16, rows: u16 },
Quit(Option<anyhow::Error>),
ContactSynced(DateTime<Utc>),
Expand Down Expand Up @@ -136,10 +136,6 @@ async fn run_single_threaded(relink: bool) -> anyhow::Result<()> {
let (mut app, mut app_events) = App::try_new(config, signal_manager.clone_boxed(), storage)?;
app.populate_names_cache().await;

// sync task can be only spawned after we start to listen to message, because it relies on
// message sender to be running
let mut contact_sync_task = app.request_contacts_sync();

let (tx, mut rx) = tokio::sync::mpsc::channel::<Event>(100);
tokio::spawn({
let tx = tx.clone();
Expand Down Expand Up @@ -186,21 +182,6 @@ async fn run_single_threaded(relink: bool) -> anyhow::Result<()> {
}
};

if let Some(task) = contact_sync_task.take() {
let inner_tx = inner_tx.clone();
tokio::task::spawn_local(async move {
match task.await {
Ok(at) => inner_tx
.send(Event::ContactSynced(at))
.await
.expect("logic error: events channel closed"),
Err(error) => {
error!(%error, "failed to sync contacts");
}
}
});
}

while let Some(message) = messages.next().await {
backoff.reset();
inner_tx
Expand Down
35 changes: 21 additions & 14 deletions src/signal/impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@ use std::pin::Pin;

use anyhow::Context;
use async_trait::async_trait;
use presage::libsignal_service::content::{Content, ContentBody};
use presage::libsignal_service::prelude::ProfileKey;
use presage::libsignal_service::protocol::ServiceId;
use presage::libsignal_service::sender::AttachmentSpec;
use presage::manager::{ReceivingMode, Registered};
use presage::manager::Registered;
use presage::model::contacts::Contact;
use presage::model::groups::Group;
use presage::proto::data_message::{Quote, Reaction};
use presage::proto::{AttachmentPointer, DataMessage, EditMessage, GroupContextV2, ReceiptMessage};
use presage::store::ContentsStore;
use presage::{
libsignal_service::content::{Content, ContentBody},
model::messages::Received,
};
use presage_store_sled::SledStore;
use tokio::sync::oneshot;
use tokio_stream::Stream;
use tracing::error;
use tokio_stream::{Stream, StreamExt};
use tracing::{error, warn};
use uuid::Uuid;

use crate::data::{Channel, ChannelId, GroupData, Message};
Expand Down Expand Up @@ -322,10 +325,6 @@ impl SignalManager for PresageManager {
}
}

async fn request_contacts_sync(&self) -> anyhow::Result<()> {
Ok(self.manager.clone().sync_contacts().await?)
}

async fn profile_name(&self, id: Uuid) -> Option<String> {
let profile_key = self.manager.store().profile_key(&id).await.ok()??;
let profile = self.manager.store().profile(id, profile_key).await.ok()??;
Expand All @@ -341,12 +340,20 @@ impl SignalManager for PresageManager {
self.manager.store().contact_by_id(&id).await.ok()?
}

async fn receive_messages(&mut self) -> anyhow::Result<Pin<Box<dyn Stream<Item = Content>>>> {
Ok(Box::pin(
self.manager
.receive_messages(ReceivingMode::Forever)
.await?,
))
async fn receive_messages(
&mut self,
) -> anyhow::Result<Pin<Box<dyn Stream<Item = Box<Content>>>>> {
Ok(Box::pin(self.manager.receive_messages().await?.filter_map(
|received| match received {
Received::Content(content) => Some(content),
Received::QueueEmpty => None,
Received::Contacts => {
// TODO: <https://github.com/boxdot/gurk-rs/issues/349>
warn!("Received contacts, but not implemented yet");
None
}
},
)))
}

async fn contacts(&self) -> Box<dyn Iterator<Item = Contact>> {
Expand Down
6 changes: 3 additions & 3 deletions src/signal/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ pub trait SignalManager {
profile_key: ProfileKeyBytes,
) -> Option<String>;

async fn request_contacts_sync(&self) -> anyhow::Result<()>;

async fn contact(&self, id: Uuid) -> Option<Contact>;

async fn receive_messages(&mut self) -> anyhow::Result<Pin<Box<dyn Stream<Item = Content>>>>;
async fn receive_messages(
&mut self,
) -> anyhow::Result<Pin<Box<dyn Stream<Item = Box<Content>>>>>;

async fn contacts(&self) -> Box<dyn Iterator<Item = Contact>>;
async fn groups(&self) -> Box<dyn Iterator<Item = (GroupMasterKeyBytes, Group)>>;
Expand Down
8 changes: 3 additions & 5 deletions src/signal/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ impl SignalManager for SignalManagerMock {
None
}

async fn request_contacts_sync(&self) -> anyhow::Result<()> {
Ok(())
}

async fn profile_name(&self, _id: Uuid) -> Option<String> {
None
}
Expand All @@ -138,7 +134,9 @@ impl SignalManager for SignalManagerMock {
None
}

async fn receive_messages(&mut self) -> anyhow::Result<Pin<Box<dyn Stream<Item = Content>>>> {
async fn receive_messages(
&mut self,
) -> anyhow::Result<Pin<Box<dyn Stream<Item = Box<Content>>>>> {
Ok(Box::pin(tokio_stream::empty()))
}

Expand Down

0 comments on commit ef316bc

Please sign in to comment.