From 20d503f8e23cae10243d806a09b10dced6c495bb Mon Sep 17 00:00:00 2001 From: Flavio Bizzarri <33379291+newfla@users.noreply.github.com> Date: Thu, 12 Sep 2024 10:34:47 +0200 Subject: [PATCH] feat: anyhow introduction (#38) * feat: anyhow introduction * fix: readme example --- Cargo.toml | 1 + README.md | 3 ++- src/forwarder/mod.rs | 51 ++++++++++++++++++++++---------------------- src/lib.rs | 1 - src/receiver/mod.rs | 48 ++++++++++++++--------------------------- 5 files changed, 44 insertions(+), 60 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4ef8b40..7901fa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ tokio-native-tls = "0.3" tokio-dtls-stream-sink = "0.6" openssl = { version = "0.10", features = ["vendored"] } socket2 = "0.5.7" +anyhow = "1.0.88" [dev-dependencies] mimalloc = { version = "0.1", default-features = false } diff --git a/README.md b/README.md index 168ab73..726ce6c 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ Based on [tokio](https://github.com/tokio-rs/tokio) and [rust rdkafka](https://g It's strongly encouraged the use of alternative allocator like [MiMalloc](https://crates.io/crates/mimalloc) ```no_run +use anyhow::Result; use std::collections::HashMap; use mimalloc::MiMalloc; use crab_kafka::{forwarder::ForwarderBuilder,Receiver,PartitionStrategies,CheckpointStrategies,TransformStrategies}; @@ -27,7 +28,7 @@ use crab_kafka::{forwarder::ForwarderBuilder,Receiver,PartitionStrategies,Checkp static GLOBAL: MiMalloc = MiMalloc; #[tokio::main] -async fn main() -> Result<(),String> { +async fn main() -> Result<()> { ForwarderBuilder::default() .receiver(Receiver::new_tcp_stream("127.0.0.1".to_owned(), "8888".to_owned(), 2000)) .checkpoint(CheckpointStrategies::OpenDoors) diff --git a/src/forwarder/mod.rs b/src/forwarder/mod.rs index f860e31..6f6884e 100644 --- a/src/forwarder/mod.rs +++ b/src/forwarder/mod.rs @@ -7,6 +7,7 @@ use std::{ time::Duration, }; +use anyhow::{anyhow, Result}; use derive_builder::Builder; use kanal::{bounded_async, unbounded_async}; use rdkafka::{ @@ -26,7 +27,7 @@ use crate::{ sender::KafkaPacketSenderBuilder, statistics::StatisticsTaskBuilder, strategies::{CheckpointStrategy, PartitionStrategy}, - ForwarderReturn, Receiver, TransformStrategy, + Receiver, TransformStrategy, }; type GlobalForwarder = Mutex>; @@ -131,7 +132,7 @@ where P: PartitionStrategy + Send + 'static, T: TransformStrategy + Clone + Send + Sync + 'static, { - async fn run(mut self) -> ForwarderReturn { + async fn run(mut self) -> Result<()> { let producer = self.build_kafka_producer()?; let partitions_count = self.find_partition_number(&producer)? as i32; @@ -149,8 +150,7 @@ where .output_topic(ustr(&self.topic)) .stats_tx(stats_tx) .transform_strategy(Arc::new(self.transform)) - .build() - .map_err(|err| err.to_string())?; + .build()?; self.partition.set_num_partitions(partitions_count); @@ -159,14 +159,12 @@ where .shutdown_token(handle.cancel_token.clone()) .stats_rx(stats_rx) .timeout(self.stats_interval) - .build() - .map_err(|err| err.to_string())?; + .build()?; let receiver_task = ReceiverTaskBuilder::from(self.receiver) .shutdown_token(handle.cancel_token.clone()) .dispatcher_sender(dispatcher_tx) - .build() - .map_err(|err| err.to_string())?; + .build()?; let dispatcher_task = DispatcherTaskBuilder::default() .shutdown_token(handle.cancel_token.clone()) @@ -174,8 +172,7 @@ where .checkpoint_strategy(self.checkpoint) .partition_strategy(self.partition) .kafka_sender(kafka_sender) - .build() - .map_err(|err| err.to_string())?; + .build()?; //Schedule task let mut task_set = JoinSet::new(); @@ -194,30 +191,32 @@ where GLOBAL_HANDLE.get().unwrap().lock().unwrap()[self.id].clone() } - fn build_kafka_producer(&self) -> Result { + fn build_kafka_producer(&self) -> Result { let mut client_config = ClientConfig::new(); self.kafka_settings.iter().for_each(|(key, value)| { client_config.set(key, value); }); - client_config.create().map_err(|err| err.to_string()) + let producer = client_config.create()?; + Ok(producer) } - fn find_partition_number(&self, producer: &FutureProducer) -> Result { + fn find_partition_number(&self, producer: &FutureProducer) -> Result { let topic_name = self.topic.as_str(); let timeout = Duration::from_secs(30); - match producer.client().fetch_metadata(Some(topic_name), timeout) { - Err(_) => Err("Failed to retrieve topic metadata".to_string()), - Ok(metadata) => match metadata.topics().first() { - None => Err("Topic".to_string() + topic_name + "not found"), - Some(data) => { - if data.partitions().is_empty() { - Err("Topic has 0 partitions".to_string()) - } else { - Ok(data.partitions().len()) - } - } - }, + let metadata = producer + .client() + .fetch_metadata(Some(topic_name), timeout)?; + let topics = metadata + .topics() + .first() + .map(|m| m.partitions().len()) + .ok_or(anyhow!("Topic {} not found", topic_name)); + + if let Ok(0) = topics { + Err(anyhow!("Topic has 0 partitions")) + } else { + topics } } } @@ -228,7 +227,7 @@ where P: PartitionStrategy + Send + 'static, T: TransformStrategy + Clone + Send + Sync + 'static, { - type Output = ForwarderReturn; + type Output = Result<()>; type IntoFuture = Pin + Send>>; fn into_future(self) -> Self::IntoFuture { diff --git a/src/lib.rs b/src/lib.rs index 7b63b31..7fbcbb2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,5 +26,4 @@ type DataPacket = (Vec, SocketAddr, Instant); type PartitionDetails = (Option, Ustr, Ustr); type Ticket = Arc; type DataTransmitted = Option; -type ForwarderReturn = Result<(), String>; type TlsOption = (Option, Option); diff --git a/src/receiver/mod.rs b/src/receiver/mod.rs index b9845b5..05c4803 100644 --- a/src/receiver/mod.rs +++ b/src/receiver/mod.rs @@ -7,6 +7,7 @@ use std::{ pin::Pin, }; +use anyhow::Result; use branches::unlikely; use coarsetime::Instant; use derive_builder::Builder; @@ -481,43 +482,26 @@ impl ReceiverTask { }); } - fn build_openssl_context(cert: String, key: String) -> Result { - let mut ctx = SslContext::builder(SslMethod::dtls()).unwrap(); - - let setup_context = ctx - .set_private_key_file(key, SslFiletype::PEM) - .and_then(|_| ctx.set_certificate_chain_file(cert)) - .and_then(|_| ctx.check_private_key()); - setup_context - .map_err(|err| { - error!("{}", err); - }) - .map(|_| ctx.build()) + fn build_openssl_context(cert: String, key: String) -> Result { + let mut ctx_builder = SslContext::builder(SslMethod::dtls())?; + + ctx_builder.set_private_key_file(key, SslFiletype::PEM)?; + ctx_builder.set_certificate_chain_file(cert)?; + ctx_builder.check_private_key()?; + Ok(ctx_builder.build()) } - fn build_udp_socket_reuse_addr_port(addr: &SocketAddr) -> Result { + fn build_udp_socket_reuse_addr_port(addr: &SocketAddr) -> Result { let address = (*addr).into(); - let socket = - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).map_err(|err| { - error!("{}", err); - })?; - socket.set_reuse_address(true).map_err(|err| { - error!("{}", err); - })?; - socket.set_reuse_port(true).map_err(|err| { - error!("{}", err); - })?; - socket.bind(&address).map_err(|err| { - error!("{}", err); - })?; + let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; + socket.set_reuse_address(true)?; + socket.set_reuse_port(true)?; + socket.bind(&address)?; let std_sock: std::net::UdpSocket = socket.into(); - std_sock.set_nonblocking(true).map_err(|err| { - error!("{}", err); - })?; - UdpSocket::from_std(std_sock).map_err(|err| { - error!("{}", err); - }) + std_sock.set_nonblocking(true)?; + let socket = UdpSocket::from_std(std_sock)?; + Ok(socket) } async fn run(self) {