Skip to content

Commit

Permalink
feat: add queries to arrow flight and as400 (#418)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle authored Oct 29, 2024
1 parent adf8fe7 commit 985cef8
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.kestra.plugin.jdbc.arrowflight;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Perform multiple queries on a database through Apache Arrow Flight SQL driver."
)
@Plugin(
examples = {
@Example(
title = "Send SQL queries to a database and fetch row(s) using Apache Arrow Flight SQL driver.",
full = true,
code = """
id: arrow_flight_sql_queries
namespace: company.team
tasks:
- id: query
type: io.kestra.plugin.jdbc.arrowflight.Queries
url: jdbc:arrow-flight-sql://localhost:31010/?useEncryption=false
username: db_user
password: db_password
sql: SELECT * FROM employee; SELECT * FROM laptop;
fetchType: FETCH
"""
)
}
)
public class Queries extends AbstractJdbcQueries implements RunnableTask<AbstractJdbcQueries.MultiQueryOutput> {
@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new ArrowFlightCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new ArrowFlightJdbcDriver());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -74,9 +73,4 @@ protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new ArrowFlightJdbcDriver());
}

@Override
public Output run(RunContext runContext) throws Exception {
return super.run(runContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.kestra.plugin.jdbc.as400;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.Properties;

/**
* Copied from the DB2 code as we cannot test AS400 we assume it works like DB2
*/
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Perform multiple queries on a AS400 database."
)
@Plugin(
examples = {
@Example(
title = "Send a SQL query to a AS400 Database and fetch a row as output.",
full = true,
code = """
id: as400_queries
namespace: company.team
tasks:
- id: query
type: io.kestra.plugin.jdbc.as400.Queries
url: jdbc:as400://127.0.0.1:50000/
username: as400_user
password: as400_password
sql: select * from employee; select * from laptops;
fetchType: FETCH
"""
)
}
)
public class Queries extends AbstractJdbcQueries implements RunnableTask<AbstractJdbcQueries.MultiQueryOutput> {
@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new As400CellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new com.ibm.as400.access.AS400JDBCDriver());
}

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
return super.connectionProperties(runContext, "jdbc:as400");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,6 @@ public void registerDriver() throws SQLException {

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
Properties props = super.connectionProperties(runContext);

URI url = URI.create((String) props.get("jdbc.url"));
url = URI.create(url.getSchemeSpecificPart());

UriBuilder builder = UriBuilder.of(url).scheme("jdbc:as400");

props.put("jdbc.url", builder.build().toString());

return props;
return super.connectionProperties(runContext, "jdbc:as400");
}
}

0 comments on commit 985cef8

Please sign in to comment.