Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flight sql do put handling, add bind parameter support to FlightSQL cli client #4797

Merged
merged 7 commits into from
Sep 18, 2023

Conversation

suremarc
Copy link
Contributor

@suremarc suremarc commented Sep 7, 2023

Which issue does this PR close?

Closes #4658 and closes #3598

Rationale for this change

DoPut requests with nonempty flight streams cannot be handled properly by the Rust FlightSQL server implementation in its current state.

What changes are included in this PR?

We change all DoPut methods on the FlightSqlService trait to accept a Peekable<Streaming<FlightData>> instead of a regular Streaming<FlightData>. We also finished the parameter binding functionality in the client, in order to test prepared statements properly.

Are there any user-facing changes?

Yes, there is unfortunately an API change for the FlightSqlService trait. I am open to alternatives, as it is probably possible to do evil things with Peekable, but I do not think it is possible to fix this without a breaking change.

@github-actions github-actions bot added arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate labels Sep 7, 2023
Comment on lines 555 to 564
fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
match err {
FlightError::Arrow(e) => e,
FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
FlightError::Tonic(status) => status_to_arrow_error(status),
FlightError::ProtocolError(e) => ArrowError::IpcError(e),
FlightError::DecodeError(s) => ArrowError::IpcError(s),
FlightError::ExternalError(e) => ArrowError::ExternalError(e),
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why we return ArrowError from the Flight client (instead of FlightError), but I am trying to keep this PR scoped, so I just decided to stay consistent

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this should probably be FlightError

Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me, however, I'm not very well versed in FlightSQL and so have asked some potentially stupid questions

const PREPARED_QUERY: &str = "SELECT * FROM table WHERE field = $1";
const PREPARED_STATEMENT_HANDLE: &str = "prepared_statement_handle";

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need multi thread here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really think so. I just followed what the existing test did, but I don't really know see why it should. I will try and see if it works without it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to work fine without it, so I just removed it

// To allow the first message to be reused by the `do_put` handler,
// we wrap this stream in a `Peekable` one, which allows us to peek at
// the first message without discarding it.
let mut request = request.map(futures::StreamExt::peekable);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if I am following correctly, the issue is do_put accepts a FlightData stream, but the first request in the stream will contain a FlightDescriptor in addition to potentially any data. I continue to be utterly baffled by the design of Flight 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is my understanding as well. Prior to this change, decoding a flight stream inside one of the do_put methods would give you an error like Received RecordBatch prior to schema

arrow-flight/src/sql/client.rs Outdated Show resolved Hide resolved
Comment on lines 555 to 564
fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
match err {
FlightError::Arrow(e) => e,
FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
FlightError::Tonic(status) => status_to_arrow_error(status),
FlightError::ProtocolError(e) => ArrowError::IpcError(e),
FlightError::DecodeError(s) => ArrowError::IpcError(s),
FlightError::ExternalError(e) => ArrowError::ExternalError(e),
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this should probably be FlightError

.await
.map_err(flight_error_to_arrow_error)?;

self.flight_sql_client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears consistent with the FlightSQL specification, it uses do_put to bind the parameter arguments. What isn't clear to me is if the result should be being used in some way.

This would seem to imply some sort of server-side state which I had perhaps expected FlightSQL to not rely on

Copy link
Contributor Author

@suremarc suremarc Sep 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we are in agreement about it implying server-side state. FWIW FlightSQL also supports transactions which I think (maybe wrongly) would also require state. There was also some discussion happening about adding new RPC's for managing session state at some point (like a close RPC or something)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a fundamental flaw in FlightSQL tbh, gRPC is not a connection-oriented protocol and so the lifetime of any server state is non-deterministic... I believe @alamb plans to start a discussion to see if we can't fix this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed apache/arrow#37720 and will circulate this around

@@ -366,7 +366,7 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
/// Implementors may override to handle additional calls to do_put()
async fn do_put_fallback(
&self,
_request: Request<Streaming<FlightData>>,
_request: Request<Peekable<Streaming<FlightData>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option might be to pass the first ticket request as a separate argument. I don't feel strongly either way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a tough decision for me. I prefer using Peekable as it can be used as if it were the original stream, but I hate the fact that we have to leak its usage. We could pass the first FlightData as a separate argument, but it would require the user to chain it with the stream, if they wanted to use any APIs expecting a stream of FlightData. So I think I would stick with Peekable in the absence of any preference from others.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This server is the scaffolding to help people build flightsql servers -- they can always use the raw FlightService if they prefer. Thus I think the change in API is less critical and given the requirements it seems inevitable.

I think the only thing we should try and improve in this PR is improving the documentation to explain why peekable is used somehow (to lower the cognative burden on people trying to use this).

One potential option to document this is rather than using Peekable<Streaming<...>> dirctly, would make our own wrapper, PeekableStreaming or something. While this would require duplicate a bunch of the peekable API, there would be a natural place to document what it was for and how to use it which I think would lower the barrier to usage.

For example:

/// A wrapper around `Streaming` that allows inspection of the first message. 
/// This is needed because sometimes the first request in the stream will contain 
/// a [`FlightDescriptor`] in addition to potentially any data and the dispatch logic
/// must inspect this information. 
///
/// # example:
/// <show an example here of calling `into_inner()` to get the original data back
struct PeekableStreaming {
  inner: Peekable<Streaming<FlightData>>
}

impl PeekableStreaming {
  /// return the inner stream
  pub fn into_inner(self) -> Streaming<FlightData> { self.inner.into_inner() }
...
}

We could also potentially use something like BoxStream<FlightData> but that would lose the gRPC specific stuff like status codes and trailers exposed by Streaming as well as being an API change as well.

Thus I think this design is the best of several less than ideal solutions. To proceed perhaps we can add some documentation on the do_*_fallback methods that mentions the stream comes from peekable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a new type, PeekableFlightDataStream, which exposes into_inner and peek, similarly to Peekable. I think this is a good enough subset of functionality for FlightSQL use cases, and if users need access to more of the lower-level functionality of Peekable, they can call PeekableFlightDataStream::into_peekable.

@suremarc suremarc requested a review from tustvold September 8, 2023 14:54
Comment on lines -454 to +461
let cmd = CommandPreparedStatementQuery {
self.write_bind_params().await?;

let cmd = CommandPreparedStatementUpdate {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also forgot to mention, I think this was a bug in the existing implementation. ExecuteUpdate should be performed with a CommandPreparedStatementUpdate command, not a CommandPreparedStatementQuery.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb alamb changed the title Fix flight sql do put handling Fix flight sql do put handling, add bind parameter support to FlightSQL cli client Sep 14, 2023
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @suremarc and @tustvold -- I think this PR is pretty close to ready. I left some comments: I think we need to document / explain the peekable use better (I left some suggestions) and get CI passing but then we should be ready to go

Comment on lines -454 to +461
let cmd = CommandPreparedStatementQuery {
self.write_bind_params().await?;

let cmd = CommandPreparedStatementUpdate {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.arg(addr.port().to_string())
.arg("prepared-statement-query")
.arg(PREPARED_QUERY)
.args(["-p", "$1=string"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for the tests

@@ -366,7 +366,7 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
/// Implementors may override to handle additional calls to do_put()
async fn do_put_fallback(
&self,
_request: Request<Streaming<FlightData>>,
_request: Request<Peekable<Streaming<FlightData>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This server is the scaffolding to help people build flightsql servers -- they can always use the raw FlightService if they prefer. Thus I think the change in API is less critical and given the requirements it seems inevitable.

I think the only thing we should try and improve in this PR is improving the documentation to explain why peekable is used somehow (to lower the cognative burden on people trying to use this).

One potential option to document this is rather than using Peekable<Streaming<...>> dirctly, would make our own wrapper, PeekableStreaming or something. While this would require duplicate a bunch of the peekable API, there would be a natural place to document what it was for and how to use it which I think would lower the barrier to usage.

For example:

/// A wrapper around `Streaming` that allows inspection of the first message. 
/// This is needed because sometimes the first request in the stream will contain 
/// a [`FlightDescriptor`] in addition to potentially any data and the dispatch logic
/// must inspect this information. 
///
/// # example:
/// <show an example here of calling `into_inner()` to get the original data back
struct PeekableStreaming {
  inner: Peekable<Streaming<FlightData>>
}

impl PeekableStreaming {
  /// return the inner stream
  pub fn into_inner(self) -> Streaming<FlightData> { self.inner.into_inner() }
...
}

We could also potentially use something like BoxStream<FlightData> but that would lose the gRPC specific stuff like status codes and trailers exposed by Streaming as well as being an API change as well.

Thus I think this design is the best of several less than ideal solutions. To proceed perhaps we can add some documentation on the do_*_fallback methods that mentions the stream comes from peekable

.await
.map_err(flight_error_to_arrow_error)?;

self.flight_sql_client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed apache/arrow#37720 and will circulate this around

@suremarc suremarc requested a review from alamb September 15, 2023 20:17
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is really nicely done -- thank you @suremarc


/// A wrapper around [`Streaming<FlightData>`] that allows "peeking" at the
/// message at the front of the stream without consuming it.
/// This is needed because sometimes the first message in the stream will contain
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@alamb alamb merged commit 47e8a8d into apache:master Sep 18, 2023
12 checks passed
@alamb alamb added the api-change Changes to the arrow API label Sep 18, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api-change Changes to the arrow API arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate
Projects
None yet
3 participants