Skip to content

Commit

Permalink
code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs committed Feb 25, 2025
1 parent 5577abd commit 4d500a8
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
4 changes: 2 additions & 2 deletions vortexor/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.takes_value(true)
.help(
"Redirect logging to the specified file, '-' for standard error. Sending the \
SIGUSR1 signal to the vortexor process will cause it to re-open the log file",
SIGUSR1 signal to the vortexor process will cause it to re-open the log file.",
),
)
.arg(
Expand All @@ -191,6 +191,6 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.takes_value(true)
.multiple(true)
.validator(solana_net_utils::is_host_port)
.help("The destination validator address to which the vortexor will forward transaction"),
.help("The destination validator address to which the vortexor will forward transactions."),
)
}
2 changes: 1 addition & 1 deletion vortexor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub fn main() {
};
let _logger_thread = redirect_stderr_to_file(logfile);

info!("{} {}", crate_name!(), solana_version);
info!("{} {solana_version}", crate_name!());
info!(
"Starting vortexor {} with: {:#?}",
identity_keypair.pubkey(),
Expand Down
37 changes: 23 additions & 14 deletions vortexor/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ impl PacketBatchSender {
Ok(())
}

/// Receive verified packets from the channel `packet_batch_receiver`
/// and send them to the desintations.
fn recv_send(
send_sock: UdpSocket,
packet_batch_receiver: BankingPacketReceiver,
Expand All @@ -71,22 +73,26 @@ impl PacketBatchSender {
destinations: Arc<RwLock<Vec<SocketAddr>>>,
) {
loop {
let destinations = destinations.read().unwrap().clone();
match Self::receive_until(packet_batch_receiver.clone(), recv_timeout, batch_size) {
Ok((_packet_count, packet_batches)) => {
// Send out packet batches
Ok((packet_count, packet_batches)) => {
trace!("Received packet counts: {}", packet_count);
// Collect all packets once for all destinations
let mut packets: Vec<&[u8]> = Vec::new();

let destinations = destinations.read().unwrap();
for destination in destinations.iter() {
let mut packets: Vec<(&[u8], &SocketAddr)> = Vec::new();
for batch in &packet_batches {
for packet_batch in batch.iter() {
for packet in packet_batch {
packets.push(packet.data(0..).unwrap());
}
}
}

packet_batches.iter().for_each(|batch| {
batch.iter().for_each(|packet_batch| {
for packet in packet_batch.iter() {
packets.push((packet.data(0..).unwrap(), destination));
}
});
});
let _result = batch_send(&send_sock, packets.into_iter());
// Send all packets to each destination
for destination in &destinations {
let packet_refs: Vec<(&[u8], &SocketAddr)> =
packets.iter().map(|data| (*data, destination)).collect();
let _result = batch_send(&send_sock, packet_refs.into_iter());
}
}
Err(err) => match err {
Expand Down Expand Up @@ -119,7 +125,10 @@ impl PacketBatchSender {

while let Ok(message) = packet_batch_receiver.try_recv() {
let packet_batches = &message;
trace!("got more packet batches in packet receiver");
trace!(
"Got more packet batches in packet receiver: {}",
packet_batches.len()
);
num_packets_received
.checked_add(
packet_batches
Expand Down

0 comments on commit 4d500a8

Please sign in to comment.