Skip to content

Commit b0f7090

Browse files
aradwanntillrohrmann
authored andcommitted
Resolve RUSTSEC-2023-0086
upgrade API breaking arrow dependencies fix SessionStateBuilder deprecation warnings replace closure with function remove unused tonic-0-11 cargo update upgrade arrow upgrade arrow convert This fixes #2004.
1 parent 86e6966 commit b0f7090

File tree

12 files changed

+496
-760
lines changed

12 files changed

+496
-760
lines changed

Cargo.lock

+467-658
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+5-6
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ restate-worker = { path = "crates/worker" }
7272
ahash = "0.8.5"
7373
anyhow = "1.0.68"
7474
arc-swap = "1.6"
75-
arrow = { version = "52.0.0", default-features = false }
76-
arrow-flight = { version = "52.0.0" }
75+
arrow = { version = "53.1.0", default-features = false }
76+
arrow-flight = { version = "53.1.0" }
7777
assert2 = "0.3.11"
7878
async-channel = "2.1.1"
7979
async-trait = "0.1.73"
@@ -83,7 +83,7 @@ bitflags = { version = "2.6.0" }
8383
bytes = { version = "1.7", features = ["serde"] }
8484
bytes-utils = "0.1.3"
8585
bytestring = { version = "1.2", features = ["serde"] }
86-
chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
86+
chrono = { version = "0.4.38", default-features = false, features = ["clock"] }
8787
comfy-table = { version = "7.1" }
8888
chrono-humanize = { version = "0.2.3" }
8989
clap = { version = "4", default-features = false }
@@ -92,13 +92,13 @@ cling = { version = "0.1", default-features = false, features = ["derive"] }
9292
criterion = "0.5"
9393
crossterm = { version = "0.27.0" }
9494
dashmap = { version = "6" }
95-
datafusion = { version = "40.0.0", default-features = false, features = [
95+
datafusion = { version = "42.0.0", default-features = false, features = [
9696
"crypto_expressions",
9797
"encoding_expressions",
9898
"regex_expressions",
9999
"unicode_expressions",
100100
] }
101-
datafusion-expr = { version = "40.0.0" }
101+
datafusion-expr = { version = "42.0.0" }
102102
derive_builder = "0.20.0"
103103
derive_more = { version = "1", features = ["full"] }
104104
dialoguer = { version = "0.11.0" }
@@ -186,7 +186,6 @@ tonic = { version = "0.12.3", default-features = false }
186186
tonic-reflection = { version = "0.12.3" }
187187
tonic-health = { version = "0.12.3" }
188188
tonic-build = { version = "0.12.3" }
189-
tonic-0-11 = { package = "tonic", version = "0.11.0", default-features = false }
190189
tower = "0.4"
191190
tower-http = { version = "0.5.2", default-features = false }
192191
tracing = "0.1"

cli/Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ restate-types = { workspace = true }
2323

2424
anyhow = { workspace = true }
2525
arc-swap = { workspace = true }
26-
arrow = { version = "51.0.0", features = ["ipc", "prettyprint", "json"] }
27-
arrow_convert = { version = "0.6.6" }
26+
arrow = { version = "53.1.0", features = ["ipc", "prettyprint", "json"] }
27+
arrow_convert = { version = "0.7.2" }
2828
axum = { workspace = true, default-features = false, features = ["http1", "http2", "query", "tokio"] }
2929
bytes = { workspace = true }
3030
base62 = { version = "2.0.2" }

crates/admin/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ serde_with = { workspace = true }
5353
thiserror = { workspace = true }
5454
tokio = { workspace = true }
5555
tonic = { workspace = true, features = ["transport", "codegen", "prost", "gzip"] }
56-
tonic-0-11 = { workspace = true }
5756
tower = { workspace = true, features = ["load-shed", "limit"] }
5857
tracing = { workspace = true }
5958

crates/admin/src/storage_query/query.rs

+1-41
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
// by the Apache License, Version 2.0.
1010

1111
use std::pin::Pin;
12-
use std::str::FromStr;
1312
use std::sync::Arc;
1413
use std::task::{Context, Poll};
1514

@@ -36,8 +35,6 @@ use restate_core::network::protobuf::node_svc::StorageQueryRequest;
3635
use schemars::JsonSchema;
3736
use serde::Deserialize;
3837
use serde_with::serde_as;
39-
use tonic::metadata::{KeyAndValueRef, MetadataMap};
40-
use tonic::Status;
4138

4239
use super::error::StorageQueryError;
4340
use crate::state::QueryServiceState;
@@ -81,7 +78,7 @@ pub async fn query(
8178
data_body: response.data,
8279
..FlightData::default()
8380
})
84-
.map_err(|status| FlightError::from(tonic_status_012_to_011(status))),
81+
.map_err(FlightError::from),
8582
);
8683

8784
// create a stream without LargeUtf8 or LargeBinary columns as JS doesn't support these yet
@@ -270,40 +267,3 @@ impl Stream for ConvertRecordBatchStream {
270267
}
271268
}
272269
}
273-
274-
// todo: Remove once arrow-flight works with tonic 0.12
275-
fn tonic_status_012_to_011(status: Status) -> tonic_0_11::Status {
276-
let code = tonic_0_11::Code::from(status.code() as i32);
277-
let message = status.message().to_owned();
278-
let details = Bytes::copy_from_slice(status.details());
279-
let metadata = tonic_metadata_map_012_to_011(status.metadata());
280-
tonic_0_11::Status::with_details_and_metadata(code, message, details, metadata)
281-
}
282-
283-
// todo: Remove once arrow-flight works with tonic 0.12
284-
fn tonic_metadata_map_012_to_011(metadata_map: &MetadataMap) -> tonic_0_11::metadata::MetadataMap {
285-
let mut resulting_metadata_map =
286-
tonic_0_11::metadata::MetadataMap::with_capacity(metadata_map.len());
287-
for key_value in metadata_map.iter() {
288-
match key_value {
289-
KeyAndValueRef::Ascii(key, value) => {
290-
// ignore metadata map entries if conversion fails
291-
if let Ok(value) =
292-
tonic_0_11::metadata::MetadataValue::from_str(value.to_str().unwrap_or(""))
293-
{
294-
if let Ok(key) = tonic_0_11::metadata::MetadataKey::from_str(key.as_str()) {
295-
resulting_metadata_map.insert(key, value);
296-
}
297-
}
298-
}
299-
KeyAndValueRef::Binary(key, value) => {
300-
if let Ok(key) = tonic_0_11::metadata::MetadataKey::from_bytes(key.as_ref()) {
301-
let value = tonic_0_11::metadata::MetadataValue::from_bytes(value.as_ref());
302-
resulting_metadata_map.insert_bin(key, value);
303-
}
304-
}
305-
}
306-
}
307-
308-
resulting_metadata_map
309-
}

crates/errors/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ include_doc = ["termimad"]
1414
[dependencies]
1515
codederror = { workspace = true }
1616
paste = { workspace = true }
17-
termimad = { version = "0.23", optional = true }
17+
termimad = { version = "0.30.0", optional = true }
1818
tracing = { workspace = true }
1919

2020
[dev-dependencies]

crates/node/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ tokio-stream = { workspace = true }
6767
tokio-util = { workspace = true }
6868
tonic = { workspace = true }
6969
tonic-reflection = { workspace = true }
70-
tonic-0-11 = { workspace = true }
7170
tower = { workspace = true }
7271
tower-http = { workspace = true, features = ["trace"] }
7372
tracing = { workspace = true }

crates/node/src/network_server/handler/node.rs

+2-39
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
use arrow_flight::encode::FlightDataEncoderBuilder;
1212
use arrow_flight::error::FlightError;
13-
use bytes::Bytes;
1413
use futures::stream::BoxStream;
1514
use futures::TryStreamExt;
1615
use restate_core::network::protobuf::node_svc::node_svc_server::NodeSvc;
@@ -21,10 +20,8 @@ use restate_core::network::{ConnectionManager, GrpcConnector};
2120
use restate_core::{metadata, TaskCenter};
2221
use restate_types::protobuf::common::NodeStatus;
2322
use restate_types::protobuf::node::Message;
24-
use std::str::FromStr;
2523
use tokio_stream::StreamExt;
26-
use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
27-
use tonic::{Code, Request, Response, Status, Streaming};
24+
use tonic::{Request, Response, Status, Streaming};
2825

2926
use crate::network_server::WorkerDependencies;
3027

@@ -133,43 +130,9 @@ fn flight_error_to_tonic_status(err: FlightError) -> Status {
133130
match err {
134131
FlightError::Arrow(e) => Status::internal(e.to_string()),
135132
FlightError::NotYetImplemented(e) => Status::internal(e),
136-
FlightError::Tonic(status) => tonic_status_010_to_012(status),
133+
FlightError::Tonic(status) => status,
137134
FlightError::ProtocolError(e) => Status::internal(e),
138135
FlightError::DecodeError(e) => Status::internal(e),
139136
FlightError::ExternalError(e) => Status::internal(e.to_string()),
140137
}
141138
}
142-
143-
// todo: Remove once arrow-flight works with tonic 0.12
144-
fn tonic_status_010_to_012(status: tonic_0_11::Status) -> Status {
145-
let code = Code::from(status.code() as i32);
146-
let message = status.message().to_owned();
147-
let details = Bytes::copy_from_slice(status.details());
148-
let metadata = tonic_metadata_map_010_to_012(status.metadata());
149-
Status::with_details_and_metadata(code, message, details, metadata)
150-
}
151-
152-
// todo: Remove once arrow-flight works with tonic 0.12
153-
fn tonic_metadata_map_010_to_012(metadata_map: &tonic_0_11::metadata::MetadataMap) -> MetadataMap {
154-
let mut resulting_metadata_map = MetadataMap::with_capacity(metadata_map.len());
155-
for key_value in metadata_map.iter() {
156-
match key_value {
157-
tonic_0_11::metadata::KeyAndValueRef::Ascii(key, value) => {
158-
// ignore metadata map entries if conversion fails
159-
if let Ok(value) = MetadataValue::from_str(value.to_str().unwrap_or("")) {
160-
if let Ok(key) = MetadataKey::from_str(key.as_str()) {
161-
resulting_metadata_map.insert(key, value);
162-
}
163-
}
164-
}
165-
tonic_0_11::metadata::KeyAndValueRef::Binary(key, value) => {
166-
if let Ok(key) = MetadataKey::from_bytes(key.as_ref()) {
167-
let value = MetadataValue::from_bytes(value.as_ref());
168-
resulting_metadata_map.insert_bin(key, value);
169-
}
170-
}
171-
}
172-
}
173-
174-
resulting_metadata_map
175-
}

crates/storage-query-datafusion/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ ahash = { workspace = true }
2424
async-trait = { workspace = true }
2525
bytes = { workspace = true }
2626
bytestring = { workspace = true }
27-
chrono = { version = "0.4.26", default-features = false, features = ["clock"] }
27+
chrono = { workspace = true }
2828
codederror = { workspace = true }
2929
datafusion = { workspace = true }
3030
derive_more = { workspace = true }

crates/storage-query-datafusion/src/context.rs

+14-6
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@ use std::sync::Arc;
1515
use async_trait::async_trait;
1616
use codederror::CodedError;
1717
use datafusion::error::DataFusionError;
18-
use datafusion::execution::context::{SQLOptions, SessionState};
18+
use datafusion::execution::context::SQLOptions;
1919
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
20+
use datafusion::execution::SessionStateBuilder;
21+
use datafusion::physical_optimizer::PhysicalOptimizerRule;
2022
use datafusion::physical_plan::SendableRecordBatchStream;
2123
use datafusion::prelude::{SessionConfig, SessionContext};
2224

@@ -188,7 +190,10 @@ impl QueryContext {
188190
//
189191
// build the state
190192
//
191-
let mut state = SessionState::new_with_config_rt(session_config, runtime);
193+
let mut state_builder = SessionStateBuilder::new()
194+
.with_config(session_config)
195+
.with_runtime_env(runtime)
196+
.with_default_features();
192197

193198
// Rewrite the logical plan, to transparently add a 'partition_key' column to Join's
194199
// To tables that have a partition key in their schema.
@@ -200,7 +205,7 @@ impl QueryContext {
200205
// 'SELECT b.service_key FROM sys_invocation_status a JOIN state b on a.target_service_key = b.service_key AND a.partition_key = b.partition_key'
201206
//
202207
// This would be used by the SymmetricHashJoin as a watermark.
203-
state.add_analyzer_rule(Arc::new(
208+
state_builder = state_builder.with_analyzer_rule(Arc::new(
204209
analyzer::UseSymmetricHashJoinWhenPartitionKeyIsPresent::new(),
205210
));
206211

@@ -219,10 +224,13 @@ impl QueryContext {
219224
// A far more involved but potentially more robust solution would be wrap the SymmetricHashJoin in a ProjectionExec
220225
// If this would become an issue for any reason, then we can explore that alternative.
221226
//
222-
let mut physical_optimizers = state.physical_optimizers().to_vec();
223-
physical_optimizers.insert(0, Arc::new(physical_optimizer::JoinRewrite::new()));
227+
let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> =
228+
vec![Arc::new(physical_optimizer::JoinRewrite::new())];
229+
230+
state_builder = state_builder.with_physical_optimizer_rules(physical_optimizers);
231+
232+
let state = state_builder.build();
224233

225-
state = state.with_physical_optimizer_rules(physical_optimizers);
226234
let ctx = SessionContext::new_with_state(state);
227235

228236
let sql_options = SQLOptions::new()

crates/storage-query-datafusion/src/table_providers.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use async_trait::async_trait;
1717
use datafusion::arrow::datatypes::SchemaRef;
1818
use datafusion::common::DataFusionError;
1919
use datafusion::datasource::{TableProvider, TableType};
20-
use datafusion::execution::context::{SessionState, TaskContext};
20+
use datafusion::execution::context::TaskContext;
2121
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
2222
use datafusion::physical_expr::EquivalenceProperties;
2323
use datafusion::physical_plan::{
@@ -75,7 +75,7 @@ where
7575

7676
async fn scan(
7777
&self,
78-
_state: &SessionState,
78+
_state: &(dyn datafusion::catalog::Session),
7979
projection: Option<&Vec<usize>>,
8080
_filters: &[Expr],
8181
_limit: Option<usize>,
@@ -239,7 +239,7 @@ impl TableProvider for GenericTableProvider {
239239

240240
async fn scan(
241241
&self,
242-
_state: &SessionState,
242+
_state: &(dyn datafusion::catalog::Session),
243243
projection: Option<&Vec<usize>>,
244244
filters: &[Expr],
245245
limit: Option<usize>,

deny.toml

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ version = 2
66
yanked = "deny"
77
ignore = [
88
{ id = "RUSTSEC-2024-0370", reason = "crate is unmaintained. This needs `arrow_convert` to use an alternative to `err-derive`" },
9-
{ id = "RUSTSEC-2023-0086", reason = "lexical-core pending Arrow update https://github.com/restatedev/restate/issues/1966" }
109
]
1110

1211

0 commit comments

Comments
 (0)