Skip to content
This repository has been archived by the owner on Apr 6, 2022. It is now read-only.

Commit

Permalink
Merge pull request #9 from appmetr/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
Alexander Kharitonov authored Oct 4, 2016
2 parents b4de68c + 84db71a commit 23fce87
Show file tree
Hide file tree
Showing 23 changed files with 1,029 additions and 64 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<groupId>com.appmetr</groupId>
<artifactId>hercules</artifactId>
<packaging>jar</packaging>
<version>0.4.19-SNAPSHOT</version>
<version>0.5.4-SNAPSHOT</version>
<name>Hercules</name>

<repositories>
Expand Down Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.2</version>
<version>2.5</version>
</dependency>

<!-- Hector -->
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/com/appmetr/hercules/Hercules.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ public class Hercules {
private Map<Class, EntityMetadata> entityClassMetadataCache = new HashMap<Class, EntityMetadata>();
private Map<Class, WideEntityMetadata> wideEntityClassMetadataCache = new HashMap<Class, WideEntityMetadata>();

EntityMetadataExtractor metadataExtractor = new EntityMetadataExtractor();
WideEntityMetadataExtractor wideMetadataExtractor = new WideEntityMetadataExtractor();
@Inject EntityMetadataExtractor metadataExtractor;
@Inject WideEntityMetadataExtractor wideMetadataExtractor;

private Cluster cluster;
private Keyspace keyspace;
Expand All @@ -73,7 +73,7 @@ public class Hercules {


public void init() {
cluster = dataDriver.getOrCreateCluster(config.getClusterName(), config.getCassandraHost(), config.getMaxActiveConnections());
cluster = dataDriver.getOrCreateCluster(config.getClusterName(), config.getCassandraHost(), config.getMaxActiveConnections(), config.getMaxConnectTimeMillis(), config.getCassandraThriftSocketTimeout());
keyspace = dataDriver.getOrCreateKeyspace(config.getKeyspaceName(), config.getReplicationFactor(), cluster);

initEntities();
Expand Down Expand Up @@ -130,6 +130,9 @@ private void initPlainEntities() {

checkAndCreateColumnFamily(metadata.getColumnFamily(), metadata.getComparatorType());

}
//We should have extracted metadata before create indexes
for (EntityMetadata metadata : entityClassMetadataCache.values()) {
indexManager.checkAndCreateEntityIndexes(metadata);
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/appmetr/hercules/HerculesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class HerculesConfig {
private int maxActiveConnections;
private int replicationFactor;
private Boolean schemaModificationEnabled;
private long maxConnectTimeMillis = -1;
private int cassandraThriftSocketTimeout;

/* Fields */
private Set<Class> entityClasses;
Expand Down Expand Up @@ -101,4 +103,20 @@ public Set<Class> getWideEntityClasses() {
public void setWideEntityClasses(Set<Class> wideEntityClasses) {
this.wideEntityClasses = wideEntityClasses;
}

public long getMaxConnectTimeMillis() {
return maxConnectTimeMillis;
}

public void setMaxConnectTimeMillis(long maxConnectTimeMillis) {
this.maxConnectTimeMillis = maxConnectTimeMillis;
}

public int getCassandraThriftSocketTimeout() {
return cassandraThriftSocketTimeout;
}

public void setCassandraThriftSocketTimeout(int cassandraThriftSocketTimeout) {
this.cassandraThriftSocketTimeout = cassandraThriftSocketTimeout;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.appmetr.hercules.annotations;

import com.appmetr.hercules.keys.CollectionKeysExtractor;
import com.appmetr.hercules.serializers.AbstractHerculesSerializer;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface IndexedCollection {

String name() default "";

Class itemClass() default Object.class;

Class<? extends AbstractHerculesSerializer> serializer() default AbstractHerculesSerializer.class;

Class<? extends CollectionKeysExtractor> keyExtractorClass() default CollectionKeysExtractor.class;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.appmetr.hercules.batch;

import com.appmetr.hercules.profile.DataOperationsProfile;

import java.util.List;

public class BreakableIterationBatchExecutor<E, K> {
private BatchIterator<E, K> iterator;
private BreakableIterationBatchProcessor<E> processor;

public BreakableIterationBatchExecutor(BatchIterator<E, K> iterator, BreakableIterationBatchProcessor<E> processor) {
this.iterator = iterator;
this.processor = processor;
}

public int execute() {
return execute(null);
}

public int execute(DataOperationsProfile dataOperationsProfile) {
int counter = 0;

while (iterator.hasNext()) {
List<E> batch = iterator.next(dataOperationsProfile);
counter += batch.size();

if (!processor.processBatch(batch)) break;
}

return counter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.appmetr.hercules.batch;

import java.util.List;

public interface BreakableIterationBatchProcessor<E> {
/**
* Process batch with ability to break iterator loop.
* @param batch
* @return true to continue iteration, false to break iteration loop
*/
boolean processBatch(List<E> batch);
}
19 changes: 18 additions & 1 deletion src/main/java/com/appmetr/hercules/dao/AbstractDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,22 @@ public E getSingleByFK(ForeignKey foreignKey, DataOperationsProfile dataOperatio
return getHercules().getEntityManager().getSingleByFK(entityClass, foreignKey, dataOperationsProfile);
}

public <U> List<E> getByCollectionIndex(String indexedFieldName, U indexValue) {
return getByCollectionIndex(indexedFieldName, indexValue, null);
}

public <U> List<E> getByCollectionIndex(String indexedFieldName, U indexValue, DataOperationsProfile operationsProfile) {
return getHercules().getEntityManager().getByCollectionIndex(entityClass, indexedFieldName, indexValue, operationsProfile);
}

public <U> E getSingleByCollectionIndex(String indexedFieldName, U indexValue) {
return getSingleByCollectionIndex(indexedFieldName, indexValue, null);
}

public <U> E getSingleByCollectionIndex(String indexedFieldName, U indexValue, DataOperationsProfile operationsProfile) {
return getHercules().getEntityManager().getSingleByCollectionIndex(entityClass, indexedFieldName, indexValue, operationsProfile);
}

public int getCountByFK(ForeignKey foreignKey) {
return getCountByFK(foreignKey, null);
}
Expand Down Expand Up @@ -249,7 +265,8 @@ public int processKeyRange(K from, K to, Integer batchSize, BatchProcessor<K> pr

public int processKeyRange(K from, K to, Integer batchSize, BatchProcessor<K> processor, DataOperationsProfile dataOperationsProfile) {
return new BatchExecutor<K, K>(new ImmutableKeyBatchIterator<K>(from, to, batchSize) {
@Override public List<K> getRange(K from, K to, int batchSize, DataOperationsProfile dataOperationsProfile) {
@Override
public List<K> getRange(K from, K to, int batchSize, DataOperationsProfile dataOperationsProfile) {
return getHercules().getEntityManager().getKeyRange(entityClass, from, to, batchSize, dataOperationsProfile);
}
}, processor).execute(dataOperationsProfile);
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/appmetr/hercules/driver/DataDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public interface DataDriver {

Cluster getOrCreateCluster(String clusterName, String host, int maxActiveConnections);

Cluster getOrCreateCluster(String clusterName, String host, int maxActiveConnections, long maxConnectTimeMillis, int cassandraThriftSocketTimeout);

void shutdownCluster(Cluster cluster);

Keyspace getOrCreateKeyspace(String keyspaceName, int replicationFactor, Cluster cluster);
Expand Down Expand Up @@ -70,4 +72,7 @@ <R, T> List<R> getKeyRange(Keyspace keyspace, String columnFamily, DataOperation
<R, T> void delete(Keyspace keyspace, String columnFamily, DataOperationsProfile dataOperationsProfile, RowSerializer<R, T> rowSerializer, R rowKey);

<R, T> void delete(Keyspace keyspace, String columnFamily, DataOperationsProfile dataOperationsProfile, RowSerializer<R, T> rowSerializer, R rowKey, Iterable<T> topKeys);

<R, T> void delete(Keyspace keyspace, String columnFamily, DataOperationsProfile dataOperationsProfile, RowSerializer<R, T> rowSerializer, Map<R, Iterable<T>> topKeysInRows);

}
53 changes: 39 additions & 14 deletions src/main/java/com/appmetr/hercules/driver/ThriftDataDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ public class ThriftDataDriver implements DataDriver {
@Inject private Hercules hercules;

@Override public Cluster getOrCreateCluster(String clusterName, String host, int maxActiveConnections) {
return getOrCreateCluster(clusterName, host, maxActiveConnections, -1, 0);
}

@Override public Cluster getOrCreateCluster(String clusterName, String host, int maxActiveConnections, long maxConnectTimeMillis, int cassandraThriftSocketTimeout) {
CassandraHostConfigurator configurator = new CassandraHostConfigurator(host);
configurator.setMaxActive(maxActiveConnections);
configurator.setMaxConnectTimeMillis(maxConnectTimeMillis);
configurator.setCassandraThriftSocketTimeout(cassandraThriftSocketTimeout);

return HFactory.getOrCreateCluster(clusterName, configurator);
}
Expand Down Expand Up @@ -441,10 +447,10 @@ private <R, T> void insert(Keyspace keyspace, String columnFamily, DataOperation
}
}

executeMutator(columnFamily, dataOperationsProfile, mutator, serializedDataSize);
executeInsertionMutator(columnFamily, dataOperationsProfile, mutator, serializedDataSize);
}

private void executeMutator(String columnFamily, DataOperationsProfile dataOperationsProfile, Mutator<ByteBuffer> mutator, int serializedDataSize) {
private void executeInsertionMutator(String columnFamily, DataOperationsProfile dataOperationsProfile, Mutator<ByteBuffer> mutator, int serializedDataSize) {
StopWatch monitor = monitoring.start(HerculesMonitoringGroup.HERCULES_DD, "Insert " + columnFamily);
try {
mutator.execute();
Expand All @@ -458,13 +464,12 @@ private void executeMutator(String columnFamily, DataOperationsProfile dataOpera
}
}

@Override
public <R, T> void delete(Keyspace keyspace, String columnFamily, DataOperationsProfile dataOperationsProfile, RowSerializer<R, T> rowSerializer, R rowKey) {
Mutator<R> mutator = HFactory.createMutator(keyspace, rowSerializer.getRowKeySerializer());

private <R> void executeDeletionMutator(String columnFamily, DataOperationsProfile dataOperationsProfile, Mutator<R> mutator) {
StopWatch monitor = monitoring.start(HerculesMonitoringGroup.HERCULES_DD, "Delete " + columnFamily);

try {
mutator.delete(rowKey, columnFamily, null, rowSerializer.getTopKeySerializer());
mutator.execute();
} finally {
long time = monitor.stop();
if (dataOperationsProfile != null) {
Expand All @@ -474,6 +479,15 @@ public <R, T> void delete(Keyspace keyspace, String columnFamily, DataOperations
}
}


@Override
public <R, T> void delete(Keyspace keyspace, String columnFamily, DataOperationsProfile dataOperationsProfile, RowSerializer<R, T> rowSerializer, R rowKey) {
Mutator<R> mutator = HFactory.createMutator(keyspace, rowSerializer.getRowKeySerializer());
mutator.addDeletion(rowKey, columnFamily);

executeDeletionMutator(columnFamily, dataOperationsProfile, mutator);
}

@Override
public <R, T> void delete(Keyspace keyspace, String columnFamily, DataOperationsProfile dataOperationsProfile, RowSerializer<R, T> rowSerializer, R rowKey, Iterable<T> topKeys) {
Mutator<ByteBuffer> mutator = HFactory.createMutator(keyspace, ByteBufferSerializer.get());
Expand All @@ -485,17 +499,28 @@ public <R, T> void delete(Keyspace keyspace, String columnFamily, DataOperations
for (T topKey : topKeys) {
mutator.addDeletion(serializedRowKey, columnFamily, topKey, rowSerializer.getTopKeySerializer());
}
StopWatch monitor = monitoring.start(HerculesMonitoringGroup.HERCULES_DD, "Delete " + columnFamily);

try {
mutator.execute();
} finally {
long time = monitor.stop();
if (dataOperationsProfile != null) {
dataOperationsProfile.ms += time;
dataOperationsProfile.dbQueries++;
executeDeletionMutator(columnFamily, dataOperationsProfile, mutator);
}

@Override
public <R, T> void delete(Keyspace keyspace, String columnFamily, DataOperationsProfile dataOperationsProfile, RowSerializer<R, T> rowSerializer, Map<R, Iterable<T>> values) {
Serializer<R> rowKeySerializer = rowSerializer.getRowKeySerializer();

Mutator<ByteBuffer> mutator = HFactory.createMutator(keyspace, ByteBufferSerializer.get());

for (R rowKey : values.keySet()) {
ByteBuffer serializedRowKey = rowKeySerializer.toByteBuffer(rowKey);
if (serializedRowKey == null) {
continue;
}

for (T topKey : values.get(rowKey)) {
mutator.addDeletion(serializedRowKey, columnFamily, topKey, rowSerializer.getTopKeySerializer());
}
}

executeDeletionMutator(columnFamily, dataOperationsProfile, mutator);
}

private <R, T> HerculesMultiQueryResult<R, T> buildQueryResult(DataOperationsProfile dataOperationsProfile, RowSerializer<R, T> rowSerializer, Rows<R, T, ByteBuffer> rows) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.appmetr.hercules.keys;

import me.prettyprint.hector.api.Serializer;

public interface CollectionKeysExtractor<E, K> {

Iterable<K> extractKeys(E entity);

Serializer<K> getKeySerializer();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.appmetr.hercules.keys;

import com.appmetr.hercules.manager.EntityManager;
import me.prettyprint.hector.api.Serializer;

import javax.inject.Inject;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class EntityCollectionKeyExtractor<E, K> implements CollectionKeysExtractor<E, K> {

private EntityManager em;
private Field collectionField;
private Class<?> collectionEntityClass;

@Inject
public EntityCollectionKeyExtractor(Field collectionField, Class<?> collectionEntityClass, EntityManager em) {
this.collectionField = collectionField;
this.collectionEntityClass = collectionEntityClass;
this.em = em;
}

@Override public Iterable<K> extractKeys(E entity) {
try {
Collection indexedCollection = (Collection) collectionField.get(entity);
if (indexedCollection != null) {
List<K> keys = new ArrayList<K>(indexedCollection.size());
for (Object entityInIndex : indexedCollection) {
keys.add(em.<Object, K>getPK(entityInIndex));
}
return keys;
} else {
return Collections.emptyList();
}
} catch (IllegalAccessException e) {
throw new RuntimeException("Unable to read indexed collection field for entity of class " + entity.getClass().getName() + " and field " + collectionField.getName());
}
}

@Override public Serializer<K> getKeySerializer() {
return em.getPKSerializer(collectionEntityClass);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.appmetr.hercules.keys;

import me.prettyprint.hector.api.Serializer;

import javax.inject.Inject;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Collections;

public class SerializableKeyCollectionKeyExtractor<E, K> implements CollectionKeysExtractor<E, K> {

private Field collectionField;
private Serializer<K> keySerializer;

@Inject
public SerializableKeyCollectionKeyExtractor(Field collectionField, Serializer<K> serializer) {
this.collectionField = collectionField;
this.keySerializer = serializer;
}

@Override public Iterable<K> extractKeys(E entity) {
try {
Collection<K> keys = (Collection<K>) collectionField.get(entity);
if (keys!=null){
return keys;
}else{
return Collections.emptyList();

}
} catch (IllegalAccessException e) {
throw new RuntimeException("Unable to read indexed collection field for entity of class " + entity.getClass().getName() + " and field " + collectionField.getName());
}
}

@Override public Serializer<K> getKeySerializer() {
return keySerializer;
}

}
Loading

0 comments on commit 23fce87

Please sign in to comment.