Skip to content

Commit

Permalink
Pass QValue by pointer
Browse files Browse the repository at this point in the history
- also use pyroscope for better info
  • Loading branch information
iskakaushik committed Sep 12, 2023
1 parent c1d153a commit 713c938
Show file tree
Hide file tree
Showing 9 changed files with 281 additions and 111 deletions.
10 changes: 7 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ x-flow-worker-env: &flow-worker-env
ENABLE_METRICS: "true"
# enables exporting of mirror metrics to Catalog in the PEERDB_STATS schema.
ENABLE_STATS: "true"
PYROSCOPE_SERVER_ADDRESS: http://pyroscope:4040

services:
catalog:
Expand Down Expand Up @@ -60,6 +61,12 @@ services:
labels:
kompose.volume.type: configMap

pyroscope:
container_name: pyroscope
image: grafana/pyroscope:latest
ports:
- 4040:4040

temporal-admin-tools:
container_name: temporal-admin-tools
depends_on:
Expand Down Expand Up @@ -120,7 +127,6 @@ services:
dockerfile: stacks/flow-worker.Dockerfile
environment:
<<: [*catalog-config, *flow-worker-env]
PROFILING_SERVER: 0.0.0.0:6060
METRICS_SERVER: 0.0.0.0:6061
ports:
- 6060:6060
Expand All @@ -136,7 +142,6 @@ services:
dockerfile: stacks/flow-worker.Dockerfile
environment:
<<: [*catalog-config, *flow-worker-env]
PROFILING_SERVER: 0.0.0.0:6062
METRICS_SERVER: 0.0.0.0:6063
ports:
- 6062:6062
Expand All @@ -155,7 +160,6 @@ services:
dockerfile: stacks/flow-worker.Dockerfile
environment:
<<: [*catalog-config, *flow-worker-env]
PROFILING_SERVER: 0.0.0.0:6064
METRICS_SERVER: 0.0.0.0:6065
ports:
- 6064:6064
Expand Down
14 changes: 7 additions & 7 deletions flow/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ func main() {
EnvVars: []string{"ENABLE_STATS"},
}

profilingServerFlag := &cli.StringFlag{
Name: "profiling-server",
Value: "localhost:6060", // Default is localhost:6060
Usage: "HTTP server address for profiling",
EnvVars: []string{"PROFILING_SERVER"},
pyroscopeServerFlag := &cli.StringFlag{
Name: "pyroscope-server-address",
Value: "http://pyroscope:4040",
Usage: "HTTP server address for pyroscope",
EnvVars: []string{"PYROSCOPE_SERVER_ADDRESS"},
}

metricsServerFlag := &cli.StringFlag{
Expand All @@ -77,7 +77,7 @@ func main() {
EnableProfiling: ctx.Bool("enable-profiling"),
EnableMetrics: ctx.Bool("enable-metrics"),
EnableMonitoring: ctx.Bool("enable-monitoring"),
ProfilingServer: ctx.String("profiling-server"),
PyroscopeServer: ctx.String("pyroscope-server-address"),
MetricsServer: ctx.String("metrics-server"),
})
},
Expand All @@ -86,7 +86,7 @@ func main() {
profilingFlag,
metricsFlag,
monitoringFlag,
profilingServerFlag,
pyroscopeServerFlag,
metricsServerFlag,
},
},
Expand Down
61 changes: 45 additions & 16 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"fmt"
"net/http"
"os"
"os/signal"
"runtime"
Expand All @@ -20,6 +19,7 @@ import (
"github.com/uber-go/tally/v4"
"github.com/uber-go/tally/v4/prometheus"

"github.com/grafana/pyroscope-go"
prom "github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/client"
Expand All @@ -32,26 +32,55 @@ type WorkerOptions struct {
EnableProfiling bool
EnableMetrics bool
EnableMonitoring bool
ProfilingServer string
PyroscopeServer string
MetricsServer string
}

// setupPyroscope starts continuous profiling of this worker process and
// streams the collected profiles to the Pyroscope server configured in
// opts.PyroscopeServer.
//
// It terminates the process through log.Fatal when the server address is
// empty or when the profiler cannot be started, so it is meant to be called
// only after profiling has been explicitly enabled.
func setupPyroscope(opts *WorkerOptions) {
	if opts.PyroscopeServer == "" {
		log.Fatal("pyroscope server address is not set but profiling is enabled")
	}

	// Mutex and block profiling are off in the Go runtime by default; turn
	// sampling on so the mutex/block profile types requested below actually
	// carry data.
	runtime.SetMutexProfileFraction(5)
	runtime.SetBlockProfileRate(5)

	// HOSTNAME is not exported in every environment. Fall back to the host
	// name reported by the OS so profiles from different workers can still be
	// told apart; if that lookup fails too, the tag stays empty as before.
	hostname := os.Getenv("HOSTNAME")
	if hostname == "" {
		if name, err := os.Hostname(); err == nil {
			hostname = name
		}
	}

	_, err := pyroscope.Start(pyroscope.Config{
		ApplicationName: "io.peerdb.flow_worker",

		ServerAddress: opts.PyroscopeServer,

		// you can disable logging by setting this to nil
		Logger: log.StandardLogger(),

		// static tags attached to every profile sent by this process
		Tags: map[string]string{"hostname": hostname},

		ProfileTypes: []pyroscope.ProfileType{
			// these profile types are enabled by default:
			pyroscope.ProfileCPU,
			pyroscope.ProfileAllocObjects,
			pyroscope.ProfileAllocSpace,
			pyroscope.ProfileInuseObjects,
			pyroscope.ProfileInuseSpace,

			// these profile types are optional:
			pyroscope.ProfileGoroutines,
			pyroscope.ProfileMutexCount,
			pyroscope.ProfileMutexDuration,
			pyroscope.ProfileBlockCount,
			pyroscope.ProfileBlockDuration,
		},
	})
	if err != nil {
		// Say which subsystem failed; a bare error here is hard to attribute
		// when the worker dies during startup.
		log.Fatalf("unable to start pyroscope profiler: %v", err)
	}
}

func WorkerMain(opts *WorkerOptions) error {
if opts.EnableProfiling {
// Start HTTP profiling server with timeouts
go func() {
server := http.Server{
Addr: opts.ProfilingServer,
ReadTimeout: 5 * time.Minute,
WriteTimeout: 15 * time.Minute,
}

log.Infof("starting profiling server on %s", opts.ProfilingServer)

if err := server.ListenAndServe(); err != nil {
log.Errorf("unable to start profiling server: %v", err)
}
}()
setupPyroscope(opts)
}

go func() {
Expand Down
25 changes: 13 additions & 12 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (p *PostgresCDCSource) consumeStream(
// should ideally be sourceTableName as we are in pullRecords.
// will change in future
pkeyCol := req.TableNameSchemaMapping[tableName].PrimaryKeyColumn
pkeyColVal := rec.GetItems()[pkeyCol]
pkeyColVal := rec.GetItems().GetColumnValue(pkeyCol)
tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: pkeyColVal,
Expand All @@ -232,10 +232,10 @@ func (p *PostgresCDCSource) consumeStream(
records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
} else {
oldRec := records.Records[records.TablePKeyLastSeen[tablePkeyVal]]
// iterate through unchanged toast cols and set them
for col, val := range oldRec.GetItems() {
if _, ok := r.NewItems[col]; !ok {
r.NewItems[col] = val
// iterate through unchanged toast cols and set them in new record
updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems())
for _, col := range updatedCols {
if _, ok := r.UnchangedToastColumns[col]; ok {
delete(r.UnchangedToastColumns, col)
}
}
Expand All @@ -244,7 +244,7 @@ func (p *PostgresCDCSource) consumeStream(
}
case *model.InsertRecord:
pkeyCol := req.TableNameSchemaMapping[tableName].PrimaryKeyColumn
pkeyColVal := rec.GetItems()[pkeyCol]
pkeyColVal := rec.GetItems().GetColumnValue(pkeyCol)
tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: pkeyColVal,
Expand Down Expand Up @@ -430,34 +430,35 @@ It takes a tuple and a relation message as input and returns
func (p *PostgresCDCSource) convertTupleToMap(
tuple *pglogrepl.TupleData,
rel *protos.RelationMessage,
) (model.RecordItems, map[string]bool, error) {
) (*model.RecordItems, map[string]bool, error) {
// if the tuple is nil, return an empty map
if tuple == nil {
return make(model.RecordItems), make(map[string]bool), nil
return model.NewRecordItems(), make(map[string]bool), nil
}

// create an empty RecordItems to hold the decoded column values
items := make(model.RecordItems)
items := model.NewRecordItems()
unchangedToastColumns := make(map[string]bool)

for idx, col := range tuple.Columns {
colName := rel.Columns[idx].Name
switch col.DataType {
case 'n': // null
items[colName] = qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil}
val := &qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil}
items.AddColumn(colName, val)
case 't': // text
/* bytea also appears here as a hex */
data, err := p.decodeColumnData(col.Data, rel.Columns[idx].DataType, pgtype.TextFormatCode)
if err != nil {
return nil, nil, fmt.Errorf("error decoding text column data: %w", err)
}
items[colName] = *data
items.AddColumn(colName, data)
case 'b': // binary
data, err := p.decodeColumnData(col.Data, rel.Columns[idx].DataType, pgtype.BinaryFormatCode)
if err != nil {
return nil, nil, fmt.Errorf("error decoding binary column data: %w", err)
}
items[colName] = *data
items.AddColumn(colName, data)
case 'u': // unchanged toast
unchangedToastColumns[colName] = true
default:
Expand Down
Loading

0 comments on commit 713c938

Please sign in to comment.