
CTR : not working in SNAPSHOT-3.0.0 #6071

Open
cbonami opened this issue Nov 14, 2024 · 13 comments
Labels
status/need-feedback Calling participant to provide feedback

Comments

@cbonami

cbonami commented Nov 14, 2024

Description:

Our journey began when we tried to run a Composed Task made up of Boot 3-based applications. The Composed Task Runner (CTR) of the latest released version of SCDF is still based on Boot 2. Yet the CTR itself started reading from and writing to the BOOT3-prefixed tables, even though it is a Boot 2 application. That's why this didn't work. We also assumed that the combination of Boot 2 and Boot 3 is not even supposed to work, which is a bit odd, as Boot 3 has been available for such a long time.

We therefore hoped that the Boot 3 version of the CTR, available in the SCDF GitHub branch 'main3', would solve our issue. We also assumed that this branch corresponds to the Docker Hub image springcloud/spring-cloud-dataflow-composed-task-runner:3.0.0-SNAPSHOT.
We ran our composed task with SCDF version bitnami/spring-cloud-dataflow:2.11.5-debian-12-r3.

But then, even before sub-tasks are launched, we encounter this exception in the container running the composed task:

2024-11-14 10:57:39.320  INFO 1 --- [           main] o.s.c.d.c.ComposedTaskRunner             : Started ComposedTaskRunner in 27.109 seconds (process running for 28.389)
2024-11-14 10:57:39.326  INFO 1 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2024-11-14 10:57:39.538  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=migrate-employer]] launched with the following parameters: [{'ctr.id':'{value=15d0d628-c9ea-4c42-8b25-ac66711b573b, type=class java.lang.String, identifying=true}'}]
2024-11-14 10:57:39.620 ERROR 1 --- [           main] o.s.batch.core.job.AbstractJob           : Encountered fatal error executing job

org.springframework.dao.EmptyResultDataAccessException: Incorrect result size: expected 1, actual 0
	at org.springframework.dao.support.DataAccessUtils.nullableSingleResult(DataAccessUtils.java:190) ~[spring-tx-6.1.14.jar:6.1.14]
	at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:890) ~[spring-jdbc-6.1.14.jar:6.1.14]
	at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:916) ~[spring-jdbc-6.1.14.jar:6.1.14]
	at org.springframework.batch.core.repository.dao.JdbcJobExecutionDao.synchronizeStatus(JdbcJobExecutionDao.java:394) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:194) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Unknown Source) ~[na:na]
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:355) ~[spring-aop-6.1.14.jar:6.1.14]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) ~[spring-aop-6.1.14.jar:6.1.14]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-6.1.14.jar:6.1.14]
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:379) ~[spring-tx-6.1.14.jar:6.1.14]
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-6.1.14.jar:6.1.14]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.1.14.jar:6.1.14]
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) ~[spring-aop-6.1.14.jar:6.1.14]
	at jdk.proxy2/jdk.proxy2.$Proxy108.update(Unknown Source) ~[na:na]
	at org.springframework.batch.core.job.AbstractJob.updateStatus(AbstractJob.java:445) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:312) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:157) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-6.1.14.jar:6.1.14]
	at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:148) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at org.springframework.batch.core.launch.support.TaskExecutorJobLauncher.run(TaskExecutorJobLauncher.java:59) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.execute(JobLauncherApplicationRunner.java:210) ~[spring-boot-autoconfigure-3.3.5.jar:3.3.5]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.executeLocalJobs(JobLauncherApplicationRunner.java:194) ~[spring-boot-autoconfigure-3.3.5.jar:3.3.5]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.launchJobFromProperties(JobLauncherApplicationRunner.java:174) ~[spring-boot-autoconfigure-3.3.5.jar:3.3.5]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:169) ~[spring-boot-autoconfigure-3.3.5.jar:3.3.5]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:164) ~[spring-boot-autoconfigure-3.3.5.jar:3.3.5]
	at org.springframework.cloud.task.configuration.observation.ObservationApplicationRunner.run(ObservationApplicationRunner.java:59) ~[spring-cloud-task-core-3.1.2.jar:3.1.2]
	at org.springframework.boot.SpringApplication.lambda$callRunner$4(SpringApplication.java:786) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:83) ~[spring-core-6.1.14.jar:6.1.14]
	at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60) ~[spring-core-6.1.14.jar:6.1.14]
	at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:88) ~[spring-core-6.1.14.jar:6.1.14]
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:786) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.lambda$callRunners$3(SpringApplication.java:774) ~[spring-boot-3.3.5.jar:3.3.5]
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source) ~[na:na]
	at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(Unknown Source) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[na:na]
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source) ~[na:na]
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[na:na]
	at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source) ~[na:na]
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:774) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:342) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1363) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1352) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.cloud.dataflow.composedtaskrunner.ComposedTaskRunner.main(ComposedTaskRunner.java:31) ~[classes/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Unknown Source) ~[na:na]
	at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) ~[workspace/:na]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:95) ~[workspace/:na]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) ~[workspace/:na]
	at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:65) ~[workspace/:na]
	at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:31) ~[workspace/:na]

2024-11-14 10:57:39.630  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=migrate-employer]] failed unexpectedly and fatally with the following parameters: [{'ctr.id':'{value=15d0d628-c9ea-4c42-8b25-ac66711b573b, type=class java.lang.String, identifying=true}'}]

We have no idea why this happens.

Is this (still) expected? Do we have to wait for the final 3.0.0 release of the CTR?

Release versions:

  • SCDF: bitnami/spring-cloud-dataflow:2.11.5-debian-12-r3
  • Composed Task Runner: springcloud/spring-cloud-dataflow-composed-task-runner:3.0.0-SNAPSHOT
@github-actions github-actions bot added the status/need-triage Team needs to triage and take a first look label Nov 14, 2024
@cppwfs
Contributor

cppwfs commented Nov 14, 2024

Hello,
SCDF 2.11.5 only supports the Boot 2 implementation of CTR. Judging by the stack trace above (spring-boot 3.3.5, spring-batch-core 5.1.2), it comes from the Boot 3 implementation of CTR.

If you create the CTR definition "t1: timestamp && t2: timestamp", do you get the same result?
If so, what properties are you setting when you launch your CTR?

@cppwfs cppwfs added status/need-feedback Calling participant to provide feedback and removed status/need-triage Team needs to triage and take a first look labels Nov 14, 2024
@cbonami
Author

cbonami commented Nov 14, 2024

I will try what you asked as soon as possible. In the meantime, I can confirm that we deployed the Boot 3 version of the CTR with SCDF 2.11.5, because we badly need our Boot 3 tasks/jobs running in SCDF. They really have to be Boot 3 (and no longer Boot 2) because we want Boot 3's improved metrics support with Micrometer and the new distributed tracing support with Micrometer Tracing.

So we configured the SCDF Helm chart as follows:

spring-cloud-dataflow:
  ...
  server:
    composedTaskRunner:
      image:
        registry: docker.io
        repository: springcloud/spring-cloud-dataflow-composed-task-runner
        tag: 3.0.0-SNAPSHOT
        digest: ""
    image:
      registry: docker.io
      repository: bitnami/spring-cloud-dataflow
      tag: 2.11.5-debian-12-r3
      digest: ""

Using the default Boot 2 CTR with Spring Boot 3 jobs simply didn't work, because that CTR writes to different tables. Juggling with table prefixes didn't lead anywhere.

But from your answer I understand that running a Boot 3 CTR on SCDF 2.11.5 isn't possible anyway, right?

@github-actions github-actions bot added for/team-attention For team attention and removed status/need-feedback Calling participant to provide feedback labels Nov 14, 2024
@cppwfs
Contributor

cppwfs commented Nov 14, 2024

CTR 2.11.5 can launch Boot 3 applications. So don't use CTR 3.0.0 with SCDF 2.11.5.

@github-actions github-actions bot added status/need-feedback Calling participant to provide feedback and removed for/team-attention For team attention labels Nov 14, 2024
@cbonami
Author

cbonami commented Nov 18, 2024

We didn't get CTR 2.11.5 working with Boot 3 batch applications. The CTR itself started reading from and writing to the BOOT3-prefixed tables, even though it is a Boot 2 application, and we received SQL exceptions about jobs not existing, among others. We will check this again and provide more details.

@github-actions github-actions bot added for/team-attention For team attention and removed status/need-feedback Calling participant to provide feedback labels Nov 18, 2024
@klopfdreh
Contributor

klopfdreh commented Nov 19, 2024

Hey @cbonami,

In your task application, are you defining a custom JobExplorer or JobRepository?

If so, try autowiring org.springframework.boot.autoconfigure.batch.BatchProperties into the bean definitions and creating the beans like this:

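        @Bean
        public JobRepository jobRepository(BatchProperties batchProperties /* , ... */) throws Exception {
            JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
            // ....
            // Apply spring.batch.jdbc.table-prefix (e.g. BOOT3_BATCH_) when it is set
            Optional.ofNullable(batchProperties)
                .map(BatchProperties::getJdbc)
                .map(BatchProperties.Jdbc::getTablePrefix)
                .ifPresent(factory::setTablePrefix);
            factory.afterPropertiesSet();
            return Objects.requireNonNull(factory.getObject());
        }

        @Bean
        public JobExplorer jobExplorer(BatchProperties batchProperties /* , ... */) throws Exception {
            JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
            // ....
            // Use the same prefix so the explorer reads the tables the repository writes
            Optional.ofNullable(batchProperties)
                .map(BatchProperties::getJdbc)
                .map(BatchProperties.Jdbc::getTablePrefix)
                .ifPresent(jobExplorerFactoryBean::setTablePrefix);
            jobExplorerFactoryBean.afterPropertiesSet();
            return Objects.requireNonNull(jobExplorerFactoryBean.getObject());
        }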

@cbonami
Author

cbonami commented Nov 21, 2024

@klopfdreh, thanks for your reply. But no, we did not provide a custom JobExplorer or JobRepository.

In the meantime, we got it running via what we consider a hack.

We reverted everything to SCDF 2.11.5 and the standard Boot2-based CTR.

We deployed our Composed Task comprising 2 subtasks: a Boot3 task, and a Boot3 batch app.
Like in our initial experiments, the FIRST run of the CT worked just fine.
However, when launching the CT for the second time, we received this:

Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={run.id=1}.  If you want to run this job again, change the parameters.

This is what misled us during our first experiments, when we tried the Boot 2 CTR with the two subtasks that were Boot 2 before but that we had converted to Boot 3. At that time we received exactly the same error, something we never encountered in the Boot 2 world. This made us conclude that the CTR also needed to be Boot 3, which apparently is not the case.

So we worked around this problem by passing an extra argument (called 'unique') containing a random UUID when launching the CT.

Now we can run the CT multiple times without any problem; this is what we see in the logs:

Job: [FlowJob: [name=migrate-employer]] launched with the following parameters: [{run.id=1, unique=fd1ed4b1-c9a6-40a2-9f9f-ac77a2414da0}]

As you can see, run.id always remains 1 for some reason, but our 'hack' makes the parameters unique.
Of course, we don't think this is how it is intended to work. run.id should auto-increment, just as it (presumably) did when the subtasks were Boot 2.
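The behaviour described above can be illustrated with a simplified, hypothetical model (plain Java, not Spring Batch code; JobInstanceModel, nextRunId and launch are made-up names). Spring Batch identifies a job instance by the job name plus its identifying parameters and refuses to re-run an instance that already completed, which is what JobInstanceAlreadyCompleteException reports. nextRunId approximates Spring Batch's RunIdIncrementer, which falls back to run.id=1 when it finds no previous parameters. The model does not explain why the previous run.id is not picked up in this setup; it only shows why a constant run.id=1 fails on the second launch and why an extra random parameter avoids it.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;

/**
 * Hypothetical, simplified model (not Spring Batch code) of job instance identity:
 * an instance is identified by the job name plus its identifying parameters.
 */
public class JobInstanceModel {

    // Identities of job instances that have already completed.
    private final Set<String> completedInstances = new HashSet<>();

    /**
     * Approximates Spring Batch's RunIdIncrementer: previous run.id + 1,
     * or 1 when no previous parameters (or no previous run.id) are found.
     */
    static long nextRunId(Map<String, String> previousParameters) {
        if (previousParameters == null || !previousParameters.containsKey("run.id")) {
            return 1L;
        }
        return Long.parseLong(previousParameters.get("run.id")) + 1;
    }

    /** Records a (successful) run; rejects a re-run of an identity that already completed. */
    void launch(String jobName, Map<String, String> identifyingParameters) {
        // A TreeMap makes the identity independent of parameter order.
        String identity = jobName + new TreeMap<>(identifyingParameters);
        if (!completedInstances.add(identity)) {
            throw new IllegalStateException("A job instance already exists and is complete for parameters="
                    + new TreeMap<>(identifyingParameters));
        }
    }

    public static void main(String[] args) {
        JobInstanceModel repository = new JobInstanceModel();

        // Previous parameters are never found, so every launch gets run.id=1.
        repository.launch("migrate-employer", Map.of("run.id", String.valueOf(nextRunId(null))));
        try {
            repository.launch("migrate-employer", Map.of("run.id", String.valueOf(nextRunId(null))));
        } catch (IllegalStateException e) {
            System.out.println("Second launch rejected: " + e.getMessage());
        }

        // The 'unique' workaround: a random identifying parameter gives a new identity each time.
        repository.launch("migrate-employer", Map.of("run.id", "1", "unique", UUID.randomUUID().toString()));
        System.out.println("Launch with a 'unique' parameter accepted");
    }
}
```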

@cppwfs
Contributor

cppwfs commented Nov 21, 2024

To clarify: is it the CTR that throws the JobInstanceAlreadyCompleteException?

@cppwfs cppwfs added status/need-feedback Calling participant to provide feedback and removed for/team-attention For team attention labels Nov 21, 2024
@cbonami
Author

cbonami commented Nov 27, 2024

Indeed.

@github-actions github-actions bot added for/team-attention For team attention and removed status/need-feedback Calling participant to provide feedback labels Nov 27, 2024
@cppwfs
Contributor

cppwfs commented Dec 2, 2024

Can you share with us the deployer and app properties (sensitive information redacted) you are using? Currently we are unable to reproduce.

@cppwfs cppwfs added status/need-feedback Calling participant to provide feedback and removed for/team-attention For team attention labels Dec 2, 2024
@cbonami
Author

cbonami commented Dec 10, 2024

Sorry for the late reply. Here you go:

**Schema Target:** boot3

**Arguments:**
--migrate-employer.ignore-prerequisites: OVERRULE,WORK_SCHEDULE,CASCADE
--migrate-employer.pulsar-broker-url: ******
--migrate-employer.dicco-ds-url: ******
--spring.datasource.driverClassName: com.microsoft.sqlserver.jdbc.SQLServerDriver
--management.metrics.tags.application: ${spring.cloud.task.name:unknown}-${spring.cloud.task.executionid:unknown}
--spring.cloud.task.name: migrate-employer-migrate-employer
--spring.datasource.password: ******
--spring.cloud.deployer.bootVersion: 3
--management.metrics.tags.service: task-application
--spring.datasource.username: ******
--spring.datasource.url: ******
--spring.cloud.task.initialize-enabled: false
--migrate-employer.skip-work-schedule-check: false
--migrate-employer.pulsar-client-url: ******
--spring.cloud.task.schemaTarget: boot3
--spring.batch.jdbc.table-prefix: BOOT3_BATCH_
--migrate-employer.continue-on-fail: false
--migrate-employer.onboarding-service-grpc-host: clink-grpc.connect-evolution-inttst
--spring.cloud.task.tablePrefix: BOOT3_TASK_
--migrate-employer.exclude-phases:
--migrate-employer.include-phases: CACHE_EMPLOYER,SPLIT_ADJUSTMENTS,EMPLOYER,AGREEMENT,WORK_SCHEDULE,WORKING_TIMES
--spring.cloud.task.parent-execution-id: 3
--spring.cloud.task.parent-schema-target: boot3
--app.migrate-employer.spring.cloud.task.initialize-enabled: false
--app.migrate-employer.spring.batch.jdbc.table-prefix: BOOT3_BATCH_
--app.migrate-employer.spring.cloud.task.tablePrefix: BOOT3_TASK_
--app.migrate-employer.spring.cloud.task.schemaTarget: boot3
--app.migrate-employer.spring.cloud.deployer.bootVersion: 3
--spring.cloud.task.executionid: 5

**External Execution Id:**
migrate-employer-migrate-employer-kev1mo4zqm

**Job Execution Ids:**
[3](https://scdf-server.inttst.tst.ce.acerta.io/dashboard/index.html#/tasks-jobs/job-executions/3/schemaTarget/boot3)
...

**Resource URL:**
Docker Resource [docker:harbor.tools.acerta.io/acerta-releases/acerta-connect-evolution/migrate-employer-task:0.59.0]

**Application Properties:**
migrate-employer.ignore-prerequisites: OVERRULE,WORK_SCHEDULE,CASCADE
migrate-employer.pulsar-broker-url: ******
migrate-employer.dicco-ds-url: ******
spring.datasource.driverClassName: com.microsoft.sqlserver.jdbc.SQLServerDriver
management.metrics.tags.application: ${spring.cloud.task.name:unknown}-${spring.cloud.task.executionid:unknown}
spring.cloud.task.name: migrate-employer-migrate-employer
spring.datasource.password: ******
spring.cloud.deployer.bootVersion: 3
management.metrics.tags.service: task-application
spring.datasource.username: ******
spring.datasource.url: ******
spring.cloud.task.initialize-enabled: false
migrate-employer.skip-work-schedule-check: false
migrate-employer.pulsar-client-url: ******
spring.cloud.task.schemaTarget: boot3
spring.batch.jdbc.table-prefix: BOOT3_BATCH_
migrate-employer.continue-on-fail: false
migrate-employer.onboarding-service-grpc-host: clink-grpc.connect-evolution-inttst
spring.cloud.task.tablePrefix: BOOT3_TASK_

**Platform Properties:**
app.migrate-employer.spring.cloud.task.initialize-enabled: false
deployer.migrate-employer.kubernetes.secretKeyRefs: [{"envVarName":"MIGRATE_EMPLOYER.DICCO-DS-USERNAME","secretName":"scdf-server-dicco-datasource-cred","dataKey":"username"},{"envVarName":"MIGRATE_EMPLOYER.DICCO-DS-PASSWORD","secretName":"scdf-server-dicco-datasource-cred","dataKey":"password"}]
app.migrate-employer.dicco-ds-url: ******
deployer.migrate-employer.kubernetes.deploymentLabels: loki:acerta-payroll-service-tst,app.kubernetes.io/name:migrate-employer-migrate-employer
app.migrate-employer.ignore-prerequisites: OVERRULE,WORK_SCHEDULE,CASCADE
deployer.*.kubernetes.environment-variables: DATAFLOW_COMPOSED_TASK_NAME=migrate-employer,MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_ENABLED=true,MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_HOST=prometheus-rsocket-proxy.connect-evolution-prometheus-rsocket-proxy-inttst
app.migrate-employer.onboarding-service-grpc-host: clink-grpc.connect-evolution-inttst
app.migrate-employer.spring.cloud.task.schemaTarget: boot3
app.migrate-employer.pulsar-client-url: ******
app.migrate-employer.spring.cloud.task.tablePrefix: BOOT3_TASK_
app.migrate-employer.skip-work-schedule-check: false
app.migrate-employer.pulsar-broker-url: ******
app.migrate-employer.spring.cloud.deployer.bootVersion: 3
deployer.migrate-employer.kubernetes.podAnnotations: instrumentation.opentelemetry.io/inject-sdk: open-telemetry/default-sdk-tst
app.migrate-employer.spring.batch.jdbc.table-prefix: BOOT3_BATCH_
app.migrate-employer.continue-on-fail: false

This may not be relevant, but we configure the apps and launch the CTR through the SCDF server's API.

@github-actions github-actions bot added for/team-attention For team attention and removed status/need-feedback Calling participant to provide feedback labels Dec 10, 2024
@cppwfs
Contributor

cppwfs commented Dec 10, 2024

Hello @cbonami ,
Assuming **Arguments:** are the CTR's arguments, you should set neither --spring.cloud.task.schemaTarget: boot3 nor --spring.batch.jdbc.table-prefix: BOOT3_BATCH_ for the CTR, since it is a Boot 2 application.

CTR is a Boot 2.x application that sends requests to SCDF to launch both Boot2 and Boot3 apps.
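The point above can be sketched as follows. This is a hypothetical illustration (SchemaPrefixes, forBootVersion and resolve are made-up names, not SCDF or Spring Batch APIs), assuming the library default prefixes (TASK_ for Spring Cloud Task, BATCH_ for Spring Batch) for Boot 2 apps such as the CTR, and the BOOT3_TASK_/BOOT3_BATCH_ prefixes seen in the launch arguments above for Boot 3 apps. Spring Batch's JDBC DAOs write their SQL with a %PREFIX% placeholder that is replaced by the configured table prefix, so a CTR launched with BOOT3_BATCH_ looks for its own job metadata in the Boot 3 tables.

```java
import java.util.Map;

/**
 * Hypothetical illustration, not SCDF or Spring Batch code: which metadata
 * table prefixes a Boot 2 app (such as the CTR) and a Boot 3 app would use
 * on SCDF 2.11.x.
 */
public class SchemaPrefixes {

    /** Table prefixes for Spring Cloud Task and Spring Batch metadata. */
    record Prefixes(String taskPrefix, String batchPrefix) {}

    // Boot 2 apps keep the library defaults; Boot 3 apps use the BOOT3_ prefixes
    // that appear in the launch arguments of this issue.
    private static final Map<String, Prefixes> BY_BOOT_VERSION = Map.of(
            "2", new Prefixes("TASK_", "BATCH_"),
            "3", new Prefixes("BOOT3_TASK_", "BOOT3_BATCH_"));

    static Prefixes forBootVersion(String bootVersion) {
        Prefixes prefixes = BY_BOOT_VERSION.get(bootVersion);
        if (prefixes == null) {
            throw new IllegalArgumentException("Unsupported boot version: " + bootVersion);
        }
        return prefixes;
    }

    /** Substitutes the %PREFIX% placeholder, the way Spring Batch's JDBC DAOs build their SQL. */
    static String resolve(String sqlTemplate, String tablePrefix) {
        return sqlTemplate.replace("%PREFIX%", tablePrefix);
    }

    public static void main(String[] args) {
        String template = "SELECT COUNT(*) FROM %PREFIX%JOB_EXECUTION";
        // The Boot 2 CTR should query the default tables...
        System.out.println("CTR (Boot 2):   " + resolve(template, forBootVersion("2").batchPrefix()));
        // ...while the Boot 3 child apps write to the BOOT3_-prefixed ones.
        System.out.println("Child (Boot 3): " + resolve(template, forBootVersion("3").batchPrefix()));
    }
}
```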

@github-actions github-actions bot removed the for/team-attention For team attention label Dec 10, 2024
@cppwfs cppwfs added the status/need-feedback Calling participant to provide feedback label Dec 10, 2024
@cbonami
Author

cbonami commented Dec 10, 2024

Agreed. But here's the thing: we don't add these arguments ourselves; they are somehow added automatically.
We register the apps and create the composed task using the SCDF server's API.
We also launch the composed task using the SCDF server's API.

Could this be related to naming?

  • we have a CT called 'migrate-employer' whose DSL definition is "init && migrate-employer". The screenshots below show that there is actually more: "init && migrate-employer && onboard-ce && notify", where the last two tasks are Spring Boot 2 tasks. But, if I remember correctly, we also tested with only "init && migrate-employer".
  • we have an app called "init" that is registered as a "bootVersion: 3"
  • we have an app called "migrate-employer" (same name as the CT) that is registered as a "bootVersion: 3"

In 'Applications':

[screenshot]

In 'Tasks':

[screenshot]

Here's the code that we use to start the CT 'migrate-employer':

...
import org.springframework.cloud.dataflow.rest.client.DataFlowOperations;
import org.springframework.cloud.dataflow.rest.client.dsl.DeploymentPropertiesBuilder;
import org.springframework.cloud.dataflow.rest.client.dsl.task.Task;
import org.springframework.cloud.dataflow.rest.resource.LaunchResponseResource;
...
    @Override
    public String startMigrateEmployer(String initiator, List<MigrateEmployerData> employers, @NotNull List<TaskPhase> includePhases, @NotNull List<TaskPhase> excludePhases) {
        final ComposedTask task = ComposedTask.MIGRATE_EMPLOYER;

        final Task scdfTask = Task.builder(dataFlowOperations).findByName(task.getName()).orElseThrow(() -> new IllegalArgumentException("Task [%s] not installed in SCDF Server".formatted(task.getName())));

        final DeploymentPropertiesBuilder deploymentPropertiesBuilder = new DeploymentPropertiesBuilder();
        final ScdfArgumentBuilder argumentBuilder = new ScdfArgumentBuilder();
        addGlobalProperties(deploymentPropertiesBuilder, task);
        addInitiator(deploymentPropertiesBuilder, initiator);
        addMigrateEmployerData(deploymentPropertiesBuilder, employers);

        DeterminePhasesForComposedTask.execute(task, includePhases, excludePhases)
                .forEach((key, value) -> {
                    addIncludedPhases(argumentBuilder, key, value.getFirst());
                    addExcludedPhases(argumentBuilder, key, value.getSecond());
                });
        addCustomAppProperties(deploymentPropertiesBuilder, task);
        addK8sLabels(deploymentPropertiesBuilder, task);
        addK8sAnnotations(deploymentPropertiesBuilder, task);

        Map<String, String> deploymentProperties = deploymentPropertiesBuilder.build();
        List<String> arguments = argumentBuilder.build();

        // If you omit properties or leave them empty in subsequent runs, SCDF reuses the property and its
        // value from the previous run. For include/exclude phases we don't want that behavior, because their
        // value may legitimately be empty. To avoid this 'carry over properties from the last run' behavior,
        // we use arguments: they always start fresh, and SCDF doesn't reuse arguments from previous runs.
        final LaunchResponseResource launchResponse = scdfTask.launch(deploymentProperties, arguments);
        final long executionId = launchResponse.getExecutionId();

        return ComposedTaskId.from(task, executionId).getId();
    }

    private void addGlobalProperties(final DeploymentPropertiesBuilder builder, final ComposedTask task) {
        final Map<String, String> environmentVariables = new HashMap<>();

        environmentVariables.put(COMPOSED_TASK_NAME_ENVIRONMENT_VARIABLE, task.getName());
        if (scdfProperties.monitor().enabled()) {
            environmentVariables.put("MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_ENABLED", "true");
            final ScdfProperties.monitor monitor = scdfProperties.monitor();
            if (StringUtils.isNotBlank(monitor.prometheusRSocketHost())) {
                environmentVariables.put("MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_HOST", monitor.prometheusRSocketHost());
            }
            if (Objects.nonNull(monitor.prometheusRSocketPort())) {
                environmentVariables.put("MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_PORT", monitor.prometheusRSocketPort().toString());
            }
        }

        builder.put(
                "deployer.*.kubernetes.environment-variables",
                environmentVariables.entrySet().stream()
                        .map(entry -> entry.getKey() + "=" + entry.getValue())
                        .collect(Collectors.joining(","))
        );
    }

    private void addInitiator(final DeploymentPropertiesBuilder builder, String initiator) {
        builder.put("app.%s.%s.initiator".formatted(INIT.getName(), INIT.getName()), initiator);
    }

    private void addMigrateEmployerData(final DeploymentPropertiesBuilder builder, final List<MigrateEmployerData> employers) {
        builder.put(
                "app.%s.%s.migrate-employer-data".formatted(INIT.getName(), INIT.getName()),
                employers.stream()
                        .map(jsonSerializer::serialize)
                        .collect(Collectors.joining(";"))
        );
    }

    private void addCustomAppProperties(DeploymentPropertiesBuilder builder, ComposedTask composedTask) {
        composedTask.getSubTasks().forEach(task -> {
            final Map<String, String> taskProperties = scdfProperties.tasks().getOrDefault(task.getName(), Map.of());

            final String secrets = taskProperties
                    .entrySet()
                    .stream()
                    .filter(e -> Objects.nonNull(e.getValue()) && e.getValue().contains("secretName"))
                    .map(e -> {
                        final ScdfSecret scdfSecret = jsonSerializer.parse(e.getValue(), ScdfSecret.class);
                        return jsonSerializer.serialize(
                                new ScdfSecret(
                                        "%s.%s".formatted(task.name(), e.getKey().toUpperCase()),
                                        scdfSecret.secretName(),
                                        scdfSecret.dataKey()
                                )
                        );
                    })
                    .collect(Collectors.joining(","));

            builder.put("deployer.%s.kubernetes.secretKeyRefs".formatted(task.getName()), "[%s]".formatted(secrets));

            taskProperties
                    .entrySet()
                    .stream()
                    .filter(entry -> Objects.nonNull(entry.getValue()) && !entry.getValue().contains("secretName"))
                    .forEach(entry -> builder.put("app.%s.%s.%s".formatted(task.getName(), task.getName(), entry.getKey()), entry.getValue()));
        });
    }

and

public class ScdfArgumentBuilder {

    private final Map<Task, List<Pair<String, String>>> argumentsByTask = new EnumMap<>(Task.class);

    ScdfArgumentBuilder() {
    }

    ScdfArgumentBuilder add(final Task task, final String key, final String value) {
        final List<Pair<String, String>> arguments = argumentsByTask.computeIfAbsent(task, t -> new ArrayList<>());
        arguments.add(Pair.of(key, value));
        return this;
    }

    List<String> build() {
        List<String> list = argumentsByTask.entrySet().stream()
                .map(taskMapEntry -> {
                    final Task task = taskMapEntry.getKey();
                    return IntStream.range(0, taskMapEntry.getValue().size())
                            .mapToObj(i -> {
                                final Pair<String, String> argument = taskMapEntry.getValue().get(i);
                                return "app.%s.%s=--%s=%s".formatted(task.getName(), i, argument.getKey(), argument.getValue());
                            })
                            .toList();
                })
                .flatMap(List::stream)
                .toList();
        List<String> list2 = new ArrayList<>(list);
        // hack: since Boot 3 subtasks are used, we need to add a unique identifier to the arguments
        list2.add("unique=" + UUID.randomUUID());
        return list2;
    }
}

@github-actions github-actions bot added for/team-attention For team attention and removed status/need-feedback Calling participant to provide feedback labels Dec 10, 2024
@cppwfs
Contributor

cppwfs commented Jan 2, 2025

Are you deleting the previous task executions of your CTR before launching the new one?

@cppwfs cppwfs added the status/need-feedback Calling participant to provide feedback label Jan 2, 2025
@github-actions github-actions bot removed the for/team-attention For team attention label Jan 2, 2025

3 participants