Skip to content

Commit

Permalink
FMWK-273 Create AerospikeVersion instance per connection (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Dec 4, 2023
1 parent 5227f82 commit c282218
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 83 deletions.
8 changes: 8 additions & 0 deletions src/main/java/com/aerospike/jdbc/AerospikeConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.aerospike.jdbc.sql.SimpleWrapper;
import com.aerospike.jdbc.sql.type.ByteArrayBlob;
import com.aerospike.jdbc.sql.type.StringClob;
import com.aerospike.jdbc.util.AerospikeVersion;
import com.aerospike.jdbc.util.DatabaseMetadataBuilder;

import java.sql.*;
Expand Down Expand Up @@ -33,7 +34,9 @@ public class AerospikeConnection implements Connection, SimpleWrapper {
private final DriverConfiguration config;
private final IAerospikeClient client;
private final DatabaseMetadataBuilder metadataBuilder;
private final AerospikeVersion aerospikeVersion;
private final AtomicReference<String> schema = new AtomicReference<>(null); // namespace

private volatile boolean readOnly = false;
private volatile Map<String, Class<?>> typeMap = emptyMap();
private volatile int holdability = HOLD_CURSORS_OVER_COMMIT;
Expand All @@ -45,6 +48,7 @@ public AerospikeConnection(String url, Properties props) {
config = new DriverConfiguration(props);
client = config.parse(url);
metadataBuilder = new DatabaseMetadataBuilder(config.getDriverPolicy());
aerospikeVersion = new AerospikeVersion(client);
schema.set(config.getSchema()); // namespace
}

Expand Down Expand Up @@ -361,6 +365,10 @@ public DriverConfiguration getConfiguration() {
return config;
}

public AerospikeVersion getAerospikeVersion() {
return aerospikeVersion;
}

public IAerospikeClient getClient() {
return client;
}
Expand Down
49 changes: 30 additions & 19 deletions src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class AerospikeDatabaseMetadata implements DatabaseMetaData, SimpleWrappe
private final Map<String, Collection<AerospikeSecondaryIndex>> catalogIndexes;
private final Map<String, AerospikeSecondaryIndex> secondaryIndexes;

public AerospikeDatabaseMetadata(String url, IAerospikeClient client, Connection connection) {
public AerospikeDatabaseMetadata(String url, IAerospikeClient client, AerospikeConnection connection) {
logger.info("Init AerospikeDatabaseMetadata");
AerospikeSchemaBuilder.cleanSchemaCache();
this.url = url;
Expand All @@ -82,17 +82,22 @@ public AerospikeDatabaseMetadata(String url, IAerospikeClient client, Connection
);
streamOfSubProperties(r, "sindex")
.filter(AerospikeUtils::isSupportedIndexType)
.forEach(p ->
catalogIndexes.computeIfAbsent(p.getProperty("ns"), s -> new HashSet<>())
.add(new AerospikeSecondaryIndex(
p.getProperty("ns"),
p.getProperty("set"),
p.getProperty("bin"),
p.getProperty("indexname"),
IndexType.valueOf(p.getProperty("type").toUpperCase(Locale.ENGLISH)),
getIndexBinValuesRatio(client, p.getProperty("ns"), p.getProperty("indexname")))
)
);
.forEach(p -> {
String namespace = p.getProperty("ns");
String indexName = p.getProperty("indexname");
Integer binRatio = connection.getAerospikeVersion().isSIndexCardinalitySupported()
? getIndexBinValuesRatio(client, namespace, indexName)
: null;
catalogIndexes.computeIfAbsent(namespace, s -> new HashSet<>())
.add(new AerospikeSecondaryIndex(
namespace,
p.getProperty("set"),
p.getProperty("bin"),
indexName,
IndexType.valueOf(p.getProperty("type").toUpperCase(Locale.ENGLISH)),
binRatio)
);
});
});
secondaryIndexes = catalogIndexes.values().stream()
.flatMap(Collection::stream)
Expand Down Expand Up @@ -853,7 +858,8 @@ public ResultSet getPrimaryKeys(String catalog, String schema, String table) {
final Iterable<List<?>> tablesData;
if (catalog == null) {
tablesData = tables.entrySet().stream()
.flatMap(p -> p.getValue().stream().map(t -> asList(p.getKey(), null, t, defaultKeyName, 1, defaultKeyName)))
.flatMap(p -> p.getValue().stream().map(t ->
asList(p.getKey(), null, t, defaultKeyName, 1, defaultKeyName)))
.collect(toList());
} else {
tablesData = tables.getOrDefault(catalog, Collections.emptyList()).stream()
Expand Down Expand Up @@ -906,8 +912,9 @@ public ResultSet getCrossReference(String parentCatalog, String parentSchema, St
public ResultSet getTypeInfo() {
String[] columns = new String[]{
"TYPE_NAME", "DATA_TYPE", "PRECISION", "LITERAL_PREFIX", "LITERAL_SUFFIX", "CREATE_PARAMS", "NULLABLE",
"CASE_SENSITIVE", "SEARCHABLE", "UNSIGNED_ATTRIBUTE", "FIXED_PREC_SCALE", "AUTO_INCREMENT", "LOCAL_TYPE_NAME",
"MINIMUM_SCALE", "MAXIMUM_SCALE", "SQL_DATA_TYPE", "SQL_DATETIME_SUB", "NUM_PREC_RADIX",
"CASE_SENSITIVE", "SEARCHABLE", "UNSIGNED_ATTRIBUTE", "FIXED_PREC_SCALE", "AUTO_INCREMENT",
"LOCAL_TYPE_NAME", "MINIMUM_SCALE", "MAXIMUM_SCALE", "SQL_DATA_TYPE", "SQL_DATETIME_SUB",
"NUM_PREC_RADIX",
};

Iterable<List<?>> data = asList(
Expand Down Expand Up @@ -1100,7 +1107,8 @@ public ResultSet getAttributes(String catalog, String schemaPattern, String type
"IS_NULLABLE", "SCOPE_CATALOG", "SCOPE_SCHEMA", "SCOPE_TABLE", "SOURCE_DATA_TYPE"};

int[] sqlTypes = new int[]{VARCHAR, VARCHAR, VARCHAR, VARCHAR, INTEGER, VARCHAR, INTEGER, INTEGER, INTEGER,
INTEGER, VARCHAR, VARCHAR, INTEGER, INTEGER, INTEGER, INTEGER, VARCHAR, VARCHAR, VARCHAR, VARCHAR, SMALLINT};
INTEGER, VARCHAR, VARCHAR, INTEGER, INTEGER, INTEGER, INTEGER, VARCHAR, VARCHAR, VARCHAR, VARCHAR,
SMALLINT};

return new ListRecordSet(null, "system", "attributes",
systemColumns(columns, sqlTypes), emptyList());
Expand Down Expand Up @@ -1206,7 +1214,8 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct
}

@Override
public ResultSet getFunctionColumns(String catalog, String schemaPattern, String functionNamePattern, String columnNamePattern) {
public ResultSet getFunctionColumns(String catalog, String schemaPattern, String functionNamePattern,
String columnNamePattern) {
String[] columns = new String[]{"FUNCTION_CAT", "FUNCTION_SCHEM", "FUNCTION_NAME", "COLUMN_NAME", "COLUMN_TYPE",
"DATA_TYPE", "TYPE_NAME", "PRECISION", "LENGTH", "SCALE", "RADIX", "NULLABLE", "REMARKS",
"CHAR_OCTET_LENGTH", "ORDINAL_POSITION", "IS_NULLABLE", "SPECIFIC_NAME"};
Expand All @@ -1219,9 +1228,11 @@ public ResultSet getFunctionColumns(String catalog, String schemaPattern, String
}

@Override
public ResultSet getPseudoColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) {
public ResultSet getPseudoColumns(String catalog, String schemaPattern, String tableNamePattern,
String columnNamePattern) {
String[] columns = new String[]{"TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "DATA_TYPE",
"COLUMN_SIZE", "DECIMAL_DIGITS", "NUM_PREC_RADIX", "COLUMN_USAGE", "REMARKS", "CHAR_OCTET_LENGTH", "IS_NULLABLE"};
"COLUMN_SIZE", "DECIMAL_DIGITS", "NUM_PREC_RADIX", "COLUMN_USAGE", "REMARKS", "CHAR_OCTET_LENGTH",
"IS_NULLABLE"};

int[] sqlTypes = new int[]{VARCHAR, VARCHAR, VARCHAR, VARCHAR, INTEGER, INTEGER, INTEGER, INTEGER, VARCHAR,
VARCHAR, INTEGER, VARCHAR};
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import static com.aerospike.jdbc.util.Constants.defaultSchemaName;

Expand Down Expand Up @@ -160,6 +161,10 @@ public Collection<Object> getPrimaryKeys() {
return Collections.emptyList();
}

public boolean isIndexable() {
return Objects.nonNull(predicate) && predicate.isIndexable() && Objects.isNull(offset);
}

@Override
public String toString() {
try {
Expand Down
27 changes: 21 additions & 6 deletions src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.aerospike.jdbc.model.AerospikeQuery;
import com.aerospike.jdbc.model.DriverConfiguration;
import com.aerospike.jdbc.sql.ListRecordSet;
import com.aerospike.jdbc.util.AerospikeVersion;

import java.sql.SQLException;
import java.sql.Statement;
Expand All @@ -18,18 +19,16 @@ public abstract class BaseQueryHandler implements QueryHandler {

protected final IAerospikeClient client;
protected final Statement statement;
protected final DriverConfiguration config;
protected final PolicyBuilder policyBuilder;
protected final DriverConfiguration config;
protected final AerospikeVersion aerospikeVersion;

protected BaseQueryHandler(IAerospikeClient client, Statement statement) {
this.client = client;
this.statement = statement;
try {
config = ((AerospikeConnection) statement.getConnection()).getConfiguration();
} catch (SQLException e) {
throw new IllegalStateException("Failed to get configuration", e);
}
policyBuilder = new PolicyBuilder(client);
config = getConfiguration();
aerospikeVersion = getAerospikeVersion();
}

protected Bin[] getBins(AerospikeQuery query) {
Expand All @@ -45,4 +44,20 @@ protected ListRecordSet emptyRecordSet(AerospikeQuery query) {
return new ListRecordSet(statement, query.getSchema(), query.getTable(),
emptyList(), emptyList());
}

private DriverConfiguration getConfiguration() {
try {
return ((AerospikeConnection) statement.getConnection()).getConfiguration();
} catch (SQLException e) {
throw new IllegalStateException("Failed to get DriverConfiguration", e);
}
}

private AerospikeVersion getAerospikeVersion() {
try {
return ((AerospikeConnection) statement.getConnection()).getAerospikeVersion();
} catch (SQLException e) {
throw new IllegalStateException("Failed to get AerospikeVersion", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.aerospike.jdbc.async.FutureWriteListener;
import com.aerospike.jdbc.model.AerospikeQuery;
import com.aerospike.jdbc.model.Pair;
import com.aerospike.jdbc.util.VersionUtils;

import java.sql.ResultSet;
import java.sql.Statement;
Expand All @@ -33,7 +32,7 @@ public InsertQueryHandler(IAerospikeClient client, Statement statement) {

@Override
public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
if (VersionUtils.isBatchOpsSupported(client)) {
if (aerospikeVersion.isBatchOpsSupported()) {
logger.info("INSERT batch");
return putBatch(query);
}
Expand Down
47 changes: 27 additions & 20 deletions src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.aerospike.jdbc.model.Pair;
import com.aerospike.jdbc.schema.AerospikeSchemaBuilder;
import com.aerospike.jdbc.sql.AerospikeRecordResultSet;
import com.aerospike.jdbc.util.VersionUtils;

import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -75,7 +74,8 @@ private Pair<ResultSet, Integer> executeCountQuery(AerospikeQuery query) {
recordNumber = getTableRecordsNumber(client, query.getSchema(), query.getTable());
} else {
ScanPolicy policy = policyBuilder.buildScanNoBinDataPolicy(query);
RecordSet recordSet = ScanQueryHandler.create(client, config.getDriverPolicy()).execute(policy, query);
RecordSet recordSet = ScanQueryHandler.create(client, config.getDriverPolicy())
.execute(policy, query);

final AtomicInteger count = new AtomicInteger();
recordSet.forEach(r -> count.incrementAndGet());
Expand All @@ -88,25 +88,26 @@ private Pair<ResultSet, Integer> executeCountQuery(AerospikeQuery query) {
recordSet.put(new KeyRecord(null, aeroRecord));
recordSet.close();

List<DataColumn> columnList = Collections.singletonList(new DataColumn(query.getSchema(),
query.getTable(), Types.INTEGER, countLabel, countLabel));
columns = Collections.singletonList(new DataColumn(query.getSchema(), query.getTable(),
Types.INTEGER, countLabel, countLabel));

return new Pair<>(new AerospikeRecordResultSet(recordSet, statement, query.getSchema(),
query.getTable(), columnList), -1);
return queryResult(recordSet, query);
}

private Pair<ResultSet, Integer> executeSelectByPrimaryKey(AerospikeQuery query, Collection<Object> keyObjects) {
logger.info(() -> "SELECT primary key");
final BatchReadPolicy policy = policyBuilder.buildBatchReadPolicy(query);
List<BatchRead> batchReadList = keyObjects.stream()
.map(k -> new BatchRead(policy, new Key(query.getSchema(), query.getSetName(), Value.get(k)), true))
.map(k -> {
Key key = new Key(query.getSchema(), query.getSetName(), Value.get(k));
return new BatchRead(policy, key, true);
})
.collect(Collectors.toList());

RecordSetBatchSequenceListener listener = new RecordSetBatchSequenceListener(config.getDriverPolicy());
client.get(EventLoopProvider.getEventLoop(), listener, null, batchReadList);

return new Pair<>(new AerospikeRecordResultSet(listener.getRecordSet(), statement, query.getSchema(),
query.getTable(), filterColumns(columns, query.getBinNames())), -1);
return queryResult(listener.getRecordSet(), query);
}

private Pair<ResultSet, Integer> executeScan(AerospikeQuery query) {
Expand All @@ -115,8 +116,7 @@ private Pair<ResultSet, Integer> executeScan(AerospikeQuery query) {
ScanPolicy policy = policyBuilder.buildScanPolicy(query);
RecordSet recordSet = ScanQueryHandler.create(client, config.getDriverPolicy()).execute(policy, query);

return new Pair<>(new AerospikeRecordResultSet(recordSet, statement, query.getSchema(),
query.getTable(), filterColumns(columns, query.getBinNames())), -1);
return queryResult(recordSet, query);
}

private Pair<ResultSet, Integer> executeQuery(AerospikeQuery query,
Expand All @@ -127,13 +127,11 @@ private Pair<ResultSet, Integer> executeQuery(AerospikeQuery query,
RecordSet recordSet = SecondaryIndexQueryHandler.create(client, config.getDriverPolicy())
.execute(policy, query, secondaryIndex);

return new Pair<>(new AerospikeRecordResultSet(recordSet, statement, query.getSchema(),
query.getTable(), filterColumns(columns, query.getBinNames())), -1);
return queryResult(recordSet, query);
}

private Optional<AerospikeSecondaryIndex> secondaryIndex(AerospikeQuery query) {
if (VersionUtils.isSIndexSupported(client) && Objects.nonNull(query.getPredicate())
&& query.getPredicate().isIndexable() && Objects.isNull(query.getOffset())) {
if (aerospikeVersion.isSIndexSupported() && query.isIndexable()) {
Map<String, AerospikeSecondaryIndex> indexMap = secondaryIndexes;
List<String> binNames = query.getPredicate().getBinNames();
if (!binNames.isEmpty() && indexMap != null && !indexMap.isEmpty()) {
Expand All @@ -146,11 +144,7 @@ private Optional<AerospikeSecondaryIndex> secondaryIndex(AerospikeQuery query) {
}
} else {
List<AerospikeSecondaryIndex> indexList = new ArrayList<>(indexMap.values());
if (VersionUtils.isSIndexCardinalitySupported(client)) {
indexList.sort(Comparator.comparingInt(AerospikeSecondaryIndex::getBinValuesRatio));
} else {
indexList.sort(Comparator.comparing(AerospikeSecondaryIndex::getBinName));
}
sortIndexList(indexList);
for (AerospikeSecondaryIndex index : indexList) {
if (binNames.contains(index.getBinName())) {
return Optional.of(index);
Expand All @@ -162,6 +156,19 @@ private Optional<AerospikeSecondaryIndex> secondaryIndex(AerospikeQuery query) {
return Optional.empty();
}

private Pair<ResultSet, Integer> queryResult(RecordSet recordSet, AerospikeQuery query) {
return new Pair<>(new AerospikeRecordResultSet(recordSet, statement, query.getSchema(),
query.getTable(), filterColumns(columns, query.getBinNames())), -1);
}

private void sortIndexList(List<AerospikeSecondaryIndex> indexList) {
if (aerospikeVersion.isSIndexCardinalitySupported()) {
indexList.sort(Comparator.comparingInt(AerospikeSecondaryIndex::getBinValuesRatio));
} else {
indexList.sort(Comparator.comparing(AerospikeSecondaryIndex::getBinName));
}
}

private boolean isCount(AerospikeQuery query) {
return query.getColumns().size() == 1 &&
query.getColumns().get(0).toLowerCase(Locale.ENGLISH).startsWith("count(");
Expand Down
Loading

0 comments on commit c282218

Please sign in to comment.