Skip to content

Commit

Permalink
feat: anyhow introduction (#38)
Browse files Browse the repository at this point in the history
* feat: anyhow introduction

* fix: readme example
  • Loading branch information
newfla authored Sep 12, 2024
1 parent 1216392 commit 20d503f
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 60 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tokio-native-tls = "0.3"
tokio-dtls-stream-sink = "0.6"
openssl = { version = "0.10", features = ["vendored"] }
socket2 = "0.5.7"
anyhow = "1.0.88"

[dev-dependencies]
mimalloc = { version = "0.1", default-features = false }
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Based on [tokio](https://github.com/tokio-rs/tokio) and [rust rdkafka](https://g
It's strongly encouraged the use of alternative allocator like [MiMalloc](https://crates.io/crates/mimalloc)

```no_run
use anyhow::Result;
use std::collections::HashMap;
use mimalloc::MiMalloc;
use crab_kafka::{forwarder::ForwarderBuilder,Receiver,PartitionStrategies,CheckpointStrategies,TransformStrategies};
Expand All @@ -27,7 +28,7 @@ use crab_kafka::{forwarder::ForwarderBuilder,Receiver,PartitionStrategies,Checkp
static GLOBAL: MiMalloc = MiMalloc;
#[tokio::main]
async fn main() -> Result<(),String> {
async fn main() -> Result<()> {
ForwarderBuilder::default()
.receiver(Receiver::new_tcp_stream("127.0.0.1".to_owned(), "8888".to_owned(), 2000))
.checkpoint(CheckpointStrategies::OpenDoors)
Expand Down
51 changes: 25 additions & 26 deletions src/forwarder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
time::Duration,
};

use anyhow::{anyhow, Result};
use derive_builder::Builder;
use kanal::{bounded_async, unbounded_async};
use rdkafka::{
Expand All @@ -26,7 +27,7 @@ use crate::{
sender::KafkaPacketSenderBuilder,
statistics::StatisticsTaskBuilder,
strategies::{CheckpointStrategy, PartitionStrategy},
ForwarderReturn, Receiver, TransformStrategy,
Receiver, TransformStrategy,
};

type GlobalForwarder = Mutex<Vec<ForwarderShutdownHandle>>;
Expand Down Expand Up @@ -131,7 +132,7 @@ where
P: PartitionStrategy + Send + 'static,
T: TransformStrategy + Clone + Send + Sync + 'static,
{
async fn run(mut self) -> ForwarderReturn {
async fn run(mut self) -> Result<()> {
let producer = self.build_kafka_producer()?;
let partitions_count = self.find_partition_number(&producer)? as i32;

Expand All @@ -149,8 +150,7 @@ where
.output_topic(ustr(&self.topic))
.stats_tx(stats_tx)
.transform_strategy(Arc::new(self.transform))
.build()
.map_err(|err| err.to_string())?;
.build()?;

self.partition.set_num_partitions(partitions_count);

Expand All @@ -159,23 +159,20 @@ where
.shutdown_token(handle.cancel_token.clone())
.stats_rx(stats_rx)
.timeout(self.stats_interval)
.build()
.map_err(|err| err.to_string())?;
.build()?;

let receiver_task = ReceiverTaskBuilder::from(self.receiver)
.shutdown_token(handle.cancel_token.clone())
.dispatcher_sender(dispatcher_tx)
.build()
.map_err(|err| err.to_string())?;
.build()?;

let dispatcher_task = DispatcherTaskBuilder::default()
.shutdown_token(handle.cancel_token.clone())
.dispatcher_receiver(dispatcher_rx)
.checkpoint_strategy(self.checkpoint)
.partition_strategy(self.partition)
.kafka_sender(kafka_sender)
.build()
.map_err(|err| err.to_string())?;
.build()?;

//Schedule task
let mut task_set = JoinSet::new();
Expand All @@ -194,30 +191,32 @@ where
GLOBAL_HANDLE.get().unwrap().lock().unwrap()[self.id].clone()
}

fn build_kafka_producer(&self) -> Result<FutureProducer, String> {
fn build_kafka_producer(&self) -> Result<FutureProducer> {
let mut client_config = ClientConfig::new();
self.kafka_settings.iter().for_each(|(key, value)| {
client_config.set(key, value);
});
client_config.create().map_err(|err| err.to_string())
let producer = client_config.create()?;
Ok(producer)
}

fn find_partition_number(&self, producer: &FutureProducer) -> Result<usize, String> {
fn find_partition_number(&self, producer: &FutureProducer) -> Result<usize> {
let topic_name = self.topic.as_str();
let timeout = Duration::from_secs(30);

match producer.client().fetch_metadata(Some(topic_name), timeout) {
Err(_) => Err("Failed to retrieve topic metadata".to_string()),
Ok(metadata) => match metadata.topics().first() {
None => Err("Topic".to_string() + topic_name + "not found"),
Some(data) => {
if data.partitions().is_empty() {
Err("Topic has 0 partitions".to_string())
} else {
Ok(data.partitions().len())
}
}
},
let metadata = producer
.client()
.fetch_metadata(Some(topic_name), timeout)?;
let topics = metadata
.topics()
.first()
.map(|m| m.partitions().len())
.ok_or(anyhow!("Topic {} not found", topic_name));

if let Ok(0) = topics {
Err(anyhow!("Topic has 0 partitions"))
} else {
topics
}
}
}
Expand All @@ -228,7 +227,7 @@ where
P: PartitionStrategy + Send + 'static,
T: TransformStrategy + Clone + Send + Sync + 'static,
{
type Output = ForwarderReturn;
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;

fn into_future(self) -> Self::IntoFuture {
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,4 @@ type DataPacket = (Vec<u8>, SocketAddr, Instant);
type PartitionDetails = (Option<i32>, Ustr, Ustr);
type Ticket = Arc<Notify>;
type DataTransmitted = Option<StatisticData>;
type ForwarderReturn = Result<(), String>;
type TlsOption = (Option<String>, Option<String>);
48 changes: 16 additions & 32 deletions src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
pin::Pin,
};

use anyhow::Result;
use branches::unlikely;
use coarsetime::Instant;
use derive_builder::Builder;
Expand Down Expand Up @@ -481,43 +482,26 @@ impl ReceiverTask {
});
}

fn build_openssl_context(cert: String, key: String) -> Result<SslContext, ()> {
let mut ctx = SslContext::builder(SslMethod::dtls()).unwrap();

let setup_context = ctx
.set_private_key_file(key, SslFiletype::PEM)
.and_then(|_| ctx.set_certificate_chain_file(cert))
.and_then(|_| ctx.check_private_key());
setup_context
.map_err(|err| {
error!("{}", err);
})
.map(|_| ctx.build())
fn build_openssl_context(cert: String, key: String) -> Result<SslContext> {
let mut ctx_builder = SslContext::builder(SslMethod::dtls())?;

ctx_builder.set_private_key_file(key, SslFiletype::PEM)?;
ctx_builder.set_certificate_chain_file(cert)?;
ctx_builder.check_private_key()?;
Ok(ctx_builder.build())
}

fn build_udp_socket_reuse_addr_port(addr: &SocketAddr) -> Result<UdpSocket, ()> {
fn build_udp_socket_reuse_addr_port(addr: &SocketAddr) -> Result<UdpSocket> {
let address = (*addr).into();

let socket =
Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).map_err(|err| {
error!("{}", err);
})?;
socket.set_reuse_address(true).map_err(|err| {
error!("{}", err);
})?;
socket.set_reuse_port(true).map_err(|err| {
error!("{}", err);
})?;
socket.bind(&address).map_err(|err| {
error!("{}", err);
})?;
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
socket.set_reuse_address(true)?;
socket.set_reuse_port(true)?;
socket.bind(&address)?;
let std_sock: std::net::UdpSocket = socket.into();
std_sock.set_nonblocking(true).map_err(|err| {
error!("{}", err);
})?;
UdpSocket::from_std(std_sock).map_err(|err| {
error!("{}", err);
})
std_sock.set_nonblocking(true)?;
let socket = UdpSocket::from_std(std_sock)?;
Ok(socket)
}

async fn run(self) {
Expand Down

0 comments on commit 20d503f

Please sign in to comment.