Skip to content

Commit

Permalink
default types replaced with pgtype wherever possible
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 11, 2023
1 parent 77cd516 commit 231da91
Show file tree
Hide file tree
Showing 18 changed files with 138 additions and 119 deletions.
7 changes: 4 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -713,7 +714,6 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown
}

func getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
var peerOptions sql.RawBytes
catalogPool, catalogErr := catalog.GetCatalogConnectionPoolFromEnv()
if catalogErr != nil {
return nil, fmt.Errorf("error getting catalog connection pool: %w", catalogErr)
Expand All @@ -729,8 +729,9 @@ func getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
return nil, err
}
defer optionRows.Close()
var peerName string
var peerName pgtype.Text
var postgresPeers []*protos.Peer
var peerOptions sql.RawBytes
for optionRows.Next() {
err := optionRows.Scan(&peerName, &peerOptions)
if err != nil {
Expand All @@ -742,7 +743,7 @@ func getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
return nil, unmarshalErr
}
postgresPeers = append(postgresPeers, &protos.Peer{
Name: peerName,
Name: peerName.String,
Type: protos.DBType_POSTGRES,
Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgPeerConfig},
})
Expand Down
11 changes: 6 additions & 5 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
peerflow "github.com/PeerDB-io/peer-flow/workflows"
backoff "github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/client"
Expand All @@ -36,14 +37,14 @@ func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool, tas
}

func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (int32, int32, error) {
var id int32
var peerType int32
var id pgtype.Int4
var peerType pgtype.Int4
err := h.pool.QueryRow(ctx, "SELECT id,type FROM peers WHERE name = $1", peerName).Scan(&id, &peerType)
if err != nil {
log.Errorf("unable to query peer id for peer %s: %s", peerName, err.Error())
return -1, -1, fmt.Errorf("unable to query peer id for peer %s: %s", peerName, err)
}
return id, peerType, nil
return id.Int32, peerType.Int32, nil
}

func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
Expand Down Expand Up @@ -639,7 +640,7 @@ func (h *FlowRequestHandler) DropPeer(
}, fmt.Errorf("failed to obtain peer ID for peer %s: %v", req.PeerName, err)
}

var inMirror int64
var inMirror pgtype.Int8
queryErr := h.pool.QueryRow(ctx,
"SELECT COUNT(*) FROM flows WHERE source_peer=$1 or destination_peer=$2",
peerID, peerID).Scan(&inMirror)
Expand All @@ -650,7 +651,7 @@ func (h *FlowRequestHandler) DropPeer(
}, fmt.Errorf("failed to check for existing mirrors with peer %s", req.PeerName)
}

if inMirror != 0 {
if inMirror.Int64 != 0 {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Peer %s is currently involved in an ongoing mirror.", req.PeerName),
Expand Down
17 changes: 9 additions & 8 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -92,13 +93,13 @@ func (h *FlowRequestHandler) GetTablesInSchema(
defer rows.Close()
var tables []string
for rows.Next() {
var table string
var table pgtype.Text
err := rows.Scan(&table)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}

tables = append(tables, table)
tables = append(tables, table.String)
}
return &protos.SchemaTablesResponse{Tables: tables}, nil
}
Expand All @@ -123,13 +124,13 @@ func (h *FlowRequestHandler) GetAllTables(
defer rows.Close()
var tables []string
for rows.Next() {
var table string
var table pgtype.Text
err := rows.Scan(&table)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}

tables = append(tables, table)
tables = append(tables, table.String)
}
return &protos.AllTablesResponse{Tables: tables}, nil
}
Expand Down Expand Up @@ -185,14 +186,14 @@ func (h *FlowRequestHandler) GetColumns(
defer rows.Close()
var columns []string
for rows.Next() {
var columnName string
var datatype string
var isPkey bool
var columnName pgtype.Text
var datatype pgtype.Text
var isPkey pgtype.Bool
err := rows.Scan(&columnName, &datatype, &isPkey)
if err != nil {
return &protos.TableColumnsResponse{Columns: nil}, err
}
column := fmt.Sprintf("%s:%s:%v", columnName, datatype, isPkey)
column := fmt.Sprintf("%s:%s:%v", columnName.String, datatype.String, isPkey.Bool)
columns = append(columns, column)
}
return &protos.TableColumnsResponse{Columns: columns}, nil
Expand Down
13 changes: 7 additions & 6 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -76,14 +77,14 @@ func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
// check if schema exists
rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)

var exists int64
var exists pgtype.Int8
err := rows.Scan(&exists)
if err != nil {
log.Errorf("failed to check if schema exists: %v", err)
return false
}

if exists > 0 {
if exists.Int64 > 0 {
return true
}

Expand Down Expand Up @@ -136,7 +137,7 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyn
FROM `+p.schemaName+`.`+lastSyncStateTableName+`
WHERE job_name = $1
`, jobName)
var offset int64
var offset pgtype.Int8
err := rows.Scan(&offset)
if err != nil {
// if the job doesn't exist, return 0
Expand All @@ -155,7 +156,7 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyn
log.Infof("got last offset for job `%s`: %d", jobName, offset)

return &protos.LastSyncState{
Checkpoint: offset,
Checkpoint: offset.Int64,
}, nil
}

Expand All @@ -166,7 +167,7 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
WHERE job_name = $1
`, jobName)

var syncBatchID int64
var syncBatchID pgtype.Int8
err := rows.Scan(&syncBatchID)
if err != nil {
// if the job doesn't exist, return 0
Expand All @@ -182,7 +183,7 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {

log.Infof("got last sync batch ID for job `%s`: %d", jobName, syncBatchID)

return syncBatchID, nil
return syncBatchID.Int64, nil
}

// update offset for a job
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ func getChildToParentRelIDMap(ctx context.Context, pool *pgxpool.Pool) (map[uint
defer rows.Close()

childToParentRelIDMap := make(map[uint32]uint32)
var parentRelID uint32
var childRelID uint32
var parentRelID pgtype.Uint32
var childRelID pgtype.Uint32
for rows.Next() {
err := rows.Scan(&parentRelID, &childRelID)
if err != nil {
return nil, fmt.Errorf("error scanning child to parent relid map: %w", err)
}
childToParentRelIDMap[childRelID] = parentRelID
childToParentRelIDMap[childRelID.Uint32] = parentRelID.Uint32
}

return childToParentRelIDMap, nil
Expand Down
Loading

0 comments on commit 231da91

Please sign in to comment.