From 6ce82667931e344a9ee116db6332f52b96c1c19f Mon Sep 17 00:00:00 2001 From: flowerinthenight Date: Mon, 30 Sep 2024 08:16:57 +0900 Subject: [PATCH] core: move send out of self --- src/zgroup.zig | 54 +++++++++++++++++++++++++------------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/src/zgroup.zig b/src/zgroup.zig index fc3254d..29f0714 100644 --- a/src/zgroup.zig +++ b/src/zgroup.zig @@ -298,7 +298,7 @@ pub fn Fleet(UserData: type) type { msg.cmd = .join; try self.setMsgSrcToOwn(msg); - try self.send(dst_ip, dst_port, buf, null); + try send(dst_ip, dst_port, buf, null); switch (msg.cmd) { .ack => { @@ -950,7 +950,7 @@ pub fn Fleet(UserData: type) type { continue; msg.proto1 = self.getTerm(); - self.send(ip, port, buf, null) catch continue; + send(ip, port, buf, null) catch continue; if (msg.cmd != .ack) continue; @@ -1071,7 +1071,7 @@ pub fn Fleet(UserData: type) type { self.setTermAndN(msg); ltm.reset(); - self.send(ip, port, buf, 50_000) catch |err| { + send(ip, port, buf, 50_000) catch |err| { log.err("[{d}] hb:send failed: {any}", .{ i, err }); fails += 1; continue; @@ -1260,7 +1260,7 @@ pub fn Fleet(UserData: type) type { const n = self.getCounts(); msg.proto2 = n[0] + n[1]; - try self.send(ip, port, buf, null); + try send(ip, port, buf, null); // Handle join address protocol (ingress). const cmdm: JoinCmd = @enumFromInt((msg.proto1 & @@ -1381,28 +1381,6 @@ pub fn Fleet(UserData: type) type { } } - // Helper function for internal one-shot send/recv. The same message ptr is - // used for both request and response payloads. If `tm_us` is not null, - // default timeout will be 5s. - fn send(_: *Self, ip: []const u8, port: u16, msg: []u8, tm_us: ?u32) !void { - const addr = try std.net.Address.resolveIp(ip, port); - const sock = try std.posix.socket( - std.posix.AF.INET, - std.posix.SOCK.DGRAM | std.posix.SOCK.CLOEXEC, - 0, - ); - - var tm: u32 = 1_000_000; - if (tm_us) |v| tm = v; - - defer std.posix.close(sock); - try setReadTimeout(sock, tm); - try setWriteTimeout(sock, tm); - try std.posix.connect(sock, &addr.any, addr.getOsSockLen()); - _ = try std.posix.write(sock, msg); - _ = try std.posix.recv(sock, msg, 0); - } - // Handle the isd_* infection protocol of the message payload. // We are passing in an arena allocator here. fn handleIsd(self: *Self, allocator: std.mem.Allocator, msg: *Message, force: bool) !void { @@ -1841,7 +1819,7 @@ pub fn Fleet(UserData: type) type { const ip = leader[0..sep]; const port = try std.fmt.parseUnsigned(u16, leader[sep + 1 ..], 10); - try self.send(ip, port, msg, null); + try send(ip, port, msg, null); } fn getTerm(self: *Self) u64 { @@ -1924,6 +1902,28 @@ pub fn Fleet(UserData: type) type { }; } +// Helper function for internal one-shot send/recv. The same message ptr is +// used for both request and response payloads. If `tm_us` is not null, +// default timeout will be used. +fn send(ip: []const u8, port: u16, msg: []u8, tm_us: ?u32) !void { + const addr = try std.net.Address.resolveIp(ip, port); + const sock = try std.posix.socket( + std.posix.AF.INET, + std.posix.SOCK.DGRAM | std.posix.SOCK.CLOEXEC, + 0, + ); + + var tm: u32 = 1_000_000; + if (tm_us) |v| tm = v; + + defer std.posix.close(sock); + try setReadTimeout(sock, tm); + try setWriteTimeout(sock, tm); + try std.posix.connect(sock, &addr.any, addr.getOsSockLen()); + _ = try std.posix.write(sock, msg); + _ = try std.posix.recv(sock, msg, 0); +} + /// Converts an ip and port to a string with format ip:port, eg. "127.0.0.1:8080". /// Caller is responsible for releasing the returned memory. fn keyFromIpPort(allocator: std.mem.Allocator, ip: u32, port: u16) ![]const u8 {