diff --git a/floor/interfaces/unmarshaller.go b/floor/interfaces/unmarshaller.go index f1cd5bb..3d888a7 100644 --- a/floor/interfaces/unmarshaller.go +++ b/floor/interfaces/unmarshaller.go @@ -214,9 +214,16 @@ func (e *unmarshElem) Map() (UnmarshalMap, error) { return nil, fmt.Errorf("data is not a map, found %T instead", e.data) } + if len(data) == 0 { + return &unmarshMap{data: []map[string]interface{}{}, idx: -1}, ErrFieldNotPresent + } + kvData, ok := data["key_value"] if !ok { - return nil, errors.New("sub-group key_value not found") + kvData, ok = data["map"] + if !ok { + return nil, errors.New("sub-group key_value not found") + } } kvList, ok := kvData.([]map[string]interface{}) diff --git a/floor/reader.go b/floor/reader.go index 2d6dcc7..dc10a48 100644 --- a/floor/reader.go +++ b/floor/reader.go @@ -305,13 +305,20 @@ func (um *reflectUnmarshaller) fillMap(value reflect.Value, data interfaces.Unma } keyValueList, err := data.Map() - if err != nil { + if err != nil && !errors.Is(err, interfaces.ErrFieldNotPresent) { return err } + if *schemaDef.RootColumn.SchemaElement.RepetitionType == parquet.FieldRepetitionType_REQUIRED && errors.Is(err, interfaces.ErrFieldNotPresent) { + return fmt.Errorf("field %s is required", schemaDef.RootColumn.SchemaElement.GetName()) + } + value.Set(reflect.MakeMap(value.Type())) keyValueSchemaDef := schemaDef.SubSchema("key_value") + if keyValueSchemaDef == nil { + keyValueSchemaDef = schemaDef.SubSchema("map") + } keySchemaDef := keyValueSchemaDef.SubSchema("key") valueSchemaDef := keyValueSchemaDef.SubSchema("value") diff --git a/floor/reader_test.go b/floor/reader_test.go index 1183402..20d3850 100644 --- a/floor/reader_test.go +++ b/floor/reader_test.go @@ -607,3 +607,39 @@ func BenchmarkReadFile(b *testing.B) { _ = hlReader.Scan(&msg) } } + +func TestCanReadMap(t *testing.T) { + filename := os.Getenv("CDC_PARQUET_FILE") + if filename == "" { + //filename = "/Users/mparsons/tmp/gen/part_1.parquet" + t.Skip("missing CDC_PARQUET_FILE, skipping") + } + type AV struct { + B string `parquet:"b"` + N string `parquet:"n"` + } + type CDC struct { + TenantID string `parquet:"tenantid"` + RowID string `parquet:"rowid"` + TransactionID string `parquet:"transactionid"` + Begin int64 `parquet:"begin"` + End int64 `parquet:"end"` + RecordType uint8 `parquet:"recordtype"` + EventID string `parquet:"eventid"` + EventSource string `parquet:"eventsource"` + Operation uint8 `parquet:"operation"` + SequenceNumber string `parquet:"sequencenumber"` + ApproximateCreationDateTime [12]byte `parquet:"approximatecreationdatetime"` + Keys map[string]AV `parquet:"keys"` + Old map[string]AV `parquet:"old"` + New map[string]AV `parquet:"new"` + } + + fr, err := NewFileReader(filename) + require.NoError(t, err) + + for fr.Next() { + var rec CDC + require.NoError(t, fr.Scan(&rec)) + } +}