Skip to content

Commit

Permalink
scylla writeNewShard in several batches
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Dec 21, 2024
1 parent fbd99f1 commit 5b65704
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions carstore/scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ func (sqs *ScyllaStore) writeNewShard(ctx context.Context, root cid.Cid, rev str
span.SetAttributes(attribute.Int("blocks", len(blks)))

batch := sqs.WriteSession.NewBatch(gocql.LoggedBatch)
// stop writing when we get to half of default batch limit config:
// batch_size_fail_threshold_in_kb: 1024
const batchEnough = 512 * 1024
batchSize := 0
const blockInsertStatement = `INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?)`
blockInsertStatementLen := len(blockInsertStatement)
rowSizeConstants := blockInsertStatementLen + len(rev) + 8 /*user*/ + len(dbroot)

for bcid, block := range blks {
// build shard for output firehose
Expand All @@ -169,15 +176,25 @@ func (sqs *ScyllaStore) writeNewShard(ctx context.Context, root cid.Cid, rev str
blockbytes := block.RawData()

batch.Entries = append(batch.Entries, gocql.BatchEntry{
Stmt: `INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?)`,
Stmt: blockInsertStatement,
Args: []interface{}{user, dbcid, rev, dbroot, blockbytes},
Idempotent: true,
})
batchSize += rowSizeConstants + len(dbcid) + len(blockbytes)
if batchSize >= batchEnough {
err = sqs.WriteSession.ExecuteBatch(batch)
if err != nil {
return nil, fmt.Errorf("failed to write batch uid=%d: %w", user, err)
}
batch = sqs.WriteSession.NewBatch(gocql.LoggedBatch)
}
}

err = sqs.WriteSession.ExecuteBatch(batch)
if err != nil {
return nil, fmt.Errorf("failed to write batch uid=%d: %w", user, err)
if len(batch.Entries) > 0 {
err = sqs.WriteSession.ExecuteBatch(batch)
if err != nil {
return nil, fmt.Errorf("failed to write batch uid=%d: %w", user, err)
}
}

shard := CarShard{
Expand Down

0 comments on commit 5b65704

Please sign in to comment.