Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merging incremental-checkpoint-feature branch to master #821

Merged
merged 38 commits into from
May 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
e27733d
Adding the incremental checkpointing functionality to length windows.
miyurud Dec 8, 2017
0472487
Added comments and formatted the code.
miyurud Dec 10, 2017
69b2756
Merge remote-tracking branch 'upstream/master'
miyurud Dec 11, 2017
9b39cac
Adding the incremental file system store cleaning code.
miyurud Dec 11, 2017
66ebb49
Adding the incremental checkpointing functionality to length batch wi…
miyurud Dec 19, 2017
f3c70bf
Merge remote-tracking branch 'upstream/master'
miyurud Dec 20, 2017
d615572
Fixing a test case naming issue.
miyurud Dec 20, 2017
de0e61e
Adding the incremental checkpointing for event tables.
miyurud Jan 8, 2018
c2a89ea
Merge remote-tracking branch 'upstream/master'
miyurud Jan 8, 2018
ea6ca3f
Merge branch 'master' of https://github.com/miyurud/siddhi
miyurud Jan 8, 2018
4cb6d9f
Adding the min-max counting scenario and related code improvements.
miyurud Jan 30, 2018
12163c8
Fixing the issue of incremental vs full snapshot decision logic.
miyurud Feb 2, 2018
7dae130
Adding the changes on group by queries and merging with the upstream.
miyurud Feb 16, 2018
8c2e78e
Updating the local copy.
miyurud Feb 19, 2018
70b5ab7
Adding the incremental checkpointing feature for partitioned windows.
miyurud Feb 21, 2018
34b1cdc
Fixed the checkstyle issues
miyurud Mar 22, 2018
4cab522
Updating the changes from master
miyurud Mar 23, 2018
5661781
Adding local changes
miyurud Mar 23, 2018
9717ddd
Fixing the checkstyle issues
miyurud Mar 23, 2018
146e5e3
Updated the findbugs-exclude file.
miyurud Mar 23, 2018
d4c31c4
Removed the commented lines
miyurud Mar 23, 2018
d3c3025
Merge pull request #790 from miyurud/incremental-checkpoint-feature
tishan89 Mar 30, 2018
6625f6e
Merge remote-tracking branch 'wso2/master' into incremental-checkpoin…
suhothayan Apr 19, 2018
cf95257
Merge remote-tracking branch 'wso2/incremental-checkpoint-feature' in…
suhothayan Apr 19, 2018
abea03c
Rename SnapshotableComplexEventChunk to SnapshotableStreamEventQueue …
suhothayan Apr 26, 2018
19b6c86
Fix Site Name
suhothayan Apr 29, 2018
04f9297
Improve snapshot, incremental store persist and restore
suhothayan May 2, 2018
d7bc8ba
Improve snapshot, incremental store persist and restore
suhothayan May 4, 2018
68e699b
Fix check styles and findbug issues and join with table with stream u…
suhothayan May 5, 2018
d35d2be
Merge remote-tracking branch 'wso2/master' into incremental-checkpoin…
suhothayan May 5, 2018
8c1f3d6
Merge branch 'incremental-checkpoint-feature'
dilini-muthumala May 21, 2018
3c2fa73
Moving PersistenceStore implementations to Carbon-analytics
dilini-muthumala May 28, 2018
f76e108
Improving API
dilini-muthumala May 28, 2018
5cd9a4a
Fixing bug in API
dilini-muthumala May 28, 2018
898269c
Merging with master
dilini-muthumala May 28, 2018
0ffeb2f
Merging with master
dilini-muthumala May 28, 2018
bf1017c
Adding back persistence store implementations to Siddhi for unit-testing
dilini-muthumala May 31, 2018
889e5cb
Fixing license header and removing unnecessary comments.
dilini-muthumala May 31, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
WSO2 Siddhi
===========
Siddhi
======

Siddhi is a java library that listens to events from data streams, detects complex conditions described via a **Streaming
SQL language**, and triggers actions. It performs both **_Stream Processing_** and **_Complex Event Processing_**.
Expand Down
1,935 changes: 1,935 additions & 0 deletions docs/api/4.0.9-SNAPSHOT.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/api/latest.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# API Docs - v4.1.26
# API Docs - v4.1.27-SNAPSHOT

## Core

Expand Down
2 changes: 1 addition & 1 deletion docs/documentation/siddhi-architecture.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Siddhi Architecture

WSO2 Siddhi is a software library that can be utilized in any of the following ways:
Siddhi is a software library that can be utilized in any of the following ways:

- Run as a server on its own
- Run within WSO2 SP as a service
Expand Down
4 changes: 2 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
WSO2 Siddhi
===========
Siddhi
======

Siddhi is a java library that listens to events from data streams, detects complex conditions described via a **Streaming
SQL language**, and triggers actions. It performs both **_Stream Processing_** and **_Complex Event Processing_**.
Expand Down
13 changes: 13 additions & 0 deletions findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@
<Class name="org.wso2.siddhi.core.table.holder.IndexEventHolder"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Class name="org.wso2.siddhi.core.table.holder.SnapshotableIndexEventHolder"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Class name="org.wso2.siddhi.core.table.holder.ListEventHolder"/>
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
</Match>


<Match>
<Package name="~org\.wso2\.siddhi\.core\.query\.output\.ratelimit\.time.*"/>
Expand All @@ -181,6 +190,10 @@
<Package name="~org\.wso2\.siddhi\.core\.query\.selector\.attribute\.aggregator\.incremental.*"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Package name="~org\.wso2\.siddhi\.core\.util\.snapshot\.*"/>
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
</Match>

<!--other-->
<Match>
Expand Down
8 changes: 4 additions & 4 deletions mkdocs.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
site_name: WSO2 Siddhi
site_name: Siddhi
site_description: Stream Processing and Complex Event Processing Engine
site_author: WSO2
site_url: https://wso2.github.io/siddhi
extra_css:
- stylesheets/extra.css
repo_name: WSO2 Siddhi
repo_name: Siddhi
repo_url: https://github.com/wso2/siddhi
copyright: Copyright &copy; 2011 - 2017 WSO2
copyright: Copyright &copy; 2011 - 2018 WSO2
extra:
logo: images/siddhi-logo-w.svg
palette:
Expand All @@ -26,7 +26,7 @@ markdown_extensions:
- toc(permalink=true)
- codehilite(guess_lang=false)
pages:
- Welcome to WSO2 Siddhi: index.md
- Welcome to Siddhi: index.md
- Features: features.md
- Quick Start Guide: documentation/siddhi-quckstart-4.0.md
- Documentation:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder;
import org.wso2.siddhi.core.util.parser.StoreQueryParser;
import org.wso2.siddhi.core.util.parser.helper.QueryParserHelper;
import org.wso2.siddhi.core.util.snapshot.AsyncSnapshotPersistor;
import org.wso2.siddhi.core.util.persistence.util.PersistenceHelper;
import org.wso2.siddhi.core.util.snapshot.PersistenceReference;
import org.wso2.siddhi.core.util.statistics.BufferedEventsTracker;
import org.wso2.siddhi.core.util.statistics.LatencyTracker;
Expand Down Expand Up @@ -120,6 +120,7 @@ public class SiddhiAppRuntime {
private LatencyTracker storeQueryLatencyTracker;
private SiddhiDebugger siddhiDebugger;
private boolean running = false;
private Future futureIncrementalPersistor;

public SiddhiAppRuntime(Map<String, AbstractDefinition> streamDefinitionMap,
Map<String, AbstractDefinition> tableDefinitionMap,
Expand Down Expand Up @@ -531,13 +532,13 @@ public PersistenceReference persist() {
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// take snapshots of execution units
byte[] snapshots = siddhiAppContext.getSnapshotService().snapshot();
// start the snapshot persisting task asynchronously
AsyncSnapshotPersistor asyncSnapshotPersistor = new AsyncSnapshotPersistor(snapshots,
siddhiAppContext.getSiddhiContext().getPersistenceStore(), siddhiAppContext.getName());
String revision = asyncSnapshotPersistor.getRevision();
Future future = siddhiAppContext.getExecutorService().submit(asyncSnapshotPersistor);
return new PersistenceReference(future, revision);
if (siddhiAppContext.getSiddhiContext().getPersistenceStore() != null) {
return PersistenceHelper.persist(siddhiAppContext.getSnapshotService().fullSnapshot(),
siddhiAppContext);
} else {
return PersistenceHelper.persist(siddhiAppContext.getSnapshotService().incrementalSnapshot(),
siddhiAppContext);
}
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
Expand All @@ -549,7 +550,7 @@ public byte[] snapshot() {
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// take snapshots of execution units
return siddhiAppContext.getSnapshotService().snapshot();
return siddhiAppContext.getSnapshotService().fullSnapshot();
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
Expand All @@ -561,7 +562,7 @@ public void restore(byte[] snapshot) throws CannotRestoreSiddhiAppStateException
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// start the restoring process
siddhiAppContext.getPersistenceService().restore(snapshot);
siddhiAppContext.getSnapshotService().restore(snapshot);
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
Expand All @@ -573,7 +574,7 @@ public void restoreRevision(String revision) throws CannotRestoreSiddhiAppStateE
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// start the restoring process
siddhiAppContext.getPersistenceService().restoreRevision(revision);
siddhiAppContext.getSnapshotService().restoreRevision(revision);
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
Expand All @@ -586,7 +587,7 @@ public String restoreLastRevision() throws CannotRestoreSiddhiAppStateException
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// start the restoring process
revision = siddhiAppContext.getPersistenceService().restoreLastRevision();
revision = siddhiAppContext.getSnapshotService().restoreLastRevision();
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.wso2.siddhi.core.util.SiddhiAppRuntimeBuilder;
import org.wso2.siddhi.core.util.config.ConfigManager;
import org.wso2.siddhi.core.util.parser.SiddhiAppParser;
import org.wso2.siddhi.core.util.persistence.IncrementalPersistenceStore;
import org.wso2.siddhi.core.util.persistence.PersistenceStore;
import org.wso2.siddhi.query.api.SiddhiApp;
import org.wso2.siddhi.query.compiler.SiddhiCompiler;
Expand Down Expand Up @@ -235,4 +236,8 @@ public void restoreLastState() {
}
}
}

public void setIncrementalPersistenceStore(IncrementalPersistenceStore incrementalPersistenceStore) {
this.siddhiContext.setIncrementalPersistenceStore(incrementalPersistenceStore);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.wso2.siddhi.core.util.ElementIdGenerator;
import org.wso2.siddhi.core.util.ThreadBarrier;
import org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder;
import org.wso2.siddhi.core.util.persistence.PersistenceService;
import org.wso2.siddhi.core.util.snapshot.SnapshotService;
import org.wso2.siddhi.core.util.statistics.StatisticsManager;
import org.wso2.siddhi.core.util.timestamp.TimestampGenerator;
Expand Down Expand Up @@ -56,7 +55,6 @@ public class SiddhiAppContext {

private ThreadBarrier threadBarrier = null;
private TimestampGenerator timestampGenerator = null;
private PersistenceService persistenceService;
private ElementIdGenerator elementIdGenerator;
private Map<String, Script> scriptFunctionMap;
private ExceptionHandler<Object> disruptorExceptionHandler;
Expand Down Expand Up @@ -166,14 +164,6 @@ public void setSnapshotService(SnapshotService snapshotService) {
this.snapshotService = snapshotService;
}

public PersistenceService getPersistenceService() {
return persistenceService;
}

public void setPersistenceService(PersistenceService persistenceService) {
this.persistenceService = persistenceService;
}

public ElementIdGenerator getElementIdGenerator() {
return elementIdGenerator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import com.lmax.disruptor.ExceptionHandler;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.exception.PersistenceStoreException;
import org.wso2.siddhi.core.stream.input.source.SourceHandlerManager;
import org.wso2.siddhi.core.stream.output.sink.SinkHandlerManager;
import org.wso2.siddhi.core.table.record.RecordTableHandlerManager;
import org.wso2.siddhi.core.util.SiddhiExtensionLoader;
import org.wso2.siddhi.core.util.config.ConfigManager;
import org.wso2.siddhi.core.util.config.InMemoryConfigManager;
import org.wso2.siddhi.core.util.extension.holder.AbstractExtensionHolder;
import org.wso2.siddhi.core.util.persistence.IncrementalPersistenceStore;
import org.wso2.siddhi.core.util.persistence.PersistenceStore;
import org.wso2.siddhi.core.util.statistics.metrics.SiddhiMetricsFactory;

Expand All @@ -45,6 +47,7 @@ public class SiddhiContext {
private ExceptionHandler<Object> defaultDisrupterExceptionHandler;
private Map<String, Class> siddhiExtensions = new HashMap<>();
private PersistenceStore persistenceStore = null;
private IncrementalPersistenceStore incrementalPersistenceStore = null;
private ConcurrentHashMap<String, DataSource> siddhiDataSources;
private StatisticsConfiguration statisticsConfiguration;
private ConcurrentHashMap<Class, AbstractExtensionHolder> extensionHolderMap;
Expand Down Expand Up @@ -82,14 +85,31 @@ public Map<String, Class> getSiddhiExtensions() {
return siddhiExtensions;
}

public PersistenceStore getPersistenceStore() {
public synchronized PersistenceStore getPersistenceStore() {
return persistenceStore;
}

public void setPersistenceStore(PersistenceStore persistenceStore) {
public synchronized void setPersistenceStore(PersistenceStore persistenceStore) {
if (incrementalPersistenceStore != null) {
throw new PersistenceStoreException("Only one type of persistence store can exist. " +
"Incremental persistence store '" + incrementalPersistenceStore.getClass().getName() +
"' already registered!");
}
this.persistenceStore = persistenceStore;
}

public synchronized IncrementalPersistenceStore getIncrementalPersistenceStore() {
return incrementalPersistenceStore;
}

public synchronized void setIncrementalPersistenceStore(IncrementalPersistenceStore incrementalPersistenceStore) {
if (persistenceStore != null) {
throw new PersistenceStoreException("Only one type of persistence store can exist." +
" Persistence store '" + persistenceStore.getClass().getName() + "' already registered!");
}
this.incrementalPersistenceStore = incrementalPersistenceStore;
}

public void setConfigManager(ConfigManager configManager) {
this.configManager = configManager;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.siddhi.core.event.stream;

import java.io.Serializable;


/**
* The class which resembles an instance of operation performed on SnapshotableStreamEventQueue.
*/
public class Operation implements Serializable {

/**
* Possible Operator actions
*/
public enum Operator {
ADD ,
REMOVE,
CLEAR,
OVERWRITE,
DELETE_BY_OPERATOR,
DELETE_BY_INDEX
}

public Operator operation;
public Object parameters;

public Operation(Operator operation, Object parameters) {
this.operation = operation;
this.parameters = parameters;
}

public Operation(Operator operation) {
this.operation = operation;
}
}
Loading