Skip to content

Commit

Permalink
FMWK-275 Expose scan max records configuration for schema builder
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Dec 5, 2023
1 parent dd2ff23 commit b20fb4b
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 56 deletions.
24 changes: 15 additions & 9 deletions src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import static com.aerospike.jdbc.util.AerospikeUtils.getIndexBinValuesRatio;
import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;
import static com.aerospike.jdbc.util.Constants.schemaScanRecords;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.sql.Connection.TRANSACTION_NONE;
Expand All @@ -50,17 +49,17 @@ public class AerospikeDatabaseMetadata implements DatabaseMetaData, SimpleWrappe
private static final String NEW_LINE = System.lineSeparator();

private final String url;
private final Connection connection;
private final AerospikeConnection connection;
private final String dbBuild;
private final String dbEdition;
private final List<String> catalogs;
private final Map<String, Collection<String>> tables;
private final Map<String, Collection<AerospikeSecondaryIndex>> catalogIndexes;
private final Map<String, AerospikeSecondaryIndex> secondaryIndexes;
private final AerospikeSchemaBuilder schemaBuilder;

public AerospikeDatabaseMetadata(String url, IAerospikeClient client, AerospikeConnection connection) {
logger.info("Init AerospikeDatabaseMetadata");
AerospikeSchemaBuilder.cleanSchemaCache();
this.url = url;
this.connection = connection;

Expand Down Expand Up @@ -103,11 +102,17 @@ public AerospikeDatabaseMetadata(String url, IAerospikeClient client, AerospikeC
.flatMap(Collection::stream)
.collect(Collectors.toMap(AerospikeSecondaryIndex::toKey, Function.identity()));

schemaBuilder = new AerospikeSchemaBuilder(client, connection.getConfiguration().getDriverPolicy());

dbBuild = join("N/A", ", ", builds);
dbEdition = join("Aerospike", ", ", editions);
catalogs = namespaces.stream().filter(n -> !"".equals(n)).collect(Collectors.toList());
}

public AerospikeSchemaBuilder getSchemaBuilder() {
return schemaBuilder;
}

@Override
public boolean allProceduresAreCallable() {
return false;
Expand Down Expand Up @@ -773,7 +778,7 @@ public ResultSet getTableTypes() {
@Override
public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern,
String columnNamePattern) throws SQLException {
logger.info(() -> String.format("AerospikeDatabaseMetadata getColumns; %s, %s, %s, %s", catalog,
logger.info(() -> format("AerospikeDatabaseMetadata getColumns; %s, %s, %s, %s", catalog,
schemaPattern, tableNamePattern, columnNamePattern));
Pattern tableNameRegex = isNullOrEmpty(tableNamePattern) ? null
: Pattern.compile(tableNamePattern.replace("%", ".*"));
Expand Down Expand Up @@ -1263,7 +1268,7 @@ private Properties initProperties(String lines) {
try {
properties.load(new StringReader(lines));
} catch (IOException e) {
logger.warning(() -> String.format("Expression in initProperties, lines: %s", lines));
logger.warning(() -> format("Expression in initProperties, lines: %s", lines));
}
return properties;
}
Expand Down Expand Up @@ -1293,17 +1298,18 @@ private int ordinal(ResultSetMetaData md, String columnName) {
}
}
} catch (SQLException e) {
logger.severe(() -> String.format("Exception in ordinal, columnName: %s", columnName));
logger.severe(() -> format("Exception in ordinal, columnName: %s", columnName));
}
return ordinal;
}

private ResultSetMetaData getMetadata(String namespace, String table) {
try (Statement statement = connection.createStatement()) {
return statement.executeQuery(format(
"select * from \"%s.%s\" limit %d", namespace, table, schemaScanRecords)).getMetaData();
String query = format("SELECT * FROM \"%s.%s\" LIMIT %d", namespace, table,
connection.getConfiguration().getDriverPolicy().getSchemaBuilderMaxRecords());
return statement.executeQuery(query).getMetaData();
} catch (SQLException e) {
logger.severe(() -> String.format("Exception in getMetadata, namespace: %s, table: %s", namespace, table));
logger.severe(() -> format("Exception in getMetadata, namespace: %s, table: %s", namespace, table));
throw new IllegalArgumentException(e);
}
}
Expand Down
34 changes: 21 additions & 13 deletions src/main/java/com/aerospike/jdbc/AerospikePreparedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.aerospike.client.Value;
import com.aerospike.jdbc.model.AerospikeQuery;
import com.aerospike.jdbc.model.DataColumn;
import com.aerospike.jdbc.schema.AerospikeSchemaBuilder;
import com.aerospike.jdbc.sql.AerospikeResultSetMetaData;
import com.aerospike.jdbc.sql.SimpleParameterMetaData;
import com.aerospike.jdbc.sql.type.ByteArrayBlob;
Expand All @@ -23,7 +22,6 @@
import java.util.Arrays;
import java.util.Calendar;
import java.util.List;
import java.util.Optional;
import java.util.logging.Logger;

import static com.aerospike.jdbc.util.PreparedStatement.parseParameters;
Expand All @@ -34,22 +32,25 @@ public class AerospikePreparedStatement extends AerospikeStatement implements Pr
private static final Logger logger = Logger.getLogger(AerospikePreparedStatement.class.getName());

private final String sql;
private final List<DataColumn> columns;
private final AerospikeQuery query;
private final AerospikeConnection connection;
private final Object[] parameterValues;
private final AerospikeQuery query;

public AerospikePreparedStatement(IAerospikeClient client, AerospikeConnection connection, String sql) {
super(client, connection);
this.sql = sql;
int params = parseParameters(sql, 0).getValue();
parameterValues = new Object[params];
Arrays.fill(parameterValues, Optional.empty());
this.connection = connection;
parameterValues = buildParameterValues(sql);
try {
query = parseQuery(sql);
} catch (SQLException e) {
throw new UnsupportedOperationException(e);
}
columns = AerospikeSchemaBuilder.getSchema(query.getSchemaTable(), client);
}

private Object[] buildParameterValues(String sql) {
int params = parseParameters(sql, 0).getValue();
return new Object[params];
}

@Override
Expand Down Expand Up @@ -159,7 +160,7 @@ public void setBinaryStream(int parameterIndex, InputStream x, int length) throw

@Override
public void clearParameters() {
Arrays.fill(parameterValues, Optional.empty());
Arrays.fill(parameterValues, null);
}

@Override
Expand All @@ -185,7 +186,7 @@ public boolean execute() throws SQLException {
}

private String prepareQuery() {
return String.format(this.sql.replace("?", "%s"), parameterValues);
return format(this.sql.replace("?", "%s"), parameterValues);
}

@Override
Expand Down Expand Up @@ -219,7 +220,10 @@ public void setArray(int parameterIndex, Array x) throws SQLException {
}

@Override
public ResultSetMetaData getMetaData() {
public ResultSetMetaData getMetaData() throws SQLException {
List<DataColumn> columns = ((AerospikeDatabaseMetadata) connection.getMetaData())
.getSchemaBuilder()
.getSchema(query.getSchemaTable());
return new AerospikeResultSetMetaData(query.getSchema(), query.getTable(), columns);
}

Expand Down Expand Up @@ -249,7 +253,10 @@ public void setURL(int parameterIndex, URL url) throws SQLException {
}

@Override
public ParameterMetaData getParameterMetaData() {
public ParameterMetaData getParameterMetaData() throws SQLException {
List<DataColumn> columns = ((AerospikeDatabaseMetadata) connection.getMetaData())
.getSchemaBuilder()
.getSchema(query.getSchemaTable());
return new SimpleParameterMetaData(columns);
}

Expand Down Expand Up @@ -278,7 +285,8 @@ public void setClob(int parameterIndex, Reader reader, long length) throws SQLEx
try {
String result = IOUtils.toString(reader);
if (result.length() != length) {
throw new SQLException(format("Unexpected data length: expected %s but was %d", length, result.length()));
throw new SQLException(format("Unexpected data length: expected %s but was %d", length,
result.length()));
}
setObject(parameterIndex, result);
} catch (IOException e) {
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/aerospike/jdbc/model/DriverPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ public class DriverPolicy {
private static final int DEFAULT_CAPACITY = 256;
private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final int DEFAULT_METADATA_CACHE_TTL_SECONDS = 3600;
private static final int DEFAULT_SCHEMA_BUILDER_MAX_RECORDS = 1000;

private final int recordSetQueueCapacity;
private final int recordSetTimeoutMs;
private final int metadataCacheTtlSeconds;
private final int schemaBuilderMaxRecords;

public DriverPolicy(Properties properties) {
recordSetQueueCapacity = parseInt(properties.getProperty("recordSetQueueCapacity"), DEFAULT_CAPACITY);
recordSetTimeoutMs = parseInt(properties.getProperty("recordSetTimeoutMs"), DEFAULT_TIMEOUT_MS);
metadataCacheTtlSeconds = parseInt(properties.getProperty("metadataCacheTtlSeconds"),
DEFAULT_METADATA_CACHE_TTL_SECONDS);
schemaBuilderMaxRecords = parseInt(properties.getProperty("schemaBuilderMaxRecords"),
DEFAULT_SCHEMA_BUILDER_MAX_RECORDS);
}

public int getRecordSetQueueCapacity() {
Expand All @@ -31,6 +35,10 @@ public int getMetadataCacheTtlSeconds() {
return metadataCacheTtlSeconds;
}

public int getSchemaBuilderMaxRecords() {
return schemaBuilderMaxRecords;
}

private int parseInt(String value, int defaultValue) {
if (value != null) {
return Integer.parseInt(value);
Expand Down
12 changes: 5 additions & 7 deletions src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.aerospike.jdbc.model.AerospikeSecondaryIndex;
import com.aerospike.jdbc.model.DataColumn;
import com.aerospike.jdbc.model.Pair;
import com.aerospike.jdbc.schema.AerospikeSchemaBuilder;
import com.aerospike.jdbc.sql.AerospikeRecordResultSet;

import java.sql.ResultSet;
Expand All @@ -36,22 +35,21 @@ public class SelectQueryHandler extends BaseQueryHandler {

private static final Logger logger = Logger.getLogger(SelectQueryHandler.class.getName());

protected final Map<String, AerospikeSecondaryIndex> secondaryIndexes;
protected final AerospikeDatabaseMetadata databaseMetadata;
protected List<DataColumn> columns;

public SelectQueryHandler(IAerospikeClient client, Statement statement) {
super(client, statement);
try {
secondaryIndexes = ((AerospikeDatabaseMetadata) statement.getConnection().getMetaData())
.getSecondaryIndexes();
databaseMetadata = (AerospikeDatabaseMetadata) statement.getConnection().getMetaData();
} catch (SQLException e) {
throw new IllegalStateException("Failed to get secondary indexes", e);
throw new IllegalStateException("Failed to get AerospikeDatabaseMetadata", e);
}
}

@Override
public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
columns = AerospikeSchemaBuilder.getSchema(query.getSchemaTable(), client);
columns = databaseMetadata.getSchemaBuilder().getSchema(query.getSchemaTable());
Collection<Object> keyObjects = query.getPrimaryKeys();
Optional<AerospikeSecondaryIndex> sIndex = secondaryIndex(query);
Pair<ResultSet, Integer> result;
Expand Down Expand Up @@ -132,7 +130,7 @@ private Pair<ResultSet, Integer> executeQuery(AerospikeQuery query,

private Optional<AerospikeSecondaryIndex> secondaryIndex(AerospikeQuery query) {
if (aerospikeVersion.isSIndexSupported() && query.isIndexable()) {
Map<String, AerospikeSecondaryIndex> indexMap = secondaryIndexes;
Map<String, AerospikeSecondaryIndex> indexMap = databaseMetadata.getSecondaryIndexes();
List<String> binNames = query.getPredicate().getBinNames();
if (!binNames.isEmpty() && indexMap != null && !indexMap.isEmpty()) {
if (binNames.size() == 1) {
Expand Down
29 changes: 14 additions & 15 deletions src/main/java/com/aerospike/jdbc/schema/AerospikeSchemaBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.aerospike.client.Value;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.jdbc.model.DataColumn;
import com.aerospike.jdbc.model.DriverPolicy;
import com.aerospike.jdbc.model.SchemaTableName;

import java.sql.Types;
Expand All @@ -16,29 +17,27 @@

import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;
import static com.aerospike.jdbc.util.Constants.schemaCacheTTLMinutes;
import static com.aerospike.jdbc.util.Constants.schemaScanRecords;

public final class AerospikeSchemaBuilder {

private static final Logger logger = Logger.getLogger(AerospikeSchemaBuilder.class.getName());

private static final Duration cacheTTL = Duration.ofMinutes(schemaCacheTTLMinutes);
private static final AerospikeSchemaCache cache = new AerospikeSchemaCache(cacheTTL);
private final IAerospikeClient client;
private final AerospikeSchemaCache schemaCache;
private final int scanMaxRecords;

private AerospikeSchemaBuilder() {
public AerospikeSchemaBuilder(IAerospikeClient client, DriverPolicy driverPolicy) {
this.client = client;
schemaCache = new AerospikeSchemaCache(Duration.ofSeconds(driverPolicy.getMetadataCacheTtlSeconds()));
scanMaxRecords = driverPolicy.getSchemaBuilderMaxRecords();
}

public static void cleanSchemaCache() {
cache.clear();
}

public static List<DataColumn> getSchema(SchemaTableName schemaTableName, IAerospikeClient client) {
return cache.get(schemaTableName).orElseGet(() -> {
public List<DataColumn> getSchema(SchemaTableName schemaTableName) {
return schemaCache.get(schemaTableName).orElseGet(() -> {
logger.info(() -> "Fetching SchemaTableName: " + schemaTableName);
final Map<String, DataColumn> columnHandles = new TreeMap<>(String::compareToIgnoreCase);
ScanPolicy policy = new ScanPolicy(client.getScanPolicyDefault());
policy.maxRecords = schemaScanRecords;
policy.maxRecords = scanMaxRecords;

// add record key column handler
columnHandles.put(PRIMARY_KEY_COLUMN_NAME,
Expand All @@ -65,19 +64,19 @@ public static List<DataColumn> getSchema(SchemaTableName schemaTableName, IAeros
});

List<DataColumn> columns = new ArrayList<>(columnHandles.values());
cache.put(schemaTableName, columns);
schemaCache.put(schemaTableName, columns);
return columns;
});
}

private static String toSet(String tableName) {
private String toSet(String tableName) {
if (tableName.equals(DEFAULT_SCHEMA_NAME)) {
return null;
}
return tableName;
}

private static int getBinType(Object value) {
private int getBinType(Object value) {
int t = 0;
if (value instanceof byte[] || value instanceof Value.BytesValue || value instanceof Value.ByteSegmentValue) {
t = Types.VARBINARY;
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/com/aerospike/jdbc/util/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ public final class Constants {
public static final String PRIMARY_KEY_COLUMN_NAME = "__key";
public static final String DEFAULT_SCHEMA_NAME = "__default";

public static final long schemaScanRecords = 1000L;
public static final long schemaCacheTTLMinutes = 30L;

public static final String UNSUPPORTED_QUERY_TYPE_MESSAGE = "Unsupported query type";

private Constants() {
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ public void testParseUrlParameters() throws Exception {
update.setProperty("recordSetQueueCapacity", "1024");
update.setProperty("metadataCacheTtlSeconds", "7200");
update.setProperty("recordsPerSecond", "128");
update.setProperty("schemaBuilderMaxRecords", "500");
connection.setClientInfo(update);
assertEquals(client.getScanPolicyDefault().recordsPerSecond, 128);
assertTotalTimeoutAll(client, 3000);
assertSendKeyAll(client, true);
assertEquals(config.getDriverPolicy().getRecordSetQueueCapacity(), 1024);
assertEquals(config.getDriverPolicy().getMetadataCacheTtlSeconds(), 7200);
assertEquals(config.getDriverPolicy().getSchemaBuilderMaxRecords(), 500);

connection.setClientInfo("recordSetTimeoutMs", "7000");
assertEquals(config.getDriverPolicy().getRecordSetTimeoutMs(), 7000);
Expand Down
9 changes: 0 additions & 9 deletions src/test/java/com/aerospike/jdbc/PreparedQueriesTest.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.aerospike.jdbc;

import com.aerospike.client.Value;
import com.aerospike.jdbc.schema.AerospikeSchemaBuilder;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand All @@ -19,13 +17,6 @@

public class PreparedQueriesTest extends JdbcBaseTest {

@BeforeClass
public void initSchemaCache() throws SQLException {
setUp();
AerospikeSchemaBuilder.cleanSchemaCache();
tearDown();
}

@BeforeMethod
public void setUp() throws SQLException {
Value.UseBoolBin = false;
Expand Down

0 comments on commit b20fb4b

Please sign in to comment.