Skip to content

Commit

Permalink
fix: resume connectors on network errors (#1956)
Browse files Browse the repository at this point in the history
* fix: retry on network errors in MySQL connector

Resume database queries on network errors.
Select queries with multiple rows resume from the last row received.
CDC continues from its last position.

* fix: retry on network errors in Postgres connector

Similarly to the MySQL connector, select queries resume from the last row received.
The CDC resumes from the position where it was stopped.

* fix: retry on network errors in Kafka connector

Detect network failures, reconnect, and resume.

* fix: retry on network errors in Object Store connector

This is not a complete solution but we should use retry infrastructure provided by the object_store crate.

* chore: add sleep between retries

---------

Co-authored-by: chubei <[email protected]>
  • Loading branch information
abcpro1 and chubei authored Sep 1, 2023
1 parent 420af04 commit 5244a7d
Show file tree
Hide file tree
Showing 34 changed files with 970 additions and 202 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dozer-ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mysql_async = { version = "0.32.2", default-features = false, features = ["defau
mysql_common = { version = "0.30", default-features = false, features = ["chrono", "rust_decimal"] }
chrono = "0.4.26"
geozero = { version = "0.10.0", default-features = false, features = ["with-wkb"] }
bytes = "1.4.0"

[dev-dependencies]
criterion = { version = "0.4.0", features = ["html_reports"] }
Expand Down
13 changes: 4 additions & 9 deletions dozer-ingestion/src/connectors/kafka/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,14 @@ async fn run(
ingestor: &Ingestor,
schema_registry_url: &Option<String>,
) -> Result<(), ConnectorError> {
let con: BaseConsumer = ClientConfig::new()
let mut client_config = ClientConfig::new();
client_config
.set("bootstrap.servers", broker)
.set("group.id", "dozer")
.set("enable.auto.commit", "true")
.create()
.map_err(KafkaConnectionError)?;

let topics: Vec<&str> = tables.iter().map(|t| t.name.as_ref()).collect();
con.subscribe(topics.iter().as_slice())
.map_err(KafkaConnectionError)?;
.set("enable.auto.commit", "true");

let consumer = StreamConsumerBasic::default();
consumer
.run(con, ingestor, tables, schema_registry_url)
.run(client_config, ingestor, tables, schema_registry_url)
.await
}
17 changes: 14 additions & 3 deletions dozer-ingestion/src/connectors/kafka/debezium/schema_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::connectors::{CdcType, SourceSchema};
use crate::errors::KafkaError::{JsonDecodeError, SchemaRegistryFetchError};
use crate::errors::KafkaSchemaError::TypeNotSupported;
use crate::errors::{ConnectorError, KafkaError, KafkaSchemaError};
use dozer_types::log::error;
use dozer_types::serde_json;
use dozer_types::serde_json::Value;
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
Expand Down Expand Up @@ -65,13 +66,23 @@ impl SchemaRegistry {
table_name: &str,
is_key: bool,
) -> Result<DebeziumSchemaStruct, KafkaError> {
let schema_result =
schema_registry_converter::async_impl::schema_registry::get_schema_by_subject(
let schema_result = loop {
match schema_registry_converter::async_impl::schema_registry::get_schema_by_subject(
sr_settings,
&SubjectNameStrategy::TopicNameStrategy(table_name.to_string(), is_key),
)
.await
.map_err(SchemaRegistryFetchError)?;
{
Ok(schema_result) => break schema_result,
Err(err) if err.retriable => {
const RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
error!("schema registry fetch error {err}. retrying in {RETRY_INTERVAL:?}...");
tokio::time::sleep(RETRY_INTERVAL).await;
continue;
}
Err(err) => return Err(SchemaRegistryFetchError(err)),
}
};

serde_json::from_str::<DebeziumSchemaStruct>(&schema_result.schema).map_err(JsonDecodeError)
}
Expand Down
28 changes: 20 additions & 8 deletions dozer-ingestion/src/connectors/kafka/debezium/stream_consumer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::connectors::kafka::debezium::mapper::convert_value_to_schema;
use crate::connectors::kafka::debezium::schema::map_schema;
use crate::connectors::kafka::stream_consumer::StreamConsumer;
use crate::connectors::kafka::stream_consumer_helper::{
is_network_failure, OffsetsMap, StreamConsumerHelper,
};
use crate::errors::KafkaError::{BytesConvertError, JsonDecodeError};
use crate::errors::{ConnectorError, KafkaError, KafkaStreamError};
use crate::ingestion::Ingestor;
Expand All @@ -10,10 +13,9 @@ use dozer_types::serde::{Deserialize, Serialize};
use dozer_types::serde_json;
use dozer_types::serde_json::Value;
use dozer_types::types::{Operation, Record};
use rdkafka::consumer::BaseConsumer;

use crate::connectors::TableInfo;
use rdkafka::Message;
use rdkafka::{ClientConfig, Message};
use tonic::async_trait;

#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down Expand Up @@ -85,16 +87,26 @@ impl DebeziumStreamConsumer {}
impl StreamConsumer for DebeziumStreamConsumer {
async fn run(
&self,
con: BaseConsumer,
client_config: ClientConfig,
ingestor: &Ingestor,
_tables: Vec<TableInfo>,
tables: Vec<TableInfo>,
_schema_registry_url: &Option<String>,
) -> Result<(), ConnectorError> {
let topics: Vec<&str> = tables.iter().map(|t| t.name.as_str()).collect();
let mut con = StreamConsumerHelper::start(&client_config, &topics).await?;
let mut offsets = OffsetsMap::new();
loop {
let m = con
.poll(None)
.unwrap()
.map_err(|e| KafkaError::KafkaStreamError(KafkaStreamError::PollingError(e)))?;
let m = match con.poll(None).unwrap() {
Ok(m) => m,
Err(err) if is_network_failure(&err) => {
con = StreamConsumerHelper::resume(&client_config, &topics, &offsets).await?;
continue;
}
Err(err) => Err(KafkaError::KafkaStreamError(
KafkaStreamError::PollingError(err),
))?,
};
StreamConsumerHelper::update_offsets(&mut offsets, &m);

if let (Some(message), Some(key)) = (m.payload(), m.key()) {
let mut value_struct: DebeziumMessage =
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/src/connectors/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod no_schema_registry_basic;
pub mod schema_registry_basic;
pub mod stream_consumer;
pub mod stream_consumer_basic;
mod stream_consumer_helper;
#[cfg(any(test, feature = "debezium_bench"))]
pub mod test_utils;
#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions dozer-ingestion/src/connectors/kafka/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use crate::errors::ConnectorError;
use crate::ingestion::Ingestor;

use crate::connectors::TableInfo;
use rdkafka::consumer::BaseConsumer;
use rdkafka::ClientConfig;
use tonic::async_trait;

#[async_trait]
pub trait StreamConsumer {
async fn run(
&self,
con: BaseConsumer,
client_config: ClientConfig,
ingestor: &Ingestor,
tables: Vec<TableInfo>,
schema_registry_url: &Option<String>,
Expand Down
20 changes: 15 additions & 5 deletions dozer-ingestion/src/connectors/kafka/stream_consumer_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ use dozer_types::serde::{Deserialize, Serialize};
use dozer_types::serde_json;
use dozer_types::serde_json::Value;
use dozer_types::types::{Field, Operation, Record};
use rdkafka::consumer::BaseConsumer;

use crate::connectors::kafka::no_schema_registry_basic::NoSchemaRegistryBasic;
use crate::connectors::kafka::schema_registry_basic::SchemaRegistryBasic;
use tonic::async_trait;

use crate::connectors::TableInfo;
use crate::errors::KafkaStreamError::PollingError;
use rdkafka::Message;
use rdkafka::{ClientConfig, Message};

use super::stream_consumer_helper::{is_network_failure, OffsetsMap, StreamConsumerHelper};

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(crate = "dozer_types::serde")]
Expand Down Expand Up @@ -79,17 +80,17 @@ pub struct Payload {
#[derive(Default)]
pub struct StreamConsumerBasic {}

impl StreamConsumerBasic {}

#[async_trait]
impl StreamConsumer for StreamConsumerBasic {
async fn run(
&self,
con: BaseConsumer,
client_config: ClientConfig,
ingestor: &Ingestor,
tables: Vec<TableInfo>,
schema_registry_url: &Option<String>,
) -> Result<(), ConnectorError> {
let topics: Vec<String> = tables.iter().map(|t| t.name.clone()).collect();

let mut schemas = HashMap::new();
for (table_index, table) in tables.into_iter().enumerate() {
let schema = if let Some(url) = schema_registry_url {
Expand All @@ -101,10 +102,19 @@ impl StreamConsumer for StreamConsumerBasic {
schemas.insert(table.name.clone(), (table_index, schema));
}

let topics: Vec<&str> = topics.iter().map(|t| t.as_str()).collect();
let mut con = StreamConsumerHelper::start(&client_config, &topics).await?;

let mut offsets = OffsetsMap::new();
let mut counter = 0;
loop {
if let Some(result) = con.poll(None) {
if matches!(result.as_ref(), Err(err) if is_network_failure(err)) {
con = StreamConsumerHelper::resume(&client_config, &topics, &offsets).await?;
continue;
}
let m = result.map_err(|e| KafkaStreamError(PollingError(e)))?;
StreamConsumerHelper::update_offsets(&mut offsets, &m);
match schemas.get(m.topic()) {
None => return Err(ConnectorError::KafkaError(TopicNotDefined)),
Some((table_index, (schema, fields_map))) => {
Expand Down
114 changes: 114 additions & 0 deletions dozer-ingestion/src/connectors/kafka/stream_consumer_helper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use crate::errors::ConnectorError;
use crate::errors::KafkaError::KafkaConnectionError;
use rdkafka::{
consumer::{BaseConsumer, Consumer},
message::BorrowedMessage,
util::Timeout,
ClientConfig, Message, Offset,
};
use std::collections::HashMap;

pub struct StreamConsumerHelper;

pub type OffsetsMap = HashMap<String, (i32, i64)>; // key: topic, value: (partition, offset)

impl StreamConsumerHelper {
pub async fn start(
client_config: &ClientConfig,
topics: &[&str],
) -> Result<BaseConsumer, ConnectorError> {
Self::resume_impl(client_config, topics, None).await
}

pub async fn resume(
client_config: &ClientConfig,
topics: &[&str],
offsets: &OffsetsMap,
) -> Result<BaseConsumer, ConnectorError> {
Self::resume_impl(client_config, topics, Some(offsets)).await
}

pub fn update_offsets(offsets: &mut OffsetsMap, message: &BorrowedMessage<'_>) {
let _ = offsets.insert(
message.topic().into(),
(message.partition(), message.offset()),
);
}

async fn resume_impl(
client_config: &ClientConfig,
topics: &[&str],
offsets: Option<&OffsetsMap>,
) -> Result<BaseConsumer, ConnectorError> {
loop {
match Self::try_resume(client_config, topics, offsets).await {
Ok(con) => return Ok(con),
Err(err) if is_network_failure(&err) => {
const RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
dozer_types::log::error!(
"stream resume error {err}. retrying in {RETRY_INTERVAL:?}..."
);
tokio::time::sleep(RETRY_INTERVAL).await;
continue;
}
Err(err) => Err(KafkaConnectionError(err))?,
}
}
}

async fn try_resume(
client_config: &ClientConfig,
topics: &[&str],
offsets: Option<&OffsetsMap>,
) -> Result<BaseConsumer, rdkafka::error::KafkaError> {
let con: BaseConsumer = client_config.create()?;
con.subscribe(topics.iter().as_slice())?;

if let Some(offsets) = offsets {
for (topic, &(partition, offset)) in offsets.iter() {
con.seek(topic, partition, Offset::Offset(offset), Timeout::Never)?;
}
}

Ok(con)
}
}

pub fn is_network_failure(err: &rdkafka::error::KafkaError) -> bool {
use rdkafka::error::KafkaError::*;
let error_code = match err {
ConsumerCommit(error_code) => error_code,
Flush(error_code) => error_code,
Global(error_code) => error_code,
GroupListFetch(error_code) => error_code,
MessageConsumption(error_code) => error_code,
MessageProduction(error_code) => error_code,
MetadataFetch(error_code) => error_code,
OffsetFetch(error_code) => error_code,
Rebalance(error_code) => error_code,
SetPartitionOffset(error_code) => error_code,
StoreOffset(error_code) => error_code,
MockCluster(error_code) => error_code,
Transaction(rdkafka_err) => return rdkafka_err.is_retriable(),
other => {
dozer_types::log::warn!(
"unregonized kafka error error: {other}. treating as non-network error."
);
return false;
}
};
use rdkafka::types::RDKafkaErrorCode::*;
matches!(
error_code,
Fail | BrokerTransportFailure
| Resolve
| MessageTimedOut
| AllBrokersDown
| OperationTimedOut
| TimedOutQueue
| Retry
| PollExceeded
| RequestTimedOut
| NetworkException
)
}
21 changes: 16 additions & 5 deletions dozer-ingestion/src/connectors/mysql/binlog.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use super::{
connection::Conn,
conversion::{IntoField, IntoFields, IntoJsonValue},
schema::{ColumnDefinition, TableDefinition},
};
use crate::{
connectors::mysql::connection::is_network_failure,
errors::{ConnectorError, MySQLConnectorError},
ingestion::Ingestor,
};
Expand All @@ -13,7 +15,7 @@ use dozer_types::{
};
use dozer_types::{json_types::JsonValue, types::Field};
use futures::StreamExt;
use mysql_async::{binlog::EventFlags, prelude::Queryable, BinlogStream, Conn, Pool};
use mysql_async::{binlog::EventFlags, BinlogStream, Pool};
use mysql_common::{
binlog::{
self,
Expand Down Expand Up @@ -99,8 +101,7 @@ impl<'a, 'b, 'c, 'd, 'e> BinlogIngestor<'a, 'b, 'c, 'd, 'e> {

impl BinlogIngestor<'_, '_, '_, '_, '_> {
async fn connect(&self) -> Result<Conn, MySQLConnectorError> {
self.conn_pool
.get_conn()
Conn::new(self.conn_pool.clone())
.await
.map_err(|err| MySQLConnectorError::ConnectionFailure(self.conn_url.clone(), err))
}
Expand Down Expand Up @@ -138,15 +139,25 @@ impl BinlogIngestor<'_, '_, '_, '_, '_> {
let mut seq_no = 0;
let mut table_cache = BinlogTableCache::new(self.tables);

'binlog_read: while let Some(event) = self.binlog_stream.as_mut().unwrap().next().await {
'binlog_read: while let Some(result) = self.binlog_stream.as_mut().unwrap().next().await {
match self.local_stop_position {
Some(stop_position) if self.next_position.position >= stop_position => {
break 'binlog_read;
}
_ => {}
}

let binlog_event = event.map_err(MySQLConnectorError::BinlogReadError)?;
let binlog_event = match result {
Ok(event) => event,
Err(err) => {
if is_network_failure(&err) {
self.open_binlog().await?;
continue 'binlog_read;
} else {
Err(MySQLConnectorError::BinlogReadError(err))?
}
}
};

let is_artificial = binlog_event
.header()
Expand Down
Loading

0 comments on commit 5244a7d

Please sign in to comment.