106193: util/metric: change preset buckets from NetworkLatencyBuckets to IOLatencyBuckets r=ericharmeling a=ericharmeling

This commit replaces NetworkLatencyBuckets with IOLatencyBuckets as the preset buckets used for the following metrics' histograms:
- `liveness.heartbeatlatency`
- `leases.requests.latency`
- `kv.prober.read.latency`
- `kv.prober.write.latency`
- `proxy.conn_migration.attempted.latency`

The upper limit on NetworkLatencyBuckets (1s) is too low for these metrics. All preset buckets are generated with exponentially spaced boundaries (see `prometheus.ExponentialBucketsRange`), so widening the range to IOLatencyBuckets still retains fidelity at the lower end.
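
For reference, a minimal sketch of how these presets are generated with
`prometheus.ExponentialBucketsRange` (the NetworkLatencyBuckets range comes
from the test removed below; the 10µs lower bound used here for
IOLatencyBuckets is an assumption for illustration):

    package main

    import (
    	"fmt"

    	"github.com/prometheus/client_golang/prometheus"
    )

    func main() {
    	// NetworkLatencyBuckets: 60 buckets spanning 500µs to 1s (nanoseconds).
    	network := prometheus.ExponentialBucketsRange(500e3, 1e9, 60)

    	// IOLatencyBuckets: same exponential spacing, but the range tops out
    	// near 10s (the 10µs lower bound is assumed here).
    	io := prometheus.ExponentialBucketsRange(10e3, 10e9, 60)

    	fmt.Printf("network: %.0fns .. %.0fns\n", network[0], network[len(network)-1])
    	fmt.Printf("io:      %.0fns .. %.0fns\n", io[0], io[len(io)-1])
    }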

Fixes #104017.

Release note: None

106481: changefeedccl: Update Parquet previous row builder when version changes r=miretskiy a=miretskiy

The Parquet writer incorrectly cached its "value builder" state even
when the row version changed.
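
A minimal, self-contained sketch of the fix (hypothetical simplified types;
`objectBuilder` stands in for `json.FixedKeysObjectBuilder`): cache the
builder together with the schema version it was built for, and rebuild when
the version changes.

    package main

    import "fmt"

    // objectBuilder is a hypothetical stand-in for json.FixedKeysObjectBuilder.
    type objectBuilder struct{ keys []string }

    func newObjectBuilder(keys []string) *objectBuilder { return &objectBuilder{keys: keys} }

    // writer mirrors the prevState struct introduced in this commit.
    type writer struct {
    	vb      *objectBuilder
    	version int
    }

    func (w *writer) builderFor(rowVersion int, keys []string) *objectBuilder {
    	// The bug: checking only `w.vb == nil` reused a builder whose fixed
    	// key set belonged to an older row version. The fix also compares
    	// the cached version against the current row's version.
    	if w.vb == nil || w.version != rowVersion {
    		w.vb = newObjectBuilder(keys)
    		w.version = rowVersion
    	}
    	return w.vb
    }

    func main() {
    	w := &writer{}
    	b1 := w.builderFor(1, []string{"a", "b"})
    	b2 := w.builderFor(2, []string{"a", "b", "c"}) // schema changed: rebuilt
    	fmt.Println(b1 == b2)                          // false
    }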

Epic: None

Release note: None

106536: row: Avoid allocations when using ConsumeKVProvider r=miretskiy a=miretskiy

There is no need to allocate a new KVFetcher when calling
ConsumeKVProvider repeatedly; the existing one can be reset and reused.
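
A minimal sketch of the reuse pattern (hypothetical simplified types standing
in for KVFetcher and KVProvider): allocate once, then reset in place on
subsequent calls.

    package main

    import "fmt"

    // batchSource is a hypothetical stand-in for KVBatchFetcher.
    type batchSource interface{ len() int }

    // provider is a hypothetical stand-in for KVProvider.
    type provider struct{ kvs []string }

    func (p *provider) len() int { return len(p.kvs) }

    // fetcher is a hypothetical stand-in for KVFetcher.
    type fetcher struct{ src batchSource }

    // reset repoints the fetcher at a new source, zeroing any other state,
    // mirroring the KVFetcher.reset added in this commit.
    func (f *fetcher) reset(b batchSource) { *f = fetcher{src: b} }

    type rowFetcher struct{ kv *fetcher }

    // consume reuses the existing fetcher on repeated calls instead of
    // allocating a new one each time.
    func (r *rowFetcher) consume(p *provider) {
    	if r.kv == nil {
    		r.kv = &fetcher{src: p}
    	} else {
    		r.kv.reset(p) // reuse the allocation
    	}
    }

    func main() {
    	r := &rowFetcher{}
    	r.consume(&provider{kvs: []string{"a"}})
    	first := r.kv
    	r.consume(&provider{kvs: []string{"b", "c"}})
    	fmt.Println(first == r.kv, r.kv.src.len()) // true 2: same *fetcher reused
    }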

Issues: None
Epic: None
Release note: None

Co-authored-by: Eric Harmeling <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
3 people committed Jul 10, 2023
4 parents 7cd5444 + 831f979 + 7c4e8be + f9a08e0 commit 5e8c311
Showing 15 changed files with 82 additions and 100 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdceval/expr_eval_test.go
@@ -737,7 +737,7 @@ func randEncDatumPrimaryFamily(
 
 // readSortedRangeFeedValues reads n values, and sorts them based on key order.
 func readSortedRangeFeedValues(
-	t *testing.T, n int, row func(t *testing.T) *kvpb.RangeFeedValue,
+	t *testing.T, n int, row func(t testing.TB) *kvpb.RangeFeedValue,
 ) (res []kvpb.RangeFeedValue) {
 	t.Helper()
 	for i := 0; i < n; i++ {
45 changes: 44 additions & 1 deletion pkg/ccl/changefeedccl/cdcevent/event_test.go
@@ -407,7 +407,6 @@ CREATE TABLE foo (
 			}
 		})
 	}
-
 }
 
 func TestEventColumnOrderingWithSchemaChanges(t *testing.T) {
@@ -751,5 +750,49 @@ func TestMakeRowFromTuple(t *testing.T) {
 			require.Equal(t, current.valAsString, tree.AsStringWithFlags(d, tree.FmtExport))
 			return nil
 		}))
 }
 
+func BenchmarkEventDecoder(b *testing.B) {
+	defer leaktest.AfterTest(b)()
+	defer log.Scope(b).Close(b)
+
+	b.StopTimer()
+	s, db, _ := serverutils.StartServer(b, base.TestServerArgs{})
+	defer s.Stopper().Stop(context.Background())
+
+	sqlDB := sqlutils.MakeSQLRunner(db)
+	sqlDB.Exec(b, `
+CREATE TABLE foo (
+	a INT,
+	b STRING,
+	c STRING,
+	PRIMARY KEY (b, a)
+)`)
+
+	tableDesc := cdctest.GetHydratedTableDescriptor(b, s.ExecutorConfig(), "foo")
+	popRow, cleanup := cdctest.MakeRangeFeedValueReader(b, s.ExecutorConfig(), tableDesc)
+	sqlDB.Exec(b, "INSERT INTO foo VALUES (5, 'hello', 'world')")
+	v := popRow(b)
+	cleanup()
+
+	targets := changefeedbase.Targets{}
+	targets.Add(changefeedbase.Target{
+		TableID: tableDesc.GetID(),
+	})
+	execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
+	ctx := context.Background()
+	decoder, err := NewEventDecoder(ctx, &execCfg, targets, false, false)
+	if err != nil {
+		b.Fatal(err)
+	}
+	b.ReportAllocs()
+	b.StartTimer()
+
+	for i := 0; i < b.N; i++ {
+		_, err := decoder.DecodeKV(
+			ctx, roachpb.KeyValue{Key: v.Key, Value: v.Value}, CurrentRow, v.Timestamp(), false)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/cdctest/row.go
@@ -33,8 +33,8 @@ import (
 // Instead of trying to generate KVs ourselves (subject to encoding restrictions, etc), it is
 // simpler to just "INSERT ..." into the table, and then use this function to read next value.
 func MakeRangeFeedValueReader(
-	t *testing.T, execCfgI interface{}, desc catalog.TableDescriptor,
-) (func(t *testing.T) *kvpb.RangeFeedValue, func()) {
+	t testing.TB, execCfgI interface{}, desc catalog.TableDescriptor,
+) (func(t testing.TB) *kvpb.RangeFeedValue, func()) {
 	t.Helper()
 	execCfg := execCfgI.(sql.ExecutorConfig)
 	rows := make(chan *kvpb.RangeFeedValue)
@@ -60,7 +60,7 @@ func MakeRangeFeedValueReader(
 
 	// Helper to read next rangefeed value.
 	dups := make(map[string]struct{})
-	return func(t *testing.T) *kvpb.RangeFeedValue {
+	return func(t testing.TB) *kvpb.RangeFeedValue {
 		t.Helper()
 		for {
 			select {
@@ -84,7 +84,7 @@ func MakeRangeFeedValueReader(
 // GetHydratedTableDescriptor returns a table descriptor for the specified
 // table. The descriptor is "hydrated" if it has user defined data types.
 func GetHydratedTableDescriptor(
-	t *testing.T, execCfgI interface{}, parts ...tree.Name,
+	t testing.TB, execCfgI interface{}, parts ...tree.Name,
 ) (td catalog.TableDescriptor) {
 	t.Helper()
 	dbName, scName, tableName := func() (tree.Name, tree.Name, tree.Name) {
17 changes: 11 additions & 6 deletions pkg/ccl/changefeedccl/parquet.go
@@ -17,6 +17,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -37,8 +38,11 @@ type parquetWriter struct {
 	schemaDef  *parquet.SchemaDefinition
 	datumAlloc []tree.Datum
 
-	// Cached object builder for when using the `diff` option.
-	vb *json.FixedKeysObjectBuilder
+	// Cached object builder for previous row when using the `diff` option.
+	prevState struct {
+		vb      *json.FixedKeysObjectBuilder
+		version descpb.DescriptorVersion
+	}
 }
 
 // newParquetSchemaDefintion returns a parquet schema definition based on the
@@ -165,7 +169,7 @@ func (w *parquetWriter) populateDatums(
 	if prevRow.IsDeleted() {
 		datums = append(datums, tree.DNull)
 	} else {
-		if w.vb == nil {
+		if w.prevState.vb == nil || w.prevState.version != prevRow.Version {
 			keys := make([]string, 0, len(prevRow.ResultColumns()))
 			_ = prevRow.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
 				keys = append(keys, col.Name)
@@ -175,20 +179,21 @@ func (w *parquetWriter) populateDatums(
 			if err != nil {
 				return err
 			}
-			w.vb = valueBuilder
+			w.prevState.version = prevRow.Version
+			w.prevState.vb = valueBuilder
 		}
 
 		if err := prevRow.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
 			j, err := tree.AsJSON(d, sessiondatapb.DataConversionConfig{}, time.UTC)
 			if err != nil {
 				return err
 			}
-			return w.vb.Set(col.Name, j)
+			return w.prevState.vb.Set(col.Name, j)
 		}); err != nil {
 			return err
 		}
 
-		j, err := w.vb.Build()
+		j, err := w.prevState.vb.Build()
 		if err != nil {
 			return err
 		}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/parquet_test.go
@@ -159,7 +159,7 @@ func TestParquetRows(t *testing.T) {
 
 func makeRangefeedReaderAndDecoder(
 	t *testing.T, s serverutils.TestServerInterface,
-) (func(t *testing.T) *kvpb.RangeFeedValue, func(), cdcevent.Decoder) {
+) (func(t testing.TB) *kvpb.RangeFeedValue, func(), cdcevent.Decoder) {
 	tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo")
 	popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), tableDesc)
 	targets := changefeedbase.Targets{}
6 changes: 3 additions & 3 deletions pkg/ccl/sqlproxyccl/connector_test.go
@@ -384,7 +384,7 @@ func TestConnector_dialTenantCluster(t *testing.T) {
 				Mode:     metric.HistogramModePrometheus,
 				Metadata: metaDialTenantLatency,
 				Duration: time.Millisecond,
-				Buckets:  metric.NetworkLatencyBuckets,
+				Buckets:  metric.IOLatencyBuckets,
 			}),
 			DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
 		}
@@ -469,7 +469,7 @@ func TestConnector_dialTenantCluster(t *testing.T) {
 				Mode:     metric.HistogramModePreferHdrLatency,
 				Metadata: metaDialTenantLatency,
 				Duration: time.Millisecond,
-				Buckets:  metric.NetworkLatencyBuckets,
+				Buckets:  metric.IOLatencyBuckets,
 			}),
 			DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
 		}
@@ -503,7 +503,7 @@ func TestConnector_dialTenantCluster(t *testing.T) {
 				Mode:     metric.HistogramModePreferHdrLatency,
 				Metadata: metaDialTenantLatency,
 				Duration: time.Millisecond,
-				Buckets:  metric.NetworkLatencyBuckets,
+				Buckets:  metric.IOLatencyBuckets,
 			}),
 			DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
 		}
6 changes: 3 additions & 3 deletions pkg/ccl/sqlproxyccl/metrics.go
@@ -237,7 +237,7 @@ func makeProxyMetrics() metrics {
 			Mode:     metric.HistogramModePreferHdrLatency,
 			Metadata: metaConnMigrationAttemptedCount,
 			Duration: base.DefaultHistogramWindowInterval(),
-			Buckets:  metric.NetworkLatencyBuckets,
+			Buckets:  metric.IOLatencyBuckets,
 		}),
 		AuthFailedCount:        metric.NewCounter(metaAuthFailedCount),
 		ExpiredClientConnCount: metric.NewCounter(metaExpiredClientConnCount),
@@ -246,7 +246,7 @@ func makeProxyMetrics() metrics {
 			Mode:     metric.HistogramModePreferHdrLatency,
 			Metadata: metaDialTenantLatency,
 			Duration: base.DefaultHistogramWindowInterval(),
-			Buckets:  metric.NetworkLatencyBuckets},
+			Buckets:  metric.IOLatencyBuckets},
 		),
 		DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
 		// Connection migration metrics.
@@ -258,7 +258,7 @@ func makeProxyMetrics() metrics {
 			Mode:     metric.HistogramModePreferHdrLatency,
 			Metadata: metaConnMigrationAttemptedLatency,
 			Duration: base.DefaultHistogramWindowInterval(),
-			Buckets:  metric.NetworkLatencyBuckets,
+			Buckets:  metric.IOLatencyBuckets,
 		}),
 		ConnMigrationTransferResponseMessageSize: metric.NewHistogram(metric.HistogramOptions{
 			Metadata: metaConnMigrationTransferResponseMessageSize,
4 changes: 2 additions & 2 deletions pkg/kv/kvprober/kvprober.go
@@ -257,15 +257,15 @@ func NewProber(opts Opts) *Prober {
 			Mode:     metric.HistogramModePreferHdrLatency,
 			Metadata: metaReadProbeLatency,
 			Duration: opts.HistogramWindowInterval,
-			Buckets:  metric.NetworkLatencyBuckets,
+			Buckets:  metric.IOLatencyBuckets,
 		}),
 		WriteProbeAttempts: metric.NewCounter(metaWriteProbeAttempts),
 		WriteProbeFailures: metric.NewCounter(metaWriteProbeFailures),
 		WriteProbeLatency: metric.NewHistogram(metric.HistogramOptions{
 			Mode:     metric.HistogramModePreferHdrLatency,
 			Metadata: metaWriteProbeLatency,
 			Duration: opts.HistogramWindowInterval,
-			Buckets:  metric.NetworkLatencyBuckets,
+			Buckets:  metric.IOLatencyBuckets,
 		}),
 		WriteProbeQuarantineOldestDuration: metric.NewFunctionalGauge(
 			metaWriteProbeQuarantineOldestDuration,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_manual_proposal_test.go
@@ -235,7 +235,7 @@ LIMIT
 				Mode:     metric.HistogramModePrometheus,
 				Metadata: fakeMeta,
 				Duration: time.Millisecond,
-				Buckets:  metric.NetworkLatencyBuckets,
+				Buckets:  metric.IOLatencyBuckets,
 			}),
 		},
 	}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/liveness/liveness.go
@@ -376,7 +376,7 @@ func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
 			Mode:     metric.HistogramModePreferHdrLatency,
 			Metadata: metaHeartbeatLatency,
 			Duration: opts.HistogramWindowInterval,
-			Buckets:  metric.NetworkLatencyBuckets,
+			Buckets:  metric.IOLatencyBuckets,
 		}),
 	}
 	nl.cache = newCache(opts.Gossip, opts.Clock, nl.cacheUpdated)
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/metrics.go
@@ -2826,7 +2826,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 			Mode:     metric.HistogramModePreferHdrLatency,
 			Metadata: metaLeaseRequestLatency,
 			Duration: histogramWindow,
-			Buckets:  metric.NetworkLatencyBuckets,
+			Buckets:  metric.IOLatencyBuckets,
 		}),
 		LeaseTransferSuccessCount: metric.NewCounter(metaLeaseTransferSuccessCount),
 		LeaseTransferErrorCount:   metric.NewCounter(metaLeaseTransferErrorCount),
@@ -3028,7 +3028,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 			Mode:     metric.HistogramModePrometheus,
 			Metadata: metaRaftReplicationLatency,
 			Duration: histogramWindow,
-			Buckets:  metric.IOLatencyBuckets, // because NetworkLatencyBuckets tops out at 1s
+			Buckets:  metric.IOLatencyBuckets,
 		}),
 		RaftSchedulerLatency: metric.NewHistogram(metric.HistogramOptions{
 			Mode:     metric.HistogramModePreferHdrLatency,
7 changes: 5 additions & 2 deletions pkg/sql/row/fetcher.go
@@ -671,10 +671,13 @@ func (rf *Fetcher) ConsumeKVProvider(ctx context.Context, f *KVProvider) error {
 	if !rf.args.WillUseKVProvider {
 		return errors.AssertionFailedf("ConsumeKVProvider is called instead of StartScan")
 	}
-	if rf.kvFetcher != nil {
+	if rf.kvFetcher == nil {
+		rf.kvFetcher = newKVFetcher(f)
+	} else {
 		rf.kvFetcher.Close(ctx)
+		rf.kvFetcher.reset(f)
 	}
-	rf.kvFetcher = newKVFetcher(f)
+
 	return rf.startScan(ctx)
 }
 
4 changes: 4 additions & 0 deletions pkg/sql/row/kv_fetcher.go
@@ -338,6 +338,10 @@ func (f *KVFetcher) SetupNextFetch(
 	)
 }
 
+func (f *KVFetcher) reset(b KVBatchFetcher) {
+	*f = KVFetcher{KVBatchFetcher: b}
+}
+
 // KVProvider is a KVBatchFetcher that returns a set slice of kvs.
 type KVProvider struct {
 	KVs []roachpb.KeyValue
68 changes: 0 additions & 68 deletions pkg/util/metric/histogram_buckets.go
@@ -78,74 +78,6 @@ var IOLatencyBuckets = []float64{
 	9999999999.999969, // 9.999999999s
 }
 
-// NetworkLatencyBuckets are prometheus histogram buckets suitable for a histogram
-// that records a quantity (nanosecond-denominated) in which most measurements
-// behave like network latencies, i.e. most measurements are in the ms to sub-second
-// range during normal operation.
-var NetworkLatencyBuckets = []float64{
-	// Generated via TestHistogramBuckets/NetworkLatencyBuckets.
-	500000.000000, // 500µs
-	568747.715565, // 568.747µs
-	646947.927922, // 646.947µs
-	735900.312190, // 735.9µs
-	837083.242884, // 837.083µs
-	952178.364257, // 952.178µs
-	1083098.538963, // 1.083098ms
-	1232019.639535, // 1.232019ms
-	1401416.711034, // 1.401416ms
-	1594105.105912, // 1.594105ms
-	1813287.274717, // 1.813287ms
-	2062605.990318, // 2.062605ms
-	2346204.890209, // 2.346204ms
-	2668797.343109, // 2.668797ms
-	3035744.784401, // 3.035744ms
-	3453145.822334, // 3.453145ms
-	3927937.595933, // 3.927937ms
-	4468011.069141, // 4.468011ms
-	5082342.177389, // 5.082342ms
-	5781141.006222, // 5.781141ms
-	6576021.481300, // 6.576021ms
-	7480194.389996, // 7.480194ms
-	8508686.942589, // 8.508686ms
-	9678592.522117, // 9.678592ms
-	11009354.773683, // 11.009354ms
-	12523090.754761, // 12.52309ms
-	14244958.517175, // 14.244958ms
-	16203575.229933, // 16.203575ms
-	18431492.792031, // 18.431492ms
-	20965738.839853, // 20.965738ms
-	23848432.140611, // 23.848432ms
-	27127482.599575, // 27.127482ms
-	30857387.515093, // 30.857387ms
-	35100137.315047, // 35.100137ms
-	39926245.827925, // 39.926245ms
-	45415922.211464, // 45.415922ms
-	51660404.016126, // 51.660404ms
-	58763473.538708, // 58.763473ms
-	66843182.667648, // 66.843182ms
-	76033814.886682, // 76.033814ms
-	86488117.045035, // 86.488117ms
-	98379837.985822, // 98.379837ms
-	111906616.224248, // 111.906616ms
-	127293264.668375, // 127.293264ms
-	144795506.973983, // 144.795506ms
-	164704227.631154, // 164.704227ms
-	187350306.418342, // 187.350306ms
-	213110117.571795, // 213.110117ms
-	242411785.065635, // 242.411785ms
-	275742297.964389, // 275.742297ms
-	313655604.103963, // 313.655604ms
-	356781816.616787, // 356.781816ms
-	405837686.312094, // 405.837686ms
-	461638513.960647, // 461.638513ms
-	525111700.464186, // 525.1117ms
-	597312160.111267, // 597.31216ms
-	679439853.085354, // 679.439853ms
-	772859728.612681, // 772.859728ms
-	879124410.201811, // 879.12441ms
-	1000000000.000001, // 1s
-}
-
 // BatchProcessLatencyBuckets are prometheus histogram buckets suitable for a
 // histogram that records a quantity (nanosecond-denominated) in which most
 // measurements are in the seconds to minutes range during normal operation.
5 changes: 0 additions & 5 deletions pkg/util/metric/histogram_buckets_test.go
@@ -52,11 +52,6 @@ func TestHistogramBuckets(t *testing.T) {
 		verifyAndPrint(t, exp, IOLatencyBuckets, LATENCY)
 	})
 
-	t.Run("NetworkLatencyBuckets", func(t *testing.T) {
-		exp := prometheus.ExponentialBucketsRange(500e3, 1e9, 60)
-		verifyAndPrint(t, exp, NetworkLatencyBuckets, LATENCY)
-	})
-
 	t.Run("BatchProcessLatencyBuckets", func(t *testing.T) {
 		exp := prometheus.ExponentialBucketsRange(500e6, 300e9, 60)
 		verifyAndPrint(t, exp, BatchProcessLatencyBuckets, LATENCY)
