Skip to content

Commit

Permalink
Merge pull request #118 from caraml-dev/add-list-scheduled-jobs
Browse files Browse the repository at this point in the history
feat: add list scheduled jobs grpc method
  • Loading branch information
khorshuheng authored May 15, 2024
2 parents ab5318f + fb1c80b commit afca8cf
Show file tree
Hide file tree
Showing 11 changed files with 878 additions and 420 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ service JobService {
// List all types of jobs
rpc ListJobs (ListJobsRequest) returns (ListJobsResponse);

// List all scheduled jobs
rpc ListScheduledJob (ListScheduledJobRequest) returns (ListScheduledJobResponse);

// Cancel a single job
rpc CancelJob (CancelJobRequest) returns (CancelJobResponse);

Expand Down Expand Up @@ -55,6 +58,17 @@ enum JobStatus {
JOB_STATUS_ERROR = 4;
}

// A recurring (cron-scheduled) batch ingestion job.
message ScheduledJob {
// Identifier of the Job
string id = 1;
// Name of the feature table this scheduled job ingests into.
string table_name = 2;
// Project that the feature table belongs to.
string project = 3;
// Timespan of the ingested data per job, in days. The data from end of the day - timespan till end of the day will be ingested. Eg. if the job execution date is 10/4/2021, and ingestion timespan is 2, then data from 9/4/2021 00:00 to 10/4/2021 23:59 (inclusive) will be ingested.
int32 ingestion_timespan = 4;
// Crontab string. Eg. 0 13 * * *
string cron_schedule = 5;
}

message Job {
// Identifier of the Job
string id = 1;
Expand Down Expand Up @@ -191,10 +205,19 @@ message ListJobsRequest {
string project = 3;
}

// Request for listing scheduled jobs. Both fields act as optional filters:
// an empty string means "do not filter on this field".
message ListScheduledJobRequest {
// If non-empty, only return scheduled jobs belonging to this project.
string project = 1;
// If non-empty, only return scheduled jobs ingesting into this feature table.
string table_name = 2;
}

// Response containing the jobs that matched a ListJobsRequest.
message ListJobsResponse {
repeated Job jobs = 1;
}

// Response containing the scheduled jobs that matched a ListScheduledJobRequest.
message ListScheduledJobResponse {
repeated ScheduledJob jobs = 1;
}

// Request to fetch a single job by its identifier.
message GetJobRequest {
string job_id = 1;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.caraml.store.api;

import dev.caraml.store.protobuf.jobservice.JobServiceGrpc;
import dev.caraml.store.protobuf.jobservice.JobServiceProto;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.GetHistoricalFeaturesRequest;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.GetHistoricalFeaturesResponse;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.GetJobRequest;
Expand All @@ -10,6 +11,7 @@
import dev.caraml.store.protobuf.jobservice.JobServiceProto.ListJobsResponse;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.ScheduleOfflineToOnlineIngestionJobRequest;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.ScheduleOfflineToOnlineIngestionJobResponse;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.ScheduledJob;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.StartOfflineToOnlineIngestionJobRequest;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.StartOfflineToOnlineIngestionJobResponse;
import dev.caraml.store.sparkjob.JobNotFoundException;
Expand Down Expand Up @@ -97,6 +99,17 @@ public void listJobs(ListJobsRequest request, StreamObserver<ListJobsResponse> r
responseObserver.onCompleted();
}

// Handles the ListScheduledJob RPC: returns scheduled ingestion jobs, optionally
// filtered by project and/or feature table name (empty string = no filter).
//
// NOTE(review): unlike the sibling handlers (e.g. getJob below), this method is not
// annotated with @Override, and its name (listScheduledJobs, plural) does not match
// the rpc name declared in the proto (ListScheduledJob, singular). Confirm this
// actually overrides the generated base stub method — if it does not, the RPC will
// fall through to the default implementation and return UNIMPLEMENTED.
public void listScheduledJobs(
JobServiceProto.ListScheduledJobRequest request,
StreamObserver<JobServiceProto.ListScheduledJobResponse> responseObserver) {
List<ScheduledJob> jobs =
jobService.listScheduledJobs(request.getProject(), request.getTableName());
JobServiceProto.ListScheduledJobResponse response =
JobServiceProto.ListScheduledJobResponse.newBuilder().addAllJobs(jobs).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}

@Override
public void getJob(GetJobRequest request, StreamObserver<GetJobResponse> responseObserver) {
GetJobResponse response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import dev.caraml.store.protobuf.jobservice.JobServiceProto.Job;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.JobStatus;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.JobType;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.ScheduledJob;
import dev.caraml.store.sparkjob.adapter.BatchIngestionArgumentAdapter;
import dev.caraml.store.sparkjob.adapter.HistoricalRetrievalArgumentAdapter;
import dev.caraml.store.sparkjob.adapter.ScheduledBatchIngestionArgumentAdapter;
Expand Down Expand Up @@ -144,6 +145,20 @@ private Job sparkApplicationToJob(SparkApplication app) {
return builder.build();
}

/**
 * Converts a Kubernetes ScheduledSparkApplication resource into its ScheduledJob proto
 * representation. Feature table name and project are recovered from the resource labels,
 * the cron expression from the spec, and the ingestion timespan from the job arguments.
 *
 * @param app the scheduled Spark application resource to convert
 * @return the equivalent ScheduledJob proto message
 */
private ScheduledJob scheduledSparkApplicationToScheduledJob(ScheduledSparkApplication app) {
  // Resource metadata may carry no labels at all; treat that the same as empty labels
  // so the getOrDefault lookups below cannot NPE.
  Map<String, String> labels = app.getMetadata().getLabels();
  if (labels == null) {
    labels = Map.of();
  }
  // The ingestion timespan (in days) is assumed to be the last CLI argument
  // (appended by ScheduledBatchIngestionArgumentAdapter — TODO confirm). Default to 0
  // rather than letting one malformed resource fail the whole listing.
  List<String> args = app.getSpec().getTemplate().getArguments();
  int ingestionTimespan = 0;
  if (args != null && !args.isEmpty()) {
    try {
      ingestionTimespan = Integer.parseInt(args.get(args.size() - 1));
    } catch (NumberFormatException ignored) {
      // Last argument was not numeric; keep the 0 default.
    }
  }
  return ScheduledJob.newBuilder()
      .setId(app.getMetadata().getName())
      .setTableName(labels.getOrDefault(FEATURE_TABLE_LABEL, ""))
      .setProject(labels.getOrDefault(PROJECT_LABEL, ""))
      .setCronSchedule(app.getSpec().getSchedule())
      .setIngestionTimespan(ingestionTimespan)
      .build();
}

public Job createOrUpdateStreamingIngestionJob(String project, String featureTableName) {
FeatureTableSpec spec =
tableRepository
Expand Down Expand Up @@ -484,13 +499,7 @@ public List<Job> listJobs(Boolean includeTerminated, String project, String tabl
.stream()
.filter(es -> !es.getValue().isEmpty())
.map(es -> String.format("%s=%s", es.getKey(), es.getValue()));
String jobSets =
Stream.of(JobType.BATCH_INGESTION_JOB, JobType.STREAM_INGESTION_JOB, JobType.RETRIEVAL_JOB)
.map(Enum::toString)
.collect(Collectors.joining(","));
Stream<String> setSelectors = Stream.of(String.format("%s in (%s)", JOB_TYPE_LABEL, jobSets));
String labelSelectors =
Stream.concat(equalitySelectors, setSelectors).collect(Collectors.joining(","));
String labelSelectors = equalitySelectors.collect(Collectors.joining(","));
Stream<Job> jobStream =
sparkOperatorApi.list(namespace, labelSelectors).stream().map(this::sparkApplicationToJob);
if (!includeTerminated) {
Expand All @@ -499,6 +508,23 @@ public List<Job> listJobs(Boolean includeTerminated, String project, String tabl
return jobStream.toList();
}

/**
 * Lists scheduled batch ingestion jobs, optionally filtered by project and feature table.
 *
 * @param project if non-empty, restrict results to scheduled jobs labeled with this project
 * @param tableName if non-empty, restrict results to scheduled jobs for this feature table
 * @return the scheduled jobs matching the given filters
 */
public List<ScheduledJob> listScheduledJobs(String project, String tableName) {
  Map<String, String> selectorMap = new HashMap<>();
  if (!project.isEmpty()) {
    selectorMap.put(PROJECT_LABEL, project);
  }
  if (!tableName.isEmpty()) {
    selectorMap.put(FEATURE_TABLE_LABEL, tableName);
  }
  // BUG FIX: the joined selector string was previously discarded (the stream result was
  // never assigned), so labelSelectors stayed "" and the project/table filters were
  // silently ignored — every scheduled job in the namespace was returned.
  String labelSelectors =
      selectorMap.entrySet().stream()
          .map(es -> String.format("%s=%s", es.getKey(), es.getValue()))
          .collect(Collectors.joining(","));
  return sparkOperatorApi.listScheduled(namespace, labelSelectors).stream()
      .map(this::scheduledSparkApplicationToScheduledJob)
      .toList();
}

public Optional<Job> getJob(String id) {
return sparkOperatorApi.getSparkApplication(namespace, id).map(this::sparkApplicationToJob);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ List<SparkApplication> list(String namespace, String labelSelector)
Optional<SparkApplication> getSparkApplication(String namespace, String name)
throws SparkOperatorApiException;

// Lists ScheduledSparkApplication resources in the given namespace that match the
// label selector; an empty selector string means "no filtering".
List<ScheduledSparkApplication> listScheduled(String namespace, String labelSelector)
throws SparkOperatorApiException;

// Looks up a single ScheduledSparkApplication by name in the given namespace; an empty
// Optional signals the resource could not be retrieved — presumably "not found";
// confirm against the implementation.
Optional<ScheduledSparkApplication> getScheduledSparkApplication(String namespace, String name)
throws SparkOperatorApiException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ public ScheduledSparkApplication create(ScheduledSparkApplication app)
public List<SparkApplication> list(String namespace, String labelSelector)
throws SparkOperatorApiException {
ListOptions options = new ListOptions();
options.setLabelSelector(labelSelector);
if (!labelSelector.isEmpty()) {
options.setLabelSelector(labelSelector);
}
try {
return sparkApplicationApi
.list(namespace, options)
Expand All @@ -112,6 +114,24 @@ public Optional<SparkApplication> getSparkApplication(String namespace, String n
};
}

/**
 * Lists ScheduledSparkApplication resources in {@code namespace} matching
 * {@code labelSelector}. An empty selector is treated as "match everything":
 * the selector is simply not applied to the list options.
 *
 * @throws SparkOperatorApiException if the Kubernetes API call fails
 */
@Override
public List<ScheduledSparkApplication> listScheduled(String namespace, String labelSelector)
    throws SparkOperatorApiException {
  ListOptions opts = new ListOptions();
  if (!labelSelector.isEmpty()) {
    opts.setLabelSelector(labelSelector);
  }
  try {
    List<ScheduledSparkApplication> items =
        scheduledSparkApplicationApi.list(namespace, opts).throwsApiException().getObject().getItems();
    return items;
  } catch (ApiException e) {
    // NOTE(review): only the message survives — the cause is dropped; presumably
    // SparkOperatorApiException has no (String, Throwable) constructor — confirm.
    throw new SparkOperatorApiException(e.getMessage());
  }
}

@Override
public Optional<ScheduledSparkApplication> getScheduledSparkApplication(
String namespace, String name) throws SparkOperatorApiException {
Expand Down
Loading

0 comments on commit afca8cf

Please sign in to comment.