From f56ff9597f295bb40c84d06f87b199e42d48b594 Mon Sep 17 00:00:00 2001 From: Mathieu Gabelle Date: Tue, 29 Oct 2024 11:10:12 +0100 Subject: [PATCH] feat: add queries to arrow flight and as400 --- .../plugin/jdbc/arrowflight/Queries.java | 59 ++++++++++++++++ .../kestra/plugin/jdbc/arrowflight/Query.java | 6 -- .../io/kestra/plugin/jdbc/as400/Queries.java | 68 +++++++++++++++++++ .../io/kestra/plugin/jdbc/as400/Query.java | 11 +-- 4 files changed, 128 insertions(+), 16 deletions(-) create mode 100644 plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Queries.java create mode 100644 plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/as400/Queries.java diff --git a/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Queries.java b/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Queries.java new file mode 100644 index 00000000..18ec218b --- /dev/null +++ b/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Queries.java @@ -0,0 +1,59 @@ +package io.kestra.plugin.jdbc.arrowflight; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.plugin.jdbc.AbstractCellConverter; +import io.kestra.plugin.jdbc.AbstractJdbcQueries; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.time.ZoneId; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Perform multiple queries on a database through Apache Arrow Flight SQL driver." +) +@Plugin( + examples = { + @Example( + title = "Send SQL queries to a database and fetch row(s) using Apache Arrow Flight SQL driver.", + full = true, + code = """ + id: arrow_flight_sql_queries + namespace: company.team + + tasks: + - id: query + type: io.kestra.plugin.jdbc.arrowflight.Queries + url: jdbc:arrow-flight-sql://localhost:31010/?useEncryption=false + username: db_user + password: db_password + sql: SELECT * FROM employee; SELECT * FROM laptop; + fetchType: FETCH + """ + ) + } +) +public class Queries extends AbstractJdbcQueries implements RunnableTask { + @Override + protected AbstractCellConverter getCellConverter(ZoneId zoneId) { + return new ArrowFlightCellConverter(zoneId); + } + + @Override + public void registerDriver() throws SQLException { + DriverManager.registerDriver(new ArrowFlightJdbcDriver()); + } +} diff --git a/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Query.java b/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Query.java index 79e9eda2..125468e7 100644 --- a/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Query.java +++ b/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Query.java @@ -3,7 +3,6 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.tasks.RunnableTask; -import io.kestra.core.runners.RunContext; import io.kestra.plugin.jdbc.AbstractCellConverter; import io.kestra.plugin.jdbc.AbstractJdbcQuery; import io.swagger.v3.oas.annotations.media.Schema; @@ -74,9 +73,4 @@ protected AbstractCellConverter getCellConverter(ZoneId zoneId) { public void registerDriver() throws SQLException { DriverManager.registerDriver(new ArrowFlightJdbcDriver()); } - - @Override - public Output run(RunContext runContext) throws Exception { - return super.run(runContext); - } } diff --git a/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/as400/Queries.java b/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/as400/Queries.java new file mode 100644 index 00000000..3bef1c34 --- /dev/null +++ b/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/as400/Queries.java @@ -0,0 +1,68 @@ +package io.kestra.plugin.jdbc.as400; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.jdbc.AbstractCellConverter; +import io.kestra.plugin.jdbc.AbstractJdbcQueries; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.time.ZoneId; +import java.util.Properties; + +/** + * Copied from the DB2 code as we cannot test AS400 we assume it works like DB2 + */ +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Perform multiple queries on a AS400 database." +) +@Plugin( + examples = { + @Example( + title = "Send a SQL query to a AS400 Database and fetch a row as output.", + full = true, + code = """ + id: as400_queries + namespace: company.team + + tasks: + - id: query + type: io.kestra.plugin.jdbc.as400.Queries + url: jdbc:as400://127.0.0.1:50000/ + username: as400_user + password: as400_password + sql: select * from employee; select * from laptops; + fetchType: FETCH + """ + ) + } +) +public class Queries extends AbstractJdbcQueries implements RunnableTask { + @Override + protected AbstractCellConverter getCellConverter(ZoneId zoneId) { + return new As400CellConverter(zoneId); + } + + @Override + public void registerDriver() throws SQLException { + DriverManager.registerDriver(new com.ibm.as400.access.AS400JDBCDriver()); + } + + @Override + public Properties connectionProperties(RunContext runContext) throws Exception { + return super.connectionProperties(runContext, "jdbc:as400"); + } +} diff --git a/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/as400/Query.java b/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/as400/Query.java index 68337fad..426712af 100644 --- a/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/as400/Query.java +++ b/plugin-jdbc-as400/src/main/java/io/kestra/plugin/jdbc/as400/Query.java @@ -65,15 +65,6 @@ public void registerDriver() throws SQLException { @Override public Properties connectionProperties(RunContext runContext) throws Exception { - Properties props = super.connectionProperties(runContext); - - URI url = URI.create((String) props.get("jdbc.url")); - url = URI.create(url.getSchemeSpecificPart()); - - UriBuilder builder = UriBuilder.of(url).scheme("jdbc:as400"); - - props.put("jdbc.url", builder.build().toString()); - - return props; + return super.connectionProperties(runContext, "jdbc:as400"); } }