From c7e979708caa17dac123534a480cd45a007a3892 Mon Sep 17 00:00:00 2001 From: AnuGayan Date: Fri, 23 Aug 2024 08:26:29 +0530 Subject: [PATCH] Improve Persisted aggregation initialization --- .../IncrementalExecutorsInitialiser.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutorsInitialiser.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutorsInitialiser.java index fb8636cad1..5361c0ce71 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutorsInitialiser.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutorsInitialiser.java @@ -136,12 +136,12 @@ public synchronized void initialiseExecutors() { if (isPersistedAggregation) { for (int i = incrementalDurations.size() - 1; i > 0; i--) { if (lastData != null && !(IncrementalTimeConverterUtil.isAggregationDataCompleteAgainstTime(lastData, - incrementalDurations.get(i), timeZone) && isAggregationDataComplete(incrementalDurations.get(i), - incrementalDurations.get(i - 1), endOFLatestEventTimestamp, lastData))) { + incrementalDurations.get(i), timeZone) || isAggregationDataComplete(incrementalDurations.get(i), + incrementalDurations.get(i - 1), endOFLatestEventTimestamp))) { recreateState(lastData, incrementalDurations.get(i), aggregationTables.get(incrementalDurations.get(i - 1)), i == 1); } else if (lastData == null && !isAggregationDataComplete(incrementalDurations.get(i), - incrementalDurations.get(i - 1), endOFLatestEventTimestamp, null)) { + incrementalDurations.get(i - 1), endOFLatestEventTimestamp)) { recreateState(null, incrementalDurations.get(i), aggregationTables.get(incrementalDurations.get(i - 1)), i == 1); } @@ -154,6 +154,8 @@ public synchronized void initialiseExecutors() { events = onDemandQueryRuntime.execute(); if (events != null) { lastData = (Long) events[events.length - 1].getData(0); + endOFLatestEventTimestamp = IncrementalTimeConverterUtil + .getNextEmitTime(lastData, incrementalDurations.get(i - 1), timeZone); } else { lastData = null; } @@ -228,6 +230,10 @@ private boolean isStatePresentForAggregationDuration(TimePeriod.Duration recreat private void recreateState(Long lastData, TimePeriod.Duration recreateForDuration, Table recreateFromTable, boolean isBeforeRoot) { + if (log.isDebugEnabled()) { + log.debug("Start initialising state for aggregation: " + aggregationDefinition.getId() + + " Duration: " + recreateForDuration); + } OnDemandQuery onDemandQuery; if (lastData != null) { endOFLatestEventTimestamp = IncrementalTimeConverterUtil @@ -268,6 +274,10 @@ private void recreateState(Long lastData, TimePeriod.Duration recreateForDuratio } } + if (log.isDebugEnabled()) { + log.debug("Completed initialising state for aggregation: " + aggregationDefinition.getId() + + " Duration " + recreateForDuration); + } } private OnDemandQuery getOnDemandQuery(Table table, boolean isLargestGranularity, Long endOFLatestEventTimestamp, @@ -319,22 +329,20 @@ private OnDemandQuery getOnDemandQuery(Table table, boolean isLargestGranularity } private boolean isAggregationDataComplete(TimePeriod.Duration parentDuration, TimePeriod.Duration childDuration, - Long endOFLatestEventTimestamp, Long lastData) { + Long endOFLatestEventTimestamp) { OnDemandQuery onDemandQuery = getOnDemandQuery(aggregationTables.get(childDuration), true, endOFLatestEventTimestamp, true, OrderByAttribute.Order.ASC); onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND); OnDemandQueryRuntime onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext, tableMap, windowMap, aggregationMap); Event[] events = onDemandQueryRuntime.execute(); - if (lastData == null && events == null) { + if (events == null) { return true; - } else if (lastData == null - && (Long) events[events.length - 1].getData(0) >= IncrementalTimeConverterUtil.getStartTimeOfAggregates( + } else if ((Long) events[events.length - 1].getData(0) >= IncrementalTimeConverterUtil.getStartTimeOfAggregates( System.currentTimeMillis(), parentDuration, timeZone)) { return true; } - - return events == null || events.length == 0; + return false; } }