Skip to content

Commit

Permalink
fix: cross entity sensor should use updated job metadata api (#23)
Browse files Browse the repository at this point in the history
Signed-off-by: Kush Sharma <[email protected]>
  • Loading branch information
kushsharma authored Jul 5, 2021
1 parent d11711f commit a577ede
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 25 deletions.
10 changes: 2 additions & 8 deletions api/handler/v1/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,13 +486,7 @@ func (sv *RuntimeServiceServer) JobStatus(ctx context.Context, req *pb.JobStatus
return nil, status.Error(codes.NotFound, fmt.Sprintf("%s: project %s not found", err.Error(), req.GetProjectName()))
}

namespaceRepo := sv.namespaceRepoFactory.New(projSpec)
namespaceSpec, err := namespaceRepo.GetByName(req.GetNamespace())
if err != nil {
return nil, status.Error(codes.NotFound, fmt.Sprintf("%s: namespace %s not found", err.Error(), req.GetNamespace()))
}

_, err = sv.jobSvc.GetByName(req.GetJobName(), namespaceSpec)
_, _, err = sv.jobSvc.GetByNameForProject(req.GetJobName(), projSpec)
if err != nil {
return nil, status.Error(codes.NotFound, fmt.Sprintf("%s: failed to find the job %s for namespace %s", err.Error(),
req.GetJobName(), req.GetNamespace()))
Expand All @@ -504,7 +498,7 @@ func (sv *RuntimeServiceServer) JobStatus(ctx context.Context, req *pb.JobStatus
req.GetJobName()))
}

adaptedJobStatus := []*pb.JobStatus{}
var adaptedJobStatus []*pb.JobStatus
for _, jobStatus := range jobStatuses {
ts, err := ptypes.TimestampProto(jobStatus.ScheduledAt)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions ext/scheduler/airflow/airflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (a *scheduler) GetJobStatus(ctx context.Context, projSpec models.ProjectSpe
return nil, errors.Wrapf(err, "failed to fetch airflow dag runs from %s", fetchURL)
}
if resp.StatusCode != http.StatusOK {
return nil, errors.Errorf("failed to fetch airflow dag runs from %s", fetchURL)
return nil, errors.Errorf("failed to fetch airflow dag runs from %s: %d", fetchURL, resp.StatusCode)
}
defer resp.Body.Close()

Expand Down Expand Up @@ -196,7 +196,7 @@ func (a *scheduler) Clear(ctx context.Context, projSpec models.ProjectSpec, jobN
return errors.Wrapf(err, "failed to clear airflow dag runs from %s", clearDagRunURL)
}
if resp.StatusCode != http.StatusOK {
return errors.Errorf("failed to clear airflow dag runs from %s", clearDagRunURL)
return errors.Errorf("failed to clear airflow dag runs from %s: %d", clearDagRunURL, resp.StatusCode)
}

body, err := ioutil.ReadAll(resp.Body)
Expand Down
9 changes: 5 additions & 4 deletions ext/scheduler/airflow/resources/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,12 +443,13 @@ def get_task_window(self, scheduled_at: str, window_size: str, window_offset: st
return response.json()

def get_job_metadata(self, execution_date, project, job) -> dict:
url = '{optimus_host}/api/v1/instance'.format(optimus_host=self.host)
url = '{optimus_host}/api/v1/project/{project_name}/job/{job_name}/instance'.format(optimus_host=self.host,
project_name=project,
job_name=job)
request_data = {
"scheduledAt": execution_date,
"projectName": project,
"jobName": job,
"type": "hook",
"instance_type": "TASK",
"instance_name": "none"
}
response = requests.post(url, data=json.dumps(request_data))
self._raise_error_if_request_failed(response)
Expand Down
25 changes: 18 additions & 7 deletions ext/scheduler/airflow2/airflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package airflow2
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
Expand All @@ -28,7 +29,7 @@ var resBaseDAG []byte

const (
baseLibFileName = "__lib.py"
dagStatusUrl = "api/v1/dags/%s/dagRuns"
dagStatusUrl = "api/v1/dags/%s/dagRuns?limit=99999"
dagRunClearURL = "api/v1/dags/%s/clearTaskInstances"
)

Expand Down Expand Up @@ -116,19 +117,24 @@ func (a *scheduler) GetJobStatus(ctx context.Context, projSpec models.ProjectSpe
return nil, errors.Errorf("scheduler host not set for %s", projSpec.Name)
}
schdHost = strings.Trim(schdHost, "/")
authToken, ok := projSpec.Secret.GetByName(models.ProjectSchedulerAuth)
if !ok {
return nil, errors.Errorf("%s secret not configured for project %s", models.ProjectSchedulerAuth, projSpec.Name)
}

fetchUrl := fmt.Sprintf(fmt.Sprintf("%s/%s", schdHost, dagStatusUrl), jobName)
request, err := http.NewRequest(http.MethodGet, fetchUrl, nil)
fetchURL := fmt.Sprintf(fmt.Sprintf("%s/%s", schdHost, dagStatusUrl), jobName)
request, err := http.NewRequest(http.MethodGet, fetchURL, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to build http request for %s", fetchUrl)
return nil, errors.Wrapf(err, "failed to build http request for %s", fetchURL)
}
request.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(authToken))))

resp, err := a.httpClient.Do(request)
if err != nil {
return nil, errors.Wrapf(err, "failed to fetch airflow dag runs from %s", fetchUrl)
return nil, errors.Wrapf(err, "failed to fetch airflow dag runs from %s", fetchURL)
}
if resp.StatusCode != http.StatusOK {
return nil, errors.Errorf("failed to fetch airflow dag runs from %s", fetchUrl)
return nil, errors.Errorf("failed to fetch airflow dag runs from %s: %d", fetchURL, resp.StatusCode)
}
defer resp.Body.Close()

Expand Down Expand Up @@ -184,6 +190,10 @@ func (a *scheduler) Clear(ctx context.Context, projSpec models.ProjectSpec, jobN
if !ok {
return errors.Errorf("scheduler host not set for %s", projSpec.Name)
}
authToken, ok := projSpec.Secret.GetByName(models.ProjectSchedulerAuth)
if !ok {
return errors.Errorf("%s secret not configured for project %s", models.ProjectSchedulerAuth, projSpec.Name)
}

schdHost = strings.Trim(schdHost, "/")
airflowDateFormat := "2006-01-02T15:04:05+00:00"
Expand All @@ -199,13 +209,14 @@ func (a *scheduler) Clear(ctx context.Context, projSpec models.ProjectSpec, jobN
return errors.Wrapf(err, "failed to build http request for %s", postURL)
}
request.Header.Set("Content-Type", "application/json")
request.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(authToken))))

resp, err := a.httpClient.Do(request)
if err != nil {
return errors.Wrapf(err, "failed to clear airflow dag runs from %s", postURL)
}
if resp.StatusCode != http.StatusOK {
return errors.Errorf("failed to clear airflow dag runs from %s", postURL)
return errors.Errorf("failed to clear airflow dag runs from %s: %d", postURL, resp.StatusCode)
}
defer resp.Body.Close()

Expand Down
45 changes: 45 additions & 0 deletions ext/scheduler/airflow2/airflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ func TestAirflow2(t *testing.T) {
Name: "test-proj",
Config: map[string]string{
models.ProjectSchedulerHost: host,
models.ProjectSchedulerAuth: "admin:admin",
},
Secret: []models.ProjectSecretItem{
{
Name: models.ProjectSchedulerAuth,
Value: "admin:admin",
},
},
}, "sample_select")

Expand All @@ -164,11 +171,27 @@ func TestAirflow2(t *testing.T) {
Config: map[string]string{
models.ProjectSchedulerHost: host,
},
Secret: []models.ProjectSecretItem{
{
Name: models.ProjectSchedulerAuth,
Value: "admin:admin",
},
},
}, "sample_select")

assert.NotNil(t, err)
assert.Len(t, status, 0)
})
t.Run("should fail if not scheduler secret registered", func(t *testing.T) {
air := airflow2.NewScheduler(nil, nil)
_, err := air.GetJobStatus(ctx, models.ProjectSpec{
Name: "test-proj",
Config: map[string]string{
models.ProjectSchedulerHost: host,
},
}, "sample_select")
assert.NotNil(t, err)
})
})
t.Run("Clear", func(t *testing.T) {
host := "http://airflow.example.io"
Expand All @@ -195,6 +218,12 @@ func TestAirflow2(t *testing.T) {
Config: map[string]string{
models.ProjectSchedulerHost: host,
},
Secret: []models.ProjectSecretItem{
{
Name: models.ProjectSchedulerAuth,
Value: "admin:admin",
},
},
}, "sample_select", startDateTime, endDateTime)

assert.Nil(t, err)
Expand All @@ -217,9 +246,25 @@ func TestAirflow2(t *testing.T) {
Config: map[string]string{
models.ProjectSchedulerHost: host,
},
Secret: []models.ProjectSecretItem{
{
Name: models.ProjectSchedulerAuth,
Value: "admin:admin",
},
},
}, "sample_select", startDateTime, endDateTime)

assert.NotNil(t, err)
})
t.Run("should fail if not scheduler secret registered", func(t *testing.T) {
air := airflow2.NewScheduler(nil, nil)
err := air.Clear(ctx, models.ProjectSpec{
Name: "test-proj",
Config: map[string]string{
models.ProjectSchedulerHost: host,
},
}, "sample_select", startDateTime, endDateTime)
assert.NotNil(t, err)
})
})
}
10 changes: 6 additions & 4 deletions ext/scheduler/airflow2/resources/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ def _xcom_value_has_error(_xcom) -> bool:
)
return failed_alert.execute(context=context)


class OptimusAPIClient:
def __init__(self, optimus_host):
self.host = self._add_connection_adapter_if_absent(optimus_host)
Expand Down Expand Up @@ -358,12 +359,13 @@ def get_task_window(self, scheduled_at: str, window_size: str, window_offset: st
return response.json()

def get_job_metadata(self, execution_date, project, job) -> dict:
url = '{optimus_host}/api/v1/instance'.format(optimus_host=self.host)
url = '{optimus_host}/api/v1/project/{project_name}/job/{job_name}/instance'.format(optimus_host=self.host,
project_name=project,
job_name=job)
request_data = {
"scheduledAt": execution_date,
"projectName": project,
"jobName": job,
"type": "hook",
"instance_type": "TASK",
"instance_name": "none"
}
response = requests.post(url, data=json.dumps(request_data))
self._raise_error_if_request_failed(response)
Expand Down
3 changes: 3 additions & 0 deletions models/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ const (
// Secret used for uploading prepared scheduler specifications to cloud
// e.g. for gcs it will be base64 encoded service account for the bucket
ProjectSecretStorageKey = "STORAGE"

// Secret used to authenticate with scheduler provided at ProjectSchedulerHost
ProjectSchedulerAuth = "SCHEDULER_AUTH"
)

var (
Expand Down

0 comments on commit a577ede

Please sign in to comment.