Skip to content

Commit

Permalink
Add IpcError variant to replace some uses of IoErrorthat don't ha…
Browse files Browse the repository at this point in the history
…ve underlying `std::io::Error` (#4726)
  • Loading branch information
alexandreyc authored Aug 24, 2023
1 parent 90449ff commit d9381c6
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 50 deletions.
2 changes: 1 addition & 1 deletion arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ mod tests {

fn endpoint(uri: String) -> Result<Endpoint, ArrowError> {
let endpoint = Endpoint::new(uri)
.map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
.map_err(|_| ArrowError::IpcError("Cannot create endpoint".to_string()))?
.connect_timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20))
.tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
Expand Down
10 changes: 5 additions & 5 deletions arrow-flight/src/bin/flight_sql_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async fn setup_client(
let protocol = if args.tls { "https" } else { "http" };

let mut endpoint = Endpoint::new(format!("{}://{}:{}", protocol, args.host, port))
.map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
.map_err(|_| ArrowError::IpcError("Cannot create endpoint".to_string()))?
.connect_timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20))
.tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
Expand All @@ -162,15 +162,15 @@ async fn setup_client(

if args.tls {
let tls_config = ClientTlsConfig::new();
endpoint = endpoint
.tls_config(tls_config)
.map_err(|_| ArrowError::IoError("Cannot create TLS endpoint".to_string()))?;
endpoint = endpoint.tls_config(tls_config).map_err(|_| {
ArrowError::IpcError("Cannot create TLS endpoint".to_string())
})?;
}

let channel = endpoint
.connect()
.await
.map_err(|e| ArrowError::IoError(format!("Cannot connect to endpoint: {e}")))?;
.map_err(|e| ArrowError::IpcError(format!("Cannot connect to endpoint: {e}")))?;

let mut client = FlightSqlServiceClient::new(channel);
info!("connected");
Expand Down
16 changes: 10 additions & 6 deletions arrow-flight/src/sql/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl FlightSqlServiceClient<Channel> {
.flight_client
.handshake(req)
.await
.map_err(|e| ArrowError::IoError(format!("Can't handshake {e}")))?;
.map_err(|e| ArrowError::IpcError(format!("Can't handshake {e}")))?;
if let Some(auth) = resp.metadata().get("authorization") {
let auth = auth.to_str().map_err(|_| {
ArrowError::ParseError("Can't read auth header".to_string())
Expand Down Expand Up @@ -390,16 +390,20 @@ impl FlightSqlServiceClient<Channel> {
) -> Result<tonic::Request<T>, ArrowError> {
for (k, v) in &self.headers {
let k = AsciiMetadataKey::from_str(k.as_str()).map_err(|e| {
ArrowError::IoError(format!("Cannot convert header key \"{k}\": {e}"))
ArrowError::ParseError(format!("Cannot convert header key \"{k}\": {e}"))
})?;
let v = v.parse().map_err(|e| {
ArrowError::IoError(format!("Cannot convert header value \"{v}\": {e}"))
ArrowError::ParseError(format!(
"Cannot convert header value \"{v}\": {e}"
))
})?;
req.metadata_mut().insert(k, v);
}
if let Some(token) = &self.token {
let val = format!("Bearer {token}").parse().map_err(|e| {
ArrowError::IoError(format!("Cannot convert token to header value: {e}"))
ArrowError::ParseError(format!(
"Cannot convert token to header value: {e}"
))
})?;
req.metadata_mut().insert("authorization", val);
}
Expand Down Expand Up @@ -504,11 +508,11 @@ impl PreparedStatement<Channel> {
}

fn decode_error_to_arrow_error(err: prost::DecodeError) -> ArrowError {
ArrowError::IoError(err.to_string())
ArrowError::IpcError(err.to_string())
}

fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
ArrowError::IoError(format!("{status:?}"))
ArrowError::IpcError(format!("{status:?}"))
}

// A polymorphic structure to natively represent different types of data contained in `FlightData`
Expand Down
2 changes: 1 addition & 1 deletion arrow-flight/tests/encode_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ async fn test_mismatched_schema_message() {
do_test(
make_primitive_batch(5),
make_dictionary_batch(3),
"Error decoding ipc RecordBatch: Io error: Invalid data for schema",
"Error decoding ipc RecordBatch: Schema error: Invalid data for schema",
)
.await;

Expand Down
4 changes: 2 additions & 2 deletions arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowErr
if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
Ok(schema)
} else {
Err(ArrowError::IoError(
Err(ArrowError::ParseError(
"Unable to get head as schema".to_string(),
))
}
} else {
Err(ArrowError::IoError(
Err(ArrowError::ParseError(
"Unable to get root as message".to_string(),
))
}
Expand Down
54 changes: 28 additions & 26 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];

let dict_id = field.dict_id().ok_or_else(|| {
ArrowError::IoError(format!("Field {field} does not have dict id"))
ArrowError::ParseError(format!("Field {field} does not have dict id"))
})?;

let value_array =
reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
ArrowError::IoError(format!(
ArrowError::ParseError(format!(
"Cannot find a dictionary batch with dict id: {dict_id}"
))
})?;
Expand Down Expand Up @@ -193,7 +193,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
let null_count = node.null_count();

if length != null_count {
return Err(ArrowError::IoError(format!(
return Err(ArrowError::SchemaError(format!(
"Field {field} of NullArray has unequal null_count {null_count} and len {length}"
)));
}
Expand Down Expand Up @@ -325,7 +325,7 @@ impl<'a> ArrayReader<'a> {

fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
self.nodes.next().ok_or_else(|| {
ArrowError::IoError(format!(
ArrowError::SchemaError(format!(
"Invalid data for schema. {} refers to node not found in schema",
field
))
Expand Down Expand Up @@ -402,10 +402,10 @@ pub fn read_record_batch(
metadata: &MetadataVersion,
) -> Result<RecordBatch, ArrowError> {
let buffers = batch.buffers().ok_or_else(|| {
ArrowError::IoError("Unable to get buffers from IPC RecordBatch".to_string())
ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
})?;
let field_nodes = batch.nodes().ok_or_else(|| {
ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
})?;
let batch_compression = batch.compression();
let compression = batch_compression
Expand Down Expand Up @@ -462,7 +462,7 @@ pub fn read_dictionary(
metadata: &crate::MetadataVersion,
) -> Result<(), ArrowError> {
if batch.isDelta() {
return Err(ArrowError::IoError(
return Err(ArrowError::InvalidArgumentError(
"delta dictionary batches not supported".to_string(),
));
}
Expand Down Expand Up @@ -569,14 +569,14 @@ impl<R: Read + Seek> FileReader<R> {
let mut magic_buffer: [u8; 6] = [0; 6];
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != super::ARROW_MAGIC {
return Err(ArrowError::IoError(
return Err(ArrowError::ParseError(
"Arrow file does not contain correct header".to_string(),
));
}
reader.seek(SeekFrom::End(-6))?;
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != super::ARROW_MAGIC {
return Err(ArrowError::IoError(
return Err(ArrowError::ParseError(
"Arrow file does not contain correct footer".to_string(),
));
}
Expand All @@ -592,11 +592,11 @@ impl<R: Read + Seek> FileReader<R> {
reader.read_exact(&mut footer_data)?;

let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as footer: {err:?}"))
ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
})?;

let blocks = footer.recordBatches().ok_or_else(|| {
ArrowError::IoError(
ArrowError::ParseError(
"Unable to get record batches from IPC Footer".to_string(),
)
})?;
Expand Down Expand Up @@ -633,7 +633,9 @@ impl<R: Read + Seek> FileReader<R> {
reader.read_exact(&mut block_data)?;

let message = crate::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {err:?}"))
ArrowError::ParseError(format!(
"Unable to get root as message: {err:?}"
))
})?;

match message.header_type() {
Expand All @@ -657,7 +659,7 @@ impl<R: Read + Seek> FileReader<R> {
)?;
}
t => {
return Err(ArrowError::IoError(format!(
return Err(ArrowError::ParseError(format!(
"Expecting DictionaryBatch in dictionary blocks, found {t:?}."
)));
}
Expand Down Expand Up @@ -705,7 +707,7 @@ impl<R: Read + Seek> FileReader<R> {
/// Sets the current block to the index, allowing random reads
pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
if index >= self.total_blocks {
Err(ArrowError::IoError(format!(
Err(ArrowError::InvalidArgumentError(format!(
"Cannot set batch to index {} from {} total batches",
index, self.total_blocks
)))
Expand All @@ -732,25 +734,25 @@ impl<R: Read + Seek> FileReader<R> {
let mut block_data = vec![0; meta_len as usize];
self.reader.read_exact(&mut block_data)?;
let message = crate::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as footer: {err:?}"))
ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
})?;

// some old test data's footer metadata is not set, so we account for that
if self.metadata_version != crate::MetadataVersion::V1
&& message.version() != self.metadata_version
{
return Err(ArrowError::IoError(
return Err(ArrowError::IpcError(
"Could not read IPC message as metadata versions mismatch".to_string(),
));
}

match message.header_type() {
crate::MessageHeader::Schema => Err(ArrowError::IoError(
crate::MessageHeader::Schema => Err(ArrowError::IpcError(
"Not expecting a schema when messages are read".to_string(),
)),
crate::MessageHeader::RecordBatch => {
let batch = message.header_as_record_batch().ok_or_else(|| {
ArrowError::IoError(
ArrowError::IpcError(
"Unable to read IPC message as record batch".to_string(),
)
})?;
Expand All @@ -774,7 +776,7 @@ impl<R: Read + Seek> FileReader<R> {
crate::MessageHeader::NONE => {
Ok(None)
}
t => Err(ArrowError::IoError(format!(
t => Err(ArrowError::InvalidArgumentError(format!(
"Reading types other than record batches not yet supported, unable to read {t:?}"
))),
}
Expand Down Expand Up @@ -886,11 +888,11 @@ impl<R: Read> StreamReader<R> {
reader.read_exact(&mut meta_buffer)?;

let message = crate::root_as_message(meta_buffer.as_slice()).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {err:?}"))
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})?;
// message header is a Schema, so read it
let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| {
ArrowError::IoError("Unable to read IPC message as schema".to_string())
ArrowError::ParseError("Unable to read IPC message as schema".to_string())
})?;
let schema = crate::convert::fb_to_schema(ipc_schema);

Expand Down Expand Up @@ -965,16 +967,16 @@ impl<R: Read> StreamReader<R> {

let vecs = &meta_buffer.to_vec();
let message = crate::root_as_message(vecs).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {err:?}"))
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})?;

match message.header_type() {
crate::MessageHeader::Schema => Err(ArrowError::IoError(
crate::MessageHeader::Schema => Err(ArrowError::IpcError(
"Not expecting a schema when messages are read".to_string(),
)),
crate::MessageHeader::RecordBatch => {
let batch = message.header_as_record_batch().ok_or_else(|| {
ArrowError::IoError(
ArrowError::IpcError(
"Unable to read IPC message as record batch".to_string(),
)
})?;
Expand All @@ -986,7 +988,7 @@ impl<R: Read> StreamReader<R> {
}
crate::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().ok_or_else(|| {
ArrowError::IoError(
ArrowError::IpcError(
"Unable to read IPC message as dictionary batch".to_string(),
)
})?;
Expand All @@ -1004,7 +1006,7 @@ impl<R: Read> StreamReader<R> {
crate::MessageHeader::NONE => {
Ok(None)
}
t => Err(ArrowError::IoError(
t => Err(ArrowError::InvalidArgumentError(
format!("Reading types other than record batches not yet supported, unable to read {t:?} ")
)),
}
Expand Down
8 changes: 4 additions & 4 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ impl<W: Write> FileWriter<W> {
/// Write a record batch to the file
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
if self.finished {
return Err(ArrowError::IoError(
return Err(ArrowError::IpcError(
"Cannot write record batch to file writer as it is closed".to_string(),
));
}
Expand Down Expand Up @@ -794,7 +794,7 @@ impl<W: Write> FileWriter<W> {
/// Write footer and closing tag, then mark the writer as done
pub fn finish(&mut self) -> Result<(), ArrowError> {
if self.finished {
return Err(ArrowError::IoError(
return Err(ArrowError::IpcError(
"Cannot write footer to file writer as it is closed".to_string(),
));
}
Expand Down Expand Up @@ -909,7 +909,7 @@ impl<W: Write> StreamWriter<W> {
/// Write a record batch to the stream
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
if self.finished {
return Err(ArrowError::IoError(
return Err(ArrowError::IpcError(
"Cannot write record batch to stream writer as it is closed".to_string(),
));
}
Expand All @@ -930,7 +930,7 @@ impl<W: Write> StreamWriter<W> {
/// Write continuation bytes, and mark the stream as done
pub fn finish(&mut self) -> Result<(), ArrowError> {
if self.finished {
return Err(ArrowError::IoError(
return Err(ArrowError::IpcError(
"Cannot write footer to stream writer as it is closed".to_string(),
));
}
Expand Down
10 changes: 6 additions & 4 deletions arrow-schema/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub enum ArrowError {
DivideByZero,
CsvError(String),
JsonError(String),
IoError(String),
IoError(String, std::io::Error),
IpcError(String),
InvalidArgumentError(String),
ParquetError(String),
/// Error during import or export to/from the C Data Interface
Expand All @@ -53,7 +54,7 @@ impl ArrowError {

impl From<std::io::Error> for ArrowError {
fn from(error: std::io::Error) -> Self {
ArrowError::IoError(error.to_string())
ArrowError::IoError(error.to_string(), error)
}
}

Expand All @@ -65,7 +66,7 @@ impl From<std::string::FromUtf8Error> for ArrowError {

impl<W: Write> From<std::io::IntoInnerError<W>> for ArrowError {
fn from(error: std::io::IntoInnerError<W>) -> Self {
ArrowError::IoError(error.to_string())
ArrowError::IoError(error.to_string(), error.into())
}
}

Expand All @@ -84,7 +85,8 @@ impl Display for ArrowError {
ArrowError::DivideByZero => write!(f, "Divide by zero error"),
ArrowError::CsvError(desc) => write!(f, "Csv error: {desc}"),
ArrowError::JsonError(desc) => write!(f, "Json error: {desc}"),
ArrowError::IoError(desc) => write!(f, "Io error: {desc}"),
ArrowError::IoError(desc, _) => write!(f, "Io error: {desc}"),
ArrowError::IpcError(desc) => write!(f, "Ipc error: {desc}"),
ArrowError::InvalidArgumentError(desc) => {
write!(f, "Invalid argument error: {desc}")
}
Expand Down
2 changes: 1 addition & 1 deletion arrow/src/ffi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ fn get_error_code(err: &ArrowError) -> i32 {
match err {
ArrowError::NotYetImplemented(_) => ENOSYS,
ArrowError::MemoryError(_) => ENOMEM,
ArrowError::IoError(_) => EIO,
ArrowError::IoError(_, _) => EIO,
_ => EINVAL,
}
}
Expand Down

0 comments on commit d9381c6

Please sign in to comment.