Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/npm_and_yarn/ui/browserify-sign-4…
Browse files Browse the repository at this point in the history
….2.2
  • Loading branch information
serprex authored Nov 3, 2023
2 parents 0b52183 + 9b05f32 commit ecf8772
Show file tree
Hide file tree
Showing 46 changed files with 2,246 additions and 787 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/stable-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
with:
token: ${{ secrets.DEPOT_TOKEN }}
files: ./docker-bake.hcl
push: ${{ github.ref == 'refs/heads/main' }}
push: true
env:
SHA_SHORT: stable-${{ github.ref_name }}
TAG: latest-stable
3 changes: 2 additions & 1 deletion dev-peerdb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ then
exit 1
fi

docker compose -f docker-compose-dev.yml up --build
docker compose -f docker-compose-dev.yml up --build\
--no-attach temporal --no-attach pyroscope --no-attach temporal-ui
101 changes: 95 additions & 6 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,19 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog(
return nil
}

func (h *FlowRequestHandler) removeFlowEntryInCatalog(
flowName string,
) error {
_, err := h.pool.Exec(context.Background(),
"DELETE FROM flows WHERE name = $1",
flowName)
if err != nil {
return fmt.Errorf("unable to remove flow entry in catalog: %w", err)
}

return nil
}

func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest) (*protos.CreateQRepFlowResponse, error) {
lastPartition := &protos.QRepPartition{
Expand All @@ -201,7 +214,6 @@ func (h *FlowRequestHandler) CreateQRepFlow(
}

cfg := req.QrepConfig
log.Infof("Config for QRepFlow: %+v", cfg)
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
Expand Down Expand Up @@ -270,12 +282,18 @@ func (h *FlowRequestHandler) ShutdownFlow(
shared.ShutdownSignal,
)
if err != nil {
return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to signal PeerFlow workflow: %v", err),
}, fmt.Errorf("unable to signal PeerFlow workflow: %w", err)
}

err = h.waitForWorkflowClose(ctx, req.WorkflowId)
if err != nil {
return nil, fmt.Errorf("unable to wait for PeerFlow workflow to close: %w", err)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to wait for PeerFlow workflow to close: %v", err),
}, fmt.Errorf("unable to wait for PeerFlow workflow to close: %w", err)
}

workflowID := fmt.Sprintf("%s-dropflow-%s", req.FlowJobName, uuid.New())
Expand All @@ -290,7 +308,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
req, // workflow input
)
if err != nil {
return nil, fmt.Errorf("unable to start DropFlow workflow: %w", err)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to start DropFlow workflow: %v", err),
}, fmt.Errorf("unable to start DropFlow workflow: %w", err)
}

cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
Expand All @@ -304,12 +325,28 @@ func (h *FlowRequestHandler) ShutdownFlow(
select {
case err := <-errChan:
if err != nil {
return nil, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("DropFlow workflow did not execute successfully: %v", err),
}, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
}
case <-time.After(1 * time.Minute):
err := h.handleWorkflowNotClosed(ctx, workflowID, "")
if err != nil {
return nil, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to wait for DropFlow workflow to close: %v", err),
}, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
}
}

if req.RemoveFlowEntry {
delErr := h.removeFlowEntryInCatalog(req.FlowJobName)
if delErr != nil {
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: err.Error(),
}, err
}
}

Expand Down Expand Up @@ -501,3 +538,55 @@ func (h *FlowRequestHandler) CreatePeer(
Message: "",
}, nil
}

func (h *FlowRequestHandler) DropPeer(
ctx context.Context,
req *protos.DropPeerRequest,
) (*protos.DropPeerResponse, error) {
if req.PeerName == "" {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Peer %s not found", req.PeerName),
}, fmt.Errorf("peer %s not found", req.PeerName)
}

// Check if peer name is in flows table
peerID, _, err := h.getPeerID(ctx, req.PeerName)
if err != nil {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Failed to obtain peer ID for peer %s: %v", req.PeerName, err),
}, fmt.Errorf("failed to obtain peer ID for peer %s: %v", req.PeerName, err)
}

var inMirror int64
queryErr := h.pool.QueryRow(ctx,
"SELECT COUNT(*) FROM flows WHERE source_peer=$1 or destination_peer=$2",
peerID, peerID).Scan(&inMirror)
if queryErr != nil {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Failed to check for existing mirrors with peer %s: %v", req.PeerName, queryErr),
}, fmt.Errorf("failed to check for existing mirrors with peer %s", req.PeerName)
}

if inMirror != 0 {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Peer %s is currently involved in an ongoing mirror.", req.PeerName),
}, nil
}

_, delErr := h.pool.Exec(ctx, "DELETE FROM peers WHERE name = $1", req.PeerName)
if delErr != nil {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("failed to delete peer %s from metadata table: %v", req.PeerName, delErr),
}, fmt.Errorf("failed to delete peer %s from metadata table: %v", req.PeerName, delErr)
}

return &protos.DropPeerResponse{
Ok: true,
}, nil

}
3 changes: 1 addition & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
}).Errorf("failed to delete staging table %s: %v", stagingTable, err)
}

log.Printf("loaded stage into %s.%s",
datasetID, dstTableName)
log.Printf("loaded stage into %s.%s", datasetID, dstTableName)

return numRecords, nil
}
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,13 @@ func (p *PostgresCDCSource) consumeStream(

if xld.WALStart > clientXLogPos {
clientXLogPos = xld.WALStart
records.UpdateLatestCheckpoint(int64(clientXLogPos))
}

if len(localRecords) == 0 {
// given that we have no records it is safe to update the flush wal position
// to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages.
consumedXLogPos = clientXLogPos
records.UpdateLatestCheckpoint(int64(clientXLogPos))
}
}
}
Expand All @@ -392,7 +392,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
// for a commit message, update the last checkpoint id for the record batch.
log.Debugf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
msg.CommitLSN, msg.TransactionEndLSN)
batch.UpdateLatestCheckpoint(int64(xld.WALStart))
batch.UpdateLatestCheckpoint(int64(msg.CommitLSN))
p.commitLock = false
case *pglogrepl.RelationMessage:
// treat all relation messages as correponding to parent if partitioned.
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/snowflake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

peersql "github.com/PeerDB-io/peer-flow/connectors/sql"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
util "github.com/PeerDB-io/peer-flow/utils"
)

Expand Down Expand Up @@ -55,7 +56,7 @@ func NewSnowflakeClient(ctx context.Context, config *protos.SnowflakeConfig) (*S
}

genericExecutor := *peersql.NewGenericSQLQueryExecutor(
ctx, database, snowflakeTypeToQValueKindMap, qValueKindToSnowflakeTypeMap)
ctx, database, snowflakeTypeToQValueKindMap, qvalue.QValueKindToSnowflakeTypeMap)

return &SnowflakeClient{
GenericSQLQueryExecutor: genericExecutor,
Expand Down
84 changes: 84 additions & 0 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
util "github.com/PeerDB-io/peer-flow/utils"
log "github.com/sirupsen/logrus"
_ "github.com/snowflakedb/gosnowflake"
Expand Down Expand Up @@ -121,6 +122,17 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(
"partitionID": partition.PartitionId,
}).Infof("sync function called and schema acquired")

err = s.addMissingColumns(
config.FlowJobName,
schema,
dstTableSchema,
dstTableName,
partition,
)
if err != nil {
return 0, err
}

avroSchema, err := s.getAvroSchema(dstTableName, schema, config.FlowJobName)
if err != nil {
return 0, err
Expand Down Expand Up @@ -166,6 +178,78 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(
return numRecords, nil
}

func (s *SnowflakeAvroSyncMethod) addMissingColumns(
flowJobName string,
schema *model.QRecordSchema,
dstTableSchema []*sql.ColumnType,
dstTableName string,
partition *protos.QRepPartition,
) error {
// check if avro schema has additional columns compared to destination table
// if so, we need to add those columns to the destination table
colsToTypes := map[string]qvalue.QValueKind{}
for _, col := range schema.Fields {
hasColumn := false
// check ignoring case
for _, dstCol := range dstTableSchema {
if strings.EqualFold(col.Name, dstCol.Name()) {
hasColumn = true
break
}
}

if !hasColumn {
log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partition.PartitionId,
}).Infof("adding column %s to destination table %s", col.Name, dstTableName)
colsToTypes[col.Name] = col.Type
}
}

if len(colsToTypes) > 0 {
tx, err := s.connector.database.Begin()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}

for colName, colType := range colsToTypes {
sfColType, err := colType.ToDWHColumnType(qvalue.QDWHTypeSnowflake)
if err != nil {
return fmt.Errorf("failed to convert QValueKind to Snowflake column type: %w", err)
}
upperCasedColName := strings.ToUpper(colName)
alterTableCmd := fmt.Sprintf("ALTER TABLE %s ", dstTableName)
alterTableCmd += fmt.Sprintf("ADD COLUMN IF NOT EXISTS \"%s\" %s;", upperCasedColName, sfColType)

log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partition.PartitionId,
}).Infof("altering destination table %s with command `%s`", dstTableName, alterTableCmd)

if _, err := tx.Exec(alterTableCmd); err != nil {
return fmt.Errorf("failed to alter destination table: %w", err)
}
}

if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partition.PartitionId,
}).Infof("successfully added missing columns to destination table %s", dstTableName)
} else {
log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partition.PartitionId,
}).Infof("no missing columns found in destination table %s", dstTableName)
}

return nil
}

func (s *SnowflakeAvroSyncMethod) getAvroSchema(
dstTableName string,
schema *model.QRecordSchema,
Expand Down
43 changes: 6 additions & 37 deletions flow/connectors/snowflake/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,6 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

var qValueKindToSnowflakeTypeMap = map[qvalue.QValueKind]string{
qvalue.QValueKindBoolean: "BOOLEAN",
qvalue.QValueKindInt16: "INTEGER",
qvalue.QValueKindInt32: "INTEGER",
qvalue.QValueKindInt64: "INTEGER",
qvalue.QValueKindFloat32: "FLOAT",
qvalue.QValueKindFloat64: "FLOAT",
qvalue.QValueKindNumeric: "NUMBER(38, 9)",
qvalue.QValueKindString: "STRING",
qvalue.QValueKindJSON: "VARIANT",
qvalue.QValueKindTimestamp: "TIMESTAMP_NTZ",
qvalue.QValueKindTimestampTZ: "TIMESTAMP_TZ",
qvalue.QValueKindTime: "TIME",
qvalue.QValueKindDate: "DATE",
qvalue.QValueKindBit: "BINARY",
qvalue.QValueKindBytes: "BINARY",
qvalue.QValueKindStruct: "STRING",
qvalue.QValueKindUUID: "STRING",
qvalue.QValueKindTimeTZ: "STRING",
qvalue.QValueKindInvalid: "STRING",
qvalue.QValueKindHStore: "STRING",
qvalue.QValueKindGeography: "GEOGRAPHY",
qvalue.QValueKindGeometry: "GEOMETRY",
qvalue.QValueKindPoint: "GEOMETRY",

// array types will be mapped to VARIANT
qvalue.QValueKindArrayFloat32: "VARIANT",
qvalue.QValueKindArrayFloat64: "VARIANT",
qvalue.QValueKindArrayInt32: "VARIANT",
qvalue.QValueKindArrayInt64: "VARIANT",
qvalue.QValueKindArrayString: "VARIANT",
}

var snowflakeTypeToQValueKindMap = map[string]qvalue.QValueKind{
"INT": qvalue.QValueKindInt32,
"BIGINT": qvalue.QValueKindInt64,
Expand Down Expand Up @@ -67,11 +34,13 @@ var snowflakeTypeToQValueKindMap = map[string]qvalue.QValueKind{
"GEOGRAPHY": qvalue.QValueKindGeography,
}

func qValueKindToSnowflakeType(colType qvalue.QValueKind) string {
if val, ok := qValueKindToSnowflakeTypeMap[colType]; ok {
return val
func qValueKindToSnowflakeType(colType qvalue.QValueKind) (string, error) {
val, err := colType.ToDWHColumnType(qvalue.QDWHTypeSnowflake)
if err != nil {
return "", err
}
return "STRING"

return val, err
}

func snowflakeTypeToQValueKind(name string) (qvalue.QValueKind, error) {
Expand Down
Loading

0 comments on commit ecf8772

Please sign in to comment.