Skip to content

Commit

Permalink
FMWK-293 Add support for index create/drop statements (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Dec 19, 2023
1 parent 919f64b commit 7266ca5
Show file tree
Hide file tree
Showing 23 changed files with 596 additions and 157 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ Packages documentation can be found [here](https://javadoc.io/doc/com.aerospike/
* UPDATE
* DELETE
* TRUNCATE TABLE
* CREATE INDEX
* DROP INDEX

See [examples](docs/examples.md) of SQL.

Expand Down
11 changes: 11 additions & 0 deletions docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,14 @@ TRUNCATE TABLE port_list;
DELETE FROM port_list;
```

## CREATE INDEX

```sql
CREATE INDEX port_idx ON port_list (port);
```

## DROP INDEX

```sql
DROP INDEX port_idx ON port_list;
```
2 changes: 1 addition & 1 deletion src/main/java/com/aerospike/jdbc/AerospikeConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public boolean isClosed() {
@Override
public DatabaseMetaData getMetaData() throws SQLException {
logger.fine(() -> "getMetaData request");
return metadataBuilder.build(url, client, this);
return metadataBuilder.build(url, this);
}

@Override
Expand Down
165 changes: 58 additions & 107 deletions src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java
Original file line number Diff line number Diff line change
@@ -1,37 +1,35 @@
package com.aerospike.jdbc;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import com.aerospike.client.policy.InfoPolicy;
import com.aerospike.client.query.IndexType;
import com.aerospike.jdbc.model.AerospikeClusterInfo;
import com.aerospike.jdbc.model.AerospikeSecondaryIndex;
import com.aerospike.jdbc.model.DataColumn;
import com.aerospike.jdbc.schema.AerospikeSchemaBuilder;
import com.aerospike.jdbc.sql.ListRecordSet;
import com.aerospike.jdbc.sql.SimpleWrapper;
import com.aerospike.jdbc.util.AerospikeUtils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.RowIdLifetime;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.aerospike.jdbc.util.AerospikeUtils.getIndexBinValuesRatio;
import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME;
import static com.aerospike.jdbc.util.AerospikeUtils.getCatalogIndexes;
import static com.aerospike.jdbc.util.AerospikeUtils.getClusterInfo;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
Expand All @@ -40,80 +38,61 @@
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Collections.synchronizedSet;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;

@SuppressWarnings("java:S1192")
public class AerospikeDatabaseMetadata implements DatabaseMetaData, SimpleWrapper {

private static final Logger logger = Logger.getLogger(AerospikeDatabaseMetadata.class.getName());
private static final String NEW_LINE = System.lineSeparator();

private final String url;
private final Connection connection;
private final String dbBuild;
private final String dbEdition;
private final AerospikeConnection connection;

private final List<String> catalogs;
private final Map<String, Collection<String>> tables;
private final Map<String, Collection<AerospikeSecondaryIndex>> catalogIndexes;
private final AerospikeClusterInfo clusterInfo;
private final AerospikeSchemaBuilder schemaBuilder;
private final Cache<String, ResultSetMetaData> resultSetMetaDataCache;

public AerospikeDatabaseMetadata(String url, IAerospikeClient client, AerospikeConnection connection) {
private volatile Map<String, Collection<AerospikeSecondaryIndex>> catalogIndexes;

public AerospikeDatabaseMetadata(String url, AerospikeConnection connection) {
logger.info("Init AerospikeDatabaseMetadata");
this.url = url;
this.connection = connection;

Collection<String> builds = synchronizedSet(new HashSet<>());
Collection<String> editions = synchronizedSet(new HashSet<>());
Collection<String> namespaces = synchronizedSet(new HashSet<>());
final InfoPolicy infoPolicy = client.getInfoPolicyDefault();
catalogIndexes = new ConcurrentHashMap<>();
tables = new ConcurrentHashMap<>();
Arrays.stream(client.getNodes()).parallel()
.map(node -> Info.request(infoPolicy, node, "namespaces", "sets", "sindex", "build", "edition"))
.forEach(r -> {
builds.add(r.get("build"));
editions.add(r.get("edition"));
namespaces.addAll(asList(getOrDefault(r, "namespaces", "").split(";")));
streamOfSubProperties(r, "sets").forEach(p ->
tables.computeIfAbsent(p.getProperty("ns"), s -> new HashSet<>())
.addAll(Arrays.asList(p.getProperty("set"), DEFAULT_SCHEMA_NAME))
);
streamOfSubProperties(r, "sindex")
.filter(AerospikeUtils::isSupportedIndexType)
.forEach(p -> {
String namespace = p.getProperty("ns");
String indexName = p.getProperty("indexname");
Integer binRatio = connection.getAerospikeVersion().isSIndexCardinalitySupported()
? getIndexBinValuesRatio(client, namespace, indexName)
: null;
catalogIndexes.computeIfAbsent(namespace, s -> new HashSet<>())
.add(new AerospikeSecondaryIndex(
namespace,
p.getProperty("set"),
p.getProperty("bin"),
indexName,
IndexType.valueOf(p.getProperty("type").toUpperCase(Locale.ENGLISH)),
binRatio)
);
});
});

schemaBuilder = new AerospikeSchemaBuilder(client, connection.getConfiguration().getDriverPolicy());
clusterInfo = getClusterInfo(connection.getClient());
schemaBuilder = new AerospikeSchemaBuilder(
connection.getClient(),
connection.getConfiguration().getDriverPolicy()
);
resultSetMetaDataCache = CacheBuilder.newBuilder().build();

dbBuild = join("N/A", ", ", builds);
dbEdition = join("Aerospike", ", ", editions);
catalogs = namespaces.stream().filter(n -> !"".equals(n)).collect(Collectors.toList());
}

public AerospikeSchemaBuilder getSchemaBuilder() {
return schemaBuilder;
}

public void resetCatalogIndexes() {
logger.fine(() -> "Reset secondary index information");
catalogIndexes = null;
}

public Collection<AerospikeSecondaryIndex> getSecondaryIndexes(String catalog) {
initCatalogIndexes();
return catalogIndexes.get(catalog);
}

private void initCatalogIndexes() {
if (catalogIndexes == null) {
synchronized (this) {
if (catalogIndexes == null) {
logger.info(() -> "Load secondary index information");
catalogIndexes = getCatalogIndexes(connection.getClient(), connection.getAerospikeVersion());
}
}
}
}

@Override
public boolean allProceduresAreCallable() {
return false;
Expand All @@ -130,7 +109,7 @@ public String getURL() {
}

@Override
public String getUserName() throws SQLException {
public String getUserName() {
return connection.getClientInfo().getProperty("user");
}

Expand Down Expand Up @@ -161,12 +140,12 @@ public boolean nullsAreSortedAtEnd() {

@Override
public String getDatabaseProductName() {
return dbEdition;
return clusterInfo.getEdition();
}

@Override
public String getDatabaseProductVersion() {
return dbBuild;
return clusterInfo.getBuild();
}

@Override
Expand Down Expand Up @@ -251,7 +230,7 @@ public String getSQLKeywords() {

@Override
public String getNumericFunctions() {
return "sum,sumsqs,avg,min,max,count";
return "";
}

@Override
Expand Down Expand Up @@ -481,7 +460,7 @@ public boolean supportsCatalogsInTableDefinitions() {

@Override
public boolean supportsCatalogsInIndexDefinitions() {
return false;
return true;
}

@Override
Expand Down Expand Up @@ -735,12 +714,12 @@ public ResultSet getTables(String catalog, String schemaPattern, String tableNam

final Iterable<List<?>> tablesData;
if (catalog == null) {
tablesData = tables.entrySet().stream()
tablesData = clusterInfo.getTables().entrySet().stream()
.flatMap(p -> p.getValue().stream().map(t -> asList(p.getKey(), null, t, "TABLE", null, null,
null, null, null, null)))
.collect(toList());
} else {
tablesData = tables.getOrDefault(catalog, Collections.emptyList()).stream()
tablesData = clusterInfo.getTables().getOrDefault(catalog, Collections.emptyList()).stream()
.filter(t -> tableNameRegex == null || tableNameRegex.matcher(t).matches())
.map(t -> asList(catalog, null, t, "TABLE", null, null, null, null, null, null))
.collect(toList());
Expand All @@ -759,14 +738,14 @@ public ResultSet getTables(String catalog, String schemaPattern, String tableNam
public ResultSet getSchemas() {
return new ListRecordSet(null, "system", "schemas",
systemColumns(new String[]{"TABLE_SCHEM", "TABLE_CATALOG"}, new int[]{VARCHAR, VARCHAR}),
catalogs.stream().map(ns -> Arrays.asList("", ns)).collect(toList()));
clusterInfo.getCatalogs().stream().map(ns -> Arrays.asList("", ns)).collect(toList()));
}

@Override
public ResultSet getCatalogs() {
return new ListRecordSet(null, "system", "catalogs",
systemColumns(new String[]{"TABLE_CAT"}, new int[]{VARCHAR}),
catalogs.stream().map(Collections::singletonList).collect(toList()));
clusterInfo.getCatalogs().stream().map(Collections::singletonList).collect(toList()));
}

@Override
Expand All @@ -786,11 +765,11 @@ public ResultSet getColumns(String catalog, String schemaPattern, String tableNa

final List<ResultSetMetaData> resultSetMetaDataList;
if (catalog == null) {
resultSetMetaDataList = tables.entrySet().stream()
resultSetMetaDataList = clusterInfo.getTables().entrySet().stream()
.flatMap(p -> p.getValue().stream().map(t -> getMetadata(p.getKey(), t)))
.collect(toList());
} else {
resultSetMetaDataList = tables.getOrDefault(catalog, Collections.emptyList()).stream()
resultSetMetaDataList = clusterInfo.getTables().getOrDefault(catalog, Collections.emptyList()).stream()
.filter(t -> tableNameRegex == null || tableNameRegex.matcher(t).matches())
.map(t -> getMetadata(catalog, t))
.collect(toList());
Expand Down Expand Up @@ -862,12 +841,12 @@ public ResultSet getVersionColumns(String catalog, String schema, String table)
public ResultSet getPrimaryKeys(String catalog, String schema, String table) {
final Iterable<List<?>> tablesData;
if (catalog == null) {
tablesData = tables.entrySet().stream()
tablesData = clusterInfo.getTables().entrySet().stream()
.flatMap(p -> p.getValue().stream().map(t ->
asList(p.getKey(), null, t, PRIMARY_KEY_COLUMN_NAME, 1, PRIMARY_KEY_COLUMN_NAME)))
.collect(toList());
} else {
tablesData = tables.getOrDefault(catalog, Collections.emptyList()).stream()
tablesData = clusterInfo.getTables().getOrDefault(catalog, Collections.emptyList()).stream()
.filter(t -> table == null || table.equals(t))
.map(t -> asList(catalog, null, t, PRIMARY_KEY_COLUMN_NAME, 1, PRIMARY_KEY_COLUMN_NAME))
.collect(toList());
Expand Down Expand Up @@ -959,16 +938,16 @@ public ResultSet getTypeInfo() {
@Override
public ResultSet getIndexInfo(String catalog, String schema, String table, boolean unique, boolean approximate) {
logger.info(() -> format("getIndexInfo: %s, %s, %s", catalog, schema, table));
initCatalogIndexes();
Stream<AerospikeSecondaryIndex> secondaryIndexStream;
if (catalog == null) {
secondaryIndexStream = catalogIndexes.entrySet().stream()
.flatMap(p -> p.getValue().stream());
secondaryIndexStream = catalogIndexes.values().stream().flatMap(Collection::stream);
} else {
secondaryIndexStream = getOrDefault(catalogIndexes, catalog, Collections.emptyList()).stream()
.filter(i -> i.getNamespace().equals(catalog));
secondaryIndexStream = catalogIndexes.getOrDefault(catalog, Collections.emptyList()).stream()
.filter(index -> index.getNamespace().equals(catalog));
}
final Iterable<List<?>> indexData = secondaryIndexStream
.filter(i -> i.getSet().equals(table))
.filter(index -> index.getSet().equals(table))
.map(this::indexInfoAsList)
.collect(Collectors.toList());

Expand All @@ -983,10 +962,6 @@ public ResultSet getIndexInfo(String catalog, String schema, String table, boole
systemColumns(columns, sqlTypes), indexData);
}

public Collection<AerospikeSecondaryIndex> getSecondaryIndexes(String catalog) {
return catalogIndexes.get(catalog);
}

@Override
public boolean supportsResultSetType(int type) {
return false;
Expand Down Expand Up @@ -1259,30 +1234,6 @@ private List<DataColumn> systemColumns(String[] names, int[] types) {
.collect(toList());
}

private Properties initProperties(String lines) {
Properties properties = new Properties();
try {
properties.load(new StringReader(lines));
} catch (IOException e) {
logger.warning(() -> format("Expression in initProperties, lines: %s", lines));
}
return properties;
}

private Stream<Properties> streamOfSubProperties(Map<String, String> map, String key) {
return Optional.ofNullable(map.get(key)).map(s -> Arrays.stream(s.split(";"))
.map(ns -> initProperties(ns.replace(":", NEW_LINE)))).orElse(Stream.empty());
}

private <K, V> V getOrDefault(Map<K, V> map, K key, V defaultValue) {
return Optional.ofNullable(map.getOrDefault(key, defaultValue)).orElse(defaultValue);
}

@SuppressWarnings("SameParameterValue")
private String join(String defaultValue, String delimiter, Collection<String> elements) {
return elements.isEmpty() ? defaultValue : String.join(delimiter, elements);
}

private int ordinal(ResultSetMetaData md, String columnName) {
int ordinal = 0;
try {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/aerospike/jdbc/AerospikeStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.aerospike.jdbc.query.QueryPerformer;
import com.aerospike.jdbc.sql.SimpleWrapper;
import com.aerospike.jdbc.util.AuxStatementParser;
import org.apache.calcite.sql.parser.SqlParseException;

import java.sql.Connection;
import java.sql.ResultSet;
Expand All @@ -26,6 +25,7 @@
public class AerospikeStatement implements Statement, SimpleWrapper {

private static final Logger logger = Logger.getLogger(AerospikeStatement.class.getName());

private static final String BATCH_NOT_SUPPORTED_MESSAGE = "Batch update is not supported";
private static final String AUTO_GENERATED_KEYS_NOT_SUPPORTED_MESSAGE = "Auto-generated keys are not supported";

Expand Down Expand Up @@ -64,8 +64,8 @@ protected AerospikeQuery parseQuery(String sql) throws SQLException {
AerospikeQuery query;
try {
query = AerospikeQuery.parse(sql);
} catch (SqlParseException e) {
query = AuxStatementParser.hack(sql);
} catch (Exception e) {
query = AuxStatementParser.parse(sql);
}
if (query.getCatalog() == null) {
query.setCatalog(catalog);
Expand Down
Loading

0 comments on commit 7266ca5

Please sign in to comment.