Skip to content

Commit

Permalink
Parallel deploy by default
Browse files Browse the repository at this point in the history
  • Loading branch information
matusfaro committed Jan 6, 2025
1 parent 5d76317 commit d99bc0b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,16 @@ public TaskVersion uploadAndPublish(

// First get S3 upload presigned url
ControlApi controlApi = control();
log.info("Requesting permission to upload {}", codeZipFile.toPath().getFileName());
log.info("Task {} requesting permission to upload", taskId);
UploadCodeResponse uploadCodeResponse = controlApi
.uploadCode(organizationName, new UploadCodeRequest()
.taskId(taskId)
.contentLengthBytes(codeZipFile.length()));

log.info("Uploading file to S3");
log.info("Task {} uploading to S3 file {}", taskId, codeZipFile.toPath());
uploadToS3(uploadCodeResponse.getPresignedUrl(), codeZipFile);

log.info("Requesting asynchronous publishing");
log.info("Task {} trigger publish", taskId);
DeployRequest deployRequest = codeUrlToDeployRequest.apply(uploadCodeResponse.getCodeUrl());
try {
controlApi.deployVersion(organizationName, taskId, uploadCodeResponse.getSessionId(), "Event", deployRequest);
Expand All @@ -146,7 +146,7 @@ public TaskVersion uploadAndPublish(
}
}

log.info("Polling for asynchronous publishing status");
log.info("Task {} polling until published", taskId);
DeployVersionCheckResponse response = RetryerBuilder.<DeployVersionCheckResponse>newBuilder()
.retryIfResult(r -> r.getStatus() == DeployVersionCheckResponse.StatusEnum.PROCESSING)
.withWaitStrategy(WaitStrategies.join(
Expand Down
15 changes: 13 additions & 2 deletions dataspray-cli/src/main/java/io/dataspray/cli/Deploy.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package io.dataspray.cli;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import io.dataspray.core.Codegen;
import io.dataspray.core.Project;
import io.dataspray.core.StreamRuntime;
Expand All @@ -33,6 +34,7 @@
import picocli.CommandLine.Option;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

@Slf4j
@Command(name = "deploy", description = "Single command to publish and activate")
Expand All @@ -43,6 +45,8 @@ public class Deploy implements Runnable {
private String taskId;
@Option(names = "--skip-activate", description = "deploy without activating version; use activate command to start using the deployed version")
boolean skipActivate;
@Option(names = "--no-parallel", description = "deploy serially")
boolean noParallel;
@Option(names = {"-p", "--profile"}, description = "Profile name")
private String profileName;

Expand All @@ -58,7 +62,14 @@ public class Deploy implements Runnable {
@Override
public void run() {
Project project = codegen.loadProject();
commandUtil.getSelectedTaskIds(project, taskId).forEach(selectedTaskId ->
streamRuntime.deploy(cliConfig.getProfile(Optional.ofNullable(Strings.emptyToNull(profileName))), project, selectedTaskId, !skipActivate));
ImmutableSet<String> selectedTaskIds = commandUtil.getSelectedTaskIds(project, taskId);
(noParallel ? selectedTaskIds.stream() : selectedTaskIds.parallelStream())
.forEach(selectedTaskId -> CompletableFuture.runAsync(() -> {
try {
streamRuntime.deploy(cliConfig.getProfile(Optional.ofNullable(Strings.emptyToNull(profileName))), project, selectedTaskId, !skipActivate);
} catch (Exception ex) {
log.error("Task {} failed", selectedTaskId, ex);
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ public TaskVersion deploy(Organization organization, Project project, String pro
handler = "index.js";
runtime = RuntimeEnum.NODEJS20_X;
} else {
throw new RuntimeException("Cannot publish processor " + processor.getName() + " of unknown type " + processor.getClass().getCanonicalName());
throw new RuntimeException("Cannot publish task " + processor.getName() + " of unknown type " + processor.getClass().getCanonicalName());
}

log.info("Publishing task {} with inputs {} outputs {}{}",
log.info("Task {} found with inputs {} outputs {}{}",
processor.getName(),
processor.getInputStreams().stream().map(StreamLink::getStreamName).collect(Collectors.toSet()),
processor.getOutputStreams().stream().map(StreamLink::getStreamName).collect(Collectors.toSet()),
Expand Down

0 comments on commit d99bc0b

Please sign in to comment.