Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add queries to arrow flight and as400 #418

Merged
merged 1 commit into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.kestra.plugin.jdbc.arrowflight;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Perform multiple queries on a database through Apache Arrow Flight SQL driver."
)
@Plugin(
examples = {
@Example(
title = "Send SQL queries to a database and fetch row(s) using Apache Arrow Flight SQL driver.",
full = true,
code = """
id: arrow_flight_sql_queries
namespace: company.team

tasks:
- id: query
type: io.kestra.plugin.jdbc.arrowflight.Queries
url: jdbc:arrow-flight-sql://localhost:31010/?useEncryption=false
username: db_user
password: db_password
sql: SELECT * FROM employee; SELECT * FROM laptop;
fetchType: FETCH
"""
)
}
)
public class Queries extends AbstractJdbcQueries implements RunnableTask<AbstractJdbcQueries.MultiQueryOutput> {
@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new ArrowFlightCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new ArrowFlightJdbcDriver());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -74,9 +73,4 @@ protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new ArrowFlightJdbcDriver());
}

@Override
public Output run(RunContext runContext) throws Exception {
return super.run(runContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.kestra.plugin.jdbc.as400;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.Properties;

/**
* Copied from the DB2 code as we cannot test AS400 we assume it works like DB2
*/
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Perform multiple queries on a AS400 database."
)
@Plugin(
examples = {
@Example(
title = "Send a SQL query to a AS400 Database and fetch a row as output.",
full = true,
code = """
id: as400_queries
namespace: company.team

tasks:
- id: query
type: io.kestra.plugin.jdbc.as400.Queries
url: jdbc:as400://127.0.0.1:50000/
username: as400_user
password: as400_password
sql: select * from employee; select * from laptops;
fetchType: FETCH
"""
)
}
)
public class Queries extends AbstractJdbcQueries implements RunnableTask<AbstractJdbcQueries.MultiQueryOutput> {
@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new As400CellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new com.ibm.as400.access.AS400JDBCDriver());
}

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
return super.connectionProperties(runContext, "jdbc:as400");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,6 @@ public void registerDriver() throws SQLException {

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
Properties props = super.connectionProperties(runContext);

URI url = URI.create((String) props.get("jdbc.url"));
url = URI.create(url.getSchemeSpecificPart());

UriBuilder builder = UriBuilder.of(url).scheme("jdbc:as400");

props.put("jdbc.url", builder.build().toString());

return props;
return super.connectionProperties(runContext, "jdbc:as400");
}
}