From 1daa4f5ee7365c19cdd12d933dcd803b103939a3 Mon Sep 17 00:00:00 2001 From: XiaoYou201 Date: Sat, 31 Aug 2024 19:31:16 +0800 Subject: [PATCH] [INLONG-10725][Sort] fix defect --- .../sort-end-to-end-tests-v1.18/pom.xml | 33 ---- .../tests/Elasticsearch6ContainerTest.java | 177 ------------------ .../tests/Elasticsearch7ContainerTest.java | 177 ------------------ .../src/test/resources/flinkSql/pg2es6.sql | 32 ---- .../src/test/resources/flinkSql/pg2es7.sql | 31 --- .../src/test/resources/log4j2-test.properties | 35 ---- 6 files changed, 485 deletions(-) delete mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Elasticsearch6ContainerTest.java delete mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Elasticsearch7ContainerTest.java delete mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/pg2es6.sql delete mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/pg2es7.sql diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml index 0a5457cf8bb..46a86f6cd54 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml @@ -157,39 +157,6 @@ jar ${project.build.directory}/dependencies - - mysql - mysql-connector-java - ${mysql.jdbc.version} - mysql-driver.jar - jar - ${project.build.directory}/dependencies - - - - org.apache.inlong - sort-connector-postgres-cdc-v1.15 - ${project.version} - sort-connector-postgres-cdc.jar - jar - ${project.build.directory}/dependencies - - - org.apache.inlong - sort-connector-elasticsearch6-v1.18 - ${project.version} - sort-connector-elasticsearch6.jar - jar - ${project.build.directory}/dependencies - - - org.apache.inlong - sort-connector-elasticsearch7-v1.18 - ${project.version} - sort-connector-elasticsearch7.jar - jar - ${project.build.directory}/dependencies - diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Elasticsearch6ContainerTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Elasticsearch6ContainerTest.java deleted file mode 100644 index 8c941061035..00000000000 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Elasticsearch6ContainerTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sort.tests; - -import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; -import org.apache.inlong.sort.tests.utils.TestUtils; - -import org.apache.http.HttpHost; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.ClassRule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.PostgreSQLContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testcontainers.utility.DockerImageName; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.time.Duration; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; - -public class Elasticsearch6ContainerTest extends FlinkContainerTestEnvJRE8 { - - private static final Logger PG_LOG = LoggerFactory.getLogger(PostgreSQLContainer.class); - private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6ContainerTest.class); - private static final Path postgresJar = TestUtils.getResource("sort-connector-postgres-cdc.jar"); - private static final Path es6Jar = TestUtils.getResource("sort-connector-elasticsearch6.jar"); - private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar"); - private static final String sqlFile; - private static final DockerImageName ELASTICSEARCH_IMAGE = DockerImageName - .parse("docker.elastic.co/elasticsearch/elasticsearch:6.8.17"); - private static ElasticsearchContainer elasticsearchContainer; - private static RestHighLevelClient client; - - static { - try { - sqlFile = Paths.get(Elasticsearch6ContainerTest.class.getResource("/flinkSql/pg2es6.sql").toURI()) - .toString(); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - } - @ClassRule - public static final PostgreSQLContainer POSTGRES_CONTAINER = (PostgreSQLContainer) new PostgreSQLContainer( - DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres")) - .withUsername("flinkuser") - .withPassword("flinkpw") - .withDatabaseName("test") - .withNetwork(NETWORK) - .withNetworkAliases("postgres") - .withLogConsumer(new Slf4jLogConsumer(PG_LOG)); - - @Before - public void setup() { - elasticsearchContainer = new ElasticsearchContainer(ELASTICSEARCH_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases("elasticsearch"); - elasticsearchContainer.start(); - - client = new RestHighLevelClient( - RestClient.builder(HttpHost.create(elasticsearchContainer.getHttpHostAddress()))); - initializePostgresTable(); - waitUntilJobRunning(Duration.ofSeconds(30)); - } - - private void initializePostgresTable() { - try { - Class.forName(POSTGRES_CONTAINER.getDriverClassName()); - Connection conn = DriverManager - .getConnection(POSTGRES_CONTAINER.getJdbcUrl(), POSTGRES_CONTAINER.getUsername(), - POSTGRES_CONTAINER.getPassword()); - Statement stat = conn.createStatement(); - stat.execute( - "CREATE TABLE test_input1 (\n" - + " id SERIAL,\n" - + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" - + " description VARCHAR(512),\n" - + " PRIMARY KEY(id)\n" - + ");"); - stat.execute( - "ALTER TABLE test_input1 REPLICA IDENTITY FULL; "); - stat.close(); - conn.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @AfterClass - public static void teardown() throws IOException { - if (POSTGRES_CONTAINER != null) { - POSTGRES_CONTAINER.stop(); - } - if (client != null) { - client.close(); - } - if (elasticsearchContainer != null) { - elasticsearchContainer.stop(); - } - } - - /** - * Test flink sql postgresql cdc to StarRocks - * - * @throws Exception The exception may throws when execute the case - */ - @org.junit.Test - public void testPostgresUpdateAndDelete() throws Exception { - submitSQLJob(sqlFile, es6Jar, postgresJar, mysqlJdbcJar); - waitUntilJobRunning(Duration.ofSeconds(10)); - - // generate input - try (Connection conn = - DriverManager.getConnection(POSTGRES_CONTAINER.getJdbcUrl(), POSTGRES_CONTAINER.getUsername(), - POSTGRES_CONTAINER.getPassword()); - Statement stat = conn.createStatement()) { - stat.execute( - "INSERT INTO test_input1 " - + "VALUES (1,'jacket','water resistent white wind breaker');"); - stat.execute( - "INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel scooter ');"); - stat.execute( - "update test_input1 set name = 'tom' where id = 2;"); - stat.execute( - "delete from test_input1 where id = 1;"); - } catch (SQLException e) { - LOG.error("Update table for CDC failed.", e); - throw e; - } - Thread.sleep(5000L); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()); - SearchRequest searchRequest = new SearchRequest("test_index"); - searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - SearchHit[] hits = searchResponse.getHits().getHits(); - assertNotEquals(0, hits.length); - Map fields = hits[0].getSourceAsMap(); - assertEquals(2, fields.get("id")); - assertEquals("tom", fields.get("name")); - assertEquals("Big 2-wheel scooter ", fields.get("description")); - } -} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Elasticsearch7ContainerTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Elasticsearch7ContainerTest.java deleted file mode 100644 index 142801a3696..00000000000 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Elasticsearch7ContainerTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sort.tests; - -import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; -import org.apache.inlong.sort.tests.utils.TestUtils; - -import org.apache.http.HttpHost; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.ClassRule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.PostgreSQLContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testcontainers.utility.DockerImageName; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.time.Duration; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; - -public class Elasticsearch7ContainerTest extends FlinkContainerTestEnvJRE8 { - - private static final Logger PG_LOG = LoggerFactory.getLogger(PostgreSQLContainer.class); - private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7ContainerTest.class); - private static final Path postgresJar = TestUtils.getResource("sort-connector-postgres-cdc.jar"); - private static final Path es6Jar = TestUtils.getResource("sort-connector-elasticsearch7.jar"); - private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar"); - private static final String sqlFile; - private static final DockerImageName ELASTICSEARCH_IMAGE = DockerImageName - .parse("docker.elastic.co/elasticsearch/elasticsearch:7.10.2"); - private static ElasticsearchContainer elasticsearchContainer; - private static RestHighLevelClient client; - - static { - try { - sqlFile = Paths.get(Elasticsearch7ContainerTest.class.getResource("/flinkSql/pg2es7.sql").toURI()) - .toString(); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - } - @ClassRule - public static final PostgreSQLContainer POSTGRES_CONTAINER = (PostgreSQLContainer) new PostgreSQLContainer( - DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres")) - .withUsername("flinkuser") - .withPassword("flinkpw") - .withDatabaseName("test") - .withNetwork(NETWORK) - .withNetworkAliases("postgres") - .withLogConsumer(new Slf4jLogConsumer(PG_LOG)); - - @Before - public void setup() { - elasticsearchContainer = new ElasticsearchContainer(ELASTICSEARCH_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases("elasticsearch"); - elasticsearchContainer.start(); - - client = new RestHighLevelClient( - RestClient.builder(HttpHost.create(elasticsearchContainer.getHttpHostAddress()))); - initializePostgresTable(); - waitUntilJobRunning(Duration.ofSeconds(30)); - } - - private void initializePostgresTable() { - try { - Class.forName(POSTGRES_CONTAINER.getDriverClassName()); - Connection conn = DriverManager - .getConnection(POSTGRES_CONTAINER.getJdbcUrl(), POSTGRES_CONTAINER.getUsername(), - POSTGRES_CONTAINER.getPassword()); - Statement stat = conn.createStatement(); - stat.execute( - "CREATE TABLE test_input1 (\n" - + " id SERIAL,\n" - + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" - + " description VARCHAR(512),\n" - + " PRIMARY KEY(id)\n" - + ");"); - stat.execute( - "ALTER TABLE test_input1 REPLICA IDENTITY FULL; "); - stat.close(); - conn.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @AfterClass - public static void teardown() throws IOException { - if (POSTGRES_CONTAINER != null) { - POSTGRES_CONTAINER.stop(); - } - if (client != null) { - client.close(); - } - if (elasticsearchContainer != null) { - elasticsearchContainer.stop(); - } - } - - /** - * Test flink sql postgresql cdc to StarRocks - * - * @throws Exception The exception may throws when execute the case - */ - @org.junit.Test - public void testPostgresUpdateAndDelete() throws Exception { - submitSQLJob(sqlFile, es6Jar, postgresJar, mysqlJdbcJar); - waitUntilJobRunning(Duration.ofSeconds(10)); - - // generate input - try (Connection conn = - DriverManager.getConnection(POSTGRES_CONTAINER.getJdbcUrl(), POSTGRES_CONTAINER.getUsername(), - POSTGRES_CONTAINER.getPassword()); - Statement stat = conn.createStatement()) { - stat.execute( - "INSERT INTO test_input1 " - + "VALUES (1,'jacket','water resistent white wind breaker');"); - stat.execute( - "INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel scooter ');"); - stat.execute( - "update test_input1 set name = 'tom' where id = 2;"); - stat.execute( - "delete from test_input1 where id = 1;"); - } catch (SQLException e) { - LOG.error("Update table for CDC failed.", e); - throw e; - } - Thread.sleep(5000L); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()); - SearchRequest searchRequest = new SearchRequest("test_index"); - searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - SearchHit[] hits = searchResponse.getHits().getHits(); - assertNotEquals(0, hits.length); - Map fields = hits[0].getSourceAsMap(); - assertEquals(2, fields.get("id")); - assertEquals("tom", fields.get("name")); - assertEquals("Big 2-wheel scooter ", fields.get("description")); - } -} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/pg2es6.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/pg2es6.sql deleted file mode 100644 index 43628a8ef04..00000000000 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/pg2es6.sql +++ /dev/null @@ -1,32 +0,0 @@ -CREATE TABLE test_input1 ( - `id` INT, - name STRING, - description STRING -) WITH ( - 'connector' = 'postgres-cdc-inlong', - 'hostname' = 'postgres', - 'port' = '5432', - 'username' = 'flinkuser', - 'password' = 'flinkpw', - 'database-name' = 'test', - 'table-name' = 'test_input1', - 'schema-name' = 'public', - 'decoding.plugin.name' = 'pgoutput', - 'slot.name' = 'inlong_slot', - 'debezium.slot.name' = 'inlong_slot' -); -CREATE TABLE test_output1 ( - `id` INT, - name STRING, - description STRING -) WITH ( - 'connector' = 'elasticsearch6-inlong', - 'hosts' = 'http://elasticsearch:9200', - 'index' = 'test_index', - 'document-type' = '_doc', - 'format' = 'json' -); -INSERT INTO test_output1 select * from test_input1; - - - diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/pg2es7.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/pg2es7.sql deleted file mode 100644 index b79aaf5f57d..00000000000 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/pg2es7.sql +++ /dev/null @@ -1,31 +0,0 @@ -CREATE TABLE test_input1 ( - `id` INT, - name STRING, - description STRING -) WITH ( - 'connector' = 'postgres-cdc-inlong', - 'hostname' = 'postgres', - 'port' = '5432', - 'username' = 'flinkuser', - 'password' = 'flinkpw', - 'database-name' = 'test', - 'table-name' = 'test_input1', - 'schema-name' = 'public', - 'decoding.plugin.name' = 'pgoutput', - 'slot.name' = 'inlong_slot', - 'debezium.slot.name' = 'inlong_slot' -); -CREATE TABLE test_output1 ( - `id` INT, - name STRING, - description STRING -) WITH ( - 'connector' = 'elasticsearch7-inlong', - 'hosts' = 'http://elasticsearch:9200', - 'index' = 'test_index', - 'format' = 'json' -); -INSERT INTO test_output1 select * from test_input1; - - - diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/log4j2-test.properties b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/log4j2-test.properties index 7d81ec0bbb5..3e95477751b 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/log4j2-test.properties +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/log4j2-test.properties @@ -35,30 +35,6 @@ appender.tm.fileName = target/logs/taskmanager.log appender.tm.layout.type = PatternLayout appender.tm.layout.pattern = - %m%n -appender.kafka.type = File -appender.kafka.name = kafkaserver -appender.kafka.fileName = target/logs/kafka.log -appender.kafka.layout.type = PatternLayout -appender.kafka.layout.pattern = - %m%n - -appender.starrocks.type = File -appender.starrocks.name = starrocks -appender.starrocks.fileName = target/logs/starrocks.log -appender.starrocks.layout.type = PatternLayout -appender.starrocks.layout.pattern = - %m%n - -appender.postgres.type = File -appender.postgres.name = postgres -appender.postgres.fileName = target/logs/postgres.log -appender.postgres.layout.type = PatternLayout -appender.postgres.layout.pattern = - %m%n - -appender.redis.type = File -appender.redis.name = redis -appender.redis.fileName = target/logs/redis.log -appender.redis.layout.type = PatternLayout -appender.redis.layout.pattern = - %m%n - logger.jm=INFO, jobmanager logger.jm.name=org.apache.flink.runtime.jobmaster.JobMaster logger.jm.additivity=false @@ -67,16 +43,5 @@ logger.tm=INFO, taskmanager logger.tm.name=org.apache.flink.runtime.taskexecutor.TaskExecutor logger.tm.additivity=false -logger.starrocks=INFO, starrocks -logger.starrocks.name=org.apache.inlong.sort.tests.utils.StarRocksContainer -logger.starrocks.additivity=false - -logger.postgres=INFO, postgres -logger.postgres.name=org.testcontainers.containers.PostgreSQLContainer -logger.postgres.additivity=false - -logger.redis=INFO, elasticsearch -logger.redis.name=org.testcontainers.containers.ElasticsearchContainer -logger.redis.additivity=false