Skip to content

Commit

Permalink
Merge pull request #26 from aerospike/develop
Browse files Browse the repository at this point in the history
v1.7.2
  • Loading branch information
reugn authored Apr 3, 2023
2 parents 320e463 + 38b77ee commit 4169857
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 27 deletions.
18 changes: 11 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<name>aerospike-jdbc</name>
<description>A JDBC driver for the Aerospike database</description>
<url>https://github.com/aerospike/aerospike-jdbc</url>
<version>1.7.1</version>
<version>1.7.2</version>

<properties>
<skipTests>false</skipTests>
Expand All @@ -26,11 +26,15 @@
<maven-shade-plugin.version>3.4.0</maven-shade-plugin.version>
<nexus-staging-maven-plugin.version>1.6.13</nexus-staging-maven-plugin.version>

<aerospike-client.version>6.1.6</aerospike-client.version>
<netty.version>4.1.87.Final</netty.version>
<aerospike-client.version>6.1.8</aerospike-client.version>
<netty.version>4.1.90.Final</netty.version>
<jackson.version>2.14.2</jackson.version>
<calcite.version>1.32.0</calcite.version>
<calcite.version>1.34.0</calcite.version>
<httpclient.version>4.5.14</httpclient.version>

<assertj.version>3.24.2</assertj.version>
<testng.version>7.5</testng.version> <!-- latest for JDK 8 -->
<jdbi.version>3.37.1</jdbi.version>
</properties>

<licenses>
Expand Down Expand Up @@ -121,21 +125,21 @@
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.23.1</version>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>7.5</version>
<version>${testng.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-core</artifactId>
<version>3.33.0</version>
<version>${jdbi.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
21 changes: 12 additions & 9 deletions src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import static com.aerospike.jdbc.util.AerospikeUtils.getIndexBinValuesRatio;
import static com.aerospike.jdbc.util.Constants.defaultKeyName;
import static com.aerospike.jdbc.util.Constants.defaultSchemaName;
import static com.aerospike.jdbc.util.Constants.schemaScanRecords;
import static java.lang.String.format;
import static java.sql.Connection.TRANSACTION_NONE;
Expand Down Expand Up @@ -78,7 +79,7 @@ public AerospikeDatabaseMetadata(String url, IAerospikeClient client, Connection
namespaces.addAll(asList(getOrDefault(r, "namespaces", "").split(";")));
streamOfSubProperties(r, "sets").forEach(p ->
tables.computeIfAbsent(p.getProperty("ns"), s -> new HashSet<>())
.add(p.getProperty("set"))
.addAll(Arrays.asList(p.getProperty("set"), defaultSchemaName))
);
streamOfSubProperties(r, "sindex")
.filter(AerospikeUtils::isSupportedIndexType)
Expand Down Expand Up @@ -245,17 +246,17 @@ public String getNumericFunctions() {

@Override
public String getStringFunctions() {
return null;
return "";
}

@Override
public String getSystemFunctions() {
return null;
return "";
}

@Override
public String getTimeDateFunctions() {
return null;
return "";
}

@Override
Expand Down Expand Up @@ -949,14 +950,16 @@ public ResultSet getTypeInfo() {

@Override
public ResultSet getIndexInfo(String catalog, String schema, String table, boolean unique, boolean approximate) {
final Iterable<List<?>> indicesData;
Stream<AerospikeSecondaryIndex> secondaryIndexStream;
if (catalog == null) {
indicesData = indices.entrySet().stream().flatMap(p -> p.getValue().stream())
.map(this::indexInfoAsList).collect(Collectors.toList());
secondaryIndexStream = indices.entrySet().stream().flatMap(p -> p.getValue().stream());
} else {
indicesData = getOrDefault(indices, catalog, Collections.emptyList()).stream()
.map(this::indexInfoAsList).collect(Collectors.toList());
secondaryIndexStream = getOrDefault(indices, catalog, Collections.emptyList()).stream();
}
final Iterable<List<?>> indicesData = secondaryIndexStream
.filter(i -> i.getNamespace().equals(schema) && i.getSet().equals(table))
.map(this::indexInfoAsList)
.collect(Collectors.toList());

String[] columns = new String[]{"TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "NON_UNIQUE", "INDEX_QUALIFIER",
"INDEX_NAME", "TYPE", "ORDINAL_POSITION", "COLUMN_NAME", "ASC_OR_DESC", "CARDINALITY",
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/aerospike/jdbc/async/ScanQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,21 @@ public RecordSet execute(ScanPolicy scanPolicy, AerospikeQuery query) {
long maxRecords = scanPolicy.maxRecords;
PartitionFilter filter = getPartitionFilter(query);
while (isScanRequired(maxRecords)) {
client.scanPartitions(scanPolicy, filter, query.getSchema(), query.getTable(),
client.scanPartitions(scanPolicy, filter, query.getSchema(), query.getSetName(),
callback, query.getBinNames());
scanPolicy.maxRecords = maxRecords > 0 ? maxRecords - count : maxRecords;
filter = PartitionFilter.id(++currentPartition);
}
listener.onSuccess();
} else {
client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(),
query.getTable(), query.getBinNames());
query.getSetName(), query.getBinNames());
}
return listener.getRecordSet();
}

private PartitionFilter getPartitionFilter(AerospikeQuery query) {
Key key = new Key(query.getSchema(), query.getTable(), query.getOffset());
Key key = new Key(query.getSchema(), query.getSetName(), query.getOffset());
currentPartition = Partition.getPartitionId(key.digest);
return PartitionFilter.after(key);
}
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.List;
import java.util.Map;

import static com.aerospike.jdbc.util.Constants.defaultSchemaName;

public class AerospikeQuery {

@VisibleForTesting
Expand Down Expand Up @@ -88,6 +90,13 @@ public void setTable(String table) {
}
}

public String getSetName() {
if (table.equals(defaultSchemaName)) {
return null;
}
return table;
}

public SchemaTableName getSchemaTable() {
return new SchemaTableName(schema, table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
logger.info("DELETE primary key");
FutureDeleteListener listener = new FutureDeleteListener(keyObjects.size());
for (Object keyObject : keyObjects) {
Key key = new Key(query.getSchema(), query.getTable(), Value.get(keyObject));
Key key = new Key(query.getSchema(), query.getSetName(), Value.get(keyObject));
try {
client.delete(EventLoopProvider.getEventLoop(), listener, writePolicy, key);
} catch (AerospikeException e) {
Expand All @@ -57,7 +57,7 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
ScanPolicy scanPolicy = buildScanPolicy(query);
scanPolicy.includeBinData = false;
client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(),
query.getTable());
query.getSetName());

final AtomicInteger count = new AtomicInteger();
listener.getRecordSet().forEach(r -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public Pair<ResultSet, Integer> putConsecutively(AerospikeQuery query) {
@SuppressWarnings("unchecked")
List<Object> values = (List<Object>) record;
Value recordKey = extractInsertKey(query, values);
Key key = new Key(query.getSchema(), query.getTable(), recordKey);
Key key = new Key(query.getSchema(), query.getSetName(), recordKey);
Bin[] bins = buildBinArray(binNames, values);

try {
Expand Down Expand Up @@ -85,7 +85,7 @@ public Pair<ResultSet, Integer> putBatch(AerospikeQuery query) {
@SuppressWarnings("unchecked")
List<Object> values = (List<Object>) record;
Value recordKey = extractInsertKey(query, values);
Key key = new Key(query.getSchema(), query.getTable(), recordKey);
Key key = new Key(query.getSchema(), query.getSetName(), recordKey);
batchRecords.add(
new BatchWrite(
batchWritePolicy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private Pair<ResultSet, Integer> executeSelectByPrimaryKey(AerospikeQuery query,
logger.info(() -> "SELECT primary key");
final BatchReadPolicy policy = buildBatchReadPolicy(query);
List<BatchRead> batchReadList = keyObjects.stream()
.map(k -> new BatchRead(policy, new Key(query.getSchema(), query.getTable(), Value.get(k)), true))
.map(k -> new BatchRead(policy, new Key(query.getSchema(), query.getSetName(), Value.get(k)), true))
.collect(Collectors.toList());

RecordSetBatchSequenceListener listener = new RecordSetBatchSequenceListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public TruncateQueryHandler(IAerospikeClient client, Statement statement) {
@Override
public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
logger.info("TRUNCATE/DROP statement");
client.truncate(null, query.getSchema(), query.getTable(), null);
client.truncate(null, query.getSchema(), query.getSetName(), null);

return new Pair<>(emptyRecordSet(query), 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
logger.info("UPDATE primary key");
FutureWriteListener listener = new FutureWriteListener(keyObjects.size());
for (Object keyObject : keyObjects) {
Key key = new Key(query.getSchema(), query.getTable(), Value.get(keyObject));
Key key = new Key(query.getSchema(), query.getSetName(), Value.get(keyObject));
try {
client.put(EventLoopProvider.getEventLoop(), listener, writePolicy, key, bins);
} catch (AerospikeException e) {
Expand All @@ -59,7 +59,7 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
ScanPolicy scanPolicy = buildScanPolicy(query);
scanPolicy.includeBinData = false;
client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(),
query.getTable());
query.getSetName());

final AtomicInteger count = new AtomicInteger();
listener.getRecordSet().forEach(r -> {
Expand Down
34 changes: 34 additions & 0 deletions src/test/java/com/aerospike/jdbc/PreparedQueriesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,40 @@ public void testSelectQuery() throws SQLException {
}
}

@Test
public void testSelectByPrimaryKeyQuery() throws SQLException {
PreparedStatement statement = null;
ResultSet resultSet = null;
String query = String.format(
"insert into %s (__key, bin1, int1, str1, bool1) values (\"key1\", 11101, 2, \"bar\", true)",
tableName
);
try {
statement = connection.prepareStatement(query);
statement.executeUpdate();
} finally {
closeQuietly(statement);
}
query = String.format("select * from %s where __key='%s'", tableName, "key1");
int total = 0;
try {
statement = connection.prepareStatement(query);
resultSet = statement.executeQuery();
while (resultSet.next()) {
assertEquals(resultSet.getInt("bin1"), 11101);
assertEquals(resultSet.getInt("int1"), 2);
assertEquals(resultSet.getString("str1"), "bar");
assertEquals(resultSet.getInt("bool1"), 1);

total++;
}
assertEquals(total, 1);
} finally {
closeQuietly(statement);
closeQuietly(resultSet);
}
}

@Test
public void testInsertQuery() throws SQLException {
PreparedStatement statement = null;
Expand Down
34 changes: 34 additions & 0 deletions src/test/java/com/aerospike/jdbc/SimpleQueriesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,40 @@ public void testSelectQuery() throws SQLException {
}
}

@Test
public void testSelectByPrimaryKeyQuery() throws SQLException {
Statement statement = null;
ResultSet resultSet = null;
String query = String.format(
"insert into %s (__key, bin1, int1, str1, bool1) values (\"key1\", 11101, 2, \"bar\", true)",
tableName
);
try {
statement = connection.createStatement();
statement.executeUpdate(query);
} finally {
closeQuietly(statement);
}
query = String.format("select * from %s where __key='%s'", tableName, "key1");
int total = 0;
try {
statement = connection.createStatement();
resultSet = statement.executeQuery(query);
while (resultSet.next()) {
assertEquals(resultSet.getInt("bin1"), 11101);
assertEquals(resultSet.getInt("int1"), 2);
assertEquals(resultSet.getString("str1"), "bar");
assertEquals(resultSet.getInt("bool1"), 1);

total++;
}
assertEquals(total, 1);
} finally {
closeQuietly(statement);
closeQuietly(resultSet);
}
}

@Test
public void testInsertQuery() throws SQLException {
Statement statement = null;
Expand Down

0 comments on commit 4169857

Please sign in to comment.