Merging incremental-checkpoint-feature branch to master #821

Merged 38 commits on May 31, 2018.

Commits (38 in total; the file diff below shows changes from 13 of them)

e27733d
Adding the incremental checkpointing functionality to length windows.
miyurud Dec 8, 2017
0472487
Added comments and formatted the code.
miyurud Dec 10, 2017
69b2756
Merge remote-tracking branch 'upstream/master'
miyurud Dec 11, 2017
9b39cac
Adding the incremental file system store cleaning code.
miyurud Dec 11, 2017
66ebb49
Adding the incremental checkpointing functionality to length batch wi…
miyurud Dec 19, 2017
f3c70bf
Merge remote-tracking branch 'upstream/master'
miyurud Dec 20, 2017
d615572
Fixing a test case naming issue.
miyurud Dec 20, 2017
de0e61e
Adding the incremental checkpointing for event tables.
miyurud Jan 8, 2018
c2a89ea
Merge remote-tracking branch 'upstream/master'
miyurud Jan 8, 2018
ea6ca3f
Merge branch 'master' of https://github.com/miyurud/siddhi
miyurud Jan 8, 2018
4cb6d9f
Adding the min-max counting scenario and related code improvements.
miyurud Jan 30, 2018
12163c8
Fixing the issue of incremental vs full snapshot decision logic.
miyurud Feb 2, 2018
7dae130
Adding the changes on group by queries and merging with the upstream.
miyurud Feb 16, 2018
8c2e78e
Updating the local copy.
miyurud Feb 19, 2018
70b5ab7
Adding the incremental checkpointing feature for partitioned windows.
miyurud Feb 21, 2018
34b1cdc
Fixed the checkstyle issues
miyurud Mar 22, 2018
4cab522
Updating the changes from master
miyurud Mar 23, 2018
5661781
Adding local changes
miyurud Mar 23, 2018
9717ddd
Fixing the checkstyle issues
miyurud Mar 23, 2018
146e5e3
Updated the findbugs-exclude file.
miyurud Mar 23, 2018
d4c31c4
Removed the commented lines
miyurud Mar 23, 2018
d3c3025
Merge pull request #790 from miyurud/incremental-checkpoint-feature
tishan89 Mar 30, 2018
6625f6e
Merge remote-tracking branch 'wso2/master' into incremental-checkpoin…
suhothayan Apr 19, 2018
cf95257
Merge remote-tracking branch 'wso2/incremental-checkpoint-feature' in…
suhothayan Apr 19, 2018
abea03c
Rename SnapshotableComplexEventChunk to SnapshotableStreamEventQueue …
suhothayan Apr 26, 2018
19b6c86
Fix Site Name
suhothayan Apr 29, 2018
04f9297
Improve snapshot, incremental store persist and restore
suhothayan May 2, 2018
d7bc8ba
Improve snapshot, incremental store persist and restore
suhothayan May 4, 2018
68e699b
Fix check styles and findbug issues and join with table with stream u…
suhothayan May 5, 2018
d35d2be
Merge remote-tracking branch 'wso2/master' into incremental-checkpoin…
suhothayan May 5, 2018
8c1f3d6
Merge branch 'incremental-checkpoint-feature'
dilini-muthumala May 21, 2018
3c2fa73
Moving PersistenceStore implementations to Carbon-analytics
dilini-muthumala May 28, 2018
f76e108
Improving API
dilini-muthumala May 28, 2018
5cd9a4a
Fixing bug in API
dilini-muthumala May 28, 2018
898269c
Merging with master
dilini-muthumala May 28, 2018
0ffeb2f
Merging with master
dilini-muthumala May 28, 2018
bf1017c
Adding back persistence store implementations to Siddhi for unit-testing
dilini-muthumala May 31, 2018
889e5cb
Fixing license header and removing unnecessary comments.
dilini-muthumala May 31, 2018
1,935 changes: 1,935 additions & 0 deletions docs/api/4.0.9-SNAPSHOT.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions findbugs-exclude.xml
@@ -186,6 +186,10 @@
<Package name="~org\.wso2\.siddhi\.core\.query\.selector\.attribute\.aggregator\.incremental.*"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Package name="~org\.wso2\.siddhi\.core\.util\.snapshot\.*"/>
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
</Match>

<!--other-->
<Match>
@@ -122,6 +122,7 @@ public class SiddhiAppRuntime {
private LatencyTracker storeQueryLatencyTracker;
private SiddhiDebugger siddhiDebugger;
private boolean running = false;
private Future futureIncrementalPersistor;

public SiddhiAppRuntime(Map<String, AbstractDefinition> streamDefinitionMap,
Map<String, AbstractDefinition> tableDefinitionMap,
@@ -541,39 +542,42 @@ public PersistenceReference persist() {
AsyncSnapshotPersistor asyncSnapshotPersistor = new AsyncSnapshotPersistor(snapshots,
siddhiAppContext.getSiddhiContext().getPersistenceStore(), siddhiAppContext.getName());
String revision = asyncSnapshotPersistor.getRevision();
//TODO:Need to decide how do we handle the Future variable below.
Future future = siddhiAppContext.getExecutorService().submit(asyncSnapshotPersistor);

//Next, handle the increment persistance scenarios
//Incremental state
HashMap<String, HashMap<String, Object>> incrementalState = serializeObj.incrementalState;
//Base state
HashMap<String, HashMap<String, Object>> incrementalStateBase = serializeObj.incrementalStateBase;

if (incrementalState != null) {
for (Map.Entry<String, HashMap<String, Object>> entry : incrementalState.entrySet()) {
if (incrementalStateBase != null) {
for (Map.Entry<String, HashMap<String, Object>> entry : incrementalStateBase.entrySet()) {
for (HashMap.Entry<String, Object> entry2 : entry.getValue().entrySet()) {
AsyncIncrementalSnapshotPersistor asyncIncrementSnapshotPersistor = new
AsyncIncrementalSnapshotPersistor((byte[]) entry2.getValue(),
siddhiAppContext.getSiddhiContext().getIncrementalPersistenceStore(),
siddhiAppContext.getName(), entry.getKey(), entry2.getKey(),
revision.split("_")[0], "I");
revision.split("_")[0], "B");

Future future2 = siddhiAppContext.getExecutorService().submit(asyncIncrementSnapshotPersistor);
//TODO:Need to decide how do we handle the Future variable below.
Future future3 = siddhiAppContext.getExecutorService().submit(asyncIncrementSnapshotPersistor);
}
}
}

//Base state
HashMap<String, HashMap<String, Object>> incrementalStateBase = serializeObj.incrementalStateBase;
//Next, handle the increment persistance scenarios
//Incremental state
HashMap<String, HashMap<String, Object>> incrementalState = serializeObj.incrementalState;

if (incrementalStateBase != null) {
for (Map.Entry<String, HashMap<String, Object>> entry : incrementalStateBase.entrySet()) {
if (incrementalState != null) {
for (Map.Entry<String, HashMap<String, Object>> entry : incrementalState.entrySet()) {
for (HashMap.Entry<String, Object> entry2 : entry.getValue().entrySet()) {
AsyncIncrementalSnapshotPersistor asyncIncrementSnapshotPersistor = new
AsyncIncrementalSnapshotPersistor((byte[]) entry2.getValue(),
siddhiAppContext.getSiddhiContext().getIncrementalPersistenceStore(),
siddhiAppContext.getName(), entry.getKey(), entry2.getKey(),
revision.split("_")[0], "B");
revision.split("_")[0], "I");

Future future3 = siddhiAppContext.getExecutorService().submit(asyncIncrementSnapshotPersistor);
//TODO:Need to decide how do we handle the Future variable below.
Future future2 = siddhiAppContext.getExecutorService().submit(asyncIncrementSnapshotPersistor);
}
}
}
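
The persist() flow above handles the base state ("B") and the incremental state ("I") with the same pattern: walk a nested map of element id → item id → serialized bytes and submit one asynchronous persistor per entry. Below is a minimal, self-contained sketch of that submission loop; IncrementalPersistSketch, persistTask, and submitAll are invented stand-ins, not Siddhi's API (the real task class is AsyncIncrementalSnapshotPersistor, which also receives the incremental persistence store and the revision prefix).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified sketch of the persist() flow above: the base ("B") and
// incremental ("I") state maps are walked per element and per item, and each
// serialized byte[] is handed to the executor as an independent async task.
public class IncrementalPersistSketch {

    // Stand-in for AsyncIncrementalSnapshotPersistor.
    static Runnable persistTask(byte[] payload, String elementId, String itemId, String type) {
        return () -> System.out.println("persisting " + type + " snapshot for "
                + elementId + "/" + itemId + " (" + payload.length + " bytes)");
    }

    static void submitAll(Map<String, HashMap<String, Object>> stateMap,
                          String type, ExecutorService executor) {
        if (stateMap == null) {
            return;
        }
        for (Map.Entry<String, HashMap<String, Object>> element : stateMap.entrySet()) {
            for (Map.Entry<String, Object> item : element.getValue().entrySet()) {
                executor.submit(persistTask((byte[]) item.getValue(),
                        element.getKey(), item.getKey(), type));
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Map<String, HashMap<String, Object>> base = new HashMap<>();
        HashMap<String, Object> inner = new HashMap<>();
        inner.put("window-1", new byte[]{1, 2, 3});
        base.put("query-1", inner);

        submitAll(base, "B", executor);   // base snapshots
        submitAll(null, "I", executor);   // incremental snapshots (none in this example)
        executor.shutdown();
    }
}
```

Each task is independent, so one item's persistence does not block the others; the TODOs in the diff are about whether the returned Future objects should be tracked.
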
@@ -598,6 +602,7 @@ public byte[] snapshot() {
}
}

//TODO:Need to identify where this method has been used.
public void restore(byte[] snapshot) throws CannotRestoreSiddhiAppStateException {
try {
// first, pause all the event sources
@@ -610,6 +615,7 @@ public void restore(byte[] snapshot) throws CannotRestoreSiddhiAppStateException
}
}

//TODO:Need to identify where this method has been used.
public void restoreRevision(String revision) throws CannotRestoreSiddhiAppStateException {
try {
// first, pause all the event sources
@@ -49,6 +49,9 @@ public class SnapshotableComplexEventChunk<E extends ComplexEvent> implements It
private static final float FULL_SNAPSHOT_THRESHOLD = 2.1f;
private boolean isFirstSnapshot = true;
private boolean isRecovery;
private String operatorType;
private long threshold;
private long sizeOfBacklogChangelogs;

public SnapshotableComplexEventChunk(boolean isBatch) {
this.isBatch = isBatch;
@@ -78,6 +81,18 @@ public SnapshotableComplexEventChunk(E first, E last, boolean isBatch) {
this.changeLog = new ArrayList<Operation>();
}

public SnapshotableComplexEventChunk(boolean isBatch, String metaInfo) {
this.isBatch = isBatch;
this.changeLog = new ArrayList<Operation>();
populateMetaInformation(metaInfo);
}

private void populateMetaInformation(String metaInfo) {
String[] metaInfoStringArr = metaInfo.split(":");
operatorType = metaInfoStringArr[0];
threshold = Long.parseLong(metaInfoStringArr[1]);
}

public void insertBeforeCurrent(E events) {

if (lastReturned == null) {
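
The new constructor takes an operator meta-information string of the form `<operatorType>:<threshold>`; the length window further down passes "LW:" + length. A small sketch of that parsing contract follows — the OperatorMetaInfo class name is invented for illustration and is not part of Siddhi.

```java
// Minimal sketch of the "<operatorType>:<threshold>" meta-info contract used by the
// new constructor above; class and accessor names here are illustrative only.
public class OperatorMetaInfo {
    private final String operatorType;
    private final long threshold;

    public OperatorMetaInfo(String metaInfo) {
        String[] parts = metaInfo.split(":");
        this.operatorType = parts[0];              // e.g. "LW" for a length window
        this.threshold = Long.parseLong(parts[1]); // e.g. the window length
    }

    public String getOperatorType() {
        return operatorType;
    }

    public long getThreshold() {
        return threshold;
    }

    public static void main(String[] args) {
        OperatorMetaInfo info = new OperatorMetaInfo("LW:10");
        System.out.println(info.getOperatorType() + " / " + info.getThreshold()); // LW / 10
    }
}
```
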
@@ -170,7 +185,7 @@ public boolean hasNext() {
* Returns the next element in the iteration.
*
* @return the next element in the iteration.
* @throws java.util.NoSuchElementException iteration has no more elements.
* @throws NoSuchElementException iteration has no more elements.
*/
public E next() {
E returnEvent;
@@ -312,9 +327,7 @@ public void setAtIndex(int index, Object value) {

public Snapshot getSnapshot() {
if (isFirstSnapshot) {
//objectMap
Snapshot snapshot = new Snapshot(this, false);
//snapshotByteSerializer.objectToByte(objectMap, siddhiAppContext);
isFirstSnapshot = false;
this.changeLog.clear();
return snapshot;
@@ -323,15 +336,20 @@ public Snapshot getSnapshot() {
if (isFullSnapshot()) {
Snapshot snapshot = new Snapshot(this, false);
this.changeLog = new ArrayList<Operation>();
sizeOfBacklogChangelogs = 0;
return snapshot;
} else {
Snapshot snapshot = new Snapshot(changeLog, true);
sizeOfBacklogChangelogs += this.changeLog.size();
this.changeLog.clear();
return snapshot;
}
}

private boolean isFullSnapshot() {
if ((this.changeLog.size() > (eventsCount * FULL_SNAPSHOT_THRESHOLD)) && (eventsCount != 0)) {
if (sizeOfBacklogChangelogs > 100) {
return true;
} else if (sizeOfBacklogChangelogs > threshold) {
return true;
} else {
return false;
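
The revised decision tracks how many change-log entries have already been persisted incrementally (sizeOfBacklogChangelogs) and switches back to a full snapshot once that backlog exceeds a hard cap of 100 or the operator-specific threshold parsed from the meta-information string. A hedged sketch of that policy, with invented names (SnapshotPolicy, append, takeSnapshot) and the event chunk reduced to a plain change-log list:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the full-vs-incremental snapshot policy above; the real
// implementation snapshots the event chunk itself, this sketch only models the decision.
class SnapshotPolicy {
    private static final long HARD_BACKLOG_CAP = 100; // fixed cap from the diff
    private final long threshold;                      // operator-specific threshold
    private long backlogChangelogSize;                 // change-log entries already shipped
    private final List<String> changeLog = new ArrayList<String>();

    SnapshotPolicy(long threshold) {
        this.threshold = threshold;
    }

    void append(String operation) {
        changeLog.add(operation);
    }

    /** Returns "FULL" or "INCREMENTAL" and updates the backlog counter accordingly. */
    String takeSnapshot() {
        boolean full = backlogChangelogSize > HARD_BACKLOG_CAP || backlogChangelogSize > threshold;
        if (full) {
            backlogChangelogSize = 0;               // a full snapshot resets the backlog
        } else {
            backlogChangelogSize += changeLog.size();
        }
        changeLog.clear();                          // either way the pending change log is flushed
        return full ? "FULL" : "INCREMENTAL";
    }
}
```

With the threshold tied to the operator's size (for example the window length), a long run of incremental snapshots is eventually followed by a full one, which bounds how many change logs a restore has to replay.
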
@@ -386,24 +404,20 @@ public Object restore(String key, Map<String, Object> state) {
for (Operation op : addList) {
switch (op.operation) {
case Operator.ADD:
//Need to check whether there is only one event or multiple events. If so we have to
//TODO:Need to check whether there is only one event or multiple events.
// If so we have to
// traverse the linked list and then get the count by which the eventsCount needs
// to be updated.
this.add((E) op.parameters);
//((StreamEvent)parameters).setNext(null);
//changeLogForVariable.add(new Operation(operator, (StreamEvent) parameters));
break;
case Operator.REMOVE:
this.remove();
//changeLogForVariable.add(new Operation(operator, parameters));
break;
case Operator.POLL:
//changeLogForVariable.add(new Operation(operator, parameters));
this.poll();
break;
case Operator.CLEAR:
this.clear();
//changeLogForVariable.add(new Operation(operator, parameters));
break;
default:
continue;
@@ -419,24 +433,19 @@ public Object restore(String key, Map<String, Object> state) {
for (Operation op : addList) {
switch (op.operation) {
case Operator.ADD:
//Need to check whether there is only one event or multiple events. If so we have to
//TODO:Need to check whether there is only one event or multiple events. If so we have to
//traverse the linked list and then get the count by which the eventsCount needs
// to be updated.
this.add((E) op.parameters);
//((StreamEvent)parameters).setNext(null);
//changeLogForVariable.add(new Operation(operator, (StreamEvent) parameters));
break;
case Operator.REMOVE:
this.remove();
//changeLogForVariable.add(new Operation(operator, parameters));
break;
case Operator.POLL:
//changeLogForVariable.add(new Operation(operator, parameters));
this.poll();
break;
case Operator.CLEAR:
this.clear();
//changeLogForVariable.add(new Operation(operator, parameters));
break;
default:
continue;
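
Restoring from an incremental checkpoint means loading the last full snapshot and then replaying each persisted change log in order, re-applying every ADD/REMOVE/POLL/CLEAR operation, as the two switch blocks above do for the full and incremental paths. A self-contained sketch of that replay loop; ChangeLogReplay, its integer operation codes, and the Deque are stand-ins for Siddhi's Operator constants and event chunk.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Stand-alone sketch of replaying a persisted change log onto a restored queue.
class ChangeLogReplay {
    static final int ADD = 1, REMOVE = 2, POLL = 3, CLEAR = 4;

    static class Operation {
        final int operation;
        final String event;

        Operation(int operation, String event) {
            this.operation = operation;
            this.event = event;
        }
    }

    static Deque<String> replay(Deque<String> fullSnapshot, List<Operation> changeLog) {
        Deque<String> queue = new ArrayDeque<String>(fullSnapshot);
        for (Operation op : changeLog) {
            switch (op.operation) {
                case ADD:
                    queue.addLast(op.event);
                    break;
                case REMOVE:
                case POLL:
                    queue.pollFirst(); // both drop from the head in this simplified model
                    break;
                case CLEAR:
                    queue.clear();
                    break;
                default:
                    break;
            }
        }
        return queue;
    }
}
```
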
@@ -88,13 +88,14 @@ public void setLength(int length) {
@Override
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, boolean
outputExpectsExpiredEvents, SiddhiAppContext siddhiAppContext) {
expiredEventChunk = new SnapshotableComplexEventChunk<StreamEvent>(false);
if (attributeExpressionExecutors.length == 1) {
length = (Integer) ((ConstantExpressionExecutor) attributeExpressionExecutors[0]).getValue();
} else {
throw new SiddhiAppValidationException("Length window should only have one parameter (<int> " +
"windowLength), but found " + attributeExpressionExecutors.length + " input attributes");
}

expiredEventChunk = new SnapshotableComplexEventChunk<StreamEvent>(false, "LW:" + length);
}

@Override
@@ -178,7 +179,7 @@ public Map<String, Object> currentState() {
public synchronized void restoreState(Map<String, Object> state) {
count = (int) state.get("Count");
expiredEventChunk.clear();
expiredEventChunk = (SnapshotableComplexEventChunk<StreamEvent>) expiredEventChunk.restore("ExpiredEventChunk", state);
//expiredEventChunk.add(((SnapshotableComplexEventChunk<StreamEvent>) expiredEventChunk.restore("ExpiredEventChunk", state)).getFirst());
expiredEventChunk = (SnapshotableComplexEventChunk<StreamEvent>)
expiredEventChunk.restore("ExpiredEventChunk", state);
}
}
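
Moving the chunk construction after the parameter check means the expired-event chunk is created only once the window length is known, so its "LW:" + length meta-information carries the real threshold. A minimal sketch of the currentState()/restoreState() contract the window then relies on — the map keys match the diff, but the class and the Deque standing in for the snapshotable event chunk are illustrative only.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the save/restore contract used by the length window above;
// "Count" and "ExpiredEventChunk" are the keys from the diff.
class LengthWindowStateSketch {
    private int count = 0;
    private Deque<String> expiredEventChunk = new ArrayDeque<String>();

    Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<String, Object>();
        state.put("Count", count);
        state.put("ExpiredEventChunk", new ArrayDeque<String>(expiredEventChunk));
        return state;
    }

    @SuppressWarnings("unchecked")
    void restoreState(Map<String, Object> state) {
        count = (Integer) state.get("Count");
        expiredEventChunk = new ArrayDeque<String>((Deque<String>) state.get("ExpiredEventChunk"));
    }
}
```
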
@@ -107,23 +107,21 @@ public ExpressionExecutor cloneExecutor(String key) {

@Override
public Map<String, Object> currentState() {
HashMap<String, Map<String, Object>> data = new HashMap<>();
Map<String, Object> state = new HashMap<>();
for (Map.Entry<String, AttributeAggregator> entry : aggregatorMap.entrySet()) {
data.put(entry.getKey(), entry.getValue().currentState());
state.put(entry.getKey(), entry.getValue().currentState());
}
Map<String, Object> state = new HashMap<>();
state.put("Data", data);

return state;
}

@Override
public void restoreState(Map<String, Object> state) {
HashMap<String, Map<String, Object>> data = (HashMap<String, Map<String, Object>>) state.get("Data");
for (HashMap.Entry<String, Object> item: state.entrySet()) {
String key = item.getKey();

for (Map.Entry<String, Map<String, Object>> entry : data.entrySet()) {
String key = entry.getKey();
AttributeAggregator aAttributeAggregator = attributeAggregator.cloneAggregator(key);
aAttributeAggregator.restoreState(entry.getValue());
aAttributeAggregator.restoreState((Map<String, Object>) item.getValue());
aggregatorMap.put(key, aAttributeAggregator);
}
}
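
The group-by aggregator state is now written directly into the returned map, one entry per group key, instead of being nested under a single "Data" entry, and restoreState iterates that map itself, keeping the saved and restored shapes consistent. A hedged sketch of the flattened layout; GroupByStateSketch and its Aggregator interface are invented stand-ins for Siddhi's executor and AttributeAggregator.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the flattened per-key aggregator state used above.
class GroupByStateSketch {
    interface Aggregator {
        Map<String, Object> currentState();
        void restoreState(Map<String, Object> state);
        Aggregator cloneAggregator(String key);
    }

    private final Map<String, Aggregator> aggregatorMap = new HashMap<String, Aggregator>();
    private final Aggregator prototype;

    GroupByStateSketch(Aggregator prototype) {
        this.prototype = prototype;
    }

    // One map entry per group-by key; no intermediate "Data" wrapper.
    Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<String, Object>();
        for (Map.Entry<String, Aggregator> entry : aggregatorMap.entrySet()) {
            state.put(entry.getKey(), entry.getValue().currentState());
        }
        return state;
    }

    @SuppressWarnings("unchecked")
    void restoreState(Map<String, Object> state) {
        for (Map.Entry<String, Object> item : state.entrySet()) {
            Aggregator restored = prototype.cloneAggregator(item.getKey());
            restored.restoreState((Map<String, Object>) item.getValue());
            aggregatorMap.put(item.getKey(), restored);
        }
    }
}
```
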
@@ -34,7 +34,15 @@
import org.wso2.siddhi.query.api.expression.condition.Compare;

import java.io.Serializable;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
* EventHolder implementation where events will be indexed and stored. This will offer faster access compared to
@@ -26,7 +26,6 @@
import org.wso2.siddhi.core.event.stream.converter.StreamEventConverter;
import org.wso2.siddhi.core.util.snapshot.Snapshot;

import java.util.LinkedList;
import java.util.Map;

/**
@@ -22,7 +22,6 @@
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.SnapshotableComplexEventChunk;
import org.wso2.siddhi.core.event.stream.Operation;
import org.wso2.siddhi.core.event.stream.Operator;
import org.wso2.siddhi.core.event.stream.StreamEvent;
@@ -35,7 +34,16 @@
import org.wso2.siddhi.query.api.expression.condition.Compare;

import java.io.Serializable;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/**
* EventHolder implementation where events will be indexed and stored. This will offer faster access compared to
@@ -184,7 +192,6 @@ private void add(StreamEvent streamEvent) {
}
}
}

}

private Object constructPrimaryKey(StreamEvent streamEvent,
@@ -572,7 +579,6 @@ public Snapshot getSnapshot() {
if (isFirstSnapshot) {
//objectMap
Snapshot snapshot = new Snapshot(this, false);
//snapshotByteSerializer.objectToByte(objectMap, siddhiAppContext);
isFirstSnapshot = false;
this.changeLog.clear();
return snapshot;
@@ -643,20 +649,19 @@ public EventHolder restore(String key, Map<String, Object> state) {
this.overwrite((StreamEvent) op.parameters);
break;
case Operator.REMOVE2:
delete((String)((Object[])op.parameters)[0],
delete((String) ((Object[]) op.parameters)[0],
(Compare.Operator) ((Object[]) op.parameters)[1],
((Object[]) op.parameters)[2]);
break;
case Operator.CLEAR:
this.deleteAll();
break;
case Operator.REMOVE:
this.deleteAll((Collection<StreamEvent>)op.parameters);
this.deleteAll((Collection<StreamEvent>) op.parameters);
break;
case Operator.CLEARALL:
// this.changeLog.add(new Operation(Operator.CLEARALL, storeEventSet))
//this.delete((Collection<StreamEvent>) op.parameters);
this.deleteAll((Collection<StreamEvent>) op.parameters);
break;
default:
continue;
}
@@ -671,25 +676,19 @@ public EventHolder restore(String key, Map<String, Object> state) {
for (Operation op : addList) {
switch (op.operation) {
case Operator.ADD:
//Need to check whether there is only one event or multiple events. If so we have to
// traverse the linked list and then get the count by which the eventsCount needs
// to be updated.
this.add((StreamEvent) op.parameters);
//((StreamEvent)parameters).setNext(null);
//changeLogForVariable.add(new Operation(operator, (StreamEvent) parameters));
break;
case Operator.REMOVE2:
delete((String)((Object[])op.parameters)[0],
delete((String) ((Object[]) op.parameters)[0],
(Compare.Operator) ((Object[]) op.parameters)[1],
((Object[]) op.parameters)[2]);
break;
case Operator.CLEAR:
this.deleteAll();
break;
case Operator.REMOVE:
// this.changeLog.add(new Operation(Operator.CLEARALL, storeEventSet))
//this.delete((Collection<StreamEvent>) op.parameters);
this.deleteAll((Collection<StreamEvent>) op.parameters);
break;
default:
continue;
}
Expand All @@ -700,5 +699,4 @@ public EventHolder restore(String key, Map<String, Object> state) {

return this;
}

}