Skip to content

Commit

Permalink
feat: add queries to clickhouse (#419)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle authored Oct 30, 2024
1 parent 985cef8 commit cc659e2
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.jdbc.clickhouse;

import com.clickhouse.data.value.UnsignedLong;
import io.kestra.plugin.jdbc.AbstractCellConverter;

import java.net.Inet4Address;
Expand Down Expand Up @@ -85,6 +86,11 @@ public Object convertCell(int columnIndex, ResultSet rs, Connection connection)
return col.toString().substring(1);
}

if (columnTypeName.equals("UInt64")) {
UnsignedLong col = (UnsignedLong) columnVal;
return col.longValue();
}

return super.convert(columnIndex, rs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.kestra.plugin.jdbc.clickhouse;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Perform multiple queries on a Clickhouse database."
)
@Plugin(
examples = {
@Example(
title = "Queries on a Clickhouse database.",
full = true,
code = """
id: clickhouse_queries
namespace: company.team
tasks:
- id: queries
type: io.kestra.plugin.jdbc.clickhouse.Queries
url: jdbc:clickhouse://127.0.0.1:56982/
username: ch_user
password: ch_password
sql: select * from employee; select * from laptop;
fetchType: STORE
"""
)
}
)
public class Queries extends AbstractJdbcQueries implements RunnableTask<AbstractJdbcQueries.MultiQueryOutput> {

@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new ClickHouseCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new com.clickhouse.jdbc.ClickHouseDriver());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package io.kestra.plugin.jdbc.clickhouse;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.kestra.plugin.jdbc.AbstractRdbmsTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.*;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static io.kestra.core.models.tasks.common.FetchType.FETCH;
import static io.kestra.core.models.tasks.common.FetchType.FETCH_ONE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;

@KestraTest
public class ClickHouseQueriesTest extends AbstractRdbmsTest {

@Test
void testMultiSelectWithParameters() throws Exception {
RunContext runContext = runContextFactory.of(Collections.emptyMap());

Map<String, Object> parameters = Map.of(
"age", 40,
"brand", "Apple",
"cpu_frequency", 1.5
);

Queries taskGet = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH)
.timeZoneId("Europe/Paris")
.sql("""
SELECT firstName, lastName, age FROM employee where age > :age and age < :age + 10;
SELECT brand, model FROM laptop where brand = :brand and cpu_frequency > :cpu_frequency;
""")
.parameters(Property.of(parameters))
.build();

AbstractJdbcQueries.MultiQueryOutput runOutput = taskGet.run(runContext);
assertThat(runOutput.getOutputs().size(), is(2));

List<Map<String, Object>> employees = runOutput.getOutputs().getFirst().getRows();
assertThat("employees", employees, notNullValue());
assertThat("employees", employees.size(), is(1));
assertThat("employee selected", employees.getFirst().get("age"), is(45));
assertThat("employee selected", employees.getFirst().get("firstName"), is("John"));
assertThat("employee selected", employees.getFirst().get("lastName"), is("Doe"));

List<Map<String, Object>>laptops = runOutput.getOutputs().getLast().getRows();
assertThat("laptops", laptops, notNullValue());
assertThat("laptops", laptops.size(), is(1));
assertThat("selected laptop", laptops.getFirst().get("brand"), is("Apple"));
}

@Disabled("Test for rollback disabled : Savepoint and rollback are not supported by ClickHouse")
@Test
void testRollback() throws Exception {
RunContext runContext = runContextFactory.of(Collections.emptyMap());

//Queries should pass in a transaction
Queries queriesPass = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
DROP TABLE IF EXISTS test_transaction;
CREATE TABLE test_transaction(id Int64)
ENGINE = MergeTree()
ORDER BY (id)
SETTINGS index_granularity = 8192;
INSERT INTO test_transaction (id) VALUES (1);
SELECT COUNT(id) as transaction_count FROM test_transaction;
""")
.build();

AbstractJdbcQueries.MultiQueryOutput runOutput = queriesPass.run(runContext);
assertThat(runOutput.getOutputs().size(), is(1));
assertThat(runOutput.getOutputs().getFirst().getRow().get("transaction_count"), is(1L));

//Queries should fail due to bad sql
Queries insertsFail = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
INSERT INTO test_transaction (id) VALUES (2);
INSERT INTO test_transaction (id) VALUES ('random');
""") //Try inserting before failing
.build();

assertThrows(Exception.class, () -> insertsFail.run(runContext));

//Final query to verify the amount of updated rows
Queries verifyQuery = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH)
.timeZoneId("Europe/Paris")
.sql("""
SELECT COUNT(id) as transaction_count FROM test_transaction;
SELECT * FROM test_transaction;
""") //Try inserting before failing
.build();

AbstractJdbcQueries.MultiQueryOutput verifyOutput = verifyQuery.run(runContext);
assertThat(verifyOutput.getOutputs().size(), is(1));
assertThat(verifyOutput.getOutputs().getFirst().getRow().get("transaction_count"), is(1L));
}

@Test
void testNonTransactionalShouldNotRollback() throws Exception {
RunContext runContext = runContextFactory.of(Collections.emptyMap());

//Queries should pass in a transaction
Queries insertOneAndFail = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.transaction(Property.of(false))
.timeZoneId("Europe/Paris")
.sql("""
DROP TABLE IF EXISTS test_transaction;
CREATE TABLE test_transaction(id Int64)
ENGINE = MergeTree()
ORDER BY (id)
SETTINGS index_granularity = 8192;
INSERT INTO test_transaction (id) VALUES (1);
INSERT INTO test_transaction (id) VALUES ('random');
INSERT INTO test_transaction (id) VALUES (2);
""")
.build();

assertThrows(Exception.class, () -> insertOneAndFail.run(runContext));

//Final query to verify the amount of updated rows
Queries verifyQuery = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
SELECT COUNT(id) as transaction_count FROM test_transaction;
""") //Try inserting before failing
.build();

AbstractJdbcQueries.MultiQueryOutput verifyOutput = verifyQuery.run(runContext);
assertThat(verifyOutput.getOutputs().size(), is(1));
assertThat(verifyOutput.getOutputs().getFirst().getRow().get("transaction_count"), is(1L));
}

@Override
protected String getUrl() {
return "jdbc:clickhouse://127.0.0.1:28123/default";
}

@Override
protected void initDatabase() throws SQLException, FileNotFoundException, URISyntaxException {
executeSqlScript("scripts/clickhouse_queries.sql");
}

@Override
@BeforeEach
public void init() throws IOException, URISyntaxException, SQLException {
initDatabase();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
CREATE DATABASE IF NOT EXISTS kestra;

-- Create table employee
DROP TABLE IF EXISTS employee;

CREATE TABLE employee (
firstName String,
lastName String,
age Int8,
employee_id Int64
)
ENGINE = MergeTree()
ORDER BY (employee_id)
SETTINGS index_granularity = 8192;

INSERT INTO employee (firstName, lastName, age, employee_id)
VALUES
('John', 'Doe', 45, 1),
('Bryan', 'Grant', 33, 2),
('Jude', 'Philips', 25, 3),
('Michael', 'Page', 62, 4);

-- Create table laptop
DROP TABLE IF EXISTS laptop;

CREATE TABLE laptop
(
brand String,
model String,
cpu_frequency Decimal(3, 2),
laptop_id Int64
) ENGINE = MergeTree()
ORDER BY (laptop_id)
SETTINGS index_granularity = 8192;

INSERT INTO laptop (brand, model, cpu_frequency, laptop_id)
VALUES
('Apple', 'MacBookPro M1 13', 2.20, 1),
('Apple', 'MacBookPro M3 16', 1.50, 2),
('LG', 'Gram', 1.95, 3),
('Lenovo', 'ThinkPad', 1.05, 4);

0 comments on commit cc659e2

Please sign in to comment.