Skip to content

Commit

Permalink
refactor metadata and stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 13, 2023
1 parent 3b62078 commit be6ad20
Show file tree
Hide file tree
Showing 8 changed files with 395 additions and 683 deletions.
59 changes: 56 additions & 3 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand All @@ -22,7 +23,7 @@ import (
type EventHubConnector struct {
ctx context.Context
config *protos.EventHubGroupConfig
pgMetadata *PostgresMetadataStore
pgMetadata *metadataStore.PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
hubManager *EventHubManager
Expand All @@ -40,7 +41,9 @@ func NewEventHubConnector(
}

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config)
pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb())
metadataSchemaName := "peerdb_eventhub_metadata"
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(),
metadataSchemaName)
if err != nil {
log.Errorf("failed to create postgres metadata store: %v", err)
return nil, err
Expand Down Expand Up @@ -84,6 +87,48 @@ func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSc
return nil
}

func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
return c.pgMetadata.NeedsSetupMetadata()
}

func (c *EventHubConnector) SetupMetadataTables() error {
err := c.pgMetadata.SetupMetadata()
if err != nil {
log.Errorf("failed to setup metadata tables: %v", err)
return err
}

return nil
}

func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName)
if err != nil {
return 0, err
}

return syncBatchID, nil
}

func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
res, err := c.pgMetadata.FetchLastOffset(jobName)
if err != nil {
return nil, err
}

return res, nil
}

func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
err := c.pgMetadata.UpdateLastOffset(jobName, offset)
if err != nil {
log.Errorf("failed to update last offset: %v", err)
return err
}

return nil
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf("syncing records to eventhub with"+
Expand Down Expand Up @@ -177,7 +222,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
log.Errorf("failed to update last offset: %v", err)
return nil, err
}
err = c.incrementSyncBatchID(req.FlowJobName)
err = c.pgMetadata.IncrementID(req.FlowJobName)
if err != nil {
log.Errorf("%v", err)
return nil, err
Expand Down Expand Up @@ -311,3 +356,11 @@ func (c *EventHubConnector) SetupNormalizedTables(
TableExistsMapping: nil,
}, nil
}

func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
err := c.pgMetadata.DropMetadata(jobName)
if err != nil {
return err
}
return nil
}
236 changes: 0 additions & 236 deletions flow/connectors/eventhub/metadata.go

This file was deleted.

Loading

0 comments on commit be6ad20

Please sign in to comment.