Skip to content

Commit

Permalink
Fix SchemaToColumns for geo_point types, add ingest integration t…
Browse files Browse the repository at this point in the history
…ests (#1019)

When a `geo_point` field is added to `fieldEncodings`, two fields are
actually added:
- `my_field.lat`
- `my_field.lon`

`SchemaToColumns` however tried to find `my_field` in `fieldEncodings`
(which was missing; only `my_field.lat` and `my_field.lon` were there).
This resulted in creating (incorrect, unused) columns `lat`, `lon` since
`internalPropertyName` was empty.

Fix the problem by accessing `my_field.lat` and `my_field.lon` keys in
`fieldEncodings` instead (which should be there).

Add a new integration test that checks Quesma's ingest of the Kibana sample
flights and Kibana sample ecommerce data sets. The tests without "PUT mapping"
caught the fixed issue (this is how I found it).
  • Loading branch information
avelanarius authored Nov 20, 2024
1 parent 6b5c479 commit c083bdd
Show file tree
Hide file tree
Showing 5 changed files with 682 additions and 2 deletions.
142 changes: 142 additions & 0 deletions ci/it/configs/quesma-ingest.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Frontend connectors: one ingest connector and one query connector, both
# configured with listenPort 8080.
frontendConnectors:
  - name: elastic-ingest
    type: elasticsearch-fe-ingest
    config:
      listenPort: 8080
  - name: elastic-query
    type: elasticsearch-fe-query
    config:
      listenPort: 8080
# Backend connectors: an Elasticsearch instance and a ClickHouse instance.
# The {{ .… }} placeholders are filled in when this template is rendered.
backendConnectors:
  - name: my-minimal-elasticsearch
    type: elasticsearch
    config:
      url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
      user: elastic
      password: quesmaquesma
  - name: my-clickhouse-instance
    type: clickhouse-os
    config:
      url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
ingestStatistics: true
# Processors: the query processor and the ingest processor declare the same
# indexes, targets and schema overrides. The four Kibana sample indexes target
# my-clickhouse-instance; the "*" entry targets my-minimal-elasticsearch.
processors:
  - name: my-query-processor
    type: quesma-v1-processor-query
    config:
      indexes:
        kibana_sample_data_ecommerce:
          target:
            - my-clickhouse-instance
          schemaOverrides:
            fields:
              "geoip.location":
                type: geo_point
              "products.manufacturer":
                type: text
              "products.product_name":
                type: text
              category:
                type: text
              manufacturer:
                type: text
        kibana_sample_data_ecommerce_with_mappings:
          target:
            - my-clickhouse-instance
          schemaOverrides:
            fields:
              "geoip.location":
                type: geo_point
              "products.manufacturer":
                type: text
              "products.product_name":
                type: text
              category:
                type: text
              manufacturer:
                type: text
        kibana_sample_data_flights:
          target:
            - my-clickhouse-instance
          schemaOverrides:
            fields:
              "DestLocation":
                type: geo_point
              "OriginLocation":
                type: geo_point
        kibana_sample_data_flights_with_mappings:
          target:
            - my-clickhouse-instance
          schemaOverrides:
            fields:
              "DestLocation":
                type: geo_point
              "OriginLocation":
                type: geo_point
        "*":
          target:
            - my-minimal-elasticsearch
  - name: my-ingest-processor
    type: quesma-v1-processor-ingest
    config:
      indexes:
        kibana_sample_data_ecommerce:
          target:
            - my-clickhouse-instance
          schemaOverrides:
            fields:
              "geoip.location":
                type: geo_point
              "products.manufacturer":
                type: text
              "products.product_name":
                type: text
              category:
                type: text
              manufacturer:
                type: text
        kibana_sample_data_ecommerce_with_mappings:
          target:
            - my-clickhouse-instance
          schemaOverrides:
            fields:
              "geoip.location":
                type: geo_point
              "products.manufacturer":
                type: text
              "products.product_name":
                type: text
              category:
                type: text
              manufacturer:
                type: text
        kibana_sample_data_flights:
          target:
            - my-clickhouse-instance
          schemaOverrides:
            fields:
              "DestLocation":
                type: geo_point
              "OriginLocation":
                type: geo_point
        kibana_sample_data_flights_with_mappings:
          target:
            - my-clickhouse-instance
          schemaOverrides:
            fields:
              "DestLocation":
                type: geo_point
              "OriginLocation":
                type: geo_point
        "*":
          target:
            - my-minimal-elasticsearch
# Pipelines: each one ties a frontend connector to a processor and lists both
# backend connectors (a read pipeline for queries, a write pipeline for ingest).
pipelines:
  - name: my-elasticsearch-proxy-read
    frontendConnectors: [ elastic-query ]
    processors: [ my-query-processor ]
    backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ]
  - name: my-elasticsearch-proxy-write
    frontendConnectors: [ elastic-ingest ]
    processors: [ my-ingest-processor ]
    backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ]

5 changes: 5 additions & 0 deletions ci/it/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,8 @@ func TestWildcardClickhouseTestcase(t *testing.T) {
testCase := testcases.NewWildcardClickhouseTestcase()
runIntegrationTest(t, testCase)
}

func TestIngestTestcase(t *testing.T) {
testCase := testcases.NewIngestTestcase()
runIntegrationTest(t, testCase)
}
21 changes: 21 additions & 0 deletions ci/it/testcases/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,27 @@ func (tc *IntegrationTestcaseBase) ExecuteClickHouseStatement(ctx context.Contex
return res, nil
}

// FetchClickHouseColumns returns the columns of the given ClickHouse table as
// a map from column name to column type, read from system.columns.
//
// The lookup filters on the table name only (no database filter), so rows for
// same-named tables in different databases end up in one map. tableName is
// interpolated into the query text verbatim, so pass only trusted,
// test-controlled names.
//
// Errors from the query, from scanning a row, or from row iteration are
// wrapped with the table name; the underlying error stays reachable through
// errors.Is / errors.As.
func (tc *IntegrationTestcaseBase) FetchClickHouseColumns(ctx context.Context, tableName string) (map[string]string, error) {
	query := fmt.Sprintf("SELECT name, type FROM system.columns WHERE table = '%s'", tableName)
	rows, err := tc.ExecuteClickHouseQuery(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("querying columns of table %q: %w", tableName, err)
	}
	defer rows.Close()

	result := make(map[string]string)
	for rows.Next() {
		var name, colType string
		if err := rows.Scan(&name, &colType); err != nil {
			return nil, fmt.Errorf("scanning column row of table %q: %w", tableName, err)
		}
		result[name] = colType
	}
	// rows.Next returning false can mean an iteration error, not just the end of data.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating columns of table %q: %w", tableName, err)
	}
	return result, nil
}

func (tc *IntegrationTestcaseBase) RequestToQuesma(ctx context.Context, t *testing.T, method, uri string, requestBody []byte) (*http.Response, []byte) {
endpoint := tc.getQuesmaEndpoint()
resp, err := tc.doRequest(ctx, method, endpoint+uri, requestBody, nil)
Expand Down
Loading

0 comments on commit c083bdd

Please sign in to comment.