Skip to content

Commit

Permalink
refactor: speed up ingestion usecase (#424)
Browse files Browse the repository at this point in the history
  • Loading branch information
github-zoe-cade authored Dec 22, 2023
1 parent 2c0a1db commit 3354af4
Showing 1 changed file with 29 additions and 33 deletions.
62 changes: 29 additions & 33 deletions usecases/ingestion_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,42 +249,38 @@ func (usecase *IngestionUseCase) ingestObjectsFromCSV(ctx context.Context, organ
}

payloadReaders := make([]models.PayloadReader, 0)
var i int
for i = 0; ; i++ {
logger.InfoContext(ctx, fmt.Sprintf("Start reading line %v", i))
record, err := r.Read()
if err == io.EOF {
break
} else if err != nil {
return err
}
object, err := parseStringValuesToMap(firstRow, record, table)
if err != nil {
return err
}
logger.InfoContext(ctx, fmt.Sprintf("Object to ingest %d: %+v", i, object))
clientObject := models.ClientObject{
TableName: table.Name,
Data: object,
}
payloadReader := models.PayloadReader(clientObject)
keepParsingFile := true
windowStart := 0
total := 0

payloadReaders = append(payloadReaders, payloadReader)
}
numRows := i
logger.InfoContext(ctx, fmt.Sprintf("Read %d lines", numRows))

// ingest by batches of 'batchSize'
for windowStart := 0; windowStart < numRows; windowStart += batchSize {
logger.InfoContext(ctx, fmt.Sprintf("Starting batch : %d of %d", windowStart/batchSize+1, numRows/batchSize+1))
for keepParsingFile {
windowEnd := windowStart + batchSize
if windowEnd > numRows {
windowEnd = numRows
for ; windowStart < windowEnd; windowStart++ {
logger.InfoContext(ctx, fmt.Sprintf("Start reading line %v", windowStart))
record, err := r.Read()
if err == io.EOF {
total = windowStart
keepParsingFile = false
break
} else if err != nil {
return err
}
object, err := parseStringValuesToMap(firstRow, record, table)
if err != nil {
return err
}
logger.InfoContext(ctx, fmt.Sprintf("Object to ingest %d: %+v", windowStart, object))
clientObject := models.ClientObject{
TableName: table.Name,
Data: object,
}
payloadReader := models.PayloadReader(clientObject)

payloadReaders = append(payloadReaders, payloadReader)
}
batch := payloadReaders[windowStart:windowEnd]

if err := usecase.orgTransactionFactory.TransactionInOrgSchema(organizationId, func(tx repositories.Transaction) error {
return usecase.ingestionRepository.IngestObjects(tx, batch, table, logger)
return usecase.ingestionRepository.IngestObjects(tx, payloadReaders, table, logger)
}); err != nil {
return err
}
Expand All @@ -293,8 +289,8 @@ func (usecase *IngestionUseCase) ingestObjectsFromCSV(ctx context.Context, organ
end := time.Now()
duration := end.Sub(start)
// divide by 1e6 to convert to milliseconds (the base unit is nanoseconds)
avgDuration := float64(duration) / float64(i*1e6)
logger.InfoContext(ctx, fmt.Sprintf("Ingested %d objects in %s, average %vms", i, duration, avgDuration))
avgDuration := float64(duration) / float64(total*1e6)
logger.InfoContext(ctx, fmt.Sprintf("Ingested %d objects in %s, average %vms", total, duration, avgDuration))

return nil
}
Expand Down

0 comments on commit 3354af4

Please sign in to comment.