diff --git a/docs/adrs/202405-bidcollect.md b/docs/adrs/202405-bidcollect.md index aaf633b..abec029 100644 --- a/docs/adrs/202405-bidcollect.md +++ b/docs/adrs/202405-bidcollect.md @@ -35,7 +35,6 @@ go run . service bidcollect --out csv --data-api --ultrasound-stream - Diagram showing the flow of data and the components involved - File Output - - Use date in output directory - Combine all individual files into a big file - Consider gzipped CSV output: https://gist.github.com/mchirico/6147687 (currently, an hour of bids is about 300MB) - Consider Parquet output files diff --git a/services/bidcollect/bid-processor.go b/services/bidcollect/bid-processor.go index 8cbb2bf..8178298 100644 --- a/services/bidcollect/bid-processor.go +++ b/services/bidcollect/bid-processor.go @@ -118,7 +118,7 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error) // hourlybucket sec := int64(bucketMinutes * 60) bucketTS := bid.ReceivedAt / sec * sec // timestamp down-round to start of bucket - // t := time.Unix(bucketTS, 0).UTC() + t := time.Unix(bucketTS, 0).UTC() // files may already be opened c.outFilesLock.RLock() @@ -130,13 +130,14 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error) } // Create output files - err = os.MkdirAll(c.opts.OutDir, os.ModePerm) + dir := filepath.Join(c.opts.OutDir, t.Format(time.DateOnly)) + err = os.MkdirAll(dir, os.ModePerm) if err != nil { return nil, nil, err } // Open ALL BIDS CSV - fnAll := filepath.Join(c.opts.OutDir, c.getFilename("all", bucketTS)) + fnAll := filepath.Join(dir, c.getFilename("all", bucketTS)) fAll, err = os.OpenFile(fnAll, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600) if err != nil { return nil, nil, err @@ -153,7 +154,7 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error) } // Open TOP BIDS CSV - fnTop := filepath.Join(c.opts.OutDir, c.getFilename("top", bucketTS)) + fnTop := filepath.Join(dir, c.getFilename("top", bucketTS)) fTop, err = 
os.OpenFile(fnTop, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600) if err != nil { return nil, nil, err diff --git a/services/bidcollect/data-api-poller.go b/services/bidcollect/data-api-poller.go index d76e51a..8625d5e 100644 --- a/services/bidcollect/data-api-poller.go +++ b/services/bidcollect/data-api-poller.go @@ -47,6 +47,7 @@ func (poller *DataAPIPoller) Start() { tNextSlot := common.SlotToTime(nextSlot) untilNextSlot := tNextSlot.Sub(t) + poller.Log.Infof("[data-api poller] waiting until start of next slot (%d, %s from now)", nextSlot, untilNextSlot.String()) time.Sleep(untilNextSlot) // then run polling loop for {