diff --git a/docker-compose.yml b/docker-compose.yml index 9472443616..a5c527b056 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,6 +18,7 @@ x-flow-worker-env: &flow-worker-env ENABLE_METRICS: "true" # enables exporting of mirror metrics to Catalog in the PEERDB_STATS schema. ENABLE_STATS: "true" + PYROSCOPE_SERVER_ADDRESS: http://pyroscope:4040 services: catalog: @@ -60,6 +61,12 @@ services: labels: kompose.volume.type: configMap + pyroscope: + container_name: pyroscope + image: grafana/pyroscope:latest + ports: + - 4040:4040 + temporal-admin-tools: container_name: temporal-admin-tools depends_on: @@ -120,7 +127,6 @@ services: dockerfile: stacks/flow-worker.Dockerfile environment: <<: [*catalog-config, *flow-worker-env] - PROFILING_SERVER: 0.0.0.0:6060 METRICS_SERVER: 0.0.0.0:6061 ports: - 6060:6060 @@ -136,7 +142,6 @@ services: dockerfile: stacks/flow-worker.Dockerfile environment: <<: [*catalog-config, *flow-worker-env] - PROFILING_SERVER: 0.0.0.0:6062 METRICS_SERVER: 0.0.0.0:6063 ports: - 6062:6062 @@ -155,7 +160,6 @@ services: dockerfile: stacks/flow-worker.Dockerfile environment: <<: [*catalog-config, *flow-worker-env] - PROFILING_SERVER: 0.0.0.0:6064 METRICS_SERVER: 0.0.0.0:6065 ports: - 6064:6064 diff --git a/flow/cmd/main.go b/flow/cmd/main.go index 9b42fc6a55..e2c0a6a411 100644 --- a/flow/cmd/main.go +++ b/flow/cmd/main.go @@ -51,11 +51,11 @@ func main() { EnvVars: []string{"ENABLE_STATS"}, } - profilingServerFlag := &cli.StringFlag{ - Name: "profiling-server", - Value: "localhost:6060", // Default is localhost:6060 - Usage: "HTTP server address for profiling", - EnvVars: []string{"PROFILING_SERVER"}, + pyroscopeServerFlag := &cli.StringFlag{ + Name: "pyroscope-server-address", + Value: "http://pyroscope:4040", + Usage: "HTTP server address for pyroscope", + EnvVars: []string{"PYROSCOPE_SERVER_ADDRESS"}, } metricsServerFlag := &cli.StringFlag{ @@ -77,7 +77,7 @@ func main() { EnableProfiling: ctx.Bool("enable-profiling"), EnableMetrics: 
ctx.Bool("enable-metrics"), EnableMonitoring: ctx.Bool("enable-monitoring"), - ProfilingServer: ctx.String("profiling-server"), + PyroscopeServer: ctx.String("pyroscope-server-address"), MetricsServer: ctx.String("metrics-server"), }) }, @@ -86,7 +86,7 @@ func main() { profilingFlag, metricsFlag, monitoringFlag, - profilingServerFlag, + pyroscopeServerFlag, metricsServerFlag, }, }, diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 6abdd7e05f..a6528fbd32 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -2,7 +2,6 @@ package main import ( "fmt" - "net/http" "os" "os/signal" "runtime" @@ -20,6 +19,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber-go/tally/v4/prometheus" + "github.com/grafana/pyroscope-go" prom "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "go.temporal.io/sdk/client" @@ -32,26 +32,55 @@ type WorkerOptions struct { EnableProfiling bool EnableMetrics bool EnableMonitoring bool - ProfilingServer string + PyroscopeServer string MetricsServer string } +func setupPyroscope(opts *WorkerOptions) { + if opts.PyroscopeServer == "" { + log.Fatal("pyroscope server address is not set but profiling is enabled") + } + + // measure contention + runtime.SetMutexProfileFraction(5) + runtime.SetBlockProfileRate(5) + + _, err := pyroscope.Start(pyroscope.Config{ + ApplicationName: "io.peerdb.flow_worker", + + ServerAddress: opts.PyroscopeServer, + + // you can disable logging by setting this to nil + Logger: log.StandardLogger(), + + // you can provide static tags via a map: + Tags: map[string]string{"hostname": os.Getenv("HOSTNAME")}, + + ProfileTypes: []pyroscope.ProfileType{ + // these profile types are enabled by default: + pyroscope.ProfileCPU, + pyroscope.ProfileAllocObjects, + pyroscope.ProfileAllocSpace, + pyroscope.ProfileInuseObjects, + pyroscope.ProfileInuseSpace, + + // these profile types are optional: + pyroscope.ProfileGoroutines, + pyroscope.ProfileMutexCount, + 
pyroscope.ProfileMutexDuration, + pyroscope.ProfileBlockCount, + pyroscope.ProfileBlockDuration, + }, + }) + + if err != nil { + log.Fatal(err) + } +} + func WorkerMain(opts *WorkerOptions) error { if opts.EnableProfiling { - // Start HTTP profiling server with timeouts - go func() { - server := http.Server{ - Addr: opts.ProfilingServer, - ReadTimeout: 5 * time.Minute, - WriteTimeout: 15 * time.Minute, - } - - log.Infof("starting profiling server on %s", opts.ProfilingServer) - - if err := server.ListenAndServe(); err != nil { - log.Errorf("unable to start profiling server: %v", err) - } - }() + setupPyroscope(opts) } go func() { diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 34d91dea45..251c7a0eeb 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -221,7 +221,7 @@ func (p *PostgresCDCSource) consumeStream( // should be ideally sourceTableName as we are in pullRecrods. // will change in future pkeyCol := req.TableNameSchemaMapping[tableName].PrimaryKeyColumn - pkeyColVal := rec.GetItems()[pkeyCol] + pkeyColVal := rec.GetItems().GetColumnValue(pkeyCol) tablePkeyVal := model.TableWithPkey{ TableName: tableName, PkeyColVal: pkeyColVal, @@ -232,10 +232,10 @@ func (p *PostgresCDCSource) consumeStream( records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1 } else { oldRec := records.Records[records.TablePKeyLastSeen[tablePkeyVal]] - // iterate through unchanged toast cols and set them - for col, val := range oldRec.GetItems() { - if _, ok := r.NewItems[col]; !ok { - r.NewItems[col] = val + // iterate through unchanged toast cols and set them in new record + updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems()) + for _, col := range updatedCols { + if _, ok := r.UnchangedToastColumns[col]; ok { delete(r.UnchangedToastColumns, col) } } @@ -244,7 +244,7 @@ func (p *PostgresCDCSource) consumeStream( } case *model.InsertRecord: pkeyCol := 
req.TableNameSchemaMapping[tableName].PrimaryKeyColumn - pkeyColVal := rec.GetItems()[pkeyCol] + pkeyColVal := rec.GetItems().GetColumnValue(pkeyCol) tablePkeyVal := model.TableWithPkey{ TableName: tableName, PkeyColVal: pkeyColVal, @@ -430,34 +430,35 @@ It takes a tuple and a relation message as input and returns func (p *PostgresCDCSource) convertTupleToMap( tuple *pglogrepl.TupleData, rel *protos.RelationMessage, -) (model.RecordItems, map[string]bool, error) { +) (*model.RecordItems, map[string]bool, error) { // if the tuple is nil, return an empty map if tuple == nil { - return make(model.RecordItems), make(map[string]bool), nil + return model.NewRecordItems(), make(map[string]bool), nil } // create empty map of string to interface{} - items := make(model.RecordItems) + items := model.NewRecordItems() unchangedToastColumns := make(map[string]bool) for idx, col := range tuple.Columns { colName := rel.Columns[idx].Name switch col.DataType { case 'n': // null - items[colName] = qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil} + val := &qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil} + items.AddColumn(colName, val) case 't': // text /* bytea also appears here as a hex */ data, err := p.decodeColumnData(col.Data, rel.Columns[idx].DataType, pgtype.TextFormatCode) if err != nil { return nil, nil, fmt.Errorf("error decoding text column data: %w", err) } - items[colName] = *data + items.AddColumn(colName, data) case 'b': // binary data, err := p.decodeColumnData(col.Data, rel.Columns[idx].DataType, pgtype.BinaryFormatCode) if err != nil { return nil, nil, fmt.Errorf("error decoding binary column data: %w", err) } - items[colName] = *data + items.AddColumn(colName, data) case 'u': // unchanged toast unchangedToastColumns[colName] = true default: diff --git a/flow/connectors/postgres/postgres_cdc_test.go b/flow/connectors/postgres/postgres_cdc_test.go index 727ffa133a..f6284cdf29 100644 --- a/flow/connectors/postgres/postgres_cdc_test.go +++ 
b/flow/connectors/postgres/postgres_cdc_test.go @@ -39,13 +39,19 @@ func (suite *PostgresCDCTestSuite) insertSimpleRecords(srcTableName string) { func (suite *PostgresCDCTestSuite) validateInsertedSimpleRecords(records []model.Record, srcTableName string, dstTableName string) { suite.Equal(3, len(records)) - matchData := []model.RecordItems{ - {"id": qvalue.QValue{Kind: qvalue.QValueKindInt32, Value: int32(2)}, - "name": qvalue.QValue{Kind: qvalue.QValueKindString, Value: "quick"}}, - {"id": qvalue.QValue{Kind: qvalue.QValueKindInt32, Value: int32(4)}, - "name": qvalue.QValue{Kind: qvalue.QValueKindString, Value: "brown"}}, - {"id": qvalue.QValue{Kind: qvalue.QValueKindInt32, Value: int32(8)}, - "name": qvalue.QValue{Kind: qvalue.QValueKindString, Value: "fox"}}, + matchData := []*model.RecordItems{ + model.NewRecordItemWithData(map[string]*qvalue.QValue{ + "id": {Kind: qvalue.QValueKindInt32, Value: int32(2)}, + "name": {Kind: qvalue.QValueKindString, Value: "quick"}, + }), + model.NewRecordItemWithData(map[string]*qvalue.QValue{ + "id": {Kind: qvalue.QValueKindInt32, Value: int32(4)}, + "name": {Kind: qvalue.QValueKindString, Value: "brown"}, + }), + model.NewRecordItemWithData(map[string]*qvalue.QValue{ + "id": {Kind: qvalue.QValueKindInt32, Value: int32(8)}, + "name": {Kind: qvalue.QValueKindString, Value: "fox"}, + }), } for idx, record := range records { suite.IsType(&model.InsertRecord{}, record) @@ -84,15 +90,22 @@ func (suite *PostgresCDCTestSuite) validateSimpleMutatedRecords(records []model. 
suite.Equal(srcTableName, updateRecord.SourceTableName) suite.Equal(dstTableName, updateRecord.DestinationTableName) suite.Equal(model.RecordItems{}, updateRecord.OldItems) - suite.Equal(model.RecordItems{"id": qvalue.QValue{Kind: qvalue.QValueKindInt32, Value: int32(2)}, - "name": qvalue.QValue{Kind: qvalue.QValueKindString, Value: "slow"}}, updateRecord.NewItems) + + items := model.NewRecordItemWithData(map[string]*qvalue.QValue{ + "id": {Kind: qvalue.QValueKindInt32, Value: int32(2)}, + "name": {Kind: qvalue.QValueKindString, Value: "slow"}, + }) + suite.Equal(items, updateRecord.NewItems) suite.IsType(&model.DeleteRecord{}, records[1]) deleteRecord := records[1].(*model.DeleteRecord) suite.Equal(srcTableName, deleteRecord.SourceTableName) suite.Equal(dstTableName, deleteRecord.DestinationTableName) - suite.Equal(model.RecordItems{"id": qvalue.QValue{Kind: qvalue.QValueKindInt32, Value: int32(8)}, - "name": qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil}}, deleteRecord.Items) + items = model.NewRecordItemWithData(map[string]*qvalue.QValue{ + "id": {Kind: qvalue.QValueKindInt32, Value: int32(8)}, + "name": {Kind: qvalue.QValueKindInvalid, Value: nil}, + }) + suite.Equal(items, deleteRecord.Items) } func (suite *PostgresCDCTestSuite) randBytea(n int) []byte { @@ -143,13 +156,29 @@ func (suite *PostgresCDCTestSuite) validateInsertedToastRecords(records []model. 
insertRecord := record.(*model.InsertRecord) suite.Equal(srcTableName, insertRecord.SourceTableName) suite.Equal(dstTableName, insertRecord.DestinationTableName) - suite.Equal(5, len(insertRecord.Items)) + suite.Equal(5, insertRecord.Items.Len()) + + idVal, err := insertRecord.Items.GetValueByColName("id") + suite.NoError(err, "Error fetching id") + + n_tVal, err := insertRecord.Items.GetValueByColName("n_t") + suite.NoError(err, "Error fetching n_t") + + lz4_tVal, err := insertRecord.Items.GetValueByColName("lz4_t") + suite.NoError(err, "Error fetching lz4_t") - suite.Equal(int32(idx+1), insertRecord.Items["id"].Value.(int32)) - suite.Equal(32768, len(insertRecord.Items["n_t"].Value.(string))) - suite.Equal(32768, len(insertRecord.Items["lz4_t"].Value.(string))) - suite.Equal(32768, len(insertRecord.Items["n_b"].Value.([]byte))) - suite.Equal(32768, len(insertRecord.Items["lz4_b"].Value.([]byte))) + n_bVal, err := insertRecord.Items.GetValueByColName("n_b") + suite.NoError(err, "Error fetching n_b") + + lz4_bVal, err := insertRecord.Items.GetValueByColName("lz4_b") + suite.NoError(err, "Error fetching lz4_b") + + // Perform the actual value checks + suite.Equal(int32(idx+1), idVal.Value.(int32)) + suite.Equal(32768, len(n_tVal.Value.(string))) + suite.Equal(32768, len(lz4_tVal.Value.(string))) + suite.Equal(32768, len(n_bVal.Value.([]byte))) + suite.Equal(32768, len(lz4_bVal.Value.([]byte))) } } @@ -191,61 +220,74 @@ func (suite *PostgresCDCTestSuite) validateMutatedToastRecords(records []model.R updateRecord := records[0].(*model.UpdateRecord) suite.Equal(srcTableName, updateRecord.SourceTableName) suite.Equal(dstTableName, updateRecord.DestinationTableName) - suite.Equal(2, len(updateRecord.NewItems)) - suite.Equal(int32(1), updateRecord.NewItems["id"].Value.(int32)) - suite.Equal(qvalue.QValueKindString, updateRecord.NewItems["n_t"].Kind) - suite.Equal(65536, len(updateRecord.NewItems["n_t"].Value.(string))) + items := updateRecord.NewItems + suite.Equal(2, 
items.Len()) + v := items.GetColumnValue("id") + suite.Equal(int32(1), v.Value.(int32)) + v = items.GetColumnValue("n_t") + suite.Equal(qvalue.QValueKindString, v.Kind) + suite.Equal(65536, len(v.Value.(string))) suite.Equal(3, len(updateRecord.UnchangedToastColumns)) suite.True(updateRecord.UnchangedToastColumns["lz4_t"]) suite.True(updateRecord.UnchangedToastColumns["n_b"]) suite.True(updateRecord.UnchangedToastColumns["lz4_b"]) - suite.IsType(&model.UpdateRecord{}, records[1]) updateRecord = records[1].(*model.UpdateRecord) suite.Equal(srcTableName, updateRecord.SourceTableName) suite.Equal(dstTableName, updateRecord.DestinationTableName) - suite.Equal(4, len(updateRecord.NewItems)) - suite.Equal(qvalue.QValueKindInt32, updateRecord.NewItems["id"].Kind) - suite.Equal(int32(3), updateRecord.NewItems["id"].Value.(int32)) - suite.Equal(qvalue.QValueKindString, updateRecord.NewItems["lz4_t"].Kind) - suite.Equal(65536, len(updateRecord.NewItems["lz4_t"].Value.(string))) - suite.Equal(qvalue.QValueKindBytes, updateRecord.NewItems["n_b"].Kind) - suite.Equal(65536, len(updateRecord.NewItems["n_b"].Value.([]byte))) - suite.Equal(qvalue.QValueKindBytes, updateRecord.NewItems["lz4_b"].Kind) - suite.Equal(65536, len(updateRecord.NewItems["lz4_b"].Value.([]byte))) + + items = updateRecord.NewItems + suite.Equal(4, items.Len()) + v = items.GetColumnValue("id") + suite.Equal(qvalue.QValueKindInt32, v.Kind) + suite.Equal(int32(3), v.Value.(int32)) + v = items.GetColumnValue("lz4_t") + suite.Equal(qvalue.QValueKindString, v.Kind) + suite.Equal(65536, len(v.Value.(string))) + v = items.GetColumnValue("n_b") + suite.Equal(qvalue.QValueKindBytes, v.Kind) + suite.Equal(65536, len(v.Value.([]byte))) + v = items.GetColumnValue("lz4_b") + suite.Equal(qvalue.QValueKindBytes, v.Kind) + suite.Equal(65536, len(v.Value.([]byte))) suite.Equal(1, len(updateRecord.UnchangedToastColumns)) suite.True(updateRecord.UnchangedToastColumns["n_t"]) - + // Test case for records[2] 
suite.IsType(&model.UpdateRecord{}, records[2]) updateRecord = records[2].(*model.UpdateRecord) suite.Equal(srcTableName, updateRecord.SourceTableName) suite.Equal(dstTableName, updateRecord.DestinationTableName) - suite.Equal(5, len(updateRecord.NewItems)) - suite.Equal(int32(4), updateRecord.NewItems["id"].Value.(int32)) - suite.Equal(qvalue.QValueKindString, updateRecord.NewItems["n_t"].Kind) - suite.Equal(65536, len(updateRecord.NewItems["n_t"].Value.(string))) - suite.Equal(qvalue.QValueKindString, updateRecord.NewItems["lz4_t"].Kind) - suite.Equal(65536, len(updateRecord.NewItems["lz4_t"].Value.(string))) - suite.Equal(qvalue.QValueKindBytes, updateRecord.NewItems["n_b"].Kind) - suite.Equal(65536, len(updateRecord.NewItems["n_b"].Value.([]byte))) - suite.Equal(qvalue.QValueKindBytes, updateRecord.NewItems["lz4_b"].Kind) - suite.Equal(65536, len(updateRecord.NewItems["lz4_b"].Value.([]byte))) + + items = updateRecord.NewItems + suite.Equal(5, items.Len()) + v = items.GetColumnValue("id") + suite.Equal(int32(4), v.Value.(int32)) + suite.Equal(qvalue.QValueKindString, items.GetColumnValue("n_t").Kind) + suite.Equal(65536, len(items.GetColumnValue("n_t").Value.(string))) + suite.Equal(qvalue.QValueKindString, items.GetColumnValue("lz4_t").Kind) + suite.Equal(65536, len(items.GetColumnValue("lz4_t").Value.(string))) + suite.Equal(qvalue.QValueKindBytes, items.GetColumnValue("n_b").Kind) + suite.Equal(65536, len(items.GetColumnValue("n_b").Value.([]byte))) + suite.Equal(qvalue.QValueKindBytes, items.GetColumnValue("lz4_b").Kind) + suite.Equal(65536, len(items.GetColumnValue("lz4_b").Value.([]byte))) suite.Equal(0, len(updateRecord.UnchangedToastColumns)) + // Test case for records[3] suite.IsType(&model.DeleteRecord{}, records[3]) deleteRecord := records[3].(*model.DeleteRecord) suite.Equal(srcTableName, deleteRecord.SourceTableName) suite.Equal(dstTableName, deleteRecord.DestinationTableName) - suite.Equal(5, len(deleteRecord.Items)) - suite.Equal(int32(3), 
deleteRecord.Items["id"].Value.(int32)) - suite.Equal(qvalue.QValueKindInvalid, deleteRecord.Items["n_t"].Kind) - suite.Nil(deleteRecord.Items["n_t"].Value) - suite.Equal(qvalue.QValueKindInvalid, deleteRecord.Items["lz4_t"].Kind) - suite.Nil(deleteRecord.Items["lz4_t"].Value) - suite.Equal(qvalue.QValueKindInvalid, deleteRecord.Items["n_b"].Kind) - suite.Nil(deleteRecord.Items["n_b"].Value) - suite.Equal(qvalue.QValueKindInvalid, deleteRecord.Items["lz4_b"].Kind) - suite.Nil(deleteRecord.Items["lz4_b"].Value) + items = deleteRecord.Items + suite.Equal(5, items.Len()) + suite.Equal(int32(3), items.GetColumnValue("id").Value.(int32)) + suite.Equal(qvalue.QValueKindInvalid, items.GetColumnValue("n_t").Kind) + suite.Nil(items.GetColumnValue("n_t").Value) + suite.Equal(qvalue.QValueKindInvalid, items.GetColumnValue("lz4_t").Kind) + suite.Nil(items.GetColumnValue("lz4_t").Value) + suite.Equal(qvalue.QValueKindInvalid, items.GetColumnValue("n_b").Kind) + suite.Nil(items.GetColumnValue("n_b").Value) + suite.Equal(qvalue.QValueKindInvalid, items.GetColumnValue("lz4_b").Kind) + suite.Nil(items.GetColumnValue("lz4_b").Value) } func (suite *PostgresCDCTestSuite) SetupSuite() { @@ -649,7 +691,7 @@ func (suite *PostgresCDCTestSuite) TestAllTypesHappyFlow() { }) suite.failTestError(err) suite.Equal(1, len(records.RecordBatch.Records)) - suite.Equal(35, len(records.RecordBatch.Records[0].GetItems())) + suite.Equal(35, records.RecordBatch.Records[0].GetItems().Len()) err = suite.connector.PullFlowCleanup(allTypesHappyFlowName) suite.failTestError(err) diff --git a/flow/go.mod b/flow/go.mod index df3e3ed06f..e11e83f977 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -36,6 +36,7 @@ require ( require ( github.com/golang-jwt/jwt/v5 v5.0.0 // indirect + github.com/grafana/pyroscope-go/godeltaprof v0.1.3 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/twmb/murmur3 v1.1.8 // indirect @@ -106,6 +107,7 @@ require ( 
github.com/google/s2a-go v0.1.5 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect + github.com/grafana/pyroscope-go v1.0.2 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/errwrap v1.1.0 // indirect diff --git a/flow/go.sum b/flow/go.sum index ee6182af9b..c0c5591487 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -1193,6 +1193,10 @@ github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+ github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/grafana/pyroscope-go v1.0.2 h1:dEFgO9VbhYTwuwpCC5coTpuW0JjISEWDZtvRAW9v5Tw= +github.com/grafana/pyroscope-go v1.0.2/go.mod h1:bShDKsVZdzxq+Ol6no0JKigU9y5FTWUcFditMXaH09o= +github.com/grafana/pyroscope-go/godeltaprof v0.1.3 h1:eunWpv1B3Z7ZK9o4499EmQGlY+CsDmSZ4FbxjRx37uk= +github.com/grafana/pyroscope-go/godeltaprof v0.1.3/go.mod h1:1HSPtjU8vLG0jE9JrTdzjgFqdJ/VgN7fvxBNq3luJko= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= diff --git a/flow/model/model.go b/flow/model/model.go index 9bc5877cba..c04ce50957 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -2,6 +2,8 @@ package model import ( "encoding/json" + "errors" + "fmt" "math/big" "time" @@ -38,33 +40,106 @@ type Record interface { // get table name GetTableName() string // get columns and values for the record 
- GetItems() RecordItems + GetItems() *RecordItems } -type RecordItems map[string]qvalue.QValue +type RecordItems struct { + colToValIdx map[string]int + values []*qvalue.QValue +} + +func NewRecordItems() *RecordItems { + return &RecordItems{ + colToValIdx: make(map[string]int), + // preallocate capacity for 16 qvalues so that we don't have to allocate memory + // for each record to reduce GC pressure + values: make([]*qvalue.QValue, 0, 16), + } +} + +func NewRecordItemWithData(data map[string]*qvalue.QValue) *RecordItems { + recordItem := NewRecordItems() + for col, val := range data { + recordItem.colToValIdx[col] = len(recordItem.values) + recordItem.values = append(recordItem.values, val) + } + return recordItem +} -func (r RecordItems) ToJSON() (string, error) { +func (r *RecordItems) AddColumn(col string, val *qvalue.QValue) { + if _, ok := r.colToValIdx[col]; ok { + return + } + + r.colToValIdx[col] = len(r.values) + r.values = append(r.values, val) +} + +func (r *RecordItems) GetColumnValue(col string) *qvalue.QValue { + idx, ok := r.colToValIdx[col] + if !ok { + return nil + } + return r.values[idx] +} + +// UpdateIfNotExists takes in a RecordItems as input and updates the values of the +// current RecordItems with the values from the input RecordItems for the columns +// that are present in the input RecordItems but not in the current RecordItems. +// We return the slice of col names that were updated.
+func (r *RecordItems) UpdateIfNotExists(input *RecordItems) []string { + var updatedCols []string + for col, idx := range input.colToValIdx { + if _, ok := r.colToValIdx[col]; !ok { + r.colToValIdx[col] = len(r.values) + r.values = append(r.values, input.values[idx]) + updatedCols = append(updatedCols, col) + } + } + return updatedCols +} + +func (r *RecordItems) GetValueByColName(colName string) (*qvalue.QValue, error) { + idx, ok := r.colToValIdx[colName] + if !ok { + return nil, fmt.Errorf("column name %s not found", colName) + } + return r.values[idx], nil +} + +func (r *RecordItems) Len() int { + return len(r.values) +} + +func (r *RecordItems) ToJSON() (string, error) { + if r.colToValIdx == nil { + return "", errors.New("colToValIdx is nil") + } jsonStruct := make(map[string]interface{}) - for k, v := range r { + for col, idx := range r.colToValIdx { + v := r.values[idx] var err error switch v.Kind { case qvalue.QValueKindString, qvalue.QValueKindJSON: if len(v.Value.(string)) > 15*1024*1024 { - jsonStruct[k] = "" + jsonStruct[col] = "" } else { - jsonStruct[k] = v.Value + jsonStruct[col] = v.Value } case qvalue.QValueKindTimestamp, qvalue.QValueKindTimestampTZ, qvalue.QValueKindDate, qvalue.QValueKindTime, qvalue.QValueKindTimeTZ: - jsonStruct[k], err = v.GoTimeConvert() + jsonStruct[col], err = v.GoTimeConvert() if err != nil { return "", err } case qvalue.QValueKindNumeric: - bigRat := v.Value.(*big.Rat) - jsonStruct[k] = bigRat.FloatString(9) + bigRat, ok := v.Value.(*big.Rat) + if !ok { + return "", errors.New("expected *big.Rat value") + } + jsonStruct[col] = bigRat.FloatString(9) default: - jsonStruct[k] = v.Value + jsonStruct[col] = v.Value } } jsonBytes, err := json.Marshal(jsonStruct) @@ -84,7 +159,7 @@ type InsertRecord struct { // CommitID is the ID of the commit corresponding to this record. CommitID int64 // Items is a map of column name to value. 
- Items RecordItems + Items *RecordItems // unchanged toast columns UnchangedToastColumns map[string]bool } @@ -98,7 +173,7 @@ func (r *InsertRecord) GetTableName() string { return r.DestinationTableName } -func (r *InsertRecord) GetItems() RecordItems { +func (r *InsertRecord) GetItems() *RecordItems { return r.Items } @@ -110,9 +185,9 @@ type UpdateRecord struct { // Name of the destination table DestinationTableName string // OldItems is a map of column name to value. - OldItems RecordItems + OldItems *RecordItems // NewItems is a map of column name to value. - NewItems RecordItems + NewItems *RecordItems // unchanged toast columns UnchangedToastColumns map[string]bool } @@ -127,7 +202,7 @@ func (r *UpdateRecord) GetTableName() string { return r.DestinationTableName } -func (r *UpdateRecord) GetItems() RecordItems { +func (r *UpdateRecord) GetItems() *RecordItems { return r.NewItems } @@ -139,7 +214,7 @@ type DeleteRecord struct { // CheckPointID is the ID of the record. CheckPointID int64 // Items is a map of column name to value. 
- Items RecordItems + Items *RecordItems // unchanged toast columns UnchangedToastColumns map[string]bool } @@ -153,7 +228,7 @@ func (r *DeleteRecord) GetTableName() string { return r.SourceTableName } -func (r *DeleteRecord) GetItems() RecordItems { +func (r *DeleteRecord) GetItems() *RecordItems { return r.Items } @@ -238,7 +313,7 @@ func (r *RelationRecord) GetTableName() string { return r.TableSchemaDelta.SrcTableName } -func (r *RelationRecord) GetItems() RecordItems { +func (r *RelationRecord) GetItems() *RecordItems { return nil } diff --git a/flow/workflows/peer_flow.go b/flow/workflows/peer_flow.go index e509e59e52..b2f015cb07 100644 --- a/flow/workflows/peer_flow.go +++ b/flow/workflows/peer_flow.go @@ -93,7 +93,7 @@ func NewStartedPeerFlowState() *PeerFlowState { } // truncate the progress and other arrays to a max of 10 elements -func (s *PeerFlowState) TruncateProgress() { +func (s *PeerFlowState) TruncateProgress(log log.Logger) { if len(s.Progress) > 10 { s.Progress = s.Progress[len(s.Progress)-10:] } @@ -103,6 +103,19 @@ func (s *PeerFlowState) TruncateProgress() { if len(s.NormalizeFlowStatuses) > 10 { s.NormalizeFlowStatuses = s.NormalizeFlowStatuses[len(s.NormalizeFlowStatuses)-10:] } + + if s.SyncFlowErrors != nil { + // log and clear the error + log.Error("sync flow error: ", s.SyncFlowErrors) + s.SyncFlowErrors = nil + } + + if s.NormalizeFlowErrors != nil { + // log and clear the error + log.Error("normalize flow error: ", s.NormalizeFlowErrors) + s.NormalizeFlowErrors = nil + } + } // PeerFlowWorkflowExecution represents the state for execution of a peer flow. @@ -413,6 +426,6 @@ func PeerFlowWorkflowWithConfig( } } - state.TruncateProgress() + state.TruncateProgress(w.logger) return nil, workflow.NewContinueAsNewError(ctx, PeerFlowWorkflowWithConfig, cfg, limits, state) }