Implementation to reprocess last 12 months of data (#436)
* Historical processing only targets last year

* Fix tests

* Add tests

* Fix comments

* Config

* Wording

* Use timex
cristinaleonr authored Feb 15, 2024
1 parent 9ff5c25 commit a6a2865
Showing 4 changed files with 61 additions and 15 deletions.
config/config.go: 7 additions & 6 deletions
@@ -25,12 +25,13 @@ type MonitorConfig struct {

// SourceConfig holds the config that defines all data sources to be processed.
type SourceConfig struct {
-Bucket     string   `yaml:"bucket"`
-Experiment string   `yaml:"experiment"`
-Datatype   string   `yaml:"datatype"`
-Filter     string   `yaml:"filter"`
-Datasets   Datasets `yaml:"target_datasets"`
-DailyOnly  bool     `yaml:"daily_only"`
+Bucket      string   `yaml:"bucket"`
+Experiment  string   `yaml:"experiment"`
+Datatype    string   `yaml:"datatype"`
+Filter      string   `yaml:"filter"`
+Datasets    Datasets `yaml:"target_datasets"`
+DailyOnly   bool     `yaml:"daily_only"`
+FullHistory bool     `yaml:"full_history"` // FullHistory = true implies DailyOnly = false.
}

// Datasets contains the name of BigQuery datasets used for temporary, raw, or
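For context, a minimal sketch of the two source flavors this flag distinguishes, written as Go literals with the same values the tests below use; in the YAML config these map to the daily_only and full_history keys, and the surrounding config file layout is not part of this diff.

    package main

    import (
        "fmt"

        "github.com/m-lab/etl-gardener/config"
    )

    func main() {
        // Reprocess the complete archive for this datatype (full_history: true).
        full := config.SourceConfig{
            Bucket:      "fake-bucket",
            Experiment:  "ndt",
            Datatype:    "ndt5",
            FullHistory: true,
        }
        // Only schedule the daily job, never historical jobs (daily_only: true).
        daily := config.SourceConfig{
            Bucket:     "fake-bucket",
            Experiment: "ndt",
            Datatype:   "pcap",
            DailyOnly:  true,
        }
        // Setting both flags on one source is rejected by NewJobService below
        // (ErrInvalidDateConfig); leaving both false gives the new default:
        // historical reprocessing limited to the last 12 months.
        fmt.Println(full, daily)
    }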
job-service/job-service.go: 21 additions & 2 deletions
@@ -47,6 +47,9 @@ func (svc *Service) NextJob(ctx context.Context) *tracker.JobWithTarget {
return svc.ifHasFiles(ctx, jt)
}

+// Calculate date from 1 year ago.
+lastYear := time.Now().UTC().AddDate(-1, 0, 0)
+
// Since some jobs may be configured as dailyOnly, or have no files for a
// given date, we check for these conditions, and skip the job if
// appropriate. We try up to histJobs.Len() times to find the next job.
@@ -56,6 +59,12 @@ func (svc *Service) NextJob(ctx context.Context) *tracker.JobWithTarget {
log.Println(err)
continue
}

+// Only reprocess data from the last year.
+if jt != nil && !jt.FullHistory && jt.Job.Date.Before(lastYear) {
+    continue
+}
+
return svc.ifHasFiles(ctx, jt)
}
return nil
@@ -78,6 +87,10 @@ func (svc *Service) ifHasFiles(ctx context.Context, jt *tracker.JobWithTarget) *
// ErrInvalidStartDate is returned if startDate is time.Time{}
var ErrInvalidStartDate = errors.New("invalid start date")

+// ErrInvalidDateConfig is returned if both DailyOnly and FullHistory are true for
+// a source. Only one of these options can be true.
+var ErrInvalidDateConfig = errors.New("invalid source date configuration")
+
// NewJobService creates the default job service.
func NewJobService(startDate time.Time,
sources []config.SourceConfig,
@@ -94,6 +107,11 @@ func NewJobService(startDate time.Time,
histSpecs := make([]tracker.JobWithTarget, 0)
for _, s := range sources {
log.Println(s)

+if s.DailyOnly && s.FullHistory {
+    return nil, ErrInvalidDateConfig
+}
+
job := tracker.Job{
Bucket: s.Bucket,
Experiment: s.Experiment,
@@ -108,8 +126,9 @@ func NewJobService(startDate time.Time,
// TODO - handle gs:// targets
jt := tracker.JobWithTarget{
// NOTE: JobWithTarget.ID is assigned after Job.Date is set.
-Job:       job,
-DailyOnly: s.DailyOnly,
+Job:         job,
+DailyOnly:   s.DailyOnly,
+FullHistory: s.FullHistory,
}
dailySpecs = append(dailySpecs, jt)
if !s.DailyOnly {
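Taken on its own, the cutoff rule added to NextJob reduces to a small predicate. A standalone sketch (not the repository's code; the function name is hypothetical) of that rule and its edge cases:

    package main

    import (
        "fmt"
        "time"
    )

    // skipForAge mirrors the check added to NextJob above: historical jobs for
    // sources without FullHistory are skipped when their date is more than 12
    // months old.
    func skipForAge(jobDate time.Time, fullHistory bool) bool {
        lastYear := time.Now().UTC().AddDate(-1, 0, 0)
        return !fullHistory && jobDate.Before(lastYear)
    }

    func main() {
        twoYearsAgo := time.Now().UTC().AddDate(-2, 0, 0)
        sixMonthsAgo := time.Now().UTC().AddDate(0, -6, 0)
        fmt.Println(skipForAge(twoYearsAgo, false))  // true: too old, skipped
        fmt.Println(skipForAge(twoYearsAgo, true))   // false: full_history keeps everything
        fmt.Println(skipForAge(sixMonthsAgo, false)) // false: inside the 12-month window
    }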
job-service/job-service_test.go: 29 additions & 4 deletions
@@ -15,6 +15,7 @@ import (
"github.com/m-lab/etl-gardener/persistence"
"github.com/m-lab/etl-gardener/tracker"
"github.com/m-lab/go/cloudtest/gcsfake"
"github.com/m-lab/go/timex"
)

type namedSaver interface {
@@ -37,8 +38,8 @@ func TestNewJobService(t *testing.T) {
name: "successful-init",
startDate: time.Date(2022, time.July, 1, 0, 0, 0, 0, time.UTC),
sources: []config.SourceConfig{
{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5"},
{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "tcpinfo"},
{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5", FullHistory: true},
{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "tcpinfo", FullHistory: true},
{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "pcap", DailyOnly: true},
},
dailySaver: &failSaver{err: errors.New("any error")},
@@ -56,6 +57,14 @@
sources: []config.SourceConfig{},
wantErr: job.ErrNoConfiguredJobs,
},
+{
+    name: "error-invalid-date-config",
+    startDate: time.Date(2022, time.July, 1, 0, 0, 0, 0, time.UTC),
+    sources: []config.SourceConfig{
+        {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5", DailyOnly: true, FullHistory: true},
+    },
+    wantErr: job.ErrInvalidDateConfig,
+},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -84,6 +93,8 @@ func TestService_NextJob(t *testing.T) {
// Setup fake gcs bucket access.
fakeb := gcsfake.NewBucketHandle()
fakeb.ObjAttrs = append(fakeb.ObjAttrs, &storage.ObjectAttrs{Name: "ndt/ndt5/2022/07/01/foo.tgz", Size: 1, Updated: time.Now()})
+lastYear := time.Now().UTC().AddDate(-1, 0, 1)
+fakeb.ObjAttrs = append(fakeb.ObjAttrs, &storage.ObjectAttrs{Name: "ndt/ndt5/" + lastYear.Format(timex.YYYYMMDDWithSlash) + "/bar.tgz", Size: 1, Updated: time.Now()})
fakec := &gcsfake.GCSClient{}
fakec.AddTestBucket("fake-bucket", fakeb)

@@ -107,7 +118,8 @@
name: "success-historical",
startDate: time.Date(2022, time.July, 1, 0, 0, 0, 0, time.UTC),
sources: []config.SourceConfig{
{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5"},
{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5",
FullHistory: true},
},
statsClient: fakec,
dailySaver: &failSaver{err: errors.New("any error")},
@@ -124,11 +136,24 @@
histSaver: &noopSaver{},
want: "fake-bucket/ndt/pcap/20220701",
},
+{
+    name: "success-historical-full-history-false",
+    startDate: lastYear,
+    sources: []config.SourceConfig{
+        {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5",
+            FullHistory: false},
+    },
+    statsClient: fakec,
+    dailySaver: &failSaver{err: errors.New("any error")},
+    histSaver: &noopSaver{},
+    want: tracker.Key("fake-bucket/ndt/ndt5/" + lastYear.Format(timex.YYYYMMDD)),
+},
{
name: "error-fail-savers",
startDate: time.Date(2022, time.July, 1, 0, 0, 0, 0, time.UTC),
sources: []config.SourceConfig{
{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5"},
{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5",
FullHistory: true},
},
dailySaver: &failSaver{err: errors.New("fail")},
histSaver: &failSaver{err: errors.New("fail")},
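The new test relies on timex layout constants to build both the fake GCS object name and the expected job key for a date just inside the window. A small sketch of that construction, assuming timex.YYYYMMDDWithSlash and timex.YYYYMMDD are the usual "2006/01/02" and "20060102" Go time layouts:

    package main

    import (
        "fmt"
        "time"

        "github.com/m-lab/go/timex"
    )

    func main() {
        // One day inside the 12-month window, matching the test's lastYear value.
        lastYear := time.Now().UTC().AddDate(-1, 0, 1)

        // Fake GCS object name for that date (YYYY/MM/DD path).
        obj := "ndt/ndt5/" + lastYear.Format(timex.YYYYMMDDWithSlash) + "/bar.tgz"

        // Expected job key for the same date (YYYYMMDD suffix).
        key := "fake-bucket/ndt/ndt5/" + lastYear.Format(timex.YYYYMMDD)

        fmt.Println(obj)
        fmt.Println(key)
    }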
tracker/job.go: 4 additions & 3 deletions
@@ -52,9 +52,10 @@ func (j *Job) TablePartition() string {
// JobWithTarget specifies a type/date job, and a destination
// table or GCS prefix
type JobWithTarget struct {
-ID        Key // ID used by gardener & parsers to identify a Job's status and configuration.
-Job       Job
-DailyOnly bool `json:"-"`
+ID          Key // ID used by gardener & parsers to identify a Job's status and configuration.
+Job         Job
+DailyOnly   bool `json:"-"`
+FullHistory bool `json:"-"`
// TODO: enable configuration for parser to target alternate buckets.
}

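Like DailyOnly, the new FullHistory flag carries a json:"-" tag, so it acts as an in-memory scheduling hint that never appears when a job is serialized. A simplified mirror of the struct (not the tracker package itself) showing that behavior:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // jobWithTarget mirrors only the fields relevant here; json:"-" drops the
    // two flags from any serialized form of the job.
    type jobWithTarget struct {
        ID          string
        DailyOnly   bool `json:"-"`
        FullHistory bool `json:"-"`
    }

    func main() {
        b, _ := json.Marshal(jobWithTarget{
            ID:          "fake-bucket/ndt/ndt5/20220701",
            FullHistory: true,
        })
        fmt.Println(string(b)) // prints {"ID":"fake-bucket/ndt/ndt5/20220701"}; the flags are omitted
    }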
