Skip to content

Commit

Permalink
Merge pull request #25 from dilini-muthumala/es-7.x.x
Browse files Browse the repository at this point in the history
Add type mapping annotation and fix createIndex() method failure
  • Loading branch information
ramindu90 authored Jun 18, 2019
2 parents 287e85c + 5e8e53f commit 37cade0
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 52 deletions.
2 changes: 1 addition & 1 deletion component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.wso2.extension.siddhi.store.elasticsearch</groupId>
<artifactId>siddhi-store-elasticsearch-parent</artifactId>
<version>2.0.1-SNAPSHOT</version>
<version>2.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>siddhi-store-elasticsearch</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand All @@ -53,6 +54,7 @@
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.table.record.AbstractRecordTable;
import org.wso2.siddhi.core.table.record.ExpressionBuilder;
import org.wso2.siddhi.core.table.record.RecordIterator;
Expand All @@ -78,6 +80,7 @@
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -105,8 +108,6 @@
ANNOTATION_ELEMENT_INDEX_NUMBER_OF_REPLICAS;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.
ANNOTATION_ELEMENT_INDEX_NUMBER_OF_SHARDS;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.
ANNOTATION_ELEMENT_INDEX_TYPE;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.
ANNOTATION_ELEMENT_MEMBER_LIST;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.
Expand All @@ -126,6 +127,7 @@
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.
ANNOTATION_ELEMENT_TRUSRTSTORE_TYPE;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.ANNOTATION_ELEMENT_USER;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.ANNOTATION_TYPE_MAPPINGS;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.
DEFAULT_BACKOFF_POLICY_RETRY_NO;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.
Expand All @@ -136,7 +138,6 @@
DEFAULT_CONCURRENT_REQUESTS;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.DEFAULT_FLUSH_INTERVAL;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.DEFAULT_HOSTNAME;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.DEFAULT_INDEX_TYPE;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.DEFAULT_IO_THREAD_COUNT;
import static org.wso2.extension.siddhi.store.elasticsearch.utils.ElasticsearchTableConstants.
DEFAULT_NUMBER_OF_REPLICAS;
Expand Down Expand Up @@ -194,9 +195,6 @@
description = "The name of the Elasticsearch index.",
type = {DataType.STRING}, optional = true,
defaultValue = "The table name defined in the Siddhi App query."),
@Parameter(name = "index.type",
description = "The the type of the index.",
type = {DataType.STRING}, optional = true, defaultValue = "_doc"),
@Parameter(name = "payload.index.of.index.name",
description = "The payload which is used to create the index. This can be used if the user " +
"needs to create index names dynamically",
Expand Down Expand Up @@ -308,7 +306,6 @@ public class ElasticsearchEventTable extends AbstractRecordTable {
private List<String> primaryKeys;
private String hostname = DEFAULT_HOSTNAME;
private String indexName;
private String indexType = DEFAULT_INDEX_TYPE;
private String indexAlias;
private int port = DEFAULT_PORT;
private String scheme = DEFAULT_SCHEME;
Expand All @@ -330,7 +327,7 @@ public class ElasticsearchEventTable extends AbstractRecordTable {
private boolean sslEnabled = DEFAULT_SSL_ENABLED;
private int payloadIndexOfIndexName = DEFAULT_PAYLOAD_INDEX_OF_INDEX_NAME;
private String listOfHostnames;

private Map<String, String> typeMappings = new HashMap<>();

/**
* Initializing the Record Table
Expand Down Expand Up @@ -365,11 +362,6 @@ protected void init(TableDefinition tableDefinition, ConfigReader configReader)
} else {
port = Integer.parseInt(configReader.readConfig(ANNOTATION_ELEMENT_HOSTNAME, String.valueOf(port)));
}
if (!ElasticsearchTableUtils.isEmpty(storeAnnotation.getElement(ANNOTATION_ELEMENT_INDEX_TYPE))) {
indexType = storeAnnotation.getElement(ANNOTATION_ELEMENT_INDEX_TYPE);
} else {
indexType = configReader.readConfig(ANNOTATION_ELEMENT_INDEX_TYPE, indexType);
}
if (!ElasticsearchTableUtils.isEmpty(storeAnnotation.
getElement(ANNOTATION_ELEMENT_INDEX_NUMBER_OF_SHARDS))) {
numberOfShards = Integer.parseInt(storeAnnotation.getElement(
Expand Down Expand Up @@ -488,6 +480,14 @@ protected void init(TableDefinition tableDefinition, ConfigReader configReader)
if (!ElasticsearchTableUtils.isEmpty(storeAnnotation.getElement(ANNOTATION_ELEMENT_MEMBER_LIST))) {
listOfHostnames = storeAnnotation.getElement(ANNOTATION_ELEMENT_MEMBER_LIST);
}

List<Annotation> typeMappingsAnnotations = storeAnnotation.getAnnotations(ANNOTATION_TYPE_MAPPINGS);
if (typeMappingsAnnotations.size() > 0) {
for (Element element : typeMappingsAnnotations.get(0).getElements()) {
validateTypeMappingAttribute(element.getKey());
typeMappings.put(element.getKey(), element.getValue());
}
}
} else {
throw new ElasticsearchEventTableException("Elasticsearch Store annotation list null for table id : '" +
tableDefinition.getId() + "', required properties cannot be resolved.");
Expand Down Expand Up @@ -649,7 +649,7 @@ private ElasticsearchRecordIterator findRecords(Map<String, Object> findConditio
compiledCondition) throws ElasticsearchServiceException {
String condition = ElasticsearchTableUtils.resolveCondition((ElasticsearchCompiledCondition) compiledCondition,
findConditionParameterMap);
return new ElasticsearchRecordIterator(indexName, indexType, condition, restHighLevelClient, attributes);
return new ElasticsearchRecordIterator(indexName, condition, restHighLevelClient, attributes);
}

/**.
Expand Down Expand Up @@ -836,6 +836,17 @@ protected void destroy() {
}

private void createIndex() {
try {
if (restHighLevelClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
logger.debug("Index: " + indexName + " has already being created for table id: " +
tableDefinition.getId() + ".");
return;
}
} catch (IOException e) {
throw new ElasticsearchEventTableException("Error while checking indices for table id : '" +
tableDefinition.getId(), e);
}

CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(Settings.builder()
.put(SETTING_INDEX_NUMBER_OF_SHARDS, numberOfShards)
Expand All @@ -845,43 +856,41 @@ private void createIndex() {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.startObject(indexType);
builder.startObject(MAPPING_PROPERTIES_ELEMENT);
{
builder.startObject(MAPPING_PROPERTIES_ELEMENT);
{
for (Attribute attribute : attributes) {
builder.startObject(attribute.getName());
{
if (attribute.getType().equals(Attribute.Type.STRING)) {
builder.field(MAPPING_TYPE_ELEMENT, "text");
builder.startObject("fields");
for (Attribute attribute : attributes) {
builder.startObject(attribute.getName());
{
if (typeMappings.containsKey(attribute.getName())) {
builder.field(MAPPING_TYPE_ELEMENT, typeMappings.get(attribute.getName()));
} else if (attribute.getType().equals(Attribute.Type.STRING)) {
builder.field(MAPPING_TYPE_ELEMENT, "text");
builder.startObject("fields");
{
builder.startObject("keyword");
{
builder.startObject("keyword");
{
builder.field("type", "keyword");
builder.field("ignore_above", 256);
}
builder.endObject();
builder.field("type", "keyword");
builder.field("ignore_above", 256);
}
builder.endObject();
} else if (attribute.getType().equals(Attribute.Type.INT)) {
builder.field(MAPPING_TYPE_ELEMENT, "integer");
} else if (attribute.getType().equals(Attribute.Type.LONG)) {
builder.field(MAPPING_TYPE_ELEMENT, "long");
} else if (attribute.getType().equals(Attribute.Type.FLOAT)) {
builder.field(MAPPING_TYPE_ELEMENT, "float");
} else if (attribute.getType().equals(Attribute.Type.DOUBLE)) {
builder.field(MAPPING_TYPE_ELEMENT, "double");
} else if (attribute.getType().equals(Attribute.Type.BOOL)) {
builder.field(MAPPING_TYPE_ELEMENT, "boolean");
} else {
builder.field(MAPPING_TYPE_ELEMENT, "object");
}
builder.endObject();
} else if (attribute.getType().equals(Attribute.Type.INT)) {
builder.field(MAPPING_TYPE_ELEMENT, "integer");
} else if (attribute.getType().equals(Attribute.Type.LONG)) {
builder.field(MAPPING_TYPE_ELEMENT, "long");
} else if (attribute.getType().equals(Attribute.Type.FLOAT)) {
builder.field(MAPPING_TYPE_ELEMENT, "float");
} else if (attribute.getType().equals(Attribute.Type.DOUBLE)) {
builder.field(MAPPING_TYPE_ELEMENT, "double");
} else if (attribute.getType().equals(Attribute.Type.BOOL)) {
builder.field(MAPPING_TYPE_ELEMENT, "boolean");
} else {
builder.field(MAPPING_TYPE_ELEMENT, "object");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
Expand All @@ -901,8 +910,21 @@ private void createIndex() {
throw new ElasticsearchEventTableException("Error while creating indices for table id : '" +
tableDefinition.getId(), e);
} catch (ElasticsearchStatusException e) {
logger.debug("Elasticsearch status exception occurs while creating index for table id: " +
logger.error("Elasticsearch status exception occurred while creating index for table id: " +
tableDefinition.getId(), e);
}
}

private void validateTypeMappingAttribute(String typeMappingAttributeName) {
boolean matchFound = false;
for (Attribute storeAttribute : attributes) {
if (storeAttribute.getName().equals(typeMappingAttributeName)) {
matchFound = true;
}
}
if (!matchFound) {
throw new SiddhiAppCreationException("Invalid attribute name '" + typeMappingAttributeName
+ "' found in " + ANNOTATION_TYPE_MAPPINGS + ". No such attribute found in Store definition.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,12 @@ public class ElasticsearchRecordIterator implements RecordIterator<Object[]> {
private List<Attribute> attributes;
private Iterator<SearchHit> elasticsearchHitsIterator;

public ElasticsearchRecordIterator(String indexName, String indexType, String queryString,
public ElasticsearchRecordIterator(String indexName, String queryString,
RestHighLevelClient restHighLevelClient, List<Attribute> attributes)
throws ElasticsearchServiceException {
this.attributes = attributes;
QueryBuilder queryBuilder = getQueryBuilder(queryString);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(indexType);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(queryBuilder);
searchRequest.source(searchSourceBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class ElasticsearchTableConstants {
public static final String ANNOTATION_ELEMENT_INDEX_NUMBER_OF_SHARDS = "index.number.of.shards";
public static final String ANNOTATION_ELEMENT_INDEX_NUMBER_OF_REPLICAS = "index.number.of.replicas";
public static final String ANNOTATION_ELEMENT_INDEX_ALIAS = "index.alias";
public static final String ANNOTATION_ELEMENT_INDEX_TYPE = "index.type";
public static final String ANNOTATION_ELEMENT_UPDATE_BATCH_SIZE = "update.batch.size";
public static final String ANNOTATION_ELEMENT_BULK_ACTIONS = "bulk.actions";
public static final String ANNOTATION_ELEMENT_BULK_SIZE = "bulk.size";
Expand All @@ -47,9 +46,9 @@ public class ElasticsearchTableConstants {
public static final String ANNOTATION_ELEMENT_TRUSRTSTORE_PASS = "trust.store.pass";
public static final String ANNOTATION_ELEMENT_PAYLOAD_INDEX_OF_INDEX_NAME = "payload.index.of.index.name";
public static final String ANNOTATION_ELEMENT_MEMBER_LIST = "elasticsearch.member.list";
public static final String ANNOTATION_TYPE_MAPPINGS = "TypeMappings";

public static final String DEFAULT_HOSTNAME = "localhost";
public static final String DEFAULT_INDEX_TYPE = "_doc";
public static final int DEFAULT_PORT = 9200;
public static final int DEFAULT_NUMBER_OF_SHARDS = 3;
public static final int DEFAULT_NUMBER_OF_REPLICAS = 2;
Expand Down
4 changes: 2 additions & 2 deletions docs/api/latest.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# API Docs - v2.0.0
# API Docs - v2.1.0-SNAPSHOT

## Store

Expand Down Expand Up @@ -82,7 +82,7 @@
<tr>
<td style="vertical-align: top">index.type</td>
<td style="vertical-align: top; word-wrap: break-word">The the type of the index.</td>
<td style="vertical-align: top">_doc</td>
<td style="vertical-align: top">empty</td>
<td style="vertical-align: top">STRING</td>
<td style="vertical-align: top">Yes</td>
<td style="vertical-align: top">No</td>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<packaging>pom</packaging>
<groupId>org.wso2.extension.siddhi.store.elasticsearch</groupId>
<artifactId>siddhi-store-elasticsearch-parent</artifactId>
<version>2.0.1-SNAPSHOT</version>
<version>2.1.0-SNAPSHOT</version>
<name>Siddhi Extension - Elasticsearch Store</name>
<profiles>
<profile>
Expand Down

0 comments on commit 37cade0

Please sign in to comment.