Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add update streaming job grpc method #119

Merged
merged 1 commit into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ service JobService {
// Start job to ingest data from offline store into online store
rpc StartOfflineToOnlineIngestionJob (StartOfflineToOnlineIngestionJobRequest) returns (StartOfflineToOnlineIngestionJobResponse);

// Start job to ingest data from streaming source into online store
rpc StartStreamIngestionJob(StartStreamIngestionJobRequest) returns (StartStreamIngestionJobResponse);

// Start scheduled job to ingest data from offline store into online store
rpc ScheduleOfflineToOnlineIngestionJob (ScheduleOfflineToOnlineIngestionJobRequest) returns (ScheduleOfflineToOnlineIngestionJobResponse);

Expand All @@ -26,7 +29,7 @@ service JobService {
rpc ListJobs (ListJobsRequest) returns (ListJobsResponse);

// List all scheduled jobs
rpc ListScheduledJob (ListScheduledJobRequest) returns (ListScheduledJobResponse);
rpc ListScheduledJobs (ListScheduledJobsRequest) returns (ListScheduledJobsResponse);

// Cancel a single job
rpc CancelJob (CancelJobRequest) returns (CancelJobResponse);
Expand Down Expand Up @@ -137,6 +140,18 @@ message StartOfflineToOnlineIngestionJobResponse {
string log_uri = 4;
}

// Ingest data from streaming source into online store
message StartStreamIngestionJobRequest {
// Feature table to ingest
string project = 1;
string table_name = 2;
}

message StartStreamIngestionJobResponse {
// Job ID assigned by Feast
string id = 1;
}

message ScheduleOfflineToOnlineIngestionJobRequest {
// Feature table to ingest
string project = 1;
Expand Down Expand Up @@ -205,7 +220,7 @@ message ListJobsRequest {
string project = 3;
}

message ListScheduledJobRequest {
message ListScheduledJobsRequest {
string project = 1;
string table_name = 2;
}
Expand All @@ -214,7 +229,7 @@ message ListJobsResponse {
repeated Job jobs = 1;
}

message ListScheduledJobResponse {
message ListScheduledJobsResponse {
repeated ScheduledJob jobs = 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import dev.caraml.store.protobuf.jobservice.JobServiceProto.ScheduledJob;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.StartOfflineToOnlineIngestionJobRequest;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.StartOfflineToOnlineIngestionJobResponse;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.StartStreamIngestionJobRequest;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.StartStreamIngestionJobResponse;
import dev.caraml.store.sparkjob.JobNotFoundException;
import dev.caraml.store.sparkjob.JobService;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -53,6 +55,19 @@ public void startOfflineToOnlineIngestionJob(
responseObserver.onCompleted();
}

@Override
public void startStreamIngestionJob(
StartStreamIngestionJobRequest request,
StreamObserver<StartStreamIngestionJobResponse> responseObserver) {
Job job =
jobService.createOrUpdateStreamingIngestionJob(
request.getProject(), request.getTableName());
StartStreamIngestionJobResponse response =
StartStreamIngestionJobResponse.newBuilder().setId(job.getId()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}

@Override
public void scheduleOfflineToOnlineIngestionJob(
ScheduleOfflineToOnlineIngestionJobRequest request,
Expand Down Expand Up @@ -99,13 +114,14 @@ public void listJobs(ListJobsRequest request, StreamObserver<ListJobsResponse> r
responseObserver.onCompleted();
}

@Override
public void listScheduledJobs(
JobServiceProto.ListScheduledJobRequest request,
StreamObserver<JobServiceProto.ListScheduledJobResponse> responseObserver) {
JobServiceProto.ListScheduledJobsRequest request,
StreamObserver<JobServiceProto.ListScheduledJobsResponse> responseObserver) {
List<ScheduledJob> jobs =
jobService.listScheduledJobs(request.getProject(), request.getTableName());
JobServiceProto.ListScheduledJobResponse response =
JobServiceProto.ListScheduledJobResponse.newBuilder().addAllJobs(jobs).build();
JobServiceProto.ListScheduledJobsResponse response =
JobServiceProto.ListScheduledJobsResponse.newBuilder().addAllJobs(jobs).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
Expand Down
Loading
Loading