From 831f9799f122164c94d5f19683f8a71f43174a08 Mon Sep 17 00:00:00 2001
From: Eric Harmeling
Date: Wed, 5 Jul 2023 10:20:51 -0700
Subject: [PATCH 1/4] util/metric: remove NetworkLatencyBuckets

This commit removes the generated NetworkLatencyBuckets and replaces their
usage with IOLatencyBuckets as the preset buckets for the following metrics'
histograms:

- `liveness.heartbeatlatency`
- `leases.requests.latency`
- `kv.prober.read.latency`
- `kv.prober.write.latency`
- `proxy.conn_migration.attempted.latency`

The upper limit on NetworkLatencyBuckets (1s) is too low for all metrics that
currently use it. Buckets generated with `prometheus.ExponentialBucketsRange`
(including IOLatencyBuckets) grow exponentially in width, retaining fidelity
at the lower end of the range.

Fixes #104017.

Release note: None
---
 pkg/ccl/sqlproxyccl/connector_test.go       |  6 +-
 pkg/ccl/sqlproxyccl/metrics.go              |  6 +-
 pkg/kv/kvprober/kvprober.go                 |  4 +-
 .../kvserver/client_manual_proposal_test.go |  2 +-
 pkg/kv/kvserver/liveness/liveness.go        |  2 +-
 pkg/kv/kvserver/metrics.go                  |  4 +-
 pkg/util/metric/histogram_buckets.go        | 68 ---------------------
 pkg/util/metric/histogram_buckets_test.go   |  5 --
 8 files changed, 12 insertions(+), 85 deletions(-)

diff --git a/pkg/ccl/sqlproxyccl/connector_test.go b/pkg/ccl/sqlproxyccl/connector_test.go
index ae2979e59c66..5c7a3f968cfe 100644
--- a/pkg/ccl/sqlproxyccl/connector_test.go
+++ b/pkg/ccl/sqlproxyccl/connector_test.go
@@ -384,7 +384,7 @@ func TestConnector_dialTenantCluster(t *testing.T) {
                 Mode:     metric.HistogramModePrometheus,
                 Metadata: metaDialTenantLatency,
                 Duration: time.Millisecond,
-                Buckets:  metric.NetworkLatencyBuckets,
+                Buckets:  metric.IOLatencyBuckets,
             }),
             DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
         }
@@ -469,7 +469,7 @@ func TestConnector_dialTenantCluster(t *testing.T) {
                 Mode:     metric.HistogramModePreferHdrLatency,
                 Metadata: metaDialTenantLatency,
                 Duration: time.Millisecond,
-                Buckets:  metric.NetworkLatencyBuckets,
+                Buckets:  metric.IOLatencyBuckets,
             }),
             DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
         }
@@ -503,7 +503,7 @@ func TestConnector_dialTenantCluster(t *testing.T) {
                 Mode:     metric.HistogramModePreferHdrLatency,
                 Metadata: metaDialTenantLatency,
                 Duration: time.Millisecond,
-                Buckets:  metric.NetworkLatencyBuckets,
+                Buckets:  metric.IOLatencyBuckets,
             }),
             DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
         }
diff --git a/pkg/ccl/sqlproxyccl/metrics.go b/pkg/ccl/sqlproxyccl/metrics.go
index 37523f1b4309..28d9ad426847 100644
--- a/pkg/ccl/sqlproxyccl/metrics.go
+++ b/pkg/ccl/sqlproxyccl/metrics.go
@@ -237,7 +237,7 @@ func makeProxyMetrics() metrics {
             Mode:     metric.HistogramModePreferHdrLatency,
             Metadata: metaConnMigrationAttemptedCount,
             Duration: base.DefaultHistogramWindowInterval(),
-            Buckets:  metric.NetworkLatencyBuckets,
+            Buckets:  metric.IOLatencyBuckets,
         }),
         AuthFailedCount:        metric.NewCounter(metaAuthFailedCount),
         ExpiredClientConnCount: metric.NewCounter(metaExpiredClientConnCount),
@@ -246,7 +246,7 @@ func makeProxyMetrics() metrics {
             Mode:     metric.HistogramModePreferHdrLatency,
             Metadata: metaDialTenantLatency,
             Duration: base.DefaultHistogramWindowInterval(),
-            Buckets:  metric.NetworkLatencyBuckets},
+            Buckets:  metric.IOLatencyBuckets},
         ),
         DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
         // Connection migration metrics.
@@ -258,7 +258,7 @@ func makeProxyMetrics() metrics {
             Mode:     metric.HistogramModePreferHdrLatency,
             Metadata: metaConnMigrationAttemptedLatency,
             Duration: base.DefaultHistogramWindowInterval(),
-            Buckets:  metric.NetworkLatencyBuckets,
+            Buckets:  metric.IOLatencyBuckets,
         }),
         ConnMigrationTransferResponseMessageSize: metric.NewHistogram(metric.HistogramOptions{
             Metadata: metaConnMigrationTransferResponseMessageSize,
diff --git a/pkg/kv/kvprober/kvprober.go b/pkg/kv/kvprober/kvprober.go
index 3e441a61113f..17a8e0776500 100644
--- a/pkg/kv/kvprober/kvprober.go
+++ b/pkg/kv/kvprober/kvprober.go
@@ -257,7 +257,7 @@ func NewProber(opts Opts) *Prober {
             Mode:     metric.HistogramModePreferHdrLatency,
             Metadata: metaReadProbeLatency,
             Duration: opts.HistogramWindowInterval,
-            Buckets:  metric.NetworkLatencyBuckets,
+            Buckets:  metric.IOLatencyBuckets,
         }),
         WriteProbeAttempts: metric.NewCounter(metaWriteProbeAttempts),
         WriteProbeFailures: metric.NewCounter(metaWriteProbeFailures),
@@ -265,7 +265,7 @@ func NewProber(opts Opts) *Prober {
             Mode:     metric.HistogramModePreferHdrLatency,
             Metadata: metaWriteProbeLatency,
             Duration: opts.HistogramWindowInterval,
-            Buckets:  metric.NetworkLatencyBuckets,
+            Buckets:  metric.IOLatencyBuckets,
         }),
         WriteProbeQuarantineOldestDuration: metric.NewFunctionalGauge(
             metaWriteProbeQuarantineOldestDuration,
diff --git a/pkg/kv/kvserver/client_manual_proposal_test.go b/pkg/kv/kvserver/client_manual_proposal_test.go
index 1f96839c0395..003693d1f3fc 100644
--- a/pkg/kv/kvserver/client_manual_proposal_test.go
+++ b/pkg/kv/kvserver/client_manual_proposal_test.go
@@ -235,7 +235,7 @@ LIMIT
             Mode:     metric.HistogramModePrometheus,
             Metadata: fakeMeta,
             Duration: time.Millisecond,
-            Buckets:  metric.NetworkLatencyBuckets,
+            Buckets:  metric.IOLatencyBuckets,
         }),
     },
 }
diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go
index 5ab475a5bd72..b02261bb1bf7 100644
--- a/pkg/kv/kvserver/liveness/liveness.go
+++ b/pkg/kv/kvserver/liveness/liveness.go
@@ -376,7 +376,7 @@ func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
             Mode:     metric.HistogramModePreferHdrLatency,
             Metadata: metaHeartbeatLatency,
             Duration: opts.HistogramWindowInterval,
-            Buckets:  metric.NetworkLatencyBuckets,
+            Buckets:  metric.IOLatencyBuckets,
         }),
     }
     nl.cache = newCache(opts.Gossip, opts.Clock, nl.cacheUpdated)
diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index 9076b1ad1be3..2e16c4b66f26 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -2826,7 +2826,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
             Mode:     metric.HistogramModePreferHdrLatency,
             Metadata: metaLeaseRequestLatency,
             Duration: histogramWindow,
-            Buckets:  metric.NetworkLatencyBuckets,
+            Buckets:  metric.IOLatencyBuckets,
         }),
         LeaseTransferSuccessCount: metric.NewCounter(metaLeaseTransferSuccessCount),
         LeaseTransferErrorCount:   metric.NewCounter(metaLeaseTransferErrorCount),
@@ -3028,7 +3028,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
             Mode:     metric.HistogramModePrometheus,
             Metadata: metaRaftReplicationLatency,
             Duration: histogramWindow,
-            Buckets:  metric.IOLatencyBuckets, // because NetworkLatencyBuckets tops out at 1s
+            Buckets:  metric.IOLatencyBuckets,
         }),
         RaftSchedulerLatency: metric.NewHistogram(metric.HistogramOptions{
             Mode: metric.HistogramModePreferHdrLatency,
diff --git a/pkg/util/metric/histogram_buckets.go b/pkg/util/metric/histogram_buckets.go
index d83f9d3ac065..51c47beea4d1 100644
--- a/pkg/util/metric/histogram_buckets.go
+++ b/pkg/util/metric/histogram_buckets.go
@@ -78,74 +78,6 @@ var IOLatencyBuckets = []float64{
     9999999999.999969, // 9.999999999s
 }
 
-// NetworkLatencyBuckets are prometheus histogram buckets suitable for a histogram
-// that records a quantity (nanosecond-denominated) in which most measurements
-// behave like network latencies, i.e. most measurements are in the ms to sub-second
-// range during normal operation.
-var NetworkLatencyBuckets = []float64{
-    // Generated via TestHistogramBuckets/NetworkLatencyBuckets.
-    500000.000000,     // 500µs
-    568747.715565,     // 568.747µs
-    646947.927922,     // 646.947µs
-    735900.312190,     // 735.9µs
-    837083.242884,     // 837.083µs
-    952178.364257,     // 952.178µs
-    1083098.538963,    // 1.083098ms
-    1232019.639535,    // 1.232019ms
-    1401416.711034,    // 1.401416ms
-    1594105.105912,    // 1.594105ms
-    1813287.274717,    // 1.813287ms
-    2062605.990318,    // 2.062605ms
-    2346204.890209,    // 2.346204ms
-    2668797.343109,    // 2.668797ms
-    3035744.784401,    // 3.035744ms
-    3453145.822334,    // 3.453145ms
-    3927937.595933,    // 3.927937ms
-    4468011.069141,    // 4.468011ms
-    5082342.177389,    // 5.082342ms
-    5781141.006222,    // 5.781141ms
-    6576021.481300,    // 6.576021ms
-    7480194.389996,    // 7.480194ms
-    8508686.942589,    // 8.508686ms
-    9678592.522117,    // 9.678592ms
-    11009354.773683,   // 11.009354ms
-    12523090.754761,   // 12.52309ms
-    14244958.517175,   // 14.244958ms
-    16203575.229933,   // 16.203575ms
-    18431492.792031,   // 18.431492ms
-    20965738.839853,   // 20.965738ms
-    23848432.140611,   // 23.848432ms
-    27127482.599575,   // 27.127482ms
-    30857387.515093,   // 30.857387ms
-    35100137.315047,   // 35.100137ms
-    39926245.827925,   // 39.926245ms
-    45415922.211464,   // 45.415922ms
-    51660404.016126,   // 51.660404ms
-    58763473.538708,   // 58.763473ms
-    66843182.667648,   // 66.843182ms
-    76033814.886682,   // 76.033814ms
-    86488117.045035,   // 86.488117ms
-    98379837.985822,   // 98.379837ms
-    111906616.224248,  // 111.906616ms
-    127293264.668375,  // 127.293264ms
-    144795506.973983,  // 144.795506ms
-    164704227.631154,  // 164.704227ms
-    187350306.418342,  // 187.350306ms
-    213110117.571795,  // 213.110117ms
-    242411785.065635,  // 242.411785ms
-    275742297.964389,  // 275.742297ms
-    313655604.103963,  // 313.655604ms
-    356781816.616787,  // 356.781816ms
-    405837686.312094,  // 405.837686ms
-    461638513.960647,  // 461.638513ms
-    525111700.464186,  // 525.1117ms
-    597312160.111267,  // 597.31216ms
-    679439853.085354,  // 679.439853ms
-    772859728.612681,  // 772.859728ms
-    879124410.201811,  // 879.12441ms
-    1000000000.000001, // 1s
-}
-
 // BatchProcessLatencyBuckets are prometheus histogram buckets suitable for a
 // histogram that records a quantity (nanosecond-denominated) in which most
 // measurements are in the seconds to minutes range during normal operation.
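
For reference, bucket sets like IOLatencyBuckets come from
`prometheus.ExponentialBucketsRange`, which spaces the bucket boundaries
exponentially between two endpoints, so most of the resolution sits at the low
end of the range. A minimal standalone sketch, assuming 10µs–10s endpoints
(only the ~10s upper bound of IOLatencyBuckets is visible in the hunk above;
substitute whatever range a given metric actually needs):

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        // 60 boundaries, nanosecond-denominated, spaced exponentially
        // between the (assumed) 10µs and 10s endpoints.
        buckets := prometheus.ExponentialBucketsRange(10e3, 10e9, 60)
        for i, upperBound := range buckets {
            fmt.Printf("bucket %2d: %.0fns\n", i, upperBound)
        }
    }

Because each boundary is a fixed multiple of the previous one, raising the
upper limit (1s to 10s) costs only a handful of coarse buckets at the top
rather than resolution at the bottom.
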
diff --git a/pkg/util/metric/histogram_buckets_test.go b/pkg/util/metric/histogram_buckets_test.go
index 9bde8336d575..54e7b11dc4d4 100644
--- a/pkg/util/metric/histogram_buckets_test.go
+++ b/pkg/util/metric/histogram_buckets_test.go
@@ -52,11 +52,6 @@ func TestHistogramBuckets(t *testing.T) {
         verifyAndPrint(t, exp, IOLatencyBuckets, LATENCY)
     })
 
-    t.Run("NetworkLatencyBuckets", func(t *testing.T) {
-        exp := prometheus.ExponentialBucketsRange(500e3, 1e9, 60)
-        verifyAndPrint(t, exp, NetworkLatencyBuckets, LATENCY)
-    })
-
     t.Run("BatchProcessLatencyBuckets", func(t *testing.T) {
         exp := prometheus.ExponentialBucketsRange(500e6, 300e9, 60)
         verifyAndPrint(t, exp, BatchProcessLatencyBuckets, LATENCY)

From 7c4e8beaa7ea96daf59977808370b996eb854f16 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Fri, 7 Jul 2023 17:53:16 -0400
Subject: [PATCH 2/4] changefeedccl: Update previous row builder when version changes

The parquet writer incorrectly cached its "value builder" state even when the
row's schema version changed. Cache the builder alongside the descriptor
version and rebuild it when the version changes.

Epic: None

Release note: None
---
 pkg/ccl/changefeedccl/parquet.go | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go
index 95d5e16a51be..1d26f0ca9421 100644
--- a/pkg/ccl/changefeedccl/parquet.go
+++ b/pkg/ccl/changefeedccl/parquet.go
@@ -17,6 +17,7 @@ import (
 
     "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
     "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
+    "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
     "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
     "github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -37,8 +38,11 @@ type parquetWriter struct {
     schemaDef  *parquet.SchemaDefinition
     datumAlloc []tree.Datum
 
-    // Cached object builder for when using the `diff` option.
-    vb *json.FixedKeysObjectBuilder
+    // Cached object builder for previous row when using the `diff` option.
+    prevState struct {
+        vb      *json.FixedKeysObjectBuilder
+        version descpb.DescriptorVersion
+    }
 }
 
 // newParquetSchemaDefintion returns a parquet schema definition based on the
@@ -165,7 +169,7 @@ func (w *parquetWriter) populateDatums(
     if prevRow.IsDeleted() {
         datums = append(datums, tree.DNull)
     } else {
-        if w.vb == nil {
+        if w.prevState.vb == nil || w.prevState.version != prevRow.Version {
             keys := make([]string, 0, len(prevRow.ResultColumns()))
             _ = prevRow.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
                 keys = append(keys, col.Name)
@@ -175,7 +179,8 @@ func (w *parquetWriter) populateDatums(
             if err != nil {
                 return err
             }
-            w.vb = valueBuilder
+            w.prevState.version = prevRow.Version
+            w.prevState.vb = valueBuilder
         }
 
         if err := prevRow.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
@@ -183,12 +188,12 @@ func (w *parquetWriter) populateDatums(
             if err != nil {
                 return err
             }
-            return w.vb.Set(col.Name, j)
+            return w.prevState.vb.Set(col.Name, j)
         }); err != nil {
             return err
         }
 
-        j, err := w.vb.Build()
+        j, err := w.prevState.vb.Build()
         if err != nil {
             return err
         }
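
The fix above amounts to keying the cached builder on the row's descriptor
version instead of only checking whether a builder already exists. A
standalone sketch of that pattern, with illustrative names rather than the
changefeed API:

    package main

    import "fmt"

    // encoder caches schema-dependent state and rebuilds it only when the
    // schema version changes, mirroring the prevState change in parquet.go.
    type encoder struct {
        prevState struct {
            keys    []string
            version int
        }
    }

    func (e *encoder) keysFor(version int, cols []string) []string {
        if e.prevState.keys == nil || e.prevState.version != version {
            // First use, or the schema version changed: rebuild the cache.
            e.prevState.keys = append([]string(nil), cols...)
            e.prevState.version = version
        }
        return e.prevState.keys
    }

    func main() {
        var e encoder
        fmt.Println(e.keysFor(1, []string{"a", "b"}))      // builds the cache
        fmt.Println(e.keysFor(1, []string{"a", "b"}))      // reuses it
        fmt.Println(e.keysFor(2, []string{"a", "b", "c"})) // version bump: rebuilds
    }
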
From 042315524a8595a7e73bd996cb2a748a2d37b089 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Mon, 10 Jul 2023 13:33:12 -0400
Subject: [PATCH 3/4] changefeedccl: Add a DecodeKV benchmark

Add a small benchmark testing DecodeKV performance.

Epic: None
Issue: None

Release note: None
---
 .../changefeedccl/cdceval/expr_eval_test.go  |  2 +-
 pkg/ccl/changefeedccl/cdcevent/event_test.go | 45 ++++++++++++++++++-
 pkg/ccl/changefeedccl/cdctest/row.go         |  8 ++--
 pkg/ccl/changefeedccl/parquet_test.go        |  2 +-
 4 files changed, 50 insertions(+), 7 deletions(-)

diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go
index ee6973a340f2..875b84cea6a9 100644
--- a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go
+++ b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go
@@ -737,7 +737,7 @@ func randEncDatumPrimaryFamily(
 
 // readSortedRangeFeedValues reads n values, and sorts them based on key order.
 func readSortedRangeFeedValues(
-    t *testing.T, n int, row func(t *testing.T) *kvpb.RangeFeedValue,
+    t *testing.T, n int, row func(t testing.TB) *kvpb.RangeFeedValue,
 ) (res []kvpb.RangeFeedValue) {
     t.Helper()
     for i := 0; i < n; i++ {
diff --git a/pkg/ccl/changefeedccl/cdcevent/event_test.go b/pkg/ccl/changefeedccl/cdcevent/event_test.go
index 4d3c1446b8af..2b5d00da3e73 100644
--- a/pkg/ccl/changefeedccl/cdcevent/event_test.go
+++ b/pkg/ccl/changefeedccl/cdcevent/event_test.go
@@ -407,7 +407,6 @@ CREATE TABLE foo (
             }
         })
     }
-
 }
 
 func TestEventColumnOrderingWithSchemaChanges(t *testing.T) {
@@ -751,5 +750,49 @@ func TestMakeRowFromTuple(t *testing.T) {
         require.Equal(t, current.valAsString, tree.AsStringWithFlags(d, tree.FmtExport))
         return nil
     }))
+}
+
+func BenchmarkEventDecoder(b *testing.B) {
+    defer leaktest.AfterTest(b)()
+    defer log.Scope(b).Close(b)
+
+    b.StopTimer()
+    s, db, _ := serverutils.StartServer(b, base.TestServerArgs{})
+    defer s.Stopper().Stop(context.Background())
+    sqlDB := sqlutils.MakeSQLRunner(db)
+    sqlDB.Exec(b, `
+CREATE TABLE foo (
+    a INT,
+    b STRING,
+    c STRING,
+    PRIMARY KEY (b, a)
+)`)
+
+    tableDesc := cdctest.GetHydratedTableDescriptor(b, s.ExecutorConfig(), "foo")
+    popRow, cleanup := cdctest.MakeRangeFeedValueReader(b, s.ExecutorConfig(), tableDesc)
+    sqlDB.Exec(b, "INSERT INTO foo VALUES (5, 'hello', 'world')")
+    v := popRow(b)
+    cleanup()
+
+    targets := changefeedbase.Targets{}
+    targets.Add(changefeedbase.Target{
+        TableID: tableDesc.GetID(),
+    })
+    execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
+    ctx := context.Background()
+    decoder, err := NewEventDecoder(ctx, &execCfg, targets, false, false)
+    if err != nil {
+        b.Fatal(err)
+    }
+    b.ReportAllocs()
+    b.StartTimer()
+
+    for i := 0; i < b.N; i++ {
+        _, err := decoder.DecodeKV(
+            ctx, roachpb.KeyValue{Key: v.Key, Value: v.Value}, CurrentRow, v.Timestamp(), false)
+        if err != nil {
+            b.Fatal(err)
+        }
+    }
 }
diff --git a/pkg/ccl/changefeedccl/cdctest/row.go b/pkg/ccl/changefeedccl/cdctest/row.go
index 583ee37aa193..dbecaef96b55 100644
--- a/pkg/ccl/changefeedccl/cdctest/row.go
+++ b/pkg/ccl/changefeedccl/cdctest/row.go
@@ -33,8 +33,8 @@ import (
 // Instead of trying to generate KVs ourselves (subject to encoding restrictions, etc), it is
 // simpler to just "INSERT ..." into the table, and then use this function to read next value.
 func MakeRangeFeedValueReader(
-    t *testing.T, execCfgI interface{}, desc catalog.TableDescriptor,
-) (func(t *testing.T) *kvpb.RangeFeedValue, func()) {
+    t testing.TB, execCfgI interface{}, desc catalog.TableDescriptor,
+) (func(t testing.TB) *kvpb.RangeFeedValue, func()) {
     t.Helper()
     execCfg := execCfgI.(sql.ExecutorConfig)
     rows := make(chan *kvpb.RangeFeedValue)
@@ -60,7 +60,7 @@ func MakeRangeFeedValueReader(
 
     // Helper to read next rangefeed value.
     dups := make(map[string]struct{})
-    return func(t *testing.T) *kvpb.RangeFeedValue {
+    return func(t testing.TB) *kvpb.RangeFeedValue {
         t.Helper()
         for {
             select {
@@ -84,7 +84,7 @@ func MakeRangeFeedValueReader(
 // GetHydratedTableDescriptor returns a table descriptor for the specified
 // table. The descriptor is "hydrated" if it has user defined data types.
 func GetHydratedTableDescriptor(
-    t *testing.T, execCfgI interface{}, parts ...tree.Name,
+    t testing.TB, execCfgI interface{}, parts ...tree.Name,
 ) (td catalog.TableDescriptor) {
     t.Helper()
     dbName, scName, tableName := func() (tree.Name, tree.Name, tree.Name) {
diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go
index 30d72bcc9786..836bb4d6ea54 100644
--- a/pkg/ccl/changefeedccl/parquet_test.go
+++ b/pkg/ccl/changefeedccl/parquet_test.go
@@ -159,7 +159,7 @@ func TestParquetRows(t *testing.T) {
 
 func makeRangefeedReaderAndDecoder(
     t *testing.T, s serverutils.TestServerInterface,
-) (func(t *testing.T) *kvpb.RangeFeedValue, func(), cdcevent.Decoder) {
+) (func(t testing.TB) *kvpb.RangeFeedValue, func(), cdcevent.Decoder) {
     tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo")
     popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), tableDesc)
     targets := changefeedbase.Targets{}

From f9a08e0873e3d6ef841262e7af61a9f3306525de Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Mon, 10 Jul 2023 13:33:47 -0400
Subject: [PATCH 4/4] row: Avoid allocations when using `ConsumeKVProvider`

There is no need to allocate a new KVFetcher when calling ConsumeKVProvider
repeatedly.

Issues: None
Epic: None

Release note: None
---
 pkg/sql/row/fetcher.go    | 7 +++++--
 pkg/sql/row/kv_fetcher.go | 4 ++++
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go
index 3b6df4f397d1..97d8d44618a7 100644
--- a/pkg/sql/row/fetcher.go
+++ b/pkg/sql/row/fetcher.go
@@ -671,10 +671,13 @@ func (rf *Fetcher) ConsumeKVProvider(ctx context.Context, f *KVProvider) error {
     if !rf.args.WillUseKVProvider {
         return errors.AssertionFailedf("ConsumeKVProvider is called instead of StartScan")
     }
-    if rf.kvFetcher != nil {
+    if rf.kvFetcher == nil {
+        rf.kvFetcher = newKVFetcher(f)
+    } else {
         rf.kvFetcher.Close(ctx)
+        rf.kvFetcher.reset(f)
     }
-    rf.kvFetcher = newKVFetcher(f)
+
     return rf.startScan(ctx)
 }
 
diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go
index 7051bbbe2cd0..ce1aab7e79e8 100644
--- a/pkg/sql/row/kv_fetcher.go
+++ b/pkg/sql/row/kv_fetcher.go
@@ -338,6 +338,10 @@ func (f *KVFetcher) SetupNextFetch(
     )
 }
 
+func (f *KVFetcher) reset(b KVBatchFetcher) {
+    *f = KVFetcher{KVBatchFetcher: b}
+}
+
 // KVProvider is a KVBatchFetcher that returns a set slice of kvs.
 type KVProvider struct {
     KVs []roachpb.KeyValue
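
The reset added above avoids allocating a fresh fetcher on every
ConsumeKVProvider call by overwriting the existing struct in place, which also
clears any per-scan state. A minimal sketch of the same reuse pattern, using
stand-in types rather than the sql/row ones:

    package main

    import "fmt"

    // batch stands in for the per-call input (a KVProvider in the patch).
    type batch struct{ kvs []string }

    // fetcher stands in for KVFetcher: it owns a batch plus per-scan state.
    type fetcher struct {
        src  *batch
        next int // per-scan cursor that must not leak across scans
    }

    func newFetcher(b *batch) *fetcher { return &fetcher{src: b} }

    // reset reuses the existing allocation instead of building a new
    // fetcher, zeroing all per-scan state by overwriting the whole struct.
    func (f *fetcher) reset(b *batch) {
        *f = fetcher{src: b}
    }

    func (f *fetcher) scan() {
        for ; f.next < len(f.src.kvs); f.next++ {
            fmt.Println(f.src.kvs[f.next])
        }
    }

    func main() {
        f := newFetcher(&batch{kvs: []string{"a"}})
        f.scan()
        f.reset(&batch{kvs: []string{"b", "c"}}) // same *fetcher, no new allocation
        f.scan()
    }
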