Skip to content

Commit

Permalink
improve append error handling and notifications;rules are applied at …
Browse files Browse the repository at this point in the history
…shutdown
  • Loading branch information
maksimryndin committed Jan 31, 2024
1 parent 0ec34f9 commit 018260b
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 169 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ So Goral provides the following features being deployed next to your app(s):

* Memory: RSS 30M, 900M for virtual memory. Actual requirements may differ, as they depend on the amount of data and on the scrape and push intervals (see below for each [service](#services))
* Binary size is around 15 MB
* Platforms: Linux, MacOS. Other platform will probably work also.
* Platforms: Linux (x86-64 or aarch64). Other platforms will probably work as well.

### Installation

Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async fn start() -> Result<(), String> {
get_google_auth(&config.general.service_account_credentials_path).await;
let (tx, rx) = setup_general_messenger_channel();
let sheets_api = SpreadsheetAPI::new(auth, tx.clone());
let storage = Arc::new(Storage::new(args.id.to_string(), sheets_api, tx.clone()));
let storage = Arc::new(Storage::new(args.id.to_string(), sheets_api));
let shared = Shared::new(tx.clone());

let messengers = collect_messengers(&config);
Expand Down
10 changes: 3 additions & 7 deletions src/services/general/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ impl Service for GeneralService {
let latest = release.tag_name;
if !release.prerelease && current != latest {
let msg = format!(
"Your {APP_NAME} version `{current}` is not the latest `{latest}`,\
consider [upgrading](https://github.com/maksimryndin/goral/releases).\
"Your {APP_NAME} version `{current}` is not the latest `{latest}`, \
consider [upgrading](https://github.com/maksimryndin/goral/releases). \
If you like {APP_NAME}, consider giving a star to the [repo](https://github.com/maksimryndin/goral). \
Thank you!"
);
Expand Down Expand Up @@ -203,11 +203,7 @@ mod tests {
send_notification.clone(),
TestState::new(vec![], None, None),
);
let storage = Arc::new(Storage::new(
TEST_HOST_ID.to_string(),
sheets_api,
send_notification.clone(),
));
let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api));
let log = AppendableLog::new(
storage.clone(),
"spreadsheet1".to_string(),
Expand Down
19 changes: 10 additions & 9 deletions src/services/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::rules::RULES_LOG_NAME;
use crate::services::kv::configuration::Kv;
use crate::services::{messenger_queue, rules_notifications, Data, Service};
use crate::spreadsheet::datavalue::{Datarow, Datavalue};
use crate::storage::{AppendError, AppendableLog};
use crate::storage::{AppendableLog, StorageError};
use crate::{capture_datetime, MessengerApi, Notification, Sender, Shared};
use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
Expand Down Expand Up @@ -121,7 +121,7 @@ struct Response {

struct AppendRequest {
datarows: Vec<Datarow>,
reply_to: oneshot::Sender<Result<Vec<String>, AppendError>>,
reply_to: oneshot::Sender<Result<Vec<String>, StorageError>>,
}

struct ReadyHandle(Arc<AtomicBool>);
Expand Down Expand Up @@ -217,10 +217,11 @@ impl KvService {
send_notification.fatal(msg).await;
panic!("assert: kv service should be able to get a result of append");
}
Ok(Err(AppendError::ApiTimeout)) => {
Ok(Err(StorageError::Timeout(_))) | Ok(Err(StorageError::RetryTimeout(_))) => {
panic!("assert: for kv service google api timeout is not applied");
}
Ok(Err(AppendError::Http(http_response))) => {
Ok(Err(StorageError::Retriable(http_response)))
| Ok(Err(StorageError::NonRetriable(http_response))) => {
return Ok(HyperResponse::builder()
.status(
StatusCode::from_u16(http_response.status().as_u16())
Expand Down Expand Up @@ -372,6 +373,7 @@ impl Service for KvService {
tasks.push(server);
let tasks = try_join_all(tasks);
tokio::pin!(tasks);
self.welcome_hook(&log).await;

loop {
tokio::select! {
Expand All @@ -391,6 +393,9 @@ impl Service for KvService {
// drain remaining messages
while let Some(append_request) = data_receiver.recv().await {
let AppendRequest{datarows, reply_to} = append_request;
let mut data = Data::Many(datarows);
self.send_for_rule_processing(&log, &mut data, &mut rules_input).await;
let Data::Many(datarows) = data else {panic!("assert: packing/unpacking of KV data")};
let res = log.append_no_retry(datarows).await;
if reply_to.send(res).is_err() {
tracing::warn!("client of the kv server dropped connection");
Expand Down Expand Up @@ -453,11 +458,7 @@ mod tests {
send_notification.clone(),
TestState::new(vec![], None, None),
);
let storage = Arc::new(Storage::new(
TEST_HOST_ID.to_string(),
sheets_api,
send_notification.clone(),
));
let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api));
let log = AppendableLog::new(
storage.clone(),
"spreadsheet1".to_string(),
Expand Down
38 changes: 30 additions & 8 deletions src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,18 @@ pub trait Service: Send + Sync {
self.shared().send_notification.fatal(msg.clone()).await;
panic!("{}", msg);
}
}

async fn welcome_hook(&self, log: &AppendableLog) {
let msg = format!(
"service `{}` is running with [spreadsheet]({})",
self.name(),
log.spreadsheet_baseurl(),
);
tracing::info!("{}", msg);
self.shared().send_notification.try_info(msg);
self.messenger()
.unwrap_or(self.shared().send_notification.clone())
.try_info(msg);
tracing::debug!(
"channel capacity `{}` for service `{}`",
self.channel_capacity(),
Expand Down Expand Up @@ -505,6 +510,7 @@ pub trait Service: Send + Sync {
let mut push_interval = tokio::time::interval(self.push_interval());
let mut rules_update_interval = tokio::time::interval(self.rules_update_interval());
let mut accumulated_data = vec![];
self.welcome_hook(&log).await;
loop {
tokio::select! {
result = shutdown.recv() => {
Expand All @@ -522,7 +528,8 @@ pub trait Service: Send + Sync {
_ = async {
// drain remaining messages
while let Some(task_result) = data_receiver.recv().await {
let data = self.process_task_result_on_shutdown(task_result, &log).await;
let mut data = self.process_task_result_on_shutdown(task_result, &log).await;
self.send_for_rule_processing(&log, &mut data, &mut rules_input).await;
match data {
Data::Empty | Data::Message(_) => {},
Data::Single(datarow) => {accumulated_data.push(datarow);}
Expand All @@ -537,9 +544,28 @@ pub trait Service: Send + Sync {
return;
},
_ = push_interval.tick() => {
let rows_count = accumulated_data.len();
if rows_count > 0 {
tracing::info!(
"appending {} rows for service {}",
rows_count,
self.name()
);
}
let example_rules = self.get_example_rules();
accumulated_data.extend(example_rules);
let _ = log.append(accumulated_data).await;
if let Err(e) = log.append(accumulated_data).await {
let msg = format!("{e} for service `{}`, failed to append {rows_count} rows", self.name());
tracing::error!("{}", msg);
self.messenger().unwrap_or(self.shared().send_notification.clone()).try_error(msg);
} else if rows_count > 0 {
tracing::info!(
"appended {} rows for service {}",
rows_count,
self.name()
);
}

accumulated_data = vec![];
},
_ = rules_update_interval.tick() => {
Expand Down Expand Up @@ -685,11 +711,7 @@ mod tests {
Some(APPEND_DURATION_MS / 2), // because append is 2 Google api calls
),
);
let storage = Arc::new(Storage::new(
TEST_HOST_ID.to_string(),
sheets_api,
tx.clone(),
));
let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api));

let (shutdown, rx) = broadcast::channel(1);

Expand Down
57 changes: 37 additions & 20 deletions src/spreadsheet/spreadsheet.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::spreadsheet::sheet::{CleanupSheet, Rows, Sheet, SheetId, UpdateSheet, VirtualSheet};
use crate::spreadsheet::{HttpResponse, Metadata};
use crate::spreadsheet::Metadata;
use crate::storage::StorageError;

#[cfg(not(test))]
use crate::HyperConnector;
Expand Down Expand Up @@ -33,7 +34,7 @@ pub(crate) const GOOGLE_SPREADSHEET_MAXIMUM_CHARS_PER_CELL: usize = 50_000;
async fn handle_error<T>(
spreadsheet: &SpreadsheetAPI,
result: SheetsResult<(Response<Body>, T)>,
) -> Result<T, HttpResponse> {
) -> Result<T, StorageError> {
match result {
Err(e) => match e {
// fatal
Expand Down Expand Up @@ -64,13 +65,25 @@ async fn handle_error<T>(
panic!("Fatal error for Google API access: `{}`", msg);
}
// retry
Error::Failure(v) => Err(v),
Error::HttpError(v) => Err(Response::new(Body::from(v.to_string()))),
Error::BadRequest(v) => Err(Response::new(Body::from(v.to_string()))),
Error::Io(v) => Err(Response::new(Body::from(v.to_string()))),
Error::JsonDecodeError(_, v) => Err(Response::new(Body::from(v.to_string()))),
Error::FieldClash(v) => Err(Response::new(Body::from(v.to_string()))),
Error::Cancelled => Err(Response::new(Body::from("cancelled"))),
Error::Failure(v) => Err(StorageError::Retriable(v)),
Error::HttpError(v) => Err(StorageError::Retriable(Response::new(Body::from(
v.to_string(),
)))),
Error::BadRequest(v) => Err(StorageError::NonRetriable(Response::new(Body::from(
v.to_string(),
)))),
Error::Io(v) => Err(StorageError::Retriable(Response::new(Body::from(
v.to_string(),
)))),
Error::JsonDecodeError(_, v) => Err(StorageError::NonRetriable(Response::new(
Body::from(v.to_string()),
))),
Error::FieldClash(v) => Err(StorageError::NonRetriable(Response::new(Body::from(
v.to_string(),
)))),
Error::Cancelled => Err(StorageError::Retriable(Response::new(Body::from(
"cancelled",
)))),
},
Ok(res) => Ok(res.1),
}
Expand Down Expand Up @@ -147,7 +160,7 @@ impl SpreadsheetAPI {
&self,
spreadsheet_id: &str,
sheet_id: SheetId,
) -> Result<Vec<Vec<Value>>, HttpResponse> {
) -> Result<Vec<Vec<Value>>, StorageError> {
let req = BatchGetValuesByDataFilterRequest {
data_filters: Some(vec![DataFilter {
grid_range: Some(GridRange {
Expand All @@ -168,7 +181,7 @@ impl SpreadsheetAPI {
.values_batch_get_by_data_filter(req, spreadsheet_id)
.doit()
.await;
tracing::debug!("{:?}", result);
tracing::trace!("{:?}", result);
let response = handle_error(self, result).await.map_err(|e| {
tracing::error!("{:?}", e);
e
Expand All @@ -188,7 +201,7 @@ impl SpreadsheetAPI {
&self,
spreadsheet_id: &str,
sheet_id: SheetId,
) -> Result<Vec<Vec<Value>>, HttpResponse> {
) -> Result<Vec<Vec<Value>>, StorageError> {
let mut state = self.state.lock().await;
state.get_sheet_data(spreadsheet_id, sheet_id).await
}
Expand Down Expand Up @@ -244,10 +257,10 @@ impl SpreadsheetAPI {
&self,
spreadsheet_id: &str,
metadata: &Metadata,
) -> Result<Vec<Sheet>, HttpResponse> {
) -> Result<Vec<Sheet>, StorageError> {
let result = self.get(spreadsheet_id, metadata).await;

tracing::debug!("{:?}", result);
tracing::trace!("{:?}", result);
let response = handle_error(self, result).await.map_err(|e| {
tracing::error!("{:?}", e);
e
Expand All @@ -271,7 +284,7 @@ impl SpreadsheetAPI {
updates: Vec<UpdateSheet>,
sheets: Vec<VirtualSheet>,
data: Vec<Rows>,
) -> Result<BatchUpdateSpreadsheetResponse, HttpResponse> {
) -> Result<BatchUpdateSpreadsheetResponse, StorageError> {
// capacity for actual usage
let mut requests = Vec::with_capacity(
truncates.len() + sheets.len() * 10 + data.len() * 2 + updates.len(),
Expand All @@ -292,7 +305,7 @@ impl SpreadsheetAPI {
requests.append(&mut rows.into_api_requests())
}

tracing::debug!("requests:\n{:?}", requests);
tracing::trace!("requests:\n{:?}", requests);

let req = BatchUpdateSpreadsheetRequest {
include_spreadsheet_in_response: Some(false),
Expand All @@ -303,7 +316,7 @@ impl SpreadsheetAPI {

let result = self.update(req, spreadsheet_id).await;

tracing::debug!("{:?}", result);
tracing::trace!("{:?}", result);
handle_error(self, result).await
}

Expand All @@ -314,7 +327,7 @@ impl SpreadsheetAPI {
updates: Vec<UpdateSheet>,
sheets: Vec<VirtualSheet>,
data: Vec<Rows>,
) -> Result<(), HttpResponse> {
) -> Result<(), StorageError> {
self._crud_sheets(spreadsheet_id, truncates, updates, sheets, data)
.await
.map_err(|e| {
Expand Down Expand Up @@ -421,7 +434,7 @@ pub(crate) mod tests {
}
}

pub(crate) fn bad_response(text: String) -> Error {
pub(crate) fn failure_response(text: String) -> Error {
Error::Failure(
HyperResponse::builder()
.status(StatusCode::BAD_REQUEST)
Expand All @@ -431,6 +444,10 @@ pub(crate) mod tests {
)
}

pub(crate) fn bad_response(text: String) -> Error {
Error::BadRequest(serde_json::json!(text))
}

pub(crate) async fn get(&mut self, _: &str) -> SheetsResult<(Response<Body>, Spreadsheet)> {
let response_duration_millis = self.basic_response_duration_millis;
self.basic_response_duration_millis =
Expand Down Expand Up @@ -466,7 +483,7 @@ pub(crate) mod tests {
&mut self,
_spreadsheet_id: &str,
_sheet_id: SheetId,
) -> Result<Vec<Vec<Value>>, HttpResponse> {
) -> Result<Vec<Vec<Value>>, StorageError> {
Ok(vec![])
}

Expand Down
Loading

0 comments on commit 018260b

Please sign in to comment.