diff --git a/crates/store/re_protos/proto/rerun/v0/remote_store.proto b/crates/store/re_protos/proto/rerun/v0/remote_store.proto
index a1d420c8f0ea8..851d5b508a36e 100644
--- a/crates/store/re_protos/proto/rerun/v0/remote_store.proto
+++ b/crates/store/re_protos/proto/rerun/v0/remote_store.proto
@@ -12,6 +12,9 @@ service StorageNode {
   rpc CreateIndex(CreateIndexRequest) returns (CreateIndexResponse) {}
   rpc ReIndex(ReIndexRequest) returns (ReIndexResponse) {}
 
+  rpc GetChunkIds(GetChunkIdsRequest) returns (stream GetChunkIdsResponse) {}
+  rpc GetChunks(GetChunksRequest) returns (stream rerun.common.v0.RerunChunk) {}
+
   // The response to `SearchIndex` a RecordBatch with 3 columns:
   // - 'resource_id' column with the id of the resource
   // - timepoint column with the values representing the points in time
@@ -46,6 +49,31 @@ message DataframePart {
   bytes payload = 1000;
 }
 
+// ---------------- GetChunkIds ------------------
+
+message GetChunkIdsRequest {
+  // recording id from which we want to fetch the chunk ids
+  rerun.common.v0.RecordingId recording_id = 1;
+  // timeline for which we specify the time range
+  rerun.common.v0.IndexColumnSelector time_index = 2;
+  // time range for which we want to fetch the chunk ids
+  rerun.common.v0.TimeRange time_range = 3;
+}
+
+message GetChunkIdsResponse {
+  // a batch of chunk ids for chunks that are within the specified time range
+  repeated rerun.common.v0.Tuid chunk_ids = 1;
+}
+
+// ---------------- GetChunks --------------------
+
+message GetChunksRequest {
+  // recording id from which we want to fetch the chunks
+  rerun.common.v0.RecordingId recording_id = 1;
+  // batch of chunk ids for which we want to stream back chunks
+  repeated rerun.common.v0.Tuid chunk_ids = 2;
+}
+
 // ---------------- CreateIndex ------------------
 
 // used to define which column we want to index
diff --git a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs
index e862088c96945..c861a5024c8b8 100644
--- a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs
+++ b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs
@@ -19,6 +19,63 @@ impl ::prost::Name for DataframePart {
         "/rerun.remote_store.v0.DataframePart".into()
     }
 }
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetChunkIdsRequest {
+    /// recording id from which we want to fetch the chunk ids
+    #[prost(message, optional, tag = "1")]
+    pub recording_id: ::core::option::Option<super::super::super::common::v0::RecordingId>,
+    /// timeline for which we specify the time range
+    #[prost(message, optional, tag = "2")]
+    pub time_index: ::core::option::Option<super::super::super::common::v0::IndexColumnSelector>,
+    /// time range for which we want to fetch the chunk ids
+    #[prost(message, optional, tag = "3")]
+    pub time_range: ::core::option::Option<super::super::super::common::v0::TimeRange>,
+}
+impl ::prost::Name for GetChunkIdsRequest {
+    const NAME: &'static str = "GetChunkIdsRequest";
+    const PACKAGE: &'static str = "rerun.remote_store.v0";
+    fn full_name() -> ::prost::alloc::string::String {
+        "rerun.remote_store.v0.GetChunkIdsRequest".into()
+    }
+    fn type_url() -> ::prost::alloc::string::String {
+        "/rerun.remote_store.v0.GetChunkIdsRequest".into()
+    }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetChunkIdsResponse {
+    /// a batch of chunk ids for chunks that are within the specified time range
+    #[prost(message, repeated, tag = "1")]
+    pub chunk_ids: ::prost::alloc::vec::Vec<super::super::super::common::v0::Tuid>,
+}
+impl ::prost::Name for GetChunkIdsResponse {
+    const NAME: &'static str = "GetChunkIdsResponse";
+    const PACKAGE: &'static str = "rerun.remote_store.v0";
+    fn full_name() -> ::prost::alloc::string::String {
+        "rerun.remote_store.v0.GetChunkIdsResponse".into()
+    }
+    fn type_url() -> ::prost::alloc::string::String {
+        "/rerun.remote_store.v0.GetChunkIdsResponse".into()
+    }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetChunksRequest {
+    /// recording id from which we want to fetch the chunks
+    #[prost(message, optional, tag = "1")]
+    pub recording_id: ::core::option::Option<super::super::super::common::v0::RecordingId>,
+    /// batch of chunk ids for which we want to stream back chunks
+    #[prost(message, repeated, tag = "2")]
+    pub chunk_ids: ::prost::alloc::vec::Vec<super::super::super::common::v0::Tuid>,
+}
+impl ::prost::Name for GetChunksRequest {
+    const NAME: &'static str = "GetChunksRequest";
+    const PACKAGE: &'static str = "rerun.remote_store.v0";
+    fn full_name() -> ::prost::alloc::string::String {
+        "rerun.remote_store.v0.GetChunksRequest".into()
+    }
+    fn type_url() -> ::prost::alloc::string::String {
+        "/rerun.remote_store.v0.GetChunksRequest".into()
+    }
+}
 /// used to define which column we want to index
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct IndexColumn {
@@ -848,6 +905,49 @@ pub mod storage_node_client {
             ));
             self.inner.unary(req, path, codec).await
         }
+        pub async fn get_chunk_ids(
+            &mut self,
+            request: impl tonic::IntoRequest<super::GetChunkIdsRequest>,
+        ) -> std::result::Result<
+            tonic::Response<tonic::codec::Streaming<super::GetChunkIdsResponse>>,
+            tonic::Status,
+        > {
+            self.inner.ready().await.map_err(|e| {
+                tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
+            })?;
+            let codec = tonic::codec::ProstCodec::default();
+            let path = http::uri::PathAndQuery::from_static(
+                "/rerun.remote_store.v0.StorageNode/GetChunkIds",
+            );
+            let mut req = request.into_request();
+            req.extensions_mut().insert(GrpcMethod::new(
+                "rerun.remote_store.v0.StorageNode",
+                "GetChunkIds",
+            ));
+            self.inner.server_streaming(req, path, codec).await
+        }
+        pub async fn get_chunks(
+            &mut self,
+            request: impl tonic::IntoRequest<super::GetChunksRequest>,
+        ) -> std::result::Result<
+            tonic::Response<tonic::codec::Streaming<super::super::super::common::v0::RerunChunk>>,
+            tonic::Status,
+        > {
+            self.inner.ready().await.map_err(|e| {
+                tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
+            })?;
+            let codec = tonic::codec::ProstCodec::default();
+            let path = http::uri::PathAndQuery::from_static(
+                "/rerun.remote_store.v0.StorageNode/GetChunks",
+            );
+            let mut req = request.into_request();
+            req.extensions_mut().insert(GrpcMethod::new(
+                "rerun.remote_store.v0.StorageNode",
+                "GetChunks",
+            ));
+            self.inner.server_streaming(req, path, codec).await
+        }
+
         /// The response to `SearchIndex` a RecordBatch with 3 columns:
         /// - 'resource_id' column with the id of the resource
         /// - timepoint column with the values representing the points in time
@@ -1041,6 +1141,27 @@ pub mod storage_node_server {
             &self,
             request: tonic::Request<super::ReIndexRequest>,
         ) -> std::result::Result<tonic::Response<super::ReIndexResponse>, tonic::Status>;
+        /// Server streaming response type for the GetChunkIds method.
+        type GetChunkIdsStream: tonic::codegen::tokio_stream::Stream<
+                Item = std::result::Result<super::GetChunkIdsResponse, tonic::Status>,
+            > + std::marker::Send
+            + 'static;
+        async fn get_chunk_ids(
+            &self,
+            request: tonic::Request<super::GetChunkIdsRequest>,
+        ) -> std::result::Result<tonic::Response<Self::GetChunkIdsStream>, tonic::Status>;
+        /// Server streaming response type for the GetChunks method.
+        type GetChunksStream: tonic::codegen::tokio_stream::Stream<
+                Item = std::result::Result<
+                    super::super::super::common::v0::RerunChunk,
+                    tonic::Status,
+                >,
+            > + std::marker::Send
+            + 'static;
+        async fn get_chunks(
+            &self,
+            request: tonic::Request<super::GetChunksRequest>,
+        ) -> std::result::Result<tonic::Response<Self::GetChunksStream>, tonic::Status>;
         /// Server streaming response type for the SearchIndex method.
         type SearchIndexStream: tonic::codegen::tokio_stream::Stream<
                 Item = std::result::Result<super::SearchIndexResponse, tonic::Status>,
@@ -1327,6 +1448,94 @@ pub mod storage_node_server {
                 };
                 Box::pin(fut)
             }
+            "/rerun.remote_store.v0.StorageNode/GetChunkIds" => {
+                #[allow(non_camel_case_types)]
+                struct GetChunkIdsSvc<T: StorageNode>(pub Arc<T>);
+                impl<T: StorageNode>
+                    tonic::server::ServerStreamingService<super::GetChunkIdsRequest>
+                    for GetChunkIdsSvc<T>
+                {
+                    type Response = super::GetChunkIdsResponse;
+                    type ResponseStream = T::GetChunkIdsStream;
+                    type Future =
+                        BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
+                    fn call(
+                        &mut self,
+                        request: tonic::Request<super::GetChunkIdsRequest>,
+                    ) -> Self::Future {
+                        let inner = Arc::clone(&self.0);
+                        let fut = async move {
+                            <T as StorageNode>::get_chunk_ids(&inner, request).await
+                        };
+                        Box::pin(fut)
+                    }
+                }
+                let accept_compression_encodings = self.accept_compression_encodings;
+                let send_compression_encodings = self.send_compression_encodings;
+                let max_decoding_message_size = self.max_decoding_message_size;
+                let max_encoding_message_size = self.max_encoding_message_size;
+                let inner = self.inner.clone();
+                let fut = async move {
+                    let method = GetChunkIdsSvc(inner);
+                    let codec = tonic::codec::ProstCodec::default();
+                    let mut grpc = tonic::server::Grpc::new(codec)
+                        .apply_compression_config(
+                            accept_compression_encodings,
+                            send_compression_encodings,
+                        )
+                        .apply_max_message_size_config(
+                            max_decoding_message_size,
+                            max_encoding_message_size,
+                        );
+                    let res = grpc.server_streaming(method, req).await;
+                    Ok(res)
+                };
+                Box::pin(fut)
+            }
+            "/rerun.remote_store.v0.StorageNode/GetChunks" => {
+                #[allow(non_camel_case_types)]
+                struct GetChunksSvc<T: StorageNode>(pub Arc<T>);
+                impl<T: StorageNode>
+                    tonic::server::ServerStreamingService<super::GetChunksRequest>
+                    for GetChunksSvc<T>
+                {
+                    type Response = super::super::super::common::v0::RerunChunk;
+                    type ResponseStream = T::GetChunksStream;
+                    type Future =
+                        BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
+                    fn call(
+                        &mut self,
+                        request: tonic::Request<super::GetChunksRequest>,
+                    ) -> Self::Future {
+                        let inner = Arc::clone(&self.0);
+                        let fut = async move {
+                            <T as StorageNode>::get_chunks(&inner, request).await
+                        };
+                        Box::pin(fut)
+                    }
+                }
+                let accept_compression_encodings = self.accept_compression_encodings;
+                let send_compression_encodings = self.send_compression_encodings;
+                let max_decoding_message_size = self.max_decoding_message_size;
+                let max_encoding_message_size = self.max_encoding_message_size;
+                let inner = self.inner.clone();
+                let fut = async move {
+                    let method = GetChunksSvc(inner);
+                    let codec = tonic::codec::ProstCodec::default();
+                    let mut grpc = tonic::server::Grpc::new(codec)
+                        .apply_compression_config(
+                            accept_compression_encodings,
+                            send_compression_encodings,
+                        )
+                        .apply_max_message_size_config(
+                            max_decoding_message_size,
+                            max_encoding_message_size,
+                        );
+                    let res = grpc.server_streaming(method, req).await;
+                    Ok(res)
+                };
+                Box::pin(fut)
+            }
             "/rerun.remote_store.v0.StorageNode/SearchIndex" => {
                 #[allow(non_camel_case_types)]
                 struct SearchIndexSvc<T: StorageNode>(pub Arc<T>);
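
The two new server-streaming RPCs can be chained: `GetChunkIds` resolves a time range on one timeline to a stream of chunk-id batches, and `GetChunks` streams back the chunks for those ids. Here is a minimal client-side sketch against the generated `StorageNodeClient`. The `re_protos` import paths and the pre-built `rerun.common.v0` request values (`RecordingId`, `IndexColumnSelector`, `TimeRange`) are assumptions, since their definitions are not part of this diff; only the message and method shapes come from the generated code above:

```rust
use re_protos::common::v0::{IndexColumnSelector, RecordingId, RerunChunk, TimeRange};
use re_protos::remote_store::v0::{
    storage_node_client::StorageNodeClient, GetChunkIdsRequest, GetChunksRequest,
};

/// Resolve a time range to chunk ids, then stream back the chunks themselves.
async fn fetch_chunks_in_range(
    client: &mut StorageNodeClient<tonic::transport::Channel>,
    recording_id: RecordingId,
    time_index: IndexColumnSelector,
    time_range: TimeRange,
) -> Result<Vec<RerunChunk>, tonic::Status> {
    // 1. `GetChunkIds` streams back *batches* of ids, so accumulate them.
    let mut id_stream = client
        .get_chunk_ids(GetChunkIdsRequest {
            recording_id: Some(recording_id.clone()),
            time_index: Some(time_index),
            time_range: Some(time_range),
        })
        .await?
        .into_inner();

    let mut chunk_ids = Vec::new();
    while let Some(batch) = id_stream.message().await? {
        chunk_ids.extend(batch.chunk_ids);
    }

    // 2. Hand the collected ids to `GetChunks` and drain the chunk stream.
    let mut chunk_stream = client
        .get_chunks(GetChunksRequest {
            recording_id: Some(recording_id),
            chunk_ids,
        })
        .await?
        .into_inner();

    let mut chunks = Vec::new();
    while let Some(chunk) = chunk_stream.message().await? {
        chunks.push(chunk);
    }
    Ok(chunks)
}
```

Batching the ids on the wire keeps individual messages well under gRPC message-size limits even for recordings with many chunks.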
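On the server side, an implementation of the generated `StorageNode` trait now has to provide the two associated stream types plus their handlers. Below is a shape-only sketch of just the new methods, assuming a boxed-stream alias that satisfies the `Send + 'static` bounds; the store lookups and all pre-existing trait methods are elided, so this does not compile as-is:

```rust
use std::pin::Pin;

use tokio_stream::Stream;
use tonic::{Request, Response, Status};

// Assumed import paths for the generated types shown in the diff above.
use re_protos::common::v0::RerunChunk;
use re_protos::remote_store::v0::{
    storage_node_server::StorageNode, GetChunkIdsRequest, GetChunkIdsResponse, GetChunksRequest,
};

/// Boxed stream alias matching the bounds on the generated associated types.
type BoxStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + 'static>>;

struct MyStorageNode;

#[tonic::async_trait]
impl StorageNode for MyStorageNode {
    type GetChunkIdsStream = BoxStream<GetChunkIdsResponse>;
    type GetChunksStream = BoxStream<RerunChunk>;

    async fn get_chunk_ids(
        &self,
        request: Request<GetChunkIdsRequest>,
    ) -> Result<Response<Self::GetChunkIdsStream>, Status> {
        let _req = request.into_inner();
        // Store lookup elided: resolve the chunk ids overlapping
        // `_req.time_range` on `_req.time_index`, then yield them in batches
        // rather than as one oversized message.
        let batches: Vec<Result<GetChunkIdsResponse, Status>> = vec![];
        Ok(Response::new(Box::pin(tokio_stream::iter(batches))))
    }

    async fn get_chunks(
        &self,
        request: Request<GetChunksRequest>,
    ) -> Result<Response<Self::GetChunksStream>, Status> {
        let _req = request.into_inner();
        // Store lookup elided: fetch each chunk in `_req.chunk_ids` and yield
        // it as a `RerunChunk`.
        let chunks: Vec<Result<RerunChunk, Status>> = vec![];
        Ok(Response::new(Box::pin(tokio_stream::iter(chunks))))
    }

    // ...the pre-existing `StorageNode` methods (RegisterRecording,
    // CreateIndex, ReIndex, SearchIndex, ...) are elided here...
}
```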