Skip to content

Commit

Permalink
connectd: limit to 10 connections at once.
Browse files Browse the repository at this point in the history
We wait until a connection fails, or a subd is connected to the peer,
before letting another one through.  This should prevent us from
overwhelming lightningd on large nodes, but unlike the previous back-off,
it's based on how fast lightningd is, not an arbitrary time.

We also let one through each second, in case we're connecting to many
peers but doing nothing with them except gossip (e.g. 100 explicit
connect commands).

Signed-off-by: Rusty Russell <[email protected]>
Changelog-Changed: Reconnecting to peers at startup should be significantly faster (dependent on machine speed).
  • Loading branch information
rustyrussell committed Aug 31, 2024
1 parent 83f84b0 commit 09c9242
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 3 deletions.
52 changes: 50 additions & 2 deletions connectd/connectd.c
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,25 @@ static void schedule_reconnect_if_important(struct daemon *daemon,
imp->reconnect_secs = max_delay;
}

/* If another connection is waiting, unqueue it now.  Called once a
 * second, when a connection attempt fails, or when subds are first
 * connected.  This limits how many things we tell lightningd about at
 * once.
 *
 * @daemon: the daemon whose `connecting` table is searched.
 * @why: human-readable reason, used only in the debug log message.
 *
 * We start at a pseudorandomly-picked entry and walk the whole table
 * (wrapping around at the end) until we find one which is waiting; at
 * most one connection is released per call. */
void release_one_waiting_connection(struct daemon *daemon, const char *why)
{
	struct connecting *c;
	struct connecting_htable_iter it;
	/* Sample the count once: it bounds the walk to a single lap. */
	const size_t count = connecting_htable_count(daemon->connecting);

	c = connecting_htable_pick(daemon->connecting, pseudorand_u64(), &it);
	for (size_t i = 0; c && i < count; i++) {
		if (c->waiting) {
			status_peer_debug(&c->id, "Unblocking for %s", why);
			c->waiting = false;
			try_connect_one_addr(c);
			/* try_connect_one_addr may alter the table, so we
			 * must not touch the iterator after this point. */
			break;
		}

		/* Not waiting: move on to the next entry, wrapping to the
		 * start so entries before the random pick are covered too. */
		c = connecting_htable_next(daemon->connecting, &it);
		if (!c)
			c = connecting_htable_first(daemon->connecting, &it);
	}
}

static void connect_failed(struct daemon *daemon,
const struct node_id *id,
enum jsonrpc_errcode errcode,
Expand All @@ -851,7 +870,13 @@ static void connect_failed(struct daemon *daemon,
msg = towire_connectd_connect_failed(NULL, id, errcode, errmsg);
daemon_conn_send(daemon->master, take(msg));

/* If we're supposed to schedule a reconnect, do so */
schedule_reconnect_if_important(daemon, id);

/* We limit thundering herd: let one out if waiting. */
release_one_waiting_connection(daemon,
tal_fmt(tmpctx, "%s connect failure",
fmt_node_id(tmpctx, id)));
}

/* add errors to error list */
Expand Down Expand Up @@ -984,6 +1009,7 @@ static void try_connect_one_addr(struct connecting *connect)
struct sockaddr_in6 *sa6;

assert(!connect->conn);
assert(!connect->waiting);

/* Out of addresses? */
if (connect->addrnum == tal_count(connect->addrs)) {
Expand Down Expand Up @@ -1555,6 +1581,20 @@ setup_listeners(const tal_t *ctx,
return listen_fds;
}

/* Periodic tick: let one queued connection through, then re-arm ourselves
 * to fire again in one second.  This keeps the queue draining even when
 * there are many peers to connect to and lightningd never attaches a subd
 * to any of them. */
static void release_one_connection_from_timer(struct daemon *daemon)
{
	struct oneshot *rearmed;

	release_one_waiting_connection(daemon, "timer");

	/* Schedule the next tick, remembering the timer in the daemon. */
	rearmed = new_reltimer(&daemon->timers, daemon, time_from_sec(1),
			       release_one_connection_from_timer, daemon);
	daemon->connect_release_timer = rearmed;
}

/*~ Parse the incoming connect init message from lightningd ("master") and
* assign config variables to the daemon; it should be the first message we
Expand Down Expand Up @@ -1661,6 +1701,9 @@ static void connect_init(struct daemon *daemon, const u8 *msg)
/* 500 bytes per second, not 1M per second */
if (dev_throttle_gossip)
daemon->gossip_stream_limit = 500;

/* Does nothing (no peers yet!) but arms timer */
release_one_connection_from_timer(daemon);
}

/* Returning functions in C is ugly! */
Expand Down Expand Up @@ -1867,8 +1910,13 @@ static void try_connect_peer(struct daemon *daemon,
connecting_htable_add(daemon->connecting, connect);
tal_add_destructor(connect, destroy_connecting);

/* Now we kick it off by recursively trying connect->addrs[connect->addrnum] */
try_connect_one_addr(connect);
/* We wait for another to be destroyed if too many are in
* progress (useful for startup of large nodes) */
connect->waiting = (connecting_htable_count(daemon->connecting) > 10);
if (connect->waiting)
status_peer_debug(id, "Too many connections, waiting...");
else
try_connect_one_addr(connect);
}

static void destroy_important_id(struct important_id *imp)
Expand Down
10 changes: 10 additions & 0 deletions connectd/connectd.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ struct connecting {
/* The ID of the peer (not necessarily unique, in transit!) */
struct node_id id;

/* Are we queued waiting, to avoid too many connections at once? */
bool waiting;

/* We iterate through the tal_count(addrs) */
size_t addrnum;
struct wireaddr_internal *addrs;
Expand Down Expand Up @@ -321,6 +324,9 @@ struct daemon {
/* What (even) custom messages we accept */
u16 *custom_msgs;

/* Timer which releases one pending connection per second. */
struct oneshot *connect_release_timer;

/* Hack to speed up gossip timer */
bool dev_fast_gossip;
/* Hack to avoid ping timeouts */
Expand Down Expand Up @@ -367,4 +373,8 @@ void destroy_peer(struct peer *peer);

/* Remove a random connection, when under stress. */
void close_random_connection(struct daemon *daemon);

/* If connections are waiting to avoid flooding lightningd, release one now.
 * `why` is a human-readable reason which is included in the debug log
 * message emitted when a connection is unblocked. */
void release_one_waiting_connection(struct daemon *daemon, const char *why);

#endif /* LIGHTNING_CONNECTD_CONNECTD_H */
8 changes: 7 additions & 1 deletion connectd/multiplex.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <bitcoin/block.h>
#include <bitcoin/chainparams.h>
#include <ccan/io/io.h>
#include <ccan/tal/str/str.h>
#include <common/cryptomsg.h>
#include <common/daemon_conn.h>
#include <common/dev_disconnect.h>
Expand Down Expand Up @@ -1357,8 +1358,13 @@ void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd)

/* If peer said something, we created this and queued msg. */
subd = find_subd(peer, &channel_id);
if (!subd)
if (!subd) {
subd = new_subd(peer, &channel_id);
/* Implies lightningd is ready for another peer. */
release_one_waiting_connection(peer->daemon,
tal_fmt(tmpctx, "%s given a subd",
fmt_node_id(tmpctx, &id)));
}

assert(!subd->conn);

Expand Down

0 comments on commit 09c9242

Please sign in to comment.