Query history from history server #143

Merged · Nov 17, 2024 · 24 commits

Commits
298a257
[WIP] Query history from history server
progval Oct 26, 2024
b7369a8
LATEST impl with blanks to fill
progval Oct 26, 2024
2c0eaad
[WIP] decouple send_item
progval Oct 27, 2024
2ef9b6a
Make HistoryService return an inflated event instead of HistoryLogEntry
progval Oct 27, 2024
c93e5bd
Merge branch 'master' into remote-history-service
progval Oct 27, 2024
e5aa0e0
Finish HistoryRequest::Latest in pg_history_service
progval Oct 27, 2024
682bbf7
Partial fill-in-the-blanks for message translation
spb Oct 27, 2024
a8874b6
Add missing files
progval Oct 27, 2024
f0ac38d
Make target a String in case it expired
progval Oct 27, 2024
9b88fca
Make id a MessageId
progval Oct 27, 2024
96661e9
sable_history: Add support for running migrations on startup
progval Oct 27, 2024
984de4d
pg_history_service: Fix LATEST logic
progval Oct 27, 2024
d56db83
Make sure server-time is consistent between echoes and CHATHISTORY
progval Oct 27, 2024
71ae9c1
Update down.sql
progval Oct 27, 2024
b87c630
Split make_historical_event out of get_entries
progval Oct 28, 2024
82d4546
Implement BEFORE/AFTER/BETWEEN
progval Oct 28, 2024
60a6fac
Implement TieredHistoryService::list_targets
progval Oct 28, 2024
3859cf8
Merge branch 'master' into remote-history-service
progval Nov 2, 2024
bdcc1e6
Finish implementing TieredHistoryService
progval Nov 2, 2024
89143da
Remove unused dependency on async-trait
progval Nov 2, 2024
0b245bf
Remove unused enum HistoryMessage
progval Nov 2, 2024
61f78eb
Remove panics
progval Nov 2, 2024
a8f711b
Deduplicate query collection
progval Nov 9, 2024
ffd8f83
Implement Around
progval Nov 10, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

1 change: 1 addition & 0 deletions sable_history/Cargo.toml
@@ -11,6 +11,7 @@ built = { version = "0.5", features = [ "git2" ] }
sable_network = { path = "../sable_network" }
sable_server = { path = "../sable_server" }

futures = "0.3"
tokio = { version = "1.14", features = [ "full" ] }
serde = { version = "1", features = [ "derive" ] }
serde_with = "1.11"
1 change: 1 addition & 0 deletions sable_history/diesel.toml
@@ -4,6 +4,7 @@
[print_schema]
file = "src/schema.rs"
custom_type_derives = ["diesel::query_builder::QueryId", "Clone"]
import_types = ["crate::type::*"]

[migrations_directory]
dir = "migrations"
New file: sable_history migration down.sql
@@ -0,0 +1,7 @@
DROP INDEX messages_by_timestamp;

ALTER TABLE messages
DROP COLUMN message_type,
DROP COLUMN timestamp;

DROP TYPE "Message_Type";
New file: sable_history migration up.sql
@@ -0,0 +1,8 @@
CREATE TYPE "Message_Type" AS ENUM ('privmsg', 'notice');

ALTER TABLE messages
ADD COLUMN message_type "Message_Type" NOT NULL,
ADD COLUMN timestamp TIMESTAMP NOT NULL;

CREATE INDEX messages_by_timestamp ON messages USING BRIN (timestamp, id);
COMMENT ON INDEX messages_by_timestamp IS 'Includes the id in order to be a consistent total order across requests';
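
The comment explains why the index covers (timestamp, id) rather than timestamp alone: that pair is the sort key every query in this PR orders by, and because it is a total order, paging is stable even when many messages share a timestamp. A minimal keyset-pagination sketch of the idea (hypothetical helper; the PR's own requests filter on timestamp only):

use diesel::prelude::*;

use crate::schema::messages;

/// Hypothetical cursor-based page: resume strictly after (last_ts, last_id),
/// the last row of the previous page. Since (timestamp, id) is a total order,
/// no row is skipped or returned twice across requests.
fn next_page(
    last_ts: chrono::NaiveDateTime,
    last_id: uuid::Uuid,
) -> messages::BoxedQuery<'static, diesel::pg::Pg> {
    messages::table
        .filter(
            messages::timestamp.gt(last_ts).or(messages::timestamp
                .eq(last_ts)
                .and(messages::id.gt(last_id))),
        )
        .order((messages::timestamp, messages::id))
        .limit(100)
        .into_boxed()
}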
3 changes: 3 additions & 0 deletions sable_history/src/lib.rs
@@ -1,5 +1,8 @@
mod pg_history_service;
pub use pg_history_service::PgHistoryService;
mod server;
pub use server::*;

mod models;
mod schema;
mod types;
10 changes: 10 additions & 0 deletions sable_history/src/models/message.rs
@@ -5,9 +5,19 @@ use super::*;
#[diesel(check_for_backend(diesel::pg::Pg))]
#[diesel(belongs_to(Channel, foreign_key = target_channel))]
#[diesel(belongs_to(HistoricUser, foreign_key = source_user))]
#[derive(Debug)]
pub struct Message {
pub id: Uuid,
pub source_user: i32,
pub target_channel: i64,
pub text: String,
pub message_type: crate::types::MessageType,
/// Timestamp of the *update* introducing the message.
///
/// This is usually the same second as the one in [`id`] (a UUIDv7), but is
/// occasionally 1 second later, because the message id is created before being
/// pushed to the log.
/// It can also be significantly different, because both are based on the
/// system clock, which can change arbitrarily.
pub timestamp: chrono::NaiveDateTime,
}
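
The seconds embedded in a UUIDv7 id can be recovered with the uuid crate's accessors, which is how list_targets in pg_history_service.rs below derives per-channel timestamps; a minimal sketch (hypothetical helper name):

use uuid::Uuid;

/// Seconds since the UNIX epoch encoded in a UUIDv7 id, or None for
/// UUID versions that don't carry a timestamp.
fn uuid7_unix_seconds(id: Uuid) -> Option<u64> {
    let ts = id.get_timestamp()?; // Some only for time-based UUIDs
    let (seconds, _nanos) = ts.to_unix();
    Some(seconds)
}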
326 changes: 326 additions & 0 deletions sable_history/src/pg_history_service.rs
@@ -0,0 +1,326 @@
use std::collections::HashMap;

use anyhow::{bail, Result};
use chrono::{DateTime, NaiveDateTime, Utc};
use diesel::dsl::sql;
use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use futures::stream::{StreamExt, TryStreamExt};
use tokio::sync::Mutex;
use uuid::Uuid;

use sable_network::prelude::*;

use crate::schema::{channels, historic_users, messages};

/// Implementation of [`HistoryService`] backed by PostgreSQL
pub struct PgHistoryService<'a> {
database_connection: &'a Mutex<AsyncPgConnection>,
}

impl<'a> PgHistoryService<'a> {
pub fn new(database_connection: &'a Mutex<AsyncPgConnection>) -> Self {
Self {
database_connection,
}
}
}

impl<'a> HistoryService for PgHistoryService<'a> {
async fn list_targets(
&self,
_user: UserId,
_after_ts: Option<i64>,
_before_ts: Option<i64>,
_limit: Option<usize>,
) -> HashMap<TargetId, i64> {
// TODO: access control
// TODO: after_ts, before_ts, limit
match channels::dsl::channels
.select((
channels::dsl::id,
sql::<diesel::pg::sql_types::Uuid>(
"SELECT MAX(id) FROM messages WHERE target_channel=channels.id",
),
))
.load_stream(&mut *self.database_connection.lock().await)
.await
{
Err(e) => {
tracing::error!("Could not get history channels: {e}");
return HashMap::new();
}
Ok(rows) => rows
.map(|row| -> Result<(TargetId, i64)> {
let (channel_id, max_message_id): (i64, Uuid) = row?;
let channel =
TargetId::Channel(ChannelId::from(Snowflake::from(channel_id as u64)));

let Some(ts) = max_message_id.get_timestamp() else {
bail!("messages.id should be a UUID7, not {max_message_id}");
};
let (seconds, _) = ts.to_unix();
let Ok(seconds) = seconds.try_into() else {
bail!("message {max_message_id}'s UNIX timestamp is negative");
};
Ok((channel, seconds))
})
.try_collect()
.await
.unwrap_or_else(|e| {
tracing::error!("Could not read rows: {e}");
HashMap::new()
}),
}
}
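
A side note on the id conversions above and in get_entries below: snowflake channel ids are u64 on the network side but stored in BIGINT (i64) columns, and the `as` casts reinterpret the same 64 bits, so the round-trip is lossless even for ids above i64::MAX. A minimal illustration:

// `as` between u64 and i64 is a bit-cast: the same 64 bits, reinterpreted.
fn to_db(id: u64) -> i64 {
    id as i64
}

fn from_db(id: i64) -> u64 {
    id as u64
}

fn main() {
    let id = u64::MAX - 42; // maps to a negative i64, but survives intact
    assert_eq!(from_db(to_db(id)), id);
}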

async fn get_entries(
&self,
_user: UserId,
target: TargetId,
request: HistoryRequest,
) -> Result<impl IntoIterator<Item = HistoricalEvent>, HistoryError> {
// TODO: access control
let TargetId::Channel(channel_id) = target else {
// TODO: PMs
return Err(HistoryError::InvalidTarget(target));
};

let mut connection_lock = self.database_connection.lock().await;

let db_channel_id = channel_id.as_u64() as i64;
let channel = match channels::dsl::channels
.find(db_channel_id)
.select(crate::models::Channel::as_select())
.first(&mut *connection_lock)
.await
.optional()
{
Ok(Some(channel)) => channel,
Ok(None) => return Err(HistoryError::InvalidTarget(target)),
Err(e) => {
tracing::error!("Could not check if channel exists: {e}");
return Err(HistoryError::InternalError(
"Could not check if channel exists".to_string(),
));
}
};

let base_query = messages::dsl::messages
.inner_join(historic_users::dsl::historic_users)
.select((
messages::dsl::id,
messages::dsl::timestamp,
messages::dsl::message_type,
messages::dsl::text,
historic_users::dsl::nick,
historic_users::dsl::ident,
historic_users::dsl::vhost,
historic_users::dsl::account_name,
))
.filter(messages::dsl::target_channel.eq(db_channel_id));

Review comment (Collaborator):
Is it worth packaging this query up into a helper function in the models?
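
For illustration, one shape the suggested helper could take with diesel 2.2's #[auto_type], which infers the otherwise unwieldy query type (a sketch, not part of the PR; assumes diesel >= 2.2):

use diesel::prelude::*;

use crate::schema::{historic_users, messages};

/// Sketch of the suggested models-level helper: centralize the join and
/// projection so call sites only add range filters, ordering, and limits.
#[diesel::dsl::auto_type]
fn channel_messages(db_channel_id: i64) -> _ {
    messages::table
        .inner_join(historic_users::table)
        .select((
            messages::id,
            messages::timestamp,
            messages::message_type,
            messages::text,
            historic_users::nick,
            historic_users::ident,
            historic_users::vhost,
            historic_users::account_name,
        ))
        .filter(messages::target_channel.eq(db_channel_id))
}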
Review comment (Collaborator):
Is there no commonality in these arms that can be broken out? They're quite hard to follow as they are.

Reply (Collaborator Author, @progval, Nov 9, 2024):
I can rewrite Latest/Before/After as a Between, I guess

Reply (Collaborator Author, @progval):
I managed to deduplicate most of the code: a8f711b

match request {
HistoryRequest::Latest { to_ts, limit } => {
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
match to_ts {
Some(to_ts) => {
let to_ts = DateTime::from_timestamp(to_ts, 999_999)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
collect_query(
connection_lock,
&channel,
true, // reverse
base_query
.filter(messages::dsl::timestamp.gt(to_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
.limit(limit),
)
.await
}
None => {
collect_query(
connection_lock,
&channel,
true, // reverse
base_query
// total order, consistent across requests
.order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
.limit(limit),
)
.await
}
}
}
HistoryRequest::Before { from_ts, limit } => {
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
let from_ts = DateTime::from_timestamp(from_ts, 0)
.unwrap_or(DateTime::<Utc>::MAX_UTC)
.naive_utc();
collect_query(
connection_lock,
&channel,
true, // reverse
base_query
.filter(messages::dsl::timestamp.lt(from_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
.limit(limit),
)
.await
}
HistoryRequest::After { start_ts, limit } => {
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
let start_ts = DateTime::from_timestamp(start_ts, 999_999)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
collect_query(
connection_lock,
&channel,
false, // don't reverse
base_query
.filter(messages::dsl::timestamp.gt(start_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp, messages::dsl::id))
.limit(limit),
)
.await
}
HistoryRequest::Around { around_ts, limit } => {
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
let around_ts = DateTime::from_timestamp(around_ts, 0)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
collect_query(
connection_lock,
&channel,
false, // don't reverse
CombineDsl::union(
base_query
.filter(messages::dsl::timestamp.le(around_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
.limit(limit),
base_query
.filter(messages::dsl::timestamp.gt(around_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp, messages::dsl::id))
.limit(limit),
),
)
.await
.map(|mut events| {
// TODO: make postgresql sort it, it may be able to do it directly from
// the index scan instead of sorting after the union
events.sort_unstable_by_key(|event| match event {
HistoricalEvent::Message { id, timestamp, .. } => (*timestamp, *id),
});
events
})
}
HistoryRequest::Between {
start_ts,
end_ts,
limit,
} => {
if start_ts <= end_ts {
let start_ts = DateTime::from_timestamp(start_ts, 999_999)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
let end_ts = DateTime::from_timestamp(end_ts, 0)
.unwrap_or(DateTime::<Utc>::MAX_UTC)
.naive_utc();
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
collect_query(
connection_lock,
&channel,
false, // don't reverse
base_query
.filter(messages::dsl::timestamp.gt(start_ts))
.filter(messages::dsl::timestamp.lt(end_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp, messages::dsl::id))
.limit(limit),
)
.await
} else {
let start_ts = DateTime::from_timestamp(start_ts, 0)
.unwrap_or(DateTime::<Utc>::MAX_UTC)
.naive_utc();
let end_ts = DateTime::from_timestamp(end_ts, 999_999)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
collect_query(
connection_lock,
&channel,
true, // reverse
base_query
.filter(messages::dsl::timestamp.gt(end_ts))
.filter(messages::dsl::timestamp.lt(start_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
.limit(limit),
)
.await
}
}
}
}
}

type JoinedMessageRow = (
uuid::Uuid,
NaiveDateTime,
crate::types::MessageType,
String,
String,
String,
String,
Option<String>,
);

async fn collect_query<'query>(
mut connection: tokio::sync::MutexGuard<'_, AsyncPgConnection>,
channel: &crate::models::Channel,
reverse: bool,
query: impl diesel_async::RunQueryDsl<AsyncPgConnection>
+ diesel_async::methods::LoadQuery<'query, AsyncPgConnection, JoinedMessageRow>
+ 'query,
) -> Result<Vec<HistoricalEvent>, HistoryError> {
let events = query
.load_stream(&mut *connection)
.await
.map_err(|e| {
tracing::error!("Could not query messages: {e}");
HistoryError::InternalError("Could not query messages".to_string())
})?
.map_ok(|row| make_historical_event(channel, row))
.try_collect::<Vec<_>>()
.await
.map_err(|e| {
tracing::error!("Could not parse messages: {e}");
HistoryError::InternalError("Could not parse message".to_string())
})?;
Ok(if reverse {
events.into_iter().rev().collect()
} else {
events
})
}
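
The reverse flag compensates for the descending queries above: Latest, Before, and the swapped Between fetch newest-first so that LIMIT keeps the most recent rows, while CHATHISTORY batches are conventionally replayed in ascending time order. A toy illustration of the flip:

// ORDER BY timestamp DESC, id DESC LIMIT 3 returns the newest rows first...
let fetched_desc = vec![5, 4, 3];
// ...and collect_query reverses them back into chronological order.
let chronological: Vec<i32> = fetched_desc.into_iter().rev().collect();
assert_eq!(chronological, vec![3, 4, 5]);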

fn make_historical_event(
channel: &crate::models::Channel,
(id, timestamp, message_type, text, source_nick, source_ident, source_vhost, source_account): JoinedMessageRow,
) -> HistoricalEvent {
HistoricalEvent::Message {
id: MessageId::new(id.try_into().expect("Message id is a non-v7 UUID")),
timestamp: timestamp.and_utc().timestamp(),
source: format!("{}!{}@{}", source_nick, source_ident, source_vhost),
source_account,
message_type: message_type.into(),
target: channel.name.clone(), // assume it's the same
text,
}
}