Skip to content

Commit

Permalink
Fix formatting in record_batch_stream_reader.cc based on CI failures.
Browse files Browse the repository at this point in the history
  • Loading branch information
kevingurney committed Dec 18, 2024
1 parent 63f6314 commit 481a095
Showing 1 changed file with 40 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include "arrow/io/file.h"
#include "arrow/matlab/error/error.h"
#include "arrow/matlab/tabular/proxy/record_batch.h"
#include "arrow/matlab/tabular/proxy/table.h"
#include "arrow/matlab/tabular/proxy/schema.h"
#include "arrow/matlab/tabular/proxy/table.h"
#include "arrow/util/utf8.h"

#include "libmexclass/proxy/ProxyManager.h"
Expand All @@ -39,7 +39,8 @@ RecordBatchStreamReader::RecordBatchStreamReader(
libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
const libmexclass::proxy::FunctionArguments& constructor_arguments) {
namespace mda = ::matlab::data;
using RecordBatchStreamReaderProxy = arrow::matlab::io::ipc::proxy::RecordBatchStreamReader;
using RecordBatchStreamReaderProxy =
arrow::matlab::io::ipc::proxy::RecordBatchStreamReader;

const mda::StructArray opts = constructor_arguments[0];

Expand Down Expand Up @@ -74,12 +75,12 @@ void RecordBatchStreamReader::getSchema(libmexclass::proxy::method::Context& con
context.outputs[0] = schema_proxy_id_mda;
}

void RecordBatchStreamReader::readTable(
libmexclass::proxy::method::Context& context) {
void RecordBatchStreamReader::readTable(libmexclass::proxy::method::Context& context) {
namespace mda = ::matlab::data;
using TableProxy = arrow::matlab::tabular::proxy::Table;

MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(auto table, reader->ToTable(), context, error::IPC_TABLE_READ_FAILED);
MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(auto table, reader->ToTable(), context,
error::IPC_TABLE_READ_FAILED);
auto table_proxy = std::make_shared<TableProxy>(table);
const auto table_proxy_id = libmexclass::proxy::ProxyManager::manageProxy(table_proxy);

Expand All @@ -93,20 +94,27 @@ void RecordBatchStreamReader::readRecordBatch(
namespace mda = ::matlab::data;
using RecordBatchProxy = arrow::matlab::tabular::proxy::RecordBatch;
using namespace libmexclass::error;
// If we don't have a "pre-cached" record batch to return, then try reading another record batch
// from the IPC Stream. If there are no more record batches in the stream, then error.
// If we don't have a "pre-cached" record batch to return, then try reading another
// record batch from the IPC Stream. If there are no more record batches in the stream,
// then error.
if (!nextRecordBatch) {
MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(nextRecordBatch, reader->Next(), context, error::IPC_RECORD_BATCH_READ_FAILED);
MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(nextRecordBatch, reader->Next(), context,
error::IPC_RECORD_BATCH_READ_FAILED);
}
// Even if the read was "successful", the resulting record batch may be empty, signalling the end of the stream.
// Even if the read was "successful", the resulting record batch may be empty,
// signalling the end of the stream.
if (!nextRecordBatch) {
context.error = Error{"arrow:io:ipc:EndOfStream", "Reached end of Arrow IPC Stream. No more record batches to read."};
return;
context.error =
Error{"arrow:io:ipc:EndOfStream",
"Reached end of Arrow IPC Stream. No more record batches to read."};
return;
}
auto record_batch_proxy = std::make_shared<RecordBatchProxy>(nextRecordBatch);
const auto record_batch_proxy_id = libmexclass::proxy::ProxyManager::manageProxy(record_batch_proxy);
const auto record_batch_proxy_id =
libmexclass::proxy::ProxyManager::manageProxy(record_batch_proxy);
// Once we have "consumed" the next RecordBatch, set nextRecordBatch to nullptr
// so that the next call to hasNextRecordBatch correctly checks whether there are more record batches remaining in the IPC Stream.
// so that the next call to hasNextRecordBatch correctly checks whether there are more
// record batches remaining in the IPC Stream.
nextRecordBatch = nullptr;
mda::ArrayFactory factory;
const auto record_batch_proxy_id_mda = factory.createScalar(record_batch_proxy_id);
Expand All @@ -118,25 +126,25 @@ void RecordBatchStreamReader::hasNextRecordBatch(
namespace mda = ::matlab::data;
bool has_next_record_batch = true;
if (!nextRecordBatch) {
// Try to read another RecordBatch from the
// IPC Stream.
auto maybe_record_batch = reader->Next();
if (!maybe_record_batch.ok()) {
has_next_record_batch = false;
} else {
// If we read a RecordBatch successfully,
// then "cache" the RecordBatch
// so that we can return it on the next
// call to readRecordBatch.
nextRecordBatch = *maybe_record_batch;

// Even if the read was "successful", the resulting
// record batch may be empty, signaling that
// the end of the IPC stream has been reached.
if (!nextRecordBatch) {
has_next_record_batch = false;
}
}
// Try to read another RecordBatch from the
// IPC Stream.
auto maybe_record_batch = reader->Next();
if (!maybe_record_batch.ok()) {
has_next_record_batch = false;
} else {
// If we read a RecordBatch successfully,
// then "cache" the RecordBatch
// so that we can return it on the next
// call to readRecordBatch.
nextRecordBatch = *maybe_record_batch;

// Even if the read was "successful", the resulting
// record batch may be empty, signaling that
// the end of the IPC stream has been reached.
if (!nextRecordBatch) {
has_next_record_batch = false;
}
}
}

mda::ArrayFactory factory;
Expand Down

0 comments on commit 481a095

Please sign in to comment.