diff --git a/Cargo.lock b/Cargo.lock index a9b6815..84c8c35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -660,7 +660,7 @@ dependencies = [ [[package]] name = "goral" -version = "0.1.0" +version = "0.1.1" dependencies = [ "anyhow", "async-trait", diff --git a/README.md b/README.md index 36f6a9a..81a95fe 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ So Goral provides the following features being deployed next to your app(s): * Memory: RSS 30M, 900M for virtual memory. An actual requirement may be different - as it depends on the amount of data, scrape and push intervals (see below for each [service](#services)) * Binary size is around 15 Mb -* Platforms: Linux, MacOS. Other platform will probably work also. +* Platforms: Linux (x86-64 or aarch64). Other platform will probably work also. ### Installation diff --git a/src/main.rs b/src/main.rs index 4bdba58..4767f48 100644 --- a/src/main.rs +++ b/src/main.rs @@ -95,7 +95,7 @@ async fn start() -> Result<(), String> { get_google_auth(&config.general.service_account_credentials_path).await; let (tx, rx) = setup_general_messenger_channel(); let sheets_api = SpreadsheetAPI::new(auth, tx.clone()); - let storage = Arc::new(Storage::new(args.id.to_string(), sheets_api, tx.clone())); + let storage = Arc::new(Storage::new(args.id.to_string(), sheets_api)); let shared = Shared::new(tx.clone()); let messengers = collect_messengers(&config); diff --git a/src/services/general/mod.rs b/src/services/general/mod.rs index 8f9a9b0..1caae15 100644 --- a/src/services/general/mod.rs +++ b/src/services/general/mod.rs @@ -134,8 +134,8 @@ impl Service for GeneralService { let latest = release.tag_name; if !release.prerelease && current != latest { let msg = format!( - "Your {APP_NAME} version `{current}` is not the latest `{latest}`,\ - consider [upgrading](https://github.com/maksimryndin/goral/releases).\ + "Your {APP_NAME} version `{current}` is not the latest `{latest}`, \ + consider [upgrading](https://github.com/maksimryndin/goral/releases). \ If you like {APP_NAME}, consider giving a star to the [repo](https://github.com/maksimryndin/goral). \ Thank you!" ); @@ -203,11 +203,7 @@ mod tests { send_notification.clone(), TestState::new(vec![], None, None), ); - let storage = Arc::new(Storage::new( - TEST_HOST_ID.to_string(), - sheets_api, - send_notification.clone(), - )); + let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api)); let log = AppendableLog::new( storage.clone(), "spreadsheet1".to_string(), diff --git a/src/services/kv/mod.rs b/src/services/kv/mod.rs index 647bc3f..b8e42a2 100644 --- a/src/services/kv/mod.rs +++ b/src/services/kv/mod.rs @@ -4,7 +4,7 @@ use crate::rules::RULES_LOG_NAME; use crate::services::kv::configuration::Kv; use crate::services::{messenger_queue, rules_notifications, Data, Service}; use crate::spreadsheet::datavalue::{Datarow, Datavalue}; -use crate::storage::{AppendError, AppendableLog}; +use crate::storage::{AppendableLog, StorageError}; use crate::{capture_datetime, MessengerApi, Notification, Sender, Shared}; use async_trait::async_trait; use chrono::{DateTime, NaiveDateTime, Utc}; @@ -121,7 +121,7 @@ struct Response { struct AppendRequest { datarows: Vec, - reply_to: oneshot::Sender, AppendError>>, + reply_to: oneshot::Sender, StorageError>>, } struct ReadyHandle(Arc); @@ -217,10 +217,11 @@ impl KvService { send_notification.fatal(msg).await; panic!("assert: kv service should be able to get a result of append"); } - Ok(Err(AppendError::ApiTimeout)) => { + Ok(Err(StorageError::Timeout(_))) | Ok(Err(StorageError::RetryTimeout(_))) => { panic!("assert: for kv service google api timeout is not applied"); } - Ok(Err(AppendError::Http(http_response))) => { + Ok(Err(StorageError::Retriable(http_response))) + | Ok(Err(StorageError::NonRetriable(http_response))) => { return Ok(HyperResponse::builder() .status( StatusCode::from_u16(http_response.status().as_u16()) @@ -372,6 +373,7 @@ impl Service for KvService { tasks.push(server); let tasks = try_join_all(tasks); tokio::pin!(tasks); + self.welcome_hook(&log).await; loop { tokio::select! { @@ -391,6 +393,9 @@ impl Service for KvService { // drain remaining messages while let Some(append_request) = data_receiver.recv().await { let AppendRequest{datarows, reply_to} = append_request; + let mut data = Data::Many(datarows); + self.send_for_rule_processing(&log, &mut data, &mut rules_input).await; + let Data::Many(datarows) = data else {panic!("assert: packing/unpacking of KV data")}; let res = log.append_no_retry(datarows).await; if reply_to.send(res).is_err() { tracing::warn!("client of the kv server dropped connection"); @@ -453,11 +458,7 @@ mod tests { send_notification.clone(), TestState::new(vec![], None, None), ); - let storage = Arc::new(Storage::new( - TEST_HOST_ID.to_string(), - sheets_api, - send_notification.clone(), - )); + let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api)); let log = AppendableLog::new( storage.clone(), "spreadsheet1".to_string(), diff --git a/src/services/mod.rs b/src/services/mod.rs index 085df68..40af147 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -270,13 +270,18 @@ pub trait Service: Send + Sync { self.shared().send_notification.fatal(msg.clone()).await; panic!("{}", msg); } + } + + async fn welcome_hook(&self, log: &AppendableLog) { let msg = format!( "service `{}` is running with [spreadsheet]({})", self.name(), log.spreadsheet_baseurl(), ); tracing::info!("{}", msg); - self.shared().send_notification.try_info(msg); + self.messenger() + .unwrap_or(self.shared().send_notification.clone()) + .try_info(msg); tracing::debug!( "channel capacity `{}` for service `{}`", self.channel_capacity(), @@ -505,6 +510,7 @@ pub trait Service: Send + Sync { let mut push_interval = tokio::time::interval(self.push_interval()); let mut rules_update_interval = tokio::time::interval(self.rules_update_interval()); let mut accumulated_data = vec![]; + self.welcome_hook(&log).await; loop { tokio::select! { result = shutdown.recv() => { @@ -522,7 +528,8 @@ pub trait Service: Send + Sync { _ = async { // drain remaining messages while let Some(task_result) = data_receiver.recv().await { - let data = self.process_task_result_on_shutdown(task_result, &log).await; + let mut data = self.process_task_result_on_shutdown(task_result, &log).await; + self.send_for_rule_processing(&log, &mut data, &mut rules_input).await; match data { Data::Empty | Data::Message(_) => {}, Data::Single(datarow) => {accumulated_data.push(datarow);} @@ -537,9 +544,28 @@ pub trait Service: Send + Sync { return; }, _ = push_interval.tick() => { + let rows_count = accumulated_data.len(); + if rows_count > 0 { + tracing::info!( + "appending {} rows for service {}", + rows_count, + self.name() + ); + } let example_rules = self.get_example_rules(); accumulated_data.extend(example_rules); - let _ = log.append(accumulated_data).await; + if let Err(e) = log.append(accumulated_data).await { + let msg = format!("{e} for service `{}`, failed to append {rows_count} rows", self.name()); + tracing::error!("{}", msg); + self.messenger().unwrap_or(self.shared().send_notification.clone()).try_error(msg); + } else if rows_count > 0 { + tracing::info!( + "appended {} rows for service {}", + rows_count, + self.name() + ); + } + accumulated_data = vec![]; }, _ = rules_update_interval.tick() => { @@ -685,11 +711,7 @@ mod tests { Some(APPEND_DURATION_MS / 2), // because append is 2 Google api calls ), ); - let storage = Arc::new(Storage::new( - TEST_HOST_ID.to_string(), - sheets_api, - tx.clone(), - )); + let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api)); let (shutdown, rx) = broadcast::channel(1); diff --git a/src/spreadsheet/spreadsheet.rs b/src/spreadsheet/spreadsheet.rs index c362c08..9bfbd55 100644 --- a/src/spreadsheet/spreadsheet.rs +++ b/src/spreadsheet/spreadsheet.rs @@ -1,5 +1,6 @@ use crate::spreadsheet::sheet::{CleanupSheet, Rows, Sheet, SheetId, UpdateSheet, VirtualSheet}; -use crate::spreadsheet::{HttpResponse, Metadata}; +use crate::spreadsheet::Metadata; +use crate::storage::StorageError; #[cfg(not(test))] use crate::HyperConnector; @@ -33,7 +34,7 @@ pub(crate) const GOOGLE_SPREADSHEET_MAXIMUM_CHARS_PER_CELL: usize = 50_000; async fn handle_error( spreadsheet: &SpreadsheetAPI, result: SheetsResult<(Response, T)>, -) -> Result { +) -> Result { match result { Err(e) => match e { // fatal @@ -64,13 +65,25 @@ async fn handle_error( panic!("Fatal error for Google API access: `{}`", msg); } // retry - Error::Failure(v) => Err(v), - Error::HttpError(v) => Err(Response::new(Body::from(v.to_string()))), - Error::BadRequest(v) => Err(Response::new(Body::from(v.to_string()))), - Error::Io(v) => Err(Response::new(Body::from(v.to_string()))), - Error::JsonDecodeError(_, v) => Err(Response::new(Body::from(v.to_string()))), - Error::FieldClash(v) => Err(Response::new(Body::from(v.to_string()))), - Error::Cancelled => Err(Response::new(Body::from("cancelled"))), + Error::Failure(v) => Err(StorageError::Retriable(v)), + Error::HttpError(v) => Err(StorageError::Retriable(Response::new(Body::from( + v.to_string(), + )))), + Error::BadRequest(v) => Err(StorageError::NonRetriable(Response::new(Body::from( + v.to_string(), + )))), + Error::Io(v) => Err(StorageError::Retriable(Response::new(Body::from( + v.to_string(), + )))), + Error::JsonDecodeError(_, v) => Err(StorageError::NonRetriable(Response::new( + Body::from(v.to_string()), + ))), + Error::FieldClash(v) => Err(StorageError::NonRetriable(Response::new(Body::from( + v.to_string(), + )))), + Error::Cancelled => Err(StorageError::Retriable(Response::new(Body::from( + "cancelled", + )))), }, Ok(res) => Ok(res.1), } @@ -147,7 +160,7 @@ impl SpreadsheetAPI { &self, spreadsheet_id: &str, sheet_id: SheetId, - ) -> Result>, HttpResponse> { + ) -> Result>, StorageError> { let req = BatchGetValuesByDataFilterRequest { data_filters: Some(vec![DataFilter { grid_range: Some(GridRange { @@ -168,7 +181,7 @@ impl SpreadsheetAPI { .values_batch_get_by_data_filter(req, spreadsheet_id) .doit() .await; - tracing::debug!("{:?}", result); + tracing::trace!("{:?}", result); let response = handle_error(self, result).await.map_err(|e| { tracing::error!("{:?}", e); e @@ -188,7 +201,7 @@ impl SpreadsheetAPI { &self, spreadsheet_id: &str, sheet_id: SheetId, - ) -> Result>, HttpResponse> { + ) -> Result>, StorageError> { let mut state = self.state.lock().await; state.get_sheet_data(spreadsheet_id, sheet_id).await } @@ -244,10 +257,10 @@ impl SpreadsheetAPI { &self, spreadsheet_id: &str, metadata: &Metadata, - ) -> Result, HttpResponse> { + ) -> Result, StorageError> { let result = self.get(spreadsheet_id, metadata).await; - tracing::debug!("{:?}", result); + tracing::trace!("{:?}", result); let response = handle_error(self, result).await.map_err(|e| { tracing::error!("{:?}", e); e @@ -271,7 +284,7 @@ impl SpreadsheetAPI { updates: Vec, sheets: Vec, data: Vec, - ) -> Result { + ) -> Result { // capacity for actual usage let mut requests = Vec::with_capacity( truncates.len() + sheets.len() * 10 + data.len() * 2 + updates.len(), @@ -292,7 +305,7 @@ impl SpreadsheetAPI { requests.append(&mut rows.into_api_requests()) } - tracing::debug!("requests:\n{:?}", requests); + tracing::trace!("requests:\n{:?}", requests); let req = BatchUpdateSpreadsheetRequest { include_spreadsheet_in_response: Some(false), @@ -303,7 +316,7 @@ impl SpreadsheetAPI { let result = self.update(req, spreadsheet_id).await; - tracing::debug!("{:?}", result); + tracing::trace!("{:?}", result); handle_error(self, result).await } @@ -314,7 +327,7 @@ impl SpreadsheetAPI { updates: Vec, sheets: Vec, data: Vec, - ) -> Result<(), HttpResponse> { + ) -> Result<(), StorageError> { self._crud_sheets(spreadsheet_id, truncates, updates, sheets, data) .await .map_err(|e| { @@ -421,7 +434,7 @@ pub(crate) mod tests { } } - pub(crate) fn bad_response(text: String) -> Error { + pub(crate) fn failure_response(text: String) -> Error { Error::Failure( HyperResponse::builder() .status(StatusCode::BAD_REQUEST) @@ -431,6 +444,10 @@ pub(crate) mod tests { ) } + pub(crate) fn bad_response(text: String) -> Error { + Error::BadRequest(serde_json::json!(text)) + } + pub(crate) async fn get(&mut self, _: &str) -> SheetsResult<(Response, Spreadsheet)> { let response_duration_millis = self.basic_response_duration_millis; self.basic_response_duration_millis = @@ -466,7 +483,7 @@ pub(crate) mod tests { &mut self, _spreadsheet_id: &str, _sheet_id: SheetId, - ) -> Result>, HttpResponse> { + ) -> Result>, StorageError> { Ok(vec![]) } diff --git a/src/storage.rs b/src/storage.rs index 88249e7..c89cde5 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -8,6 +8,7 @@ use crate::spreadsheet::{HttpResponse, Metadata, SpreadsheetAPI}; use crate::{get_service_tab_color, jitter_duration, Sender, HOST_ID_CHARS_LIMIT}; use chrono::{DateTime, Utc}; use std::collections::HashMap; +use std::fmt; use std::sync::Arc; use std::time::Duration; @@ -23,32 +24,34 @@ const KEYS_DELIMITER: &str = "~^~"; pub struct Storage { google: SpreadsheetAPI, host_id: String, - send_notification: Sender, } impl Storage { - pub fn new(host_id: String, google: SpreadsheetAPI, send_notification: Sender) -> Self { + pub fn new(host_id: String, google: SpreadsheetAPI) -> Self { assert!( host_id.chars().count() <= HOST_ID_CHARS_LIMIT, "host id should be no more than {HOST_ID_CHARS_LIMIT} characters" ); - Self { - host_id, - google, - send_notification, - } + Self { host_id, google } } } #[derive(Debug)] -pub enum AppendError { - ApiTimeout, - Http(HttpResponse), +pub enum StorageError { + Timeout(Duration), + RetryTimeout((Duration, usize)), + Retriable(HttpResponse), + NonRetriable(HttpResponse), } -impl From for AppendError { - fn from(r: HttpResponse) -> Self { - AppendError::Http(r) +impl fmt::Display for StorageError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use StorageError::*; + match self { + Timeout(duration) => write!(f, "timeout {:?}", duration), + RetryTimeout((maximum_backoff, retry)) => write!(f, "Google API is unavailable: maximum retry duration `{maximum_backoff:?}` is reached with `{retry}` retries"), + Retriable(e) | NonRetriable(e) => write!(f, "response with status {}", e.status()), + } } } @@ -84,7 +87,7 @@ impl AppendableLog { } } - async fn fetch_sheets(&self) -> Result)>, AppendError> { + async fn fetch_sheets(&self) -> Result)>, StorageError> { let basic_metadata = Metadata::new(vec![ (METADATA_HOST_ID_KEY, self.storage.host_id.to_string()), (METADATA_SERVICE_KEY, self.service.to_string()), @@ -110,13 +113,13 @@ impl AppendableLog { .collect()) } - pub(crate) async fn healthcheck(&self) -> Result<(), AppendError> { + pub(crate) async fn healthcheck(&self) -> Result<(), StorageError> { self.fetch_sheets().await?; Ok(()) } - pub(crate) async fn append(&mut self, datarows: Vec) -> Result<(), AppendError> { - self._append( + pub(crate) async fn append(&mut self, datarows: Vec) -> Result<(), StorageError> { + self.core_append( datarows, Some(Duration::from_secs(32)), Some(Duration::from_secs(5)), @@ -127,7 +130,7 @@ impl AppendableLog { pub(crate) async fn append_no_retry( &mut self, mut datarows: Vec, - ) -> Result, AppendError> { + ) -> Result, StorageError> { let sheet_ids = datarows .iter_mut() .map(|d| { @@ -135,49 +138,16 @@ impl AppendableLog { self.sheet_url(sheet_id) }) .collect(); - self._append(datarows, None, None).await?; + self.core_append(datarows, None, None).await?; Ok(sheet_ids) } - async fn _append( - &mut self, - datarows: Vec, - retry_limit: Option, - timeout: Option, - ) -> Result<(), AppendError> { - let rows_count_wo_rules = datarows - .iter() - .filter(|d| d.log_name() != RULES_LOG_NAME) - .count(); - if rows_count_wo_rules > 0 { - tracing::info!( - "appending {} rows for service {}", - rows_count_wo_rules, - self.service - ); - } - - self._core_append(datarows, retry_limit, timeout) - .await - .map_err(|e| { - if rows_count_wo_rules > 0 { - let message = format!( - "Saving batch data of `{rows_count_wo_rules}` rows failed for service `{}`", - self.service - ); - tracing::error!("{}", message); - self.storage.send_notification.try_error(message); - } - e - }) - } - // https://developers.google.com/sheets/api/limits#example-algorithm async fn timed_fetch_sheets( &self, maximum_backoff: Duration, timeout: Duration, - ) -> Result)>, AppendError> { + ) -> Result)>, StorageError> { let mut total_time = Duration::from_millis(0); let mut wait = Duration::from_millis(2); let mut retry = 0; @@ -190,12 +160,10 @@ impl AppendableLog { tracing::warn!("No response from Google API for timeout {:?} for retry {} for service `{}`", timeout, retry, self.service); }, res = self.fetch_sheets() => { - match res { - Ok(val) => { - return Ok(val);}, - Err(e) => { - tracing::error!("error {:?} for retry {} for service `{}`", e, retry, self.service); - } + if let Err(StorageError::Retriable(e)) = res { + tracing::error!("error {:?} for service `{}` retrying #{}", e, self.service, retry); + } else { + return res; } } } @@ -211,19 +179,16 @@ impl AppendableLog { total_time += jittered; wait *= 2; } - let msg = format!("Google API request maximum retry duration `{maximum_backoff:?}` is reached with `{retry}` retries for service `{}`", self.service); - tracing::error!("{}", msg); - self.storage.send_notification.try_error(msg); - Err(AppendError::ApiTimeout) + Err(StorageError::RetryTimeout((maximum_backoff, retry))) } // for newly created log sheet its headers order is determined by its first datarow. Fields for other datarows for the same sheet are sorted accordingly. - async fn _core_append( + async fn core_append( &mut self, datarows: Vec, retry_limit: Option, timeout: Option, - ) -> Result<(), AppendError> { + ) -> Result<(), StorageError> { if datarows.is_empty() { return Ok(()); } @@ -243,7 +208,7 @@ impl AppendableLog { self.fetch_sheets().await? }; - tracing::debug!("existing sheets:\n{:?}", existing_sheets); + tracing::trace!("existing sheets:\n{:?}", existing_sheets); let mut rows_count: HashMap = existing_sheets .iter() @@ -364,10 +329,10 @@ impl AppendableLog { self.update_rows_metadata(&rows_count, &mut sheets_to_update, &mut sheets_to_add); let data: Vec = data_to_append.into_values().collect(); - tracing::debug!("truncate_requests:\n{:?}", truncate_requests); - tracing::debug!("sheets_to_update:\n{:?}", sheets_to_update); - tracing::debug!("sheets_to_add:\n{:?}", sheets_to_add); - tracing::debug!("data:\n{:?}", data); + tracing::trace!("truncate_requests:\n{:?}", truncate_requests); + tracing::trace!("sheets_to_update:\n{:?}", sheets_to_update); + tracing::trace!("sheets_to_add:\n{:?}", sheets_to_add); + tracing::trace!("data:\n{:?}", data); if sheets_to_add.is_empty() && sheets_to_update.is_empty() { return Ok(()); @@ -571,10 +536,10 @@ impl AppendableLog { .spreadsheet_baseurl(&self.spreadsheet_id) } - pub(crate) async fn get_rules(&self) -> Result, String> { + pub(crate) async fn get_rules(&self) -> Result, StorageError> { let timeout = Duration::from_millis(3000); tokio::select! { - _ = tokio::time::sleep(timeout) => Err(format!("timeout {:?}", timeout)), + _ = tokio::time::sleep(timeout) => Err(StorageError::Timeout(timeout)), res = self .storage .google @@ -582,7 +547,7 @@ impl AppendableLog { &self.spreadsheet_id, self.rules_sheet_id.expect("assert: rules sheet id is saved at the start of the service at the first append") ) => { - let data = res.map_err(|e| format!("response with status {}", e.status()))?; + let data = res?; Ok(data.into_iter() .filter_map(|row| Rule::try_from_values(row, self.messenger.as_ref())) .collect() @@ -663,11 +628,7 @@ mod tests { tx.clone(), TestState::new(vec![mock_ordinary_google_sheet("some sheet")], None, None), ); - let storage = Arc::new(Storage::new( - TEST_HOST_ID.to_string(), - sheets_api, - tx.clone(), - )); + let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api)); let mut log = AppendableLog::new( storage, "spreadsheet1".to_string(), @@ -846,15 +807,11 @@ mod tests { tx.clone(), TestState::new( vec![mock_ordinary_google_sheet("some sheet")], - Some(TestState::bad_response("error to retry".to_string())), + Some(TestState::failure_response("error to retry".to_string())), None, ), ); - let storage = Arc::new(Storage::new( - TEST_HOST_ID.to_string(), - sheets_api, - tx.clone(), - )); + let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api)); let mut log = AppendableLog::new( storage, "spreadsheet1".to_string(), @@ -904,11 +861,7 @@ mod tests { None, ), ); - let storage = Arc::new(Storage::new( - TEST_HOST_ID.to_string(), - sheets_api, - tx.clone(), - )); + let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api)); let mut log = AppendableLog::new( storage, "spreadsheet1".to_string(), @@ -965,11 +918,7 @@ mod tests { 50, ), ); - let storage = Arc::new(Storage::new( - TEST_HOST_ID.to_string(), - sheets_api, - tx.clone(), - )); + let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api)); let mut log = AppendableLog::new( storage, "spreadsheet1".to_string(), @@ -1004,7 +953,7 @@ mod tests { ]; let res = log - ._append( + .core_append( datarows, Some(Duration::from_millis(1200)), // approx 1050 maximum jitter, 150 ms for the first response Some(Duration::from_millis(100)), @@ -1015,26 +964,22 @@ mod tests { #[tokio::test] async fn append_retry_maximum_backoff() { - let (tx, mut rx) = mpsc::channel(1); + let (tx, _) = mpsc::channel(1); let tx = Sender::new(tx, GENERAL_SERVICE_NAME); let sheets_api = SpreadsheetAPI::new( tx.clone(), TestState::new( vec![mock_ordinary_google_sheet("some sheet")], - Some(TestState::bad_response("error to retry".to_string())), + Some(TestState::failure_response("error to retry".to_string())), Some(150), ), ); - let storage = Arc::new(Storage::new( - TEST_HOST_ID.to_string(), - sheets_api, - tx.clone(), - )); + let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api)); let mut log = AppendableLog::new( storage, "spreadsheet1".to_string(), GENERAL_SERVICE_NAME.to_string(), - Some(tx.clone()), + Some(tx), 100.0, ); @@ -1063,24 +1008,17 @@ mod tests { ), ]; - let handle = tokio::task::spawn(async move { - assert!( - rx.recv().await.is_some(), - "notification is sent for nonrecoverable error" - ); - }); let res = log - ._append( + .core_append( datarows, Some(Duration::from_millis(100)), Some(Duration::from_millis(200)), ) .await; assert!( - matches!(res, Err(AppendError::ApiTimeout)), + matches!(res, Err(StorageError::RetryTimeout(_))), "Google API request maximum retry duration should happen" ); - handle.await.unwrap(); } #[tokio::test] @@ -1096,11 +1034,7 @@ mod tests { None, ), ); - let storage = Arc::new(Storage::new( - TEST_HOST_ID.to_string(), - sheets_api, - tx.clone(), - )); + let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api)); let mut log = AppendableLog::new( storage, "spreadsheet1".to_string(), @@ -1161,11 +1095,7 @@ mod tests { { let tx = Sender::new(tx, GENERAL_SERVICE_NAME); let sheets_api = SpreadsheetAPI::new(tx.clone(), TestState::new(vec![], None, None)); - let storage = Arc::new(Storage::new( - TEST_HOST_ID.to_string(), - sheets_api, - tx.clone(), - )); + let storage = Arc::new(Storage::new(TEST_HOST_ID.to_string(), sheets_api)); // for simplicity we create logs with one key to easily // make assertions on rows count (only two columns - timestamp and key) let mut log = AppendableLog::new(