diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0a21005..f19bc70 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -29,7 +29,7 @@ jobs: cache: true - name: Run unit tests - run: go test ./... + run: make test - name: Install xk6 run: go install go.k6.io/xk6/cmd/xk6@latest @@ -38,7 +38,7 @@ jobs: run: xk6 build --with github.com/msaf1980/xk6-output-clickhouse=. - name: Run Clickhouse - run: make ch + run: make up - name: Sleep some run: sleep 1 diff --git a/Makefile b/Makefile index ce1a3ef..b170e36 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,12 @@ K6_VERSION ?= "v0.41.0" MAKEFLAGS += --silent +CLICKHOUSE_VERSION ?= "clickhouse/clickhouse-server:latest" +CLICKHOUSE_CONTAINER ?= "xk6_output_clickhouse" + +DOCKER ?= docker +GO ?= go + all: clean format test build ## help: Prints a list of available build targets. @@ -27,30 +33,34 @@ build: ## format: Applies Go formatting to code. format: - go fmt ./... + ${GO} fmt ./... ## test: Executes any unit tests. test: - go test -cover -race + ${GO} test -cover -race ./... + +up: + ${DOCKER} run -d -it --rm --name "${CLICKHOUSE_CONTAINER}" -p 127.0.0.1:8123:8123 -p 127.0.0.1:9000:9000 ${CLICKHOUSE_VERSION} -ch: - docker run -d -it --rm --name xk6_output_clickhouse -p 127.0.0.1:8123:8123 -p 127.0.0.1:9000:9000 clickhouse/clickhouse-server:latest +down: + ${DOCKER} stop "${CLICKHOUSE_CONTAINER}" -ch_stop: - docker stop xk6_output_clickhouse +clear: + ${DOCKER} exec -ti "${CLICKHOUSE_CONTAINER}" clickhouse-client -q "DROP TABLE IF EXISTS k6_tests" + ${DOCKER} exec -ti "${CLICKHOUSE_CONTAINER}" clickhouse-client -q "DROP TABLE IF EXISTS k6_samples" -ch_clear: - docker exec -ti xk6_output_clickhouse clickhouse-client -q "DROP TABLE IF EXISTS k6_tests" - docker exec -ti xk6_output_clickhouse clickhouse-client -q "DROP TABLE IF EXISTS k6_samples" +cli: + ${DOCKER} exec -it "${CLICKHOUSE_CONTAINER}" clickhouse-client integrations: - K6_CLICKHOUSE_PARAMS="USERS_1H_0=10 USERS_7D_0=1" K6_OUT="clickhouse=http://localhost:8123/default?dial_timeout=200ms&max_execution_time=60" ./k6 run tests/http.js -v + ${GO} test -count=1 -tags=test_integration ./tests + K6_OUT_CLICKHOUSE_TABLE_TESTS="t_k6_tests" K6_OUT_CLICKHOUSE_TABLE_SAMPLES="t_k6_samples" K6_CLICKHOUSE_PARAMS="USERS_1H_0=10 USERS_7D_0=1" K6_OUT="clickhouse=http://localhost:8123/default?dial_timeout=200ms&max_execution_time=60" ./k6 run tests/http.js -v dump: echo "tests id name params" - docker exec -ti xk6_output_clickhouse clickhouse-client -q "SELECT id, name, params FROM k6_tests" + ${DOCKER} exec -ti "${CLICKHOUSE_CONTAINER}" clickhouse-client -q "SELECT id, name, params FROM t_k6_tests" echo echo "samples count" - docker exec -ti xk6_output_clickhouse clickhouse-client -q "SELECT id, count(1) AS samples FROM k6_samples GROUP BY id" + ${DOCKER} exec -ti "${CLICKHOUSE_CONTAINER}" clickhouse-client -q "SELECT id, count(1) AS samples FROM t_k6_samples GROUP BY id" .PHONY: build clean format help test diff --git a/README.md b/README.md index a134769..8f5d8a1 100644 --- a/README.md +++ b/README.md @@ -40,9 +40,8 @@ CREATE TABLE IF NOT EXISTS k6_samples ( status String, name String, tags Map(String, String), - value Float64, - version DateTime64(9) -) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/k6_samples', '{replica}', version) + value Float64 +) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/k6_samples', '{replica}', start) PARTITION BY toYYYYMM(start) ORDER BY (id, start, ts, metric, url, label, status, name); @@ -72,9 +71,8 @@ CREATE TABLE IF NOT EXISTS k6_samples ( status String, name String, tags Map(String, String), - value Float64, - version DateTime64(9) -) ENGINE = ReplacingMergeTree(version) + value Float64 +) ENGINE = ReplacingMergeTree(start) PARTITION BY toYYYYMM(start) ORDER BY (id, start, ts, metric, url, label, status, name); @@ -116,3 +114,4 @@ The `xk6-output-clickhouse` extension supports this additional option: - `K6_OUT_CLICKHOUSE_PUSH_INTERVAL`: to define how often metrics are sent to clickhouse. The default value is `10s` (10 second). - `K6_OUT_CLICKHOUSE_TESTNAME`: to set test name prefix prepended to id (formated start timestamp). +- `K6_OUT_CLICKHOUSE_PARAMS`: to set test params. diff --git a/config.go b/config.go index e354fa3..b57d914 100644 --- a/config.go +++ b/config.go @@ -1,4 +1,4 @@ -package timescaledb +package clickhouse import ( "encoding/json" @@ -46,6 +46,8 @@ type config struct { PushInterval Duration `json:"pushInterval"` Name string `json:"name"` dbName string + tableTests string + tableSamples string id uint64 ts time.Time params string @@ -53,12 +55,31 @@ type config struct { func newConfig() config { return config{ - URL: "http://localhost:8123/default?dial_timeout=200ms&max_execution_time=60", + URL: "http://localhost:8123/default?dial_timeout=1s&max_execution_time=60", + // URL: "http://localhost:8123/default?read_timeout=10s&write_timeout=20s", PushInterval: Duration(10 * time.Second), dbName: "default", + tableTests: "k6_tests", + tableSamples: "k6_samples", } } +func (c config) TableTests() string { + return c.tableTests +} + +func (c config) TableSamples() string { + return c.tableSamples +} + +func (c config) Id() uint64 { + return c.id +} + +func (c config) StartTime() time.Time { + return c.ts +} + func (c config) apply(modifiedConf config) (config, error) { if modifiedConf.URL != "" { if u, dbName, err := parseURL(modifiedConf.URL); err == nil { @@ -127,5 +148,15 @@ func getConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string) ( } consolidatedConf.params = env["K6_OUT_CLICKHOUSE_PARAMS"] + tableTests := env["K6_OUT_CLICKHOUSE_TABLE_TESTS"] + if tableTests != "" { + consolidatedConf.tableTests = tableTests + } + + tableSamples := env["K6_OUT_CLICKHOUSE_TABLE_SAMPLES"] + if tableSamples != "" { + consolidatedConf.tableSamples = tableSamples + } + return consolidatedConf, nil } diff --git a/config_test.go b/config_test.go index 296a64f..7126727 100644 --- a/config_test.go +++ b/config_test.go @@ -1,4 +1,4 @@ -package timescaledb +package clickhouse import ( "testing" @@ -28,6 +28,8 @@ func Test_getConsolidatedConfig_Succeeds(t *testing.T) { ts: time.Unix(1669909784, 10).UTC(), dbName: "k6", params: "USERS_1H_0=10 USERS_7D_0=1", + tableTests: "k6_tests", + tableSamples: "k6_samples", }, actualConfig) } @@ -46,6 +48,8 @@ func Test_getConsolidatedConfig_FromJsonAndPopulatesConfigFieldsFromJsonUrl(t *t id: uint64(time.Unix(1669909784, 10).UnixNano()), ts: time.Unix(1669909784, 10).UTC(), dbName: "default", + tableTests: "k6_tests", + tableSamples: "k6_samples", }, actualConfig) } @@ -61,12 +65,14 @@ func Test_getConsolidatedConfig_FromEnvVariables(t *testing.T) { assert.NoError(t, err) assert.Equal(t, config{ - URL: "http://localhost:8123/default?dial_timeout=200ms&max_execution_time=60", + URL: "http://localhost:8123/default?dial_timeout=1s&max_execution_time=60", PushInterval: Duration(2 * time.Second), Name: "2022-12-01T15:49:44.00000001Z", id: uint64(time.Unix(1669909784, 10).UnixNano()), ts: time.Unix(1669909784, 10).UTC(), dbName: "default", + tableTests: "k6_tests", + tableSamples: "k6_samples", }, actualConfig) } @@ -88,5 +94,7 @@ func Test_getConsolidatedConfig_EnvVariableTakesPrecedenceWithoutConfigArg(t *te id: uint64(time.Unix(1669909784, 1).UnixNano()), ts: time.Unix(1669909784, 1).UTC(), dbName: "default", + tableTests: "k6_tests", + tableSamples: "k6_samples", }, actualConfig) } diff --git a/go.mod b/go.mod index 535f710..48766f1 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( github.com/ClickHouse/clickhouse-go/v2 v2.4.3 + github.com/mailru/go-clickhouse/v2 v2.0.0 github.com/sirupsen/logrus v1.9.0 github.com/stretchr/testify v1.8.1 go.k6.io/k6 v0.41.0 diff --git a/go.sum b/go.sum index 447de8b..249824f 100644 --- a/go.sum +++ b/go.sum @@ -644,6 +644,8 @@ github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7 github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mailru/go-clickhouse/v2 v2.0.0 h1:O+ZGJDwp/E5W19ooeouEqaOlg+qxA+4Zsfjt63QcnVU= +github.com/mailru/go-clickhouse/v2 v2.0.0/go.mod h1:TwxN829KnFZ7jAka9l9EoCV+U0CBFq83SFev4oLbnNU= github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= diff --git a/output.go b/output.go index 9d7234d..8c4fcab 100644 --- a/output.go +++ b/output.go @@ -1,4 +1,4 @@ -package timescaledb +package clickhouse import ( "database/sql" @@ -7,14 +7,17 @@ import ( "strings" "time" + "github.com/ClickHouse/clickhouse-go/v2" "github.com/sirupsen/logrus" "go.k6.io/k6/metrics" "go.k6.io/k6/output" + + _ "github.com/mailru/go-clickhouse/v2" ) func init() { - output.RegisterExtension("clickhouse", newOutput) + output.RegisterExtension("clickhouse", New) } var ( @@ -37,18 +40,19 @@ func (o *Output) Description() string { return "Clickhouse" } -func newOutput(params output.Params) (output.Output, error) { +func New(params output.Params) (output.Output, error) { config, err := getConsolidatedConfig(params.JSONConfig, params.Environment) if err != nil { return nil, fmt.Errorf("problem parsing config: %w", err) } conn, err := sql.Open("clickhouse", config.URL) + // conn, err := sql.Open("chhttp", config.URL) if err != nil { return nil, fmt.Errorf("clickhouse: unable to create connection: %w", err) } - o := Output{ + o := &Output{ Conn: conn, Config: config, logger: params.Logger.WithFields(logrus.Fields{ @@ -56,7 +60,7 @@ func newOutput(params output.Params) (output.Output, error) { }), } - return &o, nil + return o, nil } func (o *Output) SetThresholds(thresholds map[string]metrics.Thresholds) { @@ -78,32 +82,6 @@ type dbThreshold struct { threshold *metrics.Threshold } -var schema = []string{ - `CREATE TABLE IF NOT EXISTS k6_samples ( - id UInt64, - start DateTime64(9, 'UTC'), - ts DateTime64(9, 'UTC'), - metric String, - url String, - label String, - status String, - name String, - tags Map(String, String), - value Float64, - version DateTime64(9, 'UTC') - ) ENGINE = ReplacingMergeTree(version) - PARTITION BY toYYYYMM(start) - ORDER BY (id, start, ts, metric, url, label, status, name);`, - `CREATE TABLE IF NOT EXISTS k6_tests ( - id UInt64, - ts DateTime64(9, 'UTC'), - name String, - params String - ) ENGINE = ReplacingMergeTree(ts) - PARTITION BY toYYYYMM(ts) - ORDER BY (id, ts, name);`, -} - func (o *Output) Start() error { sql := "CREATE DATABASE IF NOT EXISTS " + o.Config.dbName _, err := o.Conn.Exec(sql) @@ -111,14 +89,50 @@ func (o *Output) Start() error { o.logger.WithError(err).WithField("sql", sql).Debug("Start: Couldn't create database; most likely harmless") } + schema := []string{ + `CREATE TABLE IF NOT EXISTS ` + o.Config.tableSamples + `( + id UInt64, + start DateTime64(9, 'UTC'), + ts DateTime64(9, 'UTC'), + metric String, + url String, + label String, + status String, + name String, + tags Map(String, String), + value Float64 + ) ENGINE = ReplacingMergeTree(start) + PARTITION BY toYYYYMM(start) + ORDER BY (id, start, ts, metric, url, label, status, name);`, + `CREATE TABLE IF NOT EXISTS ` + o.Config.tableTests + ` ( + id UInt64, + ts DateTime64(9, 'UTC'), + name String, + params String + ) ENGINE = ReplacingMergeTree(ts) + PARTITION BY toYYYYMM(ts) + ORDER BY (id, ts, name);`, + } + for _, s := range schema { - _, err = o.Conn.Exec(s) - if err != nil { + if _, err = o.Conn.Exec(s); err != nil { o.logger.WithError(err).WithField("sql", s).Debug("Start: Couldn't create database schema; most likely harmless") return err } } - if _, err = o.Conn.Exec("INSERT INTO k6_tests (id, ts, name, params) VALUES (?, ?, ?, ?)", o.Config.id, o.Config.ts, o.Config.Name, o.Config.params); err != nil { + _, err = o.Conn.Exec( + "INSERT INTO "+o.Config.tableTests+" (id, ts, name, params) VALUES (@Id, @Time, @Name, @Params)", + clickhouse.Named("Id", o.Config.id), + clickhouse.DateNamed("Time", o.Config.ts, clickhouse.NanoSeconds), + clickhouse.Named("Name", o.Config.Name), + clickhouse.Named("Params", o.Config.params), + // "INSERT INTO "+o.Config.tableTests+" (id, ts, name, params) VALUES ($1, $2, $3, $4)", + // o.Config.id, + // o.Config.ts, + // o.Config.Name, + // o.Config.params, + ) + if err != nil { o.logger.WithError(err).Debug("Start: Failed to insert test") return err } @@ -134,7 +148,7 @@ func (o *Output) Start() error { return nil } -func tagsName(tags map[string]string) string { +func TagsName(tags map[string]string) string { tagsSlice := make([]string, 0, len(tags)) for k, v := range tags { tagsSlice = append(tagsSlice, k+"="+v) @@ -148,14 +162,16 @@ func (o *Output) flushMetrics() { if len(samplesContainer) == 0 { return } - start := timeNow() - scope, err := o.Conn.Begin() + start := time.Now() + + tx, err := o.Conn.Begin() if err != nil { o.logger.Error(err) return } - batch, err := scope.Prepare(`INSERT INTO k6_samples (id, ts, metric, url, label, name, tags, value, version)`) + + stmt, err := tx.Prepare("INSERT INTO " + o.Config.tableSamples + " (id, start, ts, metric, url, label, status, name, tags, value)") if err != nil { o.logger.Error(err) return @@ -165,19 +181,18 @@ func (o *Output) flushMetrics() { samples := sc.GetSamples() for _, s := range samples { tags := s.Tags.Map() - name := tagsName(tags) + name := TagsName(tags) url := tags["url"] label := tags["label"] - if _, err = batch.Exec(o.Config.id, s.Time.UTC(), s.Metric.Name, url, label, name, tags, s.Value, start.UTC()); err != nil { + status := tags["status"] + if _, err = stmt.Exec(o.Config.id, o.Config.ts, s.Time.UTC(), s.Metric.Name, url, label, status, name, tags, s.Value); err != nil { o.logger.Error(err) - scope.Rollback() return } } } - err = scope.Commit() - if err != nil { + if err = tx.Commit(); err != nil { o.logger.Error(err) return } diff --git a/tests/output_test.go b/tests/output_test.go new file mode 100644 index 0000000..4fe0eff --- /dev/null +++ b/tests/output_test.go @@ -0,0 +1,200 @@ +//go:build test_all || test_integration +// +build test_all test_integration + +package tests + +import ( + "fmt" + "reflect" + "strings" + "testing" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/stretchr/testify/require" + "go.k6.io/k6/lib/testutils" + "go.k6.io/k6/metrics" + "go.k6.io/k6/output" + + out "github.com/msaf1980/xk6-output-clickhouse" +) + +type sample struct { + id uint64 + start string + ts string + label string + url string + name string + tags map[string]string + value float64 +} + +func max(a, b int) int { + if a >= b { + return a + } + return b +} + +func diffSamples(expected, actual []sample) string { + maxLen := max(len(expected), len(actual)) + var sb strings.Builder + sb.Grow(1024) + for i := 0; i < maxLen; i++ { + if i > len(expected) { + sb.WriteString(fmt.Sprintf("+ [%d] = %+v\n", i, actual[i])) + } else if i > len(actual) { + sb.WriteString(fmt.Sprintf("- [%d] = %+v\n", i, expected[i])) + } else if !reflect.DeepEqual(actual[i], expected[i]) { + sb.WriteString(fmt.Sprintf("- [%d] = %+v\n", i, expected[i])) + sb.WriteString(fmt.Sprintf("+ [%d] = %+v\n", i, actual[i])) + } + } + return sb.String() +} + +func TestOutputFlushMetrics(t *testing.T) { + var ( + id uint64 + start time.Time + nTests int + ) + + c, err := out.New(output.Params{ + Logger: testutils.NewLogger(t), + Environment: map[string]string{ + "K6_OUT_CLICKHOUSE_TABLE_TESTS": "t_k6_tests", + "K6_OUT_CLICKHOUSE_TABLE_SAMPLES": "t_k6_samples", + "K6_OUT_CLICKHOUSE_TESTNAME": "carbonapi", + "K6_OUT_CLICKHOUSE_PARAMS": "USERS_1H_0=1 FIND=1", + "K6_OUT_CLICKHOUSE_PUSH_INTERVAL": "1s", + }, + }) + out_ch := c.(*out.Output) + require.NoError(t, err) + require.Equal(t, "t_k6_tests", out_ch.Config.TableTests()) + require.Equal(t, "t_k6_samples", out_ch.Config.TableSamples()) + for _, s := range []string{ + "DROP TABLE IF EXISTS t_k6_tests", + "DROP TABLE IF EXISTS t_k6_samples", + } { + _, err = out_ch.Conn.Exec(s) + require.NoError(t, err) + } + + require.NoError(t, c.Start()) + + defer func() { + c.Stop() + }() + + registry := metrics.NewRegistry() + + samples := make(metrics.Samples, 0, 11) + samplesIn := make([]sample, 0, len(samples)) + + metric, err := registry.NewMetric("test_gauge", metrics.Gauge) + require.NoError(t, err) + tagsInFind := map[string]string{ + "url": "metrics/find", + "label": "find", + "status": "404", + "VU": "20", + } + samples = append(samples, metrics.Sample{ + TimeSeries: metrics.TimeSeries{ + Metric: metric, + Tags: registry.RootTagSet().WithTagsFromMap(tagsInFind), + }, + Time: out_ch.Config.StartTime(), + Value: float64(0), + }) + samplesIn = append(samplesIn, sample{ + id: out_ch.Config.Id(), + start: out_ch.Config.StartTime().Format(time.RFC3339Nano), + ts: out_ch.Config.StartTime().Format(time.RFC3339Nano), + label: "find", + url: "metrics/find", + name: out.TagsName(tagsInFind), + tags: tagsInFind, + value: float64(0), + }) + + tagsIn := map[string]string{ + "url": "render", + "label": "1h", + "status": "200", + "VU": "21", + } + nameIn := out.TagsName(tagsIn) + for i := 0; i < 10; i++ { + metric, err := registry.NewMetric("test_gauge", metrics.Gauge) + require.NoError(t, err) + ts := out_ch.Config.StartTime().Add(time.Duration(i) * time.Second) + v := float64(i) + samples = append(samples, metrics.Sample{ + TimeSeries: metrics.TimeSeries{ + Metric: metric, + Tags: registry.RootTagSet().WithTagsFromMap(tagsIn), + }, + Time: ts, + Value: v, + }) + samplesIn = append(samplesIn, sample{ + id: out_ch.Config.Id(), + start: out_ch.Config.StartTime().Format(time.RFC3339Nano), + ts: ts.Format(time.RFC3339Nano), + label: "1h", + url: "render", + name: nameIn, + tags: tagsIn, + value: v, + }) + } + + c.AddMetricSamples([]metrics.SampleContainer{samples}) + c.AddMetricSamples([]metrics.SampleContainer{samples}) + + query := "SELECT id, ts, name, params FROM t_k6_tests ORDER BY id, ts, name" + rows, err := out_ch.Conn.Query(query) + require.NoError(t, err) + defer rows.Close() + for rows.Next() { + var ( + name, params string + ) + err = rows.Scan(&id, &start, &name, ¶ms) + require.NoError(t, err) + nTests++ + } + // get any error encountered during iteration + require.NoError(t, rows.Err()) + require.Equal(t, 1, nTests, "tests count") + require.Equal(t, out_ch.Config.StartTime(), start, "test start") + rows.Close() + + time.Sleep(2 * time.Second) + + query = "SELECT id, start, ts, label, url, name, tags, value FROM t_k6_samples WHERE id = @Id AND start = @Time ORDER BY start, ts, url" + rows, err = out_ch.Conn.Query(query, clickhouse.Named("Id", id), clickhouse.DateNamed("Time", start, 3)) + require.NoError(t, err) + samplesOut := make([]sample, 0, len(samples)) + for rows.Next() { + var ( + s sample + ts, st time.Time + ) + err = rows.Scan(&s.id, &st, &ts, &s.label, &s.url, &s.name, &s.tags, &s.value) + require.NoError(t, err) + s.start = st.Format(time.RFC3339Nano) + s.ts = ts.Format(time.RFC3339Nano) + samplesOut = append(samplesOut, s) + } + + // get any error encountered during iteration + require.NoError(t, rows.Err()) + if diff := diffSamples(samplesIn, samplesOut); diff != "" { + t.Errorf("samples differs:\n%s", diff) + } +}