Skip to content

Commit

Permalink
chore: readability
Browse files Browse the repository at this point in the history
  • Loading branch information
CookiePieWw committed Dec 21, 2024
1 parent ab94583 commit 5d26354
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions src/meta-srv/src/election/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,23 @@ struct ValueWithLease {
}

/// PostgreSql implementation of Election.
/// TODO: Currently only support candidate registration. Add election logic.
pub struct PgElection {
leader_value: String,
client: Client,
is_leader: AtomicBool,
infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl: u64,
candidate_lease_ttl_secs: u64,
}

impl PgElection {
pub async fn with_pg_client(
leader_value: String,
client: Client,
store_key_prefix: String,
candidate_lease_ttl: u64,
candidate_lease_ttl_secs: u64,
) -> Result<ElectionRef> {
let (tx, _) = broadcast::channel(100);
Ok(Arc::new(Self {
Expand All @@ -65,7 +66,7 @@ impl PgElection {
infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl,
candidate_lease_ttl_secs,
}))
}

Expand Down Expand Up @@ -112,7 +113,7 @@ impl Election for PgElection {
.duration_since(time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64()
+ self.candidate_lease_ttl as f64,
+ self.candidate_lease_ttl_secs as f64,
};
let res = self.put_value_with_lease(&key, &value_with_lease).await?;
// May registered before, just update the lease.
Expand All @@ -123,7 +124,7 @@ impl Election for PgElection {

// Check if the current lease has expired and renew the lease.
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl / 2));
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
loop {
let _ = keep_alive_interval.tick().await;

Expand Down Expand Up @@ -357,7 +358,7 @@ mod tests {
infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl: 10,
candidate_lease_ttl_secs: 10,
};

let res = pg_election
Expand Down Expand Up @@ -423,7 +424,7 @@ mod tests {
infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl: 10,
candidate_lease_ttl_secs: 10,
};

let node_info = MetasrvNodeInfo {
Expand Down Expand Up @@ -459,7 +460,7 @@ mod tests {
infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl: 5,
candidate_lease_ttl_secs: 5,
};

let candidates = pg_election.all_candidates().await.unwrap();
Expand Down

0 comments on commit 5d26354

Please sign in to comment.