-
Hi, I am trying to implement a Kafka consumer (with `rdkafka`):
use flume::Sender;
use futures::TryStreamExt;
use rdkafka::consumer::{stream_consumer::StreamConsumer, MessageStream};
use rdkafka::{admin::ConfigSource, config::ClientConfig};
/// Unit struct sent over the channel by `ClicksConsumer::start` for each consumed message.
pub struct Click;
/// Pairs a Kafka `StreamConsumer` with the sending half of a channel of `Click` values.
pub struct ClicksConsumer {
// Kafka consumer whose message stream is read in `start`.
consumer: StreamConsumer,
// Channel sender on which a `Click` is pushed per message.
tx: Sender<Click>,
}
impl ClicksConsumer {
/// Creates a `ClicksConsumer` from a broker list, a consumer group id and a channel sender.
///
/// The consumer is configured with partition-EOF events disabled, a session
/// timeout of 6000 ms and auto-commit disabled.
///
/// # Panics
///
/// Panics with "Consumer creation failed" if the client config cannot create the consumer.
pub fn new(brokers: String, group_id: String, tx: Sender<Click>) -> ClicksConsumer {
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", &group_id)
.set("bootstrap.servers", &brokers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.create()
.expect("Consumer creation failed");
ClicksConsumer { consumer, tx }
}
/// Spawns a task that starts the consumer's message stream and, for every
/// message, detaches it and sends a `Click` on `tx`.
///
/// NOTE: this is the version from the question and it does not compile. As the
/// reply below explains, `start` takes `&self`, so it only has borrowed access
/// and cannot give ownership of the consumer to the spawned task.
pub async fn start(&self) {
// `async move` captures `self`, which here is only a borrow.
tokio::spawn(async move {
let stream = self.consumer.start();
// NOTE(review): the future returned by `try_for_each` is the tail expression
// of this block and is not awaited here.
stream.try_for_each(|burrwed_msg| {
// `message` is not used afterwards; only an empty `Click` is sent.
let message = burrwed_msg.detach();
async move {
self.tx.send(Click {}); // this is blocking the task. is that a good idea?
Ok(())
}
})
});
}
}
I am getting this error from the compiler which I don't know how to interpret:
This |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
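This is due to Rust's single-ownership principle. To give the new task ownership of the `ClicksConsumer`, that task must be the only place that can access it; however, the `start` method takes `&self`, so it only has borrowed access and cannot give ownership away to the new task. One approach is to change `start` to take ownership of `self`:
pub async fn start(self) {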
tokio::spawn(async move {
let stream = self.consumer.start();
stream.try_for_each(|burrwed_msg| {
let message = burrwed_msg.detach();
async move {
self.tx.send(Click {});
Ok(())
}
})
});
}
An alternative is to move the `tokio::spawn` call out of `start` and into the caller:
pub async fn start(&mut self) {
let stream = self.consumer.start();
stream.try_for_each(|burrwed_msg| {
let message = burrwed_msg.detach();
async move {
self.tx.send(Click {}); // this is blocking the task. is that a good idea?
Ok(())
}
})
}
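tokio::spawn(async move { consumer.start() });
As for the blocking `send` call: luckily you are using the `flume` crate, whose `Sender` also offers an async `send_async` method (used below). As a word of advice, I typically recommend using a real loop rather than `try_for_each`:
use futures::stream::StreamExt;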
let stream = self.consumer.start();
while let Some(burrwed_msg) = stream.next().await {
let message = burrwed_msg.detach();
self.tx.send_async(Click {}).await;
} |
Beta Was this translation helpful? Give feedback.
This is due to Rust's single-ownership principle. To give the new task ownership of the `ClicksConsumer`, that task must be the only place that can access it; however, the `start` method takes `&self`, which means that `start` only has borrowed access to the `ClicksConsumer`. Since `start` does not have ownership, it cannot give away ownership to the new task.
One approach is to change `start` to take ownership of `self`. This looks like the following: