Skip to content

Commit

Permalink
bugfix: etcd sync loop shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
zhu327 committed Dec 12, 2024
1 parent 79c606f commit ac2dbe2
Showing 1 changed file with 39 additions and 18 deletions.
57 changes: 39 additions & 18 deletions src/config/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{error::Error, sync::Arc, time::Duration};

use async_trait::async_trait;
use etcd_client::{Client, ConnectOptions, Event, GetOptions, GetResponse, WatchOptions};
use pingora_core::services::background::BackgroundService;
use pingora_core::{server::ShutdownWatch, services::background::BackgroundService};
use tokio::{sync::Mutex, time::sleep};

use super::Etcd;
Expand Down Expand Up @@ -122,33 +122,54 @@ impl EtcdConfigSync {
}

/// 主任务循环
async fn run_sync_loop(&self, shutdown: &pingora_core::server::ShutdownWatch) {
async fn run_sync_loop(&self, shutdown: &mut ShutdownWatch) {
loop {
if *shutdown.borrow() {
log::info!("Shutdown signal received, stopping etcd config sync");
return;
tokio::select! {
// Shutdown signal handling
_ = shutdown.changed() => {
if *shutdown.borrow() {
log::info!("Shutdown signal received, stopping etcd config sync");
return;
}
},

// Perform list operation
result = self.list() => {
if let Err(err) = result {
log::error!("List operation failed: {:?}", err);
self.reset_client().await;
sleep(Duration::from_secs(3)).await;
continue;
}
}
}

if let Err(err) = self.list().await {
log::error!("List operation failed: {:?}", err);
self.reset_client().await;
sleep(Duration::from_secs(3)).await;
continue;
}

if let Err(err) = self.watch().await {
log::error!("Watch operation failed: {:?}", err);
self.reset_client().await;
sleep(Duration::from_secs(1)).await;
tokio::select! {
// Shutdown signal handling during watch
_ = shutdown.changed() => {
if *shutdown.borrow() {
log::info!("Shutdown signal received, stopping etcd config sync");
return;
}
},

// Perform watch operation
result = self.watch() => {
if let Err(err) = result {
log::error!("Watch operation failed: {:?}", err);
self.reset_client().await;
sleep(Duration::from_secs(1)).await;
}
}
}
}
}
}

#[async_trait]
impl BackgroundService for EtcdConfigSync {
async fn start(&self, shutdown: pingora_core::server::ShutdownWatch) {
self.run_sync_loop(&shutdown).await;
async fn start(&self, mut shutdown: ShutdownWatch) {
self.run_sync_loop(&mut shutdown).await;
}
}

Expand Down

0 comments on commit ac2dbe2

Please sign in to comment.