Merge pull request #156 from m-lab/sandbox-tcpinfo
Add tcpinfo config
gfr10598 authored Jun 13, 2019
2 parents d16c4c7 + 8057f8f commit b7b655c
Showing 13 changed files with 256 additions and 50 deletions.
17 changes: 17 additions & 0 deletions cloud/bq/dedup.go
@@ -186,6 +186,21 @@ var dedupTemplateTraceroute = `
 FROM ` + "`%s`" + `)
 WHERE row_number = 1`
+
+var dedupTemplateTCPInfo = `
+#standardSQL
+# Delete all duplicate rows based on uuid, preferring more snapshots, earlier task filename, later parse_time
+SELECT * EXCEPT (row_number)
+FROM (
+  SELECT *,
+    # Prefer more snapshots, earlier task names, later parse time
+    ROW_NUMBER() OVER (PARTITION BY uuid ORDER BY ARRAY_LENGTH(Snapshots) DESC, ParseInfo.TaskFileName, ParseInfo.ParseTime DESC) row_number
+  FROM (
+    SELECT *
+    FROM ` + "`%s`" + `
+  )
+)
+WHERE row_number = 1`
 
 // Dedup executes a query that dedups and writes to destination partition.
 // This function is alpha status. The interface may change without notice
 // or major version number change.
@@ -217,6 +232,8 @@ func Dedup(ctx context.Context, dsExt *dataset.Dataset, src string, destTable bq
 		queryString = fmt.Sprintf(dedupTemplateSwitch, src)
 	case strings.HasPrefix(destTable.TableID(), "traceroute"):
 		queryString = fmt.Sprintf(dedupTemplateTraceroute, src)
+	case strings.HasPrefix(destTable.TableID(), "tcpinfo"):
+		queryString = fmt.Sprintf(dedupTemplateTCPInfo, src)
 	default:
 		log.Println("Only handles sidestream, ndt, switch, traceroute, not " + destTable.TableID())
 		return nil, errors.New("Unknown table type")
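For context, each dedup template is an ordinary fmt.Sprintf format string whose single %s placeholder receives the fully qualified source table, as the Dedup switch above shows. A minimal, runnable sketch of how the new tcpinfo template is rendered; the table name is a made-up example, not one taken from this change:

package main

import "fmt"

// Abbreviated copy of dedupTemplateTCPInfo from cloud/bq/dedup.go;
// only the %s placeholder matters for this illustration.
var dedupTemplateTCPInfo = `
#standardSQL
SELECT * EXCEPT (row_number)
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY uuid
    ORDER BY ARRAY_LENGTH(Snapshots) DESC, ParseInfo.TaskFileName, ParseInfo.ParseTime DESC) row_number
  FROM (SELECT * FROM ` + "`%s`" + `)
)
WHERE row_number = 1`

func main() {
	// Dedup passes its src argument here; this table name is hypothetical.
	fmt.Println(fmt.Sprintf(dedupTemplateTCPInfo, "mlab-sandbox.batch.tcpinfo"))
}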
18 changes: 15 additions & 3 deletions cloud/bq/sanity.go
@@ -162,17 +162,29 @@ func GetTableDetail(ctx context.Context, dsExt *dataset.Dataset, table bqiface.T
 		}
 	}
 	detail := Detail{}
-	queryString := fmt.Sprintf(`
+	legacyQuery := fmt.Sprintf(`
 #standardSQL
 SELECT COUNT(DISTINCT test_id) AS TestCount, COUNT(DISTINCT task_filename) AS TaskFileCount
 FROM `+"`%s.%s`"+`
 %s -- where clause`,
 		dataset, tableName, where)
 
-	err := dsExt.QueryAndParse(ctx, queryString, &detail)
+	tcpinfoQuery := fmt.Sprintf(`
+#standardSQL
+SELECT COUNT(DISTINCT UUID) AS TestCount, COUNT(DISTINCT ParseInfo.TaskFileName) AS TaskFileCount
+FROM `+"`%s.%s`"+`
+%s -- where clause`,
+		dataset, tableName, where)
+
+	// TODO - find a better way to do this.
+	query := legacyQuery
+	if parts[0] == "tcpinfo" {
+		query = tcpinfoQuery
+	}
+	err := dsExt.QueryAndParse(ctx, query, &detail)
 	if err != nil {
 		log.Println(err)
-		log.Println("Query:", queryString)
+		log.Println("Query:", query)
 	}
 	return &detail, err
 }
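The TODO above invites something cleaner than a growing if chain. One possible shape, sketched here with illustrative names that do not exist in cloud/bq, is a lookup table keyed on the table prefix, falling back to the legacy test_id/task_filename query:

// detailQueryTemplates maps a table prefix to its COUNT query template.
// Sketch only; neither name below is part of the actual package.
var detailQueryTemplates = map[string]string{
	"tcpinfo": `#standardSQL
SELECT COUNT(DISTINCT UUID) AS TestCount, COUNT(DISTINCT ParseInfo.TaskFileName) AS TaskFileCount
FROM ` + "`%s.%s`" + `
%s -- where clause`,
}

// detailQueryFor returns the custom template for a table type, or the
// legacy query when no custom template is registered.
func detailQueryFor(tablePrefix, legacyQuery string) string {
	if q, ok := detailQueryTemplates[tablePrefix]; ok {
		return q
	}
	return legacyQuery
}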
2 changes: 1 addition & 1 deletion cmd/gardener/gardener.go
@@ -186,7 +186,7 @@ func taskHandlerFromEnv(ctx context.Context, client *http.Client) (*reproc.TaskH
 	if err != nil {
 		return nil, err
 	}
-	return reproc.NewTaskHandler(exec, queues, saver), nil
+	return reproc.NewTaskHandler(env.Experiment, exec, queues, saver), nil
 }
 
 // doDispatchLoop just sequences through archives in date order.
99 changes: 99 additions & 0 deletions k8s/data-processing-cluster/deployments/etl-gardener-tcpinfo.yml
@@ -0,0 +1,99 @@
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: etl-gardener-tcpinfo
+  namespace: default
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      # Used to match pre-existing pods that may be affected during updates.
+      run: etl-gardener-tcpinfo
+  strategy:
+    rollingUpdate:
+      maxSurge: 1
+      maxUnavailable: 1
+    type: RollingUpdate
+  # Pod template.
+  template:
+    metadata:
+      labels:
+        # Note: run=etl-gardener-tcpinfo should match a service config with a
+        # public IP and port so that it is publicly accessible.
+        run: etl-gardener-tcpinfo
+      annotations:
+        # Tell prometheus service discovery to collect metrics from the containers.
+        prometheus.io/scrape: 'true'
+    spec:
+      # When the container receives SIGTERM, it begins a new checkpoint. This can
+      # take longer than the default grace period of 30s.
+      terminationGracePeriodSeconds: 300
+
+      # Place the pod into the Guaranteed QoS class by setting equal resource
+      # requests and limits for *all* containers in the pod.
+      # For more background, see:
+      # https://github.com/kubernetes/community/blob/master/contributors/design-proposals/resource-qos.md
+      containers:
+      - image: gcr.io/{{GCLOUD_PROJECT}}/github-m-lab-etl-gardener:{{GIT_COMMIT}}
+        name: etl-gardener
+        env:
+        - name: GARDENER_SERVICE
+          value: "true"
+        - name: GIT_COMMIT
+          value: "{{GIT_COMMIT}}"
+        - name: PROJECT
+          value: "{{GCLOUD_PROJECT}}"
+        # NOTE: We read archives from the public archive for all projects.
+        # TODO: Update when we address https://github.com/m-lab/dev-tracker/issues/369
+        - name: TASKFILE_BUCKET
+          value: "pusher-{{GCLOUD_PROJECT}}" # This will work for sandbox/staging, but prod should use archive-measurement-lab.
+        - name: START_DATE
+          value: "20190329"
+        - name: DATE_SKIP # Should be 0 for normal operation
+          value: "{{DATE_SKIP}}"
+        - name: TASK_FILE_SKIP # Should be 0 for normal operation
+          value: "{{TASK_FILE_SKIP}}"
+        - name: EXPERIMENT
+          value: "ndt/tcpinfo"
+        - name: DATASET
+          value: "batch"
+        - name: FINAL_DATASET
+          value: "base_tables"
+        - name: QUEUE_BASE
+          value: "etl-tcpinfo-batch-"
+        - name: NUM_QUEUES
+          value: "2"
+
+        ports:
+        - name: prometheus-port
+          containerPort: 9090
+        - name: service-port
+          containerPort: 8080
+
+        livenessProbe:
+          httpGet:
+            path: /alive
+            port: service-port
+          initialDelaySeconds: 30
+          periodSeconds: 60
+
+        resources:
+          requests:
+            memory: "3Gi"
+            cpu: "1"
+          limits:
+            memory: "3Gi"
+            cpu: "1"
+
+        volumeMounts:
+        - mountPath: /volume-claim
+          name: tcpinfo-storage
+
+      nodeSelector:
+        gardener-node: "true"
+
+      volumes:
+      - name: tcpinfo-storage
+        persistentVolumeClaim:
+          claimName: gardener-tcpinfo-disk0
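The env block above carries all of the tcpinfo-specific configuration. As a sketch only — this is not the actual gardener startup code, just an assumption about how such variables are typically consumed — the service might parse them like this:

package main

import (
	"fmt"
	"log"
	"os"
	"strconv"
	"time"
)

func main() {
	// START_DATE uses the compact YYYYMMDD layout seen above.
	startDate, err := time.Parse("20060102", os.Getenv("START_DATE"))
	if err != nil {
		log.Fatalf("bad START_DATE: %v", err)
	}

	numQueues, err := strconv.Atoi(os.Getenv("NUM_QUEUES"))
	if err != nil {
		log.Fatalf("bad NUM_QUEUES: %v", err)
	}

	// Assumption: queue names are QUEUE_BASE plus an index, so the values
	// above would yield etl-tcpinfo-batch-0 and etl-tcpinfo-batch-1.
	queues := make([]string, numQueues)
	for i := range queues {
		queues[i] = fmt.Sprintf("%s%d", os.Getenv("QUEUE_BASE"), i)
	}

	log.Printf("experiment=%s start=%s queues=%v",
		os.Getenv("EXPERIMENT"), startDate.Format("2006-01-02"), queues)
}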

@@ -0,0 +1,14 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: etl-gardener-tcpinfo-service
+  namespace: default
+spec:
+  ports:
+  - port: 8080
+    protocol: TCP
+    targetPort: 8080
+  selector:
+    run: etl-gardener-tcpinfo
+  sessionAffinity: None
+  type: LoadBalancer
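This LoadBalancer exposes port 8080, the same service-port the deployment's livenessProbe polls at /alive. A minimal sketch of such a handler, assuming it simply reports liveness; the real gardener handler may consult internal state:

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// The deployment's livenessProbe polls this path on service-port (8080).
	http.HandleFunc("/alive", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}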
7 changes: 4 additions & 3 deletions reproc/dispatch.go
@@ -67,6 +67,7 @@ func NewTerminator() *Terminator {
 // It is responsible for starting tasks, recycling queues, and handling the
 // termination signal.
 type TaskHandler struct {
+	expName    string         // The string used for task.Experiment, which is used in Datastore queries.
 	exec       state.Executor // Executor passed to new tasks
 	taskQueues chan string    // Channel through which queues recycled.
 	saver      state.Saver    // The Saver used to save task states.
@@ -76,14 +77,14 @@ type TaskHandler struct {
 }
 
 // NewTaskHandler creates a new TaskHandler.
-func NewTaskHandler(exec state.Executor, queues []string, saver state.Saver) *TaskHandler {
+func NewTaskHandler(expKey string, exec state.Executor, queues []string, saver state.Saver) *TaskHandler {
 	// Create taskQueue channel, and preload with queues.
 	taskQueues := make(chan string, len(queues))
 	for _, q := range queues {
 		taskQueues <- q
 	}
 
-	return &TaskHandler{exec, taskQueues, saver, NewTerminator()}
+	return &TaskHandler{expKey, exec, taskQueues, saver, NewTerminator()}
 }
 
 // ErrTerminating is returned e.g. by AddTask, when tracker is terminating.
@@ -120,7 +121,7 @@ func (th *TaskHandler) AddTask(ctx context.Context, prefix string) error {
 	select {
 	// Wait until there is an available task queue.
 	case queue := <-th.taskQueues:
-		t, err := state.NewTask(prefix, queue, th.saver)
+		t, err := state.NewTask(th.expName, prefix, queue, th.saver)
 		if err != nil {
 			log.Println(err)
 			return err
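Putting the pieces together: the experiment key now flows from the deployment's EXPERIMENT variable through NewTaskHandler into every state.NewTask call, so Datastore task records can be filtered per experiment. An illustrative call, with exec, saver, and ctx assumed in scope and the GCS path invented for the example:

// "ndt/tcpinfo" matches the EXPERIMENT value in the deployment above;
// the queue name follows the QUEUE_BASE convention. The gs:// prefix
// below is a hypothetical task archive path.
th := reproc.NewTaskHandler("ndt/tcpinfo", exec, []string{"etl-tcpinfo-batch-0"}, saver)
if err := th.AddTask(ctx, "gs://pusher-mlab-sandbox/ndt/tcpinfo/2019/03/29/"); err != nil {
	log.Println(err)
}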
8 changes: 4 additions & 4 deletions reproc/dispatch_test.go
@@ -115,7 +115,7 @@ func TestBasic(t *testing.T) {
 	// Start tracker with no queues.
 	exec := Exec{}
 	saver := NewTestSaver()
-	th := reproc.NewTaskHandler(&exec, []string{}, saver)
+	th := reproc.NewTaskHandler("exp", &exec, []string{}, saver)
 
 	// This will block because there are no queues.
 	go th.AddTask(ctx, "foobar")
@@ -135,7 +135,7 @@ func TestWithTaskQueue(t *testing.T) {
 	// Start tracker with one queue.
 	exec := Exec{}
 	saver := NewTestSaver()
-	th := reproc.NewTaskHandler(&exec, []string{"queue-1"}, saver)
+	th := reproc.NewTaskHandler("exp", &exec, []string{"queue-1"}, saver)
 
 	th.AddTask(ctx, "gs://fake/ndt/2017/09/22/")
 
@@ -151,10 +151,10 @@ func TestRestart(t *testing.T) {
 	ctx := context.Background()
 	exec := Exec{}
 	saver := NewTestSaver()
-	th := reproc.NewTaskHandler(&exec, []string{"queue-1", "queue-2"}, saver)
+	th := reproc.NewTaskHandler("exp", &exec, []string{"queue-1", "queue-2"}, saver)
 
 	taskName := "gs://foobar/exp/2001/02/03/"
-	t1, err := state.NewTask(taskName, "queue-1", nil)
+	t1, err := state.NewTask("exp", taskName, "queue-1", nil)
 	t1.State = state.Processing
 	if err != nil {
 		t.Fatal(err)
8 changes: 4 additions & 4 deletions rex/rex.go
@@ -235,19 +235,18 @@ func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int,
 		t.SetError(ctx, err, "NewQueueHandler")
 		return 0, 0, err
 	}
-	parts, err := t.ParsePrefix()
+	prefix, err := t.ParsePrefix()
 	if err != nil {
 		// If there is a parse error, log and skip request.
 		log.Println(err)
 		t.SetError(ctx, err, "BadPrefix")
 		return 0, 0, err
 	}
-	bucketName := parts[0]
 
 	// Use a real storage bucket.
 	// TODO - add a persistent storageClient to the rex object?
 	// TODO - try cancelling the context instead?
-	bucket, err := tq.GetBucket(ctx, rex.StorageClient, rex.Project, bucketName, false)
+	bucket, err := tq.GetBucket(ctx, rex.StorageClient, rex.Project, prefix.Bucket, false)
 	if err != nil {
 		if err == io.EOF && env.TestMode {
 			log.Println("Using fake client, ignoring EOF error")
@@ -257,9 +256,10 @@ func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int,
 		t.SetError(ctx, err, "BucketError")
 		return 0, 0, err
 	}
+
 	// NOTE: This does not check the terminate channel, so once started, it will
 	// complete the queuing.
-	fileCount, byteCount, err := qh.PostDay(ctx, bucket, bucketName, parts[1]+"/"+parts[2]+"/")
+	fileCount, byteCount, err := qh.PostDay(ctx, bucket, prefix.Bucket, prefix.Path())
 	if err != nil {
 		log.Println(err)
 		t.SetError(ctx, err, "PostDayError")
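The switch from an indexed parts slice to a structured prefix is the substance of this file's change. Inferred purely from the call sites above (prefix.Bucket and prefix.Path()) — the real type lives in the state package and may differ — the shape is roughly:

// Prefix is a sketch of the structured result of t.ParsePrefix().
// Field and method names are inferred from this diff, not copied from
// the state package.
type Prefix struct {
	Bucket   string // GCS bucket, e.g. "archive-mlab-testing"
	Exp      string // experiment directory, e.g. "ndt"
	DatePath string // date portion of the prefix, e.g. "2017/09/22"
}

// Path returns the object prefix within the bucket, e.g. "ndt/2017/09/22/",
// replacing the old parts[1]+"/"+parts[2]+"/" concatenation.
func (p Prefix) Path() string {
	return p.Exp + "/" + p.DatePath + "/"
}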
2 changes: 1 addition & 1 deletion rex/rex_bb_test.go
@@ -29,7 +29,7 @@ func TestRealBucket(t *testing.T) {
 	}
 	defer exec.StorageClient.Close()
 	saver := newTestSaver()
-	th := reproc.NewTaskHandler(exec, []string{"queue-1"}, saver)
+	th := reproc.NewTaskHandler("exp", exec, []string{"queue-1"}, saver)
 
 	// We submit tasks corresponding to real buckets...
 	th.AddTask(ctx, "gs://archive-mlab-testing/ndt/2017/09/22/")
6 changes: 3 additions & 3 deletions rex/rex_test.go
@@ -81,7 +81,7 @@ func TestWithTaskQueue(t *testing.T) {
 	exec := &rex.ReprocessingExecutor{BQConfig: bqConfig, StorageClient: fc}
 	defer exec.StorageClient.Close()
 	saver := newTestSaver()
-	th := reproc.NewTaskHandler(exec, []string{"queue-1"}, saver)
+	th := reproc.NewTaskHandler("exp", exec, []string{"queue-1"}, saver)
 
 	th.AddTask(ctx, "gs://foo/bar/2001/01/01/")
 
@@ -179,7 +179,7 @@ func TestBadPrefix(t *testing.T) {
 	defer exec.StorageClient.Close()
 
 	saver := newTestSaver()
-	th := reproc.NewTaskHandler(exec, []string{"queue-1"}, saver)
+	th := reproc.NewTaskHandler("exp", exec, []string{"queue-1"}, saver)
 
 	err := th.AddTask(ctx, "gs://foo/bar/badprefix/01/01/")
 	if err == nil {
@@ -203,7 +203,7 @@ func TestZeroFiles(t *testing.T) {
 	defer exec.StorageClient.Close()
 
 	saver := newTestSaver()
-	th := reproc.NewTaskHandler(exec, []string{"queue-1"}, saver)
+	th := reproc.NewTaskHandler("exp", exec, []string{"queue-1"}, saver)
 
 	err := th.AddTask(ctx, "gs://foo/bar/2001/01/01/")
 	if err != nil {