Skip to content

Commit

Permalink
updates for eventhub code
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 11, 2024
2 parents 3beed6a + f7ff9cd commit 89ef3eb
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 20 deletions.
20 changes: 5 additions & 15 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
Expand Down Expand Up @@ -71,7 +70,7 @@ func (c *EventHubConnector) Close() error {

err = c.hubManager.Close(context.Background())
if err != nil {
slog.Error("failed to close event hub manager", slog.Any("error", err))
c.logger.Error("failed to close event hub manager", slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
}

Expand Down Expand Up @@ -215,23 +214,14 @@ func (c *EventHubConnector) processBatch(
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
var err error
batch := req.Records
var numRecords uint32
maxParallelism := req.PushParallelism
if maxParallelism <= 0 {
maxParallelism = 10
}

var err error
batch := req.Records
var numRecords uint32

shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf(
"processed %d records for flow %s",
numRecords, req.FlowJobName,
)
})
defer shutdown()

numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
c.logger.Error("failed to process batch", slog.Any("error", err))
Expand All @@ -240,7 +230,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
c.logger.Error("failed to get last checkpoint", err)
c.logger.Error("failed to get last checkpoint", slog.Any("error", err))
return nil, err
}

Expand Down
5 changes: 0 additions & 5 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,11 +749,6 @@ fn parse_db_options(
let conn_str = opts.get("metadata_db");
let metadata_db = parse_metadata_db_info(conn_str.copied())?;

// metadata_db is required for eventhub group
if metadata_db.is_none() {
anyhow::bail!("metadata_db is required for eventhub group");
}

// split comma separated list of columns and trim
let unnest_columns = opts
.get("unnest_columns")
Expand Down

0 comments on commit 89ef3eb

Please sign in to comment.