diff --git a/README.md b/README.md index c206158..fee5aaa 100644 --- a/README.md +++ b/README.md @@ -100,12 +100,12 @@ properties.put("clientSecret", "${clientSecret}"); ### Connection settings See this page on available [connection settings][connection settings]. -These settings can be configured in properties by using the prefix `serverSetting.` +These settings can be configured in properties by using the prefix `querySetting.` For example, to control locale set the following property: ```java -properties.put("serverSetting.lc_time", "en_US"); +properties.put("querySetting.lc_time", "en_US"); ``` --- diff --git a/src/main/java/com/salesforce/datacloud/jdbc/core/HyperConnectionSettings.java b/src/main/java/com/salesforce/datacloud/jdbc/core/ConnectionQuerySettings.java similarity index 52% rename from src/main/java/com/salesforce/datacloud/jdbc/core/HyperConnectionSettings.java rename to src/main/java/com/salesforce/datacloud/jdbc/core/ConnectionQuerySettings.java index e25ecf4..c23287c 100644 --- a/src/main/java/com/salesforce/datacloud/jdbc/core/HyperConnectionSettings.java +++ b/src/main/java/com/salesforce/datacloud/jdbc/core/ConnectionQuerySettings.java @@ -16,25 +16,33 @@ package com.salesforce.datacloud.jdbc.core; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Properties; -import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.val; @AllArgsConstructor(access = AccessLevel.PRIVATE) -public class HyperConnectionSettings { - private static final String HYPER_SETTING = "serverSetting."; +public class ConnectionQuerySettings { + private static final String HYPER_SETTING = "querySetting."; + private static final String HYPER_LEGACY_SETTING = "serverSetting."; private final Map settings; - public static HyperConnectionSettings of(Properties properties) { - val result = properties.entrySet().stream() - .filter(e -> e.getKey().toString().startsWith(HYPER_SETTING)) - .collect( - Collectors.toMap(e -> e.getKey().toString().substring(HYPER_SETTING.length()), e -> e.getValue() - .toString())); - return new HyperConnectionSettings(result); + public static ConnectionQuerySettings of(Properties properties) { + Map settings = new HashMap<>(); + for (val e : properties.entrySet()) { + if (e.getKey().toString().startsWith(HYPER_SETTING)) { + settings.put( + e.getKey().toString().substring(HYPER_SETTING.length()), + e.getValue().toString()); + } else if (e.getKey().toString().startsWith(HYPER_LEGACY_SETTING)) { + settings.put( + e.getKey().toString().substring(HYPER_LEGACY_SETTING.length()), + e.getValue().toString()); + } + } + return new ConnectionQuerySettings(settings); } public Map getSettings() { diff --git a/src/main/java/com/salesforce/datacloud/jdbc/core/HyperGrpcClientExecutor.java b/src/main/java/com/salesforce/datacloud/jdbc/core/HyperGrpcClientExecutor.java index 36467ad..f708791 100644 --- a/src/main/java/com/salesforce/datacloud/jdbc/core/HyperGrpcClientExecutor.java +++ b/src/main/java/com/salesforce/datacloud/jdbc/core/HyperGrpcClientExecutor.java @@ -67,7 +67,7 @@ public static HyperGrpcClientExecutor of(@NonNull ManagedChannelBuilder build throws SQLException { val client = HyperGrpcClientExecutor.builder(); - val settings = HyperConnectionSettings.of(properties).getSettings(); + val settings = ConnectionQuerySettings.of(properties).getSettings(); if (!settings.isEmpty()) { client.settingsQueryParams( QueryParam.newBuilder().putAllSettings(settings).build()); diff --git a/src/test/java/com/salesforce/datacloud/jdbc/core/ConnectionSettingsTest.java b/src/test/java/com/salesforce/datacloud/jdbc/core/ConnectionQuerySettingsTest.java similarity index 62% rename from src/test/java/com/salesforce/datacloud/jdbc/core/ConnectionSettingsTest.java rename to src/test/java/com/salesforce/datacloud/jdbc/core/ConnectionQuerySettingsTest.java index 1cc6276..e3b1878 100644 --- a/src/test/java/com/salesforce/datacloud/jdbc/core/ConnectionSettingsTest.java +++ b/src/test/java/com/salesforce/datacloud/jdbc/core/ConnectionQuerySettingsTest.java @@ -19,28 +19,35 @@ import com.google.common.collect.Maps; import com.salesforce.datacloud.jdbc.hyper.HyperTestBase; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; import lombok.SneakyThrows; import lombok.val; import org.junit.jupiter.api.Test; -public class ConnectionSettingsTest extends HyperTestBase { +public class ConnectionQuerySettingsTest extends HyperTestBase { @Test @SneakyThrows - public void testHyperRespectsConnectionSetting() { + public void testLegacyQuerySetting() { val settings = Maps.immutableEntry("serverSetting.date_style", "YMD"); - val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); assertWithStatement( statement -> { - val result = statement.executeQuery("SELECT CURRENT_DATE"); + val result = statement.executeQuery("SHOW date_style"); result.next(); + assertThat(result.getString(1)).isEqualTo("ISO, YMD"); + }, + settings); + } - val expected = LocalDate.parse(result.getDate(1).toString(), formatter); - val actual = result.getDate(1); + @Test + @SneakyThrows + public void testQuerySetting() { + val settings = Maps.immutableEntry("querySetting.date_style", "YMD"); - assertThat(actual.toString()).isEqualTo(expected.toString()); + assertWithStatement( + statement -> { + val result = statement.executeQuery("SHOW date_style"); + result.next(); + assertThat(result.getString(1)).isEqualTo("ISO, YMD"); }, settings); } diff --git a/src/test/java/com/salesforce/datacloud/jdbc/core/HyperConnectionSettingsTest.java b/src/test/java/com/salesforce/datacloud/jdbc/core/HyperConnectionSettingsTest.java index ebb5686..65191e9 100644 --- a/src/test/java/com/salesforce/datacloud/jdbc/core/HyperConnectionSettingsTest.java +++ b/src/test/java/com/salesforce/datacloud/jdbc/core/HyperConnectionSettingsTest.java @@ -32,7 +32,7 @@ import org.junit.jupiter.api.Test; class HyperConnectionSettingsTest extends HyperGrpcTestBase { - private static final String HYPER_SETTING = "serverSetting."; + private static final String HYPER_SETTING = "querySetting."; @Test void testGetSettingWithCorrectPrefix() { @@ -40,8 +40,8 @@ void testGetSettingWithCorrectPrefix() { Properties properties = new Properties(); properties.setProperty(HYPER_SETTING + "lc_time", "en_US"); properties.setProperty("username", "alice"); - HyperConnectionSettings hyperConnectionSettings = HyperConnectionSettings.of(properties); - assertThat(hyperConnectionSettings.getSettings()).containsExactlyInAnyOrderEntriesOf(expected); + ConnectionQuerySettings connectionQuerySettings = ConnectionQuerySettings.of(properties); + assertThat(connectionQuerySettings.getSettings()).containsExactlyInAnyOrderEntriesOf(expected); } @Test @@ -50,16 +50,16 @@ void testGetSettingReturnEmptyResultSet() { Properties properties = new Properties(); properties.setProperty("c_time", "en_US"); properties.setProperty("username", "alice"); - HyperConnectionSettings hyperConnectionSettings = HyperConnectionSettings.of(properties); - assertThat(hyperConnectionSettings.getSettings()).containsExactlyInAnyOrderEntriesOf(expected); + ConnectionQuerySettings connectionQuerySettings = ConnectionQuerySettings.of(properties); + assertThat(connectionQuerySettings.getSettings()).containsExactlyInAnyOrderEntriesOf(expected); } @Test void testGetSettingWithEmptyProperties() { Map expected = ImmutableMap.of(); Properties properties = new Properties(); - HyperConnectionSettings hyperConnectionSettings = HyperConnectionSettings.of(properties); - assertThat(hyperConnectionSettings.getSettings()).containsExactlyInAnyOrderEntriesOf(expected); + ConnectionQuerySettings connectionQuerySettings = ConnectionQuerySettings.of(properties); + assertThat(connectionQuerySettings.getSettings()).containsExactlyInAnyOrderEntriesOf(expected); } @SneakyThrows diff --git a/src/test/java/com/salesforce/datacloud/jdbc/hyper/HyperServerProcess.java b/src/test/java/com/salesforce/datacloud/jdbc/hyper/HyperServerProcess.java new file mode 100644 index 0000000..6713919 --- /dev/null +++ b/src/test/java/com/salesforce/datacloud/jdbc/hyper/HyperServerProcess.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2024, Salesforce, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.salesforce.datacloud.jdbc.hyper; + +import static java.util.Objects.requireNonNull; + +import java.io.*; +import java.nio.file.Paths; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.regex.Pattern; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import lombok.val; +import org.junit.jupiter.api.Assertions; + +@Slf4j +public class HyperServerProcess { + private static final Pattern PORT_PATTERN = Pattern.compile(".*gRPC listening on 127.0.0.1:([0-9]+).*"); + + private final Process hyperProcess; + private final ExecutorService hyperMonitors; + private Integer port; + + @SneakyThrows + public HyperServerProcess() { + log.info("starting hyperd, this might take a few seconds"); + + val executable = new File("./target/hyper/hyperd"); + val properties = Paths.get(requireNonNull(HyperTestBase.class.getResource("/hyper.yaml")) + .toURI()) + .toFile(); + + if (!executable.exists()) { + Assertions.fail("hyperd executable couldn't be found, have you run mvn process-test-resources? expected=" + + executable.getAbsolutePath()); + } + + hyperProcess = new ProcessBuilder() + .command(executable.getAbsolutePath(), "--config", properties.getAbsolutePath(), "--no-password", "run") + .start(); + + // Wait until process is listening and extract port on which it is listening + val latch = new CountDownLatch(1); + hyperMonitors = Executors.newFixedThreadPool(2); + hyperMonitors.execute(() -> logStream(hyperProcess.getErrorStream(), log::error)); + hyperMonitors.execute(() -> logStream(hyperProcess.getInputStream(), line -> { + log.info(line); + val matcher = PORT_PATTERN.matcher(line); + if (matcher.matches()) { + port = Integer.valueOf(matcher.group(1)); + latch.countDown(); + } + })); + + if (!latch.await(30, TimeUnit.SECONDS)) { + Assertions.fail("failed to start instance of hyper within 30 seconds"); + } + } + + @SneakyThrows + void shutdown() throws InterruptedException { + if (hyperProcess != null && hyperProcess.isAlive()) { + log.info("destroy hyper process"); + hyperProcess.destroy(); + hyperProcess.waitFor(); + } + + log.info("shutdown hyper monitors"); + hyperMonitors.shutdown(); + } + + int getPort() { + return port; + } + + boolean isHealthy() { + return hyperProcess != null && hyperProcess.isAlive(); + } + + private static void logStream(InputStream inputStream, Consumer consumer) { + try (val reader = new BufferedReader(new BufferedReader(new InputStreamReader(inputStream)))) { + String line; + while ((line = reader.readLine()) != null) { + consumer.accept("hyperd - " + line); + } + } catch (IOException e) { + log.warn("Caught exception while consuming log stream, it probably closed", e); + } catch (Exception e) { + log.error("Caught unexpected exception", e); + } + } +} diff --git a/src/test/java/com/salesforce/datacloud/jdbc/hyper/HyperTestBase.java b/src/test/java/com/salesforce/datacloud/jdbc/hyper/HyperTestBase.java index 9a379b1..2f9d718 100644 --- a/src/test/java/com/salesforce/datacloud/jdbc/hyper/HyperTestBase.java +++ b/src/test/java/com/salesforce/datacloud/jdbc/hyper/HyperTestBase.java @@ -15,7 +15,6 @@ */ package com.salesforce.datacloud.jdbc.hyper; -import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; import com.google.common.collect.ImmutableMap; @@ -23,21 +22,10 @@ import com.salesforce.datacloud.jdbc.core.DataCloudStatement; import com.salesforce.datacloud.jdbc.interceptor.AuthorizationHeaderInterceptor; import io.grpc.ManagedChannelBuilder; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.file.Paths; import java.sql.ResultSet; import java.util.Map; import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import lombok.val; @@ -52,7 +40,7 @@ @Slf4j @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class HyperTestBase { - private static final String LISTENING = "gRPC listening on"; + private static HyperServerProcess instance; @SneakyThrows public static void assertEachRowIsTheSame(ResultSet rs, AtomicInteger prev) { @@ -92,77 +80,29 @@ public static DataCloudConnection getHyperQueryConnection(Map co val properties = new Properties(); properties.putAll(connectionSettings); val auth = AuthorizationHeaderInterceptor.of(new NoopTokenSupplier()); - ManagedChannelBuilder channel = - ManagedChannelBuilder.forAddress("0.0.0.0", 8181).usePlaintext(); + log.info("Creating connection to port {}", instance.getPort()); + ManagedChannelBuilder channel = ManagedChannelBuilder.forAddress("127.0.0.1", instance.getPort()) + .usePlaintext(); return DataCloudConnection.fromTokenSupplier(auth, channel, properties); } - private Process hyperProcess; - private final ExecutorService hyperMonitors = Executors.newFixedThreadPool(2); - @SneakyThrows @AfterAll @Timeout(5_000) public void afterAll() { - try { - if (hyperProcess != null && hyperProcess.isAlive()) { - log.info("destroy hyper process"); - hyperProcess.destroy(); - } - } catch (Throwable e) { - log.error("Failed to destroy hyperd", e); - } - - if (hyperProcess != null && hyperProcess.isAlive()) { - Thread.sleep(3_000); - } - - try { - log.info("shutdown hyper monitors"); - hyperMonitors.shutdown(); - } catch (Throwable e) { - log.error("Failed to shutdown hyper monitor thread pool", e); - } + instance.shutdown(); } @SneakyThrows @BeforeAll public void beforeAll() { - log.info("starting hyperd, this might take a few seconds"); - - val hyperd = new File("./target/hyper/hyperd"); - val properties = Paths.get(requireNonNull(HyperTestBase.class.getResource("/hyper.yaml")) - .toURI()) - .toFile(); - - if (!hyperd.exists()) { - Assertions.fail("hyperd executable couldn't be found, have you run mvn process-test-resources? expected=" - + hyperd.getAbsolutePath()); - } - - hyperProcess = new ProcessBuilder() - .command(hyperd.getAbsolutePath(), "--config", properties.getAbsolutePath(), "--no-password", "run") - .start(); - - val latch = new CountDownLatch(1); - - hyperMonitors.execute(() -> logStream(hyperProcess.getErrorStream(), log::error)); - hyperMonitors.execute(() -> logStream(hyperProcess.getInputStream(), line -> { - log.info(line); - if (line.contains(LISTENING)) { - latch.countDown(); - } - })); - - if (!latch.await(30, TimeUnit.SECONDS)) { - Assertions.fail("failed to start instance of hyper within 30 seconds"); - } + instance = new HyperServerProcess(); } @BeforeEach public void assumeHyperEnabled() { - Assertions.assertTrue(hyperProcess != null && hyperProcess.isAlive(), "Hyper wasn't started, failing test"); + Assertions.assertTrue((instance != null) && instance.isHealthy(), "Hyper wasn't started, failing test"); } static class NoopTokenSupplier implements AuthorizationHeaderInterceptor.TokenSupplier { @@ -171,17 +111,4 @@ public String getToken() { return ""; } } - - private static void logStream(InputStream inputStream, Consumer consumer) { - try (val reader = new BufferedReader(new BufferedReader(new InputStreamReader(inputStream)))) { - String line; - while ((line = reader.readLine()) != null) { - consumer.accept("hyperd - " + line); - } - } catch (IOException e) { - log.warn("Caught exception while consuming log stream, it probably closed", e); - } catch (Exception e) { - log.error("Caught unexpected exception", e); - } - } } diff --git a/src/test/resources/hyper.yaml b/src/test/resources/hyper.yaml index 8d99af3..e171860 100644 --- a/src/test/resources/hyper.yaml +++ b/src/test/resources/hyper.yaml @@ -1,8 +1,8 @@ -listen-connection: tcp.grpc://0.0.0.0:8181 +listen-connection: tcp.grpc://127.0.0.1:auto skip-license: true +strict-settings-mode: true language: en_US no-password: true use_v3_new_endpoints: true -use_result_spooling: true grpc_persist_results: true log_pipelines: true \ No newline at end of file diff --git a/src/test/resources/simplelogger.properties b/src/test/resources/simplelogger.properties index a6610e3..7925e1a 100644 --- a/src/test/resources/simplelogger.properties +++ b/src/test/resources/simplelogger.properties @@ -1,4 +1,4 @@ org.slf4j.simpleLogger.logFile=System.out -org.slf4j.simpleLogger.defaultLogLevel=error +org.slf4j.simpleLogger.defaultLogLevel=info -org.slf4j.simpleLogger.log.com.salesforce.cdp.queryservice.core.QueryServiceConnection=debug +org.slf4j.simpleLogger.log.com.salesforce.cdp.queryservice.core.QueryServiceConnection=info