diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs
index 3dbf72298fd6..f97e127a1724 100644
--- a/safekeeper/src/receive_wal.rs
+++ b/safekeeper/src/receive_wal.rs
@@ -21,18 +21,15 @@ use postgres_backend::QueryError;
 use pq_proto::BeMessage;
 use serde::Deserialize;
 use serde::Serialize;
+use std::future;
 use std::net::SocketAddr;
 use std::sync::Arc;
 use tokio::io::AsyncRead;
 use tokio::io::AsyncWrite;
-use tokio::sync::mpsc::channel;
-use tokio::sync::mpsc::error::TryRecvError;
-use tokio::sync::mpsc::Receiver;
-use tokio::sync::mpsc::Sender;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::task;
 use tokio::task::JoinHandle;
-use tokio::time::Duration;
-use tokio::time::Instant;
+use tokio::time::{Duration, MissedTickBehavior};
 use tracing::*;
 use utils::id::TenantTimelineId;
 use utils::lsn::Lsn;
@@ -444,9 +441,9 @@ async fn network_write(
     }
 }
 
-// Send keepalive messages to walproposer, to make sure it receives updates
-// even when it writes a steady stream of messages.
-const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
+/// The WAL flush interval. This ensures we periodically flush the WAL and send AppendResponses to
+/// walproposer, even when it's writing a steady stream of messages.
+const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
 
 /// Encapsulates a task which takes messages from msg_rx, processes and pushes
 /// replies to reply_tx.
@@ -494,67 +491,76 @@ impl WalAcceptor {
     async fn run(&mut self) -> anyhow::Result<()> {
         let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
 
-        // After this timestamp we will stop processing AppendRequests and send a response
-        // to the walproposer. walproposer sends at least one AppendRequest per second,
-        // we will send keepalives by replying to these requests once per second.
-        let mut next_keepalive = Instant::now();
+        // Periodically flush the WAL.
+        let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
+        flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
+        flush_ticker.tick().await; // skip the initial, immediate tick
 
-        while let Some(mut next_msg) = self.msg_rx.recv().await {
-            // Update walreceiver state in shmem for reporting.
-            if let ProposerAcceptorMessage::Elected(_) = &next_msg {
-                walreceiver_guard.get().status = WalReceiverStatus::Streaming;
-            }
+        // Tracks unflushed appends.
+        let mut dirty = false;
 
-            let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
-                // Loop through AppendRequests while available to write as many WAL records as
-                // possible without fsyncing.
-                //
-                // Make sure the WAL is flushed before returning, see:
-                // https://github.com/neondatabase/neon/issues/9259
-                //
-                // Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
-                // Otherwise, we might end up in a situation where we read a message, but don't
-                // process it.
-                while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
-                    let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
-
-                    if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
-                        if self.reply_tx.send(reply).await.is_err() {
-                            break; // disconnected, flush WAL and return on next send/recv
-                        }
+        loop {
+            let reply = tokio::select! {
+                // Process inbound message.
+                msg = self.msg_rx.recv() => {
+                    // If disconnected, break to flush WAL and return.
+                    let Some(mut msg) = msg else {
+                        break;
+                    };
+
+                    // Update walreceiver state in shmem for reporting.
+                    if let ProposerAcceptorMessage::Elected(_) = &msg {
+                        walreceiver_guard.get().status = WalReceiverStatus::Streaming;
                     }
 
-                    // get out of this loop if keepalive time is reached
-                    if Instant::now() >= next_keepalive {
-                        break;
+                    // Don't flush the WAL on every append, only periodically via flush_ticker.
+                    // This batches multiple appends per fsync. If the channel is empty after
+                    // sending the reply, we'll schedule an immediate flush.
+                    if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
+                        msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
+                        dirty = true;
                     }
 
-                    // continue pulling AppendRequests if available
-                    match self.msg_rx.try_recv() {
-                        Ok(msg) => next_msg = msg,
-                        Err(TryRecvError::Empty) => break,
-                        // on disconnect, flush WAL and return on next send/recv
-                        Err(TryRecvError::Disconnected) => break,
-                    };
+                    self.tli.process_msg(&msg).await?
                 }
 
-                // flush all written WAL to the disk
-                self.tli
-                    .process_msg(&ProposerAcceptorMessage::FlushWAL)
-                    .await?
-            } else {
-                // process message other than AppendRequest
-                self.tli.process_msg(&next_msg).await?
+                // While receiving AppendRequests, flush the WAL periodically and respond with an
+                // AppendResponse to let walproposer know we're still alive.
+                _ = flush_ticker.tick(), if dirty => {
+                    dirty = false;
+                    self.tli
+                        .process_msg(&ProposerAcceptorMessage::FlushWAL)
+                        .await?
+                }
+
+                // If there are no pending messages, flush the WAL immediately.
+                //
+                // TODO: this should be done via flush_ticker.reset_immediately(), but that's always
+                // delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
+                _ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
+                    dirty = false;
+                    flush_ticker.reset();
+                    self.tli
+                        .process_msg(&ProposerAcceptorMessage::FlushWAL)
+                        .await?
+                }
            };
 
-            if let Some(reply) = reply_msg {
+            // Send reply, if any.
+            if let Some(reply) = reply {
                 if self.reply_tx.send(reply).await.is_err() {
-                    return Ok(()); // chan closed, streaming terminated
+                    break; // disconnected, break to flush WAL and return
                 }
-                // reset keepalive time
-                next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
             }
         }
+
+        // Flush WAL on disconnect, see https://github.com/neondatabase/neon/issues/9259.
+        if dirty {
+            self.tli
+                .process_msg(&ProposerAcceptorMessage::FlushWAL)
+                .await?;
+        }
+
         Ok(())
     }
 }
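For readers unfamiliar with the pattern in this patch, below is a minimal, self-contained sketch of the same flush-batching loop, outside the safekeeper codebase. It is illustrative only: the `u64` LSNs, the `unflushed` counter, and the `println!` calls are hypothetical stand-ins for `ProposerAcceptorMessage::AppendRequest`, the WAL, and `process_msg(FlushWAL)`. It assumes a recent tokio (new enough to have `mpsc::Receiver::is_empty`) with the `full` feature set.

```rust
use std::future;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(16);

    // Producer: a burst of "appends" (LSNs), then the sender is dropped,
    // which plays the role of the walproposer disconnecting.
    tokio::spawn(async move {
        for lsn in 1..=5u64 {
            tx.send(lsn).await.unwrap();
        }
    });

    // Same ticker setup as in the patch: flush at most once per interval while
    // appends keep arriving, and skip the initial immediate tick.
    let mut flush_ticker = tokio::time::interval(Duration::from_secs(1));
    flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
    flush_ticker.tick().await;

    let mut dirty = false; // unflushed appends pending
    let mut unflushed = 0u64;

    loop {
        tokio::select! {
            // Buffer appends without "fsyncing" them yet.
            msg = rx.recv() => {
                let Some(lsn) = msg else { break }; // disconnected: flush below and exit
                unflushed = lsn;
                dirty = true;
            }
            // Periodic flush while the stream is busy.
            _ = flush_ticker.tick(), if dirty => {
                println!("periodic flush up to {unflushed}");
                dirty = false;
            }
            // Immediate flush once the channel drains (same interval-reset
            // workaround as the TODO in the patch).
            _ = future::ready(()), if dirty && rx.is_empty() => {
                flush_ticker.reset();
                println!("idle flush up to {unflushed}");
                dirty = false;
            }
        }
    }

    // Flush whatever is still buffered on disconnect.
    if dirty {
        println!("final flush up to {unflushed}");
    }
}
```

The `dirty` flag plus the two guarded branches is what bounds flushes to at most one per interval under a steady append stream, while still flushing promptly once the channel goes idle or the peer disconnects.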