Skip to content

Commit

Permalink
geyser: generate missed slot statuses (#444)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Oct 28, 2024
1 parent 348b67a commit 3a34d0b
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The minor version will be incremented upon a breaking change and the patch versi
### Fixes

- examples: fix commitment in TypeScript example ([#440](https://github.com/rpcpool/yellowstone-grpc/pull/440))
- geyser: fix missed status messages ([#444](https://github.com/rpcpool/yellowstone-grpc/pull/444))

### Features

Expand Down
67 changes: 59 additions & 8 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
config::{ConfigBlockFailAction, ConfigGrpc, ConfigGrpcFilters},
filters::{Filter, FilterAccountsDataSlice},
metrics::{self, DebugClientMessage, CONNECTIONS_TOTAL, MESSAGE_QUEUE_SIZE},
metrics::{self, DebugClientMessage},
version::GrpcVersionInfo,
},
agave_geyser_plugin_interface::geyser_plugin_interface::{
Expand All @@ -12,7 +12,7 @@ use {
anyhow::Context,
log::{error, info},
solana_sdk::{
clock::{UnixTimestamp, MAX_RECENT_BLOCKHASHES},
clock::{Slot, UnixTimestamp, MAX_RECENT_BLOCKHASHES},
pubkey::Pubkey,
signature::Signature,
transaction::SanitizedTransaction,
Expand Down Expand Up @@ -678,6 +678,9 @@ struct SlotMessages {
entries_count: usize,
confirmed_at: Option<usize>,
finalized_at: Option<usize>,
parent_slot: Option<Slot>,
confirmed: bool,
finalized: bool,
}

impl SlotMessages {
Expand Down Expand Up @@ -871,7 +874,7 @@ impl GrpcService {
loop {
tokio::select! {
Some(message) = messages_rx.recv() => {
MESSAGE_QUEUE_SIZE.dec();
metrics::message_queue_size_dec();

// Update metrics
if let Message::Slot(slot_message) = message.as_ref() {
Expand Down Expand Up @@ -947,6 +950,19 @@ impl GrpcService {

// Update block reconstruction info
let slot_messages = messages.entry(message.get_slot()).or_default();
if let Message::Slot(msg) = message.as_ref() {
match msg.status {
CommitmentLevel::Processed => {
slot_messages.parent_slot = msg.parent;
},
CommitmentLevel::Confirmed => {
slot_messages.confirmed = true;
},
CommitmentLevel::Finalized => {
slot_messages.finalized = true;
},
}
}
if !matches!(message.as_ref(), Message::Slot(_)) {
slot_messages.messages.push(Some(Arc::clone(&message)));

Expand Down Expand Up @@ -1006,12 +1022,47 @@ impl GrpcService {
}

// Send messages to filter (and to clients)
let mut messages_vec = vec![message];
let mut messages_vec = Vec::with_capacity(4);
if let Some(sealed_block_msg) = sealed_block_msg {
messages_vec.push(sealed_block_msg);
}
let slot_status = if let Message::Slot(msg) = message.as_ref() {
Some((msg.slot, msg.status))
} else {
None
};
messages_vec.push(message);

// sometimes we do not receive all statuses
if let Some((slot, status)) = slot_status {
let mut slots = vec![slot];
while let Some((parent, Some(entry))) = slots
.pop()
.and_then(|slot| messages.get(&slot))
.and_then(|entry| entry.parent_slot)
.map(|parent| (parent, messages.get_mut(&parent)))
{
if (status == CommitmentLevel::Confirmed && !entry.confirmed) ||
(status == CommitmentLevel::Finalized && !entry.finalized)
{
if status == CommitmentLevel::Confirmed {
entry.confirmed = true;
} else if status == CommitmentLevel::Finalized {
entry.finalized = true;
}

slots.push(parent);
messages_vec.push(Arc::new(Message::Slot(MessageSlot {
slot: parent,
parent: entry.parent_slot,
status,
})));
metrics::missed_status_message_inc(status);
}
}
}

for message in messages_vec {
for message in messages_vec.into_iter().rev() {
if let Message::Slot(slot) = message.as_ref() {
let (mut confirmed_messages, mut finalized_messages) = match slot.status {
CommitmentLevel::Processed => {
Expand Down Expand Up @@ -1148,7 +1199,7 @@ impl GrpcService {
.expect("empty filter");
metrics::update_subscriptions(&endpoint, None, Some(&filter));

CONNECTIONS_TOTAL.inc();
metrics::connections_total_inc();
DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateFilter {
id,
filter: Box::new(filter.clone()),
Expand Down Expand Up @@ -1258,7 +1309,7 @@ impl GrpcService {
}
}

CONNECTIONS_TOTAL.dec();
metrics::connections_total_dec();
DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::Removed { id });
metrics::update_subscriptions(&endpoint, Some(&filter), None);
info!("client #{id}: removed");
Expand Down Expand Up @@ -1305,7 +1356,7 @@ impl GrpcService {
while *is_alive {
let message = match snapshot_rx.try_recv() {
Ok(message) => {
MESSAGE_QUEUE_SIZE.dec();
metrics::message_queue_size_dec();
message
}
Err(crossbeam_channel::TryRecvError::Empty) => {
Expand Down
52 changes: 42 additions & 10 deletions yellowstone-grpc-geyser/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,33 +35,38 @@ lazy_static::lazy_static! {
&["buildts", "git", "package", "proto", "rustc", "solana", "version"]
).unwrap();

pub static ref SLOT_STATUS: IntGaugeVec = IntGaugeVec::new(
static ref SLOT_STATUS: IntGaugeVec = IntGaugeVec::new(
Opts::new("slot_status", "Lastest received slot from Geyser"),
&["status"]
).unwrap();

pub static ref SLOT_STATUS_PLUGIN: IntGaugeVec = IntGaugeVec::new(
static ref SLOT_STATUS_PLUGIN: IntGaugeVec = IntGaugeVec::new(
Opts::new("slot_status_plugin", "Latest processed slot in the plugin to client queues"),
&["status"]
).unwrap();

pub static ref INVALID_FULL_BLOCKS: IntGaugeVec = IntGaugeVec::new(
static ref INVALID_FULL_BLOCKS: IntGaugeVec = IntGaugeVec::new(
Opts::new("invalid_full_blocks_total", "Total number of fails on constructin full blocks"),
&["reason"]
).unwrap();

pub static ref MESSAGE_QUEUE_SIZE: IntGauge = IntGauge::new(
static ref MESSAGE_QUEUE_SIZE: IntGauge = IntGauge::new(
"message_queue_size", "Size of geyser message queue"
).unwrap();

pub static ref CONNECTIONS_TOTAL: IntGauge = IntGauge::new(
static ref CONNECTIONS_TOTAL: IntGauge = IntGauge::new(
"connections_total", "Total number of connections to gRPC service"
).unwrap();

static ref SUBSCRIPTIONS_TOTAL: IntGaugeVec = IntGaugeVec::new(
Opts::new("subscriptions_total", "Total number of subscriptions to gRPC service"),
&["endpoint", "subscription"]
).unwrap();

static ref MISSED_STATUS_MESSAGE: IntCounterVec = IntCounterVec::new(
Opts::new("missed_status_message_total", "Number of missed messages by commitment"),
&["status"]
).unwrap();
}

#[derive(Debug)]
Expand Down Expand Up @@ -193,6 +198,7 @@ impl PrometheusService {
register!(MESSAGE_QUEUE_SIZE);
register!(CONNECTIONS_TOTAL);
register!(SUBSCRIPTIONS_TOTAL);
register!(MISSED_STATUS_MESSAGE);

VERSION
.with_label_values(&[
Expand Down Expand Up @@ -320,11 +326,7 @@ pub fn update_slot_status(status: SlotStatus, slot: u64) {

pub fn update_slot_plugin_status(status: CommitmentLevel, slot: u64) {
SLOT_STATUS_PLUGIN
.with_label_values(&[match status {
CommitmentLevel::Processed => "processed",
CommitmentLevel::Confirmed => "confirmed",
CommitmentLevel::Finalized => "finalized",
}])
.with_label_values(&[commitment_level_as_str(status)])
.set(slot as i64);
}

Expand All @@ -335,6 +337,22 @@ pub fn update_invalid_blocks(reason: impl AsRef<str>) {
INVALID_FULL_BLOCKS.with_label_values(&["all"]).inc();
}

pub fn message_queue_size_inc() {
MESSAGE_QUEUE_SIZE.inc()
}

pub fn message_queue_size_dec() {
MESSAGE_QUEUE_SIZE.dec()
}

pub fn connections_total_inc() {
CONNECTIONS_TOTAL.inc()
}

pub fn connections_total_dec() {
CONNECTIONS_TOTAL.dec()
}

pub fn update_subscriptions(endpoint: &str, old: Option<&Filter>, new: Option<&Filter>) {
for (multiplier, filter) in [(-1, old), (1, new)] {
if let Some(filter) = filter {
Expand All @@ -350,3 +368,17 @@ pub fn update_subscriptions(endpoint: &str, old: Option<&Filter>, new: Option<&F
}
}
}

pub fn missed_status_message_inc(status: CommitmentLevel) {
MISSED_STATUS_MESSAGE
.with_label_values(&[commitment_level_as_str(status)])
.inc()
}

const fn commitment_level_as_str(commitment: CommitmentLevel) -> &'static str {
match commitment {
CommitmentLevel::Processed => "processed",
CommitmentLevel::Confirmed => "confirmed",
CommitmentLevel::Finalized => "finalized",
}
}
6 changes: 3 additions & 3 deletions yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
config::Config,
grpc::{GrpcService, Message},
metrics::{self, PrometheusService, MESSAGE_QUEUE_SIZE},
metrics::{self, PrometheusService},
},
agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
Expand Down Expand Up @@ -36,7 +36,7 @@ pub struct PluginInner {
impl PluginInner {
fn send_message(&self, message: Message) {
if self.grpc_channel.send(Arc::new(message)).is_ok() {
MESSAGE_QUEUE_SIZE.inc();
metrics::message_queue_size_inc();
}
}
}
Expand Down Expand Up @@ -141,7 +141,7 @@ impl GeyserPlugin for Plugin {
if let Some(channel) = inner.snapshot_channel.lock().unwrap().as_ref() {
let message = Message::Account((account, slot, is_startup).into());
match channel.send(Box::new(message)) {
Ok(()) => MESSAGE_QUEUE_SIZE.inc(),
Ok(()) => metrics::message_queue_size_inc(),
Err(_) => {
if !inner.snapshot_channel_closed.swap(true, Ordering::Relaxed) {
log::error!(
Expand Down

0 comments on commit 3a34d0b

Please sign in to comment.