Add support for reading with mmap
Read a single parquet column
Improve temporal conversion performance
nmcdonnell-kx committed Feb 5, 2021
1 parent a657835 commit f940c7f
Showing 16 changed files with 313 additions and 236 deletions.
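
Across the example and README updates below, the q-facing change is that the Arrow IPC read functions now take a trailing options argument (the Parquet readers already did). A minimal sketch of the updated call pattern, assuming the arrowkdb library is loaded and that generic null (::) selects the default read options; the specific option keys that enable memory-mapped reads are not shown in this diff:

// Write a kdb+ table to an Arrow file, then read it back with the
// new two-argument reader; :: is assumed to request default options
table:([] col1:10?100; col2:10?1f);
.arrowkdb.ipc.writeArrowFromTable["example.arrow";table];
new_table:.arrowkdb.ipc.readArrowToTable["example.arrow";::];
show table~new_table
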
4 changes: 2 additions & 2 deletions examples/README.md
@@ -122,7 +122,7 @@ q)show system "ls inferred_schema.arrow"
"inferred_schema.arrow"
// Read the arrow file into another table
-q)new_table:.arrowkdb.ipc.readArrowToTable["inferred_schema.arrow"]
+q)new_table:.arrowkdb.ipc.readArrowToTable["inferred_schema.arrow";::]
// Compare the kdb+ tables
q)show table~new_table
@@ -314,7 +314,7 @@ q)show schema~new_schema
1b
// Read the array data back from the arrow file
-q)new_array_data:.arrowkdb.ipc.readArrowData["constructed_schema.arrow"]
+q)new_array_data:.arrowkdb.ipc.readArrowData["constructed_schema.arrow";::]
// Compare the array data
q)show array_data~new_array_data
2 changes: 1 addition & 1 deletion examples/concrete_datatypes.q
@@ -97,7 +97,7 @@ show .arrowkdb.sc.equalSchemas[schema;new_schema]
show schema~new_schema

// Read the array data back and compare
-new_array_data:.arrowkdb.ipc.readArrowData[filename];
+new_array_data:.arrowkdb.ipc.readArrowData[filename;::];
show array_data~new_array_data
rm filename;

2 changes: 1 addition & 1 deletion examples/inferred_schema.q
@@ -58,7 +58,7 @@ filename:"inferred_schema.arrow";
show ls filename

// Read the arrow file into another table
-new_table:.arrowkdb.ipc.readArrowToTable["inferred_schema.arrow"];
+new_table:.arrowkdb.ipc.readArrowToTable["inferred_schema.arrow";::];

// Compare the kdb+ tables
show table~new_table
2 changes: 1 addition & 1 deletion examples/nested_datatypes.q
@@ -115,7 +115,7 @@ show .arrowkdb.sc.equalSchemas[schema;new_schema]
show schema~new_schema

// Read the array data back and compare
-new_array_data:.arrowkdb.ipc.readArrowData[filename];
+new_array_data:.arrowkdb.ipc.readArrowData[filename;::];
show array_data~new_array_data
rm filename;

2 changes: 1 addition & 1 deletion examples/parameterized_datatypes.q
@@ -87,7 +87,7 @@ show .arrowkdb.sc.equalSchemas[schema;new_schema]
show schema~new_schema

// Read the array data back and compare
-new_array_data:.arrowkdb.ipc.readArrowData[filename];
+new_array_data:.arrowkdb.ipc.readArrowData[filename;::];
show array_data~new_array_data
rm filename;

4 changes: 2 additions & 2 deletions examples/readme.q
@@ -52,7 +52,7 @@ filename:"inferred_schema.arrow";
show ls filename

// Read the arrow file into another table
-new_table:.arrowkdb.ipc.readArrowToTable[filename];
+new_table:.arrowkdb.ipc.readArrowToTable[filename;::];

// Compare the kdb+ tables
show table~new_table
@@ -162,7 +162,7 @@ show .arrowkdb.sc.equalSchemas[schema;new_schema]
show schema~new_schema

// Read the array data back from the arrow file
-new_array_data:.arrowkdb.ipc.readArrowData[filename];
+new_array_data:.arrowkdb.ipc.readArrowData[filename;::];

// Compare the array data
show array_data~new_array_data
11 changes: 6 additions & 5 deletions q/arrowkdb.q
@@ -56,9 +56,9 @@ dt.getDictionaryDatatypes:`arrowkdb 2:(`getDictionaryDatatypes;1);
dt.getChildFields:`arrowkdb 2:(`getChildFields;1);

// datatype management:
-dt.listDatatypes:`arrowkdb 2:(`listDatatypes;1);
dt.printDatatype_:`arrowkdb 2:(`printDatatype;1);
dt.printDatatype:{[x] -1 dt.printDatatype_[x];};
+dt.listDatatypes:`arrowkdb 2:(`listDatatypes;1);
dt.removeDatatype:`arrowkdb 2:(`removeDatatype;1);
dt.equalDatatypes:`arrowkdb 2:(`equalDatatypes;2);

@@ -71,9 +71,9 @@ fd.fieldName:`arrowkdb 2:(`fieldName;1);
fd.fieldDatatype:`arrowkdb 2:(`fieldDatatype;1);

// field management:
-fd.listFields:`arrowkdb 2:(`listFields;1);
fd.printField_:`arrowkdb 2:(`printField;1);
fd.printField:{[x] -1 fd.printField_[x];};
+fd.listFields:`arrowkdb 2:(`listFields;1);
fd.removeField:`arrowkdb 2:(`removeField;1);
fd.equalFields:`arrowkdb 2:(`equalFields;2);

@@ -88,9 +88,9 @@ sc.inferSchema:`arrowkdb 2:(`inferSchema;1);
sc.schemaFields:`arrowkdb 2:(`schemaFields;1);

// schema management
-sc.listSchemas:`arrowkdb 2:(`listSchemas;1);
sc.printSchema_:`arrowkdb 2:(`printSchema;1);
sc.printSchema:{[x] -1 sc.printSchema_[x];};
+sc.listSchemas:`arrowkdb 2:(`listSchemas;1);
sc.removeSchema:`arrowkdb 2:(`removeSchema;1);
sc.equalSchemas:`arrowkdb 2:(`equalSchemas;2);

@@ -113,14 +113,15 @@ pq.writeParquetFromTable:{[filename;table;options] pq.writeParquet[filename;sc.i
pq.readParquetSchema:`arrowkdb 2:(`readParquetSchema;1);
pq.readParquetData:`arrowkdb 2:(`readParquetData;2);
pq.readParquetToTable:{[filename;options] flip (fd.fieldName each sc.schemaFields[pq.readParquetSchema[filename]])!(pq.readParquetData[filename;options])};
+pq.readParquetColumn:`arrowkdb 2:(`readParquetColumn;2);


// arrow files
ipc.writeArrow:`arrowkdb 2:(`writeArrow;3);
ipc.writeArrowFromTable:{[filename;table] ipc.writeArrow[filename;sc.inferSchema[table];value flip table]};
ipc.readArrowSchema:`arrowkdb 2:(`readArrowSchema;1);
-ipc.readArrowData:`arrowkdb 2:(`readArrowData;1);
-ipc.readArrowToTable:{[filename] flip (fd.fieldName each sc.schemaFields[ipc.readArrowSchema[filename]])!(ipc.readArrowData[filename])};
+ipc.readArrowData:`arrowkdb 2:(`readArrowData;2);
+ipc.readArrowToTable:{[filename;options] flip (fd.fieldName each sc.schemaFields[ipc.readArrowSchema[filename]])!(ipc.readArrowData[filename;options])};


// arrow streams
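
Besides widening ipc.readArrowData and ipc.readArrowToTable to accept read options, this hunk registers the new readParquetColumn binding. A rough usage sketch, assuming :: selects default writer options and that the second argument of pq.readParquetColumn is a zero-based column index (its meaning is not documented in this diff):

// Write a small table to Parquet, then pull back a single column
table:([] x:1 2 3; y:10.5 20.5 30.5);
.arrowkdb.pq.writeParquetFromTable["example.parquet";table;::];
// Hypothetical: read back only the second column (assumed index 1)
y_col:.arrowkdb.pq.readParquetColumn["example.parquet";1];
show y_col~table`y
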
18 changes: 12 additions & 6 deletions src/ArrayReader.cpp
@@ -253,40 +253,45 @@ void AppendArray(std::shared_ptr<arrow::Array> array_data, K k_array, size_t& in
}
case arrow::Type::DATE32:
{
+  TemporalConversion tc(array_data->type());
  auto d32_array = std::static_pointer_cast<arrow::Date32Array>(array_data);
  for (auto i = 0; i < d32_array->length(); ++i)
-    kI(k_array)[index++] = Date32_KDate(d32_array->Value(i));
+    kI(k_array)[index++] = tc.ArrowToKdb(d32_array->Value(i));
  break;
}
case arrow::Type::DATE64:
{
+  TemporalConversion tc(array_data->type());
  auto d64_array = std::static_pointer_cast<arrow::Date64Array>(array_data);
  for (auto i = 0; i < d64_array->length(); ++i)
-    kJ(k_array)[index++] = Date64_KTimestamp(d64_array->Value(i));
+    kJ(k_array)[index++] = tc.ArrowToKdb(d64_array->Value(i));
  break;
}
case arrow::Type::TIMESTAMP:
{
+  TemporalConversion tc(array_data->type());
  auto ts_array = std::static_pointer_cast<arrow::TimestampArray>(array_data);
  auto timestamp_type = std::static_pointer_cast<arrow::TimestampType>(ts_array->type());
  for (auto i = 0; i < ts_array->length(); ++i)
-    kJ(k_array)[index++] = Timestamp_KTimestamp(timestamp_type, ts_array->Value(i));
+    kJ(k_array)[index++] = tc.ArrowToKdb(ts_array->Value(i));
  break;
}
case arrow::Type::TIME32:
{
+  TemporalConversion tc(array_data->type());
  auto t32_array = std::static_pointer_cast<arrow::Time32Array>(array_data);
  auto time32_type = std::static_pointer_cast<arrow::Time32Type>(t32_array->type());
  for (auto i = 0; i < t32_array->length(); ++i)
-    kI(k_array)[index++] = Time32_KTime(time32_type, t32_array->Value(i));
+    kI(k_array)[index++] = tc.ArrowToKdb(t32_array->Value(i));
  break;
}
case arrow::Type::TIME64:
{
+  TemporalConversion tc(array_data->type());
  auto t64_array = std::static_pointer_cast<arrow::Time64Array>(array_data);
  auto time64_type = std::static_pointer_cast<arrow::Time64Type>(t64_array->type());
  for (auto i = 0; i < t64_array->length(); ++i)
-    kJ(k_array)[index++] = Time64_KTimespan(time64_type, t64_array->Value(i));
+    kJ(k_array)[index++] = tc.ArrowToKdb(t64_array->Value(i));
  break;
}
case arrow::Type::DECIMAL:
@@ -303,10 +308,11 @@ void AppendArray(std::shared_ptr<arrow::Array> array_data, K k_array, size_t& in
}
case arrow::Type::DURATION:
{
+  TemporalConversion tc(array_data->type());
  auto dur_array = std::static_pointer_cast<arrow::DurationArray>(array_data);
  auto duration_type = std::static_pointer_cast<arrow::DurationType>(dur_array->type());
  for (auto i = 0; i < dur_array->length(); ++i)
-    kJ(k_array)[index++] = Duration_KTimespan(duration_type, dur_array->Value(i));
+    kJ(k_array)[index++] = tc.ArrowToKdb(dur_array->Value(i));
  break;
}
case arrow::Type::INTERVAL_MONTHS:
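
The reader now routes every temporal column through a single TemporalConversion helper instead of per-type conversion functions. The helper's implementation is not part of this diff; the q lines below only illustrate the standard epoch/unit shift such a conversion has to apply between Arrow's 1970-01-01 epoch and kdb+'s 2000.01.01 epoch, so the constants are well-known values rather than anything taken from the new class:

epochDays:10957                      / days from 1970.01.01 to 2000.01.01
epochNs:epochDays*86400000000000     / the same offset in nanoseconds

date32ToKdb:{`date$x-epochDays}      / Arrow date32 (days since 1970) -> kdb+ date
kdbToDate32:{epochDays+`int$x}       / kdb+ date -> Arrow date32
tsNsToKdb:{`timestamp$x-epochNs}     / Arrow timestamp[ns] -> kdb+ timestamp
kdbToTsNs:{epochNs+`long$x}          / kdb+ timestamp -> Arrow timestamp[ns]

date32ToKdb 18628                    / 2021.01.01
kdbToDate32 2021.01.01               / 18628
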
18 changes: 12 additions & 6 deletions src/ArrayWriter.cpp
@@ -394,40 +394,45 @@ void PopulateBuilder(std::shared_ptr<arrow::DataType> datatype, K k_array, arrow
}
case arrow::Type::DATE32:
{
+  TemporalConversion tc(datatype);
  auto d32_builder = static_cast<arrow::Date32Builder*>(builder);
  for (auto i = 0; i < k_array->n; ++i)
-    PARQUET_THROW_NOT_OK(d32_builder->Append(KDate_Date32(kI(k_array)[i])));
+    PARQUET_THROW_NOT_OK(d32_builder->Append(tc.KdbToArrow(kI(k_array)[i])));
  break;
}
case arrow::Type::DATE64:
{
+  TemporalConversion tc(datatype);
  auto d64_builder = static_cast<arrow::Date64Builder*>(builder);
  for (auto i = 0; i < k_array->n; ++i)
-    PARQUET_THROW_NOT_OK(d64_builder->Append(KTimestamp_Date64(kJ(k_array)[i])));
+    PARQUET_THROW_NOT_OK(d64_builder->Append(tc.KdbToArrow(kJ(k_array)[i])));
  break;
}
case arrow::Type::TIMESTAMP:
{
+  TemporalConversion tc(datatype);
  auto ts_builder = static_cast<arrow::TimestampBuilder*>(builder);
  auto timestamp_type = std::static_pointer_cast<arrow::TimestampType>(datatype);
  for (auto i = 0; i < k_array->n; ++i)
-    PARQUET_THROW_NOT_OK(ts_builder->Append(KTimestamp_Timestamp(timestamp_type, kJ(k_array)[i])));
+    PARQUET_THROW_NOT_OK(ts_builder->Append(tc.KdbToArrow(kJ(k_array)[i])));
  break;
}
case arrow::Type::TIME32:
{
+  TemporalConversion tc(datatype);
  auto t32_builder = static_cast<arrow::Time32Builder*>(builder);
  auto time32_type = std::static_pointer_cast<arrow::Time32Type>(datatype);
  for (auto i = 0; i < k_array->n; ++i)
-    PARQUET_THROW_NOT_OK(t32_builder->Append(KTime_Time32(time32_type, kI(k_array)[i])));
+    PARQUET_THROW_NOT_OK(t32_builder->Append(tc.KdbToArrow(kI(k_array)[i])));
  break;
}
case arrow::Type::TIME64:
{
+  TemporalConversion tc(datatype);
  auto t64_builder = static_cast<arrow::Time64Builder*>(builder);
  auto time64_type = std::static_pointer_cast<arrow::Time64Type>(datatype);
  for (auto i = 0; i < k_array->n; ++i)
-    PARQUET_THROW_NOT_OK(t64_builder->Append(KTimespan_Time64(time64_type, kJ(k_array)[i])));
+    PARQUET_THROW_NOT_OK(t64_builder->Append(tc.KdbToArrow(kJ(k_array)[i])));
  break;
}
case arrow::Type::DECIMAL:
@@ -446,10 +451,11 @@ void PopulateBuilder(std::shared_ptr<arrow::DataType> datatype, K k_array, arrow
}
case arrow::Type::DURATION:
{
+  TemporalConversion tc(datatype);
  auto dur_builder = static_cast<arrow::DurationBuilder*>(builder);
  auto duration_type = std::static_pointer_cast<arrow::DurationType>(datatype);
  for (auto i = 0; i < k_array->n; ++i)
-    PARQUET_THROW_NOT_OK(dur_builder->Append(KTimespan_Duration(duration_type, kJ(k_array)[i])));
+    PARQUET_THROW_NOT_OK(dur_builder->Append(tc.KdbToArrow(kJ(k_array)[i])));
  break;
}
case arrow::Type::INTERVAL_MONTHS:
10 changes: 8 additions & 2 deletions src/DatatypeStore.cpp
@@ -265,7 +265,10 @@ K time32(K time_unit)
  KDB_EXCEPTION_TRY;

  auto tu = kx::arrowkdb::ToTimeUnit(kx::arrowkdb::GetKdbString(time_unit));
-  return ki(kx::arrowkdb::GetDatatypeStore()->Add(arrow::time32(tu)));
+  if (tu == arrow::TimeUnit::SECOND || tu == arrow::TimeUnit::MILLI)
+    return ki(kx::arrowkdb::GetDatatypeStore()->Add(arrow::time32(tu)));
+  else
+    return krr((S)"time32 time_unit not SECOND or MILLI");

  KDB_EXCEPTION_CATCH;
}
@@ -278,7 +281,10 @@ K time64(K time_unit)
  KDB_EXCEPTION_TRY;

  auto tu = kx::arrowkdb::ToTimeUnit(kx::arrowkdb::GetKdbString(time_unit));
-  return ki(kx::arrowkdb::GetDatatypeStore()->Add(arrow::time64(tu)));
+  if (tu == arrow::TimeUnit::MICRO || tu == arrow::TimeUnit::NANO)
+    return ki(kx::arrowkdb::GetDatatypeStore()->Add(arrow::time64(tu)));
+  else
+    return krr((S)"time64 time_unit not MICRO or NANO");

  KDB_EXCEPTION_CATCH;
}
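
The time32 and time64 constructors now validate the requested time unit and return a kdb+ error for units the Arrow type cannot represent. Assuming the q-level constructors take the unit as a symbol, as elsewhere in arrowkdb, the behaviour would look like:

.arrowkdb.dt.time32[`MILLI]          / valid: time32 supports SECOND or MILLI
.arrowkdb.dt.time64[`NANO]           / valid: time64 supports MICRO or NANO
.arrowkdb.dt.time32[`NANO]           / now signals 'time32 time_unit not SECOND or MILLI
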