From b5ac43e180ff370f38775b81987fd453312db633 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Wed, 15 Nov 2023 10:04:47 +0100 Subject: [PATCH] Apps: implement force deletion and improve status (#714) --- helm/crds/applications.langstream.ai-v1.yml | 2 + .../langstream/admin/client/AdminClient.java | 4 +- .../admin/client/model/Applications.java | 2 +- .../api/storage/ApplicationStore.java | 2 +- .../applications/DeleteApplicationCmd.java | 18 +- .../commands/applications/AppsCmdTest.java | 16 +- .../ai/langstream/tests/AppLifecycleIT.java | 66 ++++++ .../tests/util/BaseEndToEndTest.java | 129 ++++++++---- .../k8s/api/crds/apps/ApplicationSpec.java | 29 ++- .../api/crds/apps/ApplicationSpecOptions.java | 32 +++ .../deployer/k8s/util/KubeUtil.java | 25 +++ .../k8s/controllers/apps/AppController.java | 172 +++++++++++++--- .../k8s/controllers/AppControllerIT.java | 192 ++++++++++++++---- .../k8s/controllers/OperatorExtension.java | 1 + .../k8s/apps/KubernetesApplicationStore.java | 48 +++-- .../apps/KubernetesApplicationStoreTest.java | 51 +++-- .../tester/InMemoryApplicationStore.java | 2 +- .../application/ApplicationResource.java | 5 +- .../application/ApplicationService.java | 4 +- .../webservice/common/TenantResource.java | 2 +- .../application/ApplicationResourceTest.java | 3 +- .../ApplicationServiceResourceLimitTest.java | 2 +- 22 files changed, 633 insertions(+), 174 deletions(-) create mode 100644 langstream-e2e-tests/src/test/java/ai/langstream/tests/AppLifecycleIT.java create mode 100644 langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/ApplicationSpecOptions.java diff --git a/helm/crds/applications.langstream.ai-v1.yml b/helm/crds/applications.langstream.ai-v1.yml index 28aa6b512..184b5f081 100644 --- a/helm/crds/applications.langstream.ai-v1.yml +++ b/helm/crds/applications.langstream.ai-v1.yml @@ -27,6 +27,8 @@ spec: type: string imagePullPolicy: type: string + options: + type: string tenant: type: string required: diff --git a/langstream-admin-client/src/main/java/ai/langstream/admin/client/AdminClient.java b/langstream-admin-client/src/main/java/ai/langstream/admin/client/AdminClient.java index 8f9c00262..5132bacfa 100644 --- a/langstream-admin-client/src/main/java/ai/langstream/admin/client/AdminClient.java +++ b/langstream-admin-client/src/main/java/ai/langstream/admin/client/AdminClient.java @@ -292,8 +292,8 @@ public void update(String application, MultiPartBodyPublisher multiPartBodyPubli @Override @SneakyThrows - public void delete(String application) { - http(newDelete(tenantAppPath("/" + application))); + public void delete(String application, boolean force) { + http(newDelete(tenantAppPath("/" + application + "?force=" + force))); } @Override diff --git a/langstream-admin-client/src/main/java/ai/langstream/admin/client/model/Applications.java b/langstream-admin-client/src/main/java/ai/langstream/admin/client/model/Applications.java index 3e7ec9175..1abf3698b 100644 --- a/langstream-admin-client/src/main/java/ai/langstream/admin/client/model/Applications.java +++ b/langstream-admin-client/src/main/java/ai/langstream/admin/client/model/Applications.java @@ -29,7 +29,7 @@ String deploy( void update(String application, MultiPartBodyPublisher multiPartBodyPublisher); - void delete(String application); + void delete(String application, boolean force); String get(String application, boolean stats); diff --git a/langstream-api/src/main/java/ai/langstream/api/storage/ApplicationStore.java b/langstream-api/src/main/java/ai/langstream/api/storage/ApplicationStore.java index 6e447415f..287f0bd70 100644 --- a/langstream-api/src/main/java/ai/langstream/api/storage/ApplicationStore.java +++ b/langstream-api/src/main/java/ai/langstream/api/storage/ApplicationStore.java @@ -49,7 +49,7 @@ void put( Secrets getSecrets(String tenant, String applicationId); - void delete(String tenant, String applicationId); + void delete(String tenant, String applicationId, boolean force); Map list(String tenant); diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/DeleteApplicationCmd.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/DeleteApplicationCmd.java index e810ccfb7..8c1e8235e 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/DeleteApplicationCmd.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/DeleteApplicationCmd.java @@ -18,16 +18,28 @@ import lombok.SneakyThrows; import picocli.CommandLine; -@CommandLine.Command(name = "delete", header = "Delete an application") +@CommandLine.Command( + name = "delete", + header = "Delete an application. The deletion of the application it's asynchronous.") public class DeleteApplicationCmd extends BaseApplicationCmd { @CommandLine.Parameters(description = "ID of the application") private String applicationId; + @CommandLine.Option( + names = {"-f", "--force"}, + description = + "Force application deletion. This could cause orphaned assets and topics.") + private boolean force; + @Override @SneakyThrows public void run() { - getClient().applications().delete(applicationId); - log(String.format("Application %s deleted", applicationId)); + getClient().applications().delete(applicationId, force); + if (force) { + log(String.format("Application '%s' marked for deletion (forced)", applicationId)); + } else { + log(String.format("Application '%s' marked for deletion", applicationId)); + } } } diff --git a/langstream-cli/src/test/java/ai/langstream/cli/commands/applications/AppsCmdTest.java b/langstream-cli/src/test/java/ai/langstream/cli/commands/applications/AppsCmdTest.java index 29c80e3c8..e5365457c 100644 --- a/langstream-cli/src/test/java/ai/langstream/cli/commands/applications/AppsCmdTest.java +++ b/langstream-cli/src/test/java/ai/langstream/cli/commands/applications/AppsCmdTest.java @@ -352,13 +352,25 @@ public void testGet() throws Exception { @Test public void testDelete() { wireMock.register( - WireMock.delete(String.format("/api/applications/%s/my-app", TENANT)) + WireMock.delete(String.format("/api/applications/%s/my-app?force=false", TENANT)) .willReturn(WireMock.ok())); CommandResult result = executeCommand("apps", "delete", "my-app"); Assertions.assertEquals(0, result.exitCode()); Assertions.assertEquals("", result.err()); - Assertions.assertEquals("Application my-app deleted", result.out()); + Assertions.assertEquals("Application 'my-app' marked for deletion", result.out()); + } + + @Test + public void testForceDelete() { + wireMock.register( + WireMock.delete(String.format("/api/applications/%s/my-app?force=true", TENANT)) + .willReturn(WireMock.ok())); + + CommandResult result = executeCommand("apps", "delete", "my-app", "-f"); + Assertions.assertEquals(0, result.exitCode()); + Assertions.assertEquals("", result.err()); + Assertions.assertEquals("Application 'my-app' marked for deletion (forced)", result.out()); } @Test diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/AppLifecycleIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/AppLifecycleIT.java new file mode 100644 index 000000000..e3a6187f7 --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/AppLifecycleIT.java @@ -0,0 +1,66 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests; + +import ai.langstream.tests.util.BaseEndToEndTest; +import ai.langstream.tests.util.TestSuites; +import java.io.File; +import java.nio.file.Files; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@Slf4j +@ExtendWith(BaseEndToEndTest.class) +@Tag(TestSuites.CATEGORY_OTHER) +public class AppLifecycleIT extends BaseEndToEndTest { + + @Test + public void testDeleteBrokenApplication() throws Exception { + installLangStreamCluster(true); + final String tenant = "ten-" + System.currentTimeMillis(); + setupTenant(tenant); + final String applicationId = "my-test-app"; + + final Map> instanceContent = + Map.of( + "instance", + Map.of( + "streamingCluster", + Map.of( + "type", + "kafka", + "configuration", + Map.of("bootstrapServers", "wrong:9092")), + "computeCluster", + Map.of("type", "kubernetes"))); + + final File instanceFile = Files.createTempFile("ls-test", ".yaml").toFile(); + YAML_MAPPER.writeValue(instanceFile, instanceContent); + + deployLocalApplication( + tenant, false, applicationId, "python-processor", instanceFile, Map.of()); + awaitApplicationInStatus(applicationId, "ERROR_DEPLOYING"); + executeCommandOnClient( + "bin/langstream apps delete %s ".formatted(applicationId).split(" ")); + awaitApplicationInStatus(applicationId, "ERROR_DELETING"); + executeCommandOnClient( + "bin/langstream apps delete -f %s".formatted(applicationId).split(" ")); + awaitApplicationCleanup(tenant, applicationId); + } +} diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java index eb0d82d7d..7a65c1d5b 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java @@ -893,7 +893,8 @@ protected static void dumpPodLogs(String podName, String namespace, String fileP final File outputFile = new File( TEST_LOGS_DIR, - "%s.%s.%s.log".formatted(filePrefix, podName, container)); + "%s.%s.%s.%s.log" + .formatted(filePrefix, namespace, podName, container)); try (FileWriter writer = new FileWriter(outputFile)) { writer.write(logs); } catch (IOException e) { @@ -1070,6 +1071,37 @@ protected static boolean isApplicationReady( expectedRunningTotalExecutors + "/" + expectedRunningTotalExecutors); } + protected static void awaitApplicationInStatus(String applicationId, String status) { + Awaitility.await() + .atMost(3, TimeUnit.MINUTES) + .pollInterval(5, TimeUnit.SECONDS) + .until(() -> isApplicationInStatus(applicationId, status)); + } + + protected static boolean isApplicationInStatus(String applicationId, String expectedStatus) { + final String response = + executeCommandOnClient( + "bin/langstream apps get %s".formatted(applicationId).split(" ")); + final List lines = response.lines().collect(Collectors.toList()); + final String appLine = lines.get(1); + final List lineAsList = + Arrays.stream(appLine.split(" ")) + .filter(s -> !s.isBlank()) + .collect(Collectors.toList()); + final String status = lineAsList.get(3); + if (status != null && status.equals(expectedStatus)) { + return true; + } + log.info( + "application {} is not in expected status {} but is in {}, dumping:", + applicationId, + expectedStatus, + status); + executeCommandOnClient( + "bin/langstream apps get %s -o yaml".formatted(applicationId).split(" ")); + return false; + } + @SneakyThrows protected static void deployLocalApplicationAndAwaitReady( String tenant, @@ -1100,6 +1132,58 @@ private static void deployLocalApplicationAndAwaitReady( String appDirName, Map env, int expectedNumExecutors) { + final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant; + final String podUids = + deployLocalApplication( + tenant, isUpdate, applicationId, appDirName, instanceFile, env); + + awaitApplicationReady(applicationId, expectedNumExecutors); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .pollDelay(Duration.ZERO) + .pollInterval(5, TimeUnit.SECONDS) + .untilAsserted( + () -> { + final List pods = + client.pods() + .inNamespace(tenantNamespace) + .withLabels( + Map.of( + "langstream-application", + applicationId, + "app", + "langstream-runtime")) + .list() + .getItems(); + log.info( + "waiting new executors to be ready, found {}, expected {}", + pods.size(), + expectedNumExecutors); + if (pods.size() != expectedNumExecutors) { + fail("too many pods: " + pods.size()); + } + final String currentUids = + pods.stream() + .map(p -> p.getMetadata().getUid()) + .sorted() + .collect(Collectors.joining(",")); + Assertions.assertNotEquals(podUids, currentUids); + for (Pod pod : pods) { + log.info("checking pod readiness {}", pod.getMetadata().getName()); + assertTrue(checkPodReadiness(pod)); + } + }); + } + + @SneakyThrows + protected static String deployLocalApplication( + String tenant, + boolean isUpdate, + String applicationId, + String appDirName, + File instanceFile, + Map env) { + final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant; String testAppsBaseDir = "src/test/resources/apps"; String testSecretBaseDir = "src/test/resources/secrets"; @@ -1126,7 +1210,6 @@ private static void deployLocalApplicationAndAwaitReady( .collect(Collectors.joining(" && ")); beforeCmd += " && "; } - final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant; final String podUids; if (isUpdate) { @@ -1153,43 +1236,7 @@ private static void deployLocalApplicationAndAwaitReady( "bin/langstream apps %s %s -app /tmp/app -i /tmp/instance.yaml -s /tmp/secrets.yaml" .formatted(isUpdate ? "update" : "deploy", applicationId); executeCommandOnClient((beforeCmd + command).split(" ")); - - awaitApplicationReady(applicationId, expectedNumExecutors); - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .pollDelay(Duration.ZERO) - .pollInterval(5, TimeUnit.SECONDS) - .untilAsserted( - () -> { - final List pods = - client.pods() - .inNamespace(tenantNamespace) - .withLabels( - Map.of( - "langstream-application", - applicationId, - "app", - "langstream-runtime")) - .list() - .getItems(); - log.info( - "waiting new executors to be ready, found {}, expected {}", - pods.size(), - expectedNumExecutors); - if (pods.size() != expectedNumExecutors) { - fail("too many pods: " + pods.size()); - } - final String currentUids = - pods.stream() - .map(p -> p.getMetadata().getUid()) - .sorted() - .collect(Collectors.joining(",")); - Assertions.assertNotEquals(podUids, currentUids); - for (Pod pod : pods) { - log.info("checking pod readiness {}", pod.getMetadata().getName()); - assertTrue(checkPodReadiness(pod)); - } - }); + return podUids; } private static void validateApp(File appDir, File secretFile) throws Exception { @@ -1265,10 +1312,10 @@ protected static String generateControlPlaneToken(String subject) { protected void deleteAppAndAwaitCleanup(String tenant, String applicationId) { executeCommandOnClient("bin/langstream apps delete %s".formatted(applicationId).split(" ")); - awaitCleanup(tenant, applicationId); + awaitApplicationCleanup(tenant, applicationId); } - private static void awaitCleanup(String tenant, String applicationId) { + protected static void awaitApplicationCleanup(String tenant, String applicationId) { final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant; Awaitility.await() .atMost(2, TimeUnit.MINUTES) diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/ApplicationSpec.java b/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/ApplicationSpec.java index 8f3d822c0..6440a97ee 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/ApplicationSpec.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/ApplicationSpec.java @@ -18,7 +18,6 @@ import ai.langstream.deployer.k8s.api.crds.NamespacedSpec; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.SneakyThrows; @@ -42,6 +41,19 @@ public static SerializedApplicationInstance deserializeApplication( return mapper.readValue(serializedApplicationInstance, SerializedApplicationInstance.class); } + @SneakyThrows + public static ApplicationSpecOptions deserializeOptions(String options) { + if (options == null) { + return new ApplicationSpecOptions(); + } + return mapper.readValue(options, ApplicationSpecOptions.class); + } + + @SneakyThrows + public static String serializeOptions(ApplicationSpecOptions options) { + return mapper.writeValueAsString(options); + } + @Deprecated private String image; @Deprecated private String imagePullPolicy; @@ -52,18 +64,5 @@ public static SerializedApplicationInstance deserializeApplication( private String application; private String codeArchiveId; - - @Builder - public ApplicationSpec( - String tenant, - String image, - String imagePullPolicy, - String application, - String codeArchiveId) { - super(tenant); - this.image = image; - this.imagePullPolicy = imagePullPolicy; - this.application = application; - this.codeArchiveId = codeArchiveId; - } + private String options; } diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/ApplicationSpecOptions.java b/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/ApplicationSpecOptions.java new file mode 100644 index 000000000..5f61c7ed2 --- /dev/null +++ b/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/ApplicationSpecOptions.java @@ -0,0 +1,32 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.deployer.k8s.api.crds.apps; + +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class ApplicationSpecOptions { + + public enum DeleteMode { + CLEANUP_REQUIRED, + CLEANUP_BEST_EFFORT; + } + + private DeleteMode deleteMode = DeleteMode.CLEANUP_REQUIRED; + private boolean markedForDeletion; +} diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/util/KubeUtil.java b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/util/KubeUtil.java index f5113c126..a01f10f51 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/util/KubeUtil.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/util/KubeUtil.java @@ -64,6 +64,31 @@ public static boolean isJobCompleted(Job job) { return succeeded != null && succeeded > 0; } + public static boolean isJobFailed(Job job) { + if (job == null) { + return false; + } + final Integer failed = job.getStatus().getFailed(); + return failed != null && failed > 0; + } + + public static Pod getJobPod(Job job, KubernetesClient client) { + if (job == null) { + return null; + } + final String uid = job.getMetadata().getUid(); + final List pods = + client.pods() + .inNamespace(job.getMetadata().getNamespace()) + .withLabel("controller-uid", uid) + .list() + .getItems(); + if (pods.isEmpty()) { + return null; + } + return pods.get(0); + } + public static boolean isStatefulSetReady(StatefulSet sts) { if (sts == null || sts.getStatus() == null) { return false; diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/apps/AppController.java b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/apps/AppController.java index 9b13eb224..eb2c50bf0 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/apps/AppController.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/apps/AppController.java @@ -17,6 +17,8 @@ import ai.langstream.api.model.ApplicationLifecycleStatus; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; +import ai.langstream.deployer.k8s.api.crds.apps.ApplicationSpec; +import ai.langstream.deployer.k8s.api.crds.apps.ApplicationSpecOptions; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationStatus; import ai.langstream.deployer.k8s.apps.AppResourcesFactory; import ai.langstream.deployer.k8s.controllers.BaseController; @@ -28,6 +30,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; +import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.javaoperatorsdk.operator.api.reconciler.Constants; import io.javaoperatorsdk.operator.api.reconciler.Context; @@ -37,6 +40,7 @@ import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import java.time.Duration; +import java.util.List; import lombok.Data; import lombok.NoArgsConstructor; import lombok.SneakyThrows; @@ -85,27 +89,44 @@ public static class AppLastApplied { String runtimeDeployer; } + public record HandleJobResult(boolean proceed, Duration reschedule) {} + + public class ApplicationDeletedException extends Exception {} + @Override protected PatchResult patchResources( ApplicationCustomResource resource, Context context) { + final ApplicationSpecOptions applicationSpecOptions = + ApplicationSpec.deserializeOptions(resource.getSpec().getOptions()); AppLastApplied appLastApplied = getAppLastApplied(resource); - Duration rescheduleDuration = handleJob(resource, appLastApplied, true, false); - if (rescheduleDuration == null) { - log.infof( - "setup job for %s is completed, checking deployer", - resource.getMetadata().getName()); - rescheduleDuration = handleJob(resource, appLastApplied, false, false); - log.infof( - "setup job for %s is %s", - resource.getMetadata().getName(), - rescheduleDuration != null ? "not completed" : "completed"); - appLastApplied.setRuntimeDeployer(SerializationUtil.writeAsJson(resource.getSpec())); + Duration rescheduleDuration; + if (applicationSpecOptions.isMarkedForDeletion()) { + try { + rescheduleDuration = cleanupApplication(resource, appLastApplied); + } catch (ApplicationDeletedException exception) { + return PatchResult.patch(UpdateControl.noUpdate()); + } } else { - if (appLastApplied == null) { - appLastApplied = new AppLastApplied(); + final HandleJobResult setupJobResult = handleJob(resource, appLastApplied, true, false); + if (setupJobResult.proceed()) { + log.infof( + "[deploy] setup job for %s is completed, checking deployer", + resource.getMetadata().getName()); + final HandleJobResult deployerJobResult = + handleJob(resource, appLastApplied, false, false); + log.infof( + "[deploy] setup job for %s is %s", + resource.getMetadata().getName(), + deployerJobResult.proceed() ? "completed" : "not completed"); + + rescheduleDuration = deployerJobResult.reschedule(); + } else { + + log.infof( + "[deploy] setup job for %s is not completed yet", + resource.getMetadata().getName()); + rescheduleDuration = setupJobResult.reschedule(); } - appLastApplied.setSetup(SerializationUtil.writeAsJson(resource.getSpec())); - log.infof("setup job for %s is not completed yet", resource.getMetadata().getName()); } final UpdateControl updateControl = rescheduleDuration != null @@ -117,29 +138,59 @@ protected PatchResult patchResources( @Override protected DeleteControl cleanupResources( ApplicationCustomResource resource, Context context) { - appResourcesLimiter.onAppBeingDeleted(resource); - final AppLastApplied appLastApplied = getAppLastApplied(resource); - Duration rescheduleDuration = handleJob(resource, appLastApplied, false, true); + AppLastApplied appLastApplied = getAppLastApplied(resource); + Duration rescheduleDuration; + try { + rescheduleDuration = cleanupApplication(resource, appLastApplied); + } catch (ApplicationDeletedException ex) { + rescheduleDuration = null; + } + if (rescheduleDuration == null) { log.infof( - "deployer cleanup job for %s is completed, checking setup cleanup", + "cleanup complete for app %s is completed, removing from limiter", resource.getMetadata().getName()); - rescheduleDuration = handleJob(resource, appLastApplied, true, true); + appResourcesLimiter.onAppBeingDeleted(resource); + return DeleteControl.defaultDelete(); + } else { + return DeleteControl.noFinalizerRemoval().rescheduleAfter(rescheduleDuration); + } + } + + private Duration cleanupApplication( + ApplicationCustomResource resource, AppLastApplied appLastApplied) + throws ApplicationDeletedException { + final HandleJobResult deployerJobResult = handleJob(resource, appLastApplied, false, true); + Duration rescheduleDuration; + if (deployerJobResult.proceed()) { + log.infof( + "[cleanup] deployer cleanup job for %s is completed, checking setup cleanup", + resource.getMetadata().getName()); + + final HandleJobResult setupJobResult = handleJob(resource, appLastApplied, true, true); log.infof( - "setup cleanup job for %s is %s", + "[cleanup] setup cleanup job for %s is %s", resource.getMetadata().getName(), - rescheduleDuration != null ? "not completed" : "completed"); + setupJobResult.proceed() ? "completed" : "not completed"); + if (setupJobResult.proceed()) { + if (!resource.isMarkedForDeletion()) { + client.resource(resource).delete(); + throw new ApplicationDeletedException(); + } + return null; + } else { + return setupJobResult.reschedule(); + } } else { log.infof( - "deployer cleanup job for %s is not completed yet", + "[cleanup] deployer cleanup job for %s is not completed yet", resource.getMetadata().getName()); + rescheduleDuration = deployerJobResult.reschedule(); } - return rescheduleDuration != null - ? DeleteControl.noFinalizerRemoval().rescheduleAfter(rescheduleDuration) - : DeleteControl.defaultDelete(); + return rescheduleDuration; } - private Duration handleJob( + private HandleJobResult handleJob( ApplicationCustomResource application, AppLastApplied appLastApplied, boolean isSetupJob, @@ -154,6 +205,15 @@ private Duration handleJob( final Job currentJob = client.batch().v1().jobs().inNamespace(namespace).withName(jobName).get(); if (currentJob == null || areSpecChanged(application, appLastApplied, isSetupJob)) { + if (appLastApplied == null) { + appLastApplied = new AppLastApplied(); + } + if (isSetupJob) { + appLastApplied.setSetup(SerializationUtil.writeAsJson(application.getSpec())); + } else { + appLastApplied.setRuntimeDeployer( + SerializationUtil.writeAsJson(application.getSpec())); + } if (isSetupJob && !delete) { final boolean isDeployable = appResourcesLimiter.checkLimitsForTenant(application); if (!isDeployable) { @@ -168,7 +228,7 @@ private Duration handleJob( application .getStatus() .setResourceLimitStatus(ApplicationStatus.ResourceLimitStatus.REJECTED); - return Duration.ofSeconds(30); + return new HandleJobResult(false, Duration.ofSeconds(30)); } else { application .getStatus() @@ -181,15 +241,58 @@ private Duration handleJob( } else { application.getStatus().setStatus(ApplicationLifecycleStatus.DELETING); } - return DEFAULT_RESCHEDULE_DURATION; + return new HandleJobResult(false, DEFAULT_RESCHEDULE_DURATION); } else { - if (KubeUtil.isJobCompleted(currentJob)) { + if (KubeUtil.isJobFailed(currentJob)) { + if (isSetupJob && delete) { + // failed cleaning up the assets/topics + final ApplicationSpecOptions applicationSpecOptions = + ApplicationSpec.deserializeOptions(application.getSpec().getOptions()); + if (applicationSpecOptions.getDeleteMode() + == ApplicationSpecOptions.DeleteMode.CLEANUP_BEST_EFFORT) { + return new HandleJobResult(true, null); + } + } + + String errorMessage = "?"; + final Pod pod = KubeUtil.getJobPod(currentJob, client); + if (pod != null) { + final KubeUtil.PodStatus status = + KubeUtil.getPodsStatuses(List.of(pod)).values().iterator().next(); + if (status.getState() == KubeUtil.PodStatus.State.ERROR) { + errorMessage = status.getMessage(); + } + } + + if (delete) { + final String errMessageJobDescription = isSetupJob ? "assets/topics" : "agents"; + application + .getStatus() + .setStatus( + ApplicationLifecycleStatus.errorDeleting( + "Failed to cleanup the %s, to delete the application, please cleanup the assets/topics manually and force-delete the application again. Error was:\n%s" + .formatted( + errMessageJobDescription, + errorMessage))); + } else { + final String errMessageJobDescription = isSetupJob ? "setup" : "deployer"; + application + .getStatus() + .setStatus( + ApplicationLifecycleStatus.errorDeploying( + "Failed to deploy the application, error during job: %s. Error was:\n%s" + .formatted( + errMessageJobDescription, + errorMessage))); + } + return new HandleJobResult(false, null); + } else if (KubeUtil.isJobCompleted(currentJob)) { if (!isSetupJob && !delete) { application.getStatus().setStatus(ApplicationLifecycleStatus.DEPLOYED); } - return null; + return new HandleJobResult(true, null); } else { - return DEFAULT_RESCHEDULE_DURATION; + return new HandleJobResult(false, DEFAULT_RESCHEDULE_DURATION); } } } @@ -210,17 +313,22 @@ private void createJob( setupJob ? AppResourcesFactory.generateSetupJob(params) : AppResourcesFactory.generateDeployerJob(params); + log.debugf( + "Applying job %s in namespace %s", + job.getMetadata().getName(), job.getMetadata().getNamespace()); KubeUtil.patchJob(client, job); } private static boolean areSpecChanged( ApplicationCustomResource cr, AppLastApplied appLastApplied, boolean checkSetup) { if (appLastApplied == null) { + log.infof("Spec changed for %s, no status found", cr.getMetadata().getName()); return true; } final String lastAppliedAsString = checkSetup ? appLastApplied.getSetup() : appLastApplied.getRuntimeDeployer(); if (lastAppliedAsString == null) { + log.infof("Spec changed for %s, no status found", cr.getMetadata().getName()); return true; } final JSONComparator.Result diff = diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AppControllerIT.java b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AppControllerIT.java index 0a88c3631..2e03ed9c4 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AppControllerIT.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AppControllerIT.java @@ -17,17 +17,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import ai.langstream.api.model.ApplicationLifecycleStatus; +import ai.langstream.deployer.k8s.CRDConstants; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationStatus; import ai.langstream.deployer.k8s.apps.AppResourcesFactory; +import ai.langstream.deployer.k8s.controllers.apps.AppController; import ai.langstream.deployer.k8s.util.SerializationUtil; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerBuilder; import io.fabric8.kubernetes.api.model.NamespaceBuilder; import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.SecretBuilder; +import io.fabric8.kubernetes.api.model.ServiceAccountBuilder; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.api.model.batch.v1.JobSpec; @@ -35,6 +40,7 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -50,7 +56,7 @@ public class AppControllerIT { Map.of( "DEPLOYER_AGENT_RESOURCES", "{defaultMaxTotalResourceUnitsPerTenant: 3}", - "DEPLOYER_RUNTIME_IMAGE", "busybox", + "DEPLOYER_RUNTIME_IMAGE", "bash", "DEPLOYER_RUNTIME_IMAGE_PULL_POLICY", "IfNotPresent")); static AtomicInteger counter = new AtomicInteger(0); @@ -63,6 +69,7 @@ static String genTenant() { void testAppController() { final String tenant = "my-tenant"; + setupTenant(tenant); final String namespace = "langstream-" + tenant; final String applicationId = "my-app"; final ApplicationCustomResource resource = @@ -74,21 +81,25 @@ void testAppController() { name: %s namespace: %s spec: - image: busybox + image: bash imagePullPolicy: IfNotPresent application: '{"modules": {}}' tenant: %s """ .formatted(applicationId, namespace, tenant)); final KubernetesClient client = deployment.getClient(); - client.resource( - new NamespaceBuilder() + deployment + .getClient() + .resource( + new SecretBuilder() .withNewMetadata() - .withName(namespace) + .withName(applicationId) .endMetadata() .build()) + .inNamespace(namespace) .serverSideApply(); - client.resource(resource).inNamespace(namespace).create(); + final ApplicationCustomResource createdCr = + client.resource(resource).inNamespace(namespace).create(); Awaitility.await() .untilAsserted( @@ -116,27 +127,9 @@ void testAppController() { // simulate job finished client.resource(job).inNamespace(namespace).delete(); - final Job mockJob = - new JobBuilder() - .withNewMetadata() - .withName(job.getMetadata().getName()) - .endMetadata() - .withNewSpec() - .withNewTemplate() - .withNewSpec() - .withContainers( - List.of( - new ContainerBuilder() - .withName("test") - .withImage("busybox") - .withCommand(List.of("sleep", "1")) - .build())) - .withRestartPolicy("Never") - .endSpec() - .endTemplate() - .endSpec() - .build(); - client.resource(mockJob).inNamespace(namespace).create(); + createMockJob(namespace, client, job.getMetadata().getName()); + patchCustomResourceWithStatusDone(createdCr); + awaitJobCompleted(namespace, job.getMetadata().getName()); Awaitility.await() .atMost(Duration.ofSeconds(30)) @@ -199,6 +192,30 @@ void testAppController() { assertNotNull(client.resource(resource).inNamespace(namespace).get()); } + private void createMockJob(String namespace, KubernetesClient client, String name) { + final Job mockJob = + new JobBuilder() + .withNewMetadata() + .withName(name) + .endMetadata() + .withNewSpec() + .withNewTemplate() + .withNewSpec() + .withContainers( + List.of( + new ContainerBuilder() + .withName("test") + .withImage("bash") + .withCommand(List.of("sleep", "1")) + .build())) + .withRestartPolicy("Never") + .endSpec() + .endTemplate() + .endSpec() + .build(); + client.resource(mockJob).inNamespace(namespace).create(); + } + @Test void testAppResources() { @@ -213,17 +230,77 @@ void testAppResources() { final ApplicationCustomResource app3 = createAppWithResources(tenant, 2, 1); awaitApplicationErrorForResources(app3); - deployment.getClient().resource(app2).delete(); + simulateAppDeletion(app2); awaitApplicationDeployingStatus(app3); final ApplicationCustomResource app4 = createAppWithResources(tenant, 2, 1); awaitApplicationErrorForResources(app4); deployment.restartDeployerOperator(); - deployment.getClient().resource(app3).delete(); + simulateAppDeletion(app3); awaitApplicationDeployingStatus(app4); } + private void simulateAppDeletion(ApplicationCustomResource app) { + final String namespace = app.getMetadata().getNamespace(); + final String deployerJobName = + AppResourcesFactory.getDeployerJobName(app.getMetadata().getName(), true); + final String setupJobName = + AppResourcesFactory.getSetupJobName(app.getMetadata().getName(), true); + createMockJob(namespace, deployment.getClient(), setupJobName); + + createMockJob(namespace, deployment.getClient(), deployerJobName); + + awaitJobCompleted(namespace, deployerJobName); + awaitJobCompleted(namespace, setupJobName); + patchCustomResourceWithStatusDone(app); + + deployment + .getClient() + .resources(ApplicationCustomResource.class) + .inNamespace(app.getMetadata().getNamespace()) + .withName(app.getMetadata().getName()) + .delete(); + } + + private void patchCustomResourceWithStatusDone(ApplicationCustomResource app) { + final ApplicationStatus status = new ApplicationStatus(); + final AppController.AppLastApplied appLastApplied = new AppController.AppLastApplied(); + appLastApplied.setSetup(SerializationUtil.writeAsJson(app.getSpec())); + appLastApplied.setRuntimeDeployer(SerializationUtil.writeAsJson(app.getSpec())); + status.setLastApplied(SerializationUtil.writeAsJson(appLastApplied)); + final ApplicationCustomResource resource = + deployment + .getClient() + .resources(ApplicationCustomResource.class) + .inNamespace(app.getMetadata().getNamespace()) + .withName(app.getMetadata().getName()) + .get(); + resource.setStatus(status); + deployment.getClient().resource(resource).patchStatus(); + } + + private void awaitJobCompleted(String namespace, String deployerJobName) { + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .pollInterval(5, TimeUnit.SECONDS) + .untilAsserted( + () -> { + final Integer succeeded = + deployment + .getClient() + .batch() + .v1() + .jobs() + .inNamespace(namespace) + .withName(deployerJobName) + .get() + .getStatus() + .getSucceeded(); + assertTrue(succeeded != null && succeeded > 0); + }); + } + private void awaitApplicationErrorForResources(ApplicationCustomResource original) { org.awaitility.Awaitility.await() .untilAsserted( @@ -267,7 +344,7 @@ private void checkSetupJob(Job job) { final JobSpec spec = job.getSpec(); final PodSpec templateSpec = spec.getTemplate().getSpec(); final Container container = templateSpec.getContainers().get(0); - assertEquals("busybox", container.getImage()); + assertEquals("bash", container.getImage()); assertEquals("IfNotPresent", container.getImagePullPolicy()); assertEquals("setup", container.getName()); assertEquals(Quantity.parse("100m"), container.getResources().getRequests().get("cpu")); @@ -282,7 +359,7 @@ private void checkSetupJob(Job job) { assertEquals("deploy", container.getArgs().get(args++)); final Container initContainer = templateSpec.getInitContainers().get(0); - assertEquals("busybox", initContainer.getImage()); + assertEquals("bash", initContainer.getImage()); assertEquals("IfNotPresent", initContainer.getImagePullPolicy()); assertEquals("setup-init-config", initContainer.getName()); assertEquals("/app-config", initContainer.getVolumeMounts().get(0).getMountPath()); @@ -302,7 +379,7 @@ private void checkDeployerJob(Job job, boolean cleanup) { final JobSpec spec = job.getSpec(); final PodSpec templateSpec = spec.getTemplate().getSpec(); final Container container = templateSpec.getContainers().get(0); - assertEquals("busybox", container.getImage()); + assertEquals("bash", container.getImage()); assertEquals("IfNotPresent", container.getImagePullPolicy()); assertEquals("deployer", container.getName()); assertEquals(Quantity.parse("100m"), container.getResources().getRequests().get("cpu")); @@ -356,7 +433,7 @@ private void checkDeployerJob(Job job, boolean cleanup) { .getValue()); final Container initContainer = templateSpec.getInitContainers().get(0); - assertEquals("busybox", initContainer.getImage()); + assertEquals("bash", initContainer.getImage()); assertEquals("IfNotPresent", initContainer.getImagePullPolicy()); assertEquals("deployer-init-config", initContainer.getName()); assertEquals("/app-config", initContainer.getVolumeMounts().get(0).getMountPath()); @@ -378,15 +455,51 @@ private ApplicationCustomResource getCr(String yaml) { } private void setupTenant(String tenant) { + final String namespace = "langstream-" + tenant; deployment .getClient() .resource( new NamespaceBuilder() .withNewMetadata() - .withName("langstream-" + tenant) + .withName(namespace) .endMetadata() .build()) .create(); + deployment + .getClient() + .resource( + new ServiceAccountBuilder() + .withNewMetadata() + .withName( + CRDConstants.computeRuntimeServiceAccountForTenant(tenant)) + .endMetadata() + .build()) + .inNamespace(namespace) + .serverSideApply(); + + deployment + .getClient() + .resource( + new ServiceAccountBuilder() + .withNewMetadata() + .withName( + CRDConstants.computeDeployerServiceAccountForTenant(tenant)) + .endMetadata() + .build()) + .inNamespace(namespace) + .serverSideApply(); + + deployment + .getClient() + .resource( + new SecretBuilder() + .withNewMetadata() + .withName(CRDConstants.TENANT_CLUSTER_CONFIG_SECRET) + .endMetadata() + .withData(Map.of(CRDConstants.TENANT_CLUSTER_CONFIG_SECRET_KEY, "")) + .build()) + .inNamespace(namespace) + .serverSideApply(); } private ApplicationCustomResource createAppWithResources( @@ -404,11 +517,14 @@ private ApplicationCustomResource createAppWithResources( tenant: %s """ .formatted(appId, size, parallelism, tenant)); - return deployment + final String namespace = "langstream-" + tenant; + deployment .getClient() - .resource(resource) - .inNamespace("langstream-" + tenant) - .create(); + .resource( + new SecretBuilder().withNewMetadata().withName(appId).endMetadata().build()) + .inNamespace(namespace) + .serverSideApply(); + return deployment.getClient().resource(resource).inNamespace(namespace).create(); } private String genAppId() { diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/OperatorExtension.java b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/OperatorExtension.java index 44b1fc370..20c3b168e 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/OperatorExtension.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/OperatorExtension.java @@ -115,6 +115,7 @@ private void startDeployerOperator() throws IOException { container.withEnv( "QUARKUS_LOG_CATEGORY__IO_JAVAOPERATORSDK_OPERATOR_PROCESSING_EVENT_SOURCE_CONTROLLER", "debug"); + container.withEnv("QUARKUS_LOG_CATEGORY__AI_LANGSTREAM_DEPLOYER_K8S_APPS", "debug"); env.forEach(container::withEnv); container.withExposedPorts(8080); container.withAccessToHost(true); diff --git a/langstream-k8s-storage/src/main/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStore.java b/langstream-k8s-storage/src/main/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStore.java index 9866d6526..0acfefdf8 100644 --- a/langstream-k8s-storage/src/main/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStore.java +++ b/langstream-k8s-storage/src/main/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStore.java @@ -26,6 +26,7 @@ import ai.langstream.deployer.k8s.agents.AgentResourcesFactory; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationSpec; +import ai.langstream.deployer.k8s.api.crds.apps.ApplicationSpecOptions; import ai.langstream.deployer.k8s.api.crds.apps.SerializedApplicationInstance; import ai.langstream.deployer.k8s.apps.AppResourcesFactory; import ai.langstream.deployer.k8s.limits.ApplicationResourceLimitsChecker; @@ -147,6 +148,12 @@ public void put( throw new IllegalArgumentException( "Application " + applicationId + " is marked for deletion."); } + final ApplicationSpecOptions options = + ApplicationSpec.deserializeOptions(existing.getSpec().getOptions()); + if (options.isMarkedForDeletion()) { + throw new IllegalArgumentException( + "Application " + applicationId + " is marked for deletion."); + } } final String namespace = tenantToNamespace(tenant); @@ -156,12 +163,12 @@ public void put( new ObjectMetaBuilder().withName(applicationId).withNamespace(namespace).build()); final SerializedApplicationInstance serializedApp = new SerializedApplicationInstance(applicationInstance, executionPlan); - final ApplicationSpec spec = - ApplicationSpec.builder() - .tenant(tenant) - .application(ApplicationSpec.serializeApplication(serializedApp)) - .codeArchiveId(codeArchiveId) - .build(); + + final ApplicationSpec spec = new ApplicationSpec(); + spec.setTenant(tenant); + spec.setApplication(ApplicationSpec.serializeApplication(serializedApp)); + spec.setCodeArchiveId(codeArchiveId); + log.info( "Creating application {} in namespace {}, spec {}", applicationId, namespace, spec); crd.setSpec(spec); @@ -246,13 +253,30 @@ private ApplicationCustomResource getApplicationCustomResource( } @Override - public void delete(String tenant, String applicationId) { - final String namespace = tenantToNamespace(tenant); + public void delete(String tenant, String applicationId, boolean force) { + final ApplicationCustomResource existing = + getApplicationCustomResource(tenant, applicationId); + if (existing == null || existing.isMarkedForDeletion()) { + return; + } + final ApplicationSpecOptions options = + ApplicationSpec.deserializeOptions(existing.getSpec().getOptions()); - client.resources(ApplicationCustomResource.class) - .inNamespace(namespace) - .withName(applicationId) - .delete(); + boolean apply = false; + if (!options.isMarkedForDeletion()) { + options.setMarkedForDeletion(true); + apply = true; + } + + if (options.getDeleteMode() == ApplicationSpecOptions.DeleteMode.CLEANUP_REQUIRED + && force) { + options.setDeleteMode(ApplicationSpecOptions.DeleteMode.CLEANUP_BEST_EFFORT); + apply = true; + } + if (apply) { + existing.getSpec().setOptions(ApplicationSpec.serializeOptions(options)); + client.resource(existing).serverSideApply(); + } // the secret deletion will happen automatically once the app custom resource has been // deleted completely } diff --git a/langstream-k8s-storage/src/test/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStoreTest.java b/langstream-k8s-storage/src/test/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStoreTest.java index 5a7063cc7..e8e9930ad 100644 --- a/langstream-k8s-storage/src/test/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStoreTest.java +++ b/langstream-k8s-storage/src/test/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStoreTest.java @@ -20,6 +20,8 @@ import ai.langstream.api.model.Application; import ai.langstream.api.model.Secrets; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; +import ai.langstream.deployer.k8s.api.crds.apps.ApplicationSpec; +import ai.langstream.deployer.k8s.api.crds.apps.ApplicationSpecOptions; import ai.langstream.impl.k8s.tests.KubeK3sServer; import io.fabric8.kubernetes.api.model.OwnerReference; import io.fabric8.kubernetes.api.model.Secret; @@ -85,9 +87,9 @@ void testApp() { assertEquals(1, store.list(tenant).size()); assertNotNull(store.get(tenant, "myapp", false)); - store.delete(tenant, "myapp"); + store.delete(tenant, "myapp", false); - assertNull( + assertNotNull( k3s.getClient() .resources(ApplicationCustomResource.class) .inNamespace("s" + tenant) @@ -98,15 +100,6 @@ void testApp() { // deletion until the cleanup job is finished assertNotNull(k3s.getClient().secrets().inNamespace("s" + tenant).withName("myapp").get()); - Awaitility.await() - .untilAsserted( - () -> - assertNull( - k3s.getClient() - .secrets() - .inNamespace("s" + tenant) - .withName("myapp") - .get())); store.onTenantDeleted(tenant); assertTrue(k3s.getClient().namespaces().withName("s" + tenant).get().isMarkedForDeletion()); } @@ -132,16 +125,35 @@ void testBlockDeployWhileDeleting() { store.onTenantCreated(tenant); final Application app = new Application(); store.put(tenant, "myapp", app, "code-1", null); - ApplicationCustomResource createdCr = + ApplicationCustomResource applicationCustomResource = k3s.getClient() .resources(ApplicationCustomResource.class) .inNamespace("s" + tenant) .withName("myapp") .get(); - createdCr.getMetadata().setFinalizers(List.of("test-finalizer")); - k3s.getClient().resource(createdCr).update(); - store.delete(tenant, "myapp"); - assertTrue(k3s.getClient().resource(createdCr).get().isMarkedForDeletion()); + applicationCustomResource.getMetadata().setFinalizers(List.of("test-finalizer")); + k3s.getClient().resource(applicationCustomResource).update(); + store.delete(tenant, "myapp", false); + applicationCustomResource = k3s.getClient().resource(applicationCustomResource).get(); + assertFalse(applicationCustomResource.isMarkedForDeletion()); + assertTrue( + ApplicationSpec.deserializeOptions(applicationCustomResource.getSpec().getOptions()) + .isMarkedForDeletion()); + assertEquals( + ApplicationSpecOptions.DeleteMode.CLEANUP_REQUIRED, + ApplicationSpec.deserializeOptions(applicationCustomResource.getSpec().getOptions()) + .getDeleteMode()); + + store.delete(tenant, "myapp", true); + applicationCustomResource = k3s.getClient().resource(applicationCustomResource).get(); + assertFalse(applicationCustomResource.isMarkedForDeletion()); + assertTrue( + ApplicationSpec.deserializeOptions(applicationCustomResource.getSpec().getOptions()) + .isMarkedForDeletion()); + assertEquals( + ApplicationSpecOptions.DeleteMode.CLEANUP_BEST_EFFORT, + ApplicationSpec.deserializeOptions(applicationCustomResource.getSpec().getOptions()) + .getDeleteMode()); try { store.put(tenant, "myapp", app, "code-1", null); @@ -149,9 +161,10 @@ void testBlockDeployWhileDeleting() { } catch (IllegalArgumentException aie) { assertEquals("Application myapp is marked for deletion.", aie.getMessage()); } - createdCr = k3s.getClient().resource(createdCr).get(); - createdCr.getMetadata().setFinalizers(List.of()); - k3s.getClient().resource(createdCr).update(); + applicationCustomResource = applicationCustomResource; + applicationCustomResource.getMetadata().setFinalizers(List.of()); + k3s.getClient().resource(applicationCustomResource).update(); + k3s.getClient().resource(applicationCustomResource).delete(); Awaitility.await() .until( diff --git a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/InMemoryApplicationStore.java b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/InMemoryApplicationStore.java index f39f36c22..5e3eecdc3 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/InMemoryApplicationStore.java +++ b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/InMemoryApplicationStore.java @@ -134,7 +134,7 @@ private static String getKey(String tenant, String applicationId) { } @Override - public void delete(String tenant, String applicationId) { + public void delete(String tenant, String applicationId, boolean force) { APPLICATIONS.remove(getKey(tenant, applicationId)); SECRETS.remove(getKey(tenant, applicationId)); } diff --git a/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationResource.java b/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationResource.java index d4eac3623..7ec464cd0 100644 --- a/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationResource.java +++ b/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationResource.java @@ -283,10 +283,11 @@ private static void deleteDirectory(Path tempdir) throws IOException { void deleteApplication( Authentication authentication, @NotBlank @PathVariable("tenant") String tenant, - @NotBlank @PathVariable("applicationId") String applicationId) { + @NotBlank @PathVariable("applicationId") String applicationId, + @RequestParam(value = "force", required = false) boolean force) { performAuthorization(authentication, tenant); getAppOrThrow(tenant, applicationId); - applicationService.deleteApplication(tenant, applicationId); + applicationService.deleteApplication(tenant, applicationId, force); log.info("Deleted application {}", applicationId); } diff --git a/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationService.java b/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationService.java index 18e2a3006..9b69bb15e 100644 --- a/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationService.java +++ b/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationService.java @@ -347,9 +347,9 @@ public ApplicationSpecs getApplicationSpecs(String tenant, String applicationId) } @SneakyThrows - public void deleteApplication(String tenant, String applicationId) { + public void deleteApplication(String tenant, String applicationId, boolean force) { checkTenant(tenant); - applicationStore.delete(tenant, applicationId); + applicationStore.delete(tenant, applicationId, force); } private void checkTenant(String tenant) { diff --git a/langstream-webservice/src/main/java/ai/langstream/webservice/common/TenantResource.java b/langstream-webservice/src/main/java/ai/langstream/webservice/common/TenantResource.java index 36d9f63ff..e60caba57 100644 --- a/langstream-webservice/src/main/java/ai/langstream/webservice/common/TenantResource.java +++ b/langstream-webservice/src/main/java/ai/langstream/webservice/common/TenantResource.java @@ -120,7 +120,7 @@ void deleteTenant(@NotBlank @PathVariable("tenant") String tenant) throws Tenant .forEach( app -> { try { - applicationService.deleteApplication(tenant, app); + applicationService.deleteApplication(tenant, app, true); } catch (Exception e) { log.error( "Error deleting application {} for tenant {}", diff --git a/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationResourceTest.java b/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationResourceTest.java index c56d012d3..e2ec04f8c 100644 --- a/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationResourceTest.java +++ b/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationResourceTest.java @@ -152,7 +152,8 @@ void testAppCrud() throws Exception { mockMvc.perform(delete("/api/applications/my-tenant/test")).andExpect(status().isOk()); - mockMvc.perform(get("/api/applications/my-tenant/test")).andExpect(status().isNotFound()); + // the delete is actually a crd update + mockMvc.perform(get("/api/applications/my-tenant/test")).andExpect(status().isOk()); } @Test diff --git a/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationServiceResourceLimitTest.java b/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationServiceResourceLimitTest.java index 0d1c2c784..2bfdbca99 100644 --- a/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationServiceResourceLimitTest.java +++ b/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationServiceResourceLimitTest.java @@ -219,7 +219,7 @@ public Secrets getSecrets(String tenant, String applicationId) { } @Override - public void delete(String tenant, String applicationId) { + public void delete(String tenant, String applicationId, boolean force) { throw new UnsupportedOperationException(); }