Skip to content

Commit

Permalink
Merge pull request #5 from ideal-world/fix-cluster-issue
Browse files Browse the repository at this point in the history
Fix cluster issue
  • Loading branch information
4t145 authored Nov 7, 2024
2 parents 72438d0 + 3dad2a4 commit 149bb0f
Show file tree
Hide file tree
Showing 18 changed files with 228 additions and 131 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/cicd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ jobs:

- name: Check clippy
run: cargo clippy -p asteroid-mq --all-features
- name: Test all
run: cargo test -p asteroid-mq --all-features
- name: Package
if: ${{ startsWith(github.ref, 'refs/tags/release') }}
run: cargo package -p asteroid-mq
Expand Down Expand Up @@ -153,9 +155,10 @@ jobs:

- name: Check format
run: cargo fmt -p asteroid-mq-sdk -- --check

- name: Check clippy
run: cargo clippy -p asteroid-mq-sdk --all-features
- name: Cargo test
run: cargo test -p asteroid-mq-sdk --all-features
- name: Package
if: ${{ startsWith(github.ref, 'refs/tags/release') }}
run: cargo package -p asteroid-mq-sdk
Expand Down
6 changes: 6 additions & 0 deletions model/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ impl MessageDurableConfig {
max_receiver: None,
}
}
pub fn new_pull(expire: DateTime<Utc>) -> Self {
Self {
expire,
max_receiver: Some(1),
}
}
pub fn with_max_receiver(mut self, max_receiver: u32) -> Self {
self.max_receiver = Some(max_receiver);
self
Expand Down
13 changes: 11 additions & 2 deletions model/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
durable::MessageDurableConfig, interest::Subject, topic::TopicCode, util::MaybeBase64Bytes,
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use typeshare::typeshare;

Expand Down Expand Up @@ -319,6 +320,11 @@ impl MessageHeaderBuilder {
self.durability = Some(config);
self
}
pub fn mode_pull(mut self, expire_at: DateTime<Utc>) -> Self {
self.target_kind = MessageTargetKind::Durable;
self.durability = Some(MessageDurableConfig::new_pull(expire_at));
self
}
pub fn mode_push(mut self) -> Self {
self.target_kind = MessageTargetKind::Push;
self
Expand Down Expand Up @@ -349,7 +355,9 @@ pub struct MessageAck {
/// Delivery-target kind carried in a message header.
///
/// Discriminants are explicit because the wire format decodes them from `u8`
/// (see the `From<u8>` impl below); `Available` (value 2) is being retired
/// as "not supported yet", so the numeric values are intentionally sparse.
pub enum MessageTargetKind {
    Durable = 0,
    Online = 1,
    Available = 2, // NOTE(review): removed in the newer revision — do not rely on it
    // #[allow(deprecated)]
    // #[deprecated(note = "not supported yet")]
    // Available = 2,
    #[default]
    Push = 3,
}
Expand All @@ -359,7 +367,8 @@ impl From<u8> for MessageTargetKind {
match kind {
0 => MessageTargetKind::Durable,
1 => MessageTargetKind::Online,
2 => MessageTargetKind::Available,
// #[allow(deprecated)]
// 2 => MessageTargetKind::Available,
_ => MessageTargetKind::Push,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ public enum MessageAckExpectKind {
// Delivery-target kind for a message — presumably generated from the Rust
// `MessageTargetKind` enum (model/src/message.rs) via typeshare; TODO confirm.
public enum MessageTargetKind {
    Durable,
    Online,
    Available, // NOTE(review): removed upstream as "not supported yet" — do not rely on it
    // not supported yet
    // Available,
    Push
}

Expand Down
1 change: 0 additions & 1 deletion sdk/typescript/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ export enum MessageAckExpectKind {
// Delivery-target kind for a message — presumably mirrors the Rust
// `MessageTargetKind` enum (model/src/message.rs); TODO confirm.
export enum MessageTargetKind {
    Durable = "Durable",
    Online = "Online",
    Available = "Available", // NOTE(review): deleted upstream as unsupported — do not rely on it
    Push = "Push",
}

Expand Down
2 changes: 1 addition & 1 deletion server/examples/axum_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ async fn main() -> asteroid_mq::Result<()> {
.init();
let node = Node::new(NodeConfig::default());
let cluster_provider = StaticClusterProvider::singleton(node.id(), node.config().addr);
node.init_raft(cluster_provider).await?;
node.start(cluster_provider).await?;
let topic = node.create_new_topic(TopicCode::const_new("test")).await?;

let receiver_endpoint = topic
Expand Down
37 changes: 11 additions & 26 deletions server/src/protocol/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl Node {
inner: Arc::new(inner),
}
}
pub async fn init_raft<C: ClusterProvider>(
pub async fn start<C: ClusterProvider>(
&self,
mut cluster_provider: C,
) -> Result<(), crate::Error> {
Expand Down Expand Up @@ -193,33 +193,18 @@ impl Node {
)
.await
.map_err(crate::Error::contextual_custom("create raft node"))?;
let members = cluster_provider
.pristine_nodes()
.await?
.into_iter()
.map(|(id, addr)| {
let node = TcpNode::new(addr);
(id, node)
})
.collect::<BTreeMap<_, _>>();
{
let tcp_service = tcp_service.clone();
let members = members.clone();
tokio::spawn(async move {
for (peer_id, peer_node) in members.iter() {
if id == *peer_id {
continue;
}
tracing::debug!(peer=%peer_id, local=%id, "ensure connection");
let _ = tcp_service
.ensure_connection(*peer_id, peer_node.addr)
.await;
}
});
}
raft.initialize(members.clone())
// waiting for members contain self
let pristine_nodes = cluster_provider.pristine_nodes().await?;
if pristine_nodes.contains_key(&id) {
raft.initialize(
pristine_nodes
.into_iter()
.map(|(k, n)| (k, TcpNode::new(n)))
.collect::<BTreeMap<_, _>>(),
)
.await
.map_err(crate::Error::contextual_custom("init raft node"))?;
}
maybe_loading_raft.set(raft.clone());
let cluster_service =
ClusterService::new(cluster_provider, tcp_service, cluster_service_ct);
Expand Down
92 changes: 45 additions & 47 deletions server/src/protocol/node/raft/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ pub(crate) mod k8s;
#[cfg(feature = "cluster-k8s")]
pub use k8s::K8sClusterProvider;
pub(crate) mod r#static;
use openraft::ChangeMembers;
pub use r#static::StaticClusterProvider;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
Expand All @@ -20,9 +19,7 @@ use super::{network_factory::TcpNetworkService, raft_node::TcpNode};
pub trait ClusterProvider: Send + 'static {
fn pristine_nodes(
&mut self,
) -> impl Future<Output = crate::Result<BTreeMap<NodeId, SocketAddr>>> + Send {
self.next_update()
}
) -> impl Future<Output = crate::Result<BTreeMap<NodeId, SocketAddr>>> + Send;
fn next_update(
&mut self,
) -> impl Future<Output = crate::Result<BTreeMap<NodeId, SocketAddr>>> + Send;
Expand Down Expand Up @@ -176,58 +173,59 @@ impl ClusterService {
}
})
.collect::<BTreeMap<_, _>>();
tracing::debug!(ensured = ?ensured_nodes, remove = ?to_remove, add = ?to_add, "updating raft members");
let leader_node = raft.current_leader().await;
let Some(leader_node) = leader_node else {
tracing::warn!("no leader node found");
continue;
};
if to_remove.contains(&leader_node) {
let elect_result = raft.trigger().elect().await;
if let Err(e) = elect_result {
tracing::error!("failed to trigger election: {}", e);
}
if to_remove.is_empty() && to_add.is_empty() {
tracing::trace!(leader=?leader_node, "no change in nodes");
} else {
tracing::info!(ensured = ?ensured_nodes, remove = ?to_remove, add = ?to_add, leader=?leader_node, "updating raft members");
}
if local_id == leader_node {
if !to_add.is_empty() {
let raft = raft.clone();
tokio::spawn(async move {
let add_nodes_result = raft
.change_membership(ChangeMembers::AddNodes(to_add.clone()), false)
.await;
if let Err(e) = add_nodes_result {
tracing::warn!("failed to add nodes: {}", e);
if let Some(leader_node) = leader_node {
if to_remove.contains(&leader_node) && local_id != leader_node {
tracing::warn!("leader {} is removed from cluster", leader_node);
let trigger_elect_result = raft.trigger().elect().await;
match trigger_elect_result {
Ok(resp) => {
tracing::info!(?resp, "leader removed, trigger election");
}
let add_voters_result = raft
.change_membership(ChangeMembers::AddVoters(to_add.clone()), false)
.await;
if let Err(e) = add_voters_result {
tracing::warn!("failed to add voters: {}", e);
Err(e) => {
tracing::warn!("failed to trigger election: {}", e);
}
});
}
}
if !to_remove.is_empty() {
let raft = raft.clone();
tokio::spawn(async move {
let remove_nodes_result = raft
.change_membership(ChangeMembers::RemoveNodes(to_remove.clone()), false)
.await;
if let Err(e) = remove_nodes_result {
tracing::warn!("failed to remove nodes: {}", e);
}
if to_remove.contains(&local_id) {
tracing::warn!("local node {} is removed from cluster", local_id);
break;
}

if Some(local_id) == leader_node && !to_add.is_empty() {
let raft = raft.clone();
for (id, node) in to_add {
let add_result = raft.add_learner(id, node, true).await;
match add_result {
Ok(resp) => {
tracing::info!(?resp, "learner {} added", id);
}
let remove_voters_result = raft
.change_membership(
ChangeMembers::RemoveVoters(to_remove.clone()),
false,
)
.await;
if let Err(e) = remove_voters_result {
tracing::warn!("failed to remove voters: {}", e);
Err(e) => {
tracing::warn!("failed to add learner {}: {}", id, e);
}
});
}
}
let add_voters_result = raft
.change_membership(
ensured_nodes.keys().cloned().collect::<BTreeSet<_>>(),
false,
)
.await;
match add_voters_result {
Ok(resp) => {
tracing::info!(?resp, "voters added");
}
Err(e) => {
tracing::warn!("failed to add voters: {}", e);
}
}
}
tracing::trace!("waiting for next update")
}
Ok(())
}
Expand Down
3 changes: 3 additions & 0 deletions server/src/protocol/node/raft/cluster/k8s.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ impl ClusterProvider for K8sClusterProvider {
fn name(&self) -> Cow<'static, str> {
Cow::Owned(format!("k8s/{}/{}", self.namespace, self.service,))
}
async fn pristine_nodes(&mut self) -> crate::Result<BTreeMap<NodeId, SocketAddr>> {
self.next_update().await
}
async fn next_update(&mut self) -> crate::Result<BTreeMap<NodeId, SocketAddr>> {
tokio::time::sleep_until(self.next_update).await;
self.next_update += self.poll_interval;
Expand Down
Loading

0 comments on commit 149bb0f

Please sign in to comment.