Skip to content

Commit

Permalink
core: expose election delay in config
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Sep 30, 2024
1 parent 8a01ba9 commit 4553537
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub fn main() !void {
.onJoinAddr = onJoinAddr,

// So we won't overload the free service we are using.
.on_join_every = 10,
.on_join_every = 50,
};

var member = hm.getEntry(2).?.value_ptr.*;
Expand Down
44 changes: 36 additions & 8 deletions src/zgroup.zig
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub fn Fleet(UserData: type) type {
proto_time: u64,
suspect_time: u64,
ping_req_k: u32,
elex_delay: u64,

// Our per-member data. Key format is "ip:port", eg. "127.0.0.1:8080".
members: std.StringHashMap(MemberData),
Expand Down Expand Up @@ -58,8 +59,8 @@ pub fn Fleet(UserData: type) type {
voted_for: []const u8,
elex_tm: std.time.Timer,
candidate_tm: std.time.Timer,
elex_tm_min: u64 = std.time.ns_per_ms * 2000,
elex_tm_max: u64 = std.time.ns_per_ms * 3000,
elex_tm_min: u64, // set via config
elex_tm_max: u64, // set via config
leader: []const u8,

const ElectionState = enum(u8) {
Expand Down Expand Up @@ -205,6 +206,9 @@ pub fn Fleet(UserData: type) type {
/// The only valid value at the moment is `1`.
ping_req_k: u32 = 1,

/// Delay between leader's liveness pings to all nodes.
elex_delay: u64 = std.time.ns_per_ms * 100,

/// See `Callbacks` struct for more information.
callbacks: Callbacks,
};
Expand All @@ -215,6 +219,10 @@ pub fn Fleet(UserData: type) type {
/// expected to be long-running. Some areas will utilize an arena allocator
/// based on the input allocator when it's appropriate.
pub fn init(allocator: std.mem.Allocator, config: *const Config) !Self {
const edf: f64 = @floatFromInt(config.elex_delay);
const minf: f64 = edf / 0.05;
const emin: u64 = @intFromFloat(minf);

return Self{
.allocator = allocator,
.name = if (config.name.len > 8) config.name[0..8] else config.name,
Expand All @@ -223,6 +231,9 @@ pub fn Fleet(UserData: type) type {
.proto_time = config.proto_time,
.suspect_time = config.suspect_time,
.ping_req_k = config.ping_req_k,
.elex_delay = config.elex_delay,
.elex_tm_min = emin,
.elex_tm_max = emin + std.time.ns_per_s,
.members = std.StringHashMap(MemberData).init(allocator),
.refkeys = std.StringHashMap(void).init(allocator),
.ping_queue = std.ArrayList([]const u8).init(allocator),
Expand Down Expand Up @@ -252,11 +263,28 @@ pub fn Fleet(UserData: type) type {

/// Start group membership tracking.
pub fn run(self: *Self) !void {
log.debug("Message: size={d}, align={d}", .{
log.debug("run: name={s}, address={s}:{d}", .{
self.name,
self.ip,
self.port,
});

log.debug("*Message: size={d}, align={d}", .{
@sizeOf(Message),
@alignOf(Message),
});

log.debug("SWIM: prototime={any}, suspecttime={any}, k={d}", .{
std.fmt.fmtDuration(self.proto_time),
std.fmt.fmtDuration(self.suspect_time),
self.ping_req_k,
});

log.debug("leader election timeout range: min={any}, max={any}", .{
std.fmt.fmtDuration(self.elex_tm_min),
std.fmt.fmtDuration(self.elex_tm_max),
});

const me = try self.getOwnKey();
defer self.allocator.free(me);
_ = try self.ensureKeyRef(me);
Expand Down Expand Up @@ -356,7 +384,7 @@ pub fn Fleet(UserData: type) type {
// Run internal UDP server for handling both SWIM- and Raft-related
// protocols. Uses a single allocation of *Message all throughout.
fn udpListen(self: *Self) !void {
log.info("Starting UDP server on :{d}...", .{self.port});
log.info("starting UDP server on :{d}...", .{self.port});

const name = std.mem.readVarInt(u64, self.name, .little);
const buf = try self.allocator.alloc(u8, @sizeOf(Message));
Expand Down Expand Up @@ -1006,13 +1034,13 @@ pub fn Fleet(UserData: type) type {
var fails: usize = 0;
var deferlog = false;
defer {
if (fails > 0) std.time.sleep(std.time.ns_per_ms * 50);
if (fails > 0) std.time.sleep(self.elex_delay);
if (deferlog) {
if (@mod(i, 40) == 0) {
log.debug("[{d}:{d}] leader here, hb took {any}", .{
i,
self.getTerm(),
std.fmt.fmtDuration(tm.read()),
std.fmt.fmtDuration(tm.read() - self.elex_delay),
});
}
}
Expand Down Expand Up @@ -1071,7 +1099,7 @@ pub fn Fleet(UserData: type) type {
self.setTermAndN(msg);

ltm.reset();
send(ip, port, buf, 50_000) catch |err| {
send(ip, port, buf, null) catch |err| {
log.err("[{d}] hb:send failed: {any}", .{ i, err });
fails += 1;
continue;
Expand All @@ -1097,7 +1125,7 @@ pub fn Fleet(UserData: type) type {
}

ldr_last_sweep = if (fails == 0) true else false;
std.time.sleep(std.time.ns_per_ms * 50);
std.time.sleep(self.elex_delay);
},
}
}
Expand Down

0 comments on commit 4553537

Please sign in to comment.