Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-290 Align Aerospike namespace naming with JDBC catalog #56

Merged
merged 2 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/com/aerospike/jdbc/AerospikeConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class AerospikeConnection implements Connection, SimpleWrapper {
private final IAerospikeClient client;
private final DatabaseMetadataBuilder metadataBuilder;
private final AerospikeVersion aerospikeVersion;
private final AtomicReference<String> schema = new AtomicReference<>(null); // namespace
private final AtomicReference<String> catalog = new AtomicReference<>(null);

private volatile boolean readOnly = false;
private volatile Map<String, Class<?>> typeMap = emptyMap();
Expand All @@ -49,7 +49,7 @@ public AerospikeConnection(String url, Properties props) {
client = config.parse(url);
metadataBuilder = new DatabaseMetadataBuilder(config.getDriverPolicy());
aerospikeVersion = new AerospikeVersion(client);
schema.set(config.getSchema()); // namespace
catalog.set(config.getCatalog()); // namespace
}

@Override
Expand Down Expand Up @@ -125,12 +125,12 @@ public void setReadOnly(boolean readOnly) throws SQLException {

@Override
public String getCatalog() {
return schema.get();
return catalog.get();
}

@Override
public void setCatalog(String catalog) {
schema.set(catalog);
this.catalog.set(catalog);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ public ResultSetMetaData getMetaData() throws SQLException {
AerospikeQuery query = parseQuery(prepareQueryString());
List<DataColumn> columns = ((AerospikeDatabaseMetadata) connection.getMetaData())
.getSchemaBuilder()
.getSchema(query.getSchemaTable());
return new AerospikeResultSetMetaData(query.getSchema(), query.getTable(), columns);
.getSchema(query.getCatalogTable());
return new AerospikeResultSetMetaData(query.getCatalog(), query.getTable(), columns);
}

@Override
Expand Down Expand Up @@ -265,7 +265,7 @@ public ParameterMetaData getParameterMetaData() throws SQLException {
AerospikeQuery query = parseQuery(prepareQueryString());
List<DataColumn> columns = ((AerospikeDatabaseMetadata) connection.getMetaData())
.getSchemaBuilder()
.getSchema(query.getSchemaTable());
.getSchema(query.getCatalogTable());
return new SimpleParameterMetaData(columns);
}

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/aerospike/jdbc/AerospikeStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class AerospikeStatement implements Statement, SimpleWrapper {
protected final IAerospikeClient client;
protected final AerospikeConnection connection;

protected String schema;
protected String catalog;
protected ResultSet resultSet;
protected int updateCount;

Expand All @@ -42,7 +42,7 @@ public class AerospikeStatement implements Statement, SimpleWrapper {
public AerospikeStatement(IAerospikeClient client, AerospikeConnection connection) {
this.client = client;
this.connection = connection;
this.schema = connection.getCatalog();
this.catalog = connection.getCatalog();
}

@Override
Expand All @@ -67,8 +67,8 @@ protected AerospikeQuery parseQuery(String sql) throws SQLException {
} catch (SqlParseException e) {
query = AuxStatementParser.hack(sql);
}
if (query.getSchema() == null) {
query.setSchema(schema);
if (query.getCatalog() == null) {
query.setCatalog(catalog);
}
return query;
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/aerospike/jdbc/async/ScanQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ public RecordSet execute(ScanPolicy scanPolicy, AerospikeQuery query) {
long maxRecords = scanPolicy.maxRecords;
PartitionFilter filter = getPartitionFilter(query);
while (isScanRequired(maxRecords)) {
client.scanPartitions(scanPolicy, filter, query.getSchema(), query.getSetName(),
client.scanPartitions(scanPolicy, filter, query.getCatalog(), query.getSetName(),
callback, query.columnBins());
scanPolicy.maxRecords = maxRecords > 0 ? maxRecords - count : maxRecords;
filter = PartitionFilter.id(++currentPartition);
}
listener.onSuccess();
} else {
client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(),
client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getCatalog(),
query.getSetName(), query.columnBins());
}
return listener.getRecordSet();
}

private PartitionFilter getPartitionFilter(AerospikeQuery query) {
Key key = new Key(query.getSchema(), query.getSetName(), query.getOffset());
Key key = new Key(query.getCatalog(), query.getSetName(), query.getOffset());
currentPartition = Partition.getPartitionId(key.digest);
return PartitionFilter.after(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public RecordSet execute(QueryPolicy queryPolicy, AerospikeQuery query,
statement.setRecordsPerSecond(client.getScanPolicyDefault().recordsPerSecond);

statement.setIndexName(secondaryIndex.getIndexName());
statement.setNamespace(query.getSchema());
statement.setNamespace(query.getCatalog());
statement.setSetName(query.getTable());
statement.setBinNames(query.columnBins());

Expand Down
21 changes: 3 additions & 18 deletions src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class AerospikeQuery {
private static final String ASTERISK = "*";

private String catalog;
private String schema;
private String table;
private QueryType queryType;
private Integer offset;
Expand Down Expand Up @@ -59,29 +58,15 @@ public void setCatalog(String catalog) {
this.catalog = catalog;
}

public String getSchema() {
return schema;
}

public void setSchema(String schema) {
this.catalog = schema; // TODO ?
this.schema = schema;
}

public String getTable() {
return table;
}

public void setTable(String table) {
String[] spec = table.split("\\.");
switch (spec.length) {
case 3:
this.catalog = spec[0];
this.schema = spec[1];
this.table = spec[2];
break;
case 2:
this.schema = spec[0];
this.catalog = spec[0];
this.table = spec[1];
break;
case 1:
Expand All @@ -99,8 +84,8 @@ public String getSetName() {
return table;
}

public SchemaTableName getSchemaTable() {
return new SchemaTableName(schema, table);
public CatalogTableName getCatalogTable() {
return new CatalogTableName(catalog, table);
}

public QueryType getQueryType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class AerospikeSecondaryIndex {
private final IndexType indexType;

@Order(3)
@JsonProperty("schema_name")
@JsonProperty("catalog_name")
private final String namespace;

@Order(4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public AerospikeQuery visit(SqlCall sqlCall) {
query.setQueryType(QueryType.DROP_TABLE);
} else if (sqlCall instanceof SqlDropSchema) {
SqlDropSchema sql = (SqlDropSchema) sqlCall;
query.setSchema(requireNonNull(sql.name).toString());
query.setCatalog(requireNonNull(sql.name).toString());
query.setQueryType(QueryType.DROP_SCHEMA);
} else if (sqlCall instanceof SqlOrderBy) {
SqlOrderBy sql = (SqlOrderBy) sqlCall;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

import java.util.Objects;

public class SchemaTableName {
public class CatalogTableName {

private final String schemaName;
private final String catalogName;
private final String tableName;

public SchemaTableName(String schemaName, String tableName) {
this.schemaName = schemaName;
public CatalogTableName(String catalogName, String tableName) {
this.catalogName = catalogName;
this.tableName = tableName;
}

public String getSchemaName() {
return schemaName;
public String getCatalogName() {
return catalogName;
}

public String getTableName() {
Expand All @@ -24,18 +24,18 @@ public String getTableName() {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SchemaTableName that = (SchemaTableName) o;
return Objects.equals(schemaName, that.schemaName) &&
CatalogTableName that = (CatalogTableName) o;
return Objects.equals(catalogName, that.catalogName) &&
Objects.equals(tableName, that.tableName);
}

@Override
public int hashCode() {
return Objects.hash(schemaName, tableName);
return Objects.hash(catalogName, tableName);
}

@Override
public String toString() {
return String.format("%s(%s, %s)", getClass().getSimpleName(), schemaName, tableName);
return String.format("%s(%s, %s)", getClass().getSimpleName(), catalogName, tableName);
}
}
4 changes: 2 additions & 2 deletions src/main/java/com/aerospike/jdbc/model/DataColumn.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public int getType() {
return type;
}

public SchemaTableName getSchemaTableName() {
return new SchemaTableName(catalog, table);
public CatalogTableName getSchemaTableName() {
return new CatalogTableName(catalog, table);
}

@Override
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/com/aerospike/jdbc/model/DriverConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ public final class DriverConfiguration {

private static final String DEFAULT_AEROSPIKE_PORT = "3000";

private static final Pattern AS_JDBC_URL = Pattern.compile("^jdbc:aerospike:(?://)?([^/?]+)");
private static final Pattern AS_JDBC_SCHEMA = Pattern.compile("/([^?]+)");
private static final Pattern AEROSPIKE_JDBC_URL = Pattern.compile("^jdbc:aerospike:(?://)?([^/?]+)");
private static final Pattern AEROSPIKE_JDBC_CATALOG = Pattern.compile("/([^?]+)");

private final Map<Object, Object> clientInfo = new ConcurrentHashMap<>();
private volatile IAerospikeClient client;
private volatile String schema;
private volatile String catalog;
private volatile ClientPolicy clientPolicy;
private volatile DriverPolicy driverPolicy;

Expand All @@ -48,7 +48,7 @@ public DriverConfiguration(Properties props) {
@SuppressWarnings("java:S2696")
public IAerospikeClient parse(String url) {
logger.info(() -> format("Parse URL: %s", url));
schema = parseSchema(url);
catalog = parseCatalog(url);
updateClientInfo(url);

Value.UseBoolBin = Optional.ofNullable(clientInfo.get("useBoolBin"))
Expand Down Expand Up @@ -113,7 +113,7 @@ private TlsPolicy buildTlsPolicy() {
}

private Host[] parseHosts(String url, final String tlsName) {
Matcher m = AS_JDBC_URL.matcher(url);
Matcher m = AEROSPIKE_JDBC_URL.matcher(url);
if (!m.find()) {
throw new IllegalArgumentException("Cannot parse URL " + url);
}
Expand All @@ -124,8 +124,8 @@ private Host[] parseHosts(String url, final String tlsName) {
.toArray(Host[]::new);
}

private String parseSchema(String url) {
Matcher m = AS_JDBC_SCHEMA.matcher(url);
private String parseCatalog(String url) {
Matcher m = AEROSPIKE_JDBC_CATALOG.matcher(url);
return m.find() ? m.group(1) : null;
}

Expand All @@ -141,8 +141,8 @@ private void updateClientInfo(String url) {
}
}

public String getSchema() {
return schema;
public String getCatalog() {
return catalog;
}

public Properties getClientInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected Bin[] getBins(AerospikeQuery query) {
}

protected ListRecordSet emptyRecordSet(AerospikeQuery query) {
return new ListRecordSet(statement, query.getSchema(), query.getTable(),
return new ListRecordSet(statement, query.getCatalog(), query.getTable(),
emptyList(), emptyList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
logger.info("DELETE primary key");
FutureDeleteListener listener = new FutureDeleteListener(keyObjects.size());
for (Object keyObject : keyObjects) {
Key key = new Key(query.getSchema(), query.getSetName(), Value.get(keyObject));
Key key = new Key(query.getCatalog(), query.getSetName(), Value.get(keyObject));
try {
client.delete(EventLoopProvider.getEventLoop(), listener, writePolicy, key);
} catch (AerospikeException e) {
Expand All @@ -49,7 +49,7 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
ScanPolicy scanPolicy = policyBuilder.buildScanPolicy(query);
scanPolicy.includeBinData = false;

client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(),
client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getCatalog(),
query.getSetName());

final WritePolicy deletePolicy = policyBuilder.buildDeleteWritePolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Pair<ResultSet, Integer> putConsecutively(AerospikeQuery query) {
@SuppressWarnings("unchecked")
List<Object> values = (List<Object>) aerospikeRecord;
Value recordKey = extractInsertKey(query, values);
Key key = new Key(query.getSchema(), query.getSetName(), recordKey);
Key key = new Key(query.getCatalog(), query.getSetName(), recordKey);
Bin[] bins = buildBinArray(binNames, values);

try {
Expand All @@ -73,7 +73,7 @@ public Pair<ResultSet, Integer> putBatch(AerospikeQuery query) {
@SuppressWarnings("unchecked")
List<Object> values = (List<Object>) aerospikeRecord;
Value recordKey = extractInsertKey(query, values);
Key key = new Key(query.getSchema(), query.getSetName(), recordKey);
Key key = new Key(query.getCatalog(), query.getSetName(), recordKey);
batchRecords.add(
new BatchWrite(
batchWritePolicy,
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public SelectQueryHandler(IAerospikeClient client, Statement statement) {

@Override
public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
columns = databaseMetadata.getSchemaBuilder().getSchema(query.getSchemaTable());
columns = databaseMetadata.getSchemaBuilder().getSchema(query.getCatalogTable());
Collection<Object> keyObjects = query.getPrimaryKeys();
Optional<AerospikeSecondaryIndex> sIndex = secondaryIndex(query);
Pair<ResultSet, Integer> result;
Expand All @@ -74,7 +74,7 @@ private Pair<ResultSet, Integer> executeCountQuery(AerospikeQuery query) {
String countLabel = query.getColumns().get(0);
int recordNumber;
if (Objects.isNull(query.getPredicate())) {
recordNumber = getTableRecordsNumber(client, query.getSchema(), query.getTable());
recordNumber = getTableRecordsNumber(client, query.getCatalog(), query.getTable());
} else {
ScanPolicy policy = policyBuilder.buildScanNoBinDataPolicy(query);
RecordSet recordSet = ScanQueryHandler.create(client, config.getDriverPolicy())
Expand All @@ -91,7 +91,7 @@ private Pair<ResultSet, Integer> executeCountQuery(AerospikeQuery query) {
recordSet.put(new KeyRecord(null, aeroRecord));
recordSet.close();

columns = Collections.singletonList(new DataColumn(query.getSchema(), query.getTable(),
columns = Collections.singletonList(new DataColumn(query.getCatalog(), query.getTable(),
Types.INTEGER, countLabel, countLabel));

return queryResult(recordSet, query);
Expand All @@ -102,7 +102,7 @@ private Pair<ResultSet, Integer> executeSelectByPrimaryKey(AerospikeQuery query,
final BatchReadPolicy policy = policyBuilder.buildBatchReadPolicy(query);
List<BatchRead> batchReadList = keyObjects.stream()
.map(k -> {
Key key = new Key(query.getSchema(), query.getSetName(), Value.get(k));
Key key = new Key(query.getCatalog(), query.getSetName(), Value.get(k));
return new BatchRead(policy, key, true);
})
.collect(Collectors.toList());
Expand Down Expand Up @@ -135,7 +135,7 @@ private Pair<ResultSet, Integer> executeQuery(AerospikeQuery query,

private Optional<AerospikeSecondaryIndex> secondaryIndex(AerospikeQuery query) {
if (aerospikeVersion.isSIndexSupported() && query.isIndexable()) {
Collection<AerospikeSecondaryIndex> indexes = databaseMetadata.getSecondaryIndexes(query.getSchema());
Collection<AerospikeSecondaryIndex> indexes = databaseMetadata.getSecondaryIndexes(query.getCatalog());
List<String> binNames = query.getPredicate().getBinNames();
if (!binNames.isEmpty() && indexes != null && !indexes.isEmpty()) {
List<AerospikeSecondaryIndex> indexList = indexes.stream()
Expand All @@ -154,7 +154,7 @@ private Optional<AerospikeSecondaryIndex> secondaryIndex(AerospikeQuery query) {
}

private Pair<ResultSet, Integer> queryResult(RecordSet recordSet, AerospikeQuery query) {
return new Pair<>(new AerospikeRecordResultSet(recordSet, statement, query.getSchema(),
return new Pair<>(new AerospikeRecordResultSet(recordSet, statement, query.getCatalog(),
query.getTable(), filterColumns(query)), -1);
}

Expand Down
Loading
Loading