Skip to content

Commit

Permalink
outdir with date
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Jun 1, 2024
1 parent 550612e commit a6b8575
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
1 change: 0 additions & 1 deletion docs/adrs/202405-bidcollect.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ go run . service bidcollect --out csv --data-api --ultrasound-stream

- Diagram showing the flow of data and the components involved
- File Output
- Use date in output directory
- Combine all individual files into a big file
- Consider gzipped CSV output: https://gist.github.com/mchirico/6147687 (currently, an hour of bids is about 300MB)
- Consider Parquet output files
Expand Down
9 changes: 5 additions & 4 deletions services/bidcollect/bid-processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error)
// hourlybucket
sec := int64(bucketMinutes * 60)
bucketTS := bid.ReceivedAt / sec * sec // timestamp down-round to start of bucket
// t := time.Unix(bucketTS, 0).UTC()
t := time.Unix(bucketTS, 0).UTC()

// files may already be opened
c.outFilesLock.RLock()
Expand All @@ -130,13 +130,14 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error)
}

// Create output files
err = os.MkdirAll(c.opts.OutDir, os.ModePerm)
dir := filepath.Join(c.opts.OutDir, t.Format(time.DateOnly))
err = os.MkdirAll(dir, os.ModePerm)
if err != nil {
return nil, nil, err
}

// Open ALL BIDS CSV
fnAll := filepath.Join(c.opts.OutDir, c.getFilename("all", bucketTS))
fnAll := filepath.Join(dir, c.getFilename("all", bucketTS))
fAll, err = os.OpenFile(fnAll, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
if err != nil {
return nil, nil, err
Expand All @@ -153,7 +154,7 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error)
}

// Open TOP BIDS CSV
fnTop := filepath.Join(c.opts.OutDir, c.getFilename("top", bucketTS))
fnTop := filepath.Join(dir, c.getFilename("top", bucketTS))
fTop, err = os.OpenFile(fnTop, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
if err != nil {
return nil, nil, err
Expand Down
1 change: 1 addition & 0 deletions services/bidcollect/data-api-poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (poller *DataAPIPoller) Start() {
tNextSlot := common.SlotToTime(nextSlot)
untilNextSlot := tNextSlot.Sub(t)
time.Sleep(untilNextSlot)
poller.Log.Infof("[data-api poller] waiting until start of next slot (%d, %s from now)", nextSlot, untilNextSlot.String())

// then run polling loop
for {
Expand Down

0 comments on commit a6b8575

Please sign in to comment.