How to Handle Automatic Reconnection in Lapin When Connection is Lost? #420

Open
nitsarut1 opened this issue Nov 14, 2024 · 0 comments

I'm new to Rust and currently using lapin version 2.5.0 to connect to RabbitMQ for message consumption. I've successfully set up connections and channels, but I'm encountering issues with maintaining the connection and handling automatic reconnection when the connection drops (e.g., due to network instability or a RabbitMQ restart).

One of the error logs I see is:

2024-11-14T10:35:33.300071Z  WARN job::services::amqp: 301: Error receiving message: ProtocolError(AMQPError { kind: Hard(CONNECTIONFORCED), message: ShortString("CONNECTION_FORCED - Closed via Web management") })
main.rs:
let amqp_url = env::var("AMQP_URL").expect("AMQP_URL must be set");
let amqp_conn = amqp::create_connection(&amqp_url).await;
let channel_set = amqp::create_channel(&amqp_conn).await;
tokio::spawn(async move {
    amqp::setup_consumer(
        &channel_set,
        &"test".to_string(),
        &"next".to_string(),
        &queue_trigger,
    )
    .await
});
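amqp.rs:
// Imports inferred from usage (an assumption); acknowledge_message is a helper not shown in this snippet.
use futures::StreamExt;
use lapin::{options::BasicConsumeOptions, types::FieldTable, Channel, Connection, ConnectionProperties};
use tracing::{info, warn};

pub async fn create_connection(url: &str) -> Connection {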
    Connection::connect(url, ConnectionProperties::default())
        .await
        .expect("Failed to create connection")
}

pub async fn create_channel(connection: &Connection) -> Channel {
    connection
        .create_channel()
        .await
        .expect("Failed to create channel")
}

pub async fn setup_consumer(channel: &Channel, exchange: &str, queue_name: &str) {
    let consumer = channel
        .basic_consume(
            queue_name,
            queue_name,
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await
        .expect("Failed to start consumer");

    consumer
        .for_each(|delivery| async {
            match delivery {
                Ok(delivery) => {
                    let message = String::from_utf8_lossy(&delivery.data);
                    info!("Render received payload for upload: {}", message);
                    acknowledge_message(&delivery, "Render Consumer".to_string()).await;
                }
                Err(error) => {
                    warn!("Error receiving message: {:?}", error);
                }
            }
        })
        .await;
}
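
One common way to structure the recovery asked about here is to treat "connect, create channel, consume" as a single session and rerun it with exponential backoff whenever it ends in an error. The sketch below shows only that control flow in plain std Rust. `Backoff` and `run_with_reconnect` are hypothetical names, not lapin APIs, and the `session` closure stands in for the `create_connection` / `create_channel` / `setup_consumer` sequence above, which would first need to return a `Result` instead of calling `expect`.

```rust
use std::time::Duration;

/// Hypothetical backoff policy (illustrative only, not part of lapin).
#[derive(Clone, Copy, Debug)]
pub struct Backoff {
    pub base: Duration,
    pub max: Duration,
}

impl Backoff {
    /// Delay before retry number `attempt` (0-based):
    /// `base * 2^attempt`, capped at `max`.
    pub fn delay(&self, attempt: u32) -> Duration {
        // Clamp the exponent so the multiplier itself cannot overflow.
        let factor = 2u32.saturating_pow(attempt.min(16));
        self.base.saturating_mul(factor).min(self.max)
    }
}

/// Reruns `session` until it returns `Ok(())` (clean shutdown) or it has
/// failed `max_failures` times, in which case the last error is returned.
///
/// `session` is assumed to do everything from scratch on each call:
/// connect, create the channel, register the consumer, and process
/// deliveries until something goes wrong.
/// `wait` is injected so callers decide how to sleep (and tests need not).
pub fn run_with_reconnect<E, S, W>(
    mut session: S,
    mut wait: W,
    backoff: Backoff,
    max_failures: u32,
) -> Result<(), E>
where
    S: FnMut() -> Result<(), E>,
    W: FnMut(Duration),
{
    let mut failures: u32 = 0;
    loop {
        match session() {
            Ok(()) => return Ok(()),
            Err(err) => {
                failures += 1;
                if failures >= max_failures {
                    return Err(err);
                }
                // First retry waits `base`, the next `2 * base`, and so on.
                wait(backoff.delay(failures - 1));
            }
        }
    }
}
```

In the async code above, the same loop would await the session and use an async sleep (for example `tokio::time::sleep`) in place of the `wait` closure. Resetting the failure counter after a session that stayed up for a while is a reasonable refinement.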

Questions

1. What’s the best way to implement automatic reconnection in lapin?

   I'd like to avoid having to manually reinitialize the Connection and Channels every time the connection drops. Is there a recommended way to handle this within lapin?

2. Is there an option to set up a handler that automatically reconnects and recreates channels and consumers?

   I found that on_error can detect errors, but it still requires custom logic for managing reconnections. Is there a pattern or approach that the team recommends for handling reconnections in lapin?

Since I'm new to Rust, I’d really appreciate any advice or examples you can provide. Thank you very much for your time and assistance!
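
Since `on_error` only reports the failure, one possible arrangement is to have the callback forward an event to a supervisor that owns the rebuild logic. The following is a std-only sketch of that hand-off. `LinkEvent`, `error_hook`, and `supervise` are hypothetical names, the hook takes a `String` rather than a lapin error type, and wiring it into lapin (for instance by formatting the error inside the `on_error` callback) is an assumption not shown here.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Events a supervisor might react to (hypothetical, not a lapin type).
#[derive(Debug, PartialEq)]
pub enum LinkEvent {
    /// The connection was lost; carries a human-readable reason.
    Lost(String),
    /// The application asked the supervisor to stop.
    Shutdown,
}

/// Builds a closure suitable for use as an error callback: it does no
/// recovery itself and only forwards the failure to the supervisor.
pub fn error_hook(tx: Sender<LinkEvent>) -> impl Fn(String) + Send + 'static {
    move |reason| {
        // A send error means the supervisor is already gone; nothing to do.
        let _ = tx.send(LinkEvent::Lost(reason));
    }
}

/// Blocks on the event channel and calls `rebuild` once per lost
/// connection. Returns how many rebuilds were triggered.
/// `rebuild` is where connection, channel, and consumer would be recreated.
pub fn supervise<R>(rx: Receiver<LinkEvent>, mut rebuild: R) -> usize
where
    R: FnMut(&str),
{
    let mut rebuilds = 0;
    // recv() returns Err once every sender has been dropped, ending the loop.
    while let Ok(event) = rx.recv() {
        match event {
            LinkEvent::Lost(reason) => {
                rebuild(&reason);
                rebuilds += 1;
            }
            LinkEvent::Shutdown => break,
        }
    }
    rebuilds
}
```

Keeping the callback this small means all reconnection decisions live in one place; the supervisor could delay each rebuild with a backoff policy instead of rebuilding immediately.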
