Skip to content

Commit

Permalink
Archive stories in parallel goroutines. Only archive 10 at once for now
Browse files Browse the repository at this point in the history
  • Loading branch information
johnwarden committed Nov 23, 2024
1 parent 99d8361 commit 3f05901
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 34 deletions.
73 changes: 41 additions & 32 deletions archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
pond "github.com/alitto/pond/v2"
"github.com/pkg/errors"
"net/http"
)
Expand Down Expand Up @@ -111,50 +112,58 @@ func (app app) archiveOldStatsData(ctx context.Context) error {
return errors.Wrap(err, "create storage client")
}

// Create a pool of workers to upload stories to archive in parallel
pool := pond.NewPool(10, pond.WithContext(ctx))

app.logger.Debug("Uploading archive JSON files")
for _, storyID := range storyIDs {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
pool.Submit(func() {
app.logger.Info("Archiving stats data for story", "storyID", storyID)
err := app.archiveStory(ctx, sc, storyID)
if err != nil {
app.logger.Error("archiveStory", err)
}
})
}

filename := fmt.Sprintf("%d.json", storyID)
// Wait for all tasks in the group to complete or the timeout to occur, whichever comes first
pool.StopAndWait()

// Check if the file already exists before uploading
exists, err := sc.FileExists(ctx, filename)
if err != nil {
app.logger.Error("checking if file exists", err, "filename", filename)
continue // Continue with the next storyID
}
app.logger.Info("Finished archiving old stats data")
return nil
}

if exists {
app.logger.Info("File already archived", "filename", filename)
} else {
jsonData, err := app.generateStatsDataJSON(ctx, storyID)
if err != nil {
app.logger.Error("generating stats data for story", err, "storyID", storyID)
continue // Continue with the next storyID
}
func (app app) archiveStory(ctx context.Context, sc *StorageClient, storyID int) error {

app.logger.Debug("Uploading archive file", "storyID", storyID)
err = sc.UploadFile(ctx, filename, jsonData, "application/json", true)
if err != nil {
app.logger.Error(fmt.Sprintf("uploading file %s", filename), err)
continue // Continue with the next storyID
}
filename := fmt.Sprintf("%d.json", storyID)

// Check if the file already exists before uploading
exists, err := sc.FileExists(ctx, filename)
if err != nil {
return errors.Wrapf(err, "checking if file %s exists", filename)
}

if exists {
app.logger.Info("File already archived", "filename", filename)
} else {
jsonData, err := app.generateStatsDataJSON(ctx, storyID)
if err != nil {
return errors.Wrapf(err, "generating stats data for story %d", storyID)
}

app.logger.Debug("Deleting old statsData", "storyID", storyID)
n, err := app.ndb.deleteOldData(storyID)
app.logger.Debug("Uploading archive file", "storyID", storyID)
err = sc.UploadFile(ctx, filename, jsonData, "application/json", true)
if err != nil {
app.logger.Error("deleting old data for story", err, "rowsDeleted", n, "storyID", storyID)
continue // Continue with the next storyID
return errors.Wrapf(err, "uploading file %s", filename)
}
}

app.logger.Info("Archived stats data for story", "rowsDeleted", n, "storyID", storyID)
app.logger.Debug("Deleting old statsData", "storyID", storyID)
n, err := app.ndb.deleteOldData(storyID)
if err != nil {
return errors.Wrapf(err, "deleting %d rows of data for story %d", n, storyID)
}

app.logger.Info("Finished archiving old stats data")
app.logger.Info("Archived stats data for story", "rowsDeleted", n, "storyID", storyID)
return nil
}
4 changes: 2 additions & 2 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,9 @@ func openNewsDatabase(sqliteDataDir string) (newsDatabase, error) {
FROM dataset
join stories using (id)
WHERE
sampleTime <= unixepoch() - 14*24*60*60
sampleTime <= unixepoch() - 28*24*60*60
and archived = 0
limit 50
limit 10
`
ndb.selectStoriesToArchiveStatement, err = ndb.db.Prepare(sql)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (

require (
github.com/PuerkitoBio/goquery v1.5.1 // indirect
github.com/alitto/pond/v2 v2.1.4 // indirect
github.com/andybalholm/cascadia v1.2.0 // indirect
github.com/antchfx/htmlquery v1.2.3 // indirect
github.com/antchfx/xmlquery v1.2.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/PuerkitoBio/goquery v1.5.1 h1:PSPBGne8NIUWw+/7vFBV+kG2J/5MOjbzc7154Oa
github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc=
github.com/VictoriaMetrics/metrics v1.23.0 h1:WzfqyzCaxUZip+OBbg1+lV33WChDSu4ssYII3nxtpeA=
github.com/VictoriaMetrics/metrics v1.23.0/go.mod h1:rAr/llLpEnAdTehiNlUxKgnjcOuROSzpw0GvjpEbvFc=
github.com/alitto/pond/v2 v2.1.4 h1:FLVRXHjQBpyMdgn6Ua3NWLy8B/4swn9XoB2S3W7UkMQ=
github.com/alitto/pond/v2 v2.1.4/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/andybalholm/cascadia v1.2.0 h1:vuRCkM5Ozh/BfmsaTm26kbjm0mIOM3yS5Ek/F5h18aE=
github.com/andybalholm/cascadia v1.2.0/go.mod h1:YCyR8vOZT9aZ1CHEd8ap0gMVm2aFgxBp0T0eFw1RUQY=
Expand Down

0 comments on commit 3f05901

Please sign in to comment.