Skip to content

Commit

Permalink
FMWK-444 Allow record metadata columns inclusion
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed May 25, 2024
1 parent 54e9cce commit 28cdba9
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 51 deletions.
1 change: 1 addition & 0 deletions docs/params.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ Consider setting a custom value if really necessary.
| recordSetTimeoutMs | 1000 | Timeout for the asynchronous queue write operation in milliseconds |
| metadataCacheTtlSeconds | 3600 | Database metadata cache TTL in seconds |
| schemaBuilderMaxRecords | 1000 | The number of records to be used to build the table schema |
| showRecordMetadata | false | Add record metadata columns (__digest, __ttl, __gen) |
6 changes: 6 additions & 0 deletions src/main/java/com/aerospike/jdbc/model/DriverPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class DriverPolicy {
private final int recordSetTimeoutMs;
private final int metadataCacheTtlSeconds;
private final int schemaBuilderMaxRecords;
private final boolean showRecordMetadata;

public DriverPolicy(Properties properties) {
recordSetQueueCapacity = parseInt(properties.getProperty("recordSetQueueCapacity"),
Expand All @@ -23,6 +24,7 @@ public DriverPolicy(Properties properties) {
DEFAULT_METADATA_CACHE_TTL_SECONDS);
schemaBuilderMaxRecords = parseInt(properties.getProperty("schemaBuilderMaxRecords"),
DEFAULT_SCHEMA_BUILDER_MAX_RECORDS);
showRecordMetadata = Boolean.parseBoolean(properties.getProperty("showRecordMetadata"));
}

public int getRecordSetQueueCapacity() {
Expand All @@ -41,6 +43,10 @@ public int getSchemaBuilderMaxRecords() {
return schemaBuilderMaxRecords;
}

public boolean getShowRecordMetadata() {
return showRecordMetadata;
}

private int parseInt(String value, int defaultValue) {
if (value != null) {
return Integer.parseInt(value);
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/aerospike/jdbc/model/Pair.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ public Pair(L left, R right) {
this.right = right;
}

public static <L, R> Pair<L, R> of(L left, R right) {
return new Pair<>(left, right);
}

public L getLeft() {
return left;
}
Expand Down
51 changes: 38 additions & 13 deletions src/main/java/com/aerospike/jdbc/schema/AerospikeSchemaBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,44 @@
import com.aerospike.jdbc.model.CatalogTableName;
import com.aerospike.jdbc.model.DataColumn;
import com.aerospike.jdbc.model.DriverPolicy;
import com.aerospike.jdbc.model.Pair;

import java.sql.Types;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;

import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME;
import static com.aerospike.jdbc.util.Constants.METADATA_DIGEST_COLUMN_NAME;
import static com.aerospike.jdbc.util.Constants.METADATA_GEN_COLUMN_NAME;
import static com.aerospike.jdbc.util.Constants.METADATA_TTL_COLUMN_NAME;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;

public final class AerospikeSchemaBuilder {

private static final Logger logger = Logger.getLogger(AerospikeSchemaBuilder.class.getName());

private final IAerospikeClient client;
private final DriverPolicy driverPolicy;
private final AerospikeSchemaCache schemaCache;
private final int scanMaxRecords;

public AerospikeSchemaBuilder(IAerospikeClient client, DriverPolicy driverPolicy) {
this.client = client;
this.driverPolicy = driverPolicy;
schemaCache = new AerospikeSchemaCache(Duration.ofSeconds(driverPolicy.getMetadataCacheTtlSeconds()));
scanMaxRecords = driverPolicy.getSchemaBuilderMaxRecords();
}

public List<DataColumn> getSchema(CatalogTableName catalogTableName) {
return schemaCache.get(catalogTableName).orElseGet(() -> {
logger.info(() -> "Fetching CatalogTableName: " + catalogTableName);
final Map<String, DataColumn> columnHandles = new TreeMap<>(String::compareToIgnoreCase);
ScanPolicy policy = new ScanPolicy(client.getScanPolicyDefault());
policy.maxRecords = scanMaxRecords;
final Map<String, DataColumn> columnHandles = initColumnHandles(catalogTableName);

// add record key column handler
columnHandles.put(PRIMARY_KEY_COLUMN_NAME,
new DataColumn(
catalogTableName.getCatalogName(),
catalogTableName.getTableName(),
Types.VARCHAR,
PRIMARY_KEY_COLUMN_NAME,
PRIMARY_KEY_COLUMN_NAME));
ScanPolicy policy = new ScanPolicy(client.getScanPolicyDefault());
policy.maxRecords = driverPolicy.getSchemaBuilderMaxRecords();

client.scanAll(policy, catalogTableName.getCatalogName(), toSet(catalogTableName.getTableName()),
(key, rec) -> {
Expand All @@ -69,6 +66,34 @@ public List<DataColumn> getSchema(CatalogTableName catalogTableName) {
});
}

private Map<String, DataColumn> initColumnHandles(CatalogTableName catalogTableName) {
final Map<String, DataColumn> columnHandles = new TreeMap<>(String::compareToIgnoreCase);
final List<Pair<String, Integer>> metadataColumns = new ArrayList<>();

// add record key column handler
metadataColumns.add(Pair.of(PRIMARY_KEY_COLUMN_NAME, Types.VARCHAR));

// add record metadata column handlers
if (driverPolicy.getShowRecordMetadata()) {
metadataColumns.addAll(Arrays.asList(
Pair.of(METADATA_DIGEST_COLUMN_NAME, Types.VARCHAR),
Pair.of(METADATA_GEN_COLUMN_NAME, Types.INTEGER),
Pair.of(METADATA_TTL_COLUMN_NAME, Types.INTEGER)
));
}

for (Pair<String, Integer> md : metadataColumns) {
columnHandles.put(md.getLeft(),
new DataColumn(
catalogTableName.getCatalogName(),
catalogTableName.getTableName(),
md.getRight(),
md.getLeft(),
md.getLeft()));
}
return columnHandles;
}

private String toSet(String tableName) {
if (tableName.equals(DEFAULT_SCHEMA_NAME)) {
return null;
Expand Down
78 changes: 40 additions & 38 deletions src/main/java/com/aerospike/jdbc/sql/AerospikeRecordResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,27 @@
import com.aerospike.client.Value;
import com.aerospike.jdbc.async.RecordSet;
import com.aerospike.jdbc.model.DataColumn;
import com.google.common.io.BaseEncoding;

import java.math.BigDecimal;
import java.sql.Statement;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import static com.aerospike.jdbc.util.Constants.METADATA_DIGEST_COLUMN_NAME;
import static com.aerospike.jdbc.util.Constants.METADATA_GEN_COLUMN_NAME;
import static com.aerospike.jdbc.util.Constants.METADATA_TTL_COLUMN_NAME;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;

public class AerospikeRecordResultSet extends BaseResultSet<Record> {

private static final Logger logger = Logger.getLogger(AerospikeRecordResultSet.class.getName());

private final RecordSet recordSet;
private final Set<String> columnNames;

public AerospikeRecordResultSet(
RecordSet recordSet,
Expand All @@ -28,6 +35,7 @@ public AerospikeRecordResultSet(
) {
super(statement, catalog, table, columns);
this.recordSet = recordSet;
this.columnNames = columns.stream().map(DataColumn::getName).collect(Collectors.toSet());
}

@Override
Expand All @@ -43,36 +51,23 @@ protected boolean moveToNext() {
@Override
public Object getObject(String columnLabel) {
logger.fine(() -> "getObject: " + columnLabel);
Object obj;
if (columnLabel.equals(PRIMARY_KEY_COLUMN_NAME)) {
obj = getUserKey().map(Value::getObject).orElse(null);
} else {
obj = getBin(columnLabel).orElse(null);
}
Object obj = getValue(columnLabel).map(Value::getObject).orElse(null);
wasNull = obj == null;
return obj;
}

@Override
public String getString(String columnLabel) {
logger.fine(() -> "getString: " + columnLabel);
String str;
if (columnLabel.equals(PRIMARY_KEY_COLUMN_NAME)) {
str = getUserKey().map(Value::toString).orElse(null);
} else {
str = getBin(columnLabel).map(Object::toString).orElse(null);
}
String str = getValue(columnLabel).map(Value::toString).orElse(null);
wasNull = str == null;
return str;
}

@Override
public boolean getBoolean(String columnLabel) {
logger.fine(() -> "getBoolean: " + columnLabel);
if (columnLabel.equals(PRIMARY_KEY_COLUMN_NAME)) {
return getUserKey().map(Value::toString).map(Boolean::parseBoolean).orElse(false);
}
return getBin(columnLabel).map(Object::toString).map(Boolean::parseBoolean).orElse(false);
return getValue(columnLabel).map(Value::toString).map(Boolean::parseBoolean).orElse(false);
}

@Override
Expand All @@ -90,19 +85,13 @@ public short getShort(String columnLabel) {
@Override
public int getInt(String columnLabel) {
logger.fine(() -> "getInt: " + columnLabel);
if (columnLabel.equals(PRIMARY_KEY_COLUMN_NAME)) {
return getUserKey().map(Value::toInteger).orElse(0);
}
return getBin(columnLabel).map(Object::toString).map(Integer::parseInt).orElse(0);
return getValue(columnLabel).map(Value::toString).map(Integer::parseInt).orElse(0);
}

@Override
public long getLong(String columnLabel) {
logger.fine(() -> "getLong: " + columnLabel);
if (columnLabel.equals(PRIMARY_KEY_COLUMN_NAME)) {
return getUserKey().map(Value::toLong).orElse(0L);
}
return getBin(columnLabel).map(Object::toString).map(Long::parseLong).orElse(0L);
return getValue(columnLabel).map(Value::toString).map(Long::parseLong).orElse(0L);
}

@Override
Expand All @@ -114,10 +103,7 @@ public float getFloat(String columnLabel) {
@Override
public double getDouble(String columnLabel) {
logger.fine(() -> "getDouble: " + columnLabel);
if (columnLabel.equals(PRIMARY_KEY_COLUMN_NAME)) {
return getUserKey().map(Value::toString).map(Double::parseDouble).orElse(0.0d);
}
return getBin(columnLabel).map(Object::toString).map(Double::parseDouble).orElse(0.0d);
return getValue(columnLabel).map(Value::toString).map(Double::parseDouble).orElse(0.0d);
}

@Override
Expand All @@ -129,17 +115,33 @@ public BigDecimal getBigDecimal(String columnLabel, int scale) {
@Override
public byte[] getBytes(String columnLabel) {
logger.fine(() -> "getBytes: " + columnLabel);
if (columnLabel.equals(PRIMARY_KEY_COLUMN_NAME)) {
return getUserKey().map(Value::toString).map(String::getBytes).orElse(null);
}
return getBin(columnLabel).map(byte[].class::cast).orElse(null);
}

private Optional<Value> getUserKey() {
return Optional.ofNullable(recordSet.getKey().userKey);
return getValue(columnLabel).map(Value::getObject).map(byte[].class::cast).orElse(null);
}

private Optional<Object> getBin(String columnLabel) {
return Optional.ofNullable(recordSet.getRecord().bins.get(columnLabel));
private Optional<Value> getValue(String columnLabel) {
if (!columnNames.contains(columnLabel)) {
return Optional.empty();
}
switch (columnLabel) {
case PRIMARY_KEY_COLUMN_NAME:
return Optional.ofNullable(recordSet.getKey())
.map(key -> key.userKey);
case METADATA_DIGEST_COLUMN_NAME:
return Optional.ofNullable(recordSet.getKey())
.map(key -> BaseEncoding.base16().lowerCase().encode(key.digest))
.map(Value::get);
case METADATA_TTL_COLUMN_NAME:
return Optional.ofNullable(recordSet.getRecord())
.map(rec -> rec.expiration)
.map(Value::get);
case METADATA_GEN_COLUMN_NAME:
return Optional.ofNullable(recordSet.getRecord())
.map(rec -> rec.generation)
.map(Value::get);
default: // regular bin value
return Optional.ofNullable(recordSet.getRecord())
.map(rec -> rec.bins.get(columnLabel))
.map(Value::get);
}
}
}
4 changes: 4 additions & 0 deletions src/main/java/com/aerospike/jdbc/util/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ public final class Constants {
public static final String PRIMARY_KEY_COLUMN_NAME = "__key";
public static final String DEFAULT_SCHEMA_NAME = "__default";

public static final String METADATA_DIGEST_COLUMN_NAME = "__digest";
public static final String METADATA_TTL_COLUMN_NAME = "__ttl";
public static final String METADATA_GEN_COLUMN_NAME = "__gen";

public static final String UNSUPPORTED_QUERY_TYPE_MESSAGE = "Unsupported query type";

// Driver version
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/com/aerospike/jdbc/PreparedQueriesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.Executors;
import java.util.logging.Logger;

import static com.aerospike.jdbc.util.Constants.METADATA_DIGEST_COLUMN_NAME;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;
import static com.aerospike.jdbc.util.TestConfig.HOSTNAME;
import static com.aerospike.jdbc.util.TestConfig.NAMESPACE;
Expand All @@ -25,6 +26,7 @@
import static java.lang.String.format;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

public class PreparedQueriesTest {
Expand Down Expand Up @@ -96,6 +98,7 @@ public void testSelectQuery() throws SQLException {
statement = connection.prepareStatement(query);
resultSet = statement.executeQuery();
while (resultSet.next()) {
assertNull(resultSet.getObject(METADATA_DIGEST_COLUMN_NAME));
testRecord.assertPreparedResultSet(resultSet);

total++;
Expand Down
Loading

0 comments on commit 28cdba9

Please sign in to comment.