Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into spiritus-mundi
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 11, 2024
2 parents a66dab5 + e5eb347 commit c5e9fb3
Show file tree
Hide file tree
Showing 20 changed files with 120 additions and 109 deletions.
34 changes: 18 additions & 16 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func (h *FlowRequestHandler) GetSchemas(
defer tunnel.Close()
defer peerConn.Close(ctx)

rows, err := peerConn.Query(ctx, "SELECT schema_name"+
" FROM information_schema.schemata WHERE schema_name !~ '^pg_' AND schema_name <> 'information_schema';")
rows, err := peerConn.Query(ctx, "SELECT nspname"+
" FROM pg_namespace WHERE nspname !~ '^pg_' AND nspname <> 'information_schema';")
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
Expand Down Expand Up @@ -145,8 +145,10 @@ func (h *FlowRequestHandler) GetAllTables(
defer tunnel.Close()
defer peerConn.Close(ctx)

rows, err := peerConn.Query(ctx, "SELECT table_schema || '.' || table_name AS schema_table "+
"FROM information_schema.tables WHERE table_schema !~ '^pg_' AND table_schema <> 'information_schema'")
rows, err := peerConn.Query(ctx, "SELECT n.nspname || '.' || c.relname AS schema_table "+
"FROM pg_class c "+
"JOIN pg_namespace n ON c.relnamespace = n.oid "+
"WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' AND c.relkind = 'r';")
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
Expand Down Expand Up @@ -178,26 +180,26 @@ func (h *FlowRequestHandler) GetColumns(

rows, err := peerConn.Query(ctx, `
SELECT
cols.column_name,
cols.data_type,
attname AS column_name,
format_type(atttypid, atttypmod) AS data_type,
CASE
WHEN con.contype = 'p' AND cols.ordinal_position = ANY(con.conkey) THEN true
WHEN attnum = ANY(conkey) THEN true
ELSE false
END AS is_primary_key
FROM
information_schema.columns cols
pg_attribute
JOIN
pg_class tbl ON cols.table_name = tbl.relname
JOIN
pg_namespace n ON tbl.relnamespace = n.oid
pg_class ON pg_attribute.attrelid = pg_class.oid
LEFT JOIN
pg_constraint con ON con.conrelid = tbl.oid
AND con.contype = 'p'
pg_constraint ON pg_attribute.attrelid = pg_constraint.conrelid
AND pg_attribute.attnum = ANY(pg_constraint.conkey)
WHERE
n.nspname = $1
AND cols.table_name = $2
relnamespace::regnamespace::text = $1
AND
relname = $2
AND pg_attribute.attnum > 0
ORDER BY
cols.ordinal_position;
attnum;
`, req.SchemaName, req.TableName)
if err != nil {
return &protos.TableColumnsResponse{Columns: nil}, err
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
// drop the staging table
if err := bqClient.DatasetInProject(s.connector.projectID, datasetID).Table(stagingTable).Delete(ctx); err != nil {
// just log the error; this isn't fatal.
slog.Error("failed to delete staging table "+stagingTable,
s.connector.logger.Warn("failed to delete staging table "+stagingTable,
slog.Any("error", err),
slog.Int64("syncBatchID", syncBatchID),
slog.String("destinationTable", rawTableName))
Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
if err := bqClient.DatasetInProject(s.connector.projectID, stagingDatasetTable.dataset).
Table(stagingDatasetTable.table).Delete(ctx); err != nil {
// just log the error; this isn't fatal.
slog.Error("failed to delete staging table "+stagingDatasetTable.string(),
s.connector.logger.Warn("failed to delete staging table "+stagingDatasetTable.string(),
slog.Any("error", err),
flowLog)
}
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)
Expand Down Expand Up @@ -289,6 +290,6 @@ func CloseConnector(ctx context.Context, conn Connector) {

err := conn.Close(ctx)
if err != nil {
slog.Error("error closing connector", slog.Any("error", err))
logger.LoggerFromCtx(ctx).Error("error closing connector", slog.Any("error", err))
}
}
12 changes: 7 additions & 5 deletions flow/connectors/eventhub/hub_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"

"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -105,17 +106,18 @@ func (h *HubBatches) sendBatch(
return err
}

slog.InfoContext(ctx, "sendBatch",
slog.Int("events sent", int(events.NumEvents())), slog.String("event hub topic ", tblName.ToString()))
logger.LoggerFromCtx(ctx).Info("sendBatch",
slog.Int("events sent", int(events.NumEvents())), slog.String("event hub topic", tblName.ToString()))
return nil
}

func (h *HubBatches) flushAllBatches(
ctx context.Context,
flowName string,
) error {
logger := logger.LoggerFromCtx(ctx)
if h.Len() == 0 {
slog.Info("no events to send", slog.String(string(shared.FlowNameKey), flowName))
logger.Info("no events to send", slog.String(string(shared.FlowNameKey), flowName))
return nil
}

Expand All @@ -132,7 +134,7 @@ func (h *HubBatches) flushAllBatches(
}

numEventsPushed.Add(numEvents)
slog.Info("flushAllBatches",
logger.Info("flushAllBatches",
slog.String(string(shared.FlowNameKey), flowName),
slog.Int("events sent", int(numEvents)),
slog.String("event hub topic ", destination.ToString()))
Expand All @@ -144,7 +146,7 @@ func (h *HubBatches) flushAllBatches(
if err != nil {
return fmt.Errorf("failed to flushAllBatches: %v", err)
}
slog.Info("hub batches flush",
logger.Info("hub batches flush",
slog.String(string(shared.FlowNameKey), flowName),
slog.Int("events sent", int(numEventsPushed.Load())))

Expand Down
15 changes: 9 additions & 6 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
)

type EventHubManager struct {
Expand Down Expand Up @@ -65,12 +66,13 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE
hubTmp := hub.(*azeventhubs.ProducerClient)
_, err := hubTmp.GetEventHubProperties(ctx, nil)
if err != nil {
slog.Info(fmt.Sprintf("eventhub %s", name)+
"not reachable. Will re-establish connection and re-create it.",
logger := logger.LoggerFromCtx(ctx)
logger.Info(
fmt.Sprintf("eventhub %s not reachable. Will re-establish connection and re-create it.", name),
slog.Any("error", err))
closeError := m.closeProducerClient(ctx, hubTmp)
if closeError != nil {
slog.Error("failed to close producer client", slog.Any("error", closeError))
logger.Error("failed to close producer client", slog.Any("error", closeError))
}
m.hubs.Delete(name)
hubConnectOK = false
Expand Down Expand Up @@ -116,7 +118,7 @@ func (m *EventHubManager) Close(ctx context.Context) error {
hub := value.(*azeventhubs.ProducerClient)
err := m.closeProducerClient(ctx, hub)
if err != nil {
slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
logger.LoggerFromCtx(ctx).Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
}
numHubsClosed.Add(1)
Expand Down Expand Up @@ -167,6 +169,7 @@ func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedE

partitionCount := int64(cfg.PartitionCount)
retention := int64(cfg.MessageRetentionInDays)
logger := logger.LoggerFromCtx(ctx)
if err != nil {
opts := armeventhub.Eventhub{
Properties: &armeventhub.Properties{
Expand All @@ -181,9 +184,9 @@ func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedE
return err
}

slog.Info("event hub created", slog.Any("name", name))
logger.Info("event hub created", slog.Any("name", name))
} else {
slog.Info("event hub exists already", slog.Any("name", name))
logger.Info("event hub exists already", slog.Any("name", name))
}

return nil
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/cdc_records"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/geo"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -155,8 +156,9 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

logger := logger.LoggerFromCtx(ctx)
addRecordWithKey := func(key *model.TableWithPkey, rec model.Record) error {
err := cdcRecordsStorage.Set(key, rec)
err := cdcRecordsStorage.Set(logger, key, rec)
if err != nil {
return err
}
Expand Down
12 changes: 7 additions & 5 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
func (c *PostgresConnector) CreateReplConn(ctx context.Context) (*pgx.Conn, error) {
conn, err := c.ssh.NewPostgresConnFromConfig(ctx, c.replConfig)
if err != nil {
slog.Error("failed to create replication connection", slog.Any("error", err))
logger.LoggerFromCtx(ctx).Error("failed to create replication connection", "error", err)
return nil, fmt.Errorf("failed to create replication connection: %w", err)
}
return conn, nil
Expand Down Expand Up @@ -1046,22 +1046,24 @@ func (c *PostgresConnector) HandleSlotInfo(
slotName string,
peerName string,
) error {
logger := logger.LoggerFromCtx(ctx)

// must create new connection because HandleSlotInfo is threadsafe
conn, err := c.ssh.NewPostgresConnFromPostgresConfig(ctx, c.config)
if err != nil {
slog.WarnContext(ctx, "warning: failed to connect to get slot info", slog.Any("error", err))
logger.Warn("warning: failed to connect to get slot info", "error", err)
return err
}
defer conn.Close(ctx)

slotInfo, err := getSlotInfo(ctx, conn, slotName, c.config.Database)
if err != nil {
slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
logger.Warn("warning: failed to get slot info", "error", err)
return err
}

if len(slotInfo) == 0 {
slog.WarnContext(ctx, "warning: unable to get slot info", slog.Any("slotName", slotName))
logger.Warn("warning: unable to get slot info", "slotName", slotName)
return nil
}

Expand All @@ -1082,7 +1084,7 @@ cc: <!channel>`,
maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
res, err := getOpenConnectionsForUser(ctx, conn, c.config.User)
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
logger.Warn("warning: failed to get current open connections", "error", err)
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
Expand Down
11 changes: 6 additions & 5 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peer-flow/connectors/utils"
partition_utils "github.com/PeerDB-io/peer-flow/connectors/utils/partition"
Expand Down Expand Up @@ -290,7 +291,7 @@ func (c *PostgresConnector) CheckForUpdatedMaxValue(
defer func() {
deferErr := tx.Rollback(ctx)
if deferErr != pgx.ErrTxClosed && deferErr != nil {
c.logger.Error("error rolling back transaction for getting max value", slog.Any("error", err))
c.logger.Error("error rolling back transaction for getting max value", "error", err)
}
}()

Expand Down Expand Up @@ -362,7 +363,7 @@ func (c *PostgresConnector) PullQRepRecords(

// Build the query to pull records within the range from the source table
// Be sure to order the results by the watermark column to ensure consistency across pulls
query, err := BuildQuery(config.Query, config.FlowJobName)
query, err := BuildQuery(c.logger, config.Query, config.FlowJobName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -432,7 +433,7 @@ func (c *PostgresConnector) PullQRepRecordStream(

// Build the query to pull records within the range from the source table
// Be sure to order the results by the watermark column to ensure consistency across pulls
query, err := BuildQuery(config.Query, config.FlowJobName)
query, err := BuildQuery(c.logger, config.Query, config.FlowJobName)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -568,7 +569,7 @@ func (c *PostgresConnector) PullXminRecordStream(
return numRecords, currentSnapshotXmin, nil
}

func BuildQuery(query string, flowJobName string) (string, error) {
func BuildQuery(logger log.Logger, query string, flowJobName string) (string, error) {
tmpl, err := template.New("query").Parse(query)
if err != nil {
return "", err
Expand All @@ -587,7 +588,7 @@ func BuildQuery(query string, flowJobName string) (string, error) {
}
res := buf.String()

slog.Info(fmt.Sprintf("templated query: %s", res))
logger.Info(fmt.Sprintf("templated query: %s", res))
return res, nil
}

Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/qrep_query_build_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package connpostgres

import (
"log/slog"
"testing"
)

Expand Down Expand Up @@ -34,7 +35,7 @@ func TestBuildQuery(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual, err := BuildQuery(tc.query, "test_flow")
actual, err := BuildQuery(slog.Default(), tc.query, "test_flow")
if err != nil {
t.Fatalf("Error returned by BuildQuery: %v", err)
}
Expand Down
13 changes: 8 additions & 5 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
qe.logger.Info("Processing rows")
// Iterate over the rows
for rows.Next() {
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
record, err := mapRowToQRecord(qe.logger, rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
return nil, fmt.Errorf("failed to map row to QRecord: %w", err)
Expand Down Expand Up @@ -186,7 +186,7 @@ func (qe *QRepQueryExecutor) processRowsStream(
return numRows, ctx.Err()
default:
// Process the row as before
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
record, err := mapRowToQRecord(qe.logger, rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
stream.Records <- model.QRecordOrError{
Expand Down Expand Up @@ -450,15 +450,18 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
return totalRecordsFetched, nil
}

func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
func mapRowToQRecord(
logger log.Logger,
row pgx.Rows,
fds []pgconn.FieldDescription,
customTypeMap map[uint32]string,
) ([]qvalue.QValue, error) {
// allocate record as a slice of len(fds) QValues, one slot per field description
record := make([]qvalue.QValue, len(fds))

values, err := row.Values()
if err != nil {
slog.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err))
logger.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err))
return nil, fmt.Errorf("failed to scan row: %w", err)
}

Expand All @@ -468,7 +471,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
if !ok {
tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i])
if err != nil {
slog.Error("[pg_query_executor] failed to parse field", slog.Any("error", err))
logger.Error("[pg_query_executor] failed to parse field", slog.Any("error", err))
return nil, fmt.Errorf("failed to parse field: %w", err)
}
record[i] = tmp
Expand Down
Loading

0 comments on commit c5e9fb3

Please sign in to comment.