From ff653408156b78027a97a7f32fad0fe8a19c2da6 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Mon, 9 Dec 2024 10:02:55 -0800 Subject: [PATCH] Provide backpressure on crypto binary We've been reading from a file faster than we were encrypting. That caused sender buffer to grow infinitely. --- ipa-core/src/cli/crypto/hybrid_encrypt.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ipa-core/src/cli/crypto/hybrid_encrypt.rs b/ipa-core/src/cli/crypto/hybrid_encrypt.rs index 818ae8c2f..1a60d6944 100644 --- a/ipa-core/src/cli/crypto/hybrid_encrypt.rs +++ b/ipa-core/src/cli/crypto/hybrid_encrypt.rs @@ -4,7 +4,7 @@ use std::{ fs::{read_to_string, File, OpenOptions}, io::{BufWriter, Write}, path::{Path, PathBuf}, - sync::mpsc::{channel, Sender}, + sync::mpsc::SyncSender, thread, thread::JoinHandle, time::Instant, @@ -109,20 +109,20 @@ impl HybridEncryptArgs { /// A thread-per-core pool responsible for encrypting reports in parallel. /// This pool is shared across all writers to reduce the number of context switches. struct EncryptorPool { - pool: Vec<(Sender, JoinHandle)>, + pool: Vec<(SyncSender, JoinHandle)>, next_worker: usize, } impl EncryptorPool { pub fn with_worker_threads( thread_count: usize, - file_writer: [Sender; 3], + file_writer: [SyncSender; 3], key_registries: [KeyRegistry; 3], ) -> Self { Self { pool: (0..thread_count) .map(move |i| { - let (tx, rx) = channel::(); + let (tx, rx) = std::sync::mpsc::sync_channel::(65535); let key_registries = key_registries.clone(); let file_writer = file_writer.clone(); ( @@ -234,13 +234,13 @@ impl ReportWriter { /// just the index of file input row that guarantees consistency /// of shares written across 3 files struct FileWriteWorker { - sender: Sender, + sender: SyncSender, handle: JoinHandle, } impl FileWriteWorker { pub fn new(file: File) -> Self { - let (tx, rx) = std::sync::mpsc::channel(); + let (tx, rx) = std::sync::mpsc::sync_channel(65535); Self { sender: tx, handle: thread::spawn(move || {