", @map(...)))
+```
+
+QUERY PARAMETERS
+
+
+ Name
+ Description
+ Default Value
+ Possible Data Types
+ Optional
+ Dynamic
+
+
+ priority
+ This will set the logger priority i.e log level. Accepted values are INFO, DEBUG, WARN, FATAL, ERROR, OFF, TRACE
+ INFO
+ STRING
+ Yes
+ No
+
+
+ prefix
+ This will be the prefix to the output message. If the output stream has event [2,4] and the prefix is given as "Hello" then the log will show "Hello : [2,4]"
+ default prefix will be :
+ STRING
+ Yes
+ No
+
+
+
+Examples
+EXAMPLE 1
+```
+@sink(type='log', prefix='My Log', priority='DEBUG'),
+define stream BarStream (symbol string, price float, volume long)
+```
+In this example BarStream uses log sink and the prefix is given as My Log. Also the priority is set to DEBUG.
+
+EXAMPLE 2
+```
+@sink(type='log', priority='DEBUG'),
+define stream BarStream (symbol string, price float, volume long)
+```
+In this example BarStream uses log sink and the priority is set to DEBUG. User has not specified prefix so the default prefix will be in the form <Siddhi App Name> : <Stream Name>
+
+EXAMPLE 3
+```
+@sink(type='log', prefix='My Log'),
+define stream BarStream (symbol string, price float, volume long)
+```
+In this example BarStream uses log sink and the prefix is given as My Log. User has not given a priority so it will be set to default INFO.
+
+EXAMPLE 4
+```
+@sink(type='log'),
+define stream BarStream (symbol string, price float, volume long)
+```
+In this example BarStream uses log sink. The user has not given prefix or priority so they will be set to their default values.
+
+## Sinkmapper
+
+### passThrough *(Sink Mapper) *
+
+Pass-through mapper passed events (Event[]) through without any mapping or modifications.
+
+Syntax
+```
+@sink(..., @map(type="passThrough")
+```
+
+Examples
+EXAMPLE 1
+```
+@sink(type='inMemory', @map(type='passThrough'),
+define stream BarStream (symbol string, price float, volume long);
+```
+In the following example BarStream uses passThrough outputmapper which emit Siddhi event directly without any transformation into sink.
+
+## Source
+
+### inMemory *(Source) *
+
+In-memory source that can communicate with other in-memory sinks within the same JVM, it is assumed that the publisher and subscriber of a topic uses same event schema (stream definition).
+
+Syntax
+```
+@source(type="inMemory", topic="", @map(...)))
+```
+
+QUERY PARAMETERS
+
+
+ Name
+ Description
+ Default Value
+ Possible Data Types
+ Optional
+ Dynamic
+
+
+ topic
+ Subscribes to sent on the given topic.
+
+ STRING
+ No
+ No
+
+
+
+Examples
+EXAMPLE 1
+```
+@source(type='inMemory', @map(type='passThrough'),
+define stream BarStream (symbol string, price float, volume long)
+```
+In this example BarStream uses inMemory transport which passes the received event internally without using external transport.
+
+## Sourcemapper
+
+### passThrough *(Source Mapper) *
+
+Pass-through mapper passed events (Event[]) through without any mapping or modifications.
+
+Syntax
+```
+@source(..., @map(type="passThrough")
+```
+
+Examples
+EXAMPLE 1
+```
+@source(type='tcp', @map(type='passThrough'),
+define stream BarStream (symbol string, price float, volume long);
+```
+In this example BarStream uses passThrough inputmapper which passes the received Siddhi event directly without any transformation into source.
+
diff --git a/docs/api/latest.md b/docs/api/latest.md
index 1a7a458545..ab2e3ebc0f 100644
--- a/docs/api/latest.md
+++ b/docs/api/latest.md
@@ -1,4 +1,4 @@
-# API Docs - v4.1.26
+# API Docs - v4.1.27-SNAPSHOT
## Core
diff --git a/docs/documentation/siddhi-architecture.md b/docs/documentation/siddhi-architecture.md
index a14d630281..b159ee2c4e 100644
--- a/docs/documentation/siddhi-architecture.md
+++ b/docs/documentation/siddhi-architecture.md
@@ -1,6 +1,6 @@
# Siddhi Architecture
-WSO2 Siddhi is a software library that can be utilized in any of the following ways:
+Siddhi is a software library that can be utilized in any of the following ways:
- Run as a server on its own
- Run within WSO2 SP as a service
diff --git a/docs/index.md b/docs/index.md
index 0f2f7f35c2..12d6fa99a5 100755
--- a/docs/index.md
+++ b/docs/index.md
@@ -1,5 +1,5 @@
-WSO2 Siddhi
-===========
+Siddhi
+======
Siddhi is a java library that listens to events from data streams, detects complex conditions described via a **Streaming
SQL language**, and triggers actions. It performs both **_Stream Processing_** and **_Complex Event Processing_**.
diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml
index 056a88972c..5b46ff265d 100644
--- a/findbugs-exclude.xml
+++ b/findbugs-exclude.xml
@@ -164,6 +164,15 @@
+
+
+
+
+
+
+
+
+
@@ -181,6 +190,10 @@
+
+
+
+
diff --git a/mkdocs.yml b/mkdocs.yml
index f5a222b5d4..0a17ea45fe 100755
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -1,12 +1,12 @@
-site_name: WSO2 Siddhi
+site_name: Siddhi
site_description: Stream Processing and Complex Event Processing Engine
site_author: WSO2
site_url: https://wso2.github.io/siddhi
extra_css:
- stylesheets/extra.css
-repo_name: WSO2 Siddhi
+repo_name: Siddhi
repo_url: https://github.com/wso2/siddhi
-copyright: Copyright © 2011 - 2017 WSO2
+copyright: Copyright © 2011 - 2018 WSO2
extra:
logo: images/siddhi-logo-w.svg
palette:
@@ -26,7 +26,7 @@ markdown_extensions:
- toc(permalink=true)
- codehilite(guess_lang=false)
pages:
-- Welcome to WSO2 Siddhi: index.md
+- Welcome to Siddhi: index.md
- Features: features.md
- Quick Start Guide: documentation/siddhi-quckstart-4.0.md
- Documentation:
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiAppRuntime.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiAppRuntime.java
index e45704ae13..01576e9bb4 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiAppRuntime.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiAppRuntime.java
@@ -53,7 +53,7 @@
import org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder;
import org.wso2.siddhi.core.util.parser.StoreQueryParser;
import org.wso2.siddhi.core.util.parser.helper.QueryParserHelper;
-import org.wso2.siddhi.core.util.snapshot.AsyncSnapshotPersistor;
+import org.wso2.siddhi.core.util.persistence.util.PersistenceHelper;
import org.wso2.siddhi.core.util.snapshot.PersistenceReference;
import org.wso2.siddhi.core.util.statistics.BufferedEventsTracker;
import org.wso2.siddhi.core.util.statistics.LatencyTracker;
@@ -120,6 +120,7 @@ public class SiddhiAppRuntime {
private LatencyTracker storeQueryLatencyTracker;
private SiddhiDebugger siddhiDebugger;
private boolean running = false;
+ private Future futureIncrementalPersistor;
public SiddhiAppRuntime(Map streamDefinitionMap,
Map tableDefinitionMap,
@@ -531,13 +532,13 @@ public PersistenceReference persist() {
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// take snapshots of execution units
- byte[] snapshots = siddhiAppContext.getSnapshotService().snapshot();
- // start the snapshot persisting task asynchronously
- AsyncSnapshotPersistor asyncSnapshotPersistor = new AsyncSnapshotPersistor(snapshots,
- siddhiAppContext.getSiddhiContext().getPersistenceStore(), siddhiAppContext.getName());
- String revision = asyncSnapshotPersistor.getRevision();
- Future future = siddhiAppContext.getExecutorService().submit(asyncSnapshotPersistor);
- return new PersistenceReference(future, revision);
+ if (siddhiAppContext.getSiddhiContext().getPersistenceStore() != null) {
+ return PersistenceHelper.persist(siddhiAppContext.getSnapshotService().fullSnapshot(),
+ siddhiAppContext);
+ } else {
+ return PersistenceHelper.persist(siddhiAppContext.getSnapshotService().incrementalSnapshot(),
+ siddhiAppContext);
+ }
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
@@ -549,7 +550,7 @@ public byte[] snapshot() {
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// take snapshots of execution units
- return siddhiAppContext.getSnapshotService().snapshot();
+ return siddhiAppContext.getSnapshotService().fullSnapshot();
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
@@ -561,7 +562,7 @@ public void restore(byte[] snapshot) throws CannotRestoreSiddhiAppStateException
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// start the restoring process
- siddhiAppContext.getPersistenceService().restore(snapshot);
+ siddhiAppContext.getSnapshotService().restore(snapshot);
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
@@ -573,7 +574,7 @@ public void restoreRevision(String revision) throws CannotRestoreSiddhiAppStateE
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// start the restoring process
- siddhiAppContext.getPersistenceService().restoreRevision(revision);
+ siddhiAppContext.getSnapshotService().restoreRevision(revision);
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
@@ -586,7 +587,7 @@ public String restoreLastRevision() throws CannotRestoreSiddhiAppStateException
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// start the restoring process
- revision = siddhiAppContext.getPersistenceService().restoreLastRevision();
+ revision = siddhiAppContext.getSnapshotService().restoreLastRevision();
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiManager.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiManager.java
index 01a2ddc317..bda3e1fac8 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiManager.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiManager.java
@@ -27,6 +27,7 @@
import org.wso2.siddhi.core.util.SiddhiAppRuntimeBuilder;
import org.wso2.siddhi.core.util.config.ConfigManager;
import org.wso2.siddhi.core.util.parser.SiddhiAppParser;
+import org.wso2.siddhi.core.util.persistence.IncrementalPersistenceStore;
import org.wso2.siddhi.core.util.persistence.PersistenceStore;
import org.wso2.siddhi.query.api.SiddhiApp;
import org.wso2.siddhi.query.compiler.SiddhiCompiler;
@@ -235,4 +236,8 @@ public void restoreLastState() {
}
}
}
+
+ public void setIncrementalPersistenceStore(IncrementalPersistenceStore incrementalPersistenceStore) {
+ this.siddhiContext.setIncrementalPersistenceStore(incrementalPersistenceStore);
+ }
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiAppContext.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiAppContext.java
index 5791be48ed..9867302f9e 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiAppContext.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiAppContext.java
@@ -23,7 +23,6 @@
import org.wso2.siddhi.core.util.ElementIdGenerator;
import org.wso2.siddhi.core.util.ThreadBarrier;
import org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder;
-import org.wso2.siddhi.core.util.persistence.PersistenceService;
import org.wso2.siddhi.core.util.snapshot.SnapshotService;
import org.wso2.siddhi.core.util.statistics.StatisticsManager;
import org.wso2.siddhi.core.util.timestamp.TimestampGenerator;
@@ -56,7 +55,6 @@ public class SiddhiAppContext {
private ThreadBarrier threadBarrier = null;
private TimestampGenerator timestampGenerator = null;
- private PersistenceService persistenceService;
private ElementIdGenerator elementIdGenerator;
private Map scriptFunctionMap;
private ExceptionHandler disruptorExceptionHandler;
@@ -166,14 +164,6 @@ public void setSnapshotService(SnapshotService snapshotService) {
this.snapshotService = snapshotService;
}
- public PersistenceService getPersistenceService() {
- return persistenceService;
- }
-
- public void setPersistenceService(PersistenceService persistenceService) {
- this.persistenceService = persistenceService;
- }
-
public ElementIdGenerator getElementIdGenerator() {
return elementIdGenerator;
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiContext.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiContext.java
index 498596337b..c788e600a8 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiContext.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiContext.java
@@ -20,6 +20,7 @@
import com.lmax.disruptor.ExceptionHandler;
import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.exception.PersistenceStoreException;
import org.wso2.siddhi.core.stream.input.source.SourceHandlerManager;
import org.wso2.siddhi.core.stream.output.sink.SinkHandlerManager;
import org.wso2.siddhi.core.table.record.RecordTableHandlerManager;
@@ -27,6 +28,7 @@
import org.wso2.siddhi.core.util.config.ConfigManager;
import org.wso2.siddhi.core.util.config.InMemoryConfigManager;
import org.wso2.siddhi.core.util.extension.holder.AbstractExtensionHolder;
+import org.wso2.siddhi.core.util.persistence.IncrementalPersistenceStore;
import org.wso2.siddhi.core.util.persistence.PersistenceStore;
import org.wso2.siddhi.core.util.statistics.metrics.SiddhiMetricsFactory;
@@ -45,6 +47,7 @@ public class SiddhiContext {
private ExceptionHandler defaultDisrupterExceptionHandler;
private Map siddhiExtensions = new HashMap<>();
private PersistenceStore persistenceStore = null;
+ private IncrementalPersistenceStore incrementalPersistenceStore = null;
private ConcurrentHashMap siddhiDataSources;
private StatisticsConfiguration statisticsConfiguration;
private ConcurrentHashMap extensionHolderMap;
@@ -82,14 +85,31 @@ public Map getSiddhiExtensions() {
return siddhiExtensions;
}
- public PersistenceStore getPersistenceStore() {
+ public synchronized PersistenceStore getPersistenceStore() {
return persistenceStore;
}
- public void setPersistenceStore(PersistenceStore persistenceStore) {
+ public synchronized void setPersistenceStore(PersistenceStore persistenceStore) {
+ if (incrementalPersistenceStore != null) {
+ throw new PersistenceStoreException("Only one type of persistence store can exist. " +
+ "Incremental persistence store '" + incrementalPersistenceStore.getClass().getName() +
+ "' already registered!");
+ }
this.persistenceStore = persistenceStore;
}
+ public synchronized IncrementalPersistenceStore getIncrementalPersistenceStore() {
+ return incrementalPersistenceStore;
+ }
+
+ public synchronized void setIncrementalPersistenceStore(IncrementalPersistenceStore incrementalPersistenceStore) {
+ if (persistenceStore != null) {
+ throw new PersistenceStoreException("Only one type of persistence store can exist." +
+ " Persistence store '" + persistenceStore.getClass().getName() + "' already registered!");
+ }
+ this.incrementalPersistenceStore = incrementalPersistenceStore;
+ }
+
public void setConfigManager(ConfigManager configManager) {
this.configManager = configManager;
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/event/stream/Operation.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/event/stream/Operation.java
new file mode 100644
index 0000000000..6032c3068e
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/event/stream/Operation.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.wso2.siddhi.core.event.stream;
+
+import java.io.Serializable;
+
+
+/**
+ * The class which resembles an instance of operation performed on SnapshotableStreamEventQueue.
+ */
+public class Operation implements Serializable {
+
+ /**
+ * Possible Operator actions
+ */
+ public enum Operator {
+ ADD ,
+ REMOVE,
+ CLEAR,
+ OVERWRITE,
+ DELETE_BY_OPERATOR,
+ DELETE_BY_INDEX
+ }
+
+ public Operator operation;
+ public Object parameters;
+
+ public Operation(Operator operation, Object parameters) {
+ this.operation = operation;
+ this.parameters = parameters;
+ }
+
+ public Operation(Operator operation) {
+ this.operation = operation;
+ }
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/event/stream/holder/SnapshotableStreamEventQueue.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/event/stream/holder/SnapshotableStreamEventQueue.java
new file mode 100644
index 0000000000..87f10bde66
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/event/stream/holder/SnapshotableStreamEventQueue.java
@@ -0,0 +1,386 @@
+/*
+ * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.wso2.siddhi.core.event.stream.holder;
+
+import org.wso2.siddhi.core.event.stream.Operation;
+import org.wso2.siddhi.core.event.stream.Operation.Operator;
+import org.wso2.siddhi.core.event.stream.StreamEvent;
+import org.wso2.siddhi.core.util.snapshot.SnapshotRequest;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotState;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+/**
+ * The class to hold stream events in a queue and by managing its snapshots
+ */
+public class SnapshotableStreamEventQueue implements Iterator, Serializable {
+ private static final long serialVersionUID = 3185987841726255019L;
+ protected StreamEvent first;
+ protected StreamEvent previousToLastReturned;
+ protected StreamEvent lastReturned;
+ protected StreamEvent last;
+ private int operationChangeLogThreshold;
+ private transient StreamEventClonerHolder eventClonerHolder;
+ private ArrayList operationChangeLog;
+ private long operationChangeLogSize;
+ private boolean forceFullSnapshot = true;
+ private boolean isOperationLogEnabled = true;
+ private int eventIndex = -1;
+
+ public SnapshotableStreamEventQueue(StreamEventClonerHolder eventClonerHolder) {
+ this(eventClonerHolder, Integer.MAX_VALUE);
+ }
+
+ public SnapshotableStreamEventQueue(StreamEventClonerHolder eventClonerHolder, int operationChangeLogThreshold) {
+ this.operationChangeLog = new ArrayList<>();
+ this.eventClonerHolder = eventClonerHolder;
+ this.operationChangeLogThreshold = operationChangeLogThreshold;
+ }
+
+ public void add(StreamEvent events) {
+ if (!isFullSnapshot()) {
+ if (isOperationLogEnabled) {
+ operationChangeLog.add(new Operation(Operator.ADD, copyEvents(events)));
+ }
+ operationChangeLogSize++;
+ } else {
+ operationChangeLog.clear();
+ operationChangeLogSize = 0;
+ forceFullSnapshot = true;
+ }
+
+ if (first == null) {
+ first = events;
+ } else {
+ last.setNext(events);
+ }
+ last = getLastEvent(events);
+ }
+
+ /**
+ * Removes from the underlying collection the last element returned by the
+ * iterator (optional operation). This method can be called only once per
+ * call to next . The behavior of an iterator is unspecified if
+ * the underlying collection is modified while the iteration is in
+ * progress in any way other than by calling this method.
+ *
+ * @throws UnsupportedOperationException if the remove
+ * operation is not supported by this Iterator.
+ * @throws IllegalStateException if the next method has not
+ * yet been called, or the remove method has already
+ * been called after the last call to the next
+ * method.
+ */
+ public void remove() {
+ if (lastReturned == null) {
+ throw new IllegalStateException();
+ }
+ if (previousToLastReturned != null) {
+ previousToLastReturned.setNext(lastReturned.getNext());
+ } else {
+ first = lastReturned.getNext();
+ if (first == null) {
+ last = null;
+ }
+ }
+ lastReturned.setNext(null);
+ lastReturned = null;
+ if (!isFullSnapshot()) {
+ if (isOperationLogEnabled) {
+ operationChangeLog.add(new Operation(Operator.DELETE_BY_INDEX, eventIndex));
+ }
+ operationChangeLogSize++;
+ } else {
+ operationChangeLog.clear();
+ operationChangeLogSize = 0;
+ forceFullSnapshot = true;
+ }
+ eventIndex--;
+ }
+
+ private StreamEvent getLastEvent(StreamEvent complexEvents) {
+ StreamEvent lastEvent = complexEvents;
+ while (lastEvent != null && lastEvent.getNext() != null) {
+ lastEvent = lastEvent.getNext();
+ }
+ return lastEvent;
+ }
+
+ /**
+ * Returns true if the iteration has more elements. (In other
+ * words, returns true if next would return an element
+ * rather than throwing an exception.)
+ *
+ * @return true if the iterator has more elements.
+ */
+ public boolean hasNext() {
+ if (lastReturned != null) {
+ return lastReturned.getNext() != null;
+ } else if (previousToLastReturned != null) {
+ return previousToLastReturned.getNext() != null;
+ } else {
+ return first != null;
+ }
+ }
+
+ /**
+ * Returns the next element in the iteration.
+ *
+ * @return the next element in the iteration.
+ * @throws NoSuchElementException iteration has no more elements.
+ */
+ public StreamEvent next() {
+ StreamEvent returnEvent;
+ if (lastReturned != null) {
+ returnEvent = lastReturned.getNext();
+ previousToLastReturned = lastReturned;
+ } else if (previousToLastReturned != null) {
+ returnEvent = previousToLastReturned.getNext();
+ } else {
+ returnEvent = first;
+ }
+ if (returnEvent == null) {
+ throw new NoSuchElementException();
+ }
+ lastReturned = returnEvent;
+ eventIndex++;
+ return returnEvent;
+ }
+
+ public void clear() {
+ this.operationChangeLog.clear();
+ operationChangeLogSize = 0;
+ forceFullSnapshot = true;
+
+ previousToLastReturned = null;
+ lastReturned = null;
+ first = null;
+ last = null;
+ eventIndex = -1;
+ }
+
+ public void reset() {
+ previousToLastReturned = null;
+ lastReturned = null;
+ eventIndex = -1;
+ }
+
+ public StreamEvent getFirst() {
+ return first;
+ }
+
+ public StreamEvent getLast() {
+ return last;
+ }
+
+ public StreamEvent poll() {
+ reset();
+ if (first != null) {
+ StreamEvent firstEvent = first;
+ first = first.getNext();
+ firstEvent.setNext(null);
+
+ if (!isFullSnapshot()) {
+ if (isOperationLogEnabled) {
+ operationChangeLog.add(new Operation(Operator.REMOVE));
+ }
+ operationChangeLogSize++;
+ } else {
+ operationChangeLog.clear();
+ operationChangeLogSize = 0;
+ forceFullSnapshot = true;
+ }
+ return firstEvent;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "EventQueue{" +
+ "first=" + first +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SnapshotableStreamEventQueue that = (SnapshotableStreamEventQueue) o;
+
+ if (operationChangeLogThreshold != that.operationChangeLogThreshold) {
+ return false;
+ }
+ if (operationChangeLogSize != that.operationChangeLogSize) {
+ return false;
+ }
+ if (forceFullSnapshot != that.forceFullSnapshot) {
+ return false;
+ }
+ if (isOperationLogEnabled != that.isOperationLogEnabled) {
+ return false;
+ }
+ if (eventIndex != that.eventIndex) {
+ return false;
+ }
+ if (first != null ? !first.equals(that.first) : that.first != null) {
+ return false;
+ }
+ return operationChangeLog != null ? operationChangeLog.equals(that.operationChangeLog) :
+ that.operationChangeLog == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = first != null ? first.hashCode() : 0;
+ result = 31 * result + operationChangeLogThreshold;
+ result = 31 * result + (operationChangeLog != null ? operationChangeLog.hashCode() : 0);
+ result = 31 * result + (int) (operationChangeLogSize ^ (operationChangeLogSize >>> 32));
+ result = 31 * result + (forceFullSnapshot ? 1 : 0);
+ result = 31 * result + (isOperationLogEnabled ? 1 : 0);
+ result = 31 * result + eventIndex;
+ return result;
+ }
+
+ public SnapshotState getSnapshot() {
+ if (isFullSnapshot()) {
+ forceFullSnapshot = false;
+ return new SnapshotState(this.getFirst(), false);
+ } else {
+ SnapshotState snapshot = new SnapshotState(operationChangeLog, true);
+ operationChangeLog = new ArrayList<>();
+ return snapshot;
+ }
+ }
+
+ private boolean isFullSnapshot() {
+ return operationChangeLogSize > 100 || operationChangeLogSize > operationChangeLogThreshold
+ || forceFullSnapshot || SnapshotRequest.isRequestForFullSnapshot();
+
+ }
+
+ public void restore(SnapshotStateList snapshotStatelist) {
+ TreeMap revisions = snapshotStatelist.getSnapshotStates();
+ Iterator> itr = revisions.entrySet().iterator();
+ this.isOperationLogEnabled = false;
+ while (itr.hasNext()) {
+ Map.Entry snapshotEntry = itr.next();
+ if (!snapshotEntry.getValue().isIncrementalSnapshot()) {
+ this.clear();
+ this.add((StreamEvent) snapshotEntry.getValue().getState());
+ forceFullSnapshot = false;
+ } else {
+ ArrayList operations = (ArrayList) snapshotEntry.getValue().getState();
+ for (Operation op : operations) {
+ switch (op.operation) {
+ case ADD:
+ add((StreamEvent) op.parameters);
+ break;
+ case REMOVE:
+ poll();
+ break;
+ case CLEAR:
+ clear();
+ break;
+ case OVERWRITE:
+ int overwriteIndex = (int) ((Object[]) op.parameters)[0];
+ StreamEvent streamEvent = (StreamEvent) ((Object[]) op.parameters)[1];
+ while (hasNext()) {
+ next();
+ if (overwriteIndex == eventIndex) {
+ overwrite(streamEvent);
+ break;
+ }
+ }
+ break;
+ case DELETE_BY_OPERATOR:
+ break;
+ case DELETE_BY_INDEX:
+ int deleteIndex = (int) op.parameters;
+ while (hasNext()) {
+ next();
+ if (deleteIndex == eventIndex) {
+ remove();
+ break;
+ }
+ }
+ break;
+ default:
+ continue;
+ }
+ }
+ }
+ }
+ this.isOperationLogEnabled = true;
+ }
+
+ private StreamEvent copyEvents(StreamEvent events) {
+
+ StreamEvent currentEvent = events;
+ StreamEvent firstCopiedEvent = eventClonerHolder.getStreamEventCloner().copyStreamEvent(events);
+ StreamEvent lastCopiedEvent = firstCopiedEvent;
+
+ while (currentEvent.getNext() != null) {
+ currentEvent = currentEvent.getNext();
+ StreamEvent copiedStreamEvent = eventClonerHolder.getStreamEventCloner().copyStreamEvent(currentEvent);
+ lastCopiedEvent.setNext(copiedStreamEvent);
+ lastCopiedEvent = copiedStreamEvent;
+ }
+ return firstCopiedEvent;
+ }
+
+ public void overwrite(StreamEvent streamEvent) {
+ if (!isFullSnapshot()) {
+ if (isOperationLogEnabled) {
+ operationChangeLog.add(new Operation(Operator.OVERWRITE,
+ new Object[]{eventIndex,
+ eventClonerHolder.getStreamEventCloner().copyStreamEvent(streamEvent)}));
+ }
+ operationChangeLogSize++;
+ } else {
+ operationChangeLog.clear();
+ operationChangeLogSize = 0;
+ forceFullSnapshot = true;
+ }
+
+ if (previousToLastReturned != null) {
+ previousToLastReturned.setNext(streamEvent);
+ } else {
+ first = streamEvent;
+ }
+ StreamEvent next = lastReturned.getNext();
+ if (next != null) {
+ streamEvent.setNext(next);
+ } else {
+ last = streamEvent;
+ }
+ lastReturned = streamEvent;
+ }
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/event/stream/holder/StreamEventClonerHolder.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/event/stream/holder/StreamEventClonerHolder.java
new file mode 100644
index 0000000000..5257fad8e5
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/event/stream/holder/StreamEventClonerHolder.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.siddhi.core.event.stream.holder;
+
+import org.wso2.siddhi.core.event.stream.StreamEventCloner;
+
+/**
+ * Holder to have StreamEventCloner
+ */
+public class StreamEventClonerHolder {
+
+ private StreamEventCloner streamEventCloner = null;
+
+ public StreamEventClonerHolder() {
+ }
+
+ public StreamEventClonerHolder(StreamEventCloner streamEventCloner) {
+ this.streamEventCloner = streamEventCloner;
+ }
+
+ public StreamEventCloner getStreamEventCloner() {
+ return streamEventCloner;
+ }
+
+ public void setStreamEventCloner(StreamEventCloner streamEventCloner) {
+ this.streamEventCloner = streamEventCloner;
+ }
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/exception/PersistenceStoreException.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/exception/PersistenceStoreException.java
new file mode 100644
index 0000000000..fa5183fc21
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/exception/PersistenceStoreException.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.wso2.siddhi.core.exception;
+
+/**
+ * Exception class to be used when issue occur at persistence store.
+ */
+public class PersistenceStoreException extends RuntimeException {
+ public PersistenceStoreException() {
+ super();
+ }
+
+ public PersistenceStoreException(String message) {
+ super(message);
+ }
+
+ public PersistenceStoreException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+
+ public PersistenceStoreException(Throwable throwable) {
+ super(throwable);
+ }
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/AbstractStreamProcessor.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/AbstractStreamProcessor.java
index 0bfae341fc..7bf39ec3cf 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/AbstractStreamProcessor.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/AbstractStreamProcessor.java
@@ -23,6 +23,7 @@
import org.wso2.siddhi.core.event.stream.MetaStreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
+import org.wso2.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.event.stream.populater.StreamEventPopulaterFactory;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
@@ -50,6 +51,7 @@ public abstract class AbstractStreamProcessor implements Processor, EternalRefer
protected Processor nextProcessor;
protected List additionalAttributes;
+ protected StreamEventClonerHolder streamEventClonerHolder = new StreamEventClonerHolder();
protected StreamEventCloner streamEventCloner;
protected AbstractDefinition inputDefinition;
protected ExpressionExecutor[] attributeExpressionExecutors;
@@ -192,6 +194,7 @@ public void constructStreamEventPopulater(MetaStreamEvent metaStreamEvent, int s
public void setStreamEventCloner(StreamEventCloner streamEventCloner) {
this.streamEventCloner = streamEventCloner;
+ this.streamEventClonerHolder.setStreamEventCloner(streamEventCloner);
}
public void setToLast(Processor processor) {
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/ExternalTimeWindowProcessor.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/ExternalTimeWindowProcessor.java
index 06873931f8..96f2605248 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/ExternalTimeWindowProcessor.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/ExternalTimeWindowProcessor.java
@@ -28,6 +28,7 @@
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
+import org.wso2.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
@@ -38,6 +39,7 @@
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;
@@ -82,13 +84,13 @@
public class ExternalTimeWindowProcessor extends WindowProcessor implements FindableProcessor {
private static final Logger log = Logger.getLogger(ExternalTimeWindowProcessor.class);
private long timeToKeep;
- private ComplexEventChunk expiredEventChunk;
+ private SnapshotableStreamEventQueue expiredEventQueue;
private VariableExpressionExecutor timeStampVariableExpressionExecutor;
@Override
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, boolean
outputExpectsExpiredEvents, SiddhiAppContext siddhiAppContext) {
- this.expiredEventChunk = new ComplexEventChunk(false);
+ this.expiredEventQueue = new SnapshotableStreamEventQueue(streamEventClonerHolder);
if (attributeExpressionExecutors.length == 2) {
if (attributeExpressionExecutors[1].getReturnType() == Attribute.Type.INT) {
timeToKeep = Integer.parseInt(String.valueOf(((ConstantExpressionExecutor)
@@ -125,27 +127,27 @@ protected synchronized void process(ComplexEventChunk streamEventCh
StreamEvent clonedEvent = streamEventCloner.copyStreamEvent(streamEvent);
clonedEvent.setType(StreamEvent.Type.EXPIRED);
- // reset expiredEventChunk to make sure all of the expired events get removed,
+ // reset expiredEventQueue to make sure all of the expired events get removed,
// otherwise lastReturned.next will always return null and here while check is always false
- expiredEventChunk.reset();
- while (expiredEventChunk.hasNext()) {
- StreamEvent expiredEvent = expiredEventChunk.next();
+ expiredEventQueue.reset();
+ while (expiredEventQueue.hasNext()) {
+ StreamEvent expiredEvent = expiredEventQueue.next();
long expiredEventTime = (Long) timeStampVariableExpressionExecutor.execute(expiredEvent);
long timeDiff = expiredEventTime - currentTime + timeToKeep;
if (timeDiff <= 0) {
- expiredEventChunk.remove();
+ expiredEventQueue.remove();
expiredEvent.setTimestamp(currentTime);
streamEventChunk.insertBeforeCurrent(expiredEvent);
} else {
- expiredEventChunk.reset();
+ expiredEventQueue.reset();
break;
}
}
if (streamEvent.getType() == StreamEvent.Type.CURRENT) {
- this.expiredEventChunk.add(clonedEvent);
+ this.expiredEventQueue.add(clonedEvent);
}
- expiredEventChunk.reset();
+ expiredEventQueue.reset();
}
}
nextProcessor.process(streamEventChunk);
@@ -164,19 +166,19 @@ public void stop() {
@Override
public Map currentState() {
Map state = new HashMap<>();
- state.put("ExpiredEventChunk", expiredEventChunk.getFirst());
+ state.put("ExpiredEventQueue", expiredEventQueue.getSnapshot());
return state;
}
@Override
public void restoreState(Map state) {
- expiredEventChunk.clear();
- expiredEventChunk.add((StreamEvent) state.get("ExpiredEventChunk"));
+ expiredEventQueue.clear();
+ expiredEventQueue.restore((SnapshotStateList) state.get("ExpiredEventQueue"));
}
@Override
public synchronized StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
- return ((Operator) compiledCondition).find(matchingEvent, expiredEventChunk, streamEventCloner);
+ return ((Operator) compiledCondition).find(matchingEvent, expiredEventQueue, streamEventCloner);
}
@Override
@@ -184,7 +186,7 @@ public CompiledCondition compileCondition(Expression condition, MatchingMetaInfo
SiddhiAppContext siddhiAppContext,
List variableExpressionExecutors,
Map tableMap, String queryName) {
- return OperatorParser.constructOperator(expiredEventChunk, condition, matchingMetaInfoHolder,
+ return OperatorParser.constructOperator(expiredEventQueue, condition, matchingMetaInfoHolder,
siddhiAppContext, variableExpressionExecutors, tableMap, this.queryName);
}
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/LengthBatchWindowProcessor.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/LengthBatchWindowProcessor.java
index 87747a3f1d..f1abeb21df 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/LengthBatchWindowProcessor.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/LengthBatchWindowProcessor.java
@@ -27,6 +27,7 @@
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
+import org.wso2.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
@@ -37,6 +38,7 @@
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;
@@ -78,20 +80,21 @@ public class LengthBatchWindowProcessor extends WindowProcessor implements Finda
private int length;
private int count = 0;
- private ComplexEventChunk currentEventChunk = new ComplexEventChunk(false);
- private ComplexEventChunk expiredEventChunk = null;
+ private SnapshotableStreamEventQueue currentEventQueue;
+ private SnapshotableStreamEventQueue expiredEventQueue = null;
private boolean outputExpectsExpiredEvents;
private SiddhiAppContext siddhiAppContext;
private StreamEvent resetEvent = null;
@Override
- protected void init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, boolean
- outputExpectsExpiredEvents, SiddhiAppContext siddhiAppContext) {
+ protected void init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
+ boolean outputExpectsExpiredEvents, SiddhiAppContext siddhiAppContext) {
this.outputExpectsExpiredEvents = outputExpectsExpiredEvents;
this.siddhiAppContext = siddhiAppContext;
+ currentEventQueue = new SnapshotableStreamEventQueue(streamEventClonerHolder);
if (outputExpectsExpiredEvents) {
- expiredEventChunk = new ComplexEventChunk(false);
+ expiredEventQueue = new SnapshotableStreamEventQueue(streamEventClonerHolder);
}
if (attributeExpressionExecutors.length == 1) {
length = (Integer) (((ConstantExpressionExecutor) attributeExpressionExecutors[0]).getValue());
@@ -111,43 +114,43 @@ protected void process(ComplexEventChunk streamEventChunk, Processo
while (streamEventChunk.hasNext()) {
StreamEvent streamEvent = streamEventChunk.next();
StreamEvent clonedStreamEvent = streamEventCloner.copyStreamEvent(streamEvent);
- currentEventChunk.add(clonedStreamEvent);
+ currentEventQueue.add(clonedStreamEvent);
count++;
if (count == length) {
if (outputExpectsExpiredEvents) {
- if (expiredEventChunk.getFirst() != null) {
- while (expiredEventChunk.hasNext()) {
- StreamEvent expiredEvent = expiredEventChunk.next();
+ if (expiredEventQueue.getFirst() != null) {
+ while (expiredEventQueue.hasNext()) {
+ StreamEvent expiredEvent = expiredEventQueue.next();
expiredEvent.setTimestamp(currentTime);
}
- outputStreamEventChunk.add(expiredEventChunk.getFirst());
+ outputStreamEventChunk.add(expiredEventQueue.getFirst());
}
}
- if (expiredEventChunk != null) {
- expiredEventChunk.clear();
+ if (expiredEventQueue != null) {
+ expiredEventQueue.clear();
}
- if (currentEventChunk.getFirst() != null) {
+ if (currentEventQueue.getFirst() != null) {
// add reset event in front of current events
outputStreamEventChunk.add(resetEvent);
resetEvent = null;
- if (expiredEventChunk != null) {
- currentEventChunk.reset();
- while (currentEventChunk.hasNext()) {
- StreamEvent currentEvent = currentEventChunk.next();
+ if (expiredEventQueue != null) {
+ currentEventQueue.reset();
+ while (currentEventQueue.hasNext()) {
+ StreamEvent currentEvent = currentEventQueue.next();
StreamEvent toExpireEvent = streamEventCloner.copyStreamEvent(currentEvent);
toExpireEvent.setType(StreamEvent.Type.EXPIRED);
- expiredEventChunk.add(toExpireEvent);
+ expiredEventQueue.add(toExpireEvent);
}
}
- resetEvent = streamEventCloner.copyStreamEvent(currentEventChunk.getFirst());
+ resetEvent = streamEventCloner.copyStreamEvent(currentEventQueue.getFirst());
resetEvent.setType(ComplexEvent.Type.RESET);
- outputStreamEventChunk.add(currentEventChunk.getFirst());
+ outputStreamEventChunk.add(currentEventQueue.getFirst());
}
- currentEventChunk.clear();
+ currentEventQueue.clear();
count = 0;
if (outputStreamEventChunk.getFirst() != null) {
streamEventChunks.add(outputStreamEventChunk);
@@ -175,8 +178,8 @@ public Map currentState() {
Map state = new HashMap<>();
synchronized (this) {
state.put("Count", count);
- state.put("CurrentEventChunk", currentEventChunk.getFirst());
- state.put("ExpiredEventChunk", expiredEventChunk != null ? expiredEventChunk.getFirst() : null);
+ state.put("CurrentEventQueue", currentEventQueue.getSnapshot());
+ state.put("ExpiredEventQueue", expiredEventQueue.getSnapshot());
state.put("ResetEvent", resetEvent);
}
return state;
@@ -186,29 +189,30 @@ public Map currentState() {
@Override
public synchronized void restoreState(Map state) {
count = (int) state.get("Count");
- currentEventChunk.clear();
- currentEventChunk.add((StreamEvent) state.get("CurrentEventChunk"));
- if (expiredEventChunk != null) {
- expiredEventChunk.clear();
- expiredEventChunk.add((StreamEvent) state.get("ExpiredEventChunk"));
+ currentEventQueue.clear();
+ currentEventQueue.restore((SnapshotStateList) state.get("CurrentEventQueue"));
+
+ if (expiredEventQueue != null) {
+ expiredEventQueue.clear();
+ expiredEventQueue.restore((SnapshotStateList) state.get("ExpiredEventQueue"));
}
resetEvent = (StreamEvent) state.get("ResetEvent");
}
@Override
public synchronized StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
- return ((Operator) compiledCondition).find(matchingEvent, expiredEventChunk, streamEventCloner);
+ return ((Operator) compiledCondition).find(matchingEvent, expiredEventQueue, streamEventCloner);
}
@Override
public CompiledCondition compileCondition(Expression condition, MatchingMetaInfoHolder matchingMetaInfoHolder,
- SiddhiAppContext siddhiAppContext,
- List variableExpressionExecutors,
- Map tableMap, String queryName) {
- if (expiredEventChunk == null) {
- expiredEventChunk = new ComplexEventChunk(false);
+ SiddhiAppContext siddhiAppContext,
+ List variableExpressionExecutors,
+ Map tableMap, String queryName) {
+ if (expiredEventQueue == null) {
+ expiredEventQueue = new SnapshotableStreamEventQueue(streamEventClonerHolder);
}
- return OperatorParser.constructOperator(expiredEventChunk, condition, matchingMetaInfoHolder,
+ return OperatorParser.constructOperator(expiredEventQueue, condition, matchingMetaInfoHolder,
siddhiAppContext, variableExpressionExecutors, tableMap, this.queryName);
}
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/LengthWindowProcessor.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/LengthWindowProcessor.java
index 10635ad5cf..49fecd4e6b 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/LengthWindowProcessor.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/LengthWindowProcessor.java
@@ -27,6 +27,7 @@
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
+import org.wso2.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
@@ -37,6 +38,7 @@
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;
@@ -74,7 +76,7 @@ public class LengthWindowProcessor extends WindowProcessor implements FindablePr
private int length;
private int count = 0;
- private ComplexEventChunk expiredEventChunk;
+ private SnapshotableStreamEventQueue expiredEventQueue;
public int getLength() {
return length;
@@ -87,13 +89,13 @@ public void setLength(int length) {
@Override
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, boolean
outputExpectsExpiredEvents, SiddhiAppContext siddhiAppContext) {
- expiredEventChunk = new ComplexEventChunk(false);
if (attributeExpressionExecutors.length == 1) {
length = (Integer) ((ConstantExpressionExecutor) attributeExpressionExecutors[0]).getValue();
} else {
throw new SiddhiAppValidationException("Length window should only have one parameter ( " +
"windowLength), but found " + attributeExpressionExecutors.length + " input attributes");
}
+ expiredEventQueue = new SnapshotableStreamEventQueue(streamEventClonerHolder, length);
}
@Override
@@ -107,13 +109,13 @@ protected void process(ComplexEventChunk streamEventChunk, Processo
clonedEvent.setType(StreamEvent.Type.EXPIRED);
if (count < length) {
count++;
- this.expiredEventChunk.add(clonedEvent);
+ this.expiredEventQueue.add(clonedEvent);
} else {
- StreamEvent firstEvent = this.expiredEventChunk.poll();
+ StreamEvent firstEvent = this.expiredEventQueue.poll();
if (firstEvent != null) {
firstEvent.setTimestamp(currentTime);
streamEventChunk.insertBeforeCurrent(firstEvent);
- this.expiredEventChunk.add(clonedEvent);
+ this.expiredEventQueue.add(clonedEvent);
} else {
StreamEvent resetEvent = streamEventCloner.copyStreamEvent(streamEvent);
resetEvent.setType(ComplexEvent.Type.RESET);
@@ -137,15 +139,15 @@ protected void process(ComplexEventChunk streamEventChunk, Processo
@Override
public synchronized StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
- return ((Operator) compiledCondition).find(matchingEvent, expiredEventChunk, streamEventCloner);
+ return ((Operator) compiledCondition).find(matchingEvent, expiredEventQueue, streamEventCloner);
}
@Override
public CompiledCondition compileCondition(Expression condition, MatchingMetaInfoHolder matchingMetaInfoHolder,
- SiddhiAppContext siddhiAppContext,
- List variableExpressionExecutors,
- Map tableMap, String queryName) {
- return OperatorParser.constructOperator(expiredEventChunk, condition, matchingMetaInfoHolder,
+ SiddhiAppContext siddhiAppContext,
+ List variableExpressionExecutors,
+ Map tableMap, String queryName) {
+ return OperatorParser.constructOperator(expiredEventQueue, condition, matchingMetaInfoHolder,
siddhiAppContext, variableExpressionExecutors, tableMap, this.queryName);
}
@@ -165,7 +167,7 @@ public Map currentState() {
Map state = new HashMap<>();
synchronized (this) {
state.put("Count", count);
- state.put("ExpiredEventChunk", expiredEventChunk.getFirst());
+ state.put("ExpiredEventQueue", expiredEventQueue.getSnapshot());
}
return state;
}
@@ -174,7 +176,7 @@ public Map currentState() {
@Override
public synchronized void restoreState(Map state) {
count = (int) state.get("Count");
- expiredEventChunk.clear();
- expiredEventChunk.add((StreamEvent) state.get("ExpiredEventChunk"));
+ expiredEventQueue.clear();
+ expiredEventQueue.restore((SnapshotStateList) state.get("ExpiredEventQueue"));
}
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/TimeBatchWindowProcessor.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/TimeBatchWindowProcessor.java
index fdaddfbaac..6be112c25c 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/TimeBatchWindowProcessor.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/TimeBatchWindowProcessor.java
@@ -27,6 +27,7 @@
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
+import org.wso2.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
@@ -39,6 +40,7 @@
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;
@@ -86,8 +88,8 @@ public class TimeBatchWindowProcessor extends WindowProcessor implements Schedul
private long timeInMilliSeconds;
private long nextEmitTime = -1;
- private ComplexEventChunk currentEventChunk = new ComplexEventChunk(false);
- private ComplexEventChunk expiredEventChunk = null;
+ private SnapshotableStreamEventQueue currentEventQueue;
+ private SnapshotableStreamEventQueue expiredEventQueue = null;
private StreamEvent resetEvent = null;
private Scheduler scheduler;
private boolean outputExpectsExpiredEvents;
@@ -114,8 +116,9 @@ protected void init(ExpressionExecutor[] attributeExpressionExecutors, ConfigRea
outputExpectsExpiredEvents, SiddhiAppContext siddhiAppContext) {
this.outputExpectsExpiredEvents = outputExpectsExpiredEvents;
this.siddhiAppContext = siddhiAppContext;
+ currentEventQueue = new SnapshotableStreamEventQueue(streamEventClonerHolder);
if (outputExpectsExpiredEvents) {
- this.expiredEventChunk = new ComplexEventChunk(false);
+ this.expiredEventQueue = new SnapshotableStreamEventQueue(streamEventClonerHolder);
}
if (attributeExpressionExecutors.length == 1) {
if (attributeExpressionExecutors[0] instanceof ConstantExpressionExecutor) {
@@ -205,45 +208,45 @@ protected void process(ComplexEventChunk streamEventChunk, Processo
continue;
}
StreamEvent clonedStreamEvent = streamEventCloner.copyStreamEvent(streamEvent);
- currentEventChunk.add(clonedStreamEvent);
+ currentEventQueue.add(clonedStreamEvent);
}
streamEventChunk.clear();
if (sendEvents) {
if (outputExpectsExpiredEvents) {
- if (expiredEventChunk.getFirst() != null) {
- while (expiredEventChunk.hasNext()) {
- StreamEvent expiredEvent = expiredEventChunk.next();
+ if (expiredEventQueue.getFirst() != null) {
+ while (expiredEventQueue.hasNext()) {
+ StreamEvent expiredEvent = expiredEventQueue.next();
expiredEvent.setTimestamp(currentTime);
}
- streamEventChunk.add(expiredEventChunk.getFirst());
+ streamEventChunk.add(expiredEventQueue.getFirst());
}
}
- if (expiredEventChunk != null) {
- expiredEventChunk.clear();
+ if (expiredEventQueue != null) {
+ expiredEventQueue.clear();
}
- if (currentEventChunk.getFirst() != null) {
+ if (currentEventQueue.getFirst() != null) {
// add reset event in front of current events
streamEventChunk.add(resetEvent);
resetEvent = null;
- if (expiredEventChunk != null) {
- currentEventChunk.reset();
- while (currentEventChunk.hasNext()) {
- StreamEvent currentEvent = currentEventChunk.next();
+ if (expiredEventQueue != null) {
+ currentEventQueue.reset();
+ while (currentEventQueue.hasNext()) {
+ StreamEvent currentEvent = currentEventQueue.next();
StreamEvent toExpireEvent = streamEventCloner.copyStreamEvent(currentEvent);
toExpireEvent.setType(StreamEvent.Type.EXPIRED);
- expiredEventChunk.add(toExpireEvent);
+ expiredEventQueue.add(toExpireEvent);
}
}
- resetEvent = streamEventCloner.copyStreamEvent(currentEventChunk.getFirst());
+ resetEvent = streamEventCloner.copyStreamEvent(currentEventQueue.getFirst());
resetEvent.setType(ComplexEvent.Type.RESET);
- streamEventChunk.add(currentEventChunk.getFirst());
+ streamEventChunk.add(currentEventQueue.getFirst());
}
- currentEventChunk.clear();
+ currentEventQueue.clear();
}
}
if (streamEventChunk.getFirst() != null) {
@@ -275,8 +278,8 @@ public void stop() {
public Map currentState() {
Map state = new HashMap<>();
synchronized (this) {
- state.put("CurrentEventChunk", currentEventChunk.getFirst());
- state.put("ExpiredEventChunk", expiredEventChunk != null ? expiredEventChunk.getFirst() : null);
+ state.put("CurrentEventQueue", currentEventQueue.getSnapshot());
+ state.put("ExpiredEventQueue", expiredEventQueue != null ? expiredEventQueue.getSnapshot() : null);
state.put("ResetEvent", resetEvent);
}
return state;
@@ -284,18 +287,16 @@ public Map currentState() {
@Override
public synchronized void restoreState(Map state) {
- if (expiredEventChunk != null) {
- expiredEventChunk.clear();
- expiredEventChunk.add((StreamEvent) state.get("ExpiredEventChunk"));
+ if (expiredEventQueue != null) {
+ expiredEventQueue.restore((SnapshotStateList) state.get("ExpiredEventQueue"));
}
- currentEventChunk.clear();
- currentEventChunk.add((StreamEvent) state.get("CurrentEventChunk"));
+ currentEventQueue.restore((SnapshotStateList) state.get("CurrentEventQueue"));
resetEvent = (StreamEvent) state.get("ResetEvent");
}
@Override
public synchronized StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
- return ((Operator) compiledCondition).find(matchingEvent, expiredEventChunk, streamEventCloner);
+ return ((Operator) compiledCondition).find(matchingEvent, expiredEventQueue, streamEventCloner);
}
@Override
@@ -303,10 +304,10 @@ public CompiledCondition compileCondition(Expression condition, MatchingMetaInfo
SiddhiAppContext siddhiAppContext,
List variableExpressionExecutors,
Map tableMap, String queryName) {
- if (expiredEventChunk == null) {
- expiredEventChunk = new ComplexEventChunk(false);
+ if (expiredEventQueue == null) {
+ expiredEventQueue = new SnapshotableStreamEventQueue(streamEventClonerHolder);
}
- return OperatorParser.constructOperator(expiredEventChunk, condition, matchingMetaInfoHolder,
+ return OperatorParser.constructOperator(expiredEventQueue, condition, matchingMetaInfoHolder,
siddhiAppContext, variableExpressionExecutors, tableMap,
this.queryName);
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/TimeWindowProcessor.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/TimeWindowProcessor.java
index 5708fb0836..250cf741ad 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/TimeWindowProcessor.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/processor/stream/window/TimeWindowProcessor.java
@@ -26,6 +26,7 @@
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
+import org.wso2.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
@@ -38,6 +39,7 @@
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;
@@ -77,7 +79,7 @@
public class TimeWindowProcessor extends WindowProcessor implements SchedulingProcessor, FindableProcessor {
private long timeInMilliSeconds;
- private ComplexEventChunk expiredEventChunk;
+ private SnapshotableStreamEventQueue expiredEventQueue;
private Scheduler scheduler;
private SiddhiAppContext siddhiAppContext;
private volatile long lastTimestamp = Long.MIN_VALUE;
@@ -100,7 +102,7 @@ public void setScheduler(Scheduler scheduler) {
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, boolean
outputExpectsExpiredEvents, SiddhiAppContext siddhiAppContext) {
this.siddhiAppContext = siddhiAppContext;
- this.expiredEventChunk = new ComplexEventChunk(false);
+ this.expiredEventQueue = new SnapshotableStreamEventQueue(streamEventClonerHolder);
if (attributeExpressionExecutors.length == 1) {
if (attributeExpressionExecutors[0] instanceof ConstantExpressionExecutor) {
if (attributeExpressionExecutors[0].getReturnType() == Attribute.Type.INT) {
@@ -133,12 +135,12 @@ protected void process(ComplexEventChunk streamEventChunk, Processo
StreamEvent streamEvent = streamEventChunk.next();
long currentTime = siddhiAppContext.getTimestampGenerator().currentTime();
- expiredEventChunk.reset();
- while (expiredEventChunk.hasNext()) {
- StreamEvent expiredEvent = expiredEventChunk.next();
+ expiredEventQueue.reset();
+ while (expiredEventQueue.hasNext()) {
+ StreamEvent expiredEvent = expiredEventQueue.next();
long timeDiff = expiredEvent.getTimestamp() - currentTime + timeInMilliSeconds;
if (timeDiff <= 0) {
- expiredEventChunk.remove();
+ expiredEventQueue.remove();
expiredEvent.setTimestamp(currentTime);
streamEventChunk.insertBeforeCurrent(expiredEvent);
} else {
@@ -150,7 +152,7 @@ protected void process(ComplexEventChunk streamEventChunk, Processo
StreamEvent clonedEvent = streamEventCloner.copyStreamEvent(streamEvent);
clonedEvent.setType(StreamEvent.Type.EXPIRED);
- this.expiredEventChunk.add(clonedEvent);
+ this.expiredEventQueue.add(clonedEvent);
if (lastTimestamp < clonedEvent.getTimestamp()) {
scheduler.notifyAt(clonedEvent.getTimestamp() + timeInMilliSeconds);
@@ -160,22 +162,22 @@ protected void process(ComplexEventChunk streamEventChunk, Processo
streamEventChunk.remove();
}
}
- expiredEventChunk.reset();
+ expiredEventQueue.reset();
}
nextProcessor.process(streamEventChunk);
}
@Override
public synchronized StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
- return ((Operator) compiledCondition).find(matchingEvent, expiredEventChunk, streamEventCloner);
+ return ((Operator) compiledCondition).find(matchingEvent, expiredEventQueue, streamEventCloner);
}
@Override
public CompiledCondition compileCondition(Expression condition, MatchingMetaInfoHolder matchingMetaInfoHolder,
- SiddhiAppContext siddhiAppContext,
- List variableExpressionExecutors,
- Map tableMap, String queryName) {
- return OperatorParser.constructOperator(expiredEventChunk, condition, matchingMetaInfoHolder,
+ SiddhiAppContext siddhiAppContext,
+ List variableExpressionExecutors,
+ Map tableMap, String queryName) {
+ return OperatorParser.constructOperator(expiredEventQueue, condition, matchingMetaInfoHolder,
siddhiAppContext, variableExpressionExecutors, tableMap, this.queryName);
}
@@ -192,13 +194,12 @@ public void stop() {
@Override
public Map currentState() {
Map state = new HashMap<>();
- state.put("ExpiredEventChunk", expiredEventChunk.getFirst());
+ state.put("ExpiredEventQueue", expiredEventQueue.getSnapshot());
return state;
}
@Override
public void restoreState(Map state) {
- expiredEventChunk.clear();
- expiredEventChunk.add((StreamEvent) state.get("ExpiredEventChunk"));
+ expiredEventQueue.restore((SnapshotStateList) state.get("ExpiredEventQueue"));
}
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/selector/attribute/processor/executor/GroupByAggregationAttributeExecutor.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/selector/attribute/processor/executor/GroupByAggregationAttributeExecutor.java
index 0c27b6c219..da609058ba 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/selector/attribute/processor/executor/GroupByAggregationAttributeExecutor.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/selector/attribute/processor/executor/GroupByAggregationAttributeExecutor.java
@@ -107,23 +107,19 @@ public ExpressionExecutor cloneExecutor(String key) {
@Override
public Map currentState() {
- HashMap> data = new HashMap<>();
+ Map state = new HashMap<>();
for (Map.Entry entry : aggregatorMap.entrySet()) {
- data.put(entry.getKey(), entry.getValue().currentState());
+ state.put(entry.getKey(), entry.getValue().currentState());
}
- Map state = new HashMap<>();
- state.put("Data", data);
return state;
}
@Override
public void restoreState(Map state) {
- HashMap> data = (HashMap>) state.get("Data");
-
- for (Map.Entry> entry : data.entrySet()) {
- String key = entry.getKey();
+ for (HashMap.Entry item: state.entrySet()) {
+ String key = item.getKey();
AttributeAggregator aAttributeAggregator = attributeAggregator.cloneAggregator(key);
- aAttributeAggregator.restoreState(entry.getValue());
+ aAttributeAggregator.restoreState((Map) item.getValue());
aggregatorMap.put(key, aAttributeAggregator);
}
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/InMemoryTable.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/InMemoryTable.java
index fdbedb388e..b2de05a5f2 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/InMemoryTable.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/InMemoryTable.java
@@ -38,6 +38,7 @@
import org.wso2.siddhi.core.util.parser.ExpressionParser;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.core.util.snapshot.Snapshotable;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
import org.wso2.siddhi.query.api.definition.TableDefinition;
import org.wso2.siddhi.query.api.execution.query.output.stream.UpdateSet;
import org.wso2.siddhi.query.api.expression.Expression;
@@ -169,7 +170,6 @@ public StreamEvent find(CompiledCondition compiledCondition, StateEvent matching
} finally {
readWriteLock.readLock().unlock();
}
-
}
@Override
@@ -203,13 +203,13 @@ public CompiledUpdateSet compileUpdateSet(UpdateSet updateSet, MatchingMetaInfoH
@Override
public Map currentState() {
Map state = new HashMap<>();
- state.put("EventHolder", eventHolder);
+ state.put("EventHolder", eventHolder.getSnapshot());
return state;
}
@Override
public void restoreState(Map state) {
- eventHolder = (EventHolder) state.get("EventHolder");
+ eventHolder.restore((SnapshotStateList) state.get("EventHolder"));
}
@Override
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/holder/EventHolder.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/holder/EventHolder.java
index ae1db48439..ed146505f9 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/holder/EventHolder.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/holder/EventHolder.java
@@ -20,6 +20,8 @@
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotState;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
/**
* Base EventHolder interface. EventHolder is a container of {@link StreamEvent}s. You can add {@link ComplexEventChunk}
@@ -27,4 +29,8 @@
*/
public interface EventHolder {
void add(ComplexEventChunk addingEventChunk);
+
+ SnapshotState getSnapshot();
+
+ void restore(SnapshotStateList snapshotStatelist);
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/holder/IndexEventHolder.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/holder/IndexEventHolder.java
index fa87cf011d..a6c9b14ba2 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/holder/IndexEventHolder.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/holder/IndexEventHolder.java
@@ -22,15 +22,20 @@
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
+import org.wso2.siddhi.core.event.stream.Operation;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventPool;
import org.wso2.siddhi.core.event.stream.converter.StreamEventConverter;
import org.wso2.siddhi.core.exception.OperationNotSupportedException;
import org.wso2.siddhi.core.util.SiddhiConstants;
+import org.wso2.siddhi.core.util.snapshot.SnapshotRequest;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotState;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.expression.condition.Compare;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,6 +45,13 @@
import java.util.Set;
import java.util.TreeMap;
+import static org.wso2.siddhi.core.event.stream.Operation.Operator.ADD;
+import static org.wso2.siddhi.core.event.stream.Operation.Operator.CLEAR;
+import static org.wso2.siddhi.core.event.stream.Operation.Operator.DELETE_BY_OPERATOR;
+import static org.wso2.siddhi.core.event.stream.Operation.Operator.OVERWRITE;
+import static org.wso2.siddhi.core.event.stream.Operation.Operator.REMOVE;
+
+
/**
* EventHolder implementation where events will be indexed and stored. This will offer faster access compared to
* other EventHolder implementations. User can only add unique events based on a given primary key.
@@ -59,6 +71,11 @@ public class IndexEventHolder implements IndexedEventHolder, Serializable {
private Map indexMetaData;
private Map multiPrimaryKeyMetaData = new LinkedHashMap<>();
private Map allIndexMetaData = new HashMap<>();
+ private ArrayList operationChangeLog = new ArrayList<>();
+ private long eventsCount;
+ private static final float FULL_SNAPSHOT_THRESHOLD = 2.1f;
+ private boolean forceFullSnapshot = true;
+ private boolean isOperationLogEnabled = true;
public IndexEventHolder(StreamEventPool tableStreamEventPool, StreamEventConverter eventConverter,
PrimaryKeyReferenceHolder[] primaryKeyReferenceHolders,
@@ -142,12 +159,22 @@ public void add(ComplexEventChunk addingEventChunk) {
ComplexEvent complexEvent = addingEventChunk.next();
StreamEvent streamEvent = tableStreamEventPool.borrowEvent();
eventConverter.convertComplexEvent(complexEvent, streamEvent);
+ eventsCount++;
+ if (isOperationLogEnabled) {
+ if (!isFullSnapshot()) {
+ StreamEvent streamEvent2 = tableStreamEventPool.borrowEvent();
+ eventConverter.convertComplexEvent(complexEvent, streamEvent2);
+ operationChangeLog.add(new Operation(ADD, streamEvent2));
+ } else {
+ operationChangeLog.clear();
+ forceFullSnapshot = true;
+ }
+ }
add(streamEvent);
}
}
private void add(StreamEvent streamEvent) {
-
StreamEvent existingValue = null;
if (primaryKeyData != null) {
Object primaryKey = constructPrimaryKey(streamEvent, primaryKeyReferenceHolders);
@@ -191,6 +218,16 @@ private Object constructPrimaryKey(StreamEvent streamEvent,
@Override
public void overwrite(StreamEvent streamEvent) {
+ if (isOperationLogEnabled) {
+ if (!isFullSnapshot()) {
+ StreamEvent streamEvent2 = tableStreamEventPool.borrowEvent();
+ eventConverter.convertComplexEvent(streamEvent, streamEvent2);
+ operationChangeLog.add(new Operation(OVERWRITE, streamEvent2));
+ } else {
+ operationChangeLog.clear();
+ forceFullSnapshot = true;
+ }
+ }
StreamEvent deletedEvent = null;
if (primaryKeyData != null) {
Object primaryKey = constructPrimaryKey(streamEvent, primaryKeyReferenceHolders);
@@ -329,6 +366,14 @@ public Collection findEvents(String attribute, Compare.Operator ope
@Override
public void deleteAll() {
+ if (isOperationLogEnabled) {
+ if (!isFullSnapshot()) {
+ operationChangeLog.add(new Operation(CLEAR));
+ } else {
+ operationChangeLog.clear();
+ forceFullSnapshot = true;
+ }
+ }
if (primaryKeyData != null) {
primaryKeyData.clear();
}
@@ -342,20 +387,45 @@ public void deleteAll() {
@Override
public void deleteAll(Collection storeEventSet) {
for (StreamEvent streamEvent : storeEventSet) {
- if (primaryKeyData != null) {
- Object primaryKey = constructPrimaryKey(streamEvent, primaryKeyReferenceHolders);
- StreamEvent deletedEvent = primaryKeyData.remove(primaryKey);
- if (indexData != null) {
- deleteFromIndexes(deletedEvent);
+ if (isOperationLogEnabled) {
+ if (!isFullSnapshot()) {
+ StreamEvent streamEvent2 = tableStreamEventPool.borrowEvent();
+ eventConverter.convertComplexEvent(streamEvent, streamEvent2);
+ operationChangeLog.add(new Operation(REMOVE, streamEvent));
+ } else {
+ operationChangeLog.clear();
+ forceFullSnapshot = true;
}
- } else if (indexData != null) {
- deleteFromIndexes(streamEvent);
}
+ deleteAll(streamEvent);
+
+ }
+ }
+
+ private void deleteAll(StreamEvent streamEvent) {
+ if (primaryKeyData != null) {
+ Object primaryKey = constructPrimaryKey(streamEvent, primaryKeyReferenceHolders);
+ StreamEvent deletedEvent = primaryKeyData.remove(primaryKey);
+ if (indexData != null) {
+ deleteFromIndexes(deletedEvent);
+ }
+ } else if (indexData != null) {
+ deleteFromIndexes(streamEvent);
}
}
@Override
public void delete(String attribute, Compare.Operator operator, Object value) {
+
+ if (isOperationLogEnabled) {
+ if (!isFullSnapshot()) {
+ operationChangeLog.add(new Operation(DELETE_BY_OPERATOR, new Object[]{attribute, operator, value}));
+ } else {
+ operationChangeLog.clear();
+ forceFullSnapshot = true;
+ }
+ }
+
if (primaryKeyData != null && attribute.equals(primaryKeyAttributes)) {
switch (operator) {
@@ -541,4 +611,67 @@ private void deleteFromIndexes(StreamEvent toDeleteEvent) {
}
}
+ private boolean isFullSnapshot() {
+ return operationChangeLog.size() > (eventsCount * FULL_SNAPSHOT_THRESHOLD)
+ || forceFullSnapshot
+ || SnapshotRequest.isRequestForFullSnapshot();
+ }
+
+ public SnapshotState getSnapshot() {
+ if (isFullSnapshot()) {
+ forceFullSnapshot = false;
+ return new SnapshotState(this, false);
+ } else {
+ SnapshotState snapshot = new SnapshotState(operationChangeLog, true);
+ operationChangeLog = new ArrayList<>();
+ return snapshot;
+ }
+ }
+
+ public void restore(SnapshotStateList snapshotStatelist) {
+ TreeMap revisions = snapshotStatelist.getSnapshotStates();
+ Iterator> itr = revisions.entrySet().iterator();
+ this.isOperationLogEnabled = false;
+ while (itr.hasNext()) {
+ Map.Entry snapshotEntry = itr.next();
+ if (!snapshotEntry.getValue().isIncrementalSnapshot()) {
+ this.deleteAll();
+ IndexEventHolder snapshotEventHolder = (IndexEventHolder) snapshotEntry.getValue().getState();
+ if (primaryKeyData != null) {
+ primaryKeyData.clear();
+ primaryKeyData.putAll(snapshotEventHolder.primaryKeyData);
+ }
+ if (indexData != null) {
+ indexData.clear();
+ indexData.putAll(snapshotEventHolder.indexData);
+ }
+ forceFullSnapshot = false;
+ } else {
+ ArrayList operations = (ArrayList) snapshotEntry.getValue().getState();
+ for (Operation op : operations) {
+ switch (op.operation) {
+ case ADD:
+ add((StreamEvent) op.parameters);
+ break;
+ case REMOVE:
+ deleteAll((StreamEvent) op.parameters);
+ break;
+ case CLEAR:
+ deleteAll();
+ break;
+ case OVERWRITE:
+ overwrite((StreamEvent) op.parameters);
+ break;
+ case DELETE_BY_OPERATOR:
+ Object[] args = (Object[]) op.parameters;
+ delete((String) args[0], (Compare.Operator) args[1], args[2]);
+ break;
+ default:
+ continue;
+ }
+ }
+ }
+ }
+ this.isOperationLogEnabled = true;
+ }
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/holder/ListEventHolder.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/holder/ListEventHolder.java
index 14b02a3845..29bc1666e5 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/holder/ListEventHolder.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/table/holder/ListEventHolder.java
@@ -23,20 +23,22 @@
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventPool;
import org.wso2.siddhi.core.event.stream.converter.StreamEventConverter;
-
-import java.util.LinkedList;
+import org.wso2.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
+import org.wso2.siddhi.core.event.stream.holder.StreamEventClonerHolder;
/**
* Holder object to contain a list of {@link StreamEvent}. Users can add {@link ComplexEventChunk}s to the
* {@link ListEventHolder} where events in chunk will be added to the {@link StreamEvent} list.
*/
-public class ListEventHolder extends LinkedList implements EventHolder {
+public class ListEventHolder extends SnapshotableStreamEventQueue implements EventHolder {
private static final long serialVersionUID = 4695745058501269511L;
private StreamEventPool tableStreamEventPool;
private StreamEventConverter eventConverter;
- public ListEventHolder(StreamEventPool tableStreamEventPool, StreamEventConverter eventConverter) {
+ public ListEventHolder(StreamEventPool tableStreamEventPool, StreamEventConverter eventConverter,
+ StreamEventClonerHolder streamEventClonerHolder) {
+ super(streamEventClonerHolder);
this.tableStreamEventPool = tableStreamEventPool;
this.eventConverter = eventConverter;
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/collection/operator/EventChunkOperator.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/collection/operator/EventChunkOperator.java
index 19721a25c2..0aab9d2f5b 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/collection/operator/EventChunkOperator.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/collection/operator/EventChunkOperator.java
@@ -163,5 +163,4 @@ public ComplexEventChunk tryUpdate(ComplexEventChunk up
}
return failedEventChunk;
}
-
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/collection/operator/SnapshotableEventQueueOperator.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/collection/operator/SnapshotableEventQueueOperator.java
new file mode 100644
index 0000000000..50a5058f63
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/collection/operator/SnapshotableEventQueueOperator.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.siddhi.core.util.collection.operator;
+
+import org.wso2.siddhi.core.event.ComplexEventChunk;
+import org.wso2.siddhi.core.event.state.StateEvent;
+import org.wso2.siddhi.core.event.stream.StreamEvent;
+import org.wso2.siddhi.core.event.stream.StreamEventCloner;
+import org.wso2.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.table.InMemoryCompiledUpdateSet;
+import org.wso2.siddhi.core.util.collection.AddingStreamEventExtractor;
+
+import java.util.Map;
+
+/**
+ * Operator which is related to non-indexed In-memory table operations.
+ */
+public class SnapshotableEventQueueOperator implements Operator {
+ protected ExpressionExecutor expressionExecutor;
+ protected int storeEventPosition;
+
+ public SnapshotableEventQueueOperator(ExpressionExecutor expressionExecutor, int storeEventPosition) {
+ this.expressionExecutor = expressionExecutor;
+ this.storeEventPosition = storeEventPosition;
+ }
+
+ @Override
+ public CompiledCondition cloneCompilation(String key) {
+ return new SnapshotableEventQueueOperator(expressionExecutor.cloneExecutor(key), storeEventPosition);
+ }
+
+ @Override
+ public StreamEvent find(StateEvent matchingEvent, Object storeEvents, StreamEventCloner storeEventCloner) {
+ SnapshotableStreamEventQueue storeEventQueue = (SnapshotableStreamEventQueue) storeEvents;
+ ComplexEventChunk returnEventChunk = new ComplexEventChunk(false);
+
+ storeEventQueue.reset();
+ while (storeEventQueue.hasNext()) {
+ StreamEvent storeEvent = storeEventQueue.next();
+ matchingEvent.setEvent(storeEventPosition, storeEvent);
+ if ((Boolean) expressionExecutor.execute(matchingEvent)) {
+ returnEventChunk.add(storeEventCloner.copyStreamEvent(storeEvent));
+ }
+ matchingEvent.setEvent(storeEventPosition, null);
+ }
+ return returnEventChunk.getFirst();
+
+ }
+
+ @Override
+ public boolean contains(StateEvent matchingEvent, Object storeEvents) {
+ SnapshotableStreamEventQueue storeEventQueue = (SnapshotableStreamEventQueue) storeEvents;
+ try {
+ storeEventQueue.reset();
+ while (storeEventQueue.hasNext()) {
+ StreamEvent storeEvent = storeEventQueue.next();
+ matchingEvent.setEvent(storeEventPosition, storeEvent);
+ if ((Boolean) expressionExecutor.execute(matchingEvent)) {
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ matchingEvent.setEvent(storeEventPosition, null);
+ }
+ }
+
+ @Override
+ public void delete(ComplexEventChunk deletingEventChunk, Object storeEvents) {
+ SnapshotableStreamEventQueue storeEventQueue = (SnapshotableStreamEventQueue) storeEvents;
+ deletingEventChunk.reset();
+ while (deletingEventChunk.hasNext()) {
+ StateEvent deletingEvent = deletingEventChunk.next();
+ try {
+ storeEventQueue.reset();
+ while (storeEventQueue.hasNext()) {
+ StreamEvent storeEvent = storeEventQueue.next();
+ deletingEvent.setEvent(storeEventPosition, storeEvent);
+ if ((Boolean) expressionExecutor.execute(deletingEvent)) {
+ storeEventQueue.remove();
+ }
+ }
+ } finally {
+ deletingEvent.setEvent(storeEventPosition, null);
+ }
+ }
+ }
+
+
+ @Override
+ public void update(ComplexEventChunk updatingEventChunk, Object storeEvents,
+ InMemoryCompiledUpdateSet compiledUpdateSet) {
+ SnapshotableStreamEventQueue storeEventQueue = (SnapshotableStreamEventQueue) storeEvents;
+ updatingEventChunk.reset();
+ while (updatingEventChunk.hasNext()) {
+ StateEvent updatingEvent = updatingEventChunk.next();
+ try {
+ storeEventQueue.reset();
+ while (storeEventQueue.hasNext()) {
+ StreamEvent storeEvent = storeEventQueue.next();
+ updatingEvent.setEvent(storeEventPosition, storeEvent);
+ if ((Boolean) expressionExecutor.execute(updatingEvent)) {
+ for (Map.Entry entry :
+ compiledUpdateSet.getExpressionExecutorMap().entrySet()) {
+ Object value = entry.getValue().execute(updatingEvent);
+ storeEvent.setOutputData(value, entry.getKey());
+ }
+ storeEventQueue.overwrite(storeEvent);
+ }
+ }
+ } finally {
+ updatingEvent.setEvent(storeEventPosition, null);
+ }
+ }
+ }
+
+ @Override
+ public ComplexEventChunk tryUpdate(ComplexEventChunk updatingOrAddingEventChunk,
+ Object storeEvents,
+ InMemoryCompiledUpdateSet compiledUpdateSet,
+ AddingStreamEventExtractor addingStreamEventExtractor) {
+ SnapshotableStreamEventQueue storeEventQueue = (SnapshotableStreamEventQueue) storeEvents;
+ updatingOrAddingEventChunk.reset();
+ ComplexEventChunk failedEventChunk = new ComplexEventChunk
+ (updatingOrAddingEventChunk.isBatch());
+ while (updatingOrAddingEventChunk.hasNext()) {
+ StateEvent overwritingOrAddingEvent = updatingOrAddingEventChunk.next();
+ try {
+ boolean updated = false;
+ storeEventQueue.reset();
+ while (storeEventQueue.hasNext()) {
+ StreamEvent storeEvent = storeEventQueue.next();
+ overwritingOrAddingEvent.setEvent(storeEventPosition, storeEvent);
+ if ((Boolean) expressionExecutor.execute(overwritingOrAddingEvent)) {
+ for (Map.Entry entry :
+ compiledUpdateSet.getExpressionExecutorMap().entrySet()) {
+ storeEvent.setOutputData(entry.getValue().
+ execute(overwritingOrAddingEvent), entry.getKey());
+ }
+ storeEventQueue.overwrite(storeEvent);
+ updated = true;
+ }
+ }
+ if (!updated) {
+ failedEventChunk.add(addingStreamEventExtractor.getAddingStreamEvent(overwritingOrAddingEvent));
+ }
+ } finally {
+ overwritingOrAddingEvent.setEvent(storeEventPosition, null);
+ }
+ }
+ return failedEventChunk;
+ }
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/CollectionExpressionParser.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/CollectionExpressionParser.java
index 83582025ca..904c4c7aab 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/CollectionExpressionParser.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/CollectionExpressionParser.java
@@ -395,7 +395,8 @@ private static boolean isCollectionVariable(MatchingMetaInfoHolder matchingMetaI
!= MetaStreamEvent.EventType.DEFAULT) {
return true;
} else if (matchingMetaInfoHolder.getMetaStateEvent().getMetaStreamEvents().length == 2 &&
- matchingMetaInfoHolder.getMetaStateEvent().getMetaStreamEvents()[1].getEventType()
+ matchingMetaInfoHolder.getMetaStateEvent().getMetaStreamEvents()
+ [matchingMetaInfoHolder.getStoreEventIndex()].getEventType()
!= MetaStreamEvent.EventType.DEFAULT) {
return true;
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/EventHolderPasser.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/EventHolderPasser.java
index 9ea4d95f43..7712a037b6 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/EventHolderPasser.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/EventHolderPasser.java
@@ -20,14 +20,14 @@
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.SiddhiAppContext;
+import org.wso2.siddhi.core.event.stream.MetaStreamEvent;
+import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.StreamEventPool;
import org.wso2.siddhi.core.event.stream.converter.ZeroStreamEventConverter;
+import org.wso2.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import org.wso2.siddhi.core.exception.OperationNotSupportedException;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
-import org.wso2.siddhi.core.table.holder.EventHolder;
-import org.wso2.siddhi.core.table.holder.IndexEventHolder;
-import org.wso2.siddhi.core.table.holder.ListEventHolder;
-import org.wso2.siddhi.core.table.holder.PrimaryKeyReferenceHolder;
+import org.wso2.siddhi.core.table.holder.*;
import org.wso2.siddhi.core.util.SiddhiConstants;
import org.wso2.siddhi.query.api.annotation.Annotation;
import org.wso2.siddhi.query.api.annotation.Element;
@@ -113,7 +113,13 @@ public static EventHolder parse(AbstractDefinition tableDefinition, StreamEventP
return new IndexEventHolder(tableStreamEventPool, eventConverter, primaryKeyReferenceHolders, isNumeric,
indexMetaData, tableDefinition, siddhiAppContext);
} else {
- return new ListEventHolder(tableStreamEventPool, eventConverter);
+ MetaStreamEvent metaStreamEvent = new MetaStreamEvent();
+ for (Attribute attribute : tableDefinition.getAttributeList()) {
+ metaStreamEvent.addOutputData(attribute);
+ }
+ StreamEventCloner streamEventCloner = new StreamEventCloner(metaStreamEvent, tableStreamEventPool);
+ return new ListEventHolder(tableStreamEventPool, eventConverter,
+ new StreamEventClonerHolder(streamEventCloner));
}
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/OperatorParser.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/OperatorParser.java
index 2bc1fae13f..403aa837c9 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/OperatorParser.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/OperatorParser.java
@@ -20,6 +20,8 @@
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEventChunk;
+import org.wso2.siddhi.core.util.collection.operator.SnapshotableEventQueueOperator;
+import org.wso2.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import org.wso2.siddhi.core.event.stream.MetaStreamEvent;
import org.wso2.siddhi.core.exception.OperationNotSupportedException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
@@ -88,6 +90,11 @@ public static Operator constructOperator(Object storeEvents, Expression expressi
matchingMetaInfoHolder.getMetaStateEvent(), matchingMetaInfoHolder.getCurrentState(), tableMap,
variableExpressionExecutors, siddhiAppContext, false, 0, queryName);
return new EventChunkOperator(expressionExecutor, matchingMetaInfoHolder.getStoreEventIndex());
+ } else if (storeEvents instanceof SnapshotableStreamEventQueue) {
+ ExpressionExecutor expressionExecutor = ExpressionParser.parseExpression(expression,
+ matchingMetaInfoHolder.getMetaStateEvent(), matchingMetaInfoHolder.getCurrentState(), tableMap,
+ variableExpressionExecutors, siddhiAppContext, false, 0, queryName);
+ return new SnapshotableEventQueueOperator(expressionExecutor, matchingMetaInfoHolder.getStoreEventIndex());
} else if (storeEvents instanceof Map) {
ExpressionExecutor expressionExecutor = ExpressionParser.parseExpression(expression,
matchingMetaInfoHolder.getMetaStateEvent(), matchingMetaInfoHolder.getCurrentState(), tableMap,
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/QueryParser.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/QueryParser.java
index 37c905f866..b8df9868cc 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/QueryParser.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/QueryParser.java
@@ -99,7 +99,7 @@ public static QueryRuntime parse(Query query, SiddhiAppContext siddhiAppContext,
if (nameElement != null) {
queryName = nameElement.getValue();
} else {
- queryName = "query_" + queryIndex + "_" + UUID.randomUUID().toString();
+ queryName = "query_" + queryIndex;
}
latencyTracker = QueryParserHelper.createLatencyTracker(siddhiAppContext, queryName,
SiddhiConstants.METRIC_INFIX_QUERIES, null);
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/SiddhiAppParser.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/SiddhiAppParser.java
index 02e0354671..7ecd8a47ff 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/SiddhiAppParser.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/SiddhiAppParser.java
@@ -29,7 +29,6 @@
import org.wso2.siddhi.core.util.SiddhiAppRuntimeBuilder;
import org.wso2.siddhi.core.util.SiddhiConstants;
import org.wso2.siddhi.core.util.ThreadBarrier;
-import org.wso2.siddhi.core.util.persistence.PersistenceService;
import org.wso2.siddhi.core.util.snapshot.SnapshotService;
import org.wso2.siddhi.core.util.timestamp.EventTimeBasedMillisTimestampGenerator;
import org.wso2.siddhi.core.util.timestamp.SystemCurrentTimeMillisTimestampGenerator;
@@ -212,7 +211,6 @@ public static SiddhiAppRuntimeBuilder parse(SiddhiApp siddhiApp, String siddhiAp
siddhiAppContext.setTimestampGenerator(new SystemCurrentTimeMillisTimestampGenerator());
}
siddhiAppContext.setSnapshotService(new SnapshotService(siddhiAppContext));
- siddhiAppContext.setPersistenceService(new PersistenceService(siddhiAppContext));
siddhiAppContext.setElementIdGenerator(new ElementIdGenerator(siddhiAppContext.getName()));
} catch (DuplicateAnnotationException e) {
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/FileSystemPersistenceStore.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/FileSystemPersistenceStore.java
new file mode 100644
index 0000000000..d0573835c6
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/FileSystemPersistenceStore.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.siddhi.core.util.persistence;
+
+import com.google.common.io.Files;
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.util.persistence.util.PersistenceConstants;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Implementation of Persistence Store that would persist snapshots to the file system
+ */
+public class FileSystemPersistenceStore implements PersistenceStore {
+
+ private static final Logger log = Logger.getLogger(FileSystemPersistenceStore.class);
+ private int numberOfRevisionsToSave;
+ private String folder;
+
+ @Override
+ public void save(String siddhiAppName, String revision, byte[] snapshot) {
+ File file = new File(folder + File.separator + siddhiAppName + File.separator + revision);
+ try {
+ Files.createParentDirs(file);
+ Files.write(snapshot, file);
+ cleanOldRevisions(siddhiAppName);
+ if (log.isDebugEnabled()) {
+ log.debug("Periodic persistence of " + siddhiAppName + " persisted successfully.");
+ }
+ } catch (IOException e) {
+ log.error("Cannot save the revision " + revision + " of SiddhiApp: " + siddhiAppName +
+ " to the file system.", e);
+ }
+ }
+
+ @Override
+ public void setProperties(Map properties) {
+ Map configurationMap = (Map) properties.get(PersistenceConstants.STATE_PERSISTENCE_CONFIGS);
+ Object numberOfRevisionsObject = properties.get(PersistenceConstants.STATE_PERSISTENCE_REVISIONS_TO_KEEP);
+
+ if (numberOfRevisionsObject == null || !(numberOfRevisionsObject instanceof Integer)) {
+ numberOfRevisionsToSave = 3;
+ if (log.isDebugEnabled()) {
+ log.debug("Number of revisions to keep is not set or invalid. Default value will be used.");
+ }
+ } else {
+ numberOfRevisionsToSave = Integer.parseInt(String.valueOf(numberOfRevisionsObject));
+ }
+
+ if (configurationMap != null) {
+ Object folderObject = configurationMap.get("location");
+ if (folderObject == null || !(folderObject instanceof String)) {
+ folder = PersistenceConstants.DEFAULT_FILE_PERSISTENCE_FOLDER;
+ if (log.isDebugEnabled()) {
+ log.debug("File system persistence location not set. Default persistence location will be used.");
+ }
+ } else {
+ folder = String.valueOf(folderObject);
+ }
+
+ } else {
+ folder = PersistenceConstants.DEFAULT_FILE_PERSISTENCE_FOLDER;
+ if (log.isDebugEnabled()) {
+ log.debug("File system persistence config not set. Default persistence location will be used.");
+ }
+ }
+ }
+
+ @Override
+ public byte[] load(String siddhiAppName, String revision) {
+ File file = new File(folder + File.separator + siddhiAppName + File.separator + revision);
+ try {
+ byte[] bytes = Files.toByteArray(file);
+ log.info("State loaded for " + siddhiAppName + " revision " + revision + " from the file system.");
+ return bytes;
+ } catch (IOException e) {
+ log.error("Cannot load the revision " + revision + " of SiddhiApp: " + siddhiAppName +
+ " from file system.", e);
+ }
+ return null;
+ }
+
+ @Override
+ public String getLastRevision(String siddhiAppName) {
+ File dir = new File(folder + File.separator + siddhiAppName);
+ File[] files = dir.listFiles();
+
+ if (files == null || files.length == 0) {
+ return null;
+ }
+
+ String lastRevision = null;
+ for (File file : files) {
+ String fileName = file.getName();
+ if (lastRevision == null || fileName.compareTo(lastRevision) > 0) {
+ lastRevision = fileName;
+ }
+ }
+ return lastRevision;
+ }
+
+ /**
+ * Method to remove revisions that are older than the user specified amount
+ *
+ * @param siddhiAppName is the name of the Siddhi Application whose old revisions to remove
+ */
+
+ private void cleanOldRevisions(String siddhiAppName) {
+ File targetDirectory = new File(folder + File.separator + siddhiAppName);
+ File[] files = targetDirectory.listFiles();
+ if (files != null) {
+ while (files.length > numberOfRevisionsToSave) {
+ String firstRevision = null;
+ for (File file : files) {
+ String fileName = file.getName();
+ if (firstRevision == null || fileName.compareTo(firstRevision) < 0) {
+ firstRevision = fileName;
+ }
+ }
+ File fileToDelete = new File(targetDirectory + File.separator + firstRevision);
+ if (fileToDelete.exists()) {
+ Boolean isDeleted = fileToDelete.delete();
+ if (!isDeleted) {
+ log.error("Error deleting old revision " + firstRevision);
+ }
+ }
+ files = targetDirectory.listFiles();
+ if (files == null || files.length < 1) {
+ break;
+ }
+ }
+ }
+ }
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/InMemoryPersistenceStore.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/InMemoryPersistenceStore.java
index 5fa7b9b7ee..5ff67c740d 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/InMemoryPersistenceStore.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/InMemoryPersistenceStore.java
@@ -34,17 +34,13 @@ public class InMemoryPersistenceStore implements PersistenceStore {
Map> persistenceMap = new HashMap>();
Map> revisionMap = new HashMap>();
-
@Override
public void save(String siddhiAppId, String revision, byte[] data) {
Map executionPersistenceMap = persistenceMap.get(siddhiAppId);
if (executionPersistenceMap == null) {
executionPersistenceMap = new HashMap();
}
-
executionPersistenceMap.put(revision, data);
-
-
List revisionList = revisionMap.get(siddhiAppId);
if (revisionList == null) {
revisionList = new ArrayList();
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/IncrementalFileSystemPersistenceStore.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/IncrementalFileSystemPersistenceStore.java
new file mode 100644
index 0000000000..74235ea667
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/IncrementalFileSystemPersistenceStore.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.siddhi.core.util.persistence;
+
+import com.google.common.io.Files;
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.util.persistence.util.IncrementalSnapshotInfo;
+import org.wso2.siddhi.core.util.persistence.util.PersistenceConstants;
+import org.wso2.siddhi.core.util.persistence.util.PersistenceHelper;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of Persistence Store that would persist snapshots to the file system
+ */
+public class IncrementalFileSystemPersistenceStore implements IncrementalPersistenceStore {
+
+ private static final Logger log = Logger.getLogger(IncrementalFileSystemPersistenceStore.class);
+ private String folder;
+
+ public IncrementalFileSystemPersistenceStore(){
+ }
+
+ public IncrementalFileSystemPersistenceStore(String storageFilePath) {
+ folder = storageFilePath;
+ }
+
+ @Override
+ public void save(IncrementalSnapshotInfo snapshotInfo, byte[] snapshot) {
+ File file = new File(folder + File.separator + snapshotInfo.getSiddhiAppId() + File.separator +
+ snapshotInfo.getRevision());
+ try {
+ Files.createParentDirs(file);
+ Files.write(snapshot, file);
+ if (snapshotInfo.getType() == IncrementalSnapshotInfo.SnapshotType.BASE) {
+ cleanOldRevisions(snapshotInfo);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Incremental persistence of '" + snapshotInfo.getSiddhiAppId() +
+ "' with revision '" + snapshotInfo.getRevision() + "' persisted successfully.");
+ }
+ } catch (IOException e) {
+ log.error("Cannot save the revision '" + snapshotInfo.getRevision() + "' of SiddhiApp: '" +
+ snapshotInfo.getSiddhiAppId() + "' to the file system.", e);
+ }
+ }
+
+ @Override
+ public void setProperties(Map properties) {
+ //nothing to do
+ }
+
+ @Override
+ public byte[] load(IncrementalSnapshotInfo snapshotInfo) {
+ File file = new File(folder + File.separator + snapshotInfo.getSiddhiAppId() + File.separator +
+ snapshotInfo.getRevision());
+ byte[] bytes = null;
+ try {
+ bytes = Files.toByteArray(file);
+ if (log.isDebugEnabled()) {
+ log.debug("State loaded for SiddhiApp '" + snapshotInfo.getSiddhiAppId() + "' revision '" +
+ snapshotInfo.getRevision() + "' from file system.");
+ }
+ } catch (IOException e) {
+ log.error("Cannot load the revision '" + snapshotInfo.getRevision() + "' of SiddhiApp '" +
+ snapshotInfo.getSiddhiAppId() + "' from file system.", e);
+ }
+ return bytes;
+ }
+
+ @Override
+ public List getListOfRevisionsToLoad (long restoreTime, String siddhiAppName) {
+
+ File dir = new File(folder + File.separator + siddhiAppName);
+ File[] files = dir.listFiles();
+ if (files == null || files.length == 0) {
+ return null;
+ }
+ List results = new ArrayList<>();
+ for (File file : files) {
+ String fileName = file.getName();
+ IncrementalSnapshotInfo snapshotInfo = PersistenceHelper.convertRevision(fileName);
+ if (snapshotInfo.getTime() <= restoreTime &&
+ siddhiAppName.equals(snapshotInfo.getSiddhiAppId()) &&
+ snapshotInfo.getElementId() != null &&
+ snapshotInfo.getQueryName() != null) {
+ //Note: Here we discard the (items.length == 2) scenario which is handled
+ // by the full snapshot handling
+ if (log.isDebugEnabled()) {
+ log.debug("List of revisions to load : " + fileName);
+ }
+ results.add(snapshotInfo);
+ }
+ }
+ return results;
+ }
+
+ @Override
+ public String getLastRevision(String siddhiAppName) {
+ long restoreTime = -1;
+ File dir = new File(folder + File.separator + siddhiAppName);
+ File[] files = dir.listFiles();
+ if (files == null || files.length == 0) {
+ return null;
+ }
+ for (File file : files) {
+ String fileName = file.getName();
+ IncrementalSnapshotInfo snapshotInfo = PersistenceHelper.convertRevision(fileName);
+ if (snapshotInfo.getTime() > restoreTime &&
+ siddhiAppName.equals(snapshotInfo.getSiddhiAppId()) &&
+ snapshotInfo.getElementId() != null &&
+ snapshotInfo.getQueryName() != null) {
+ //Note: Here we discard the (items.length == 2) scenario which is handled
+ // by the full snapshot handling
+ restoreTime = snapshotInfo.getTime();
+ }
+ }
+ if (restoreTime != -1) {
+ if (log.isDebugEnabled()) {
+ log.debug("Latest revision to load: " + restoreTime + PersistenceConstants.REVISION_SEPARATOR +
+ siddhiAppName);
+ }
+ return restoreTime + PersistenceConstants.REVISION_SEPARATOR + siddhiAppName;
+ }
+ return null;
+ }
+
+ private void cleanOldRevisions(IncrementalSnapshotInfo incrementalSnapshotInfo) {
+ if (incrementalSnapshotInfo.getType() == IncrementalSnapshotInfo.SnapshotType.BASE) {
+ File dir = new File(folder + File.separator + incrementalSnapshotInfo.getSiddhiAppId());
+ File[] files = dir.listFiles();
+ if (files != null) {
+ long baseTimeStamp = (incrementalSnapshotInfo.getTime());
+ for (File file : files) {
+ String fileName = file.getName();
+ IncrementalSnapshotInfo snapshotInfo = PersistenceHelper.convertRevision(fileName);
+ if (snapshotInfo.getTime() < baseTimeStamp &&
+ incrementalSnapshotInfo.getSiddhiAppId().equals(snapshotInfo.getSiddhiAppId()) &&
+ incrementalSnapshotInfo.getQueryName().equals(snapshotInfo.getQueryName()) &&
+ incrementalSnapshotInfo.getElementId().equals(snapshotInfo.getElementId())) {
+ if (file.exists()) {
+ Boolean isDeleted = file.delete();
+ if (!isDeleted) {
+ log.error("Error deleting old revision " + fileName);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/IncrementalPersistenceStore.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/IncrementalPersistenceStore.java
new file mode 100644
index 0000000000..88745e8ac5
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/IncrementalPersistenceStore.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.wso2.siddhi.core.util.persistence;
+
+import org.wso2.siddhi.core.util.persistence.util.IncrementalSnapshotInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Interface class for Persistence Stores which does incremental checkpointing.
+ */
+public interface IncrementalPersistenceStore {
+
+ void save(IncrementalSnapshotInfo snapshotInfo, byte[] snapshot);
+
+ void setProperties(Map properties);
+
+ byte[] load(IncrementalSnapshotInfo snapshotInfo);
+
+ List getListOfRevisionsToLoad(long restoreTime, String siddhiAppName);
+
+ String getLastRevision(String siddhiAppId);
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/PersistenceService.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/PersistenceService.java
deleted file mode 100644
index 8c0c1e08a2..0000000000
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/PersistenceService.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * WSO2 Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.wso2.siddhi.core.util.persistence;
-
-import org.apache.log4j.Logger;
-import org.wso2.siddhi.core.config.SiddhiAppContext;
-import org.wso2.siddhi.core.exception.CannotRestoreSiddhiAppStateException;
-import org.wso2.siddhi.core.exception.NoPersistenceStoreException;
-import org.wso2.siddhi.core.util.snapshot.SnapshotService;
-
-/**
- * Persistence Service is the service layer to handle state persistence tasks such as persisting current state and
- * restoring previous states.
- */
-public class PersistenceService {
-
- private static final Logger log = Logger.getLogger(PersistenceService.class);
- private String siddhiAppName;
- private PersistenceStore persistenceStore;
- private SnapshotService snapshotService;
-
- public PersistenceService(SiddhiAppContext siddhiAppContext) {
- this.snapshotService = siddhiAppContext.getSnapshotService();
- this.persistenceStore = siddhiAppContext.getSiddhiContext().getPersistenceStore();
- this.siddhiAppName = siddhiAppContext.getName();
- }
-
-
- public String persist() {
-
- if (persistenceStore != null) {
- if (log.isDebugEnabled()) {
- log.debug("Persisting...");
- }
- byte[] snapshot = snapshotService.snapshot();
- String revision = System.currentTimeMillis() + "_" + siddhiAppName;
- persistenceStore.save(siddhiAppName, revision, snapshot);
- if (log.isDebugEnabled()) {
- log.debug("Persisted.");
- }
- return revision;
- } else {
- throw new NoPersistenceStoreException("No persistence store assigned for siddhi app " +
- siddhiAppName);
- }
-
- }
-
- public void restoreRevision(String revision) throws CannotRestoreSiddhiAppStateException {
- if (persistenceStore != null) {
- if (log.isDebugEnabled()) {
- log.debug("Restoring revision: " + revision + " ...");
- }
- byte[] snapshot = persistenceStore.load(siddhiAppName, revision);
- snapshotService.restore(snapshot);
- if (log.isDebugEnabled()) {
- log.debug("Restored revision: " + revision);
- }
- } else {
- throw new NoPersistenceStoreException("No persistence store assigned for siddhi app " +
- siddhiAppName);
- }
- }
-
- public String restoreLastRevision() throws CannotRestoreSiddhiAppStateException {
- if (persistenceStore != null) {
- String revision = persistenceStore.getLastRevision(siddhiAppName);
- if (revision != null) {
- restoreRevision(revision);
- }
- return revision;
- } else {
- throw new NoPersistenceStoreException("No persistence store assigned for siddhi app " +
- siddhiAppName);
- }
- }
-
- public void restore(byte[] snapshot) throws CannotRestoreSiddhiAppStateException {
- snapshotService.restore(snapshot);
- }
-}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/util/IncrementalSnapshotInfo.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/util/IncrementalSnapshotInfo.java
new file mode 100644
index 0000000000..388a617c08
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/util/IncrementalSnapshotInfo.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.siddhi.core.util.persistence.util;
+
+/**
+ * Struct to store information about Incremental Snapshot
+ */
+public class IncrementalSnapshotInfo {
+
+ /**
+ * Type of incremental snapshot types
+ */
+ public enum SnapshotType {
+ BASE, INCREMENT
+ }
+
+ private String siddhiAppId;
+ private String queryName;
+ private String elementId;
+ private long time;
+ private String revision;
+ private SnapshotType type;
+
+ public IncrementalSnapshotInfo(String siddhiAppId, String queryName, String elementId, long time,
+ SnapshotType type) {
+ this.siddhiAppId = siddhiAppId;
+ this.queryName = queryName;
+ this.elementId = elementId;
+ this.time = time;
+ this.type = type;
+ this.revision = time + PersistenceConstants.REVISION_SEPARATOR + siddhiAppId +
+ PersistenceConstants.REVISION_SEPARATOR + queryName +
+ PersistenceConstants.REVISION_SEPARATOR + elementId +
+ PersistenceConstants.REVISION_SEPARATOR + type;
+ }
+
+ public String getSiddhiAppId() {
+ return siddhiAppId;
+ }
+
+ public void setSiddhiAppId(String siddhiAppId) {
+ this.siddhiAppId = siddhiAppId;
+ }
+
+ public String getQueryName() {
+ return queryName;
+ }
+
+ public void setQueryName(String queryName) {
+ this.queryName = queryName;
+ }
+
+ public String getElementId() {
+ return elementId;
+ }
+
+ public void setElementId(String elementId) {
+ this.elementId = elementId;
+ }
+
+ public String getRevision() {
+ return revision;
+ }
+
+ public SnapshotType getType() {
+ return type;
+ }
+
+ public void setType(SnapshotType type) {
+ this.type = type;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public void setTime(long time) {
+ this.time = time;
+ }
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/util/PersistenceConstants.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/util/PersistenceConstants.java
new file mode 100644
index 0000000000..f3301a68b9
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/util/PersistenceConstants.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.siddhi.core.util.persistence.util;
+
+/**
+ * A collection of constants used for state persistance.
+ */
+public class PersistenceConstants {
+
+ public static final String STATE_PERSISTENCE_REVISIONS_TO_KEEP = "revisionsToKeep";
+ public static final String STATE_PERSISTENCE_CONFIGS = "config";
+ public static final String DEFAULT_FILE_PERSISTENCE_FOLDER = "siddhi-app-persistence";
+ public static final String REVISION_SEPARATOR = "_";
+
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/util/PersistenceHelper.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/util/PersistenceHelper.java
new file mode 100644
index 0000000000..0f0ba6adc0
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/persistence/util/PersistenceHelper.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.siddhi.core.util.persistence.util;
+
+import org.wso2.siddhi.core.config.SiddhiAppContext;
+import org.wso2.siddhi.core.exception.PersistenceStoreException;
+import org.wso2.siddhi.core.util.snapshot.AsyncIncrementalSnapshotPersistor;
+import org.wso2.siddhi.core.util.snapshot.AsyncSnapshotPersistor;
+import org.wso2.siddhi.core.util.snapshot.IncrementalSnapshot;
+import org.wso2.siddhi.core.util.snapshot.PersistenceReference;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * Helper Class to persist snapshots
+ */
+public final class PersistenceHelper {
+
+ public static IncrementalSnapshotInfo convertRevision(String revision) {
+ String[] items = revision.split("_");
+ //Note: Here we discard the (items.length == 2) scenario which is handled by the full snapshot handling
+ if (items.length == 5) {
+ return new IncrementalSnapshotInfo(items[1], items[2], items[3],
+ Long.parseLong(items[0]), IncrementalSnapshotInfo.SnapshotType.valueOf(items[4]));
+ } else if (items.length == 2) {
+ return new IncrementalSnapshotInfo(items[1], null, null,
+ Long.parseLong(items[0]), IncrementalSnapshotInfo.SnapshotType.BASE);
+ } else {
+ throw new PersistenceStoreException("Invalid revision found '" + revision + "'!");
+ }
+ }
+
+ public static PersistenceReference persist(byte[] serializeObj, SiddhiAppContext siddhiAppContext) {
+ long revisionTime = System.currentTimeMillis();
+ // start the snapshot persisting task asynchronously
+ AsyncSnapshotPersistor asyncSnapshotPersistor = new AsyncSnapshotPersistor(serializeObj,
+ siddhiAppContext.getSiddhiContext().getPersistenceStore(), siddhiAppContext.getName(),
+ revisionTime);
+ Future future = siddhiAppContext.getExecutorService().submit(asyncSnapshotPersistor);
+ return new PersistenceReference(future, asyncSnapshotPersistor.getRevision());
+ }
+
+ public static PersistenceReference persist(IncrementalSnapshot serializeObj, SiddhiAppContext siddhiAppContext) {
+ long revisionTime = System.currentTimeMillis();
+ List incrementalFutures = new ArrayList<>();
+ //Incremental base state
+ Map> incrementalStateBase = serializeObj.
+ getIncrementalStateBase();
+ if (incrementalStateBase != null) {
+ incrementalStateBase.forEach((queryName, value) -> {
+ value.forEach((elementId, value1) -> {
+ AsyncIncrementalSnapshotPersistor asyncIncrementSnapshotPersistor = new
+ AsyncIncrementalSnapshotPersistor(value1,
+ siddhiAppContext.getSiddhiContext().getIncrementalPersistenceStore(),
+ new IncrementalSnapshotInfo(siddhiAppContext.getName(), queryName, elementId,
+ revisionTime, IncrementalSnapshotInfo.SnapshotType.BASE));
+ Future future = siddhiAppContext.getExecutorService().
+ submit(asyncIncrementSnapshotPersistor);
+ incrementalFutures.add(future);
+ });
+ });
+ }
+ //Next, handle the increment persistence scenarios
+ //Incremental state
+ Map> incrementalState = serializeObj.
+ getIncrementalState();
+ if (incrementalState != null) {
+ incrementalState.forEach((queryName, value) -> {
+ value.forEach((elementId, value1) -> {
+ AsyncIncrementalSnapshotPersistor asyncIncrementSnapshotPersistor = new
+ AsyncIncrementalSnapshotPersistor(value1,
+ siddhiAppContext.getSiddhiContext().getIncrementalPersistenceStore(),
+ new IncrementalSnapshotInfo(siddhiAppContext.getName(), queryName, elementId,
+ revisionTime, IncrementalSnapshotInfo.SnapshotType.INCREMENT));
+ Future future = siddhiAppContext.getExecutorService().
+ submit(asyncIncrementSnapshotPersistor);
+ incrementalFutures.add(future);
+ });
+ });
+ }
+ return new PersistenceReference(incrementalFutures,
+ revisionTime + PersistenceConstants.REVISION_SEPARATOR + siddhiAppContext.getName());
+ }
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/AsyncIncrementalSnapshotPersistor.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/AsyncIncrementalSnapshotPersistor.java
new file mode 100644
index 0000000000..160a91c4bf
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/AsyncIncrementalSnapshotPersistor.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.siddhi.core.util.snapshot;
+
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.exception.NoPersistenceStoreException;
+import org.wso2.siddhi.core.util.persistence.IncrementalPersistenceStore;
+import org.wso2.siddhi.core.util.persistence.util.IncrementalSnapshotInfo;
+
+/**
+ * {@link Runnable} which is responsible for persisting the snapshots that are taken
+ */
+public class AsyncIncrementalSnapshotPersistor implements Runnable {
+ private static final Logger log = Logger.getLogger(AsyncIncrementalSnapshotPersistor.class);
+ private byte[] snapshots;
+ private IncrementalPersistenceStore incrementalPersistenceStore;
+ private IncrementalSnapshotInfo snapshotInfo;
+
+ public AsyncIncrementalSnapshotPersistor(byte[] snapshots, IncrementalPersistenceStore incrementalPersistenceStore,
+ IncrementalSnapshotInfo snapshotInfo) {
+ if (incrementalPersistenceStore == null) {
+ throw new NoPersistenceStoreException("No incremental persistence store assigned for siddhi app '" +
+ snapshotInfo.getSiddhiAppId() + "'");
+ }
+ this.snapshots = snapshots;
+ this.incrementalPersistenceStore = incrementalPersistenceStore;
+ this.snapshotInfo = snapshotInfo;
+ }
+
+ public String getRevision() {
+ return snapshotInfo.getRevision();
+ }
+
+ @Override
+ public void run() {
+ if (incrementalPersistenceStore != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Persisting...");
+ }
+ incrementalPersistenceStore.save(snapshotInfo, snapshots);
+ if (log.isDebugEnabled()) {
+ log.debug("Persisted.");
+ }
+ } else {
+ throw new NoPersistenceStoreException("No persistence store assigned for siddhi app " +
+ snapshotInfo.getSiddhiAppId());
+ }
+
+ }
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/AsyncSnapshotPersistor.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/AsyncSnapshotPersistor.java
index d692333e9a..8fab2d6100 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/AsyncSnapshotPersistor.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/AsyncSnapshotPersistor.java
@@ -31,13 +31,19 @@ public class AsyncSnapshotPersistor implements Runnable {
private PersistenceStore persistenceStore;
private String siddhiAppName;
private String revision;
+ private long time;
public AsyncSnapshotPersistor(byte[] snapshots, PersistenceStore persistenceStore,
- String siddhiAppName) {
+ String siddhiAppName, long time) {
+ if (persistenceStore == null) {
+ throw new NoPersistenceStoreException("No persistence store assigned for siddhi app '" +
+ siddhiAppName + "'");
+ }
this.snapshots = snapshots;
this.persistenceStore = persistenceStore;
this.siddhiAppName = siddhiAppName;
- revision = System.currentTimeMillis() + "_" + siddhiAppName;
+ this.time = time;
+ this.revision = time + "_" + siddhiAppName;
}
public String getRevision() {
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/IncrementalSnapshot.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/IncrementalSnapshot.java
new file mode 100644
index 0000000000..078b3a2ce9
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/IncrementalSnapshot.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.siddhi.core.util.snapshot;
+
+import java.util.Map;
+
+/**
+ * The class which represents the serialized incremental snapshot.
+ */
+public class IncrementalSnapshot {
+ private Map> incrementalState;
+ private Map> incrementalStateBase;
+
+
+ public Map> getIncrementalState() {
+ return incrementalState;
+ }
+
+ public void setIncrementalState(Map> incrementalState) {
+ this.incrementalState = incrementalState;
+ }
+
+ public Map> getIncrementalStateBase() {
+ return incrementalStateBase;
+ }
+
+ public void setIncrementalStateBase(Map> incrementalStateBase) {
+ this.incrementalStateBase = incrementalStateBase;
+ }
+
+ @Override
+ public String toString() {
+ return "IncrementalSnapshot{" +
+ "incrementalState=" + (incrementalState != null) +
+ ", incrementalStateBase=" + (incrementalStateBase != null) +
+ '}';
+ }
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/PersistenceReference.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/PersistenceReference.java
index 2c18fb2556..54cb25789c 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/PersistenceReference.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/PersistenceReference.java
@@ -18,6 +18,7 @@
package org.wso2.siddhi.core.util.snapshot;
+import java.util.List;
import java.util.concurrent.Future;
/**
@@ -25,19 +26,34 @@
*/
public class PersistenceReference {
- private Future future;
+ private Future fullStateFuture;
+ private List incrementalStateFuture;
private String revision;
- public PersistenceReference(Future future, String revision) {
- this.future = future;
+ public PersistenceReference(Future fullStateFuture, String revision) {
+ this.fullStateFuture = fullStateFuture;
this.revision = revision;
}
+ public PersistenceReference(List incrementalStateFuture, String revision) {
+ this.incrementalStateFuture = incrementalStateFuture;
+ this.revision = revision;
+ }
+
+ @Deprecated
public Future getFuture() {
- return future;
+ return fullStateFuture;
}
public String getRevision() {
return revision;
}
+
+ public List getIncrementalStateFuture() {
+ return incrementalStateFuture;
+ }
+
+ public Future getFullStateFuture() {
+ return fullStateFuture;
+ }
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/SnapshotRequest.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/SnapshotRequest.java
new file mode 100644
index 0000000000..4f7676d7c3
--- /dev/null
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/SnapshotRequest.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.siddhi.core.util.snapshot;
+
+/**
+ * Snapshot request type ThreadLocal holder
+ */
+public class SnapshotRequest {
+ private static final ThreadLocal requestForFullSnapshot =
+ new ThreadLocal() {
+ @Override
+ protected Boolean initialValue() {
+ return false;
+ }
+ };
+
+ public static void requestForFullSnapshot(boolean enable) {
+ requestForFullSnapshot.set(enable);
+ }
+
+ public static boolean isRequestForFullSnapshot() {
+ return requestForFullSnapshot.get();
+ }
+
+}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/SnapshotService.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/SnapshotService.java
index c89551ac32..266c343657 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/SnapshotService.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/snapshot/SnapshotService.java
@@ -20,36 +20,48 @@
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.CannotRestoreSiddhiAppStateException;
+import org.wso2.siddhi.core.exception.NoPersistenceStoreException;
import org.wso2.siddhi.core.util.ThreadBarrier;
+import org.wso2.siddhi.core.util.persistence.IncrementalPersistenceStore;
+import org.wso2.siddhi.core.util.persistence.PersistenceStore;
+import org.wso2.siddhi.core.util.persistence.util.IncrementalSnapshotInfo;
+import org.wso2.siddhi.core.util.persistence.util.PersistenceHelper;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotState;
+import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Service level implementation to take/restore snapshots of processing elements.
*/
public class SnapshotService {
-
-
private static final Logger log = Logger.getLogger(SnapshotService.class);
private static final ThreadLocal skipSnapshotableThreadLocal = new ThreadLocal();
private final ThreadBarrier threadBarrier;
- private HashMap> snapshotableMap = new HashMap>();
+ private ConcurrentHashMap> snapshotableMap = new ConcurrentHashMap<>();
private SiddhiAppContext siddhiAppContext;
public SnapshotService(SiddhiAppContext siddhiAppContext) {
this.siddhiAppContext = siddhiAppContext;
this.threadBarrier = siddhiAppContext.getThreadBarrier();
-
}
public static ThreadLocal getSkipSnapshotableThreadLocal() {
return skipSnapshotableThreadLocal;
}
+ public ConcurrentHashMap> getSnapshotableMap() {
+ return snapshotableMap;
+ }
+
public synchronized void addSnapshotable(String queryName, Snapshotable snapshotable) {
Boolean skipSnapshotable = skipSnapshotableThreadLocal.get();
if (skipSnapshotable == null || !skipSnapshotable) {
@@ -69,35 +81,139 @@ public synchronized void addSnapshotable(String queryName, Snapshotable snapshot
}
}
- public byte[] snapshot() {
- HashMap> snapshots = new HashMap<>(snapshotableMap.size());
- List snapshotableList;
- byte[] serializedSnapshots;
- if (log.isDebugEnabled()) {
- log.debug("Taking snapshot ...");
- }
+ public byte[] fullSnapshot() {
try {
- threadBarrier.lock();
- for (Map.Entry> entry : snapshotableMap.entrySet()) {
- snapshotableList = entry.getValue();
- snapshotableList.forEach(snapshotableElement -> snapshots.put(snapshotableElement.getElementId(),
- snapshotableElement.currentState()));
- }
+ SnapshotRequest.requestForFullSnapshot(true);
+ Map> elementSnapshotMapFull = new HashMap<>();
+ byte[] serializedFullState = null;
if (log.isDebugEnabled()) {
- log.debug("Snapshot serialization started ...");
+ log.debug("Taking snapshot ...");
+ }
+ try {
+ threadBarrier.lock();
+ for (Map.Entry> entry : snapshotableMap.entrySet()) {
+ Map elementWiseFullSnapshots = new HashMap<>();
+ for (Snapshotable snapshotableObject : entry.getValue()) {
+ Map currentState = snapshotableObject.currentState();
+ if (currentState != null) {
+ Map elementWiseSnapshots = new HashMap<>();
+ for (Map.Entry item2 : currentState.entrySet()) {
+ String key = item2.getKey();
+ Object snapShot = item2.getValue();
+ if (snapShot instanceof SnapshotState) {
+ if (((SnapshotState) snapShot).isIncrementalSnapshot()) {
+ throw new NoPersistenceStoreException("No incremental persistence store " +
+ "exist to store incremental snapshot of siddhiApp:'"
+ + siddhiAppContext.getName() + "' subElement:'" + entry.getKey()
+ + "' elementId:'" + snapshotableObject.getElementId()
+ + "' and key:'" + key + "'");
+ } else {
+ elementWiseSnapshots.put(key, snapShot);
+ }
+ } else {
+ elementWiseSnapshots.put(key, snapShot);
+ }
+ }
+ if (!elementWiseSnapshots.isEmpty()) {
+ elementWiseFullSnapshots.put(snapshotableObject.getElementId(), elementWiseSnapshots);
+ }
+ }
+ }
+ if (!elementWiseFullSnapshots.isEmpty()) {
+ elementSnapshotMapFull.put(entry.getKey(), elementWiseFullSnapshots);
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("SnapshotState serialization started ...");
+ }
+ serializedFullState = ByteSerializer.objectToByte(elementSnapshotMapFull, siddhiAppContext);
+ if (log.isDebugEnabled()) {
+ log.debug("SnapshotState serialization finished.");
+ }
+ } finally {
+ threadBarrier.unlock();
}
- serializedSnapshots = ByteSerializer.objectToByte(snapshots, siddhiAppContext);
if (log.isDebugEnabled()) {
- log.debug("Snapshot serialization finished.");
+ log.debug("SnapshotState taken for Siddhi app '" + siddhiAppContext.getName() + "'");
}
+ return serializedFullState;
} finally {
- threadBarrier.unlock();
- }
- if (log.isDebugEnabled()) {
- log.debug("Snapshot taken for Siddhi app '" + siddhiAppContext.getName() + "'");
+ SnapshotRequest.requestForFullSnapshot(false);
}
+ }
- return serializedSnapshots;
+ public IncrementalSnapshot incrementalSnapshot() {
+ try {
+ SnapshotRequest.requestForFullSnapshot(false);
+ Map> elementSnapshotMapIncremental = new HashMap<>();
+ Map> elementSnapshotMapIncrementalBase = new HashMap<>();
+ if (log.isDebugEnabled()) {
+ log.debug("Taking snapshot ...");
+ }
+ try {
+ threadBarrier.lock();
+ for (Map.Entry> entry : snapshotableMap.entrySet()) {
+ Map elementWiseIncrementalSnapshots = new HashMap<>();
+ Map elementWiseIncrementalSnapshotsBase = new HashMap<>();
+ for (Snapshotable snapshotableObject : entry.getValue()) {
+ Map currentState = snapshotableObject.currentState();
+ if (currentState != null) {
+ Map incrementalSnapshotableMap = new HashMap<>();
+ Map incrementalSnapshotableMapBase = new HashMap<>();
+ for (Map.Entry stateEntry : currentState.entrySet()) {
+ String key = stateEntry.getKey();
+ Object snapShot = stateEntry.getValue();
+ if (snapShot instanceof SnapshotState) {
+ if (((SnapshotState) snapShot).isIncrementalSnapshot()) {
+ incrementalSnapshotableMap.put(key, snapShot);
+ } else {
+ incrementalSnapshotableMapBase.put(key, snapShot);
+ }
+ } else {
+ incrementalSnapshotableMapBase.put(key, snapShot);
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("SnapshotState serialization started ...");
+ }
+ if (!incrementalSnapshotableMap.isEmpty()) {
+ //Do we need to get and then update?
+ elementWiseIncrementalSnapshots.put(snapshotableObject.getElementId(),
+ ByteSerializer.objectToByte(incrementalSnapshotableMap, siddhiAppContext));
+ }
+ if (!incrementalSnapshotableMapBase.isEmpty()) {
+ elementWiseIncrementalSnapshotsBase.put(snapshotableObject.getElementId(),
+ ByteSerializer.objectToByte(incrementalSnapshotableMapBase, siddhiAppContext));
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("SnapshotState serialization finished.");
+ }
+ }
+ }
+ if (!elementWiseIncrementalSnapshots.isEmpty()) {
+ elementSnapshotMapIncremental.put(entry.getKey(), elementWiseIncrementalSnapshots);
+ }
+ if (!elementWiseIncrementalSnapshotsBase.isEmpty()) {
+ elementSnapshotMapIncrementalBase.put(entry.getKey(), elementWiseIncrementalSnapshotsBase);
+ }
+ }
+ } finally {
+ threadBarrier.unlock();
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("SnapshotState taken for Siddhi app '" + siddhiAppContext.getName() + "'");
+ }
+ IncrementalSnapshot snapshot = new IncrementalSnapshot();
+ if (!elementSnapshotMapIncremental.isEmpty()) {
+ snapshot.setIncrementalState(elementSnapshotMapIncremental);
+ }
+ if (!elementSnapshotMapIncrementalBase.isEmpty()) {
+ snapshot.setIncrementalStateBase(elementSnapshotMapIncrementalBase);
+ }
+ return snapshot;
+ } finally {
+ SnapshotRequest.requestForFullSnapshot(false);
+ }
}
public Map queryState(String queryName) {
@@ -114,52 +230,276 @@ public Map queryState(String queryName) {
state.put(elementId, elementState);
}
}
-
} finally {
threadBarrier.unlock();
}
log.debug("Taking snapshot finished.");
-
return state;
-
}
public void restore(byte[] snapshot) throws CannotRestoreSiddhiAppStateException {
- Map> snapshots = (Map>)
- ByteSerializer.byteToObject(snapshot, siddhiAppContext);
- List snapshotableList;
+ if (snapshot == null) {
+ throw new CannotRestoreSiddhiAppStateException("Restoring of Siddhi app " + siddhiAppContext.
+ getName() + " failed due to no snapshot.");
+ }
+ Map