safekeeper: refactor WalAcceptor to be event-driven (#9462)
## Problem

The `WalAcceptor` main loop currently uses two nested loops to consume
inbound messages. This makes it hard to slot in periodic events like
metrics collection. It also duplicates the event-processing code and assumes that
all messages in steady state are AppendRequests (other message types may be
dropped if they follow an AppendRequest).

## Summary of changes

Refactor the `WalAcceptor` loop to be event-driven.
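
For readers skimming the description, here is a minimal, self-contained sketch (not part of the diff below) of the event-driven shape this refactor moves to: a single `tokio::select!` loop that multiplexes inbound messages with a periodic flush tick, which is also where periodic work such as metrics collection can be slotted in. The `Msg`, `handle_msg`, and `flush` names are hypothetical stand-ins, not safekeeper APIs.

```rust
// Illustrative sketch only; `Msg`, `handle_msg`, and `flush` are hypothetical
// stand-ins for the safekeeper's real message type and processing calls.
// Assumes tokio = { version = "1", features = ["full"] }.
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;

#[derive(Debug)]
enum Msg {
    Append(u64),   // stand-in for an AppendRequest
    Other(String), // stand-in for any other message type
}

async fn run(mut msg_rx: mpsc::Receiver<Msg>) {
    // Periodic tick used to batch flushes instead of flushing on every append.
    let mut flush_ticker = tokio::time::interval(Duration::from_secs(1));
    flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
    flush_ticker.tick().await; // skip the initial, immediate tick

    let mut dirty = false; // tracks unflushed appends

    loop {
        tokio::select! {
            // All inbound message types are handled in one place.
            msg = msg_rx.recv() => {
                let Some(msg) = msg else { break }; // sender dropped, exit loop
                if matches!(msg, Msg::Append(_)) {
                    dirty = true; // defer the flush to the ticker
                }
                handle_msg(msg).await;
            }
            // Periodic event: flush batched appends. Other periodic work
            // (e.g. metrics collection) can be added as further branches.
            _ = flush_ticker.tick(), if dirty => {
                dirty = false;
                flush().await;
            }
        }
    }

    // Flush anything left unflushed on disconnect.
    if dirty {
        flush().await;
    }
}

async fn handle_msg(msg: Msg) {
    println!("processing {msg:?}");
}

async fn flush() {
    println!("flushing WAL");
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    tokio::spawn(async move {
        tx.send(Msg::Append(1)).await.unwrap();
        tx.send(Msg::Other("elected".into())).await.unwrap();
    });
    run(rx).await;
}
```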
erikgrinaker authored Oct 28, 2024
1 parent 3bad525 commit 248558d
Showing 1 changed file with 62 additions and 56 deletions.
safekeeper/src/receive_wal.rs
@@ -21,18 +21,15 @@ use postgres_backend::QueryError;
 use pq_proto::BeMessage;
 use serde::Deserialize;
 use serde::Serialize;
+use std::future;
 use std::net::SocketAddr;
 use std::sync::Arc;
 use tokio::io::AsyncRead;
 use tokio::io::AsyncWrite;
-use tokio::sync::mpsc::channel;
-use tokio::sync::mpsc::error::TryRecvError;
-use tokio::sync::mpsc::Receiver;
-use tokio::sync::mpsc::Sender;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::task;
 use tokio::task::JoinHandle;
-use tokio::time::Duration;
-use tokio::time::Instant;
+use tokio::time::{Duration, MissedTickBehavior};
 use tracing::*;
 use utils::id::TenantTimelineId;
 use utils::lsn::Lsn;
@@ -444,9 +441,9 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
     }
 }

-// Send keepalive messages to walproposer, to make sure it receives updates
-// even when it writes a steady stream of messages.
-const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
+/// The WAL flush interval. This ensures we periodically flush the WAL and send AppendResponses to
+/// walproposer, even when it's writing a steady stream of messages.
+const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

 /// Encapsulates a task which takes messages from msg_rx, processes and pushes
 /// replies to reply_tx.
@@ -494,67 +491,76 @@ impl WalAcceptor {
     async fn run(&mut self) -> anyhow::Result<()> {
         let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);

-        // After this timestamp we will stop processing AppendRequests and send a response
-        // to the walproposer. walproposer sends at least one AppendRequest per second,
-        // we will send keepalives by replying to these requests once per second.
-        let mut next_keepalive = Instant::now();
+        // Periodically flush the WAL.
+        let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
+        flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
+        flush_ticker.tick().await; // skip the initial, immediate tick

-        while let Some(mut next_msg) = self.msg_rx.recv().await {
-            // Update walreceiver state in shmem for reporting.
-            if let ProposerAcceptorMessage::Elected(_) = &next_msg {
-                walreceiver_guard.get().status = WalReceiverStatus::Streaming;
-            }
+        // Tracks unflushed appends.
+        let mut dirty = false;

-            let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
-                // Loop through AppendRequests while available to write as many WAL records as
-                // possible without fsyncing.
-                //
-                // Make sure the WAL is flushed before returning, see:
-                // https://github.com/neondatabase/neon/issues/9259
-                //
-                // Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
-                // Otherwise, we might end up in a situation where we read a message, but don't
-                // process it.
-                while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
-                    let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
-
-                    if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
-                        if self.reply_tx.send(reply).await.is_err() {
-                            break; // disconnected, flush WAL and return on next send/recv
-                        }
+        loop {
+            let reply = tokio::select! {
+                // Process inbound message.
+                msg = self.msg_rx.recv() => {
+                    // If disconnected, break to flush WAL and return.
+                    let Some(mut msg) = msg else {
+                        break;
+                    };
+
+                    // Update walreceiver state in shmem for reporting.
+                    if let ProposerAcceptorMessage::Elected(_) = &msg {
+                        walreceiver_guard.get().status = WalReceiverStatus::Streaming;
                     }

-                    // get out of this loop if keepalive time is reached
-                    if Instant::now() >= next_keepalive {
-                        break;
+                    // Don't flush the WAL on every append, only periodically via flush_ticker.
+                    // This batches multiple appends per fsync. If the channel is empty after
+                    // sending the reply, we'll schedule an immediate flush.
+                    if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
+                        msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
+                        dirty = true;
                    }

-                    // continue pulling AppendRequests if available
-                    match self.msg_rx.try_recv() {
-                        Ok(msg) => next_msg = msg,
-                        Err(TryRecvError::Empty) => break,
-                        // on disconnect, flush WAL and return on next send/recv
-                        Err(TryRecvError::Disconnected) => break,
-                    };
+                    self.tli.process_msg(&msg).await?
                }

-                // flush all written WAL to the disk
-                self.tli
-                    .process_msg(&ProposerAcceptorMessage::FlushWAL)
-                    .await?
-            } else {
-                // process message other than AppendRequest
-                self.tli.process_msg(&next_msg).await?
+                // While receiving AppendRequests, flush the WAL periodically and respond with an
+                // AppendResponse to let walproposer know we're still alive.
+                _ = flush_ticker.tick(), if dirty => {
+                    dirty = false;
+                    self.tli
+                        .process_msg(&ProposerAcceptorMessage::FlushWAL)
+                        .await?
+                }
+
+                // If there are no pending messages, flush the WAL immediately.
+                //
+                // TODO: this should be done via flush_ticker.reset_immediately(), but that's always
+                // delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
+                _ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
+                    dirty = false;
+                    flush_ticker.reset();
+                    self.tli
+                        .process_msg(&ProposerAcceptorMessage::FlushWAL)
+                        .await?
+                }
            };

-            if let Some(reply) = reply_msg {
+            // Send reply, if any.
+            if let Some(reply) = reply {
                if self.reply_tx.send(reply).await.is_err() {
-                    return Ok(()); // chan closed, streaming terminated
+                    break; // disconnected, break to flush WAL and return
                }
-                // reset keepalive time
-                next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
            }
        }

+        // Flush WAL on disconnect, see https://github.com/neondatabase/neon/issues/9259.
+        if dirty {
+            self.tli
+                .process_msg(&ProposerAcceptorMessage::FlushWAL)
+                .await?;
+        }
+
        Ok(())
    }
}

1 comment on commit 248558d

@github-actions


5274 tests run: 5051 passed, 1 failed, 222 skipped (full report)


Failures on Postgres 17

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_crafted_wal_end[debug-pg17-simple]"
Flaky tests (2)

Postgres 17

Test coverage report is not available

The comment gets automatically updated with the latest test results
248558d at 2024-10-28T18:51:14.415Z :recycle:
