From a577edefa0649e362bebfaf6f02f1ea2747b7fc2 Mon Sep 17 00:00:00 2001 From: Kush <3647166+kushsharma@users.noreply.github.com> Date: Mon, 5 Jul 2021 12:01:54 +0530 Subject: [PATCH] fix: cross entity sensor should use updated job metadata api (#23) Signed-off-by: Kush Sharma --- api/handler/v1/runtime.go | 10 +---- ext/scheduler/airflow/airflow.go | 4 +- ext/scheduler/airflow/resources/__lib.py | 9 +++-- ext/scheduler/airflow2/airflow.go | 25 +++++++++---- ext/scheduler/airflow2/airflow_test.go | 45 +++++++++++++++++++++++ ext/scheduler/airflow2/resources/__lib.py | 10 +++-- models/project.go | 3 ++ 7 files changed, 81 insertions(+), 25 deletions(-) diff --git a/api/handler/v1/runtime.go b/api/handler/v1/runtime.go index 611afcee2a..93c98feb40 100644 --- a/api/handler/v1/runtime.go +++ b/api/handler/v1/runtime.go @@ -486,13 +486,7 @@ func (sv *RuntimeServiceServer) JobStatus(ctx context.Context, req *pb.JobStatus return nil, status.Error(codes.NotFound, fmt.Sprintf("%s: project %s not found", err.Error(), req.GetProjectName())) } - namespaceRepo := sv.namespaceRepoFactory.New(projSpec) - namespaceSpec, err := namespaceRepo.GetByName(req.GetNamespace()) - if err != nil { - return nil, status.Error(codes.NotFound, fmt.Sprintf("%s: namespace %s not found", err.Error(), req.GetNamespace())) - } - - _, err = sv.jobSvc.GetByName(req.GetJobName(), namespaceSpec) + _, _, err = sv.jobSvc.GetByNameForProject(req.GetJobName(), projSpec) if err != nil { return nil, status.Error(codes.NotFound, fmt.Sprintf("%s: failed to find the job %s for namespace %s", err.Error(), req.GetJobName(), req.GetNamespace())) @@ -504,7 +498,7 @@ func (sv *RuntimeServiceServer) JobStatus(ctx context.Context, req *pb.JobStatus req.GetJobName())) } - adaptedJobStatus := []*pb.JobStatus{} + var adaptedJobStatus []*pb.JobStatus for _, jobStatus := range jobStatuses { ts, err := ptypes.TimestampProto(jobStatus.ScheduledAt) if err != nil { diff --git a/ext/scheduler/airflow/airflow.go b/ext/scheduler/airflow/airflow.go index 7b8f552147..c7d93b4147 100644 --- a/ext/scheduler/airflow/airflow.go +++ b/ext/scheduler/airflow/airflow.go @@ -128,7 +128,7 @@ func (a *scheduler) GetJobStatus(ctx context.Context, projSpec models.ProjectSpe return nil, errors.Wrapf(err, "failed to fetch airflow dag runs from %s", fetchURL) } if resp.StatusCode != http.StatusOK { - return nil, errors.Errorf("failed to fetch airflow dag runs from %s", fetchURL) + return nil, errors.Errorf("failed to fetch airflow dag runs from %s: %d", fetchURL, resp.StatusCode) } defer resp.Body.Close() @@ -196,7 +196,7 @@ func (a *scheduler) Clear(ctx context.Context, projSpec models.ProjectSpec, jobN return errors.Wrapf(err, "failed to clear airflow dag runs from %s", clearDagRunURL) } if resp.StatusCode != http.StatusOK { - return errors.Errorf("failed to clear airflow dag runs from %s", clearDagRunURL) + return errors.Errorf("failed to clear airflow dag runs from %s: %d", clearDagRunURL, resp.StatusCode) } body, err := ioutil.ReadAll(resp.Body) diff --git a/ext/scheduler/airflow/resources/__lib.py b/ext/scheduler/airflow/resources/__lib.py index 44f55c7f42..d625f425c9 100644 --- a/ext/scheduler/airflow/resources/__lib.py +++ b/ext/scheduler/airflow/resources/__lib.py @@ -443,12 +443,13 @@ def get_task_window(self, scheduled_at: str, window_size: str, window_offset: st return response.json() def get_job_metadata(self, execution_date, project, job) -> dict: - url = '{optimus_host}/api/v1/instance'.format(optimus_host=self.host) + url = '{optimus_host}/api/v1/project/{project_name}/job/{job_name}/instance'.format(optimus_host=self.host, + project_name=project, + job_name=job) request_data = { "scheduledAt": execution_date, - "projectName": project, - "jobName": job, - "type": "hook", + "instance_type": "TASK", + "instance_name": "none" } response = requests.post(url, data=json.dumps(request_data)) self._raise_error_if_request_failed(response) diff --git a/ext/scheduler/airflow2/airflow.go b/ext/scheduler/airflow2/airflow.go index c2301b562e..54173141bc 100644 --- a/ext/scheduler/airflow2/airflow.go +++ b/ext/scheduler/airflow2/airflow.go @@ -3,6 +3,7 @@ package airflow2 import ( "bytes" "context" + "encoding/base64" "encoding/json" "fmt" "io" @@ -28,7 +29,7 @@ var resBaseDAG []byte const ( baseLibFileName = "__lib.py" - dagStatusUrl = "api/v1/dags/%s/dagRuns" + dagStatusUrl = "api/v1/dags/%s/dagRuns?limit=99999" dagRunClearURL = "api/v1/dags/%s/clearTaskInstances" ) @@ -116,19 +117,24 @@ func (a *scheduler) GetJobStatus(ctx context.Context, projSpec models.ProjectSpe return nil, errors.Errorf("scheduler host not set for %s", projSpec.Name) } schdHost = strings.Trim(schdHost, "/") + authToken, ok := projSpec.Secret.GetByName(models.ProjectSchedulerAuth) + if !ok { + return nil, errors.Errorf("%s secret not configured for project %s", models.ProjectSchedulerAuth, projSpec.Name) + } - fetchUrl := fmt.Sprintf(fmt.Sprintf("%s/%s", schdHost, dagStatusUrl), jobName) - request, err := http.NewRequest(http.MethodGet, fetchUrl, nil) + fetchURL := fmt.Sprintf(fmt.Sprintf("%s/%s", schdHost, dagStatusUrl), jobName) + request, err := http.NewRequest(http.MethodGet, fetchURL, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to build http request for %s", fetchUrl) + return nil, errors.Wrapf(err, "failed to build http request for %s", fetchURL) } + request.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(authToken)))) resp, err := a.httpClient.Do(request) if err != nil { - return nil, errors.Wrapf(err, "failed to fetch airflow dag runs from %s", fetchUrl) + return nil, errors.Wrapf(err, "failed to fetch airflow dag runs from %s", fetchURL) } if resp.StatusCode != http.StatusOK { - return nil, errors.Errorf("failed to fetch airflow dag runs from %s", fetchUrl) + return nil, errors.Errorf("failed to fetch airflow dag runs from %s: %d", fetchURL, resp.StatusCode) } defer resp.Body.Close() @@ -184,6 +190,10 @@ func (a *scheduler) Clear(ctx context.Context, projSpec models.ProjectSpec, jobN if !ok { return errors.Errorf("scheduler host not set for %s", projSpec.Name) } + authToken, ok := projSpec.Secret.GetByName(models.ProjectSchedulerAuth) + if !ok { + return errors.Errorf("%s secret not configured for project %s", models.ProjectSchedulerAuth, projSpec.Name) + } schdHost = strings.Trim(schdHost, "/") airflowDateFormat := "2006-01-02T15:04:05+00:00" @@ -199,13 +209,14 @@ func (a *scheduler) Clear(ctx context.Context, projSpec models.ProjectSpec, jobN return errors.Wrapf(err, "failed to build http request for %s", postURL) } request.Header.Set("Content-Type", "application/json") + request.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(authToken)))) resp, err := a.httpClient.Do(request) if err != nil { return errors.Wrapf(err, "failed to clear airflow dag runs from %s", postURL) } if resp.StatusCode != http.StatusOK { - return errors.Errorf("failed to clear airflow dag runs from %s", postURL) + return errors.Errorf("failed to clear airflow dag runs from %s: %d", postURL, resp.StatusCode) } defer resp.Body.Close() diff --git a/ext/scheduler/airflow2/airflow_test.go b/ext/scheduler/airflow2/airflow_test.go index 03d220c16e..c8a35abb04 100644 --- a/ext/scheduler/airflow2/airflow_test.go +++ b/ext/scheduler/airflow2/airflow_test.go @@ -140,6 +140,13 @@ func TestAirflow2(t *testing.T) { Name: "test-proj", Config: map[string]string{ models.ProjectSchedulerHost: host, + models.ProjectSchedulerAuth: "admin:admin", + }, + Secret: []models.ProjectSecretItem{ + { + Name: models.ProjectSchedulerAuth, + Value: "admin:admin", + }, }, }, "sample_select") @@ -164,11 +171,27 @@ func TestAirflow2(t *testing.T) { Config: map[string]string{ models.ProjectSchedulerHost: host, }, + Secret: []models.ProjectSecretItem{ + { + Name: models.ProjectSchedulerAuth, + Value: "admin:admin", + }, + }, }, "sample_select") assert.NotNil(t, err) assert.Len(t, status, 0) }) + t.Run("should fail if not scheduler secret registered", func(t *testing.T) { + air := airflow2.NewScheduler(nil, nil) + _, err := air.GetJobStatus(ctx, models.ProjectSpec{ + Name: "test-proj", + Config: map[string]string{ + models.ProjectSchedulerHost: host, + }, + }, "sample_select") + assert.NotNil(t, err) + }) }) t.Run("Clear", func(t *testing.T) { host := "http://airflow.example.io" @@ -195,6 +218,12 @@ func TestAirflow2(t *testing.T) { Config: map[string]string{ models.ProjectSchedulerHost: host, }, + Secret: []models.ProjectSecretItem{ + { + Name: models.ProjectSchedulerAuth, + Value: "admin:admin", + }, + }, }, "sample_select", startDateTime, endDateTime) assert.Nil(t, err) @@ -217,9 +246,25 @@ func TestAirflow2(t *testing.T) { Config: map[string]string{ models.ProjectSchedulerHost: host, }, + Secret: []models.ProjectSecretItem{ + { + Name: models.ProjectSchedulerAuth, + Value: "admin:admin", + }, + }, }, "sample_select", startDateTime, endDateTime) assert.NotNil(t, err) }) + t.Run("should fail if not scheduler secret registered", func(t *testing.T) { + air := airflow2.NewScheduler(nil, nil) + err := air.Clear(ctx, models.ProjectSpec{ + Name: "test-proj", + Config: map[string]string{ + models.ProjectSchedulerHost: host, + }, + }, "sample_select", startDateTime, endDateTime) + assert.NotNil(t, err) + }) }) } diff --git a/ext/scheduler/airflow2/resources/__lib.py b/ext/scheduler/airflow2/resources/__lib.py index 17f013d7e8..795a36e073 100644 --- a/ext/scheduler/airflow2/resources/__lib.py +++ b/ext/scheduler/airflow2/resources/__lib.py @@ -326,6 +326,7 @@ def _xcom_value_has_error(_xcom) -> bool: ) return failed_alert.execute(context=context) + class OptimusAPIClient: def __init__(self, optimus_host): self.host = self._add_connection_adapter_if_absent(optimus_host) @@ -358,12 +359,13 @@ def get_task_window(self, scheduled_at: str, window_size: str, window_offset: st return response.json() def get_job_metadata(self, execution_date, project, job) -> dict: - url = '{optimus_host}/api/v1/instance'.format(optimus_host=self.host) + url = '{optimus_host}/api/v1/project/{project_name}/job/{job_name}/instance'.format(optimus_host=self.host, + project_name=project, + job_name=job) request_data = { "scheduledAt": execution_date, - "projectName": project, - "jobName": job, - "type": "hook", + "instance_type": "TASK", + "instance_name": "none" } response = requests.post(url, data=json.dumps(request_data)) self._raise_error_if_request_failed(response) diff --git a/models/project.go b/models/project.go index b85c89c2b7..e8125a46e6 100644 --- a/models/project.go +++ b/models/project.go @@ -18,6 +18,9 @@ const ( // Secret used for uploading prepared scheduler specifications to cloud // e.g. for gcs it will be base64 encoded service account for the bucket ProjectSecretStorageKey = "STORAGE" + + // Secret used to authenticate with scheduler provided at ProjectSchedulerHost + ProjectSchedulerAuth = "SCHEDULER_AUTH" ) var (