diff --git a/dataline-config/init/src/main/java/io/dataline/config/init/DatalineConfigInitConstants.java b/dataline-config/init/src/main/java/io/dataline/config/init/DatalineConfigInitConstants.java deleted file mode 100644 index 5a465ccf23a5..000000000000 --- a/dataline-config/init/src/main/java/io/dataline/config/init/DatalineConfigInitConstants.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Dataline - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.dataline.config.init; - -public class DatalineConfigInitConstants { - - public static final String PLACEHOLDER = ""; - -} diff --git a/dataline-config/persistence/src/main/java/io/dataline/config/persistence/ConfigPersistence.java b/dataline-config/persistence/src/main/java/io/dataline/config/persistence/ConfigPersistence.java index d1701fbf32f0..dc1a0e401dff 100644 --- a/dataline-config/persistence/src/main/java/io/dataline/config/persistence/ConfigPersistence.java +++ b/dataline-config/persistence/src/main/java/io/dataline/config/persistence/ConfigPersistence.java @@ -37,7 +37,7 @@ T getConfig(ConfigSchema configType, List listConfigs(ConfigSchema configType, Class clazz) - throws JsonValidationException, IOException, ConfigNotFoundException; + throws ConfigNotFoundException, JsonValidationException, IOException; void writeConfig(ConfigSchema configType, String configId, diff --git a/dataline-config/persistence/src/main/java/io/dataline/config/persistence/ConfigRepository.java b/dataline-config/persistence/src/main/java/io/dataline/config/persistence/ConfigRepository.java new file mode 100644 index 000000000000..b0979238d00e --- /dev/null +++ b/dataline-config/persistence/src/main/java/io/dataline/config/persistence/ConfigRepository.java @@ -0,0 +1,207 @@ +/* + * MIT License + * + * Copyright (c) 2020 Dataline + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.dataline.config.persistence; + +import io.dataline.config.ConfigSchema; +import io.dataline.config.DestinationConnectionImplementation; +import io.dataline.config.DestinationConnectionSpecification; +import io.dataline.config.SourceConnectionImplementation; +import io.dataline.config.SourceConnectionSpecification; +import io.dataline.config.StandardDestination; +import io.dataline.config.StandardSource; +import io.dataline.config.StandardSync; +import io.dataline.config.StandardSyncSchedule; +import io.dataline.config.StandardWorkspace; +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConfigRepository { + + private final static Logger LOGGER = LoggerFactory.getLogger(ConfigRepository.class); + + private final ConfigPersistence persistence; + + public ConfigRepository(ConfigPersistence persistence) { + this.persistence = persistence; + } + + public StandardWorkspace getStandardWorkspace(final UUID workspaceId) + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.getConfig( + ConfigSchema.STANDARD_WORKSPACE, + workspaceId.toString(), + StandardWorkspace.class); + } + + public void writeStandardWorkspace(final StandardWorkspace workspace) + throws JsonValidationException, IOException { + persistence.writeConfig( + ConfigSchema.STANDARD_WORKSPACE, + workspace.getWorkspaceId().toString(), + workspace); + } + + public StandardSource getStandardSource(final UUID sourceId) + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.getConfig( + ConfigSchema.STANDARD_SOURCE, sourceId.toString(), StandardSource.class); + } + + public List listStandardSources() + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.listConfigs(ConfigSchema.STANDARD_SOURCE, StandardSource.class); + } + + public StandardDestination getStandardDestination(final UUID destinationId) + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.getConfig( + ConfigSchema.STANDARD_DESTINATION, + destinationId.toString(), + StandardDestination.class); + } + + public List listStandardDestinations() + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.listConfigs(ConfigSchema.STANDARD_DESTINATION, StandardDestination.class); + } + + public SourceConnectionSpecification getSourceConnectionSpecification(final UUID sourceSpecificationId) + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.getConfig( + ConfigSchema.SOURCE_CONNECTION_SPECIFICATION, + sourceSpecificationId.toString(), + SourceConnectionSpecification.class); + } + + public List listSourceConnectionSpecifications() + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.listConfigs( + ConfigSchema.SOURCE_CONNECTION_SPECIFICATION, + SourceConnectionSpecification.class); + } + + public DestinationConnectionSpecification getDestinationConnectionSpecification(final UUID destinationSpecificationId) + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.getConfig( + ConfigSchema.DESTINATION_CONNECTION_SPECIFICATION, + destinationSpecificationId.toString(), + DestinationConnectionSpecification.class); + } + + public List listDestinationConnectionSpecifications() + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.listConfigs( + ConfigSchema.DESTINATION_CONNECTION_SPECIFICATION, + DestinationConnectionSpecification.class); + } + + public SourceConnectionImplementation getSourceConnectionImplementation(final UUID sourceImplementationId) + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.getConfig( + ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, + sourceImplementationId.toString(), + SourceConnectionImplementation.class); + } + + public void writeSourceConnectionImplementation(final SourceConnectionImplementation sourceImplementation) + throws JsonValidationException, IOException { + persistence.writeConfig( + ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, + sourceImplementation.getSourceImplementationId().toString(), + sourceImplementation); + } + + public List listSourceConnectionImplementations() + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.listConfigs( + ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, + SourceConnectionImplementation.class); + } + + public DestinationConnectionImplementation getDestinationConnectionImplementation(final UUID destinationImplementationId) + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.getConfig( + ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, + destinationImplementationId.toString(), + DestinationConnectionImplementation.class); + } + + public void writeDestinationConnectionImplementation(DestinationConnectionImplementation destinationConnectionImplementation) + throws JsonValidationException, IOException { + persistence.writeConfig( + ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, + destinationConnectionImplementation.getDestinationImplementationId().toString(), + destinationConnectionImplementation); + } + + public List listDestinationConnectionImplementations() + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.listConfigs( + ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, + DestinationConnectionImplementation.class); + } + + public StandardSync getStandardSync(final UUID connectionId) + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.getConfig( + ConfigSchema.STANDARD_SYNC, + connectionId.toString(), + StandardSync.class); + } + + public void writeStandardSync(final StandardSync standardSync) + throws JsonValidationException, IOException { + persistence.writeConfig( + ConfigSchema.STANDARD_SYNC, + standardSync.getConnectionId().toString(), + standardSync); + } + + public List listStandardSyncs() + throws ConfigNotFoundException, IOException, JsonValidationException { + return persistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class); + } + + public StandardSyncSchedule getStandardSyncSchedule(final UUID connectionId) + throws JsonValidationException, IOException, ConfigNotFoundException { + return persistence.getConfig( + ConfigSchema.STANDARD_SYNC_SCHEDULE, + connectionId.toString(), + StandardSyncSchedule.class); + } + + public void writeStandardSchedule(final StandardSyncSchedule schedule) + throws JsonValidationException, IOException { + // todo (cgardens) - stored on sync id (there is no schedule id concept). this is non-intuitive. + persistence.writeConfig( + ConfigSchema.STANDARD_SYNC_SCHEDULE, + schedule.getConnectionId().toString(), + schedule); + } + +} diff --git a/dataline-scheduler/src/main/java/io/dataline/scheduler/ConfigFetchers.java b/dataline-scheduler/src/main/java/io/dataline/scheduler/ConfigFetchers.java deleted file mode 100644 index 1af50a8ac585..000000000000 --- a/dataline-scheduler/src/main/java/io/dataline/scheduler/ConfigFetchers.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Dataline - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.dataline.scheduler; - -import io.dataline.config.ConfigSchema; -import io.dataline.config.DestinationConnectionImplementation; -import io.dataline.config.SourceConnectionImplementation; -import io.dataline.config.StandardSync; -import io.dataline.config.StandardSyncSchedule; -import io.dataline.config.persistence.ConfigNotFoundException; -import io.dataline.config.persistence.ConfigPersistence; -import io.dataline.config.persistence.JsonValidationException; -import java.io.IOException; -import java.util.List; -import java.util.UUID; - -// todo (cgardens) - deduplicate this with the ConfigFetchers in dataline-server. requires creating -// a class that takes in an exception provider. also requires figuring out the dependency DAG to -// avoid circular dependency issues. -/** - * These helpers catch exceptions thrown in the config persistence and throws them as - * RuntimeExceptions - */ -public class ConfigFetchers { - - public static SourceConnectionImplementation getSourceConnectionImplementation(ConfigPersistence configPersistence, - UUID sourceImplementationId) { - try { - return configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceImplementationId.toString(), - SourceConnectionImplementation.class); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (JsonValidationException e) { - throw getJsonInvalidException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static DestinationConnectionImplementation getDestinationConnectionImplementation(ConfigPersistence configPersistence, - UUID destinationImplementationId) { - try { - return configPersistence.getConfig( - ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, - destinationImplementationId.toString(), - DestinationConnectionImplementation.class); - } catch (JsonValidationException e) { - throw getJsonInvalidException(e); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static StandardSync getStandardSync(ConfigPersistence configPersistence, - UUID connectionId) { - try { - return configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC, - connectionId.toString(), - StandardSync.class); - } catch (JsonValidationException e) { - throw getJsonInvalidException(e); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static List getStandardSyncs(ConfigPersistence configPersistence) { - try { - return configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (JsonValidationException e) { - throw getJsonInvalidException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static StandardSyncSchedule getStandardSyncSchedule(ConfigPersistence configPersistence, - UUID connectionId) { - try { - return configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC_SCHEDULE, - connectionId.toString(), - StandardSyncSchedule.class); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (JsonValidationException e) { - throw getJsonInvalidException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private static RuntimeException getConfigNotFoundException(ConfigNotFoundException e) { - return new RuntimeException( - String.format("Could not find sync configuration for %s: %s.", e.getType().toString(), e.getConfigId()), e); - } - - private static RuntimeException getJsonInvalidException(Throwable e) { - return new RuntimeException( - String.format( - "The provided configuration does not fulfill the specification. Errors: %s", - e.getMessage()), - e); - } - -} diff --git a/dataline-scheduler/src/main/java/io/dataline/scheduler/JobScheduler.java b/dataline-scheduler/src/main/java/io/dataline/scheduler/JobScheduler.java index 9248d5a745ba..5775e0f3f919 100644 --- a/dataline-scheduler/src/main/java/io/dataline/scheduler/JobScheduler.java +++ b/dataline-scheduler/src/main/java/io/dataline/scheduler/JobScheduler.java @@ -27,7 +27,9 @@ import com.google.common.annotations.VisibleForTesting; import io.dataline.config.StandardSync; import io.dataline.config.StandardSyncSchedule; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.ConfigRepository; +import io.dataline.config.persistence.JsonValidationException; import io.dataline.scheduler.job_factory.DefaultSyncJobFactory; import io.dataline.scheduler.job_factory.SyncJobFactory; import io.dataline.scheduler.persistence.SchedulerPersistence; @@ -44,27 +46,28 @@ public class JobScheduler implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(JobScheduler.class); private final SchedulerPersistence schedulerPersistence; - private final ConfigPersistence configPersistence; + private final ConfigRepository configRepository; private final BiPredicate, StandardSyncSchedule> scheduleJobPredicate; private final SyncJobFactory jobFactory; @VisibleForTesting - JobScheduler(SchedulerPersistence schedulerPersistence, - ConfigPersistence configPersistence, - BiPredicate, StandardSyncSchedule> scheduleJobPredicate, - SyncJobFactory jobFactory) { + JobScheduler(final SchedulerPersistence schedulerPersistence, + final ConfigRepository configRepository, + final BiPredicate, StandardSyncSchedule> scheduleJobPredicate, + final SyncJobFactory jobFactory) { this.schedulerPersistence = schedulerPersistence; - this.configPersistence = configPersistence; + this.configRepository = configRepository; this.scheduleJobPredicate = scheduleJobPredicate; this.jobFactory = jobFactory; } - public JobScheduler(SchedulerPersistence schedulerPersistence, ConfigPersistence configPersistence) { + public JobScheduler(final SchedulerPersistence schedulerPersistence, + final ConfigRepository configRepository) { this( schedulerPersistence, - configPersistence, + configRepository, new ScheduleJobPredicate(Instant::now), - new DefaultSyncJobFactory(schedulerPersistence, configPersistence)); + new DefaultSyncJobFactory(schedulerPersistence, configRepository)); } @Override @@ -82,7 +85,7 @@ public void run() { private void scheduleSyncJobs() throws IOException { for (StandardSync connection : getAllActiveConnections()) { Optional previousJobOptional = schedulerPersistence.getLastSyncJob(connection.getConnectionId()); - final StandardSyncSchedule standardSyncSchedule = ConfigFetchers.getStandardSyncSchedule(configPersistence, connection.getConnectionId()); + final StandardSyncSchedule standardSyncSchedule = getStandardSyncSchedule(connection); if (scheduleJobPredicate.test(previousJobOptional, standardSyncSchedule)) { jobFactory.create(connection.getConnectionId()); @@ -90,8 +93,20 @@ private void scheduleSyncJobs() throws IOException { } } + private StandardSyncSchedule getStandardSyncSchedule(StandardSync connection) { + try { + return configRepository.getStandardSyncSchedule(connection.getConnectionId()); + } catch (JsonValidationException | IOException | ConfigNotFoundException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + private List getAllActiveConnections() { - return ConfigFetchers.getStandardSyncs(configPersistence); + try { + return configRepository.listStandardSyncs(); + } catch (JsonValidationException | IOException | ConfigNotFoundException e) { + throw new RuntimeException(e.getMessage(), e); + } } } diff --git a/dataline-scheduler/src/main/java/io/dataline/scheduler/SchedulerApp.java b/dataline-scheduler/src/main/java/io/dataline/scheduler/SchedulerApp.java index 7c3a180c187e..f9fdc3535bb9 100644 --- a/dataline-scheduler/src/main/java/io/dataline/scheduler/SchedulerApp.java +++ b/dataline-scheduler/src/main/java/io/dataline/scheduler/SchedulerApp.java @@ -28,6 +28,7 @@ import io.dataline.config.Configs; import io.dataline.config.EnvConfigs; import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.DefaultConfigPersistence; import io.dataline.db.DatabaseHelper; import io.dataline.scheduler.persistence.DefaultSchedulerPersistence; @@ -82,7 +83,7 @@ public void start() { final WorkerRunFactory workerRunFactory = new WorkerRunFactory(workspaceRoot, pbf); final JobSubmitter jobSubmitter = new JobSubmitter(workerThreadPool, schedulerPersistence, workerRunFactory); - final JobScheduler jobScheduler = new JobScheduler(schedulerPersistence, configPersistence); + final JobScheduler jobScheduler = new JobScheduler(schedulerPersistence, new ConfigRepository(configPersistence)); scheduledPool.scheduleWithFixedDelay( () -> { diff --git a/dataline-scheduler/src/main/java/io/dataline/scheduler/job_factory/DefaultSyncJobFactory.java b/dataline-scheduler/src/main/java/io/dataline/scheduler/job_factory/DefaultSyncJobFactory.java index 5b994a92a6ee..56bf3366b558 100644 --- a/dataline-scheduler/src/main/java/io/dataline/scheduler/job_factory/DefaultSyncJobFactory.java +++ b/dataline-scheduler/src/main/java/io/dataline/scheduler/job_factory/DefaultSyncJobFactory.java @@ -24,11 +24,10 @@ package io.dataline.scheduler.job_factory; -import io.dataline.config.DestinationConnectionImplementation; -import io.dataline.config.SourceConnectionImplementation; import io.dataline.config.StandardSync; -import io.dataline.config.persistence.ConfigPersistence; -import io.dataline.scheduler.ConfigFetchers; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.ConfigRepository; +import io.dataline.config.persistence.JsonValidationException; import io.dataline.scheduler.persistence.SchedulerPersistence; import java.io.IOException; import java.util.UUID; @@ -36,25 +35,24 @@ public class DefaultSyncJobFactory implements SyncJobFactory { private final SchedulerPersistence schedulerPersistence; - private final ConfigPersistence configPersistence; + private final ConfigRepository configRepository; - public DefaultSyncJobFactory(SchedulerPersistence schedulerPersistence, ConfigPersistence configPersistence) { + public DefaultSyncJobFactory(final SchedulerPersistence schedulerPersistence, + final ConfigRepository configRepository) { this.schedulerPersistence = schedulerPersistence; - this.configPersistence = configPersistence; + this.configRepository = configRepository; } - public Long create(UUID connectionId) { - final StandardSync standardSync = ConfigFetchers.getStandardSync(configPersistence, connectionId); - - final SourceConnectionImplementation sourceConnectionImplementation = - ConfigFetchers.getSourceConnectionImplementation(configPersistence, standardSync.getSourceImplementationId()); - final DestinationConnectionImplementation destinationConnectionImplementation = - ConfigFetchers.getDestinationConnectionImplementation(configPersistence, standardSync.getDestinationImplementationId()); - + public Long create(final UUID connectionId) { try { - return schedulerPersistence.createSyncJob(sourceConnectionImplementation, destinationConnectionImplementation, standardSync); - } catch (IOException e) { + final StandardSync standardSync = configRepository.getStandardSync(connectionId); + + return schedulerPersistence.createSyncJob( + configRepository.getSourceConnectionImplementation(standardSync.getSourceImplementationId()), + configRepository.getDestinationConnectionImplementation(standardSync.getDestinationImplementationId()), + standardSync); + } catch (IOException | JsonValidationException | ConfigNotFoundException e) { throw new RuntimeException(e); } } diff --git a/dataline-scheduler/src/test/java/io/dataline/scheduler/JobSchedulerTest.java b/dataline-scheduler/src/test/java/io/dataline/scheduler/JobSchedulerTest.java index 9d6c16d9c769..a3106edb49d4 100644 --- a/dataline-scheduler/src/test/java/io/dataline/scheduler/JobSchedulerTest.java +++ b/dataline-scheduler/src/test/java/io/dataline/scheduler/JobSchedulerTest.java @@ -34,7 +34,6 @@ import com.google.common.collect.Lists; import io.dataline.commons.json.Jsons; import io.dataline.config.Column; -import io.dataline.config.ConfigSchema; import io.dataline.config.DataType; import io.dataline.config.DestinationConnectionImplementation; import io.dataline.config.Schema; @@ -43,7 +42,7 @@ import io.dataline.config.StandardSyncSchedule; import io.dataline.config.Table; import io.dataline.config.persistence.ConfigNotFoundException; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.JsonValidationException; import io.dataline.integrations.Integrations; import io.dataline.scheduler.job_factory.SyncJobFactory; @@ -129,7 +128,7 @@ class JobSchedulerTest { 1L); } - private ConfigPersistence configPersistence; + private ConfigRepository configRepository; private SchedulerPersistence schedulerPersistence; private ScheduleJobPredicate scheduleJobPredicate; private SyncJobFactory jobFactory; @@ -137,12 +136,12 @@ class JobSchedulerTest { @BeforeEach public void setup() { - configPersistence = mock(ConfigPersistence.class); + configRepository = mock(ConfigRepository.class); schedulerPersistence = mock(SchedulerPersistence.class); scheduleJobPredicate = mock(ScheduleJobPredicate.class); jobFactory = mock(SyncJobFactory.class); - scheduler = new JobScheduler(schedulerPersistence, configPersistence, scheduleJobPredicate, jobFactory); + scheduler = new JobScheduler(schedulerPersistence, configRepository, scheduleJobPredicate, jobFactory); } @Test @@ -195,22 +194,15 @@ public void testDoNotScheduleJob() throws JsonValidationException, ConfigNotFoun // sets all mocks that are related to fetching configs. these are the same for all tests in this // test suite. private void setConfigMocks() throws JsonValidationException, ConfigNotFoundException, IOException { - when(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class)) - .thenReturn(Collections.singletonList(STANDARD_SYNC)); - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC_SCHEDULE, - STANDARD_SYNC.getConnectionId().toString(), - StandardSyncSchedule.class)).thenReturn(STANDARD_SYNC_SCHEDULE); + when(configRepository.listStandardSyncs()).thenReturn(Collections.singletonList(STANDARD_SYNC)); + when(configRepository.getStandardSyncSchedule(STANDARD_SYNC.getConnectionId())).thenReturn(STANDARD_SYNC_SCHEDULE); } // verify all mocks that are related to fetching configs are called. these are the same for all // tests in this test suite. private void verifyConfigCalls() throws ConfigNotFoundException, IOException, JsonValidationException { - verify(configPersistence).listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class); - verify(configPersistence).getConfig( - ConfigSchema.STANDARD_SYNC_SCHEDULE, - STANDARD_SYNC.getConnectionId().toString(), - StandardSyncSchedule.class); + verify(configRepository).listStandardSyncs(); + verify(configRepository).getStandardSyncSchedule(STANDARD_SYNC.getConnectionId()); } } diff --git a/dataline-scheduler/src/test/java/io/dataline/scheduler/job_factory/DefaultSyncJobFactoryTest.java b/dataline-scheduler/src/test/java/io/dataline/scheduler/job_factory/DefaultSyncJobFactoryTest.java index 28f99befb6d5..bb526f9aed9d 100644 --- a/dataline-scheduler/src/test/java/io/dataline/scheduler/job_factory/DefaultSyncJobFactoryTest.java +++ b/dataline-scheduler/src/test/java/io/dataline/scheduler/job_factory/DefaultSyncJobFactoryTest.java @@ -29,12 +29,11 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import io.dataline.config.ConfigSchema; import io.dataline.config.DestinationConnectionImplementation; import io.dataline.config.SourceConnectionImplementation; import io.dataline.config.StandardSync; import io.dataline.config.persistence.ConfigNotFoundException; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.JsonValidationException; import io.dataline.scheduler.persistence.SchedulerPersistence; import java.io.IOException; @@ -49,7 +48,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo final UUID sourceImplId = UUID.randomUUID(); final UUID destinationImplId = UUID.randomUUID(); final SchedulerPersistence schedulerPersistence = mock(SchedulerPersistence.class); - final ConfigPersistence configPersistence = mock(ConfigPersistence.class); + final ConfigRepository configRepository = mock(ConfigRepository.class); final long jobId = 11L; final StandardSync standardSync = new StandardSync() @@ -59,35 +58,15 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo final SourceConnectionImplementation sourceConnectionImplementation = new SourceConnectionImplementation(); final DestinationConnectionImplementation destinationConnectionImplementation = new DestinationConnectionImplementation(); - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC, - connectionId.toString(), - StandardSync.class)).thenReturn(standardSync); - when(configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceImplId.toString(), - SourceConnectionImplementation.class)).thenReturn(sourceConnectionImplementation); - when(configPersistence.getConfig( - ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, - destinationImplId.toString(), - DestinationConnectionImplementation.class)).thenReturn(destinationConnectionImplementation); + when(configRepository.getStandardSync(connectionId)).thenReturn(standardSync); + when(configRepository.getSourceConnectionImplementation(sourceImplId)).thenReturn(sourceConnectionImplementation); + when(configRepository.getDestinationConnectionImplementation(destinationImplId)).thenReturn(destinationConnectionImplementation); when(schedulerPersistence.createSyncJob(sourceConnectionImplementation, destinationConnectionImplementation, standardSync)).thenReturn(jobId); - final SyncJobFactory factory = new DefaultSyncJobFactory(schedulerPersistence, configPersistence); + final SyncJobFactory factory = new DefaultSyncJobFactory(schedulerPersistence, configRepository); final long actualJobId = factory.create(connectionId); assertEquals(jobId, actualJobId); - verify(configPersistence).getConfig(ConfigSchema.STANDARD_SYNC, connectionId.toString(), StandardSync.class); - verify(configPersistence) - .getConfig( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceImplId.toString(), - SourceConnectionImplementation.class); - verify(configPersistence) - .getConfig( - ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, - destinationImplId.toString(), - DestinationConnectionImplementation.class); verify(schedulerPersistence).createSyncJob(sourceConnectionImplementation, destinationConnectionImplementation, standardSync); } diff --git a/dataline-server/src/main/java/io/dataline/server/apis/ConfigurationApi.java b/dataline-server/src/main/java/io/dataline/server/apis/ConfigurationApi.java index a6046454e15a..76c42329c679 100644 --- a/dataline-server/src/main/java/io/dataline/server/apis/ConfigurationApi.java +++ b/dataline-server/src/main/java/io/dataline/server/apis/ConfigurationApi.java @@ -60,10 +60,13 @@ import io.dataline.api.model.WorkspaceIdRequestBody; import io.dataline.api.model.WorkspaceRead; import io.dataline.api.model.WorkspaceUpdate; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.DefaultConfigPersistence; +import io.dataline.config.persistence.JsonValidationException; import io.dataline.scheduler.persistence.DefaultSchedulerPersistence; import io.dataline.scheduler.persistence.SchedulerPersistence; +import io.dataline.server.errors.KnownException; import io.dataline.server.handlers.ConnectionsHandler; import io.dataline.server.handlers.DestinationImplementationsHandler; import io.dataline.server.handlers.DestinationSpecificationsHandler; @@ -76,9 +79,11 @@ import io.dataline.server.handlers.WebBackendConnectionsHandler; import io.dataline.server.handlers.WorkspacesHandler; import io.dataline.server.validation.IntegrationSchemaValidation; +import java.io.IOException; import java.nio.file.Path; import javax.validation.Valid; import org.apache.commons.dbcp2.BasicDataSource; +import org.eclipse.jetty.http.HttpStatus; @javax.ws.rs.Path("/v1") public class ConfigurationApi implements io.dataline.api.V1Api { @@ -96,18 +101,18 @@ public class ConfigurationApi implements io.dataline.api.V1Api { private final WebBackendConnectionsHandler webBackendConnectionsHandler; public ConfigurationApi(final Path dbRoot, BasicDataSource connectionPool) { - ConfigPersistence configPersistence = new DefaultConfigPersistence(dbRoot); - final IntegrationSchemaValidation integrationSchemaValidation = new IntegrationSchemaValidation(configPersistence); - workspacesHandler = new WorkspacesHandler(configPersistence); - sourcesHandler = new SourcesHandler(configPersistence); - sourceSpecificationsHandler = new SourceSpecificationsHandler(configPersistence); - connectionsHandler = new ConnectionsHandler(configPersistence); - sourceImplementationsHandler = new SourceImplementationsHandler(configPersistence, integrationSchemaValidation, connectionsHandler); - destinationsHandler = new DestinationsHandler(configPersistence); - destinationSpecificationsHandler = new DestinationSpecificationsHandler(configPersistence); - destinationImplementationsHandler = new DestinationImplementationsHandler(configPersistence, integrationSchemaValidation); + final ConfigRepository configRepository = new ConfigRepository(new DefaultConfigPersistence(dbRoot)); + final IntegrationSchemaValidation integrationSchemaValidation = new IntegrationSchemaValidation(); + workspacesHandler = new WorkspacesHandler(configRepository); + sourcesHandler = new SourcesHandler(configRepository); + sourceSpecificationsHandler = new SourceSpecificationsHandler(configRepository); + connectionsHandler = new ConnectionsHandler(configRepository); + sourceImplementationsHandler = new SourceImplementationsHandler(configRepository, integrationSchemaValidation, connectionsHandler); + destinationsHandler = new DestinationsHandler(configRepository); + destinationSpecificationsHandler = new DestinationSpecificationsHandler(configRepository); + destinationImplementationsHandler = new DestinationImplementationsHandler(configRepository, integrationSchemaValidation); final SchedulerPersistence schedulerPersistence = new DefaultSchedulerPersistence(connectionPool); - schedulerHandler = new SchedulerHandler(configPersistence, schedulerPersistence); + schedulerHandler = new SchedulerHandler(configRepository, schedulerPersistence); jobHistoryHandler = new JobHistoryHandler(schedulerPersistence); webBackendConnectionsHandler = new WebBackendConnectionsHandler(connectionsHandler, sourceImplementationsHandler, jobHistoryHandler); } @@ -116,177 +121,194 @@ public ConfigurationApi(final Path dbRoot, BasicDataSource connectionPool) { @Override public WorkspaceRead getWorkspace(@Valid WorkspaceIdRequestBody workspaceIdRequestBody) { - return workspacesHandler.getWorkspace(workspaceIdRequestBody); + return execute(() -> workspacesHandler.getWorkspace(workspaceIdRequestBody)); } @Override public WorkspaceRead getWorkspaceBySlug(@Valid SlugRequestBody slugRequestBody) { - return workspacesHandler.getWorkspaceBySlug(slugRequestBody); + return execute(() -> workspacesHandler.getWorkspaceBySlug(slugRequestBody)); } @Override public WorkspaceRead updateWorkspace(@Valid WorkspaceUpdate workspaceUpdate) { - return workspacesHandler.updateWorkspace(workspaceUpdate); + return execute(() -> workspacesHandler.updateWorkspace(workspaceUpdate)); } // SOURCE @Override public SourceReadList listSources() { - return sourcesHandler.listSources(); + return execute(sourcesHandler::listSources); } @Override public SourceRead getSource(@Valid SourceIdRequestBody sourceIdRequestBody) { - return sourcesHandler.getSource(sourceIdRequestBody); + return execute(() -> sourcesHandler.getSource(sourceIdRequestBody)); } // SOURCE SPECIFICATION @Override public SourceSpecificationRead getSourceSpecification(@Valid SourceIdRequestBody sourceIdRequestBody) { - return sourceSpecificationsHandler.getSourceSpecification(sourceIdRequestBody); + return execute(() -> sourceSpecificationsHandler.getSourceSpecification(sourceIdRequestBody)); } // SOURCE IMPLEMENTATION @Override public SourceImplementationRead createSourceImplementation(@Valid SourceImplementationCreate sourceImplementationCreate) { - return sourceImplementationsHandler.createSourceImplementation(sourceImplementationCreate); + return execute(() -> sourceImplementationsHandler.createSourceImplementation(sourceImplementationCreate)); } @Override public SourceImplementationRead updateSourceImplementation(@Valid SourceImplementationUpdate sourceImplementationUpdate) { - return sourceImplementationsHandler.updateSourceImplementation(sourceImplementationUpdate); + return execute(() -> sourceImplementationsHandler.updateSourceImplementation(sourceImplementationUpdate)); } @Override public SourceImplementationReadList listSourceImplementationsForWorkspace(@Valid WorkspaceIdRequestBody workspaceIdRequestBody) { - return sourceImplementationsHandler.listSourceImplementationsForWorkspace( - workspaceIdRequestBody); + return execute(() -> sourceImplementationsHandler.listSourceImplementationsForWorkspace(workspaceIdRequestBody)); } @Override public SourceImplementationRead getSourceImplementation(@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) { - return sourceImplementationsHandler.getSourceImplementation(sourceImplementationIdRequestBody); + return execute(() -> sourceImplementationsHandler.getSourceImplementation(sourceImplementationIdRequestBody)); } @Override public void deleteSourceImplementation(@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) { - sourceImplementationsHandler.deleteSourceImplementation(sourceImplementationIdRequestBody); + execute(() -> { + sourceImplementationsHandler.deleteSourceImplementation(sourceImplementationIdRequestBody); + return null; + }); } @Override public CheckConnectionRead checkConnectionToSourceImplementation(@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) { - return schedulerHandler.checkSourceImplementationConnection(sourceImplementationIdRequestBody); + return execute(() -> schedulerHandler.checkSourceImplementationConnection(sourceImplementationIdRequestBody)); } @Override public SourceImplementationDiscoverSchemaRead discoverSchemaForSourceImplementation(@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) { - return schedulerHandler.discoverSchemaForSourceImplementation( - sourceImplementationIdRequestBody); + return execute(() -> schedulerHandler.discoverSchemaForSourceImplementation(sourceImplementationIdRequestBody)); } // DESTINATION @Override public DestinationReadList listDestinations() { - return destinationsHandler.listDestinations(); + return execute(destinationsHandler::listDestinations); } @Override public DestinationRead getDestination(@Valid DestinationIdRequestBody destinationIdRequestBody) { - return destinationsHandler.getDestination(destinationIdRequestBody); + return execute(() -> destinationsHandler.getDestination(destinationIdRequestBody)); } // DESTINATION SPECIFICATION @Override public DestinationSpecificationRead getDestinationSpecification(@Valid DestinationIdRequestBody destinationIdRequestBody) { - return destinationSpecificationsHandler.getDestinationSpecification(destinationIdRequestBody); + return execute(() -> destinationSpecificationsHandler.getDestinationSpecification(destinationIdRequestBody)); } // DESTINATION IMPLEMENTATION @Override public DestinationImplementationRead createDestinationImplementation(@Valid DestinationImplementationCreate destinationImplementationCreate) { - return destinationImplementationsHandler.createDestinationImplementation( - destinationImplementationCreate); + return execute(() -> destinationImplementationsHandler.createDestinationImplementation(destinationImplementationCreate)); } @Override public DestinationImplementationRead updateDestinationImplementation(@Valid DestinationImplementationUpdate destinationImplementationUpdate) { - return destinationImplementationsHandler.updateDestinationImplementation( - destinationImplementationUpdate); + return execute(() -> destinationImplementationsHandler.updateDestinationImplementation(destinationImplementationUpdate)); } @Override public DestinationImplementationReadList listDestinationImplementationsForWorkspace(@Valid WorkspaceIdRequestBody workspaceIdRequestBody) { - return destinationImplementationsHandler.listDestinationImplementationsForWorkspace( - workspaceIdRequestBody); + return execute(() -> destinationImplementationsHandler.listDestinationImplementationsForWorkspace(workspaceIdRequestBody)); } @Override - public DestinationImplementationRead getDestinationImplementation( - @Valid DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) { - return destinationImplementationsHandler.getDestinationImplementation( - destinationImplementationIdRequestBody); + public DestinationImplementationRead getDestinationImplementation(@Valid DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) { + return execute(() -> destinationImplementationsHandler.getDestinationImplementation(destinationImplementationIdRequestBody)); } @Override public CheckConnectionRead checkConnectionToDestinationImplementation(@Valid DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) { - return schedulerHandler.checkDestinationImplementationConnection( - destinationImplementationIdRequestBody); + return execute(() -> schedulerHandler.checkDestinationImplementationConnection(destinationImplementationIdRequestBody)); } // CONNECTION @Override public ConnectionRead createConnection(@Valid ConnectionCreate connectionCreate) { - return connectionsHandler.createConnection(connectionCreate); + return execute(() -> connectionsHandler.createConnection(connectionCreate)); } @Override public ConnectionRead updateConnection(@Valid ConnectionUpdate connectionUpdate) { - return connectionsHandler.updateConnection(connectionUpdate); + return execute(() -> connectionsHandler.updateConnection(connectionUpdate)); } @Override public ConnectionReadList listConnectionsForWorkspace(@Valid WorkspaceIdRequestBody workspaceIdRequestBody) { - return connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody); + return execute(() -> connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)); } @Override public ConnectionRead getConnection(@Valid ConnectionIdRequestBody connectionIdRequestBody) { - return connectionsHandler.getConnection(connectionIdRequestBody); + return execute(() -> connectionsHandler.getConnection(connectionIdRequestBody)); } @Override public ConnectionSyncRead syncConnection(@Valid ConnectionIdRequestBody connectionIdRequestBody) { - return schedulerHandler.syncConnection(connectionIdRequestBody); + return execute(() -> schedulerHandler.syncConnection(connectionIdRequestBody)); } // JOB HISTORY @Override public JobReadList listJobsFor(@Valid JobListRequestBody jobListRequestBody) { - return jobHistoryHandler.listJobsFor(jobListRequestBody); + return execute(() -> jobHistoryHandler.listJobsFor(jobListRequestBody)); } @Override public JobInfoRead getJobInfo(@Valid JobIdRequestBody jobIdRequestBody) { - return jobHistoryHandler.getJobInfo(jobIdRequestBody); + return execute(() -> jobHistoryHandler.getJobInfo(jobIdRequestBody)); } // WEB BACKEND @Override public WbConnectionReadList webBackendListConnectionsForWorkspace(@Valid WorkspaceIdRequestBody workspaceIdRequestBody) { - return webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody); + return execute(() -> webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody)); } @Override public WbConnectionRead webBackendGetConnection(@Valid ConnectionIdRequestBody connectionIdRequestBody) { - return webBackendConnectionsHandler.webBackendGetConnection(connectionIdRequestBody); + return execute(() -> webBackendConnectionsHandler.webBackendGetConnection(connectionIdRequestBody)); + } + + private T execute(HandlerCall call) { + try { + return call.call(); + } catch (ConfigNotFoundException e) { + throw new KnownException( + HttpStatus.UNPROCESSABLE_ENTITY_422, + String.format("Could not find configuration for %s: %s.", e.getType().toString(), e.getConfigId()), e); + } catch (JsonValidationException e) { + throw new KnownException( + HttpStatus.UNPROCESSABLE_ENTITY_422, + String.format("The provided configuration does not fulfill the specification. Errors: %s", e.getMessage()), e); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private interface HandlerCall { + + T call() throws ConfigNotFoundException, IOException, JsonValidationException; + } } diff --git a/dataline-server/src/main/java/io/dataline/server/converters/SchemaConverter.java b/dataline-server/src/main/java/io/dataline/server/converters/SchemaConverter.java index dcf28d4bf61d..99c94e960694 100644 --- a/dataline-server/src/main/java/io/dataline/server/converters/SchemaConverter.java +++ b/dataline-server/src/main/java/io/dataline/server/converters/SchemaConverter.java @@ -37,9 +37,9 @@ public class SchemaConverter { - public static Schema toPersistenceSchema(SourceSchema api) { + public static Schema toPersistenceSchema(SourceSchema sourceSchema) { final List persistenceTables = - api.getTables().stream() + sourceSchema.getTables().stream() .map( apiTable -> { final List persistenceColumns = @@ -62,7 +62,6 @@ public static Schema toPersistenceSchema(SourceSchema api) { } public static SourceSchema toApiSchema(Schema persistenceSchema) { - final List persistenceTables = persistenceSchema.getTables().stream() .map( diff --git a/dataline-server/src/main/java/io/dataline/server/handlers/ConnectionsHandler.java b/dataline-server/src/main/java/io/dataline/server/handlers/ConnectionsHandler.java index 7319e8632bd1..bae6de6aff23 100644 --- a/dataline-server/src/main/java/io/dataline/server/handlers/ConnectionsHandler.java +++ b/dataline-server/src/main/java/io/dataline/server/handlers/ConnectionsHandler.java @@ -24,6 +24,8 @@ package io.dataline.server.handlers; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import io.dataline.api.model.ConnectionCreate; import io.dataline.api.model.ConnectionIdRequestBody; import io.dataline.api.model.ConnectionRead; @@ -33,58 +35,59 @@ import io.dataline.api.model.ConnectionUpdate; import io.dataline.api.model.WorkspaceIdRequestBody; import io.dataline.commons.enums.Enums; -import io.dataline.config.ConfigSchema; import io.dataline.config.Schedule; import io.dataline.config.Schema; import io.dataline.config.StandardSync; import io.dataline.config.StandardSyncSchedule; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.ConfigRepository; +import io.dataline.config.persistence.JsonValidationException; import io.dataline.server.converters.SchemaConverter; -import io.dataline.server.helpers.ConfigFetchers; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.function.Supplier; -import java.util.stream.Collectors; public class ConnectionsHandler { - private final ConfigPersistence configPersistence; + private final ConfigRepository configRepository; private final Supplier uuidGenerator; - public ConnectionsHandler(ConfigPersistence configPersistence, Supplier uuidGenerator) { - this.configPersistence = configPersistence; + @VisibleForTesting + ConnectionsHandler(final ConfigRepository configRepository, + final Supplier uuidGenerator) { + this.configRepository = configRepository; this.uuidGenerator = uuidGenerator; } - public ConnectionsHandler(ConfigPersistence configPersistence) { - this(configPersistence, UUID::randomUUID); + public ConnectionsHandler(final ConfigRepository configRepository) { + this(configRepository, UUID::randomUUID); } - public ConnectionRead createConnection(ConnectionCreate connectionCreate) { + public ConnectionRead createConnection(ConnectionCreate connectionCreate) + throws JsonValidationException, IOException, ConfigNotFoundException { final UUID connectionId = uuidGenerator.get(); // persist sync final StandardSync standardSync = new StandardSync() .withConnectionId(connectionId) + .withName(connectionCreate.getName() != null ? connectionCreate.getName() : "default") .withSourceImplementationId(connectionCreate.getSourceImplementationId()) .withDestinationImplementationId(connectionCreate.getDestinationImplementationId()) - // todo (cgardens): for MVP we only support append. - .withSyncMode(StandardSync.SyncMode.APPEND); + .withSyncMode(StandardSync.SyncMode.APPEND) // todo (cgardens): for MVP we only support append. + .withStatus(toPersistenceStatus(connectionCreate.getStatus())); + if (connectionCreate.getSyncSchema() != null) { standardSync.withSchema(SchemaConverter.toPersistenceSchema(connectionCreate.getSyncSchema())); } else { standardSync.withSchema(new Schema().withTables(Collections.emptyList())); } - standardSync - .withName(connectionCreate.getName() != null ? connectionCreate.getName() : "default") - .withStatus(toPersistenceStatus(connectionCreate.getStatus())); - writeStandardSync(standardSync); + configRepository.writeStandardSync(standardSync); // persist schedule - final StandardSyncSchedule standardSyncSchedule = new StandardSyncSchedule() - .withConnectionId(connectionId); + final StandardSyncSchedule standardSyncSchedule = new StandardSyncSchedule().withConnectionId(connectionId); if (connectionCreate.getSchedule() != null) { final Schedule schedule = new Schedule() .withTimeUnit(toPersistenceTimeUnit(connectionCreate.getSchedule().getTimeUnit())) @@ -96,38 +99,22 @@ public ConnectionRead createConnection(ConnectionCreate connectionCreate) { standardSyncSchedule.withManual(true); } - writeSchedule(standardSyncSchedule); + configRepository.writeStandardSchedule(standardSyncSchedule); - return getConnectionInternal(connectionId); + return buildConnectionRead(connectionId); } - private void writeStandardSync(StandardSync standardSync) { - ConfigFetchers.writeConfig( - configPersistence, - ConfigSchema.STANDARD_SYNC, - standardSync.getConnectionId().toString(), - standardSync); - } - - // todo (cgardens) - stored on sync id (there is no schedule id concept). this is non-intuitive. - private void writeSchedule(StandardSyncSchedule schedule) { - ConfigFetchers.writeConfig( - configPersistence, - ConfigSchema.STANDARD_SYNC_SCHEDULE, - schedule.getConnectionId().toString(), - schedule); - } - - public ConnectionRead updateConnection(ConnectionUpdate connectionUpdate) { + public ConnectionRead updateConnection(ConnectionUpdate connectionUpdate) + throws ConfigNotFoundException, IOException, JsonValidationException { final UUID connectionId = connectionUpdate.getConnectionId(); - // get existing sync - final StandardSync persistedSync = getStandardSync(connectionId) + // retrieve sync + final StandardSync persistedSync = configRepository.getStandardSync(connectionId) .withSchema(SchemaConverter.toPersistenceSchema(connectionUpdate.getSyncSchema())) .withStatus(toPersistenceStatus(connectionUpdate.getStatus())); - // get existing schedule - final StandardSyncSchedule persistedSchedule = getSyncSchedule(connectionId); + // retrieve schedule + final StandardSyncSchedule persistedSchedule = configRepository.getStandardSyncSchedule(connectionId); if (connectionUpdate.getSchedule() != null) { final Schedule schedule = new Schedule() .withTimeUnit(toPersistenceTimeUnit(connectionUpdate.getSchedule().getTimeUnit())) @@ -142,68 +129,53 @@ public ConnectionRead updateConnection(ConnectionUpdate connectionUpdate) { .withManual(true); } - // persist sync - writeStandardSync(persistedSync); + configRepository.writeStandardSync(persistedSync); + configRepository.writeStandardSchedule(persistedSchedule); - // persist schedule - writeSchedule(persistedSchedule); + return buildConnectionRead(connectionId); + } + + public ConnectionReadList listConnectionsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) + throws JsonValidationException, IOException, ConfigNotFoundException { + final List connectionReads = Lists.newArrayList(); + + for (StandardSync standardSync : configRepository.listStandardSyncs()) { + if (standardSync.getStatus() == StandardSync.Status.DEPRECATED) { + continue; + } + if (!isStandardSyncInWorkspace(workspaceIdRequestBody.getWorkspaceId(), standardSync)) { + continue; + } + + connectionReads.add(buildConnectionRead(standardSync.getConnectionId())); + } - return getConnectionInternal(connectionId); + return new ConnectionReadList().connections(connectionReads); } - // todo (cgardens) - this is a disaster without a relational db. - public ConnectionReadList listConnectionsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) { - - final List reads = - // read all connections. - ConfigFetchers.getStandardSyncs(configPersistence).stream() - // filter out connections attached to source implementations NOT associated with the - // workspace - .filter( - standardSync -> ConfigFetchers.getSourceConnectionImplementation( - configPersistence, standardSync.getSourceImplementationId()) - .getWorkspaceId() - .equals(workspaceIdRequestBody.getWorkspaceId())) - // filter out deprecated connections - .filter(standardSync -> !standardSync.getStatus().equals(StandardSync.Status.DEPRECATED)) - // pull the sync schedule - // convert to api format - .map( - standardSync -> { - final StandardSyncSchedule syncSchedule = - getSyncSchedule(standardSync.getConnectionId()); - return toConnectionRead(standardSync, syncSchedule); - }) - .collect(Collectors.toList()); - - final ConnectionReadList connectionReadList = new ConnectionReadList(); - connectionReadList.setConnections(reads); - return connectionReadList; + public ConnectionRead getConnection(ConnectionIdRequestBody connectionIdRequestBody) + throws JsonValidationException, IOException, ConfigNotFoundException { + return buildConnectionRead(connectionIdRequestBody.getConnectionId()); } - public ConnectionRead getConnection(ConnectionIdRequestBody connectionIdRequestBody) { - return getConnectionInternal(connectionIdRequestBody.getConnectionId()); + private boolean isStandardSyncInWorkspace(final UUID workspaceId, + final StandardSync standardSync) + throws ConfigNotFoundException, IOException, JsonValidationException { + return configRepository.getSourceConnectionImplementation(standardSync.getSourceImplementationId()).getWorkspaceId().equals(workspaceId); } - private ConnectionRead getConnectionInternal(UUID connectionId) { + private ConnectionRead buildConnectionRead(UUID connectionId) + throws ConfigNotFoundException, IOException, JsonValidationException { // read sync from db - final StandardSync standardSync = getStandardSync(connectionId); + final StandardSync standardSync = configRepository.getStandardSync(connectionId); // read schedule from db - final StandardSyncSchedule standardSyncSchedule = getSyncSchedule(connectionId); - return toConnectionRead(standardSync, standardSyncSchedule); - } - - private StandardSync getStandardSync(UUID connectionId) { - return ConfigFetchers.getStandardSync(configPersistence, connectionId); - } - - private StandardSyncSchedule getSyncSchedule(UUID connectionId) { - return ConfigFetchers.getStandardSyncSchedule(configPersistence, connectionId); + final StandardSyncSchedule standardSyncSchedule = configRepository.getStandardSyncSchedule(connectionId); + return buildConnectionRead(standardSync, standardSyncSchedule); } - private ConnectionRead toConnectionRead(StandardSync standardSync, - StandardSyncSchedule standardSyncSchedule) { + private ConnectionRead buildConnectionRead(final StandardSync standardSync, + final StandardSyncSchedule standardSyncSchedule) { ConnectionSchedule apiSchedule = null; if (!standardSyncSchedule.getManual()) { diff --git a/dataline-server/src/main/java/io/dataline/server/handlers/DestinationImplementationsHandler.java b/dataline-server/src/main/java/io/dataline/server/handlers/DestinationImplementationsHandler.java index af808bb05aa3..27bd07574bab 100644 --- a/dataline-server/src/main/java/io/dataline/server/handlers/DestinationImplementationsHandler.java +++ b/dataline-server/src/main/java/io/dataline/server/handlers/DestinationImplementationsHandler.java @@ -25,45 +25,46 @@ package io.dataline.server.handlers; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; import io.dataline.api.model.DestinationImplementationCreate; import io.dataline.api.model.DestinationImplementationIdRequestBody; import io.dataline.api.model.DestinationImplementationRead; import io.dataline.api.model.DestinationImplementationReadList; import io.dataline.api.model.DestinationImplementationUpdate; import io.dataline.api.model.WorkspaceIdRequestBody; -import io.dataline.config.ConfigSchema; import io.dataline.config.DestinationConnectionImplementation; +import io.dataline.config.DestinationConnectionSpecification; import io.dataline.config.StandardDestination; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.JsonValidationException; -import io.dataline.server.errors.KnownException; -import io.dataline.server.helpers.ConfigFetchers; import io.dataline.server.validation.IntegrationSchemaValidation; +import java.io.IOException; import java.util.List; import java.util.UUID; import java.util.function.Supplier; -import java.util.stream.Collectors; public class DestinationImplementationsHandler { private final Supplier uuidGenerator; - private final ConfigPersistence configPersistence; + private final ConfigRepository configRepository; private final IntegrationSchemaValidation validator; - public DestinationImplementationsHandler(ConfigPersistence configPersistence, - IntegrationSchemaValidation integrationSchemaValidation, - Supplier uuidGenerator) { - this.configPersistence = configPersistence; + public DestinationImplementationsHandler(final ConfigRepository configRepository, + final IntegrationSchemaValidation integrationSchemaValidation, + final Supplier uuidGenerator) { + this.configRepository = configRepository; this.validator = integrationSchemaValidation; this.uuidGenerator = uuidGenerator; } - public DestinationImplementationsHandler(ConfigPersistence configPersistence, - IntegrationSchemaValidation integrationSchemaValidation) { - this(configPersistence, integrationSchemaValidation, UUID::randomUUID); + public DestinationImplementationsHandler(final ConfigRepository configRepository, + final IntegrationSchemaValidation integrationSchemaValidation) { + this(configRepository, integrationSchemaValidation, UUID::randomUUID); } - public DestinationImplementationRead createDestinationImplementation(DestinationImplementationCreate destinationImplementationCreate) { + public DestinationImplementationRead createDestinationImplementation(final DestinationImplementationCreate destinationImplementationCreate) + throws ConfigNotFoundException, IOException, JsonValidationException { // validate configuration validateDestinationImplementation( destinationImplementationCreate.getDestinationSpecificationId(), @@ -79,105 +80,65 @@ public DestinationImplementationRead createDestinationImplementation(Destination destinationImplementationCreate.getConnectionConfiguration()); // read configuration from db - return getDestinationImplementationInternal(destinationImplementationId); + return buildDestinationImplementationRead(destinationImplementationId); } - public DestinationImplementationRead updateDestinationImplementation(DestinationImplementationUpdate destinationImplementationUpdate) { + public DestinationImplementationRead updateDestinationImplementation(final DestinationImplementationUpdate destinationImplementationUpdate) + throws ConfigNotFoundException, IOException, JsonValidationException { // get existing implementation - final DestinationImplementationRead persistedDestinationImplementation = - getDestinationImplementationInternal( - destinationImplementationUpdate.getDestinationImplementationId()); + final DestinationConnectionImplementation dci = + configRepository.getDestinationConnectionImplementation(destinationImplementationUpdate.getDestinationImplementationId()); // validate configuration validateDestinationImplementation( - persistedDestinationImplementation.getDestinationSpecificationId(), + dci.getDestinationSpecificationId(), destinationImplementationUpdate.getConnectionConfiguration()); // persist persistDestinationConnectionImplementation( destinationImplementationUpdate.getName(), - persistedDestinationImplementation.getDestinationSpecificationId(), - persistedDestinationImplementation.getWorkspaceId(), + dci.getDestinationSpecificationId(), + dci.getWorkspaceId(), destinationImplementationUpdate.getDestinationImplementationId(), destinationImplementationUpdate.getConnectionConfiguration()); // read configuration from db - return getDestinationImplementationInternal( - destinationImplementationUpdate.getDestinationImplementationId()); + return buildDestinationImplementationRead(destinationImplementationUpdate.getDestinationImplementationId()); } - public DestinationImplementationRead getDestinationImplementation(DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) { - - return getDestinationImplementationInternal( - destinationImplementationIdRequestBody.getDestinationImplementationId()); + public DestinationImplementationRead getDestinationImplementation(DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) + throws JsonValidationException, IOException, ConfigNotFoundException { + return buildDestinationImplementationRead(destinationImplementationIdRequestBody.getDestinationImplementationId()); } - public DestinationImplementationReadList listDestinationImplementationsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) { - final List reads = - ConfigFetchers.getDestinationConnectionImplementations(configPersistence).stream() - .filter( - destinationConnectionImplementation -> destinationConnectionImplementation - .getWorkspaceId() - .equals(workspaceIdRequestBody.getWorkspaceId())) - .map( - destinationConnectionImplementation -> { - final UUID destinationId = - ConfigFetchers.getDestinationConnectionSpecification( - configPersistence, - destinationConnectionImplementation.getDestinationSpecificationId()) - .getDestinationId(); - final StandardDestination standardDestination = - ConfigFetchers.getStandardDestination( - configPersistence, - destinationId); - return toDestinationImplementationRead( - destinationConnectionImplementation, standardDestination); - }) - .collect(Collectors.toList()); - - final DestinationImplementationReadList destinationImplementationReadList = - new DestinationImplementationReadList(); - destinationImplementationReadList.setDestinations(reads); - return destinationImplementationReadList; - } + public DestinationImplementationReadList listDestinationImplementationsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) + throws ConfigNotFoundException, IOException, JsonValidationException { + final List reads = Lists.newArrayList(); - private DestinationImplementationRead getDestinationImplementationInternal(UUID destinationImplementationId) { - // read configuration from db - final DestinationConnectionImplementation retrievedDestinationConnectionImplementation; - retrievedDestinationConnectionImplementation = - ConfigFetchers.getDestinationConnectionImplementation( - configPersistence, destinationImplementationId); - - final UUID destinationId = - ConfigFetchers.getDestinationConnectionSpecification( - configPersistence, - retrievedDestinationConnectionImplementation.getDestinationSpecificationId()) - .getDestinationId(); - final StandardDestination standardDestination = - ConfigFetchers.getStandardDestination( - configPersistence, - destinationId); - return toDestinationImplementationRead( - retrievedDestinationConnectionImplementation, standardDestination); - } + for (DestinationConnectionImplementation dci : configRepository.listDestinationConnectionImplementations()) { + if (!dci.getWorkspaceId().equals(workspaceIdRequestBody.getWorkspaceId())) { + continue; + } - private void validateDestinationImplementation(UUID destinationConnectionSpecificationId, JsonNode implementationJson) { - try { - validator.validateDestinationConnectionConfiguration(destinationConnectionSpecificationId, implementationJson); - } catch (JsonValidationException e) { - throw new KnownException( - 422, - String.format( - "The provided configuration does not fulfill the specification. Errors: %s", - e.getMessage())); + reads.add(buildDestinationImplementationRead(dci.getDestinationImplementationId())); } + + return new DestinationImplementationReadList().destinations(reads); } - private void persistDestinationConnectionImplementation(String name, - UUID destinationSpecificationId, - UUID workspaceId, - UUID destinationImplementationId, - JsonNode configurationJson) { + private void validateDestinationImplementation(final UUID destinationConnectionSpecificationId, + final JsonNode implementationJson) + throws JsonValidationException, IOException, ConfigNotFoundException { + DestinationConnectionSpecification dcs = configRepository.getDestinationConnectionSpecification(destinationConnectionSpecificationId); + validator.validateConfig(dcs, implementationJson); + } + + private void persistDestinationConnectionImplementation(final String name, + final UUID destinationSpecificationId, + final UUID workspaceId, + final UUID destinationImplementationId, + final JsonNode configurationJson) + throws JsonValidationException, IOException { final DestinationConnectionImplementation destinationConnectionImplementation = new DestinationConnectionImplementation() .withName(name) .withDestinationSpecificationId(destinationSpecificationId) @@ -185,16 +146,21 @@ private void persistDestinationConnectionImplementation(String name, .withDestinationImplementationId(destinationImplementationId) .withConfiguration(configurationJson); - ConfigFetchers.writeConfig( - configPersistence, - ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, - destinationImplementationId.toString(), - destinationConnectionImplementation); + configRepository.writeDestinationConnectionImplementation(destinationConnectionImplementation); } - private DestinationImplementationRead toDestinationImplementationRead(DestinationConnectionImplementation destinationConnectionImplementation, - StandardDestination standardDestination) { + private DestinationImplementationRead buildDestinationImplementationRead(final UUID destinationImplementationId) + throws ConfigNotFoundException, IOException, JsonValidationException { + // read configuration from db + final DestinationConnectionImplementation dci = configRepository.getDestinationConnectionImplementation(destinationImplementationId); + + final UUID destinationId = configRepository.getDestinationConnectionSpecification(dci.getDestinationSpecificationId()).getDestinationId(); + final StandardDestination standardDestination = configRepository.getStandardDestination(destinationId); + return buildDestinationImplementationRead(dci, standardDestination); + } + private DestinationImplementationRead buildDestinationImplementationRead(final DestinationConnectionImplementation destinationConnectionImplementation, + final StandardDestination standardDestination) { return new DestinationImplementationRead() .destinationId(standardDestination.getDestinationId()) .destinationImplementationId(destinationConnectionImplementation.getDestinationImplementationId()) diff --git a/dataline-server/src/main/java/io/dataline/server/handlers/DestinationSpecificationsHandler.java b/dataline-server/src/main/java/io/dataline/server/handlers/DestinationSpecificationsHandler.java index 3f42e074dd5e..00370817d38e 100644 --- a/dataline-server/src/main/java/io/dataline/server/handlers/DestinationSpecificationsHandler.java +++ b/dataline-server/src/main/java/io/dataline/server/handlers/DestinationSpecificationsHandler.java @@ -27,49 +27,44 @@ import io.dataline.api.model.DestinationIdRequestBody; import io.dataline.api.model.DestinationSpecificationRead; import io.dataline.config.DestinationConnectionSpecification; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.ConfigRepository; +import io.dataline.config.persistence.JsonValidationException; import io.dataline.server.errors.KnownException; -import io.dataline.server.helpers.ConfigFetchers; +import java.io.IOException; +import org.eclipse.jetty.http.HttpStatus; public class DestinationSpecificationsHandler { - private final ConfigPersistence configPersistence; + private final ConfigRepository configRepository; - public DestinationSpecificationsHandler(ConfigPersistence configPersistence) { - this.configPersistence = configPersistence; + public DestinationSpecificationsHandler(final ConfigRepository configRepository) { + this.configRepository = configRepository; } - public DestinationSpecificationRead getDestinationSpecification(DestinationIdRequestBody destinationIdRequestBody) { - final DestinationConnectionSpecification destinationConnection; + public DestinationSpecificationRead getDestinationSpecification(DestinationIdRequestBody destinationIdRequestBody) + throws ConfigNotFoundException, IOException, JsonValidationException { // todo (cgardens) - this is a shortcoming of rolling our own disk storage. since we are not // querying on a the primary key, we have to list all of the specification objects and then // filter. - destinationConnection = - ConfigFetchers.getDestinationConnectionSpecifications(configPersistence).stream() - .filter( - destinationSpecification -> destinationSpecification - .getDestinationId() - .equals(destinationIdRequestBody.getDestinationId())) - .findFirst() - .orElseThrow( - () -> new KnownException( - 404, - String.format( - "Could not find a destination specification for destination: %s", - destinationIdRequestBody.getDestinationId()))); + final DestinationConnectionSpecification destinationConnection = configRepository.listDestinationConnectionSpecifications().stream() + .filter(ds -> ds.getDestinationId().equals(destinationIdRequestBody.getDestinationId())) + .findFirst() + .orElseThrow( + () -> new KnownException( + HttpStatus.NOT_FOUND_404, + String.format( + "Could not find a destination specification for destination: %s", + destinationIdRequestBody.getDestinationId()))); - return toDestinationSpecificationRead(destinationConnection); + return buildDestinationSpecificationRead(destinationConnection); } - private static DestinationSpecificationRead toDestinationSpecificationRead(DestinationConnectionSpecification destinationConnectionSpecification) { - final DestinationSpecificationRead destinationSpecificationRead = new DestinationSpecificationRead(); - destinationSpecificationRead.setDestinationId( - destinationConnectionSpecification.getDestinationId()); - destinationSpecificationRead.setDestinationSpecificationId( - destinationConnectionSpecification.getDestinationSpecificationId()); - destinationSpecificationRead.setConnectionSpecification(destinationConnectionSpecification.getSpecification()); - - return destinationSpecificationRead; + private static DestinationSpecificationRead buildDestinationSpecificationRead(DestinationConnectionSpecification dcs) { + return new DestinationSpecificationRead() + .destinationId(dcs.getDestinationId()) + .destinationSpecificationId(dcs.getDestinationSpecificationId()) + .connectionSpecification(dcs.getSpecification()); } } diff --git a/dataline-server/src/main/java/io/dataline/server/handlers/DestinationsHandler.java b/dataline-server/src/main/java/io/dataline/server/handlers/DestinationsHandler.java index 70a210671da5..af0ae5bedc7e 100644 --- a/dataline-server/src/main/java/io/dataline/server/handlers/DestinationsHandler.java +++ b/dataline-server/src/main/java/io/dataline/server/handlers/DestinationsHandler.java @@ -28,40 +28,37 @@ import io.dataline.api.model.DestinationRead; import io.dataline.api.model.DestinationReadList; import io.dataline.config.StandardDestination; -import io.dataline.config.persistence.ConfigPersistence; -import io.dataline.server.helpers.ConfigFetchers; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.ConfigRepository; +import io.dataline.config.persistence.JsonValidationException; +import java.io.IOException; import java.util.List; -import java.util.UUID; import java.util.stream.Collectors; public class DestinationsHandler { - private final ConfigPersistence configPersistence; + private final ConfigRepository configRepository; - public DestinationsHandler(ConfigPersistence configPersistence) { - this.configPersistence = configPersistence; + public DestinationsHandler(final ConfigRepository configRepository) { + this.configRepository = configRepository; } - public DestinationReadList listDestinations() { - final List destinationReads; - destinationReads = - ConfigFetchers.getStandardDestinations(configPersistence).stream() - .map(DestinationsHandler::toDestinationRead) - .collect(Collectors.toList()); + public DestinationReadList listDestinations() + throws ConfigNotFoundException, IOException, JsonValidationException { + final List reads = configRepository.listStandardDestinations() + .stream() + .map(DestinationsHandler::buildDestinationRead) + .collect(Collectors.toList()); - final DestinationReadList destinationReadList = new DestinationReadList(); - destinationReadList.setDestinations(destinationReads); - return destinationReadList; + return new DestinationReadList().destinations(reads); } - public DestinationRead getDestination(DestinationIdRequestBody destinationIdRequestBody) { - final UUID destinationId = destinationIdRequestBody.getDestinationId(); - final StandardDestination standardDestination = - ConfigFetchers.getStandardDestination(configPersistence, destinationId); - return toDestinationRead(standardDestination); + public DestinationRead getDestination(DestinationIdRequestBody destinationIdRequestBody) + throws ConfigNotFoundException, IOException, JsonValidationException { + return buildDestinationRead(configRepository.getStandardDestination(destinationIdRequestBody.getDestinationId())); } - private static DestinationRead toDestinationRead(StandardDestination standardDestination) { + private static DestinationRead buildDestinationRead(StandardDestination standardDestination) { final DestinationRead destinationRead = new DestinationRead(); destinationRead.setDestinationId(standardDestination.getDestinationId()); destinationRead.setName(standardDestination.getName()); diff --git a/dataline-server/src/main/java/io/dataline/server/handlers/JobHistoryHandler.java b/dataline-server/src/main/java/io/dataline/server/handlers/JobHistoryHandler.java index aaac4b414d0f..23f1b3504677 100644 --- a/dataline-server/src/main/java/io/dataline/server/handlers/JobHistoryHandler.java +++ b/dataline-server/src/main/java/io/dataline/server/handlers/JobHistoryHandler.java @@ -54,46 +54,32 @@ public JobHistoryHandler(SchedulerPersistence schedulerPersistence) { this.schedulerPersistence = schedulerPersistence; } - public JobReadList listJobsFor(JobListRequestBody request) { - try { - JobConfig.ConfigType configType = - Enums.convertTo(request.getConfigType(), JobConfig.ConfigType.class); - String configId = request.getConfigId(); - - // todo: use functions for scope scoping - List jobReads = - schedulerPersistence.listJobs(configType, configId).stream() - .map(JobHistoryHandler::getJobRead) - .collect(Collectors.toList()); - - JobReadList jobReadList = new JobReadList(); - jobReadList.setJobs(jobReads); - - return jobReadList; - } catch (IOException e) { - throw new RuntimeException(e); - } - } + public JobReadList listJobsFor(JobListRequestBody request) throws IOException { + JobConfig.ConfigType configType = Enums.convertTo(request.getConfigType(), JobConfig.ConfigType.class); + String configId = request.getConfigId(); - public JobInfoRead getJobInfo(JobIdRequestBody jobIdRequestBody) { - try { - Job job = schedulerPersistence.getJob(jobIdRequestBody.getId()); + // todo: use functions for scope scoping + List jobReads = schedulerPersistence.listJobs(configType, configId) + .stream() + .map(JobHistoryHandler::getJobRead) + .collect(Collectors.toList()); - LogRead logRead = new LogRead(); - logRead.setStdout(getTail(LOG_TAIL_SIZE, job.getStdoutPath())); - logRead.setStderr(getTail(LOG_TAIL_SIZE, job.getStderrPath())); + return new JobReadList().jobs(jobReads); + } - JobInfoRead jobInfo = new JobInfoRead(); - jobInfo.setJob(getJobRead(job)); - jobInfo.setLogs(logRead); + public JobInfoRead getJobInfo(JobIdRequestBody jobIdRequestBody) throws IOException { + Job job = schedulerPersistence.getJob(jobIdRequestBody.getId()); - return jobInfo; - } catch (IOException e) { - throw new RuntimeException(e); - } + LogRead logRead = new LogRead() + .stdout(getTail(LOG_TAIL_SIZE, job.getStdoutPath())) + .stderr(getTail(LOG_TAIL_SIZE, job.getStderrPath())); + + return new JobInfoRead() + .job(getJobRead(job)) + .logs(logRead); } - private static List getTail(int numLines, String path) { + private static List getTail(int numLines, String path) throws IOException { File file = new File(path); try (ReversedLinesFileReader fileReader = new ReversedLinesFileReader(file, Charsets.UTF_8)) { List lines = new ArrayList<>(); @@ -106,15 +92,12 @@ private static List getTail(int numLines, String path) { Collections.reverse(lines); return lines; - } catch (IOException e) { - throw new RuntimeException(e); } } private static JobRead getJobRead(Job job) { String configId = ScopeHelper.getConfigId(job.getScope()); - JobConfigType configType = - Enums.convertTo(job.getConfig().getConfigType(), JobConfigType.class); + JobConfigType configType = Enums.convertTo(job.getConfig().getConfigType(), JobConfigType.class); JobRead jobRead = new JobRead(); diff --git a/dataline-server/src/main/java/io/dataline/server/handlers/SchedulerHandler.java b/dataline-server/src/main/java/io/dataline/server/handlers/SchedulerHandler.java index eecdbb2b5cfe..92c7808ec681 100644 --- a/dataline-server/src/main/java/io/dataline/server/handlers/SchedulerHandler.java +++ b/dataline-server/src/main/java/io/dataline/server/handlers/SchedulerHandler.java @@ -36,15 +36,15 @@ import io.dataline.config.StandardCheckConnectionOutput; import io.dataline.config.StandardDiscoverSchemaOutput; import io.dataline.config.StandardSync; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.ConfigRepository; +import io.dataline.config.persistence.JsonValidationException; import io.dataline.scheduler.Job; import io.dataline.scheduler.JobStatus; import io.dataline.scheduler.persistence.SchedulerPersistence; import io.dataline.server.converters.SchemaConverter; -import io.dataline.server.helpers.ConfigFetchers; import java.io.IOException; import java.util.UUID; -import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,66 +52,43 @@ public class SchedulerHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerHandler.class); - private final ConfigPersistence configPersistence; + private final ConfigRepository configRepository; private final SchedulerPersistence schedulerPersistence; - public SchedulerHandler(ConfigPersistence configPersistence, - SchedulerPersistence schedulerPersistence) { + public SchedulerHandler(final ConfigRepository configRepository, + final SchedulerPersistence schedulerPersistence) { - this.configPersistence = configPersistence; + this.configRepository = configRepository; this.schedulerPersistence = schedulerPersistence; } - public CheckConnectionRead checkSourceImplementationConnection(SourceImplementationIdRequestBody sourceImplementationIdRequestBody) { - + public CheckConnectionRead checkSourceImplementationConnection(SourceImplementationIdRequestBody sourceImplementationIdRequestBody) + throws ConfigNotFoundException, IOException, JsonValidationException { final SourceConnectionImplementation connectionImplementation = - ConfigFetchers.getSourceConnectionImplementation( - configPersistence, sourceImplementationIdRequestBody.getSourceImplementationId()); - - final long jobId; - try { - jobId = schedulerPersistence.createSourceCheckConnectionJob(connectionImplementation); - } catch (IOException e) { - throw new RuntimeException(e); - } + configRepository.getSourceConnectionImplementation(sourceImplementationIdRequestBody.getSourceImplementationId()); - LOGGER.info("jobId = " + jobId); - final Job job = waitUntilJobIsTerminalOrTimeout(jobId); - return reportConnectionStatus(job); + final long jobId = schedulerPersistence.createSourceCheckConnectionJob(connectionImplementation); + LOGGER.debug("jobId = " + jobId); + return reportConnectionStatus(waitUntilJobIsTerminalOrTimeout(jobId)); } - public CheckConnectionRead checkDestinationImplementationConnection(DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) { - + public CheckConnectionRead checkDestinationImplementationConnection(DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) + throws ConfigNotFoundException, IOException, JsonValidationException { final DestinationConnectionImplementation connectionImplementation = - ConfigFetchers.getDestinationConnectionImplementation( - configPersistence, - destinationImplementationIdRequestBody.getDestinationImplementationId()); - - final long jobId; - try { - jobId = schedulerPersistence.createDestinationCheckConnectionJob(connectionImplementation); - } catch (IOException e) { - throw new RuntimeException(e); - } + configRepository.getDestinationConnectionImplementation(destinationImplementationIdRequestBody.getDestinationImplementationId()); - LOGGER.info("jobId = " + jobId); - final Job job = waitUntilJobIsTerminalOrTimeout(jobId); - return reportConnectionStatus(job); + final long jobId = schedulerPersistence.createDestinationCheckConnectionJob(connectionImplementation); + LOGGER.debug("jobId = " + jobId); + return reportConnectionStatus(waitUntilJobIsTerminalOrTimeout(jobId)); } - public SourceImplementationDiscoverSchemaRead discoverSchemaForSourceImplementation(SourceImplementationIdRequestBody sourceImplementationIdRequestBody) { + public SourceImplementationDiscoverSchemaRead discoverSchemaForSourceImplementation(SourceImplementationIdRequestBody sourceImplementationIdRequestBody) + throws ConfigNotFoundException, IOException, JsonValidationException { final SourceConnectionImplementation connectionImplementation = - ConfigFetchers.getSourceConnectionImplementation( - configPersistence, sourceImplementationIdRequestBody.getSourceImplementationId()); - - final long jobId; - try { - jobId = schedulerPersistence.createDiscoverSchemaJob(connectionImplementation); - } catch (IOException e) { - throw new RuntimeException(e); - } + configRepository.getSourceConnectionImplementation(sourceImplementationIdRequestBody.getSourceImplementationId()); - LOGGER.info("jobId = " + jobId); + final long jobId = schedulerPersistence.createDiscoverSchemaJob(connectionImplementation); + LOGGER.debug("jobId = " + jobId); final Job job = waitUntilJobIsTerminalOrTimeout(jobId); final StandardDiscoverSchemaOutput output = @@ -119,89 +96,55 @@ public SourceImplementationDiscoverSchemaRead discoverSchemaForSourceImplementat .orElseThrow(() -> new RuntimeException("Terminal job does not have an output")) .getDiscoverSchema(); - LOGGER.info("output = " + output); - - final SourceImplementationDiscoverSchemaRead read = - new SourceImplementationDiscoverSchemaRead(); - read.setSchema(SchemaConverter.toApiSchema(output.getSchema())); + LOGGER.debug("output = " + output); - return read; + return new SourceImplementationDiscoverSchemaRead() + .schema(SchemaConverter.toApiSchema(output.getSchema())); } - public ConnectionSyncRead syncConnection(ConnectionIdRequestBody connectionIdRequestBody) { - @NotNull + public ConnectionSyncRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) + throws ConfigNotFoundException, IOException, JsonValidationException { final UUID connectionId = connectionIdRequestBody.getConnectionId(); - final StandardSync standardSync; - standardSync = ConfigFetchers.getStandardSync(configPersistence, connectionId); + final StandardSync standardSync = configRepository.getStandardSync(connectionId); final SourceConnectionImplementation sourceConnectionImplementation = - ConfigFetchers.getSourceConnectionImplementation( - configPersistence, standardSync.getSourceImplementationId()); + configRepository.getSourceConnectionImplementation(standardSync.getSourceImplementationId()); final DestinationConnectionImplementation destinationConnectionImplementation = - ConfigFetchers.getDestinationConnectionImplementation( - configPersistence, standardSync.getDestinationImplementationId()); - - final long jobId; - try { - jobId = - schedulerPersistence.createSyncJob( - sourceConnectionImplementation, destinationConnectionImplementation, standardSync); - } catch (IOException e) { - throw new RuntimeException(e); - } + configRepository.getDestinationConnectionImplementation(standardSync.getDestinationImplementationId()); + final long jobId = schedulerPersistence.createSyncJob(sourceConnectionImplementation, destinationConnectionImplementation, standardSync); final Job job = waitUntilJobIsTerminalOrTimeout(jobId); - final ConnectionSyncRead read = new ConnectionSyncRead(); - read.setStatus( - job.getStatus().equals(JobStatus.COMPLETED) - ? ConnectionSyncRead.StatusEnum.SUCCESS - : ConnectionSyncRead.StatusEnum.FAIL); - - return read; + return new ConnectionSyncRead() + .status(job.getStatus().equals(JobStatus.COMPLETED) ? ConnectionSyncRead.StatusEnum.SUCCESS : ConnectionSyncRead.StatusEnum.FAIL); } - private Job waitUntilJobIsTerminalOrTimeout(long jobId) { - int count = 0; - while (true) { - final Job job; - try { - job = schedulerPersistence.getJob(jobId); - } catch (IOException e) { - throw new RuntimeException(e); - } + private Job waitUntilJobIsTerminalOrTimeout(final long jobId) throws IOException { + for (int i = 0; i < 120; i++) { + final Job job = schedulerPersistence.getJob(jobId); if (JobStatus.TERMINAL_STATUSES.contains(job.getStatus())) { return job; } - if (count > 120) { - throw new RuntimeException("Check connection job did not complete."); - } - - count++; - try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } } - } - private CheckConnectionRead reportConnectionStatus(Job job) { - final StandardCheckConnectionOutput output = - job.getOutput() - .orElseThrow(() -> new RuntimeException("Terminal job does not have an output")) - .getCheckConnection(); - - final CheckConnectionRead checkConnectionRead = new CheckConnectionRead(); + throw new RuntimeException("Check connection job did not complete."); + } - checkConnectionRead.setStatus( - Enums.convertTo(output.getStatus(), CheckConnectionRead.StatusEnum.class)); - checkConnectionRead.setMessage(output.getMessage()); + private CheckConnectionRead reportConnectionStatus(final Job job) { + final StandardCheckConnectionOutput output = job.getOutput() + .orElseThrow(() -> new RuntimeException("Terminal job does not have an output")) + .getCheckConnection(); - return checkConnectionRead; + return new CheckConnectionRead() + .status(Enums.convertTo(output.getStatus(), CheckConnectionRead.StatusEnum.class)) + .message(output.getMessage()); } } diff --git a/dataline-server/src/main/java/io/dataline/server/handlers/SourceImplementationsHandler.java b/dataline-server/src/main/java/io/dataline/server/handlers/SourceImplementationsHandler.java index 2d26b83c36a7..802ae4e15a7f 100644 --- a/dataline-server/src/main/java/io/dataline/server/handlers/SourceImplementationsHandler.java +++ b/dataline-server/src/main/java/io/dataline/server/handlers/SourceImplementationsHandler.java @@ -25,6 +25,8 @@ package io.dataline.server.handlers; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import io.dataline.api.model.ConnectionRead; import io.dataline.api.model.ConnectionStatus; import io.dataline.api.model.ConnectionUpdate; import io.dataline.api.model.SourceImplementationCreate; @@ -33,43 +35,43 @@ import io.dataline.api.model.SourceImplementationReadList; import io.dataline.api.model.SourceImplementationUpdate; import io.dataline.api.model.WorkspaceIdRequestBody; -import io.dataline.config.ConfigSchema; import io.dataline.config.SourceConnectionImplementation; +import io.dataline.config.SourceConnectionSpecification; import io.dataline.config.StandardSource; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.JsonValidationException; -import io.dataline.server.errors.KnownException; -import io.dataline.server.helpers.ConfigFetchers; import io.dataline.server.validation.IntegrationSchemaValidation; +import java.io.IOException; import java.util.List; import java.util.UUID; import java.util.function.Supplier; -import java.util.stream.Collectors; public class SourceImplementationsHandler { private final Supplier uuidGenerator; - private final ConfigPersistence configPersistence; + private final ConfigRepository configRepository; private final IntegrationSchemaValidation validator; private final ConnectionsHandler connectionsHandler; - public SourceImplementationsHandler(ConfigPersistence configPersistence, - IntegrationSchemaValidation integrationSchemaValidation, - ConnectionsHandler connectionsHandler, - Supplier uuidGenerator) { - this.configPersistence = configPersistence; + public SourceImplementationsHandler(final ConfigRepository configRepository, + final IntegrationSchemaValidation integrationSchemaValidation, + final ConnectionsHandler connectionsHandler, + final Supplier uuidGenerator) { + this.configRepository = configRepository; this.validator = integrationSchemaValidation; this.connectionsHandler = connectionsHandler; this.uuidGenerator = uuidGenerator; } - public SourceImplementationsHandler(ConfigPersistence configPersistence, - IntegrationSchemaValidation integrationSchemaValidation, - ConnectionsHandler connectionsHandler) { - this(configPersistence, integrationSchemaValidation, connectionsHandler, UUID::randomUUID); + public SourceImplementationsHandler(final ConfigRepository configRepository, + final IntegrationSchemaValidation integrationSchemaValidation, + final ConnectionsHandler connectionsHandler) { + this(configRepository, integrationSchemaValidation, connectionsHandler, UUID::randomUUID); } - public SourceImplementationRead createSourceImplementation(SourceImplementationCreate sourceImplementationCreate) { + public SourceImplementationRead createSourceImplementation(SourceImplementationCreate sourceImplementationCreate) + throws ConfigNotFoundException, IOException, JsonValidationException { // validate configuration validateSourceImplementation( sourceImplementationCreate.getSourceSpecificationId(), @@ -86,14 +88,14 @@ public SourceImplementationRead createSourceImplementation(SourceImplementationC sourceImplementationCreate.getConnectionConfiguration()); // read configuration from db - return getSourceImplementationReadInternal(sourceImplementationId); + return buildSourceImplementationRead(sourceImplementationId); } - public SourceImplementationRead updateSourceImplementation(SourceImplementationUpdate sourceImplementationUpdate) { + public SourceImplementationRead updateSourceImplementation(SourceImplementationUpdate sourceImplementationUpdate) + throws ConfigNotFoundException, IOException, JsonValidationException { // get existing implementation final SourceConnectionImplementation persistedSourceImplementation = - getSourceConnectionImplementationInternal( - sourceImplementationUpdate.getSourceImplementationId()); + configRepository.getSourceConnectionImplementation(sourceImplementationUpdate.getSourceImplementationId()); // validate configuration validateSourceImplementation( @@ -110,114 +112,91 @@ public SourceImplementationRead updateSourceImplementation(SourceImplementationU sourceImplementationUpdate.getConnectionConfiguration()); // read configuration from db - return getSourceImplementationReadInternal( - sourceImplementationUpdate.getSourceImplementationId()); + return buildSourceImplementationRead(sourceImplementationUpdate.getSourceImplementationId()); } - public SourceImplementationRead getSourceImplementation(SourceImplementationIdRequestBody sourceImplementationIdRequestBody) { - - return getSourceImplementationReadInternal( - sourceImplementationIdRequestBody.getSourceImplementationId()); + public SourceImplementationRead getSourceImplementation(SourceImplementationIdRequestBody sourceImplementationIdRequestBody) + throws JsonValidationException, IOException, ConfigNotFoundException { + return buildSourceImplementationRead(sourceImplementationIdRequestBody.getSourceImplementationId()); } - public SourceImplementationReadList listSourceImplementationsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) { - - final List reads = - ConfigFetchers.getSourceConnectionImplementations(configPersistence).stream() - .filter(sourceImpl -> sourceImpl.getWorkspaceId().equals(workspaceIdRequestBody.getWorkspaceId())) - .filter(sourceImpl -> !sourceImpl.getTombstone()) - .map( - sourceConnectionImplementation -> { - final UUID sourceId = - ConfigFetchers.getSourceConnectionSpecification( - configPersistence, - sourceConnectionImplementation.getSourceSpecificationId()) - .getSourceId(); - final StandardSource standardSource = - ConfigFetchers.getStandardSource( - configPersistence, - sourceId); - return toSourceImplementationRead(sourceConnectionImplementation, standardSource); - }) - .collect(Collectors.toList()); - - final SourceImplementationReadList sourceImplementationReadList = - new SourceImplementationReadList(); - sourceImplementationReadList.setSources(reads); - return sourceImplementationReadList; + public SourceImplementationReadList listSourceImplementationsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) + throws ConfigNotFoundException, IOException, JsonValidationException { + final List reads = Lists.newArrayList(); + + for (SourceConnectionImplementation sci : configRepository.listSourceConnectionImplementations()) { + if (!sci.getWorkspaceId().equals(workspaceIdRequestBody.getWorkspaceId())) { + continue; + } + if (sci.getTombstone()) { + continue; + } + + reads.add(buildSourceImplementationRead(sci.getSourceImplementationId())); + } + + return new SourceImplementationReadList().sources(reads); } - public void deleteSourceImplementation(SourceImplementationIdRequestBody sourceImplementationIdRequestBody) { + public void deleteSourceImplementation(SourceImplementationIdRequestBody sourceImplementationIdRequestBody) + throws JsonValidationException, IOException, ConfigNotFoundException { // get existing implementation - final SourceImplementationRead persistedSourceImplementation = - getSourceImplementationReadInternal( - sourceImplementationIdRequestBody.getSourceImplementationId()); + final SourceImplementationRead sourceImplementation = + buildSourceImplementationRead(sourceImplementationIdRequestBody.getSourceImplementationId()); + + // "delete" all connections associated with source implementation as well. + // Delete connections first in case it it fails in the middle, source will still be visible + final WorkspaceIdRequestBody workspaceIdRequestBody = new WorkspaceIdRequestBody().workspaceId(sourceImplementation.getWorkspaceId()); + for (ConnectionRead connectionRead : connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody).getConnections()) { + if (!connectionRead.getSourceImplementationId().equals(sourceImplementation.getSourceImplementationId())) { + continue; + } + + final ConnectionUpdate connectionUpdate = new ConnectionUpdate() + .connectionId(connectionRead.getConnectionId()) + .syncSchema(connectionRead.getSyncSchema()) + .schedule(connectionRead.getSchedule()) + .status(ConnectionStatus.DEPRECATED); + + connectionsHandler.updateConnection(connectionUpdate); + } // persist persistSourceConnectionImplementation( - persistedSourceImplementation.getName(), - persistedSourceImplementation.getSourceSpecificationId(), - persistedSourceImplementation.getWorkspaceId(), - persistedSourceImplementation.getSourceImplementationId(), + sourceImplementation.getName(), + sourceImplementation.getSourceSpecificationId(), + sourceImplementation.getWorkspaceId(), + sourceImplementation.getSourceImplementationId(), true, - persistedSourceImplementation.getConnectionConfiguration()); - - final WorkspaceIdRequestBody workspaceIdRequestBody = new WorkspaceIdRequestBody(); - workspaceIdRequestBody.setWorkspaceId(persistedSourceImplementation.getWorkspaceId()); - // "delete" all connections associated with source implementation as well. - connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody).getConnections().stream() - .filter(connectionRead -> connectionRead.getSourceImplementationId().equals(sourceImplementationIdRequestBody.getSourceImplementationId())) - .forEach(connectionRead -> { - final ConnectionUpdate connectionUpdate = new ConnectionUpdate() - .connectionId(connectionRead.getConnectionId()) - .syncSchema(connectionRead.getSyncSchema()) - .schedule(connectionRead.getSchedule()) - .status(ConnectionStatus.DEPRECATED); - - connectionsHandler.updateConnection(connectionUpdate); - }); - } - - private SourceConnectionImplementation getSourceConnectionImplementationInternal(UUID sourceImplementationId) { - return ConfigFetchers.getSourceConnectionImplementation( - configPersistence, sourceImplementationId); + sourceImplementation.getConnectionConfiguration()); } - private SourceImplementationRead getSourceImplementationReadInternal(UUID sourceImplementationId) { + private SourceImplementationRead buildSourceImplementationRead(UUID sourceImplementationId) + throws ConfigNotFoundException, IOException, JsonValidationException { // read configuration from db - final SourceConnectionImplementation retrievedSourceConnectionImplementation = - getSourceConnectionImplementationInternal(sourceImplementationId); - - final UUID sourceId = - ConfigFetchers.getSourceConnectionSpecification( - configPersistence, - retrievedSourceConnectionImplementation.getSourceSpecificationId()) - .getSourceId(); - final StandardSource standardSource = - ConfigFetchers.getStandardSource( - configPersistence, - sourceId); - return toSourceImplementationRead(retrievedSourceConnectionImplementation, standardSource); + final SourceConnectionImplementation sourceConnectionImplementation = configRepository.getSourceConnectionImplementation(sourceImplementationId); + + final UUID sourceId = configRepository + .getSourceConnectionSpecification(sourceConnectionImplementation.getSourceSpecificationId()) + .getSourceId(); + final StandardSource standardSource = configRepository.getStandardSource(sourceId); + + return toSourceImplementationRead(sourceConnectionImplementation, standardSource); } - private void validateSourceImplementation(UUID sourceConnectionSpecificationId, JsonNode implementationJson) { - try { - validator.validateSourceConnectionConfiguration(sourceConnectionSpecificationId, implementationJson); - } catch (JsonValidationException e) { - throw new KnownException( - 422, - String.format( - "The provided configuration does not fulfill the specification. Errors: %s", - e.getMessage())); - } + private void validateSourceImplementation(UUID sourceConnectionSpecificationId, JsonNode implementationJson) + throws JsonValidationException, IOException, ConfigNotFoundException { + SourceConnectionSpecification scs = configRepository.getSourceConnectionSpecification(sourceConnectionSpecificationId); + validator.validateConfig(scs, implementationJson); } - private void persistSourceConnectionImplementation(String name, - UUID sourceSpecificationId, - UUID workspaceId, - UUID sourceImplementationId, - boolean tombstone, - JsonNode configurationJson) { + private void persistSourceConnectionImplementation(final String name, + final UUID sourceSpecificationId, + final UUID workspaceId, + final UUID sourceImplementationId, + final boolean tombstone, + final JsonNode configurationJson) + throws JsonValidationException, IOException { final SourceConnectionImplementation sourceConnectionImplementation = new SourceConnectionImplementation() .withName(name) .withSourceSpecificationId(sourceSpecificationId) @@ -226,23 +205,19 @@ private void persistSourceConnectionImplementation(String name, .withTombstone(tombstone) .withConfiguration(configurationJson); - ConfigFetchers.writeConfig( - configPersistence, - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceImplementationId.toString(), - sourceConnectionImplementation); + configRepository.writeSourceConnectionImplementation(sourceConnectionImplementation); } - private SourceImplementationRead toSourceImplementationRead(SourceConnectionImplementation sourceConnectionImplementation, - StandardSource standardSource) { + private SourceImplementationRead toSourceImplementationRead(final SourceConnectionImplementation sourceConnectionImplementation, + final StandardSource standardSource) { return new SourceImplementationRead() .sourceId(standardSource.getSourceId()) + .sourceName(standardSource.getName()) .sourceImplementationId(sourceConnectionImplementation.getSourceImplementationId()) .workspaceId(sourceConnectionImplementation.getWorkspaceId()) .sourceSpecificationId(sourceConnectionImplementation.getSourceSpecificationId()) .connectionConfiguration(sourceConnectionImplementation.getConfiguration()) - .name(sourceConnectionImplementation.getName()) - .sourceName(standardSource.getName()); + .name(sourceConnectionImplementation.getName()); } } diff --git a/dataline-server/src/main/java/io/dataline/server/handlers/SourceSpecificationsHandler.java b/dataline-server/src/main/java/io/dataline/server/handlers/SourceSpecificationsHandler.java index 1bf01a000e97..12fd6a9d6215 100644 --- a/dataline-server/src/main/java/io/dataline/server/handlers/SourceSpecificationsHandler.java +++ b/dataline-server/src/main/java/io/dataline/server/handlers/SourceSpecificationsHandler.java @@ -27,46 +27,43 @@ import io.dataline.api.model.SourceIdRequestBody; import io.dataline.api.model.SourceSpecificationRead; import io.dataline.config.SourceConnectionSpecification; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.ConfigRepository; +import io.dataline.config.persistence.JsonValidationException; import io.dataline.server.errors.KnownException; -import io.dataline.server.helpers.ConfigFetchers; +import java.io.IOException; public class SourceSpecificationsHandler { - private final ConfigPersistence configPersistence; + private final ConfigRepository configRepository; - public SourceSpecificationsHandler(ConfigPersistence configPersistence) { - this.configPersistence = configPersistence; + public SourceSpecificationsHandler(final ConfigRepository configRepository) { + this.configRepository = configRepository; } - public SourceSpecificationRead getSourceSpecification(SourceIdRequestBody sourceIdRequestBody) { - final SourceConnectionSpecification sourceConnection; + public SourceSpecificationRead getSourceSpecification(SourceIdRequestBody sourceIdRequestBody) + throws ConfigNotFoundException, IOException, JsonValidationException { // todo (cgardens) - this is a shortcoming of rolling our own disk storage. since we are not // querying on a the primary key, we have to list all of the specification objects and then // filter. - sourceConnection = - ConfigFetchers.getSourceConnectionSpecifications(configPersistence).stream() - .filter( - sourceSpecification -> sourceSpecification.getSourceId().equals(sourceIdRequestBody.getSourceId())) - .findFirst() - .orElseThrow( - () -> new KnownException( - 404, - String.format( - "Could not find a source specification for source: %s", - sourceIdRequestBody.getSourceId()))); + final SourceConnectionSpecification sourceConnection = configRepository.listSourceConnectionSpecifications().stream() + .filter(sourceSpecification -> sourceSpecification.getSourceId().equals(sourceIdRequestBody.getSourceId())) + .findFirst() + .orElseThrow( + () -> new KnownException( + 404, + String.format( + "Could not find a source specification for source: %s", + sourceIdRequestBody.getSourceId()))); - return toSourceSpecificationRead(sourceConnection); + return buildSourceSpecificationRead(sourceConnection); } - private static SourceSpecificationRead toSourceSpecificationRead(SourceConnectionSpecification sourceConnectionSpecification) { - final SourceSpecificationRead sourceSpecificationRead = new SourceSpecificationRead(); - sourceSpecificationRead.setSourceId(sourceConnectionSpecification.getSourceId()); - sourceSpecificationRead.setSourceSpecificationId( - sourceConnectionSpecification.getSourceSpecificationId()); - sourceSpecificationRead.setConnectionSpecification(sourceConnectionSpecification.getSpecification()); - - return sourceSpecificationRead; + private static SourceSpecificationRead buildSourceSpecificationRead(SourceConnectionSpecification scs) { + return new SourceSpecificationRead() + .sourceId(scs.getSourceId()) + .sourceSpecificationId(scs.getSourceSpecificationId()) + .connectionSpecification(scs.getSpecification()); } } diff --git a/dataline-server/src/main/java/io/dataline/server/handlers/SourcesHandler.java b/dataline-server/src/main/java/io/dataline/server/handlers/SourcesHandler.java index c9805a435304..9cd19ed57e6b 100644 --- a/dataline-server/src/main/java/io/dataline/server/handlers/SourcesHandler.java +++ b/dataline-server/src/main/java/io/dataline/server/handlers/SourcesHandler.java @@ -28,40 +28,36 @@ import io.dataline.api.model.SourceRead; import io.dataline.api.model.SourceReadList; import io.dataline.config.StandardSource; -import io.dataline.config.persistence.ConfigPersistence; -import io.dataline.server.helpers.ConfigFetchers; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.ConfigRepository; +import io.dataline.config.persistence.JsonValidationException; +import java.io.IOException; import java.util.List; -import java.util.UUID; import java.util.stream.Collectors; public class SourcesHandler { - private final ConfigPersistence configPersistence; + private final ConfigRepository configRepository; - public SourcesHandler(ConfigPersistence configPersistence) { - this.configPersistence = configPersistence; + public SourcesHandler(final ConfigRepository configRepository) { + this.configRepository = configRepository; } - public SourceReadList listSources() { - final List sourceReads; - sourceReads = - ConfigFetchers.getStandardSources(configPersistence).stream() - .map(SourcesHandler::toSourceRead) - .collect(Collectors.toList()); + public SourceReadList listSources() throws ConfigNotFoundException, IOException, JsonValidationException { + final List reads = configRepository.listStandardSources() + .stream() + .map(SourcesHandler::buildSourceRead) + .collect(Collectors.toList()); - final SourceReadList sourceReadList = new SourceReadList(); - sourceReadList.setSources(sourceReads); - return sourceReadList; + return new SourceReadList().sources(reads); } - public SourceRead getSource(SourceIdRequestBody sourceIdRequestBody) { - final UUID sourceId = sourceIdRequestBody.getSourceId(); - final StandardSource standardSource = - ConfigFetchers.getStandardSource(configPersistence, sourceId); - return toSourceRead(standardSource); + public SourceRead getSource(SourceIdRequestBody sourceIdRequestBody) throws ConfigNotFoundException, IOException, JsonValidationException { + final StandardSource standardSource = configRepository.getStandardSource(sourceIdRequestBody.getSourceId()); + return buildSourceRead(standardSource); } - private static SourceRead toSourceRead(StandardSource standardSource) { + private static SourceRead buildSourceRead(StandardSource standardSource) { final SourceRead sourceRead = new SourceRead(); sourceRead.setSourceId(standardSource.getSourceId()); sourceRead.setName(standardSource.getName()); diff --git a/dataline-server/src/main/java/io/dataline/server/handlers/WebBackendConnectionsHandler.java b/dataline-server/src/main/java/io/dataline/server/handlers/WebBackendConnectionsHandler.java index 545265d810d8..e818fb424391 100644 --- a/dataline-server/src/main/java/io/dataline/server/handlers/WebBackendConnectionsHandler.java +++ b/dataline-server/src/main/java/io/dataline/server/handlers/WebBackendConnectionsHandler.java @@ -24,9 +24,9 @@ package io.dataline.server.handlers; +import com.google.common.collect.Lists; import io.dataline.api.model.ConnectionIdRequestBody; import io.dataline.api.model.ConnectionRead; -import io.dataline.api.model.ConnectionReadList; import io.dataline.api.model.JobConfigType; import io.dataline.api.model.JobListRequestBody; import io.dataline.api.model.JobReadList; @@ -36,8 +36,10 @@ import io.dataline.api.model.WbConnectionReadList; import io.dataline.api.model.WorkspaceIdRequestBody; import io.dataline.commons.enums.Enums; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.JsonValidationException; +import java.io.IOException; import java.util.List; -import java.util.stream.Collectors; public class WebBackendConnectionsHandler { @@ -45,56 +47,52 @@ public class WebBackendConnectionsHandler { private final SourceImplementationsHandler sourceImplementationsHandler; private final JobHistoryHandler jobHistoryHandler; - public WebBackendConnectionsHandler( - ConnectionsHandler connectionsHandler, - SourceImplementationsHandler sourceImplementationsHandler, - JobHistoryHandler jobHistoryHandler) { + public WebBackendConnectionsHandler(final ConnectionsHandler connectionsHandler, + final SourceImplementationsHandler sourceImplementationsHandler, + final JobHistoryHandler jobHistoryHandler) { this.connectionsHandler = connectionsHandler; this.sourceImplementationsHandler = sourceImplementationsHandler; this.jobHistoryHandler = jobHistoryHandler; } - public WbConnectionReadList webBackendListConnectionsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) { - final ConnectionReadList connectionReadList = connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody); + public WbConnectionReadList webBackendListConnectionsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) + throws ConfigNotFoundException, IOException, JsonValidationException { - final List reads = connectionReadList.getConnections().stream().map(this::getWbConnectionRead).collect(Collectors.toList()); - - final WbConnectionReadList readList = new WbConnectionReadList(); - readList.setConnections(reads); - - return readList; + final List reads = Lists.newArrayList(); + for (ConnectionRead connection : connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody).getConnections()) { + reads.add(buildWbConnectionRead(connection)); + } + return new WbConnectionReadList().connections(reads); } - public WbConnectionRead webBackendGetConnection(ConnectionIdRequestBody connectionIdRequestBody) { - final ConnectionRead connectionRead = connectionsHandler.getConnection(connectionIdRequestBody); - - return getWbConnectionRead(connectionRead); + public WbConnectionRead webBackendGetConnection(ConnectionIdRequestBody connectionIdRequestBody) + throws ConfigNotFoundException, IOException, JsonValidationException { + return buildWbConnectionRead(connectionsHandler.getConnection(connectionIdRequestBody)); } - private WbConnectionRead getWbConnectionRead(ConnectionRead connectionRead) { - final SourceImplementationIdRequestBody sourceImplementationIdRequestBody = new SourceImplementationIdRequestBody(); - sourceImplementationIdRequestBody.setSourceImplementationId(connectionRead.getSourceImplementationId()); - final SourceImplementationRead sourceImplementation = - sourceImplementationsHandler.getSourceImplementation(sourceImplementationIdRequestBody); + private WbConnectionRead buildWbConnectionRead(ConnectionRead connectionRead) throws ConfigNotFoundException, IOException, JsonValidationException { + final SourceImplementationIdRequestBody sourceImplementationIdRequestBody = new SourceImplementationIdRequestBody() + .sourceImplementationId(connectionRead.getSourceImplementationId()); + final SourceImplementationRead sourceImplementation = sourceImplementationsHandler.getSourceImplementation(sourceImplementationIdRequestBody); + + final JobListRequestBody jobListRequestBody = new JobListRequestBody() + .configId(connectionRead.getConnectionId().toString()) + .configType(JobConfigType.SYNC); + + final WbConnectionRead wbConnectionRead = new WbConnectionRead() + .connectionId(connectionRead.getConnectionId()) + .sourceImplementationId(connectionRead.getSourceImplementationId()) + .destinationImplementationId(connectionRead.getDestinationImplementationId()) + .name(connectionRead.getName()) + .syncSchema(connectionRead.getSyncSchema()) + .status(connectionRead.getStatus()) + .syncMode(Enums.convertTo(connectionRead.getSyncMode(), WbConnectionRead.SyncModeEnum.class)) + .schedule(connectionRead.getSchedule()) + .source(sourceImplementation); - final JobListRequestBody jobListRequestBody = new JobListRequestBody(); - jobListRequestBody.setConfigId(connectionRead.getConnectionId().toString()); - jobListRequestBody.setConfigType(JobConfigType.SYNC); final JobReadList jobReadList = jobHistoryHandler.listJobsFor(jobListRequestBody); - - final WbConnectionRead wbConnectionRead = new WbConnectionRead(); - wbConnectionRead.setConnectionId(connectionRead.getConnectionId()); - wbConnectionRead.setSourceImplementationId(connectionRead.getSourceImplementationId()); - wbConnectionRead.setDestinationImplementationId(connectionRead.getDestinationImplementationId()); - wbConnectionRead.setName(connectionRead.getName()); - wbConnectionRead.setSyncSchema(connectionRead.getSyncSchema()); - wbConnectionRead.setStatus(connectionRead.getStatus()); - wbConnectionRead.setSyncMode(Enums.convertTo(connectionRead.getSyncMode(), WbConnectionRead.SyncModeEnum.class)); - wbConnectionRead.setSchedule(connectionRead.getSchedule()); jobReadList.getJobs().stream().findFirst().ifPresent(job -> wbConnectionRead.setLastSync(job.getCreatedAt())); - wbConnectionRead.setSource(sourceImplementation); - return wbConnectionRead; } diff --git a/dataline-server/src/main/java/io/dataline/server/handlers/WorkspacesHandler.java b/dataline-server/src/main/java/io/dataline/server/handlers/WorkspacesHandler.java index ab10a3fe76b4..b5e52c504553 100644 --- a/dataline-server/src/main/java/io/dataline/server/handlers/WorkspacesHandler.java +++ b/dataline-server/src/main/java/io/dataline/server/handlers/WorkspacesHandler.java @@ -28,46 +28,39 @@ import io.dataline.api.model.WorkspaceIdRequestBody; import io.dataline.api.model.WorkspaceRead; import io.dataline.api.model.WorkspaceUpdate; -import io.dataline.config.ConfigSchema; import io.dataline.config.StandardWorkspace; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.ConfigRepository; +import io.dataline.config.persistence.JsonValidationException; import io.dataline.config.persistence.PersistenceConstants; -import io.dataline.server.helpers.ConfigFetchers; +import java.io.IOException; import java.util.UUID; public class WorkspacesHandler { - private final ConfigPersistence configPersistence; + private final ConfigRepository configRepository; - public WorkspacesHandler(ConfigPersistence configPersistence) { - this.configPersistence = configPersistence; + public WorkspacesHandler(final ConfigRepository configRepository) { + this.configRepository = configRepository; } - public WorkspaceRead getWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) { - return getWorkspaceFromId(workspaceIdRequestBody.getWorkspaceId()); + public WorkspaceRead getWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) + throws JsonValidationException, IOException, ConfigNotFoundException { + return buildWorkspaceReadFromId(workspaceIdRequestBody.getWorkspaceId()); } @SuppressWarnings("unused") - public WorkspaceRead getWorkspaceBySlug(SlugRequestBody slugRequestBody) { + public WorkspaceRead getWorkspaceBySlug(SlugRequestBody slugRequestBody) + throws JsonValidationException, IOException, ConfigNotFoundException { // for now we assume there is one workspace and it has a default uuid. - return getWorkspaceFromId(PersistenceConstants.DEFAULT_WORKSPACE_ID); + return buildWorkspaceReadFromId(PersistenceConstants.DEFAULT_WORKSPACE_ID); } - private WorkspaceRead getWorkspaceFromId(UUID workspaceId) { - final StandardWorkspace workspace = - ConfigFetchers.getStandardWorkspace(configPersistence, workspaceId); - - return new WorkspaceRead() - .workspaceId(workspace.getWorkspaceId()) - .name(workspace.getName()) - .slug(workspace.getSlug()) - .initialSetupComplete(workspace.getInitialSetupComplete()); - } - - public WorkspaceRead updateWorkspace(WorkspaceUpdate workspaceUpdate) { + public WorkspaceRead updateWorkspace(WorkspaceUpdate workspaceUpdate) + throws ConfigNotFoundException, IOException, JsonValidationException { final UUID workspaceId = workspaceUpdate.getWorkspaceId(); - final StandardWorkspace persistedWorkspace = ConfigFetchers.getStandardWorkspace(configPersistence, workspaceId); + final StandardWorkspace persistedWorkspace = configRepository.getStandardWorkspace(workspaceId); if (workspaceUpdate.getEmail() != null && !workspaceUpdate.getEmail().equals("")) { persistedWorkspace.withEmail(workspaceUpdate.getEmail()); @@ -77,13 +70,20 @@ public WorkspaceRead updateWorkspace(WorkspaceUpdate workspaceUpdate) { .withNews(workspaceUpdate.getNews()) .withSecurityUpdates(workspaceUpdate.getSecurityUpdates()); - ConfigFetchers.writeConfig( - configPersistence, - ConfigSchema.STANDARD_WORKSPACE, - workspaceId.toString(), - persistedWorkspace); + configRepository.writeStandardWorkspace(persistedWorkspace); - return getWorkspaceFromId(workspaceUpdate.getWorkspaceId()); + return buildWorkspaceReadFromId(workspaceUpdate.getWorkspaceId()); + } + + private WorkspaceRead buildWorkspaceReadFromId(UUID workspaceId) + throws ConfigNotFoundException, IOException, JsonValidationException { + final StandardWorkspace workspace = configRepository.getStandardWorkspace(workspaceId); + + return new WorkspaceRead() + .workspaceId(workspace.getWorkspaceId()) + .name(workspace.getName()) + .slug(workspace.getSlug()) + .initialSetupComplete(workspace.getInitialSetupComplete()); } } diff --git a/dataline-server/src/main/java/io/dataline/server/helpers/ConfigFetchers.java b/dataline-server/src/main/java/io/dataline/server/helpers/ConfigFetchers.java deleted file mode 100644 index 319da000dd8c..000000000000 --- a/dataline-server/src/main/java/io/dataline/server/helpers/ConfigFetchers.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Dataline - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.dataline.server.helpers; - -import io.dataline.config.ConfigSchema; -import io.dataline.config.DestinationConnectionImplementation; -import io.dataline.config.DestinationConnectionSpecification; -import io.dataline.config.SourceConnectionImplementation; -import io.dataline.config.SourceConnectionSpecification; -import io.dataline.config.StandardDestination; -import io.dataline.config.StandardSource; -import io.dataline.config.StandardSync; -import io.dataline.config.StandardSyncSchedule; -import io.dataline.config.StandardWorkspace; -import io.dataline.config.persistence.ConfigNotFoundException; -import io.dataline.config.persistence.ConfigPersistence; -import io.dataline.config.persistence.JsonValidationException; -import io.dataline.server.errors.KnownException; -import java.io.IOException; -import java.util.List; -import java.util.UUID; - -/** - * These helpers catch exceptions thrown in the config persistence and throws them as - * KnownExceptions that can be processed by the server and returned to the user in human-readable - * form. - */ -public class ConfigFetchers { - - public static StandardWorkspace getStandardWorkspace(ConfigPersistence configPersistence, - UUID workspaceId) { - try { - return configPersistence.getConfig( - ConfigSchema.STANDARD_WORKSPACE, - workspaceId.toString(), - StandardWorkspace.class); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static StandardSource getStandardSource(ConfigPersistence configPersistence, - UUID sourceId) { - try { - return configPersistence.getConfig( - ConfigSchema.STANDARD_SOURCE, sourceId.toString(), StandardSource.class); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - // wrap json validation errors for usages in API handlers. - public static void writeConfig(ConfigPersistence configPersistence, - ConfigSchema configType, - String configId, - T config) { - try { - configPersistence.writeConfig(configType, configId, config); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static List getStandardSources(ConfigPersistence configPersistence) { - try { - return configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE, StandardSource.class); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static SourceConnectionSpecification getSourceConnectionSpecification(ConfigPersistence configPersistence, - UUID sourceSpecificationId) { - try { - return configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_SPECIFICATION, - sourceSpecificationId.toString(), - SourceConnectionSpecification.class); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static List getSourceConnectionSpecifications(ConfigPersistence configPersistence) { - try { - return configPersistence.listConfigs( - ConfigSchema.SOURCE_CONNECTION_SPECIFICATION, - SourceConnectionSpecification.class); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static SourceConnectionImplementation getSourceConnectionImplementation(ConfigPersistence configPersistence, - UUID sourceImplementationId) { - try { - return configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceImplementationId.toString(), - SourceConnectionImplementation.class); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static List getSourceConnectionImplementations(ConfigPersistence configPersistence) { - try { - return configPersistence.listConfigs( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - SourceConnectionImplementation.class); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static StandardDestination getStandardDestination(ConfigPersistence configPersistence, - UUID destinationId) { - try { - return configPersistence.getConfig( - ConfigSchema.STANDARD_DESTINATION, - destinationId.toString(), - StandardDestination.class); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static List getStandardDestinations(ConfigPersistence configPersistence) { - try { - return configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION, StandardDestination.class); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static DestinationConnectionSpecification getDestinationConnectionSpecification(ConfigPersistence configPersistence, - UUID destinationSpecificationId) { - try { - return configPersistence.getConfig( - ConfigSchema.DESTINATION_CONNECTION_SPECIFICATION, - destinationSpecificationId.toString(), - DestinationConnectionSpecification.class); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static List getDestinationConnectionSpecifications(ConfigPersistence configPersistence) { - try { - return configPersistence.listConfigs( - ConfigSchema.DESTINATION_CONNECTION_SPECIFICATION, - DestinationConnectionSpecification.class); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static DestinationConnectionImplementation getDestinationConnectionImplementation(ConfigPersistence configPersistence, - UUID destinationImplementationId) { - try { - return configPersistence.getConfig( - ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, - destinationImplementationId.toString(), - DestinationConnectionImplementation.class); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static List getDestinationConnectionImplementations(ConfigPersistence configPersistence) { - try { - return configPersistence.listConfigs( - ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, - DestinationConnectionImplementation.class); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static StandardSync getStandardSync(ConfigPersistence configPersistence, - UUID connectionId) { - try { - return configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC, connectionId.toString(), StandardSync.class); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static List getStandardSyncs(ConfigPersistence configPersistence) { - try { - return configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static StandardSyncSchedule getStandardSyncSchedule(ConfigPersistence configPersistence, - UUID connectionId) { - try { - return configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC_SCHEDULE, - connectionId.toString(), - StandardSyncSchedule.class); - } catch (JsonValidationException e) { - throw getInvalidJsonException(e); - } catch (ConfigNotFoundException e) { - throw getConfigNotFoundException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private static KnownException getConfigNotFoundException(ConfigNotFoundException e) { - return new KnownException( - 422, String.format("Could not find sync configuration for %s: %s.", e.getType().toString(), e.getConfigId()), e); - } - - private static KnownException getInvalidJsonException(Throwable e) { - return new KnownException( - 422, - String.format( - "The provided configuration does not fulfill the specification. Errors: %s", - e.getMessage()), - e); - } - -} diff --git a/dataline-server/src/main/java/io/dataline/server/validation/IntegrationSchemaValidation.java b/dataline-server/src/main/java/io/dataline/server/validation/IntegrationSchemaValidation.java index 7d371b2cec39..9cad66db8340 100644 --- a/dataline-server/src/main/java/io/dataline/server/validation/IntegrationSchemaValidation.java +++ b/dataline-server/src/main/java/io/dataline/server/validation/IntegrationSchemaValidation.java @@ -27,41 +27,28 @@ import com.fasterxml.jackson.databind.JsonNode; import io.dataline.config.DestinationConnectionSpecification; import io.dataline.config.SourceConnectionSpecification; -import io.dataline.config.persistence.ConfigPersistence; import io.dataline.config.persistence.JsonSchemaValidator; import io.dataline.config.persistence.JsonValidationException; -import io.dataline.server.helpers.ConfigFetchers; -import java.util.UUID; public class IntegrationSchemaValidation { - private final ConfigPersistence configPersistence; - private final JsonSchemaValidator jsonSchemaValidator; - public IntegrationSchemaValidation(ConfigPersistence configPersistence) { - this.configPersistence = configPersistence; - + public IntegrationSchemaValidation() { this.jsonSchemaValidator = new JsonSchemaValidator(); } - public void validateSourceConnectionConfiguration(UUID sourceConnectionSpecificationId, JsonNode configJson) + public void validateConfig(final SourceConnectionSpecification sourceConnectionSpecification, + final JsonNode configJson) throws JsonValidationException { - final SourceConnectionSpecification sourceConnectionSpecification = - ConfigFetchers.getSourceConnectionSpecification( - configPersistence, sourceConnectionSpecificationId); - final JsonNode schemaJson = sourceConnectionSpecification.getSpecification(); jsonSchemaValidator.ensure(schemaJson, configJson); } - public void validateDestinationConnectionConfiguration(UUID destinationConnectionSpecificationId, JsonNode configJson) + public void validateConfig(final DestinationConnectionSpecification destinationConnectionSpecification, + final JsonNode configJson) throws JsonValidationException { - final DestinationConnectionSpecification destinationConnectionSpecification = - ConfigFetchers.getDestinationConnectionSpecification( - configPersistence, destinationConnectionSpecificationId); - final JsonNode schemaJson = destinationConnectionSpecification.getSpecification(); jsonSchemaValidator.ensure(schemaJson, configJson); diff --git a/dataline-server/src/test/java/io/dataline/server/handlers/ConnectionsHandlerTest.java b/dataline-server/src/test/java/io/dataline/server/handlers/ConnectionsHandlerTest.java index 303c10fcec54..df976d8afb32 100644 --- a/dataline-server/src/test/java/io/dataline/server/handlers/ConnectionsHandlerTest.java +++ b/dataline-server/src/test/java/io/dataline/server/handlers/ConnectionsHandlerTest.java @@ -41,7 +41,6 @@ import io.dataline.api.model.SourceSchema; import io.dataline.api.model.WorkspaceIdRequestBody; import io.dataline.commons.enums.Enums; -import io.dataline.config.ConfigSchema; import io.dataline.config.DataType; import io.dataline.config.Schedule; import io.dataline.config.Schema; @@ -49,7 +48,7 @@ import io.dataline.config.StandardSync; import io.dataline.config.StandardSyncSchedule; import io.dataline.config.persistence.ConfigNotFoundException; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.JsonValidationException; import io.dataline.server.helpers.ConnectionHelpers; import io.dataline.server.helpers.SourceImplementationHelpers; @@ -61,7 +60,7 @@ class ConnectionsHandlerTest { - private ConfigPersistence configPersistence; + private ConfigRepository configRepository; private Supplier uuidGenerator; private StandardSync standardSync; @@ -72,31 +71,25 @@ class ConnectionsHandlerTest { @SuppressWarnings("unchecked") @BeforeEach void setUp() throws IOException { - configPersistence = mock(ConfigPersistence.class); + configRepository = mock(ConfigRepository.class); uuidGenerator = mock(Supplier.class); sourceImplementation = SourceImplementationHelpers.generateSourceImplementation(UUID.randomUUID()); standardSync = ConnectionHelpers.generateSync(sourceImplementation.getSourceImplementationId()); standardSyncSchedule = ConnectionHelpers.generateSchedule(standardSync.getConnectionId()); - connectionsHandler = new ConnectionsHandler(configPersistence, uuidGenerator); + connectionsHandler = new ConnectionsHandler(configRepository, uuidGenerator); } @Test void testCreateConnection() throws JsonValidationException, ConfigNotFoundException, IOException { when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId()); - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC, - standardSync.getConnectionId().toString(), - StandardSync.class)) - .thenReturn(standardSync); + when(configRepository.getStandardSync(standardSync.getConnectionId())) + .thenReturn(standardSync); - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC_SCHEDULE, - standardSyncSchedule.getConnectionId().toString(), - StandardSyncSchedule.class)) - .thenReturn(standardSyncSchedule); + when(configRepository.getStandardSyncSchedule(standardSyncSchedule.getConnectionId())) + .thenReturn(standardSyncSchedule); final ConnectionCreate connectionCreate = new ConnectionCreate() .sourceImplementationId(standardSync.getSourceImplementationId()) @@ -117,17 +110,9 @@ void testCreateConnection() throws JsonValidationException, ConfigNotFoundExcept assertEquals(expectedConnectionRead, actualConnectionRead); - verify(configPersistence) - .writeConfig( - ConfigSchema.STANDARD_SYNC, - standardSync.getConnectionId().toString(), - standardSync); - - verify(configPersistence) - .writeConfig( - ConfigSchema.STANDARD_SYNC_SCHEDULE, - standardSyncSchedule.getConnectionId().toString(), - standardSyncSchedule); + verify(configRepository).writeStandardSync(standardSync); + + verify(configRepository).writeStandardSchedule(standardSyncSchedule); } @Test @@ -157,19 +142,13 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept .withConnectionId(standardSyncSchedule.getConnectionId()) .withManual(true); - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC, - standardSync.getConnectionId().toString(), - StandardSync.class)) - .thenReturn(standardSync) - .thenReturn(updatedStandardSync); + when(configRepository.getStandardSync(standardSync.getConnectionId())) + .thenReturn(standardSync) + .thenReturn(updatedStandardSync); - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC_SCHEDULE, - standardSyncSchedule.getConnectionId().toString(), - StandardSyncSchedule.class)) - .thenReturn(standardSyncSchedule) - .thenReturn(updatedPersistenceSchedule); + when(configRepository.getStandardSyncSchedule(standardSyncSchedule.getConnectionId())) + .thenReturn(standardSyncSchedule) + .thenReturn(updatedPersistenceSchedule); final ConnectionRead actualConnectionRead = connectionsHandler.updateConnection(connectionUpdate); @@ -184,32 +163,17 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept assertEquals(expectedConnectionRead, actualConnectionRead); - verify(configPersistence) - .writeConfig( - ConfigSchema.STANDARD_SYNC, - standardSync.getConnectionId().toString(), - updatedStandardSync); - - verify(configPersistence) - .writeConfig( - ConfigSchema.STANDARD_SYNC_SCHEDULE, - standardSyncSchedule.getConnectionId().toString(), - updatedPersistenceSchedule); + verify(configRepository).writeStandardSync(updatedStandardSync); + verify(configRepository).writeStandardSchedule(updatedPersistenceSchedule); } @Test void testGetConnection() throws JsonValidationException, ConfigNotFoundException, IOException { - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC, - standardSync.getConnectionId().toString(), - StandardSync.class)) - .thenReturn(standardSync); - - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC_SCHEDULE, - standardSync.getConnectionId().toString(), - StandardSyncSchedule.class)) - .thenReturn(standardSyncSchedule); + when(configRepository.getStandardSync(standardSync.getConnectionId())) + .thenReturn(standardSync); + + when(configRepository.getStandardSyncSchedule(standardSync.getConnectionId())) + .thenReturn(standardSyncSchedule); final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody(); connectionIdRequestBody.setConnectionId(standardSync.getConnectionId()); @@ -220,28 +184,17 @@ void testGetConnection() throws JsonValidationException, ConfigNotFoundException @Test void testListConnectionsForWorkspace() throws JsonValidationException, ConfigNotFoundException, IOException { - // mock list off all syncs - when(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class)) + when(configRepository.listStandardSyncs()) .thenReturn(Lists.newArrayList(standardSync)); - - // mock get source connection impl (used to check that connection is associated with given - // workspace) - when(configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceImplementation.getSourceImplementationId().toString(), - SourceConnectionImplementation.class)) - .thenReturn(sourceImplementation); - - // mock get schedule for the now verified connection - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SYNC_SCHEDULE, - standardSync.getConnectionId().toString(), - StandardSyncSchedule.class)) - .thenReturn(standardSyncSchedule); + when(configRepository.getSourceConnectionImplementation(sourceImplementation.getSourceImplementationId())) + .thenReturn(sourceImplementation); + when(configRepository.getStandardSync(standardSync.getConnectionId())) + .thenReturn(standardSync); + when(configRepository.getStandardSyncSchedule(standardSync.getConnectionId())) + .thenReturn(standardSyncSchedule); final WorkspaceIdRequestBody workspaceIdRequestBody = new WorkspaceIdRequestBody().workspaceId(sourceImplementation.getWorkspaceId()); - final ConnectionReadList actualConnectionReadList = - connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody); + final ConnectionReadList actualConnectionReadList = connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody); assertEquals( ConnectionHelpers.generateExpectedConnectionRead(standardSync), diff --git a/dataline-server/src/test/java/io/dataline/server/handlers/DestinationImplementationsHandlerTest.java b/dataline-server/src/test/java/io/dataline/server/handlers/DestinationImplementationsHandlerTest.java index 6e23193872ba..e9c189ffe9d8 100644 --- a/dataline-server/src/test/java/io/dataline/server/handlers/DestinationImplementationsHandlerTest.java +++ b/dataline-server/src/test/java/io/dataline/server/handlers/DestinationImplementationsHandlerTest.java @@ -39,12 +39,11 @@ import io.dataline.api.model.DestinationImplementationUpdate; import io.dataline.api.model.WorkspaceIdRequestBody; import io.dataline.commons.json.Jsons; -import io.dataline.config.ConfigSchema; import io.dataline.config.DestinationConnectionImplementation; import io.dataline.config.DestinationConnectionSpecification; import io.dataline.config.StandardDestination; import io.dataline.config.persistence.ConfigNotFoundException; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.JsonValidationException; import io.dataline.server.helpers.DestinationHelpers; import io.dataline.server.helpers.DestinationSpecificationHelpers; @@ -60,7 +59,7 @@ class DestinationImplementationsHandlerTest { - private ConfigPersistence configPersistence; + private ConfigRepository configRepository; private StandardDestination standardDestination; private DestinationConnectionSpecification destinationConnectionSpecification; private DestinationConnectionImplementation destinationConnectionImplementation; @@ -71,7 +70,7 @@ class DestinationImplementationsHandlerTest { @SuppressWarnings("unchecked") @BeforeEach void setUp() throws IOException { - configPersistence = mock(ConfigPersistence.class); + configRepository = mock(ConfigRepository.class); validator = mock(IntegrationSchemaValidation.class); uuidGenerator = mock(Supplier.class); @@ -80,7 +79,7 @@ void setUp() throws IOException { destinationConnectionImplementation = generateDestinationImplementation( destinationConnectionSpecification.getDestinationSpecificationId()); - destinationImplementationsHandler = new DestinationImplementationsHandler(configPersistence, validator, uuidGenerator); + destinationImplementationsHandler = new DestinationImplementationsHandler(configRepository, validator, uuidGenerator); } private JsonNode getTestImplementationJson() throws IOException { @@ -111,23 +110,14 @@ void testCreateDestinationImplementation() when(uuidGenerator.get()) .thenReturn(destinationConnectionImplementation.getDestinationImplementationId()); - when(configPersistence.getConfig( - ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, - destinationConnectionImplementation.getDestinationImplementationId().toString(), - DestinationConnectionImplementation.class)) - .thenReturn(destinationConnectionImplementation); + when(configRepository.getDestinationConnectionImplementation(destinationConnectionImplementation.getDestinationImplementationId())) + .thenReturn(destinationConnectionImplementation); - when(configPersistence.getConfig( - ConfigSchema.DESTINATION_CONNECTION_SPECIFICATION, - destinationConnectionImplementation.getDestinationSpecificationId().toString(), - DestinationConnectionSpecification.class)) - .thenReturn(destinationConnectionSpecification); + when(configRepository.getDestinationConnectionSpecification(destinationConnectionImplementation.getDestinationSpecificationId())) + .thenReturn(destinationConnectionSpecification); - when(configPersistence.getConfig( - ConfigSchema.STANDARD_DESTINATION, - destinationConnectionSpecification.getDestinationId().toString(), - StandardDestination.class)) - .thenReturn(standardDestination); + when(configRepository.getStandardDestination(destinationConnectionSpecification.getDestinationId())) + .thenReturn(standardDestination); final DestinationImplementationCreate destinationImplementationCreate = new DestinationImplementationCreate() .name(destinationConnectionImplementation.getName()) @@ -150,15 +140,11 @@ void testCreateDestinationImplementation() assertEquals(expectedDestinationImplementationRead, actualDestinationImplementationRead); verify(validator) - .validateDestinationConnectionConfiguration( - destinationConnectionSpecification.getDestinationSpecificationId(), + .validateConfig( + destinationConnectionSpecification, destinationConnectionImplementation.getConfiguration()); - verify(configPersistence) - .writeConfig( - ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, - destinationConnectionImplementation.getDestinationImplementationId().toString(), - destinationConnectionImplementation); + verify(configRepository).writeDestinationConnectionImplementation(destinationConnectionImplementation); } @Test @@ -171,24 +157,15 @@ void testUpdateDestinationImplementation() final DestinationConnectionImplementation expectedDestinationConnectionImplementation = Jsons.clone(destinationConnectionImplementation) .withConfiguration(newConfiguration); - when(configPersistence.getConfig( - ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, - destinationConnectionImplementation.getDestinationImplementationId().toString(), - DestinationConnectionImplementation.class)) - .thenReturn(destinationConnectionImplementation) - .thenReturn(expectedDestinationConnectionImplementation); - - when(configPersistence.getConfig( - ConfigSchema.DESTINATION_CONNECTION_SPECIFICATION, - destinationConnectionImplementation.getDestinationSpecificationId().toString(), - DestinationConnectionSpecification.class)) - .thenReturn(destinationConnectionSpecification); - - when(configPersistence.getConfig( - ConfigSchema.STANDARD_DESTINATION, - destinationConnectionSpecification.getDestinationId().toString(), - StandardDestination.class)) - .thenReturn(standardDestination); + when(configRepository.getDestinationConnectionImplementation(destinationConnectionImplementation.getDestinationImplementationId())) + .thenReturn(destinationConnectionImplementation) + .thenReturn(expectedDestinationConnectionImplementation); + + when(configRepository.getDestinationConnectionSpecification(destinationConnectionImplementation.getDestinationSpecificationId())) + .thenReturn(destinationConnectionSpecification); + + when(configRepository.getStandardDestination(destinationConnectionSpecification.getDestinationId())) + .thenReturn(standardDestination); final DestinationImplementationUpdate destinationImplementationUpdate = new DestinationImplementationUpdate() .destinationImplementationId(destinationConnectionImplementation.getDestinationImplementationId()) @@ -209,32 +186,19 @@ void testUpdateDestinationImplementation() assertEquals(expectedDestinationImplementationRead, actualDestinationImplementationRead); - verify(configPersistence) - .writeConfig( - ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, - destinationConnectionImplementation.getDestinationImplementationId().toString(), - expectedDestinationConnectionImplementation); + verify(configRepository).writeDestinationConnectionImplementation(expectedDestinationConnectionImplementation); } @Test void testGetDestinationImplementation() throws JsonValidationException, ConfigNotFoundException, IOException { - when(configPersistence.getConfig( - ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, - destinationConnectionImplementation.getDestinationImplementationId().toString(), - DestinationConnectionImplementation.class)) - .thenReturn(destinationConnectionImplementation); - - when(configPersistence.getConfig( - ConfigSchema.DESTINATION_CONNECTION_SPECIFICATION, - destinationConnectionImplementation.getDestinationSpecificationId().toString(), - DestinationConnectionSpecification.class)) - .thenReturn(destinationConnectionSpecification); - - when(configPersistence.getConfig( - ConfigSchema.STANDARD_DESTINATION, - destinationConnectionSpecification.getDestinationId().toString(), - StandardDestination.class)) - .thenReturn(standardDestination); + when(configRepository.getDestinationConnectionImplementation(destinationConnectionImplementation.getDestinationImplementationId())) + .thenReturn(destinationConnectionImplementation); + + when(configRepository.getDestinationConnectionSpecification(destinationConnectionImplementation.getDestinationSpecificationId())) + .thenReturn(destinationConnectionSpecification); + + when(configRepository.getStandardDestination(destinationConnectionSpecification.getDestinationId())) + .thenReturn(standardDestination); DestinationImplementationRead expectedDestinationImplementationRead = new DestinationImplementationRead() .name(destinationConnectionImplementation.getName()) @@ -257,22 +221,14 @@ void testGetDestinationImplementation() throws JsonValidationException, ConfigNo @Test void testListDestinationImplementationsForWorkspace() throws JsonValidationException, ConfigNotFoundException, IOException { - when(configPersistence.listConfigs( - ConfigSchema.DESTINATION_CONNECTION_IMPLEMENTATION, - DestinationConnectionImplementation.class)) - .thenReturn(Lists.newArrayList(destinationConnectionImplementation)); - - when(configPersistence.getConfig( - ConfigSchema.DESTINATION_CONNECTION_SPECIFICATION, - destinationConnectionImplementation.getDestinationSpecificationId().toString(), - DestinationConnectionSpecification.class)) - .thenReturn(destinationConnectionSpecification); - - when(configPersistence.getConfig( - ConfigSchema.STANDARD_DESTINATION, - destinationConnectionSpecification.getDestinationId().toString(), - StandardDestination.class)) - .thenReturn(standardDestination); + when(configRepository.getDestinationConnectionImplementation(destinationConnectionImplementation.getDestinationImplementationId())) + .thenReturn(destinationConnectionImplementation); + when(configRepository.listDestinationConnectionImplementations()) + .thenReturn(Lists.newArrayList(destinationConnectionImplementation)); + when(configRepository.getDestinationConnectionSpecification(destinationConnectionImplementation.getDestinationSpecificationId())) + .thenReturn(destinationConnectionSpecification); + when(configRepository.getStandardDestination(destinationConnectionSpecification.getDestinationId())) + .thenReturn(standardDestination); DestinationImplementationRead expectedDestinationImplementationRead = new DestinationImplementationRead() .name(destinationConnectionImplementation.getName()) diff --git a/dataline-server/src/test/java/io/dataline/server/handlers/DestinationSpecificationsHandlerTest.java b/dataline-server/src/test/java/io/dataline/server/handlers/DestinationSpecificationsHandlerTest.java index 77ba17711316..c96f403fc224 100644 --- a/dataline-server/src/test/java/io/dataline/server/handlers/DestinationSpecificationsHandlerTest.java +++ b/dataline-server/src/test/java/io/dataline/server/handlers/DestinationSpecificationsHandlerTest.java @@ -31,10 +31,9 @@ import com.google.common.collect.Lists; import io.dataline.api.model.DestinationIdRequestBody; import io.dataline.api.model.DestinationSpecificationRead; -import io.dataline.config.ConfigSchema; import io.dataline.config.DestinationConnectionSpecification; import io.dataline.config.persistence.ConfigNotFoundException; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.JsonValidationException; import io.dataline.server.helpers.DestinationSpecificationHelpers; import java.io.IOException; @@ -43,37 +42,29 @@ class DestinationSpecificationsHandlerTest { - private ConfigPersistence configPersistence; + private ConfigRepository configRepository; private DestinationConnectionSpecification destinationConnectionSpecification; private DestinationSpecificationsHandler destinationSpecificationHandler; @BeforeEach void setUp() throws IOException { - configPersistence = mock(ConfigPersistence.class); - destinationConnectionSpecification = - DestinationSpecificationHelpers.generateDestinationSpecification(); - destinationSpecificationHandler = new DestinationSpecificationsHandler(configPersistence); + configRepository = mock(ConfigRepository.class); + destinationConnectionSpecification = DestinationSpecificationHelpers.generateDestinationSpecification(); + destinationSpecificationHandler = new DestinationSpecificationsHandler(configRepository); } @Test void testGetDestinationSpecification() throws JsonValidationException, IOException, ConfigNotFoundException { - when(configPersistence.listConfigs( - ConfigSchema.DESTINATION_CONNECTION_SPECIFICATION, - DestinationConnectionSpecification.class)) - .thenReturn(Lists.newArrayList(destinationConnectionSpecification)); + when(configRepository.listDestinationConnectionSpecifications()) + .thenReturn(Lists.newArrayList(destinationConnectionSpecification)); - DestinationSpecificationRead expectedDestinationSpecificationRead = - new DestinationSpecificationRead(); - expectedDestinationSpecificationRead.setDestinationId( - destinationConnectionSpecification.getDestinationId()); - expectedDestinationSpecificationRead.setDestinationSpecificationId( - destinationConnectionSpecification.getDestinationSpecificationId()); - expectedDestinationSpecificationRead.setConnectionSpecification( - destinationConnectionSpecification.getSpecification()); + DestinationSpecificationRead expectedDestinationSpecificationRead = new DestinationSpecificationRead(); + expectedDestinationSpecificationRead.setDestinationId(destinationConnectionSpecification.getDestinationId()); + expectedDestinationSpecificationRead.setDestinationSpecificationId(destinationConnectionSpecification.getDestinationSpecificationId()); + expectedDestinationSpecificationRead.setConnectionSpecification(destinationConnectionSpecification.getSpecification()); final DestinationIdRequestBody destinationIdRequestBody = new DestinationIdRequestBody(); - destinationIdRequestBody.setDestinationId( - expectedDestinationSpecificationRead.getDestinationId()); + destinationIdRequestBody.setDestinationId(expectedDestinationSpecificationRead.getDestinationId()); final DestinationSpecificationRead actualDestinationSpecificationRead = destinationSpecificationHandler.getDestinationSpecification(destinationIdRequestBody); diff --git a/dataline-server/src/test/java/io/dataline/server/handlers/DestinationsHandlerTest.java b/dataline-server/src/test/java/io/dataline/server/handlers/DestinationsHandlerTest.java index 5af71636723a..fd7eef4eda93 100644 --- a/dataline-server/src/test/java/io/dataline/server/handlers/DestinationsHandlerTest.java +++ b/dataline-server/src/test/java/io/dataline/server/handlers/DestinationsHandlerTest.java @@ -25,7 +25,6 @@ package io.dataline.server.handlers; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -33,28 +32,26 @@ import io.dataline.api.model.DestinationIdRequestBody; import io.dataline.api.model.DestinationRead; import io.dataline.api.model.DestinationReadList; -import io.dataline.config.ConfigSchema; import io.dataline.config.StandardDestination; import io.dataline.config.persistence.ConfigNotFoundException; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.JsonValidationException; import java.io.IOException; -import java.util.Optional; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class DestinationsHandlerTest { - private ConfigPersistence configPersistence; + private ConfigRepository configRepository; private StandardDestination destination; private DestinationsHandler destinationHandler; @BeforeEach void setUp() { - configPersistence = mock(ConfigPersistence.class); + configRepository = mock(ConfigRepository.class); destination = generateDestination(); - destinationHandler = new DestinationsHandler(configPersistence); + destinationHandler = new DestinationsHandler(configRepository); } private StandardDestination generateDestination() { @@ -68,14 +65,9 @@ private StandardDestination generateDestination() { @Test void testListDestinations() throws JsonValidationException, IOException, ConfigNotFoundException { final StandardDestination destination2 = generateDestination(); - configPersistence.writeConfig( - ConfigSchema.STANDARD_DESTINATION, - destination2.getDestinationId().toString(), - destination2); - when(configPersistence.listConfigs( - ConfigSchema.STANDARD_DESTINATION, StandardDestination.class)) - .thenReturn(Lists.newArrayList(destination, destination2)); + when(configRepository.listStandardDestinations()) + .thenReturn(Lists.newArrayList(destination, destination2)); DestinationRead expectedDestinationRead1 = new DestinationRead() .destinationId(destination.getDestinationId()) @@ -87,30 +79,15 @@ void testListDestinations() throws JsonValidationException, IOException, ConfigN final DestinationReadList actualDestinationReadList = destinationHandler.listDestinations(); - final Optional actualDestinationRead1 = actualDestinationReadList.getDestinations() - .stream() - .filter( - destinationRead -> destinationRead.getDestinationId().equals(destination.getDestinationId())) - .findFirst(); - final Optional actualDestinationRead2 = actualDestinationReadList.getDestinations() - .stream() - .filter( - destinationRead -> destinationRead.getDestinationId().equals(destination2.getDestinationId())) - .findFirst(); - - assertTrue(actualDestinationRead1.isPresent()); - assertEquals(expectedDestinationRead1, actualDestinationRead1.get()); - assertTrue(actualDestinationRead2.isPresent()); - assertEquals(expectedDestinationRead2, actualDestinationRead2.get()); + assertEquals( + Lists.newArrayList(expectedDestinationRead1, expectedDestinationRead2), + actualDestinationReadList.getDestinations()); } @Test void testGetDestination() throws JsonValidationException, ConfigNotFoundException, IOException { - when(configPersistence.getConfig( - ConfigSchema.STANDARD_DESTINATION, - destination.getDestinationId().toString(), - StandardDestination.class)) - .thenReturn(destination); + when(configRepository.getStandardDestination(destination.getDestinationId())) + .thenReturn(destination); DestinationRead expectedDestinationRead = new DestinationRead() .destinationId(destination.getDestinationId()) diff --git a/dataline-server/src/test/java/io/dataline/server/handlers/SourceImplementationsHandlerTest.java b/dataline-server/src/test/java/io/dataline/server/handlers/SourceImplementationsHandlerTest.java index 36107f033043..535267a51901 100644 --- a/dataline-server/src/test/java/io/dataline/server/handlers/SourceImplementationsHandlerTest.java +++ b/dataline-server/src/test/java/io/dataline/server/handlers/SourceImplementationsHandlerTest.java @@ -43,13 +43,12 @@ import io.dataline.api.model.SourceImplementationUpdate; import io.dataline.api.model.WorkspaceIdRequestBody; import io.dataline.commons.json.Jsons; -import io.dataline.config.ConfigSchema; import io.dataline.config.SourceConnectionImplementation; import io.dataline.config.SourceConnectionSpecification; import io.dataline.config.StandardSource; import io.dataline.config.StandardSync; import io.dataline.config.persistence.ConfigNotFoundException; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.JsonValidationException; import io.dataline.server.helpers.ConnectionHelpers; import io.dataline.server.helpers.SourceHelpers; @@ -65,7 +64,7 @@ class SourceImplementationsHandlerTest { - private ConfigPersistence configPersistence; + private ConfigRepository configRepository; private StandardSource standardSource; private SourceConnectionSpecification sourceConnectionSpecification; private SourceConnectionImplementation sourceConnectionImplementation; @@ -77,7 +76,7 @@ class SourceImplementationsHandlerTest { @SuppressWarnings("unchecked") @BeforeEach void setUp() throws IOException { - configPersistence = mock(ConfigPersistence.class); + configRepository = mock(ConfigRepository.class); validator = mock(IntegrationSchemaValidation.class); connectionsHandler = mock(ConnectionsHandler.class); uuidGenerator = mock(Supplier.class); @@ -87,7 +86,7 @@ void setUp() throws IOException { sourceConnectionImplementation = SourceImplementationHelpers.generateSourceImplementation(sourceConnectionSpecification.getSourceSpecificationId()); - sourceImplementationsHandler = new SourceImplementationsHandler(configPersistence, validator, connectionsHandler, uuidGenerator); + sourceImplementationsHandler = new SourceImplementationsHandler(configRepository, validator, connectionsHandler, uuidGenerator); } @Test @@ -96,23 +95,14 @@ void testCreateSourceImplementation() when(uuidGenerator.get()) .thenReturn(sourceConnectionImplementation.getSourceImplementationId()); - when(configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceConnectionImplementation.getSourceImplementationId().toString(), - SourceConnectionImplementation.class)) - .thenReturn(sourceConnectionImplementation); + when(configRepository.getSourceConnectionImplementation(sourceConnectionImplementation.getSourceImplementationId())) + .thenReturn(sourceConnectionImplementation); - when(configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_SPECIFICATION, - sourceConnectionSpecification.getSourceSpecificationId().toString(), - SourceConnectionSpecification.class)) - .thenReturn(sourceConnectionSpecification); + when(configRepository.getSourceConnectionSpecification(sourceConnectionSpecification.getSourceSpecificationId())) + .thenReturn(sourceConnectionSpecification); - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SOURCE, - sourceConnectionSpecification.getSourceId().toString(), - StandardSource.class)) - .thenReturn(standardSource); + when(configRepository.getStandardSource(sourceConnectionSpecification.getSourceId())) + .thenReturn(standardSource); final SourceImplementationCreate sourceImplementationCreate = new SourceImplementationCreate() .name(sourceConnectionImplementation.getName()) @@ -130,15 +120,11 @@ void testCreateSourceImplementation() assertEquals(expectedSourceImplementationRead, actualSourceImplementationRead); verify(validator) - .validateSourceConnectionConfiguration( - sourceConnectionSpecification.getSourceSpecificationId(), + .validateConfig( + sourceConnectionSpecification, sourceConnectionImplementation.getConfiguration()); - verify(configPersistence) - .writeConfig( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceConnectionImplementation.getSourceImplementationId().toString(), - sourceConnectionImplementation); + verify(configRepository).writeSourceConnectionImplementation(sourceConnectionImplementation); } @Test @@ -150,24 +136,15 @@ void testUpdateSourceImplementation() throws JsonValidationException, ConfigNotF .withConfiguration(newConfiguration) .withTombstone(false); - when(configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceConnectionImplementation.getSourceImplementationId().toString(), - SourceConnectionImplementation.class)) - .thenReturn(sourceConnectionImplementation) - .thenReturn(expectedSourceConnectionImplementation); - - when(configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_SPECIFICATION, - sourceConnectionSpecification.getSourceSpecificationId().toString(), - SourceConnectionSpecification.class)) - .thenReturn(sourceConnectionSpecification); - - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SOURCE, - sourceConnectionSpecification.getSourceId().toString(), - StandardSource.class)) - .thenReturn(standardSource); + when(configRepository.getSourceConnectionImplementation(sourceConnectionImplementation.getSourceImplementationId())) + .thenReturn(sourceConnectionImplementation) + .thenReturn(expectedSourceConnectionImplementation); + + when(configRepository.getSourceConnectionSpecification(sourceConnectionSpecification.getSourceSpecificationId())) + .thenReturn(sourceConnectionSpecification); + + when(configRepository.getStandardSource(sourceConnectionSpecification.getSourceId())) + .thenReturn(standardSource); final SourceImplementationUpdate sourceImplementationUpdate = new SourceImplementationUpdate() .name(sourceConnectionImplementation.getName()) @@ -183,32 +160,19 @@ void testUpdateSourceImplementation() throws JsonValidationException, ConfigNotF assertEquals(expectedSourceImplementationRead, actualSourceImplementationRead); - verify(configPersistence) - .writeConfig( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceConnectionImplementation.getSourceImplementationId().toString(), - expectedSourceConnectionImplementation); + verify(configRepository).writeSourceConnectionImplementation(expectedSourceConnectionImplementation); } @Test void testGetSourceImplementation() throws JsonValidationException, ConfigNotFoundException, IOException { - when(configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceConnectionImplementation.getSourceImplementationId().toString(), - SourceConnectionImplementation.class)) - .thenReturn(sourceConnectionImplementation); - - when(configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_SPECIFICATION, - sourceConnectionSpecification.getSourceSpecificationId().toString(), - SourceConnectionSpecification.class)) - .thenReturn(sourceConnectionSpecification); - - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SOURCE, - sourceConnectionSpecification.getSourceId().toString(), - StandardSource.class)) - .thenReturn(standardSource); + when(configRepository.getSourceConnectionImplementation(sourceConnectionImplementation.getSourceImplementationId())) + .thenReturn(sourceConnectionImplementation); + + when(configRepository.getSourceConnectionSpecification(sourceConnectionSpecification.getSourceSpecificationId())) + .thenReturn(sourceConnectionSpecification); + + when(configRepository.getStandardSource(sourceConnectionSpecification.getSourceId())) + .thenReturn(standardSource); SourceImplementationRead expectedSourceImplementationRead = SourceImplementationHelpers.getSourceImplementationRead(sourceConnectionImplementation, standardSource); @@ -225,22 +189,14 @@ void testGetSourceImplementation() throws JsonValidationException, ConfigNotFoun @Test void testListSourceImplementationsForWorkspace() throws JsonValidationException, ConfigNotFoundException, IOException { - when(configPersistence.listConfigs( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - SourceConnectionImplementation.class)) - .thenReturn(Lists.newArrayList(sourceConnectionImplementation)); - - when(configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_SPECIFICATION, - sourceConnectionSpecification.getSourceSpecificationId().toString(), - SourceConnectionSpecification.class)) - .thenReturn(sourceConnectionSpecification); - - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SOURCE, - sourceConnectionSpecification.getSourceId().toString(), - StandardSource.class)) - .thenReturn(standardSource); + when(configRepository.getSourceConnectionImplementation(sourceConnectionImplementation.getSourceImplementationId())) + .thenReturn(sourceConnectionImplementation); + when(configRepository.listSourceConnectionImplementations()) + .thenReturn(Lists.newArrayList(sourceConnectionImplementation)); + when(configRepository.getSourceConnectionSpecification(sourceConnectionSpecification.getSourceSpecificationId())) + .thenReturn(sourceConnectionSpecification); + when(configRepository.getStandardSource(sourceConnectionSpecification.getSourceId())) + .thenReturn(standardSource); SourceImplementationRead expectedSourceImplementationRead = SourceImplementationHelpers.getSourceImplementationRead(sourceConnectionImplementation, standardSource); @@ -262,24 +218,15 @@ void testDeleteSourceImplementation() throws JsonValidationException, ConfigNotF final SourceConnectionImplementation expectedSourceConnectionImplementation = Jsons.clone(sourceConnectionImplementation) .withTombstone(true); - when(configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceConnectionImplementation.getSourceImplementationId().toString(), - SourceConnectionImplementation.class)) - .thenReturn(sourceConnectionImplementation) - .thenReturn(expectedSourceConnectionImplementation); - - when(configPersistence.getConfig( - ConfigSchema.SOURCE_CONNECTION_SPECIFICATION, - sourceConnectionSpecification.getSourceSpecificationId().toString(), - SourceConnectionSpecification.class)) - .thenReturn(sourceConnectionSpecification); - - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SOURCE, - sourceConnectionSpecification.getSourceId().toString(), - StandardSource.class)) - .thenReturn(standardSource); + when(configRepository.getSourceConnectionImplementation(sourceConnectionImplementation.getSourceImplementationId())) + .thenReturn(sourceConnectionImplementation) + .thenReturn(expectedSourceConnectionImplementation); + + when(configRepository.getSourceConnectionSpecification(sourceConnectionSpecification.getSourceSpecificationId())) + .thenReturn(sourceConnectionSpecification); + + when(configRepository.getStandardSource(sourceConnectionSpecification.getSourceId())) + .thenReturn(standardSource); final SourceImplementationIdRequestBody sourceImplementationIdRequestBody = new SourceImplementationIdRequestBody() .sourceImplementationId(sourceConnectionImplementation.getSourceImplementationId()); @@ -295,11 +242,7 @@ void testDeleteSourceImplementation() throws JsonValidationException, ConfigNotF when(connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)).thenReturn(connectionReadList); sourceImplementationsHandler.deleteSourceImplementation(sourceImplementationIdRequestBody); - verify(configPersistence) - .writeConfig( - ConfigSchema.SOURCE_CONNECTION_IMPLEMENTATION, - sourceConnectionImplementation.getSourceImplementationId().toString(), - expectedSourceConnectionImplementation); + verify(configRepository).writeSourceConnectionImplementation(expectedSourceConnectionImplementation); final ConnectionUpdate expectedConnectionUpdate = new ConnectionUpdate() .connectionId(connectionRead.getConnectionId()) diff --git a/dataline-server/src/test/java/io/dataline/server/handlers/SourceSpecificationsHandlerTest.java b/dataline-server/src/test/java/io/dataline/server/handlers/SourceSpecificationsHandlerTest.java index 7c96ea12d1af..cc61c1bac143 100644 --- a/dataline-server/src/test/java/io/dataline/server/handlers/SourceSpecificationsHandlerTest.java +++ b/dataline-server/src/test/java/io/dataline/server/handlers/SourceSpecificationsHandlerTest.java @@ -31,10 +31,9 @@ import com.google.common.collect.Lists; import io.dataline.api.model.SourceIdRequestBody; import io.dataline.api.model.SourceSpecificationRead; -import io.dataline.config.ConfigSchema; import io.dataline.config.SourceConnectionSpecification; import io.dataline.config.persistence.ConfigNotFoundException; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.JsonValidationException; import io.dataline.server.helpers.SourceSpecificationHelpers; import java.io.IOException; @@ -43,36 +42,30 @@ class SourceSpecificationsHandlerTest { - private ConfigPersistence configPersistence; + private ConfigRepository configRepository; private SourceConnectionSpecification sourceConnectionSpecification; private SourceSpecificationsHandler sourceSpecificationHandler; @BeforeEach void setUp() throws IOException { - configPersistence = mock(ConfigPersistence.class); + configRepository = mock(ConfigRepository.class); sourceConnectionSpecification = SourceSpecificationHelpers.generateSourceSpecification(); - sourceSpecificationHandler = new SourceSpecificationsHandler(configPersistence); + sourceSpecificationHandler = new SourceSpecificationsHandler(configRepository); } @Test void testGetSourceSpecification() throws JsonValidationException, IOException, ConfigNotFoundException { - when(configPersistence.listConfigs( - ConfigSchema.SOURCE_CONNECTION_SPECIFICATION, - SourceConnectionSpecification.class)) - .thenReturn(Lists.newArrayList(sourceConnectionSpecification)); + when(configRepository.listSourceConnectionSpecifications()) + .thenReturn(Lists.newArrayList(sourceConnectionSpecification)); - SourceSpecificationRead expectedSourceSpecificationRead = new SourceSpecificationRead(); - expectedSourceSpecificationRead.setSourceId(sourceConnectionSpecification.getSourceId()); - expectedSourceSpecificationRead.setSourceSpecificationId( - sourceConnectionSpecification.getSourceSpecificationId()); - expectedSourceSpecificationRead.setConnectionSpecification( - sourceConnectionSpecification.getSpecification()); + SourceSpecificationRead expectedSourceSpecificationRead = new SourceSpecificationRead() + .sourceId(sourceConnectionSpecification.getSourceId()) + .sourceSpecificationId(sourceConnectionSpecification.getSourceSpecificationId()) + .connectionSpecification(sourceConnectionSpecification.getSpecification()); - final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody(); - sourceIdRequestBody.setSourceId(expectedSourceSpecificationRead.getSourceId()); + final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(expectedSourceSpecificationRead.getSourceId()); - final SourceSpecificationRead actualSourceSpecificationRead = - sourceSpecificationHandler.getSourceSpecification(sourceIdRequestBody); + final SourceSpecificationRead actualSourceSpecificationRead = sourceSpecificationHandler.getSourceSpecification(sourceIdRequestBody); assertEquals(expectedSourceSpecificationRead, actualSourceSpecificationRead); } diff --git a/dataline-server/src/test/java/io/dataline/server/handlers/SourcesHandlerTest.java b/dataline-server/src/test/java/io/dataline/server/handlers/SourcesHandlerTest.java index 5c2421703859..92ea3236e0d2 100644 --- a/dataline-server/src/test/java/io/dataline/server/handlers/SourcesHandlerTest.java +++ b/dataline-server/src/test/java/io/dataline/server/handlers/SourcesHandlerTest.java @@ -25,7 +25,6 @@ package io.dataline.server.handlers; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -33,28 +32,26 @@ import io.dataline.api.model.SourceIdRequestBody; import io.dataline.api.model.SourceRead; import io.dataline.api.model.SourceReadList; -import io.dataline.config.ConfigSchema; import io.dataline.config.StandardSource; import io.dataline.config.persistence.ConfigNotFoundException; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.JsonValidationException; import java.io.IOException; -import java.util.Optional; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class SourcesHandlerTest { - private ConfigPersistence configPersistence; + private ConfigRepository configRepository; private StandardSource source; private SourcesHandler sourceHandler; @BeforeEach void setUp() { - configPersistence = mock(ConfigPersistence.class); + configRepository = mock(ConfigRepository.class); source = generateSource(); - sourceHandler = new SourcesHandler(configPersistence); + sourceHandler = new SourcesHandler(configRepository); } private StandardSource generateSource() { @@ -68,10 +65,8 @@ private StandardSource generateSource() { @Test void testListSources() throws JsonValidationException, IOException, ConfigNotFoundException { final StandardSource source2 = generateSource(); - configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE, source2.getSourceId().toString(), source2); - when(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE, StandardSource.class)) - .thenReturn(Lists.newArrayList(source, source2)); + when(configRepository.listStandardSources()).thenReturn(Lists.newArrayList(source, source2)); SourceRead expectedSourceRead1 = new SourceRead() .sourceId(source.getSourceId()) @@ -83,28 +78,13 @@ void testListSources() throws JsonValidationException, IOException, ConfigNotFou final SourceReadList actualSourceReadList = sourceHandler.listSources(); - final Optional actualSourceRead1 = actualSourceReadList.getSources() - .stream() - .filter(sourceRead -> sourceRead.getSourceId().equals(source.getSourceId())) - .findFirst(); - final Optional actualSourceRead2 = actualSourceReadList.getSources() - .stream() - .filter(sourceRead -> sourceRead.getSourceId().equals(source2.getSourceId())) - .findFirst(); - - assertTrue(actualSourceRead1.isPresent()); - assertEquals(expectedSourceRead1, actualSourceRead1.get()); - assertTrue(actualSourceRead2.isPresent()); - assertEquals(expectedSourceRead2, actualSourceRead2.get()); + assertEquals(Lists.newArrayList(expectedSourceRead1, expectedSourceRead2), actualSourceReadList.getSources()); } @Test void testGetSource() throws JsonValidationException, ConfigNotFoundException, IOException { - when(configPersistence.getConfig( - ConfigSchema.STANDARD_SOURCE, - source.getSourceId().toString(), - StandardSource.class)) - .thenReturn(source); + when(configRepository.getStandardSource(source.getSourceId())) + .thenReturn(source); SourceRead expectedSourceRead = new SourceRead() .sourceId(source.getSourceId()) diff --git a/dataline-server/src/test/java/io/dataline/server/handlers/WebBackendConnectionsHandlerTest.java b/dataline-server/src/test/java/io/dataline/server/handlers/WebBackendConnectionsHandlerTest.java index cee850b5b1de..6ef3b3676e90 100644 --- a/dataline-server/src/test/java/io/dataline/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/dataline-server/src/test/java/io/dataline/server/handlers/WebBackendConnectionsHandlerTest.java @@ -44,6 +44,8 @@ import io.dataline.config.SourceConnectionImplementation; import io.dataline.config.StandardSource; import io.dataline.config.StandardSync; +import io.dataline.config.persistence.ConfigNotFoundException; +import io.dataline.config.persistence.JsonValidationException; import io.dataline.server.helpers.ConnectionHelpers; import io.dataline.server.helpers.SourceHelpers; import io.dataline.server.helpers.SourceImplementationHelpers; @@ -64,7 +66,7 @@ class WebBackendConnectionsHandlerTest { private WbConnectionRead expected; @BeforeEach - public void setup() throws IOException { + public void setup() throws IOException, JsonValidationException, ConfigNotFoundException { connectionsHandler = mock(ConnectionsHandler.class); SourceImplementationsHandler sourceImplementationsHandler = mock(SourceImplementationsHandler.class); JobHistoryHandler jobHistoryHandler = mock(JobHistoryHandler.class); @@ -112,7 +114,7 @@ public void setup() throws IOException { } @Test - public void testWebBackendListConnectionsForWorkspace() { + public void testWebBackendListConnectionsForWorkspace() throws ConfigNotFoundException, IOException, JsonValidationException { final WorkspaceIdRequestBody workspaceIdRequestBody = new WorkspaceIdRequestBody(); workspaceIdRequestBody.setWorkspaceId(sourceImplementationRead.getWorkspaceId()); @@ -126,7 +128,7 @@ public void testWebBackendListConnectionsForWorkspace() { } @Test - public void testWebBackendGetConnection() { + public void testWebBackendGetConnection() throws ConfigNotFoundException, IOException, JsonValidationException { final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody(); connectionIdRequestBody.setConnectionId(connectionRead.getConnectionId()); diff --git a/dataline-server/src/test/java/io/dataline/server/handlers/WorkspacesHandlerTest.java b/dataline-server/src/test/java/io/dataline/server/handlers/WorkspacesHandlerTest.java index 9e2eee8fb566..dcc0031032e6 100644 --- a/dataline-server/src/test/java/io/dataline/server/handlers/WorkspacesHandlerTest.java +++ b/dataline-server/src/test/java/io/dataline/server/handlers/WorkspacesHandlerTest.java @@ -33,10 +33,9 @@ import io.dataline.api.model.WorkspaceIdRequestBody; import io.dataline.api.model.WorkspaceRead; import io.dataline.api.model.WorkspaceUpdate; -import io.dataline.config.ConfigSchema; import io.dataline.config.StandardWorkspace; import io.dataline.config.persistence.ConfigNotFoundException; -import io.dataline.config.persistence.ConfigPersistence; +import io.dataline.config.persistence.ConfigRepository; import io.dataline.config.persistence.JsonValidationException; import io.dataline.config.persistence.PersistenceConstants; import java.io.IOException; @@ -46,15 +45,15 @@ class WorkspacesHandlerTest { - private ConfigPersistence configPersistence; + private ConfigRepository configRepository; private StandardWorkspace workspace; private WorkspacesHandler workspacesHandler; @BeforeEach void setUp() { - configPersistence = mock(ConfigPersistence.class); + configRepository = mock(ConfigRepository.class); workspace = generateWorkspace(); - workspacesHandler = new WorkspacesHandler(configPersistence); + workspacesHandler = new WorkspacesHandler(configRepository); } private StandardWorkspace generateWorkspace() { @@ -70,11 +69,8 @@ private StandardWorkspace generateWorkspace() { @Test void testGetWorkspace() throws JsonValidationException, ConfigNotFoundException, IOException { - when(configPersistence.getConfig( - ConfigSchema.STANDARD_WORKSPACE, - workspace.getWorkspaceId().toString(), - StandardWorkspace.class)) - .thenReturn(workspace); + when(configRepository.getStandardWorkspace(workspace.getWorkspaceId())) + .thenReturn(workspace); final WorkspaceIdRequestBody workspaceIdRequestBody = new WorkspaceIdRequestBody().workspaceId(workspace.getWorkspaceId()); @@ -89,11 +85,8 @@ void testGetWorkspace() throws JsonValidationException, ConfigNotFoundException, @Test void testGetWorkspaceBySlug() throws JsonValidationException, ConfigNotFoundException, IOException { - when(configPersistence.getConfig( - ConfigSchema.STANDARD_WORKSPACE, - workspace.getWorkspaceId().toString(), - StandardWorkspace.class)) - .thenReturn(workspace); + when(configRepository.getStandardWorkspace(workspace.getWorkspaceId())) + .thenReturn(workspace); final SlugRequestBody slugRequestBody = new SlugRequestBody().slug("default"); @@ -126,12 +119,9 @@ void testUpdateWorkspace() throws JsonValidationException, ConfigNotFoundExcepti .withNews(false) .withInitialSetupComplete(true); - when(configPersistence.getConfig( - ConfigSchema.STANDARD_WORKSPACE, - workspace.getWorkspaceId().toString(), - StandardWorkspace.class)) - .thenReturn(workspace) - .thenReturn(expectedWorkspace); + when(configRepository.getStandardWorkspace(workspace.getWorkspaceId())) + .thenReturn(workspace) + .thenReturn(expectedWorkspace); final WorkspaceRead actualWorkspaceRead = workspacesHandler.updateWorkspace(workspaceUpdate); @@ -141,11 +131,7 @@ void testUpdateWorkspace() throws JsonValidationException, ConfigNotFoundExcepti .slug("default") .initialSetupComplete(true); - verify(configPersistence) - .writeConfig( - ConfigSchema.STANDARD_WORKSPACE, - expectedWorkspace.getWorkspaceId().toString(), - expectedWorkspace); + verify(configRepository).writeStandardWorkspace(expectedWorkspace); assertEquals(expectedWorkspaceRead, actualWorkspaceRead); }