Skip to content

Commit

Permalink
Seed DynamoDB locally as well (#197)
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy authored Jan 5, 2025
1 parent 3b76c55 commit 1c1f77e
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 83 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ auto-update. If you modify files under `server/`, you'll have to re-run
Note that when run this way, to aid in development, the server will
auto-populate an event with a set of questions from a past live Q&A
session I ran at
<http://localhost:5173/#/event/00000000-0000-0000-0000-000000000000/secret>.
<http://localhost:5173/event/00000000000000000000000000/secret>.
It will also auto-generate user votes over time for the questions there.

If you're curious about the technologies used in the server and client,
Expand Down
14 changes: 3 additions & 11 deletions server/src/ask.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{Backend, Local};
use crate::to_dynamo_timestamp;
use crate::{to_dynamo_timestamp, QUESTIONS_TTL};
use aws_sdk_dynamodb::{
error::SdkError,
operation::put_item::{PutItemError, PutItemOutput},
Expand All @@ -9,17 +9,12 @@ use axum::extract::{Path, State};
use axum::response::Json;
use http::StatusCode;
use serde::Deserialize;
use std::{
collections::HashMap,
time::{Duration, SystemTime},
};
use std::{collections::HashMap, time::SystemTime};
use ulid::Ulid;

#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};

const QUESTIONS_EXPIRE_AFTER_DAYS: u64 = 30;

impl Backend {
pub(super) async fn ask(
&self,
Expand All @@ -35,10 +30,7 @@ impl Backend {
("when", to_dynamo_timestamp(SystemTime::now())),
(
"expire",
to_dynamo_timestamp(
SystemTime::now()
+ Duration::from_secs(QUESTIONS_EXPIRE_AFTER_DAYS * 24 * 60 * 60),
),
to_dynamo_timestamp(SystemTime::now() + QUESTIONS_TTL),
),
("hidden", AttributeValue::Bool(false)),
];
Expand Down
220 changes: 157 additions & 63 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use axum::Router;
use http::StatusCode;
use http_body_util::BodyExt;
use lambda_http::Error;
use std::time::SystemTime;
use std::time::{Duration, SystemTime};
use std::{
collections::HashMap,
future::Future,
Expand All @@ -19,6 +19,12 @@ use tower_service::Service;
use tracing_subscriber::EnvFilter;
use ulid::Ulid;

const QUESTIONS_EXPIRE_AFTER_DAYS: u64 = 30;
const QUESTIONS_TTL: Duration = Duration::from_secs(QUESTIONS_EXPIRE_AFTER_DAYS * 24 * 60 * 60);

const EVENTS_EXPIRE_AFTER_DAYS: u64 = 60;
const EVENTS_TTL: Duration = Duration::from_secs(EVENTS_EXPIRE_AFTER_DAYS * 24 * 60 * 60);

#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};

Expand All @@ -33,7 +39,6 @@ enum Backend {
}

impl Backend {
#[cfg(test)]
async fn local() -> Self {
Backend::Local(Arc::new(Mutex::new(Local::default())))
}
Expand Down Expand Up @@ -178,83 +183,172 @@ fn mint_service_error<E>(e: E) -> SdkError<E> {
)
}

/// Seed the database.
///
/// This will register a test event (with id `00000000000000000000000000`) and
/// a number of questions for it in the database, whether it's an in-memory [`Local`]
/// database or a local instance of DynamoDB. Note that in the latter case
/// we are checking if the test event is already there, and - if so - we are _not_ seeding
/// the questions. This is to avoid creating duplicated questions when re-running the app.
/// And this is not an issue of course when running against our in-memory [`Local`] database.
///
/// The returned vector contains IDs of the questions related to the test event.
#[cfg(debug_assertions)]
async fn seed(backend: &mut Backend) -> Vec<Ulid> {
#[derive(serde::Deserialize)]
struct LiveAskQuestion {
likes: usize,
text: String,
hidden: bool,
answered: bool,
#[serde(rename = "createTimeUnix")]
created: usize,
}

let seed: Vec<LiveAskQuestion> = serde_json::from_str(SEED).unwrap();
let seed_e = Ulid::from_string("00000000000000000000000000").unwrap();
let seed_e_secret = "secret";

info!("going to seed test event");
match backend.event(&seed_e).await.unwrap() {
output if output.item().is_some() => {
warn!("test event is already there, skipping seeding questions");
}
_ => {
backend.new(&seed_e, seed_e_secret).await.unwrap();
info!("successfully registered test event, going to seed questions now");
// first create questions ...
let mut qs = Vec::new();
for q in seed {
let qid = ulid::Ulid::new();
backend
.ask(
&seed_e,
&qid,
ask::Question {
body: q.text,
asker: None,
},
)
.await
.unwrap();
qs.push((qid, q.created, q.likes, q.hidden, q.answered));
}
// ... then set the vote count + answered/hidden flags
match backend {
Backend::Dynamo(ref mut client) => {
use aws_sdk_dynamodb::types::BatchStatementRequest;
// DynamoDB supports batch operations using PartiQL syntax with `25` as max batch size
// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchExecuteStatement.html
for chunk in qs.chunks(25) {
let batch_update = chunk
.iter()
.map(|(qid, created, votes, hidden, answered)| {
let builder = BatchStatementRequest::builder();
let builder = if *answered {
builder.statement(
// numerous words are reserved in the DynamoDB engine (e.g. Key, Id, When) and
// should be qouted; we are quoting all of our attrs to avoid possible collisions
r#"UPDATE "questions" SET "answered"=? SET "votes"=? SET "when"=? SET "hidden"=? WHERE "id"=?"#,
)
.parameters(to_dynamo_timestamp(SystemTime::now())) // answered
} else {
builder.statement(
r#"UPDATE "questions" SET "votes"=? SET "when"=? SET "hidden"=? WHERE "id"=?"#,
)
};
builder
.parameters(AttributeValue::N(votes.to_string())) // votes
.parameters(AttributeValue::N(created.to_string())) // when
.parameters(AttributeValue::Bool(*hidden)) // hidden
.parameters(AttributeValue::S(qid.to_string())) // id
.build()
.unwrap()
})
.collect::<Vec<_>>();
client
.batch_execute_statement()
.set_statements(Some(batch_update))
.send()
.await
.expect("batch to have been written ok");
}
}
Backend::Local(ref mut state) => {
let state = Arc::get_mut(state).unwrap();
let state = Mutex::get_mut(state).unwrap();
for (qid, created, votes, hidden, answered) in qs {
let q = state.questions.get_mut(&qid).unwrap();
q.insert("votes", AttributeValue::N(votes.to_string()));
if answered {
q.insert("answered", to_dynamo_timestamp(SystemTime::now()));
}
q.insert("hidden", AttributeValue::Bool(hidden));
q.insert("when", AttributeValue::N(created.to_string()));
}
}
}
info!("successfully registered questions");
}
}
// let's collect ids of the questions related to the test event,
// we can then use them to auto-generate user votes over time
backend
.list(&seed_e, true)
.await
.expect("scenned index ok")
.items()
.iter()
.filter_map(|item| {
let id = item
.get("id")
.expect("id is in projection")
.as_s()
.expect("id is of type string");
ulid::Ulid::from_string(id).ok()
})
.collect()
}

#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
// TODO: we may _not_ want `without_time` when deploying
// TODO: on non-Lambda runtimes; this can be addressed as
// TODO: part of https://github.com/jonhoo/wewerewondering/issues/202
.without_time(/* cloudwatch does that */).init();

#[cfg(not(debug_assertions))]
let backend = Backend::dynamo().await;

#[cfg(debug_assertions)]
let backend = if std::env::var_os("USE_DYNAMODB").is_some() {
Backend::dynamo().await
} else {
let backend = {
use rand::prelude::SliceRandom;
use serde::Deserialize;
use std::time::Duration;

#[cfg(debug_assertions)]
#[derive(Deserialize)]
struct LiveAskQuestion {
likes: usize,
text: String,
hidden: bool,
answered: bool,
#[serde(rename = "createTimeUnix")]
created: usize,
}

let mut state = Local::default();
let seed: Vec<LiveAskQuestion> = serde_json::from_str(SEED).unwrap();
let seed_e = "00000000000000000000000000";
let seed_e = Ulid::from_string(seed_e).unwrap();
state.events.insert(seed_e, String::from("secret"));
state.questions_by_eid.insert(seed_e, Vec::new());
let mut state = Backend::Local(Arc::new(Mutex::new(state)));
let mut qs = Vec::new();
for q in seed {
let qid = ulid::Ulid::new();
state
.ask(
&seed_e,
&qid,
ask::Question {
body: q.text,
asker: None,
},
)
.await
.unwrap();
qs.push((qid, q.created, q.likes, q.hidden, q.answered));
}
let mut qids = Vec::new();
{
let Backend::Local(ref mut state): Backend = state else {
unreachable!();
};
let state = Arc::get_mut(state).unwrap();
let state = Mutex::get_mut(state).unwrap();
for (qid, created, votes, hidden, answered) in qs {
let q = state.questions.get_mut(&qid).unwrap();
q.insert("votes", AttributeValue::N(votes.to_string()));
if answered {
q.insert("answered", to_dynamo_timestamp(SystemTime::now()));
}
q.insert("hidden", AttributeValue::Bool(hidden));
q.insert("when", AttributeValue::N(created.to_string()));
qids.push(qid);
}
}
let cheat = state.clone();
let mut backend = if std::env::var_os("USE_DYNAMODB").is_some() {
Backend::dynamo().await
} else {
Backend::local().await
};

// to aid in development, seed the backend with a test event and related
// questions, and auto-generate user votes over time
let qids = seed(&mut backend).await;
let cheat = backend.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.tick().await;
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let qid = qids.choose(&mut rand::thread_rng()).unwrap();
interval.tick().await;
let qid = qids
.choose(&mut rand::thread_rng())
.expect("there _are_ some questions for our test event");
let _ = cheat.vote(qid, vote::UpDown::Up).await;
}
});
state

backend
};

let app = Router::new()
Expand Down
11 changes: 3 additions & 8 deletions server/src/new.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::to_dynamo_timestamp;
use crate::{to_dynamo_timestamp, EVENTS_TTL};

use super::{Backend, Local};
use aws_sdk_dynamodb::{
Expand All @@ -11,14 +11,12 @@ use axum::response::Json;
use http::StatusCode;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::time::{Duration, SystemTime};
use std::time::SystemTime;
use ulid::Ulid;

#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};

const EVENTS_EXPIRE_AFTER_DAYS: u64 = 60;

impl Backend {
#[allow(clippy::wrong_self_convention)]
#[allow(clippy::new_ret_no_self)]
Expand All @@ -37,10 +35,7 @@ impl Backend {
.item("when", to_dynamo_timestamp(SystemTime::now()))
.item(
"expire",
to_dynamo_timestamp(
SystemTime::now()
+ Duration::from_secs(EVENTS_EXPIRE_AFTER_DAYS * 24 * 60 * 60),
),
to_dynamo_timestamp(SystemTime::now() + EVENTS_TTL),
)
.send()
.await
Expand Down

0 comments on commit 1c1f77e

Please sign in to comment.