From f940c7f4367ee79b85d7c9f481104076dca352c8 Mon Sep 17 00:00:00 2001
From: nmcdonnell-kx
Date: Fri, 5 Feb 2021 18:10:35 +0000
Subject: [PATCH] Add support for reading with mmap

Read a single parquet column
Improve temporal conversion performance
---
 examples/README.md                 |   4 +-
 examples/concrete_datatypes.q      |   2 +-
 examples/inferred_schema.q         |   2 +-
 examples/nested_datatypes.q        |   2 +-
 examples/parameterized_datatypes.q |   2 +-
 examples/readme.q                  |   4 +-
 q/arrowkdb.q                       |  11 +-
 src/ArrayReader.cpp                |  18 +-
 src/ArrayWriter.cpp                |  18 +-
 src/DatatypeStore.cpp              |  10 +-
 src/HelperFunctions.cpp            | 258 ++++++++++++-----------------
 src/HelperFunctions.h              |  68 ++++----
 src/KdbOptions.h                   |  25 ++-
 src/TableData.cpp                  |  75 +++++++--
 src/TableData.h                    |  31 +++-
 tests/test.t                       |  19 ++-
 16 files changed, 313 insertions(+), 236 deletions(-)

diff --git a/examples/README.md b/examples/README.md
index 6d1155c..6b7e3c7 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -122,7 +122,7 @@ q)show system "ls inferred_schema.arrow"
 "inferred_schema.arrow"
 
 // Read the arrow file into another table
-q)new_table:.arrowkdb.ipc.readArrowToTable["inferred_schema.arrow"]
+q)new_table:.arrowkdb.ipc.readArrowToTable["inferred_schema.arrow";::]
 
 // Compare the kdb+ tables
 q)show table~new_table
@@ -314,7 +314,7 @@ q)show schema~new_schema
 1b
 
 // Read the array data back from the arrow file
-q)new_array_data:.arrowkdb.ipc.readArrowData["constructed_schema.arrow"]
+q)new_array_data:.arrowkdb.ipc.readArrowData["constructed_schema.arrow";::]
 
 // Compare the array data
 q)show array_data~new_array_data
diff --git a/examples/concrete_datatypes.q b/examples/concrete_datatypes.q
index d7bb739..3770724 100644
--- a/examples/concrete_datatypes.q
+++ b/examples/concrete_datatypes.q
@@ -97,7 +97,7 @@ show .arrowkdb.sc.equalSchemas[schema;new_schema]
 show schema~new_schema
 
 // Read the array data back and compare
-new_array_data:.arrowkdb.ipc.readArrowData[filename];
+new_array_data:.arrowkdb.ipc.readArrowData[filename;::];
 show array_data~new_array_data
 
 rm filename;
diff --git a/examples/inferred_schema.q b/examples/inferred_schema.q
index 67d0a2d..82c2228 100644
--- a/examples/inferred_schema.q
+++ b/examples/inferred_schema.q
@@ -58,7 +58,7 @@ filename:"inferred_schema.arrow";
 show ls filename
 
 // Read the arrow file into another table
-new_table:.arrowkdb.ipc.readArrowToTable["inferred_schema.arrow"];
+new_table:.arrowkdb.ipc.readArrowToTable["inferred_schema.arrow";::];
 
 // Compare the kdb+ tables
 show table~new_table
diff --git a/examples/nested_datatypes.q b/examples/nested_datatypes.q
index 1b83264..16b02b1 100644
--- a/examples/nested_datatypes.q
+++ b/examples/nested_datatypes.q
@@ -115,7 +115,7 @@ show .arrowkdb.sc.equalSchemas[schema;new_schema]
 show schema~new_schema
 
 // Read the array data back and compare
-new_array_data:.arrowkdb.ipc.readArrowData[filename];
+new_array_data:.arrowkdb.ipc.readArrowData[filename;::];
 show array_data~new_array_data
 
 rm filename;
diff --git a/examples/parameterized_datatypes.q b/examples/parameterized_datatypes.q
index 9d6205e..7888f07 100644
--- a/examples/parameterized_datatypes.q
+++ b/examples/parameterized_datatypes.q
@@ -87,7 +87,7 @@ show .arrowkdb.sc.equalSchemas[schema;new_schema]
 show schema~new_schema
 
 // Read the array data back and compare
-new_array_data:.arrowkdb.ipc.readArrowData[filename];
+new_array_data:.arrowkdb.ipc.readArrowData[filename;::];
 show array_data~new_array_data
 
 rm filename;
diff --git a/examples/readme.q b/examples/readme.q
index 9ba23d9..4b24ea3 100644
--- a/examples/readme.q
+++ b/examples/readme.q
@@ -52,7 +52,7 @@ filename:"inferred_schema.arrow";
 show ls filename
 
 // Read the arrow file into another table
-new_table:.arrowkdb.ipc.readArrowToTable[filename];
+new_table:.arrowkdb.ipc.readArrowToTable[filename;::];
 
 // Compare the kdb+ tables
 show table~new_table
@@ -162,7 +162,7 @@ show .arrowkdb.sc.equalSchemas[schema;new_schema]
 show schema~new_schema
 
 // Read the array data back from the arrow file
-new_array_data:.arrowkdb.ipc.readArrowData[filename];
+new_array_data:.arrowkdb.ipc.readArrowData[filename;::];
 
 // Compare the array data
 show array_data~new_array_data
diff --git a/q/arrowkdb.q b/q/arrowkdb.q
index af9a94f..e795b7c 100644
--- a/q/arrowkdb.q
+++ b/q/arrowkdb.q
@@ -56,9 +56,9 @@ dt.getDictionaryDatatypes:`arrowkdb 2:(`getDictionaryDatatypes;1);
 dt.getChildFields:`arrowkdb 2:(`getChildFields;1);
 
 // datatype management:
-dt.listDatatypes:`arrowkdb 2:(`listDatatypes;1);
 dt.printDatatype_:`arrowkdb 2:(`printDatatype;1);
 dt.printDatatype:{[x] -1 dt.printDatatype_[x];};
+dt.listDatatypes:`arrowkdb 2:(`listDatatypes;1);
 dt.removeDatatype:`arrowkdb 2:(`removeDatatype;1);
 dt.equalDatatypes:`arrowkdb 2:(`equalDatatypes;2);
 
@@ -71,9 +71,9 @@ fd.fieldName:`arrowkdb 2:(`fieldName;1);
 fd.fieldDatatype:`arrowkdb 2:(`fieldDatatype;1);
 
 // field management:
-fd.listFields:`arrowkdb 2:(`listFields;1);
 fd.printField_:`arrowkdb 2:(`printField;1);
 fd.printField:{[x] -1 fd.printField_[x];};
+fd.listFields:`arrowkdb 2:(`listFields;1);
 fd.removeField:`arrowkdb 2:(`removeField;1);
 fd.equalFields:`arrowkdb 2:(`equalFields;2);
 
@@ -88,9 +88,9 @@ sc.inferSchema:`arrowkdb 2:(`inferSchema;1);
 sc.schemaFields:`arrowkdb 2:(`schemaFields;1);
 
 // schema management
-sc.listSchemas:`arrowkdb 2:(`listSchemas;1);
 sc.printSchema_:`arrowkdb 2:(`printSchema;1);
 sc.printSchema:{[x] -1 sc.printSchema_[x];};
+sc.listSchemas:`arrowkdb 2:(`listSchemas;1);
 sc.removeSchema:`arrowkdb 2:(`removeSchema;1);
 sc.equalSchemas:`arrowkdb 2:(`equalSchemas;2);
 
@@ -113,14 +113,15 @@ pq.writeParquetFromTable:{[filename;table;options] pq.writeParquet[filename;sc.i
 pq.readParquetSchema:`arrowkdb 2:(`readParquetSchema;1);
 pq.readParquetData:`arrowkdb 2:(`readParquetData;2);
 pq.readParquetToTable:{[filename;options] flip (fd.fieldName each sc.schemaFields[pq.readParquetSchema[filename]])!(pq.readParquetData[filename;options])};
+pq.readParquetColumn:`arrowkdb 2:(`readParquetColumn;2);
 
 // arrow files
 ipc.writeArrow:`arrowkdb 2:(`writeArrow;3);
 ipc.writeArrowFromTable:{[filename;table] ipc.writeArrow[filename;sc.inferSchema[table];value flip table]};
 ipc.readArrowSchema:`arrowkdb 2:(`readArrowSchema;1);
-ipc.readArrowData:`arrowkdb 2:(`readArrowData;1);
-ipc.readArrowToTable:{[filename] flip (fd.fieldName each sc.schemaFields[ipc.readArrowSchema[filename]])!(ipc.readArrowData[filename])};
+ipc.readArrowData:`arrowkdb 2:(`readArrowData;2);
+ipc.readArrowToTable:{[filename;options] flip (fd.fieldName each sc.schemaFields[ipc.readArrowSchema[filename]])!(ipc.readArrowData[filename;options])};
 
 // arrow streams
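With this change ipc.readArrowData and ipc.readArrowToTable take a trailing options dictionary, so existing callers pass the generic null (::), as the updated examples above show. A minimal q sketch of the new call pattern (the file name is hypothetical):

q)table:([]a:1 2 3;b:`x`y`z)
q).arrowkdb.ipc.writeArrowFromTable["sketch.arrow";table]
q).arrowkdb.ipc.readArrowToTable["sketch.arrow";::]                          / no options
q).arrowkdb.ipc.readArrowToTable["sketch.arrow";enlist[`USE_MMAP]!enlist 1]  / memory-mapped read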
diff --git a/src/ArrayReader.cpp b/src/ArrayReader.cpp
index 4bca462..bc35fd6 100644
--- a/src/ArrayReader.cpp
+++ b/src/ArrayReader.cpp
@@ -253,40 +253,45 @@ void AppendArray(std::shared_ptr<arrow::Array> array_data, K k_array, size_t& in
   }
   case arrow::Type::DATE32:
   {
+    TemporalConversion tc(array_data->type());
     auto d32_array = std::static_pointer_cast<arrow::Date32Array>(array_data);
     for (auto i = 0; i < d32_array->length(); ++i)
-      kI(k_array)[index++] = Date32_KDate(d32_array->Value(i));
+      kI(k_array)[index++] = tc.ArrowToKdb(d32_array->Value(i));
     break;
   }
   case arrow::Type::DATE64:
   {
+    TemporalConversion tc(array_data->type());
     auto d64_array = std::static_pointer_cast<arrow::Date64Array>(array_data);
     for (auto i = 0; i < d64_array->length(); ++i)
-      kJ(k_array)[index++] = Date64_KTimestamp(d64_array->Value(i));
+      kJ(k_array)[index++] = tc.ArrowToKdb(d64_array->Value(i));
     break;
   }
   case arrow::Type::TIMESTAMP:
   {
+    TemporalConversion tc(array_data->type());
     auto ts_array = std::static_pointer_cast<arrow::TimestampArray>(array_data);
     auto timestamp_type = std::static_pointer_cast<arrow::TimestampType>(ts_array->type());
     for (auto i = 0; i < ts_array->length(); ++i)
-      kJ(k_array)[index++] = Timestamp_KTimestamp(timestamp_type, ts_array->Value(i));
+      kJ(k_array)[index++] = tc.ArrowToKdb(ts_array->Value(i));
     break;
   }
   case arrow::Type::TIME32:
   {
+    TemporalConversion tc(array_data->type());
     auto t32_array = std::static_pointer_cast<arrow::Time32Array>(array_data);
     auto time32_type = std::static_pointer_cast<arrow::Time32Type>(t32_array->type());
     for (auto i = 0; i < t32_array->length(); ++i)
-      kI(k_array)[index++] = Time32_KTime(time32_type, t32_array->Value(i));
+      kI(k_array)[index++] = tc.ArrowToKdb(t32_array->Value(i));
     break;
   }
   case arrow::Type::TIME64:
   {
+    TemporalConversion tc(array_data->type());
     auto t64_array = std::static_pointer_cast<arrow::Time64Array>(array_data);
     auto time64_type = std::static_pointer_cast<arrow::Time64Type>(t64_array->type());
     for (auto i = 0; i < t64_array->length(); ++i)
-      kJ(k_array)[index++] = Time64_KTimespan(time64_type, t64_array->Value(i));
+      kJ(k_array)[index++] = tc.ArrowToKdb(t64_array->Value(i));
     break;
   }
   case arrow::Type::DECIMAL:
@@ -303,10 +308,11 @@
   }
   case arrow::Type::DURATION:
   {
+    TemporalConversion tc(array_data->type());
     auto dur_array = std::static_pointer_cast<arrow::DurationArray>(array_data);
     auto duration_type = std::static_pointer_cast<arrow::DurationType>(dur_array->type());
     for (auto i = 0; i < dur_array->length(); ++i)
-      kJ(k_array)[index++] = Duration_KTimespan(duration_type, dur_array->Value(i));
+      kJ(k_array)[index++] = tc.ArrowToKdb(dur_array->Value(i));
     break;
   }
   case arrow::Type::INTERVAL_MONTHS:
diff --git a/src/ArrayWriter.cpp b/src/ArrayWriter.cpp
index 7f85ae8..bb8eda1 100644
--- a/src/ArrayWriter.cpp
+++ b/src/ArrayWriter.cpp
@@ -394,40 +394,45 @@ void PopulateBuilder(std::shared_ptr<arrow::DataType> datatype, K k_array, arrow
   }
   case arrow::Type::DATE32:
   {
+    TemporalConversion tc(datatype);
    auto d32_builder = static_cast<arrow::Date32Builder*>(builder);
     for (auto i = 0; i < k_array->n; ++i)
-      PARQUET_THROW_NOT_OK(d32_builder->Append(KDate_Date32(kI(k_array)[i])));
+      PARQUET_THROW_NOT_OK(d32_builder->Append(tc.KdbToArrow(kI(k_array)[i])));
     break;
   }
   case arrow::Type::DATE64:
   {
+    TemporalConversion tc(datatype);
     auto d64_builder = static_cast<arrow::Date64Builder*>(builder);
     for (auto i = 0; i < k_array->n; ++i)
-      PARQUET_THROW_NOT_OK(d64_builder->Append(KTimestamp_Date64(kJ(k_array)[i])));
+      PARQUET_THROW_NOT_OK(d64_builder->Append(tc.KdbToArrow(kJ(k_array)[i])));
     break;
   }
   case arrow::Type::TIMESTAMP:
   {
+    TemporalConversion tc(datatype);
     auto ts_builder = static_cast<arrow::TimestampBuilder*>(builder);
     auto timestamp_type = std::static_pointer_cast<arrow::TimestampType>(datatype);
     for (auto i = 0; i < k_array->n; ++i)
-      PARQUET_THROW_NOT_OK(ts_builder->Append(KTimestamp_Timestamp(timestamp_type, kJ(k_array)[i])));
+      PARQUET_THROW_NOT_OK(ts_builder->Append(tc.KdbToArrow(kJ(k_array)[i])));
     break;
   }
   case arrow::Type::TIME32:
   {
+    TemporalConversion tc(datatype);
     auto t32_builder = static_cast<arrow::Time32Builder*>(builder);
     auto time32_type = std::static_pointer_cast<arrow::Time32Type>(datatype);
     for (auto i = 0; i < k_array->n; ++i)
-      PARQUET_THROW_NOT_OK(t32_builder->Append(KTime_Time32(time32_type, kI(k_array)[i])));
+      PARQUET_THROW_NOT_OK(t32_builder->Append(tc.KdbToArrow(kI(k_array)[i])));
     break;
   }
   case arrow::Type::TIME64:
   {
+    TemporalConversion tc(datatype);
     auto t64_builder = static_cast<arrow::Time64Builder*>(builder);
     auto time64_type = std::static_pointer_cast<arrow::Time64Type>(datatype);
     for (auto i = 0; i < k_array->n; ++i)
-      PARQUET_THROW_NOT_OK(t64_builder->Append(KTimespan_Time64(time64_type, kJ(k_array)[i])));
+      PARQUET_THROW_NOT_OK(t64_builder->Append(tc.KdbToArrow(kJ(k_array)[i])));
     break;
   }
   case arrow::Type::DECIMAL:
@@ -446,10 +451,11 @@
   }
   case arrow::Type::DURATION:
   {
+    TemporalConversion tc(datatype);
     auto dur_builder = static_cast<arrow::DurationBuilder*>(builder);
     auto duration_type = std::static_pointer_cast<arrow::DurationType>(datatype);
     for (auto i = 0; i < k_array->n; ++i)
-      PARQUET_THROW_NOT_OK(dur_builder->Append(KTimespan_Duration(duration_type, kJ(k_array)[i])));
+      PARQUET_THROW_NOT_OK(dur_builder->Append(tc.KdbToArrow(kJ(k_array)[i])));
     break;
   }
   case arrow::Type::INTERVAL_MONTHS:
diff --git a/src/DatatypeStore.cpp b/src/DatatypeStore.cpp
index 715fccc..29c1160 100644
--- a/src/DatatypeStore.cpp
+++ b/src/DatatypeStore.cpp
@@ -265,7 +265,10 @@ K time32(K time_unit)
   KDB_EXCEPTION_TRY;
 
   auto tu = kx::arrowkdb::ToTimeUnit(kx::arrowkdb::GetKdbString(time_unit));
-  return ki(kx::arrowkdb::GetDatatypeStore()->Add(arrow::time32(tu)));
+  if (tu == arrow::TimeUnit::SECOND || tu == arrow::TimeUnit::MILLI)
+    return ki(kx::arrowkdb::GetDatatypeStore()->Add(arrow::time32(tu)));
+  else
+    return krr((S)"time32 time_unit not SECOND or MILLI");
 
   KDB_EXCEPTION_CATCH;
 }
@@ -278,7 +281,10 @@ K time64(K time_unit)
   KDB_EXCEPTION_TRY;
 
   auto tu = kx::arrowkdb::ToTimeUnit(kx::arrowkdb::GetKdbString(time_unit));
-  return ki(kx::arrowkdb::GetDatatypeStore()->Add(arrow::time64(tu)));
+  if (tu == arrow::TimeUnit::MICRO || tu == arrow::TimeUnit::NANO)
+    return ki(kx::arrowkdb::GetDatatypeStore()->Add(arrow::time64(tu)));
+  else
+    return krr((S)"time64 time_unit not MICRO or NANO");
 
   KDB_EXCEPTION_CATCH;
 }
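time32 and time64 now validate their time unit up front, mirroring the constraints of the underlying Arrow types (time32 carries seconds or milliseconds since midnight, time64 micro- or nanoseconds). A short q illustration, assuming ToTimeUnit accepts the upper-case unit symbols used in the error strings:

q).arrowkdb.dt.time32[`MILLI]    / valid, returns a datatype identifier
q).arrowkdb.dt.time32[`NANO]     / rejected
'time32 time_unit not SECOND or MILLI
q).arrowkdb.dt.time64[`SECOND]   / rejected
'time64 time_unit not MICRO or NANO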
diff --git a/src/HelperFunctions.cpp b/src/HelperFunctions.cpp
index 2761fa4..9045f85 100644
--- a/src/HelperFunctions.cpp
+++ b/src/HelperFunctions.cpp
@@ -7,161 +7,121 @@ namespace kx {
 namespace arrowkdb {
 
-// Epoch / scaling constants
-const static int32_t kdb_date_epoch_days = 10957;
-const static int64_t kdb_timestamp_epoch_nano = 946684800000000000LL;
-const static int64_t ns_us_scale = 1000LL;
-const static int64_t ns_ms_scale = ns_us_scale * 1000LL;
-const static int64_t ns_sec_scale = ns_ms_scale * 1000LL;
-const static int64_t day_as_ns = 86400000000000LL;
-
-
-// Date32 is int32_t days since the UNIX epoch
-int32_t Date32_KDate(int32_t date32)
-{
-  return date32 - kdb_date_epoch_days;
-}
-
-// Kdb date is days since 2000.01.01
-int32_t KDate_Date32(int32_t k_date)
-{
-  return k_date + kdb_date_epoch_days;
-}
-
-// Date64 is int64_t milliseconds since the UNIX epoch
-int64_t Date64_KTimestamp(int64_t date64)
-{
-  return (date64 * ns_ms_scale) - kdb_timestamp_epoch_nano;
-}
-
-// Kdb timestamp is nano since 2000.01.01 00:00:00.0
-int64_t KTimestamp_Date64(int64_t k_timestamp)
-{
-  return (k_timestamp + kdb_timestamp_epoch_nano) / ns_ms_scale;
-}
-
-// Exact timestamp encoded with int64 (as number of seconds, milliseconds,
-// microseconds or nanoseconds since UNIX epoch)
-//
-// @@@ Doesn't support timezone strings
-int64_t Timestamp_KTimestamp(std::shared_ptr<arrow::TimestampType> datatype, int64_t timestamp)
-{
-  switch (datatype->unit()) {
-  case arrow::TimeUnit::SECOND:
-    return (timestamp * ns_sec_scale) - kdb_timestamp_epoch_nano;
-  case arrow::TimeUnit::MILLI:
-    return (timestamp * ns_ms_scale) - kdb_timestamp_epoch_nano;
-  case arrow::TimeUnit::MICRO:
-    return (timestamp * ns_us_scale) - kdb_timestamp_epoch_nano;
-  case arrow::TimeUnit::NANO:
-    return timestamp - kdb_timestamp_epoch_nano;
-  default:
-    throw TypeCheck("Invalid TimeUnit");
-  }
-}
-
-// Kdb timestamp is nano since 2000.01.01 00:00:00.0
-int64_t KTimestamp_Timestamp(std::shared_ptr<arrow::TimestampType> datatype, int64_t k_timestamp)
-{
-  switch (datatype->unit()) {
-  case arrow::TimeUnit::SECOND:
-    return (k_timestamp + kdb_timestamp_epoch_nano) / ns_sec_scale;
-  case arrow::TimeUnit::MILLI:
-    return (k_timestamp + kdb_timestamp_epoch_nano) / ns_ms_scale;
-  case arrow::TimeUnit::MICRO:
-    return (k_timestamp + kdb_timestamp_epoch_nano) / ns_us_scale;
-  case arrow::TimeUnit::NANO:
-    return k_timestamp + kdb_timestamp_epoch_nano;
-  default:
-    throw TypeCheck("Invalid TimeUnit");
-  }
-}
 
-// Time32 is signed 32-bit integer, representing either seconds or milliseconds since midnight.
-int32_t Time32_KTime(std::shared_ptr<arrow::Time32Type> datatype, int32_t time32)
+TemporalConversion::TemporalConversion(std::shared_ptr<arrow::DataType> datatype)
 {
-  switch (datatype->unit()) {
-  case arrow::TimeUnit::SECOND:
-    return time32 * 1000;
-  case arrow::TimeUnit::MILLI:
-    return time32;
-  default:
-    throw TypeCheck("Invalid TimeUnit");
-  }
-}
-
-// Kdb time is milliseconds from midnight but need to use the correct arrow TimeUnit for the datatype
-int32_t KTime_Time32(std::shared_ptr<arrow::Time32Type> datatype, int32_t k_time)
-{
-  switch (datatype->unit()) {
-  case arrow::TimeUnit::SECOND:
-    return k_time / 1000;
-  case arrow::TimeUnit::MILLI:
-    return k_time;
-  default:
-    throw TypeCheck("Invalid TimeUnit");
-  }
-}
-
-// Time64 is signed 64-bit integer, representing either microseconds or nanoseconds since midnight.
-int64_t Time64_KTimespan(std::shared_ptr<arrow::Time64Type> datatype, int64_t time64)
-{
-  switch (datatype->unit()) {
-  case arrow::TimeUnit::MICRO:
-    return time64 * 1000;
-  case arrow::TimeUnit::NANO:
-    return time64;
-  default:
-    throw TypeCheck("Invalid TimeUnit");
-  }
-}
-
-// Kdb timespan is nanoseconds from midnight but need to use the correct arrow TimeUnit for the datatype
-int64_t KTimespan_Time64(std::shared_ptr<arrow::Time64Type> datatype, int64_t k_timespan)
-{
-  switch (datatype->unit()) {
-  case arrow::TimeUnit::MICRO:
-    return k_timespan / 1000;
-  case arrow::TimeUnit::NANO:
-    return k_timespan;
-  default:
-    throw TypeCheck("Invalid TimeUnit");
-  }
-}
-
-// Duration is an int64_t measure of elapsed time in either seconds, milliseconds, microseconds or nanoseconds.
-int64_t Duration_KTimespan(std::shared_ptr<arrow::DurationType> datatype, int64_t timespan)
-{
-  switch (datatype->unit()) {
-  case arrow::TimeUnit::SECOND:
-    return (timespan * ns_sec_scale);
-  case arrow::TimeUnit::MILLI:
-    return (timespan * ns_ms_scale);
-  case arrow::TimeUnit::MICRO:
-    return (timespan * ns_us_scale);
-  case arrow::TimeUnit::NANO:
-    return timespan;
+  // Work out the correct epoch offsetting and scaling factors required for this
+  // arrow datatype
+  switch (datatype->id()) {
+  case arrow::Type::DATE32:
+    // Arrow date32 <-> kdb date (KD)
+    // Date32 is int32_t days since the UNIX epoch
+    // Kdb date is days since 2000.01.01
+    // Requires: epoch offsetting
+    offset = kdb_date_epoch_days;
+    scalar = 1;
+    break;
+  case arrow::Type::DATE64:
+    // Arrow date64 <-> kdb timestamp (KP)
+    // Date64 is int64_t milliseconds since the UNIX epoch
+    // Kdb timestamp is nano since 2000.01.01 00:00:00.0
+    // Requires: epoch offsetting and scaling
+    offset = kdb_timestamp_epoch_nano;
+    scalar = ns_ms_scale;
+    break;
+  case arrow::Type::TIMESTAMP:
+    // Arrow timestamp <-> kdb timestamp (KP)
+    // Timestamp is int64 (as number of seconds, milliseconds, microseconds
+    // or nanoseconds since UNIX epoch)
+    // Kdb timestamp is nano since 2000.01.01 00:00:00.0
+    // Requires: epoch offsetting and scaling
+    offset = kdb_timestamp_epoch_nano;
+    switch (std::static_pointer_cast<arrow::TimestampType>(datatype)->unit()) {
+    case arrow::TimeUnit::SECOND:
+      scalar = ns_sec_scale;
+      break;
+    case arrow::TimeUnit::MILLI:
+      scalar = ns_ms_scale;
+      break;
+    case arrow::TimeUnit::MICRO:
+      scalar = ns_us_scale;
+      break;
+    case arrow::TimeUnit::NANO:
+      scalar = 1;
+      break;
+    default:
+      throw TypeCheck("Invalid TimeUnit");
+    }
+    break;
+  case arrow::Type::TIME32:
+    // Arrow time32 <-> kdb time (KT)
+    // Time32 is int32 representing either seconds or milliseconds since
+    // midnight
+    // Kdb time is milliseconds from midnight
+    // Requires: scaling
+    offset = 0;
+    switch (std::static_pointer_cast<arrow::Time32Type>(datatype)->unit()) {
+    case arrow::TimeUnit::SECOND:
+      scalar = 1000;
+      break;
+    case arrow::TimeUnit::MILLI:
+      scalar = 1;
+      break;
+    default:
+      throw TypeCheck("Invalid TimeUnit");
+    }
+    break;
+  case arrow::Type::TIME64:
+    // Arrow time64 <-> kdb timespan (KN)
+    // Time64 is int64 representing either microseconds or nanoseconds
+    // since midnight
+    // Kdb timespan is nanoseconds from midnight
+    // Requires: scaling
+    offset = 0;
+    switch (std::static_pointer_cast<arrow::Time64Type>(datatype)->unit()) {
+    case arrow::TimeUnit::MICRO:
+      scalar = 1000;
+      break;
+    case arrow::TimeUnit::NANO:
+      scalar = 1;
+      break;
+    default:
+      throw TypeCheck("Invalid TimeUnit");
+    }
+    break;
+  case arrow::Type::DURATION:
+    // Arrow duration <-> kdb timespan (KN)
+    // Duration is an int64 measure of elapsed time in either seconds,
+    // milliseconds, microseconds or nanoseconds
+    // Kdb timespan is nanoseconds of elapsed time
+    // Requires: scaling
+    offset = 0;
+    switch (std::static_pointer_cast<arrow::DurationType>(datatype)->unit()) {
+    case arrow::TimeUnit::SECOND:
+      scalar = ns_sec_scale;
+      break;
+    case arrow::TimeUnit::MILLI:
+      scalar = ns_ms_scale;
+      break;
+    case arrow::TimeUnit::MICRO:
+      scalar = ns_us_scale;
+      break;
+    case arrow::TimeUnit::NANO:
+      scalar = 1;
+      break;
+    default:
+      throw TypeCheck("Invalid TimeUnit");
+    }
+    break;
   default:
-    throw TypeCheck("Invalid TimeUnit");
+    TYPE_CHECK_UNSUPPORTED(datatype->ToString());
   }
 }
 
-// Kdb timestamp is nano since 2000.01.01 00:00:00.0
-int64_t KTimespan_Duration(std::shared_ptr<arrow::DurationType> datatype, int64_t k_timespan)
-{
-  switch (datatype->unit()) {
-  case arrow::TimeUnit::SECOND:
-    return (k_timespan / ns_sec_scale);
-  case arrow::TimeUnit::MILLI:
-    return (k_timespan / ns_ms_scale);
-  case arrow::TimeUnit::MICRO:
-    return (k_timespan / ns_us_scale);
-  case arrow::TimeUnit::NANO:
-    return k_timespan;
-  default:
-    throw TypeCheck("Invalid TimeUnit");
-  }
-}
+// Epoch / scaling constants
+const static int64_t ns_us_scale = 1000LL;
+const static int64_t ns_ms_scale = ns_us_scale * 1000LL;
+const static int64_t day_as_ns = 86400000000000LL;
 
 int64_t DayTimeInterval_KTimespan(arrow::DayTimeIntervalType::c_type dt_interval)
 {
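Hoisting the epoch and unit logic into a single constructor means the TimeUnit switch now runs once per array rather than once per value, so the per-element work in ArrayReader/ArrayWriter reduces to a multiply and an add or subtract. The constants folded into the class are easy to sanity-check from q, since kdb temporal types are themselves offsets from the 2000.01.01 epoch:

q)2000.01.01-1970.01.01                        / kdb_date_epoch_days
10957i
q)"j"$2000.01.01D00:00:00-1970.01.01D00:00:00  / kdb_timestamp_epoch_nano
946684800000000000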
diff --git a/src/HelperFunctions.h b/src/HelperFunctions.h
index 3cd879f..d7d946d 100644
--- a/src/HelperFunctions.h
+++ b/src/HelperFunctions.h
@@ -6,6 +6,8 @@
 #include 
 #include 
 
+#include "TypeCheck.h"
+
 #include 
 
@@ -16,35 +18,43 @@ namespace arrowkdb {
 // TEMPORAL TYPE CONVERSION //
 //////////////////////////////
 
-// Arrow date32 <-> kdb date (KD)
-// Requires: epoch offsetting
-int32_t Date32_KDate(int32_t date32);
-int32_t KDate_Date32(int32_t k_date);
-
-// Arrow date64 <-> kdb timestamp (KP)
-// Requires: epoch offsetting and scaling
-int64_t Date64_KTimestamp(int64_t date64);
-int64_t KTimestamp_Date64(int64_t k_timestamp);
-
-// Arrow timestamp <-> kdb timestamp (KP)
-// Requires: epoch offsetting and scaling
-int64_t Timestamp_KTimestamp(std::shared_ptr<arrow::TimestampType> datatype, int64_t timestamp);
-int64_t KTimestamp_Timestamp(std::shared_ptr<arrow::TimestampType> datatype, int64_t k_timestamp);
-
-// Arrow time32 <-> kdb time (KT)
-// Requires: scaling
-int32_t Time32_KTime(std::shared_ptr<arrow::Time32Type> datatype, int32_t time32);
-int32_t KTime_Time32(std::shared_ptr<arrow::Time32Type> datatype, int32_t k_time);
-
-// Arrow time64 <-> kdb timespan (KN)
-// Requires: scaling
-int64_t Time64_KTimespan(std::shared_ptr<arrow::Time64Type> datatype, int64_t time64);
-int64_t KTimespan_Time64(std::shared_ptr<arrow::Time64Type> datatype, int64_t k_timespan);
+// Helper class which can convert any int32 or int64 arrow temporal type
+// (including those with a parameterised TimeUnit) to an appropriate kdb type.
+class TemporalConversion
+{
+private:
+  // Epoch / scaling constants
+  const static int32_t kdb_date_epoch_days = 10957;
+  const static int64_t kdb_timestamp_epoch_nano = 946684800000000000LL;
+  const static int64_t ns_us_scale = 1000LL;
+  const static int64_t ns_ms_scale = ns_us_scale * 1000LL;
+  const static int64_t ns_sec_scale = ns_ms_scale * 1000LL;
+  const static int64_t day_as_ns = 86400000000000LL;
+
+  int64_t offset = 0;
+  int64_t scalar = 1;
+
+public:
+  // The constructor sets up the correct epoch offsetting and scaling factor
+  // based on the arrow datatype
+  TemporalConversion(std::shared_ptr<arrow::DataType> datatype);
+
+  // Converts from an arrow temporal (either int32 or int64) to its kdb value,
+  // applying the epoch offsetting and scaling factor
+  template <typename T>
+  inline T ArrowToKdb(T value)
+  {
+    return value * (T)scalar - (T)offset;
+  }
 
-// Arrow duration <-> kdb timespan (KN)
-// Requires: scaling
-int64_t Duration_KTimespan(std::shared_ptr<arrow::DurationType> datatype, int64_t timespan);
-int64_t KTimespan_Duration(std::shared_ptr<arrow::DurationType> datatype, int64_t k_timespan);
+  // Converts from a kdb temporal (either int32 or int64) to its arrow value,
+  // applying the epoch offsetting and scaling factor
+  template <typename T>
+  inline T KdbToArrow(T value)
+  {
+    return (value + (T)offset) / (T)scalar;
+  }
+};
 
 // Arrow day_time_interval <-> kdb timespan (KN)
 // Requires: splitting day/time and scaling
@@ -55,6 +65,7 @@ arrow::DayTimeIntervalType::c_type KTimespan_DayTimeInterval(int64_t k_timespan);
 /////////////////
 // KDB STRINGS //
 /////////////////
+
 bool IsKdbString(K str);
 const std::string GetKdbString(K str);
 
@@ -62,6 +73,7 @@
 //////////////////
 // TYPE MAPPING //
 //////////////////
+
 typedef signed char KdbType;
 
 /**
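ArrowToKdb and KdbToArrow are inverses by construction: one scales then offsets, the other offsets then rescales. Worked through in q for an arrow timestamp[MILLI] column, where scalar is 1000000 and offset is kdb_timestamp_epoch_nano (the sample value happens to be this patch's commit time in milliseconds since the UNIX epoch):

q)"p"$(1612548635000*1000000)-946684800000000000                      / ArrowToKdb
2021.02.05D18:10:35.000000000
q)(946684800000000000+"j"$2021.02.05D18:10:35.000000000) div 1000000  / KdbToArrow
1612548635000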
diff --git a/src/KdbOptions.h b/src/KdbOptions.h
index e9cefab..b0ea93b 100644
--- a/src/KdbOptions.h
+++ b/src/KdbOptions.h
@@ -5,6 +5,7 @@
 #include 
 #include 
 #include 
+#include <set>
 
 #include "k.h"
 
@@ -12,6 +13,10 @@
 namespace kx {
 namespace arrowkdb {
 
+// Supported options for arrowkdb
+const static std::set<std::string> supported_int_options = { "PARQUET_CHUNK_SIZE", "PARQUET_MULTITHREADED_READ", "USE_MMAP" };
+const static std::set<std::string> supported_string_options = { "PARQUET_VERSION" };
+
 // Helper class for reading function argument containing dictionary of options
 //
 // Dictionary key: KS
@@ -35,14 +40,22 @@ class KdbOptions
 
   void PopulateIntOptions(K keys, K values)
   {
-    for (auto i = 0; i < values->n; ++i)
-      int_options[ToUpper(kS(keys)[i])] = kJ(values)[i];
+    for (auto i = 0; i < values->n; ++i) {
+      std::string key = ToUpper(kS(keys)[i]);
+      if (supported_int_options.find(key) == supported_int_options.end())
+        throw InvalidOption(("Unsupported int option '" + key + "'").c_str());
+      int_options[key] = kJ(values)[i];
+    }
   }
 
   void PopulateStringOptions(K keys, K values)
   {
-    for (auto i = 0; i < values->n; ++i)
-      string_options[ToUpper(kS(keys)[i])] = ToUpper(kS(values)[i]);
+    for (auto i = 0; i < values->n; ++i) {
+      std::string key = ToUpper(kS(keys)[i]);
+      if (supported_string_options.find(key) == supported_string_options.end())
+        throw InvalidOption(("Unsupported string option '" + key + "'").c_str());
+      string_options[key] = ToUpper(kS(values)[i]);
+    }
   }
 
   void PopulateMixedOptions(K keys, K values)
@@ -52,9 +65,13 @@ class KdbOptions
       K value = kK(values)[i];
       switch (value->t) {
      case -KJ:
+        if (supported_int_options.find(key) == supported_int_options.end())
+          throw InvalidOption(("Unsupported int option '" + key + "'").c_str());
         int_options[key] = value->j;
         break;
       case -KS:
+        if (supported_string_options.find(key) == supported_string_options.end())
+          throw InvalidOption(("Unsupported string option '" + key + "'").c_str());
         string_options[key] = ToUpper(value->s);
         break;
       case 101:
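Previously an unrecognised option name was silently ignored; the whitelists above make it fail fast instead. For example, a misspelled option should now surface along these lines (hypothetical file name, deliberate typo in the option):

q).arrowkdb.pq.readParquetData["sketch.parquet";enlist[`PARKET_CHUNK_SIZE]!enlist 1024]
'Unsupported int option 'PARKET_CHUNK_SIZE'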
diff --git a/src/TableData.cpp b/src/TableData.cpp
index fecf6a9..da94c38 100644
--- a/src/TableData.cpp
+++ b/src/TableData.cpp
@@ -204,18 +204,28 @@
   if (!kx::arrowkdb::IsKdbString(parquet_file))
     return krr((S)"parquet_file not 11h or 0 of 10h");
 
-  std::shared_ptr<arrow::io::ReadableFile> infile;
-  PARQUET_ASSIGN_OR_THROW(
-    infile,
-    arrow::io::ReadableFile::Open(kx::arrowkdb::GetKdbString(parquet_file),
-      arrow::default_memory_pool()));
+  auto read_options = kx::arrowkdb::KdbOptions(options);
+  int64_t parquet_multithreaded_read = 0;
+  read_options.GetIntOption("parquet_multithreaded_read", parquet_multithreaded_read);
+  int64_t use_mmap = 0;
+  read_options.GetIntOption("use_mmap", use_mmap);
+
+  std::shared_ptr<arrow::io::RandomAccessFile> infile;
+  if (use_mmap) {
+    PARQUET_ASSIGN_OR_THROW(
+      infile,
+      arrow::io::MemoryMappedFile::Open(kx::arrowkdb::GetKdbString(parquet_file),
+        arrow::io::FileMode::READ));
+  } else {
+    PARQUET_ASSIGN_OR_THROW(
+      infile,
+      arrow::io::ReadableFile::Open(kx::arrowkdb::GetKdbString(parquet_file),
+        arrow::default_memory_pool()));
+  }
 
   std::unique_ptr<parquet::arrow::FileReader> reader;
   PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
 
-  auto read_options = kx::arrowkdb::KdbOptions(options);
-  int64_t parquet_multithreaded_read = 0;
-  read_options.GetIntOption("parquet_multithreaded_read", parquet_multithreaded_read);
   reader->set_use_threads(parquet_multithreaded_read);
 
   std::shared_ptr<arrow::Table> table;
@@ -235,6 +245,32 @@ K readParquetData(K parquet_file, K options)
   KDB_EXCEPTION_CATCH;
 }
 
+K readParquetColumn(K parquet_file, K column_index)
+{
+  KDB_EXCEPTION_TRY;
+
+  if (!kx::arrowkdb::IsKdbString(parquet_file))
+    return krr((S)"parquet_file not 11h or 0 of 10h");
+  if (column_index->t != -KI)
+    return krr((S)"column not -6h");
+
+  std::shared_ptr<arrow::io::ReadableFile> infile;
+  PARQUET_ASSIGN_OR_THROW(
+    infile,
+    arrow::io::ReadableFile::Open(kx::arrowkdb::GetKdbString(parquet_file),
+      arrow::default_memory_pool()));
+
+  std::unique_ptr<parquet::arrow::FileReader> reader;
+  PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
+
+  std::shared_ptr<::arrow::ChunkedArray> chunked_array;
+  PARQUET_THROW_NOT_OK(reader->ReadColumn(column_index->i, &chunked_array));
+
+  return kx::arrowkdb::ReadChunkedArray(chunked_array);
+
+  KDB_EXCEPTION_CATCH;
+}
+
 K writeArrow(K arrow_file, K schema_id, K array_data)
 {
   KDB_EXCEPTION_TRY;
@@ -308,18 +344,29 @@ K readArrowSchema(K arrow_file)
   KDB_EXCEPTION_CATCH;
 }
 
-K readArrowData(K arrow_file)
+K readArrowData(K arrow_file, K options)
 {
   KDB_EXCEPTION_TRY;
 
   if (!kx::arrowkdb::IsKdbString(arrow_file))
     return krr((S)"arrow_file not 11h or 0 of 10h");
 
-  std::shared_ptr<arrow::io::ReadableFile> infile;
-  PARQUET_ASSIGN_OR_THROW(
-    infile,
-    arrow::io::ReadableFile::Open(kx::arrowkdb::GetKdbString(arrow_file),
-      arrow::default_memory_pool()));
+  auto read_options = kx::arrowkdb::KdbOptions(options);
+  int64_t use_mmap = 0;
+  read_options.GetIntOption("use_mmap", use_mmap);
+
+  std::shared_ptr<arrow::io::RandomAccessFile> infile;
+  if (use_mmap) {
+    PARQUET_ASSIGN_OR_THROW(
+      infile,
+      arrow::io::MemoryMappedFile::Open(kx::arrowkdb::GetKdbString(arrow_file),
+        arrow::io::FileMode::READ));
+  } else {
+    PARQUET_ASSIGN_OR_THROW(
+      infile,
+      arrow::io::ReadableFile::Open(kx::arrowkdb::GetKdbString(arrow_file),
+        arrow::default_memory_pool()));
+  }
 
   std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader;
   PARQUET_ASSIGN_OR_THROW(reader, arrow::ipc::RecordBatchFileReader::Open(infile));
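A sketch of the two new read paths together (hypothetical file). USE_MMAP applies to both the parquet and arrow file readers, while readParquetColumn takes a zero-based column index as a kdb int and reads just that column instead of the whole table:

q)options:`PARQUET_MULTITHREADED_READ`USE_MMAP!1 1
q)data:.arrowkdb.pq.readParquetData["sketch.parquet";options]
q)col0:.arrowkdb.pq.readParquetColumn["sketch.parquet";0i]   / same as first data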
diff --git a/src/TableData.h b/src/TableData.h
index be7bdd8..459386d 100644
--- a/src/TableData.h
+++ b/src/TableData.h
@@ -91,6 +91,10 @@ extern "C"
    *  Flag indicating whether the parquet reader should run in multithreaded
    *  mode.  This can improve performance by processing multiple columns in
    *  parallel (long, default: 0)
+   * use_mmap
+   *  Flag indicating whether the parquet file should be memory mapped in.
+   *  This can improve performance on systems which support mmap (long,
+   *  default: 0)
    *
    * @param parquet_file  String name of the parquet file to read
    * @param options       Dictionary of symbol options to long values
@@ -98,6 +102,15 @@ extern "C"
    */
   EXP K readParquetData(K parquet_file, K options);
 
+  /**
+   * @brief Reads a single column from a parquet file
+   *
+   * @param parquet_file  String name of the parquet file to read
+   * @param column_index  The index of the column to be read
+   * @return              Arrow array object
+   */
+  EXP K readParquetColumn(K parquet_file, K column_index);
+
   /**
    * @brief Creates an arrow IPC record batch file with the specified arrow
    * schema and populates it from a mixed list of arrow array objects.
@@ -109,13 +122,15 @@ extern "C"
    *
    * @param arrow_file  String name of the arrow file to write
    * @param schema_id   The schema identifier
-   * @param array_data  Mixed list of arrow array data to be written to the file
+   * @param array_data  Mixed list of arrow array data to be written to the
+   * file
    * @return            NULL on success, error otherwise
    */
   EXP K writeArrow(K arrow_file, K schema_id, K array_data);
 
   /**
-   * @brief Reads the arrow schema from the specified arrow IPC record batch file
+   * @brief Reads the arrow schema from the specified arrow IPC record batch
+   * file
    *
    * @param arrow_file  String name of the arrow file to read
    * @return            Schema identifier
   */
   EXP K readArrowSchema(K arrow_file);
 
   /**
-   * @brief Reads the arrow array data from the specified arrow IPC record batch
-   * file
+   * @brief Reads the arrow array data from the specified arrow IPC record
+   * batch file
    *
+   * Supported options:
+   * use_mmap
+   *  Flag indicating whether the arrow file should be memory mapped in.  This
+   *  can improve performance on systems which support mmap (long, default: 0)
+   *
    * @param arrow_file  String name of the arrow file to read
+   * @param options     Dictionary of symbol options to long values
    * @return            Mixed list of arrow array objects
    */
-  EXP K readArrowData(K arrow_file);
+  EXP K readArrowData(K arrow_file, K options);
 
   /**
    * @brief Serializes to an arrow IPC record batch stream using the specified
diff --git a/tests/test.t b/tests/test.t
index 12945de..fd11f27 100644
--- a/tests/test.t
+++ b/tests/test.t
@@ -181,7 +181,7 @@ rm filename;
 filename:"ints.arrow"
 ipc.writeArrow[filename;schema;array_data]
 ipc.readArrowSchema[filename]~schema
-ipc.readArrowData[filename]~array_data
+ipc.readArrowData[filename;::]~array_data
 rm filename;
 
 -1 "<--- Read/write arrow stream --->";
@@ -213,7 +213,7 @@ rm filename;
 filename:"floats_bool_na_dec.arrow"
 ipc.writeArrow[filename;schema;array_data]
 ipc.readArrowSchema[filename]~schema
-ipc.readArrowData[filename]~array_data
+ipc.readArrowData[filename;::]~array_data
 rm filename;
 
 -1 "<--- Read/write arrow stream --->";
@@ -245,7 +245,7 @@ rm filename;
 filename:"utf8_binary.arrow"
 ipc.writeArrow[filename;schema;array_data]
 ipc.readArrowSchema[filename]~schema
-ipc.readArrowData[filename]~array_data
+ipc.readArrowData[filename;::]~array_data
 rm filename;
 
 -1 "<--- Read/write arrow stream --->";
@@ -281,7 +281,7 @@ rm filename;
 filename:"temporal.arrow"
 ipc.writeArrow[filename;schema;array_data]
 ipc.readArrowSchema[filename]~schema
-ipc.readArrowData[filename]~array_data
+ipc.readArrowData[filename;::]~array_data
 rm filename;
 
 -1 "<--- Read/write arrow stream --->";
@@ -313,7 +313,7 @@ rm filename;
 filename:"lists.arrow"
 ipc.writeArrow[filename;schema;array_data]
 ipc.readArrowSchema[filename]~schema
-ipc.readArrowData[filename]~array_data
+ipc.readArrowData[filename;::]~array_data
 rm filename;
 
 -1 "<--- Read/write arrow stream --->";
@@ -345,7 +345,7 @@ rm filename;
 filename:"map_struct.arrow"
 ipc.writeArrow[filename;schema;array_data]
 ipc.readArrowSchema[filename]~schema
-ipc.readArrowData[filename]~array_data
+ipc.readArrowData[filename;::]~array_data
 rm filename;
 
 -1 "<--- Read/write arrow stream --->";
@@ -380,7 +380,7 @@ array_data:(float16_data;large_utf8_data;large_binary_data;month_interval_data;d
 filename:"simple_arrow_only.arrow"
 ipc.writeArrow[filename;schema;array_data]
 ipc.readArrowSchema[filename]~schema
-ipc.readArrowData[filename]~array_data
+ipc.readArrowData[filename;::]~array_data
 rm filename;
 
 -1 "<--- Read/write arrow stream --->";
@@ -412,7 +412,7 @@ array_data:(fixed_size_list_data;sparse_union_data;dense_union_data;dictionary_d
 filename:"nested_arrow_only.arrow"
 ipc.writeArrow[filename;schema;array_data]
 ipc.readArrowSchema[filename]~schema
-ipc.readArrowData[filename]~array_data
+ipc.readArrowData[filename;::]~array_data
 rm filename;
 
 -1 "<--- Read/write arrow stream --->";
@@ -460,6 +460,7 @@ filename:"inferred.parquet"
 pq.writeParquetFromTable[filename;table;parquet_write_options]
 pq.readParquetSchema[filename]~schema
 pq.readParquetToTable[filename;::]~table
+pq.readParquetColumn[filename;6i]~float64_data
 rm filename;
 
 -1 "<--- Read/write arrow file --->";
@@ -467,7 +468,7 @@ rm filename;
 filename:"inferred.arrow"
 ipc.writeArrowFromTable[filename;table]
 ipc.readArrowSchema[filename]~schema
-ipc.readArrowToTable[filename]~table
+ipc.readArrowToTable[filename;::]~table
 rm filename;
 
 -1 "<--- Read/write arrow stream --->";