From 003def0755236f4442f319aa4c451f2e38b9e5a1 Mon Sep 17 00:00:00 2001 From: terrypacker Date: Mon, 13 May 2019 13:21:02 -1000 Subject: [PATCH 01/10] Implement save callback for point values, retain unsaved values in cache --- .../m2m2/H2InMemoryDatabaseProxy.java | 9 +- .../com/serotonin/m2m2/MockPointValueDao.java | 140 +++----------- .../m2m2/rt/dataImage/DataPointRTTest.java | 24 +-- .../rt/dataImage/PointValueCacheTest.java | 178 ++++++++++++++++++ .../com/serotonin/m2m2/db/DatabaseProxy.java | 8 + Core/src/com/serotonin/m2m2/db/H2Proxy.java | 10 +- .../src/com/serotonin/m2m2/db/MSSQLProxy.java | 3 +- .../src/com/serotonin/m2m2/db/MySQLProxy.java | 3 +- .../com/serotonin/m2m2/db/PostgresProxy.java | 12 +- .../m2m2/db/dao/EnhancedPointValueDao.java | 3 +- .../serotonin/m2m2/db/dao/PointValueDao.java | 17 +- .../m2m2/db/dao/PointValueDaoMetrics.java | 136 ++----------- .../m2m2/db/dao/PointValueDaoSQL.java | 151 +++++++++++---- .../m2m2/rt/dataImage/DataPointRT.java | 22 +-- .../rt/dataImage/EnhancedPointValueCache.java | 12 +- .../rt/dataImage/HistoricalDataPoint.java | 2 +- .../m2m2/rt/dataImage/PointValueCache.java | 145 +++++++------- .../m2m2/rt/dataImage/PointValueEmporter.java | 4 +- 18 files changed, 470 insertions(+), 409 deletions(-) create mode 100644 Core/src-test/com/serotonin/m2m2/rt/dataImage/PointValueCacheTest.java diff --git a/Core/src-test/com/serotonin/m2m2/H2InMemoryDatabaseProxy.java b/Core/src-test/com/serotonin/m2m2/H2InMemoryDatabaseProxy.java index acc2a498cd..daba41ecaf 100644 --- a/Core/src-test/com/serotonin/m2m2/H2InMemoryDatabaseProxy.java +++ b/Core/src-test/com/serotonin/m2m2/H2InMemoryDatabaseProxy.java @@ -363,9 +363,16 @@ public void doInConnection(ConnectionCallbackVoid callback) { public List doLimitQuery(DaoUtils dao, String sql, Object[] args, RowMapper rowMapper, int limit) { if (limit > 0) - sql += " LIMIT " + limit; + sql = getLimitQuerySql(sql, limit); return dao.query(sql, args, rowMapper); } + + @Override + public String getLimitQuerySql(String sql, int limit) { + if (limit > 0) + return sql + " LIMIT " + limit; + return sql; + } @Override public long doLimitDelete(ExtendedJdbcTemplate ejt, String sql, Object[] args, int chunkSize, diff --git a/Core/src-test/com/serotonin/m2m2/MockPointValueDao.java b/Core/src-test/com/serotonin/m2m2/MockPointValueDao.java index e03764e3d2..22a9e1fc49 100644 --- a/Core/src-test/com/serotonin/m2m2/MockPointValueDao.java +++ b/Core/src-test/com/serotonin/m2m2/MockPointValueDao.java @@ -4,11 +4,13 @@ */ package com.serotonin.m2m2; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import com.infiniteautomation.mango.db.query.BookendQueryCallback; import com.infiniteautomation.mango.db.query.PVTQueryCallback; @@ -32,10 +34,7 @@ public class MockPointValueDao implements PointValueDao{ public Map> getData() { return this.data; } - - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#savePointValueSync(int, com.serotonin.m2m2.rt.dataImage.PointValueTime, com.serotonin.m2m2.rt.dataImage.SetPointSource) - */ + @Override public PointValueTime savePointValueSync(int pointId, PointValueTime pointValue, SetPointSource source) { @@ -58,17 +57,13 @@ public PointValueTime savePointValueSync(int pointId, PointValueTime pointValue, return newPvt; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#savePointValueAsync(int, com.serotonin.m2m2.rt.dataImage.PointValueTime, com.serotonin.m2m2.rt.dataImage.SetPointSource) - */ @Override - public void savePointValueAsync(int pointId, PointValueTime pointValue, SetPointSource source) { + public void savePointValueAsync(int pointId, PointValueTime pointValue, SetPointSource source, Consumer savedCallback) { savePointValueSync(pointId, pointValue, source); + if(savedCallback != null) + savedCallback.accept(pointValue.getTime()); } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValues(int, long) - */ @Override public List getPointValues(int pointId, long since) { List pvts = new ArrayList<>(); @@ -82,9 +77,6 @@ public List getPointValues(int pointId, long since) { return pvts; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValuesBetween(int, long, long) - */ @Override public List getPointValuesBetween(int pointId, long from, long to) { List pvts = new ArrayList<>(); @@ -98,9 +90,6 @@ public List getPointValuesBetween(int pointId, long from, long t return pvts; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValuesBetween(int, long, long, int) - */ @Override public List getPointValuesBetween(int pointId, long from, long to, int limit) { @@ -117,9 +106,6 @@ public List getPointValuesBetween(int pointId, long from, long t return pvts; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getLatestPointValues(int, int) - */ @Override public List getLatestPointValues(int pointId, int limit) { List pvts = new ArrayList<>(); @@ -134,9 +120,6 @@ public List getLatestPointValues(int pointId, int limit) { return pvts; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getLatestPointValues(int, int, long) - */ @Override public List getLatestPointValues(int pointId, int limit, long before) { List pvts = new ArrayList<>(); @@ -152,10 +135,28 @@ public List getLatestPointValues(int pointId, int limit, long be } return pvts; } + + @Override + public void getLatestPointValues(int pointId, long before, Integer limit, final PVTQueryCallback callback) { + List existing = data.get(pointId); + int count = 0; + if(existing != null) { + for(int i=existing.size() -1; i>=0; i--) { + PointValueTime pvt = existing.get(i); + if(pvt.getTime() < before) { + try { + callback.row(pvt, count); + } catch (IOException e) { + break; + } + count++; + } + if(count >= limit) + break; + } + } + } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getLatestPointValue(int) - */ @Override public PointValueTime getLatestPointValue(int pointId) { List existing = data.get(pointId); @@ -165,9 +166,6 @@ public PointValueTime getLatestPointValue(int pointId) { return null; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValueBefore(int, long) - */ @Override public PointValueTime getPointValueBefore(int pointId, long time) { List existing = data.get(pointId); @@ -181,9 +179,6 @@ public PointValueTime getPointValueBefore(int pointId, long time) { return null; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValueAfter(int, long) - */ @Override public PointValueTime getPointValueAfter(int pointId, long time) { List existing = data.get(pointId); @@ -196,9 +191,6 @@ public PointValueTime getPointValueAfter(int pointId, long time) { return null; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValueAt(int, long) - */ @Override public PointValueTime getPointValueAt(int pointId, long time) { List existing = data.get(pointId); @@ -211,9 +203,6 @@ public PointValueTime getPointValueAt(int pointId, long time) { return null; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValuesBetween(int, long, long, com.serotonin.db.MappedRowCallback) - */ @Override public void getPointValuesBetween(int pointId, long from, long to, MappedRowCallback callback) { @@ -221,9 +210,6 @@ public void getPointValuesBetween(int pointId, long from, long to, } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValuesBetween(java.util.List, long, long, com.serotonin.db.MappedRowCallback) - */ @Override public void getPointValuesBetween(List pointIds, long from, long to, MappedRowCallback callback) { @@ -231,9 +217,6 @@ public void getPointValuesBetween(List pointIds, long from, long to, } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#wideQuery(int, long, long, com.serotonin.db.WideQueryCallback) - */ @Override public void wideQuery(int pointId, long from, long to, WideQueryCallback callback) { @@ -241,162 +224,108 @@ public void wideQuery(int pointId, long from, long to, } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#deletePointValuesBetween(int, long, long) - */ @Override public long deletePointValuesBetween(int pointId, long startTime, long endTime) { // TODO Auto-generated method stub return 0; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#deletePointValuesBefore(int, long) - */ @Override public long deletePointValuesBefore(int pointId, long time) { // TODO Auto-generated method stub return 0; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#deletePointValuesBeforeWithoutCount(int, long) - */ @Override public boolean deletePointValuesBeforeWithoutCount(int pointId, long time) { // TODO Auto-generated method stub return false; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#deletePointValues(int) - */ @Override public long deletePointValues(int pointId) { // TODO Auto-generated method stub return 0; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#deletePointValuesWithoutCount(int) - */ @Override public boolean deletePointValuesWithoutCount(int pointId) { // TODO Auto-generated method stub return false; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#deleteAllPointData() - */ @Override public long deleteAllPointData() { // TODO Auto-generated method stub return 0; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#deleteAllPointDataWithoutCount() - */ @Override public void deleteAllPointDataWithoutCount() { // TODO Auto-generated method stub } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#deleteOrphanedPointValues() - */ @Override public long deleteOrphanedPointValues() { // TODO Auto-generated method stub return 0; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#deleteOrphanedPointValuesWithoutCount() - */ @Override public void deleteOrphanedPointValuesWithoutCount() { // TODO Auto-generated method stub } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#deleteOrphanedPointValueAnnotations() - */ @Override public void deleteOrphanedPointValueAnnotations() { // TODO Auto-generated method stub } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#dateRangeCount(int, long, long) - */ @Override public long dateRangeCount(int pointId, long from, long to) { // TODO Auto-generated method stub return 0; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getInceptionDate(int) - */ @Override public long getInceptionDate(int pointId) { // TODO Auto-generated method stub return 0; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getStartTime(java.util.List) - */ @Override public long getStartTime(List pointIds) { // TODO Auto-generated method stub return 0; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getEndTime(java.util.List) - */ @Override public long getEndTime(List pointIds) { // TODO Auto-generated method stub return 0; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getStartAndEndTime(java.util.List) - */ @Override public LongPair getStartAndEndTime(List pointIds) { // TODO Auto-generated method stub return null; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getFiledataIds(int) - */ @Override public List getFiledataIds(int pointId) { // TODO Auto-generated method stub return null; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#updatePointValueAsync(int, com.serotonin.m2m2.rt.dataImage.PointValueTime, com.serotonin.m2m2.rt.dataImage.SetPointSource) - */ @Override public void updatePointValueAsync(int dataPointId, PointValueTime pvt, SetPointSource source) { // TODO Auto-generated method stub } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#updatePointValueSync(int, com.serotonin.m2m2.rt.dataImage.PointValueTime, com.serotonin.m2m2.rt.dataImage.SetPointSource) - */ @Override public PointValueTime updatePointValueSync(int dataPointId, PointValueTime pvt, SetPointSource source) { @@ -404,37 +333,24 @@ public PointValueTime updatePointValueSync(int dataPointId, PointValueTime pvt, return null; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#deletePointValue(int, long) - */ @Override public long deletePointValue(int dataPointId, long ts) { // TODO Auto-generated method stub return 0; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#wideBookendQuery(java.util.List, long, long, java.lang.Integer, com.serotonin.db.WideQueryCallback) - */ @Override public void wideBookendQuery(List pointIds, long from, long to, boolean orderById, Integer limit, BookendQueryCallback callback) { // TODO Auto-generated method stub } - - /* - * (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getLatestPointValues(java.util.List, long, boolean, java.lang.Integer, com.infiniteautomation.mango.db.query.PVTQueryCallback) - */ + @Override public void getLatestPointValues(List ids, long before, boolean orderById, Integer limit, final PVTQueryCallback callback) { } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValuesBetween(java.util.List, long, long, boolean, java.lang.Integer, com.infiniteautomation.mango.db.query.PVTQueryCallback) - */ @Override public void getPointValuesBetween(List ids, long from, long to, boolean orderById, Integer limit, PVTQueryCallback callback) { diff --git a/Core/src-test/com/serotonin/m2m2/rt/dataImage/DataPointRTTest.java b/Core/src-test/com/serotonin/m2m2/rt/dataImage/DataPointRTTest.java index 742f86b0ee..1e30e894b3 100644 --- a/Core/src-test/com/serotonin/m2m2/rt/dataImage/DataPointRTTest.java +++ b/Core/src-test/com/serotonin/m2m2/rt/dataImage/DataPointRTTest.java @@ -8,6 +8,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; import org.junit.Test; @@ -118,10 +119,7 @@ public void testIntervalOnChangeLogging() { //TODO Test for Historical Generation //TODO Test Quantized - - /* (non-Javadoc) - * @see com.serotonin.m2m2.MangoTestBase#getLifecycle() - */ + @Override protected MockMangoLifecycle getLifecycle() { return new DataPointRtMockMangoLifecycle(modules, enableH2Web, h2WebPort); @@ -139,9 +137,6 @@ public DataPointRtMockMangoLifecycle(List modules, boolean enableWebCons super(modules, enableWebConsole, webPort); } - /* (non-Javadoc) - * @see com.serotonin.m2m2.MockMangoLifecycle#getDatabaseProxy() - */ @Override protected H2InMemoryDatabaseProxy getDatabaseProxy() { return new DataPointRtMockDatabaseProxy(); @@ -150,35 +145,28 @@ protected H2InMemoryDatabaseProxy getDatabaseProxy() { class DataPointRtMockDatabaseProxy extends H2InMemoryDatabaseProxy { - /* (non-Javadoc) - * @see com.serotonin.m2m2.H2InMemoryDatabaseProxy#newPointValueDao() - */ @Override public PointValueDao newPointValueDao() { return new MockPointValueDao(); } } - private List values = new ArrayList(); + protected List values = new ArrayList(); class MockPointValueDao extends PointValueDaoSQL { - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDaoSQL#getLatestPointValue(int) - */ @Override public PointValueTime getLatestPointValue(int dataPointId) { return values.get(values.size() - 1); } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDaoSQL#savePointValueAsync(int, com.serotonin.m2m2.rt.dataImage.PointValueTime, com.serotonin.m2m2.rt.dataImage.SetPointSource) - */ @Override public void savePointValueAsync(int pointId, PointValueTime pointValue, - SetPointSource source) { + SetPointSource source, Consumer savedCallback) { values.add(pointValue); + if(savedCallback != null) + savedCallback.accept(pointValue.getTime()); } } diff --git a/Core/src-test/com/serotonin/m2m2/rt/dataImage/PointValueCacheTest.java b/Core/src-test/com/serotonin/m2m2/rt/dataImage/PointValueCacheTest.java new file mode 100644 index 0000000000..0824578004 --- /dev/null +++ b/Core/src-test/com/serotonin/m2m2/rt/dataImage/PointValueCacheTest.java @@ -0,0 +1,178 @@ +/** + * Copyright (C) 2019 Infinite Automation Software. All rights reserved. + */ +package com.serotonin.m2m2.rt.dataImage; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.serotonin.m2m2.Common; +import com.serotonin.m2m2.DataTypes; +import com.serotonin.m2m2.MangoTestBase; +import com.serotonin.m2m2.db.dao.DataPointDao; +import com.serotonin.m2m2.db.dao.DataSourceDao; +import com.serotonin.m2m2.db.dao.PointValueDao; +import com.serotonin.m2m2.rt.dataSource.MockPointLocatorRT; +import com.serotonin.m2m2.vo.DataPointVO; +import com.serotonin.m2m2.vo.dataPoint.MockPointLocatorVO; +import com.serotonin.m2m2.vo.dataSource.mock.MockDataSourceVO; + +/** + * @author Terry Packer + * + */ +public class PointValueCacheTest extends MangoTestBase { + + private int dataSourceId = Common.NEW_ID; + private int dataPointId = Common.NEW_ID; + private double currentValue; + private MockDataSourceVO dsVo; + private DataPointVO dpVo; + private MockPointLocatorVO plVo; + + + //TODO Test RT saves callback does prune list properly + //TODO Test synchronization + //TODO Test public void reset(long before) + //TODO Test public PointValueTime getLatestPointValue() + + @Test + public void testSaveCallbackPrune() { + //Insert some test data + List values = insertValues(5); + + //There should only be 5 values + dpVo.setDefaultCacheSize(10); + MockPointLocatorRT plRt = new MockPointLocatorRT(plVo); + DataPointRT rt = new DataPointRT(dpVo, plRt, dsVo, null, timer); + rt.initialize(); + + List latest = rt.getLatestPointValues(5); + assertEquals(5, latest.size()); + + //Check the order (cache is time descending) + for(int i=0; i<5; i++) { + assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); + } + + //Insert another 5 values + for(int i=0; i<5; i++) { + rt.updatePointValue(new PointValueTime(currentValue, timer.currentTimeMillis())); + timer.fastForwardTo(timer.currentTimeMillis() + 1000); + currentValue += 1.0d; + } + + latest = rt.getLatestPointValues(10); + assertEquals(10, latest.size()); + for(int i=0; i<10; i++) { + assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); + } + } + + @Test + public void testCacheReset() { + //Insert some test data + List values = insertValues(5); + + //There should only be 5 values + PointValueCache cache = new PointValueCache(dataPointId, 10, null); + assertEquals(5, cache.getCacheContents().size()); + + //Check the order (cache is time descending) + for(int i=0; i<5; i++) { + assertEquals(values.get(i), cache.getCacheContents().get(cache.getCacheContents().size() - (i + 1))); + } + + //Insert another 5 values + values.addAll(insertValues(5)); + + //Expand cache by resetting it + cache.reset(); + + List latest = cache.getLatestPointValues(10); + assertEquals(10, cache.getCacheContents().size()); + for(int i=0; i<10; i++) { + assertEquals(values.get(i), cache.getCacheContents().get(cache.getCacheContents().size() - (i + 1))); + assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); + } + } + + + @Test + public void testCacheGetLatestPointValues() { + //Insert some test data + List values = insertValues(5); + + //There should only be 5 values + PointValueCache cache = new PointValueCache(dataPointId, 10, null); + assertEquals(5, cache.getCacheContents().size()); + + //Check the order (cache is time descending) + for(int i=0; i<5; i++) { + assertEquals(values.get(i), cache.getCacheContents().get(cache.getCacheContents().size() - (i + 1))); + } + + //Insert another 5 values + values.addAll(insertValues(5)); + + //TODO This revealed a bug? The cache is 5 and won't expand to 10... + //Expand cache + List latest = cache.getLatestPointValues(10); + assertEquals(10, cache.getCacheContents().size()); + for(int i=0; i<10; i++) { + assertEquals(values.get(i), cache.getCacheContents().get(cache.getCacheContents().size() - (i + 1))); + assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); + } + } + + @Before + public void beforePointValueCacheTest() { + //Create data source + dsVo = new MockDataSourceVO(); + dsVo.setXid("DS_MOCK_ME"); + dsVo.setName("data source"); + validate(dsVo); + DataSourceDao.getInstance().save(dsVo); + dataSourceId = dsVo.getId(); + + //Create a data point + plVo = new MockPointLocatorVO(DataTypes.NUMERIC, true); + dpVo = new DataPointVO(); + dpVo.setXid("DP_MOCK_ME"); + dpVo.setName("point"); + dpVo.setPointLocator(plVo); + dpVo.setDataSourceId(dataSourceId); + validate(dpVo); + DataPointDao.getInstance().save(dpVo); + dataPointId = dpVo.getId(); + + currentValue = 0; + } + + @After + public void afterPointValueCacheTest() { + if(dataPointId != Common.NEW_ID) + DataPointDao.getInstance().delete(dataPointId); + if(dataSourceId != Common.NEW_ID) + DataSourceDao.getInstance().delete(dataSourceId); + } + + private List insertValues(int count) { + PointValueDao dao = Common.databaseProxy.newPointValueDao(); + List values = new ArrayList<>(); + for(int i=0; i List doLimitQuery(DaoUtils dao, String sql, Object[] args, RowMapper rowMapper, int limit) { - if (limit > 0) - sql += " LIMIT " + limit; + sql = getLimitQuerySql(sql, limit); return dao.query(sql, args, rowMapper); } + + @Override + public String getLimitQuerySql(String sql, int limit) { + if (limit > 0) + return sql + " LIMIT " + limit; + return sql; + } @Override protected String getLimitDelete(String sql, int chunkSize) { diff --git a/Core/src/com/serotonin/m2m2/db/MSSQLProxy.java b/Core/src/com/serotonin/m2m2/db/MSSQLProxy.java index 8f8ff141bf..40038e89c4 100644 --- a/Core/src/com/serotonin/m2m2/db/MSSQLProxy.java +++ b/Core/src/com/serotonin/m2m2/db/MSSQLProxy.java @@ -63,7 +63,8 @@ public List doLimitQuery(DaoUtils dao, String sql, Object[] args, RowMapp return dao.query(getLimitQuerySql(sql, limit), args, rowMapper); } - private String getLimitQuerySql(String sql, int limit) { + @Override + public String getLimitQuerySql(String sql, int limit) { if (limit > 0) { if (sql.length() > 6 && sql.substring(0, 7).equalsIgnoreCase("select ")) sql = "select top " + limit + " " + sql.substring(7); diff --git a/Core/src/com/serotonin/m2m2/db/MySQLProxy.java b/Core/src/com/serotonin/m2m2/db/MySQLProxy.java index 033a9f2672..d95d18a4d0 100644 --- a/Core/src/com/serotonin/m2m2/db/MySQLProxy.java +++ b/Core/src/com/serotonin/m2m2/db/MySQLProxy.java @@ -82,7 +82,8 @@ public List doLimitQuery(DaoUtils dao, String sql, Object[] args, RowMapp return dao.query(getLimitQuerySql(sql, limit), args, rowMapper); } - private String getLimitQuerySql(String sql, int limit) { + @Override + public String getLimitQuerySql(String sql, int limit) { if (limit > 0) sql += " limit " + limit; return sql; diff --git a/Core/src/com/serotonin/m2m2/db/PostgresProxy.java b/Core/src/com/serotonin/m2m2/db/PostgresProxy.java index f659cdad31..956d4b9894 100644 --- a/Core/src/com/serotonin/m2m2/db/PostgresProxy.java +++ b/Core/src/com/serotonin/m2m2/db/PostgresProxy.java @@ -50,11 +50,17 @@ public boolean tableExists(ExtendedJdbcTemplate ejt, String tableName) { @Override public List doLimitQuery(DaoUtils dao, String sql, Object[] args, RowMapper rowMapper, int limit) { - if (limit > 0) - sql += " LIMIT " + limit; + sql = getLimitQuerySql(sql, limit); return dao.query(sql, args, rowMapper); } - + + @Override + public String getLimitQuerySql(String sql, int limit) { + if (limit > 0) + return sql + " LIMIT " + limit; + return sql; + } + @Override public String getTableListQuery() { return "SELECT table_name FROM information_schema.tables " diff --git a/Core/src/com/serotonin/m2m2/db/dao/EnhancedPointValueDao.java b/Core/src/com/serotonin/m2m2/db/dao/EnhancedPointValueDao.java index 132e4bcfe9..f8c5aaaf74 100644 --- a/Core/src/com/serotonin/m2m2/db/dao/EnhancedPointValueDao.java +++ b/Core/src/com/serotonin/m2m2/db/dao/EnhancedPointValueDao.java @@ -2,6 +2,7 @@ import java.util.List; import java.util.Map; +import java.util.function.Consumer; import com.serotonin.m2m2.rt.dataImage.PointValueTime; import com.serotonin.m2m2.rt.dataImage.SetPointSource; @@ -12,7 +13,7 @@ public interface EnhancedPointValueDao extends PointValueDao { public Map getLatestPointValuesForDataSource(DataSourceVO dataSource); public Map> getLatestPointValuesForDataSource(DataSourceVO dataSource, int numberOfValues); public PointValueTime savePointValueSync(DataPointVO pointVo, DataSourceVO dataSourceVo, PointValueTime pvt, SetPointSource source); - public void savePointValueAsync(DataPointVO pointVo, DataSourceVO dataSourceVo, PointValueTime pvt, SetPointSource source); + public void savePointValueAsync(DataPointVO pointVo, DataSourceVO dataSourceVo, PointValueTime pvt, SetPointSource source, Consumer savedCallback); public PointValueTime updatePointValueSync(DataPointVO pointVo, DataSourceVO dataSourceVo, PointValueTime pvt, SetPointSource source); public void updatePointValueAsync(DataPointVO pointVo, DataSourceVO dataSourceVo, PointValueTime pvt, SetPointSource source); } diff --git a/Core/src/com/serotonin/m2m2/db/dao/PointValueDao.java b/Core/src/com/serotonin/m2m2/db/dao/PointValueDao.java index 2b76efb8e8..6fd4da0e68 100644 --- a/Core/src/com/serotonin/m2m2/db/dao/PointValueDao.java +++ b/Core/src/com/serotonin/m2m2/db/dao/PointValueDao.java @@ -19,6 +19,7 @@ package com.serotonin.m2m2.db.dao; import java.util.List; +import java.util.function.Consumer; import com.infiniteautomation.mango.db.query.BookendQueryCallback; import com.infiniteautomation.mango.db.query.PVTQueryCallback; @@ -38,8 +39,13 @@ public interface PointValueDao { /** * Only the PointValueCache should call this method during runtime. Do not use. + * + * @param pointId + * @param pointValue + * @param source + * @param savedCallback - callback with timestamp of saved value */ - public void savePointValueAsync(int pointId, PointValueTime pointValue, SetPointSource source); + public void savePointValueAsync(int pointId, PointValueTime pointValue, SetPointSource source, Consumer savedCallback); /** * Get the point values >= since @@ -83,6 +89,15 @@ public interface PointValueDao { * @return */ public List getLatestPointValues(int pointId, int limit, long before); + + /** + * Get point values < before in reverse time order using a callback + * @param pointId + * @param before + * @param limit + * @param callback + */ + public void getLatestPointValues(int pointId, long before, Integer limit, final PVTQueryCallback callback); /** * Get point values < before in reverse time order diff --git a/Core/src/com/serotonin/m2m2/db/dao/PointValueDaoMetrics.java b/Core/src/com/serotonin/m2m2/db/dao/PointValueDaoMetrics.java index c69a524f0f..cf68958620 100644 --- a/Core/src/com/serotonin/m2m2/db/dao/PointValueDaoMetrics.java +++ b/Core/src/com/serotonin/m2m2/db/dao/PointValueDaoMetrics.java @@ -5,6 +5,7 @@ package com.serotonin.m2m2.db.dao; import java.util.List; +import java.util.function.Consumer; import com.infiniteautomation.mango.db.query.BookendQueryCallback; import com.infiniteautomation.mango.db.query.PVTQueryCallback; @@ -42,29 +43,20 @@ public PointValueDaoMetrics(PointValueDao dao){ public PointValueDao getBaseDao(){ return this.dao; } - - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#savePointValueSync(int, com.serotonin.m2m2.rt.dataImage.PointValueTime, com.serotonin.m2m2.rt.dataImage.SetPointSource) - */ + @Override public PointValueTime savePointValueSync(int pointId, PointValueTime pointValue, SetPointSource source) { return dao.savePointValueSync(pointId, pointValue, source); } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#savePointValueAsync(int, com.serotonin.m2m2.rt.dataImage.PointValueTime, com.serotonin.m2m2.rt.dataImage.SetPointSource) - */ @Override public void savePointValueAsync(int pointId, PointValueTime pointValue, - SetPointSource source) { - dao.savePointValueAsync(pointId, pointValue, source); + SetPointSource source, Consumer savedCallback) { + dao.savePointValueAsync(pointId, pointValue, source, savedCallback); } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValues(int, long) - */ @Override public List getPointValues(int pointId, long since) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -73,9 +65,6 @@ public List getPointValues(int pointId, long since) { return values; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValuesBetween(int, long, long) - */ @Override public List getPointValuesBetween(int pointId, long from, long to) { @@ -85,9 +74,6 @@ public List getPointValuesBetween(int pointId, long from, return values; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValuesBetween(int, long, long, limit) - */ @Override public List getPointValuesBetween(int pointId, long from, long to, int limit) { @@ -98,9 +84,6 @@ public List getPointValuesBetween(int pointId, long from, } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getLatestPointValues(int, int) - */ @Override public List getLatestPointValues(int pointId, int limit) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -109,9 +92,6 @@ public List getLatestPointValues(int pointId, int limit) { return values; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getLatestPointValues(int, int, long) - */ @Override public List getLatestPointValues(int pointId, int limit, long before) { @@ -121,19 +101,19 @@ public List getLatestPointValues(int pointId, int limit, return values; } - /* - * (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getLatestPointValues(java.util.List, java.lang.Integer, long, boolean, com.infiniteautomation.mango.db.query.PVTQueryCallback) - */ + @Override + public void getLatestPointValues(int pointId, long before, Integer limit, final PVTQueryCallback callback) { + LogStopWatch LogStopWatch = new LogStopWatch(); + dao.getLatestPointValues(pointId, before, limit, callback); + LogStopWatch.stop("getLatestPointValues(pointId,before,limit,callback) (" + pointId +", " + before + ", " + limit + ", callback)", this.metricsThreshold); + } + public void getLatestPointValues(List ids, long before, boolean orderById, Integer limit, PVTQueryCallback callback){ LogStopWatch LogStopWatch = new LogStopWatch(); dao.getLatestPointValues(ids, before, orderById, limit, callback); LogStopWatch.stop("getLatestPointValues(pointId,limit,before, orderById, callback) (" + ids +", " + limit + ", " + before + "," + orderById + ", callback)", this.metricsThreshold); } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getLatestPointValue(int) - */ @Override public PointValueTime getLatestPointValue(int pointId) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -142,9 +122,6 @@ public PointValueTime getLatestPointValue(int pointId) { return value; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValueBefore(int, long) - */ @Override public PointValueTime getPointValueBefore(int pointId, long time) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -153,9 +130,6 @@ public PointValueTime getPointValueBefore(int pointId, long time) { return value; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValueAfter(int, long) - */ @Override public PointValueTime getPointValueAfter(int pointId, long time) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -163,10 +137,7 @@ public PointValueTime getPointValueAfter(int pointId, long time) { LogStopWatch.stop("getPointValueAfter(pointId,time) (" + pointId + ", " + time + "){" + (value != null ? 1 : 0) + "}", this.metricsThreshold); return value; } - - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValueAt(int, long) - */ + @Override public PointValueTime getPointValueAt(int pointId, long time) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -175,9 +146,6 @@ public PointValueTime getPointValueAt(int pointId, long time) { return value; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValuesBetween(int, long, long, com.serotonin.db.MappedRowCallback) - */ @Override public void getPointValuesBetween(int pointId, long from, long to, MappedRowCallback callback) { @@ -186,9 +154,6 @@ public void getPointValuesBetween(int pointId, long from, long to, LogStopWatch.stop("getPointValuesBetween(pointId,from,to,callback) + (" + pointId + ", " + from + ", " + to + ", " + callback.toString() + ")", this.metricsThreshold); } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValuesBetween(java.util.List, long, long, com.serotonin.db.MappedRowCallback) - */ @Override public void getPointValuesBetween(List pointIds, long from, long to, MappedRowCallback callback) { @@ -205,9 +170,6 @@ public void getPointValuesBetween(List pointIds, long from, } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#wideQuery(int, long, long, com.serotonin.db.WideQueryCallback) - */ @Override public void wideQuery(int pointId, long from, long to, WideQueryCallback callback) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -215,9 +177,6 @@ public void wideQuery(int pointId, long from, long to, WideQueryCallback pointIds) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -370,9 +289,6 @@ public long getStartTime(List pointIds) { return result; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getEndTime(java.util.List) - */ @Override public long getEndTime(List pointIds) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -389,9 +305,6 @@ public long getEndTime(List pointIds) { return result; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getStartAndEndTime(java.util.List) - */ @Override public LongPair getStartAndEndTime(List pointIds) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -408,9 +321,6 @@ public LongPair getStartAndEndTime(List pointIds) { return result; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getFiledataIds(int) - */ @Override public List getFiledataIds(int pointId) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -419,9 +329,6 @@ public List getFiledataIds(int pointId) { return value; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#updatePointValueAsync(int, com.serotonin.m2m2.rt.dataImage.PointValueIdTime, com.serotonin.m2m2.rt.dataImage.SetPointSource) - */ @Override public void updatePointValueAsync(int id, PointValueTime pvt, SetPointSource source) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -429,9 +336,6 @@ public void updatePointValueAsync(int id, PointValueTime pvt, SetPointSource sou LogStopWatch.stop("updatePointValueAsync(id, ts, source) (" + id + ", pvt)", this.metricsThreshold); } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#updatePointValueSync(int, com.serotonin.m2m2.rt.dataImage.PointValueIdTime, com.serotonin.m2m2.rt.dataImage.SetPointSource) - */ @Override public PointValueTime updatePointValueSync(int dataPointId, PointValueTime pvt, SetPointSource source) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -440,10 +344,6 @@ public PointValueTime updatePointValueSync(int dataPointId, PointValueTime pvt, return value; } - - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#deletePointValue(int) - */ @Override public long deletePointValue(int dataPointId, long ts) { LogStopWatch LogStopWatch = new LogStopWatch(); @@ -452,9 +352,6 @@ public long deletePointValue(int dataPointId, long ts) { return value; } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#wideBookendQuery(java.util.List, long, long, java.lang.Integer, com.serotonin.db.WideQueryCallback) - */ @Override public void wideBookendQuery(List pointIds, long from, long to, boolean orderById, Integer limit, BookendQueryCallback callback) { @@ -463,9 +360,6 @@ public void wideBookendQuery(List pointIds, long from, long to, boolean logStopWatch.stop("wideBookendQuery(dataPointIds, from, to, orderById, limit, callback) + (" + pointIds + ", " + to + ", " + from + ", " + limit + "callback)", this.metricsThreshold); } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#getPointValuesBetween(java.util.List, long, long, boolean, java.lang.Integer, com.infiniteautomation.mango.db.query.PVTQueryCallback) - */ @Override public void getPointValuesBetween(List ids, long from, long to, boolean orderById, Integer limit, PVTQueryCallback callback) { diff --git a/Core/src/com/serotonin/m2m2/db/dao/PointValueDaoSQL.java b/Core/src/com/serotonin/m2m2/db/dao/PointValueDaoSQL.java index 66336aad23..e0cc61d1fd 100644 --- a/Core/src/com/serotonin/m2m2/db/dao/PointValueDaoSQL.java +++ b/Core/src/com/serotonin/m2m2/db/dao/PointValueDaoSQL.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.RejectedExecutionException; +import java.util.function.Consumer; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.logging.Log; @@ -84,11 +85,11 @@ public class PointValueDaoSQL extends BaseDao implements PointValueDao { + "(pointValueId, textPointValueShort, textPointValueLong, sourceMessage) values (?,?,?,?)"; /** - * Only the PointValueCache should call this method during runtime. Do not use. + * Only the PointValueCache should call this method during runtime. Only use if the data point is not running. */ @Override public PointValueTime savePointValueSync(int pointId, PointValueTime pointValue, SetPointSource source) { - long id = savePointValueImpl(pointId, pointValue, source, false); + long id = savePointValueImpl(pointId, pointValue, source, false, null); PointValueTime savedPointValue; int retries = 5; @@ -108,15 +109,15 @@ public PointValueTime savePointValueSync(int pointId, PointValueTime pointValue, } /** - * Only the PointValueCache should call this method during runtime. Do not use. + * Only the PointValueCache should call this method during runtime. Only use if the data point is not running. */ @Override - public void savePointValueAsync(int pointId, PointValueTime pointValue, SetPointSource source) { - savePointValueImpl(pointId, pointValue, source, true); + public void savePointValueAsync(int pointId, PointValueTime pointValue, SetPointSource source, Consumer savedCallback) { + savePointValueImpl(pointId, pointValue, source, true, savedCallback); } long savePointValueImpl(final int pointId, final PointValueTime pointValue, final SetPointSource source, - boolean async) { + boolean async, Consumer savedCallback) { DataValue value = pointValue.getValue(); final int dataType = DataTypes.getDataType(value); double dvalue = 0; @@ -138,12 +139,12 @@ else if (value.hasDoubleRepresentation()) try { if (svalue != null || source != null || dataType == DataTypes.IMAGE) async = false; - id = savePointValue(pointId, dataType, dvalue, pointValue.getTime(), svalue, source, async); + id = savePointValue(pointId, dataType, dvalue, pointValue.getTime(), svalue, source, async, savedCallback); } catch (ConcurrencyFailureException e) { // Still failed to insert after all of the retries. Store the data synchronized (UNSAVED_POINT_VALUES) { - UNSAVED_POINT_VALUES.add(new UnsavedPointValue(pointId, pointValue, source)); + UNSAVED_POINT_VALUES.add(new UnsavedPointValue(pointId, pointValue, source, savedCallback)); } return -1; } @@ -192,19 +193,19 @@ private void clearUnsavedPointValues() { synchronized (UNSAVED_POINT_VALUES) { while (!UNSAVED_POINT_VALUES.isEmpty()) { UnsavedPointValue data = UNSAVED_POINT_VALUES.remove(0); - savePointValueImpl(data.getPointId(), data.getPointValue(), data.getSource(), false); + savePointValueImpl(data.getPointId(), data.getPointValue(), data.getSource(), false, data.getSavedCallback()); } } } } long savePointValue(final int pointId, final int dataType, double dvalue, final long time, final String svalue, - final SetPointSource source, boolean async) { + final SetPointSource source, boolean async, Consumer savedCallback) { // Apply database specific bounds on double values. dvalue = Common.databaseProxy.applyBounds(dvalue); if (async) { - BatchWriteBehind.add(new BatchWriteBehindEntry(pointId, dataType, dvalue, time), ejt); + BatchWriteBehind.add(new BatchWriteBehindEntry(pointId, dataType, dvalue, time, savedCallback), ejt); return -1; } @@ -258,8 +259,8 @@ private long savePointValueImpl(int pointId, int dataType, double dvalue, long t private static List UNSAVED_POINT_UPDATES = new ArrayList(); private static final String POINT_VALUE_UPDATE = "UPDATE pointValues SET dataType=?, pointValue=? "; - private static final String POINT_VALUE_ANNOTATION_UPDATE = "UPDATE pointValueAnnotations SET" - + "textPointValueShort=?, textPointValueLong=?, sourceMessage=? "; + private static final String POINT_VALUE_ANNOTATION_UPDATE = "UPDATE pointValueAnnotations SET " + + "textPointValueShort=?, textPointValueLong=?, sourceMessage=? "; /** * Only the PointValueCache should call this method during runtime. Do not use. @@ -539,6 +540,12 @@ public List getLatestPointValues(int dataPointId, int limit, lon new Object[] { dataPointId, before }, limit); } + @Override + public void getLatestPointValues(int dataPointId, long before, Integer limit, final PVTQueryCallback callback) { + LatestSinglePointValuesPreparedStatementCreator stmt = new LatestSinglePointValuesPreparedStatementCreator(dataPointId, before, limit, callback); + ejt.execute(stmt, stmt); + } + private List pointValuesQuery(String sql, Object[] params, int limit) { return Common.databaseProxy.doLimitQuery(this, sql, params, new PointValueRowMapper(), limit); } @@ -548,7 +555,68 @@ private List pointValuesQuery(String sql, Object[] params, int l * * @author Terry Packer */ - class LatestSinglePointValuesPreparedStatementCreator implements PreparedStatementCreator, PreparedStatementCallback{ + class LatestSinglePointValuesPreparedStatementCreator implements PreparedStatementCreator, PreparedStatementCallback { + + final PointValueRowMapper mapper = new PointValueRowMapper(); + final int dataPointId; + final long before; + final Integer limit; + final PVTQueryCallback callback; + int counter; + + public LatestSinglePointValuesPreparedStatementCreator(Integer id, long before, Integer limit, PVTQueryCallback callback) { + this.dataPointId = id; + this.before = before; + this.limit = limit; + this.callback = callback; + this.counter = 0; + } + + @Override + public PreparedStatement createPreparedStatement(Connection con) + throws SQLException { + + Object[] args = new Object[] {dataPointId, before}; + String sql = POINT_VALUE_SELECT + " where pv.dataPointId=? and pv.ts{ final AnnotatedIdPointValueRowMapper mapper = new AnnotatedIdPointValueRowMapper(); @@ -558,7 +626,7 @@ class LatestSinglePointValuesPreparedStatementCreator implements PreparedStateme final PVTQueryCallback callback; final MutableInt counter; - public LatestSinglePointValuesPreparedStatementCreator(Integer id, long before, Integer limit, PVTQueryCallback callback, MutableInt counter) { + public LatestSingleIdPointValuesPreparedStatementCreator(Integer id, long before, Integer limit, PVTQueryCallback callback, MutableInt counter) { this.ids = new ArrayList(); this.ids.add(id); this.before = before; @@ -567,7 +635,7 @@ public LatestSinglePointValuesPreparedStatementCreator(Integer id, long before, this.counter = counter; } - public LatestSinglePointValuesPreparedStatementCreator(List ids, long before, Integer limit, PVTQueryCallback callback, MutableInt counter) { + public LatestSingleIdPointValuesPreparedStatementCreator(List ids, long before, Integer limit, PVTQueryCallback callback, MutableInt counter) { this.ids = ids; this.before = before; this.limit = limit; @@ -581,18 +649,15 @@ public PreparedStatement createPreparedStatement(Connection con) if(ids.size() != 1) throw new RuntimeException("Wrong base query."); - List args = new ArrayList<>(); + Object[] args = new Object[] {ids.get(0), before}; String sql = ANNOTATED_POINT_ID_VALUE_SELECT + " where pv.dataPointId = ? and pv.ts < ? order by pv.ts desc"; - args.add(ids.get(0)); - args.add(before); - + if(limit != null) { - sql += " limit ?"; - args.add(limit); + sql = Common.databaseProxy.getLimitQuerySql(sql, limit); } PreparedStatement stmt = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - ArgumentPreparedStatementSetter setter = new ArgumentPreparedStatementSetter(args.toArray(new Object[args.size()])); + ArgumentPreparedStatementSetter setter = new ArgumentPreparedStatementSetter(args); setter.setValues(stmt); return stmt; } @@ -625,7 +690,7 @@ public Integer doInPreparedStatement(PreparedStatement ps) * * @author Terry Packer */ - class LatestMultiplePointsValuesPreparedStatementCreator extends LatestSinglePointValuesPreparedStatementCreator{ + class LatestMultiplePointsValuesPreparedStatementCreator extends LatestSingleIdPointValuesPreparedStatementCreator{ final AnnotatedIdPointValueRowMapper mapper = new AnnotatedIdPointValueRowMapper(); @@ -640,19 +705,17 @@ public PreparedStatement createPreparedStatement(Connection con) if(ids.size() == 1) return super.createPreparedStatement(con); - - List args = new ArrayList<>(); + + Object[] args = new Object[] {before}; String dataPointIds = createDelimitedList(ids, ",", null); String sql = ANNOTATED_POINT_ID_VALUE_SELECT + " where pv.dataPointId in (" + dataPointIds + ") and pv.ts < ? order by pv.ts desc"; - args.add(before); if(limit != null) { - sql += " limit ?"; - args.add(limit); - } + sql = Common.databaseProxy.getLimitQuerySql(sql, limit); + } PreparedStatement stmt = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - ArgumentPreparedStatementSetter setter = new ArgumentPreparedStatementSetter(args.toArray(new Object[args.size()])); + ArgumentPreparedStatementSetter setter = new ArgumentPreparedStatementSetter(args); setter.setValues(stmt); return stmt; } @@ -670,7 +733,7 @@ public void getLatestPointValues(List ids, long before, boolean orderBy //Limit results of each data point to size limit, i.e. loop over all points and query with limit MutableInt counter = new MutableInt(0); for(Integer id: ids) { - LatestSinglePointValuesPreparedStatementCreator c = new LatestSinglePointValuesPreparedStatementCreator( + LatestSingleIdPointValuesPreparedStatementCreator c = new LatestSingleIdPointValuesPreparedStatementCreator( id, before, limit, callback, counter); ejt.execute(c,c); } @@ -827,9 +890,6 @@ public void getPointValuesBetween(List ids, long from, long to, boolean } } - /* (non-Javadoc) - * @see com.serotonin.m2m2.db.dao.PointValueDao#wideQuery(int, long, long, com.serotonin.db.WideQueryCallback) - */ @Override public void wideQuery(int pointId, long from, long to, WideQueryCallback callback) { // TODO Improve performance by using one statement and using the exceptions to cancel the results @@ -1305,11 +1365,13 @@ class UnsavedPointValue { private final int pointId; private final PointValueTime pointValue; private final SetPointSource source; - - public UnsavedPointValue(int pointId, PointValueTime pointValue, SetPointSource source) { + private final Consumer savedCallback; + + public UnsavedPointValue(int pointId, PointValueTime pointValue, SetPointSource source, Consumer savedCallback) { this.pointId = pointId; this.pointValue = pointValue; this.source = source; + this.savedCallback = savedCallback; } public int getPointId() { @@ -1323,6 +1385,10 @@ public PointValueTime getPointValue() { public SetPointSource getSource() { return source; } + + public Consumer getSavedCallback() { + return savedCallback; + } } /** @@ -1359,12 +1425,14 @@ class BatchWriteBehindEntry { private final int dataType; private final double dvalue; private final long time; + private final Consumer savedCallback; - public BatchWriteBehindEntry(int pointId, int dataType, double dvalue, long time) { + public BatchWriteBehindEntry(int pointId, int dataType, double dvalue, long time, Consumer savedCallback) { this.pointId = pointId; this.dataType = dataType; this.dvalue = dvalue; this.time = time; + this.savedCallback = savedCallback; } public void writeInto(Object[] params, int index) { @@ -1374,6 +1442,11 @@ public void writeInto(Object[] params, int index) { params[index++] = dvalue; params[index++] = time; } + + public void callback() { + if(savedCallback != null) + savedCallback.accept(time); + } } public static final String ENTRIES_MONITOR_ID = "com.serotonin.m2m2.db.dao.PointValueDao$BatchWriteBehind.ENTRIES_MONITOR"; @@ -1516,6 +1589,8 @@ public void execute() { ejt.update(sb.toString(), params); writesPerSecond.hitMultiple(inserts.length); BATCH_WRITE_SPEED_MONITOR.setValue(writesPerSecond.getEventCounts()[0] / 5); + for(BatchWriteBehindEntry entry : inserts) + entry.callback(); break; } catch (RuntimeException e) { diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/DataPointRT.java b/Core/src/com/serotonin/m2m2/rt/dataImage/DataPointRT.java index c359ef3d1e..64e213167c 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/DataPointRT.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/DataPointRT.java @@ -328,6 +328,7 @@ else if (backdated) case DataPointVO.LoggingTypes.INTERVAL: if (!backdated) intervalSave(newValue); + saveValue = true; //We want to save the intra-interval values in the cache default: logValue = false; } @@ -476,7 +477,7 @@ public void initializeIntervalLogging(long nextPollTime, boolean quantize) { if(averagingValues.size() > 0) { AnalogStatistics stats = new AnalogStatistics(intervalStartTime-loggingPeriodMillis, intervalStartTime, null, averagingValues); PointValueTime newValue = new PointValueTime(stats.getAverage(), intervalStartTime); - valueCache.logPointValueAsync(newValue, null); + valueCache.savePointValue(newValue, null, true, true); //Fire logged Events fireEvents(null, newValue, null, false, false, true, false, false); averagingValues.clear(); @@ -654,7 +655,7 @@ else if(vo.getPointLocator().getDataTypeId() == DataTypes.BINARY) if (value != null){ PointValueTime newValue = new PointValueTime(value, fireTime); - valueCache.logPointValueAsync(newValue, null); + valueCache.savePointValue(newValue, null, true, true); //Fire logged Events fireEvents(null, newValue, null, false, false, true, false, false); } @@ -941,23 +942,6 @@ public void terminateHistorical() { terminateIntervalLogging(); pointValue = valueCache.getLatestPointValue(); } - - - /** - * Update the value in the cache with the option to log to DB. - * - * This only updates an existing value - * - * Caution, this bypasses the Logging Settings - * - * @param newValue - * @param source - * @param logValue - * @param async - */ - public void updatePointValueInCache(PointValueTime newValue, SetPointSource source, boolean logValue, boolean async) { - valueCache.updatePointValue(newValue, source, logValue, async); - } /** * Get a copy of the current cache diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/EnhancedPointValueCache.java b/Core/src/com/serotonin/m2m2/rt/dataImage/EnhancedPointValueCache.java index e961eab508..be0c59fda1 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/EnhancedPointValueCache.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/EnhancedPointValueCache.java @@ -22,21 +22,11 @@ public EnhancedPointValueCache(DataPointVO dataPoint, DataSourceVO dataSource @Override void savePointValueAsync(PointValueTime pvt, SetPointSource source) { - enhancedDao.savePointValueAsync(dataPoint, dataSource, pvt, source); + enhancedDao.savePointValueAsync(dataPoint, dataSource, pvt, source, valueSavedCallback); } @Override PointValueTime savePointValueSync(PointValueTime pvt, SetPointSource source) { return enhancedDao.savePointValueSync(dataPoint, dataSource, pvt, source); } - - @Override - void updatePointValueAsync(PointValueTime pvt, SetPointSource source) { - enhancedDao.updatePointValueAsync(dataPoint, dataSource, pvt, source); - } - - @Override - PointValueTime updatePointValueSync(PointValueTime pvt, SetPointSource source) { - return enhancedDao.updatePointValueSync(dataPoint, dataSource, pvt, source); - } } diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/HistoricalDataPoint.java b/Core/src/com/serotonin/m2m2/rt/dataImage/HistoricalDataPoint.java index 0049fb92bf..7364eea20f 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/HistoricalDataPoint.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/HistoricalDataPoint.java @@ -47,7 +47,7 @@ public void setPointValue(PointValueTime newValue, SetPointSource source) { //throw new NotImplementedException(); DataPointRT dprt = Common.runtimeManager.getDataPoint(vo.getId()); if(dprt == null) //point isn't running, we can save the value through the DAO - pointValueDao.savePointValueAsync(vo.getId(), newValue, source); + pointValueDao.savePointValueAsync(vo.getId(), newValue, source, null); else //Give the point a chance to cache the new value dprt.savePointValueDirectToCache(newValue, source, true, true); } diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java index 462ac47767..394f7877be 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java @@ -7,19 +7,20 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.ListIterator; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import com.serotonin.m2m2.Common; import com.serotonin.m2m2.db.dao.PointValueDao; /** * This class maintains an ordered list of the most recent values for a data point. It will mirror values in the - * database, but provide a much faster lookup for a limited number of values. + * database, but provide a much faster lookup for a limited number of values. * - * Because there is not a significant performance problem for time-based lookups, they are not handled here, but rather - * are still handled by the database. + * This is also used as a store for values that are queued to be saved but may not be saved yet i.e. in a batch + * write queue allowing a place to access all saved values reliably. * - * @author Matthew Lohbihler + * @author Matthew Lohbihler, Terry Packer */ public class PointValueCache { private final int dataPointId; @@ -37,10 +38,38 @@ public class PointValueCache { * time, always use a local copy of the variable for read purposes. */ private List cache; + private final AtomicLong latestSavedValueTime; + protected final Consumer valueSavedCallback; + /** + * Create a cache for a data point. + * + * @param dataPointId + * @param defaultSize - the base size of the cache, it will never be smaller than this + * @param cache - initial cache, if null then the cache is populated to default size from the database contents + */ public PointValueCache(int dataPointId, int defaultSize, List cache) { this.dataPointId = dataPointId; this.defaultSize = defaultSize; + this.latestSavedValueTime = new AtomicLong(); + this.valueSavedCallback = (time) -> { + //Prune cache + //TODO Synchronization + latestSavedValueTime.set(time); + List c = this.cache; + if(c.size() > maxSize) { + List newCache = new ArrayList(c.size() + 1); + newCache.addAll(c); + + while (newCache.size() > maxSize) { + //If the value has not been saved we are not discarding any data + if(newCache.get(newCache.size() - 1).getTime() > latestSavedValueTime.get()) + break; + newCache.remove(newCache.size() - 1); + } + this.cache = newCache; + } + }; if (cache == null) { this.cache = new ArrayList<>(); @@ -54,76 +83,32 @@ public PointValueCache(int dataPointId, int defaultSize, List ca } else { this.cache = cache; } + + //Set our known state of saved values + if (cache.size() > 0) + this.latestSavedValueTime.set(cache.get(0).getTime()); + this.maxSize = defaultSize; } } - - void savePointValueAsync(PointValueTime pvt, SetPointSource source) { - dao.savePointValueAsync(dataPointId, pvt, source); - } - - PointValueTime savePointValueSync(PointValueTime pvt, SetPointSource source) { - return dao.savePointValueSync(dataPointId, pvt, source); - } - - void updatePointValueAsync(PointValueTime pvt, SetPointSource source) { - dao.updatePointValueAsync(dataPointId, pvt, source); - } - - PointValueTime updatePointValueSync(PointValueTime pvt, SetPointSource source) { - return dao.updatePointValueSync(dataPointId, pvt, source); - } /** - * Update a value in the system - * @param pvt - * @param source - * @param logValue - Store in DB and Cache or Just Cache - * @param async - * @return true if point value existed and was updated, false if value DNE in cache + * Save a point value, placing it in the cache. * + * @param pvt - value to save + * @param source - source to use for annotation + * @param logValue - save to database or just cache it + * @param async - queue this value into a batch for performance writes */ - public void updatePointValue(PointValueTime pvt, SetPointSource source, boolean logValue, boolean async){ - - if (logValue) { - if (async) - updatePointValueAsync(pvt, source); - else - pvt = updatePointValueSync(pvt, source); - } - - - //Update our point in the cache if it exists - List c = cache; - List newCache = new ArrayList(c.size() + 1); - newCache.addAll(c); - - // Insert the value in the cache. - if (newCache.size() == 0) - return; //Empty anyway - else { - ListIterator it = newCache.listIterator(); - while (it.hasNext()) { - PointValueTime cachedValue = it.next(); - if(cachedValue.getTime() == pvt.getTime()){ - it.set(pvt); //Replace it and break out - break; - } - } - } - - cache = newCache; - return; - } - public void savePointValue(PointValueTime pvt, SetPointSource source, boolean logValue, boolean async) { if (logValue) { - if (async) + if (async) { savePointValueAsync(pvt, source); - else + }else pvt = savePointValueSync(pvt, source); } + //TODO Synchronization List c = cache; List newCache = new ArrayList(c.size() + 1); newCache.addAll(c); @@ -133,27 +118,31 @@ public void savePointValue(PointValueTime pvt, SetPointSource source, boolean lo if (newCache.size() == 0) newCache.add(pvt); else { + //Find where in the cache we want to place the new value while (pos < newCache.size() && newCache.get(pos).getTime() > pvt.getTime()) pos++; - if (pos < maxSize) - newCache.add(pos, pvt); + newCache.add(pos, pvt); } // Check if we need to clean up the list - while (newCache.size() > maxSize) + while (newCache.size() > maxSize) { + //If the value has not been saved we are not discarding any data + if(newCache.get(newCache.size() - 1).getTime() > latestSavedValueTime.get()) + break; newCache.remove(newCache.size() - 1); + } cache = newCache; } - /** - * Saves the given value to the database without adding it to the cache. - */ - void logPointValueAsync(PointValueTime pointValue, SetPointSource source) { - // Save the new value and get a point value time back that has the id and annotations set, as appropriate. - savePointValueAsync(pointValue, source); + void savePointValueAsync(PointValueTime pvt, SetPointSource source) { + dao.savePointValueAsync(dataPointId, pvt, source, valueSavedCallback); } - + + PointValueTime savePointValueSync(PointValueTime pvt, SetPointSource source) { + return dao.savePointValueSync(dataPointId, pvt, source); + } + public PointValueTime getLatestPointValue() { if (maxSize == 0) refreshCache(1); @@ -192,15 +181,15 @@ private void refreshCache(int size) { List c = new ArrayList(); c.add(pvt); cache = c; + latestSavedValueTime.set(pvt.getTime()); } } else { - List ids = new ArrayList<>(); - ids.add(dataPointId); - List cc = new ArrayList<>(); - cc.addAll(cache); + List c = cache; + List cc = new ArrayList<>(c.size()); + cc.addAll(c); List nc = new ArrayList(size); - dao.getLatestPointValues(ids, Common.timer.currentTimeMillis() + 1, false, size, (value, index) -> { + dao.getLatestPointValues(dataPointId, Common.timer.currentTimeMillis() + 1, size, (value, index) -> { //Cache is in same order as rows if(nc.size() < size && cc.size() > 0 && cc.get(0).getTime() >= value.getTime()) { //The cached value is newer so add it @@ -217,6 +206,8 @@ private void refreshCache(int size) { } }); cache = nc; + if(nc.size() > 0) + latestSavedValueTime.set(nc.get(0).getTime()); } } } diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueEmporter.java b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueEmporter.java index c88df5b24f..fbd3b8cf6d 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueEmporter.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueEmporter.java @@ -264,9 +264,9 @@ protected void importRow(Row rowData) throws SpreadsheetException { else{ if (pointValueDao instanceof EnhancedPointValueDao) { DataSourceVO ds = getDataSource(dp.getDataSourceId()); - ((EnhancedPointValueDao) pointValueDao).savePointValueAsync(dp, ds, pvt,null); + ((EnhancedPointValueDao) pointValueDao).savePointValueAsync(dp, ds, pvt, null, null); } else { - pointValueDao.savePointValueAsync(dp.getId(),pvt,null); + pointValueDao.savePointValueAsync(dp.getId(), pvt, null, null); } } From bba45d3a2b862ae186eafda8cbd5b5e22d94db6f Mon Sep 17 00:00:00 2001 From: terrypacker Date: Tue, 14 May 2019 10:29:12 -1000 Subject: [PATCH 02/10] Adding synchronization and minor tweaks to cache from testing. --- .../rt/dataImage/PointValueCacheTest.java | 107 +++++++--- .../m2m2/rt/dataImage/DataPointRT.java | 8 +- .../m2m2/rt/dataImage/PointValueCache.java | 188 ++++++++++-------- 3 files changed, 191 insertions(+), 112 deletions(-) diff --git a/Core/src-test/com/serotonin/m2m2/rt/dataImage/PointValueCacheTest.java b/Core/src-test/com/serotonin/m2m2/rt/dataImage/PointValueCacheTest.java index 0824578004..575f368de3 100644 --- a/Core/src-test/com/serotonin/m2m2/rt/dataImage/PointValueCacheTest.java +++ b/Core/src-test/com/serotonin/m2m2/rt/dataImage/PointValueCacheTest.java @@ -35,6 +35,8 @@ public class PointValueCacheTest extends MangoTestBase { private MockDataSourceVO dsVo; private DataPointVO dpVo; private MockPointLocatorVO plVo; + private MockPointLocatorRT plRt; + private DataPointRT rt; //TODO Test RT saves callback does prune list properly @@ -44,14 +46,10 @@ public class PointValueCacheTest extends MangoTestBase { @Test public void testSaveCallbackPrune() { + setupRuntime(); + //Insert some test data - List values = insertValues(5); - - //There should only be 5 values - dpVo.setDefaultCacheSize(10); - MockPointLocatorRT plRt = new MockPointLocatorRT(plVo); - DataPointRT rt = new DataPointRT(dpVo, plRt, dsVo, null, timer); - rt.initialize(); + List values = insertValuesIntoRuntime(5); List latest = rt.getLatestPointValues(5); assertEquals(5, latest.size()); @@ -63,42 +61,60 @@ public void testSaveCallbackPrune() { //Insert another 5 values for(int i=0; i<5; i++) { - rt.updatePointValue(new PointValueTime(currentValue, timer.currentTimeMillis())); + PointValueTime pvt = new PointValueTime(currentValue, timer.currentTimeMillis()); + rt.updatePointValue(pvt); + values.add(pvt); timer.fastForwardTo(timer.currentTimeMillis() + 1000); currentValue += 1.0d; } + //TODO Sleep/wait/advance timer? + latest = rt.getLatestPointValues(10); assertEquals(10, latest.size()); for(int i=0; i<10; i++) { assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); } + + //Insert another 5 to see the cache > 10 and then trim back to 10 + for(int i=0; i<5; i++) { + PointValueTime pvt = new PointValueTime(currentValue, timer.currentTimeMillis()); + rt.updatePointValue(pvt); + values.add(pvt); + timer.fastForwardTo(timer.currentTimeMillis() + 1000); + currentValue += 1.0d; + } + List cacheCopy = rt.getCacheCopy(); + assertEquals(10, cacheCopy.size()); } @Test public void testCacheReset() { //Insert some test data - List values = insertValues(5); + List values = insertValuesIntoDatabase(5); + setupRuntime(); + //There should only be 5 values - PointValueCache cache = new PointValueCache(dataPointId, 10, null); - assertEquals(5, cache.getCacheContents().size()); + List cache = rt.getCacheCopy(); + assertEquals(5, cache.size()); //Check the order (cache is time descending) for(int i=0; i<5; i++) { - assertEquals(values.get(i), cache.getCacheContents().get(cache.getCacheContents().size() - (i + 1))); + assertEquals(values.get(i), cache.get(cache.size() - (i + 1))); } //Insert another 5 values - values.addAll(insertValues(5)); + values.addAll(insertValuesIntoRuntime(5)); //Expand cache by resetting it - cache.reset(); + rt.resetValues(); - List latest = cache.getLatestPointValues(10); - assertEquals(10, cache.getCacheContents().size()); + List latest = rt.getLatestPointValues(10); + cache = rt.getCacheCopy(); + assertEquals(10, cache.size()); for(int i=0; i<10; i++) { - assertEquals(values.get(i), cache.getCacheContents().get(cache.getCacheContents().size() - (i + 1))); + assertEquals(values.get(i), cache.get(cache.size() - (i + 1))); assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); } } @@ -106,27 +122,31 @@ public void testCacheReset() { @Test public void testCacheGetLatestPointValues() { + + setupRuntime(); + //Insert some test data - List values = insertValues(5); + List values = insertValuesIntoRuntime(5); //There should only be 5 values - PointValueCache cache = new PointValueCache(dataPointId, 10, null); - assertEquals(5, cache.getCacheContents().size()); + List cache = rt.getCacheCopy(); + assertEquals(5, cache.size()); //Check the order (cache is time descending) for(int i=0; i<5; i++) { - assertEquals(values.get(i), cache.getCacheContents().get(cache.getCacheContents().size() - (i + 1))); + assertEquals(values.get(i), cache.get(cache.size() - (i + 1))); } //Insert another 5 values - values.addAll(insertValues(5)); + values.addAll(insertValuesIntoRuntime(5)); //TODO This revealed a bug? The cache is 5 and won't expand to 10... //Expand cache - List latest = cache.getLatestPointValues(10); - assertEquals(10, cache.getCacheContents().size()); + List latest = rt.getLatestPointValues(10); + cache = rt.getCacheCopy(); + assertEquals(10, cache.size()); for(int i=0; i<10; i++) { - assertEquals(values.get(i), cache.getCacheContents().get(cache.getCacheContents().size() - (i + 1))); + assertEquals(values.get(i), cache.get(cache.size() - (i + 1))); assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); } } @@ -154,6 +174,17 @@ public void beforePointValueCacheTest() { currentValue = 0; } + + /** + * Setup the runtime, useful to fill database with values to load in cache on initialization of point + */ + private void setupRuntime() { + //There should only be 5 values + dpVo.setDefaultCacheSize(10); + plRt = new MockPointLocatorRT(plVo); + rt = new DataPointRT(dpVo, plRt, dsVo, null, timer); + rt.initialize(); + } @After public void afterPointValueCacheTest() { @@ -163,7 +194,31 @@ public void afterPointValueCacheTest() { DataSourceDao.getInstance().delete(dataSourceId); } - private List insertValues(int count) { + /** + * Insert values directly into the DataPointRT and its cache. Used to simulate a running point + * saving values. + * @param count + * @return + */ + private List insertValuesIntoRuntime(int count) { + List values = new ArrayList<>(); + for(int i=0; i insertValuesIntoDatabase(int count) { PointValueDao dao = Common.databaseProxy.newPointValueDao(); List values = new ArrayList<>(); for(int i=0; i getLatestPointValues(int limit) { + //TODO This can expand the cache, perhaps we want to not expand the cache and then fill to limit + // from the db. return valueCache.getLatestPointValues(limit); } @@ -948,10 +950,8 @@ public void terminateHistorical() { * @return */ public List getCacheCopy(){ - List copy = new ArrayList(this.valueCache.getCacheContents().size()); - for(PointValueTime pvt : this.valueCache.getCacheContents()) - copy.add(pvt); - return copy; + List c = this.valueCache.getCacheContents(); + return new ArrayList(c); } /** diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java index 394f7877be..bef9257fe0 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java @@ -23,6 +23,7 @@ * @author Matthew Lohbihler, Terry Packer */ public class PointValueCache { + private final Object lock = new Object(); private final int dataPointId; private final int defaultSize; private int maxSize = 0; @@ -53,22 +54,10 @@ public PointValueCache(int dataPointId, int defaultSize, List ca this.defaultSize = defaultSize; this.latestSavedValueTime = new AtomicLong(); this.valueSavedCallback = (time) -> { - //Prune cache - //TODO Synchronization - latestSavedValueTime.set(time); - List c = this.cache; - if(c.size() > maxSize) { - List newCache = new ArrayList(c.size() + 1); - newCache.addAll(c); - - while (newCache.size() > maxSize) { - //If the value has not been saved we are not discarding any data - if(newCache.get(newCache.size() - 1).getTime() > latestSavedValueTime.get()) - break; - newCache.remove(newCache.size() - 1); - } - this.cache = newCache; - } + //Track latest value's time that is saved to db + latestSavedValueTime.accumulateAndGet(time, (updated, current) ->{ + return updated > current ? updated : current; + }); }; if (cache == null) { @@ -108,31 +97,34 @@ public void savePointValue(PointValueTime pvt, SetPointSource source, boolean lo pvt = savePointValueSync(pvt, source); } - //TODO Synchronization - List c = cache; - List newCache = new ArrayList(c.size() + 1); - newCache.addAll(c); - - // Insert the value in the cache. - int pos = 0; - if (newCache.size() == 0) - newCache.add(pvt); - else { - //Find where in the cache we want to place the new value - while (pos < newCache.size() && newCache.get(pos).getTime() > pvt.getTime()) - pos++; - newCache.add(pos, pvt); - } - - // Check if we need to clean up the list - while (newCache.size() > maxSize) { - //If the value has not been saved we are not discarding any data - if(newCache.get(newCache.size() - 1).getTime() > latestSavedValueTime.get()) - break; - newCache.remove(newCache.size() - 1); + synchronized (lock) { + + List c = cache; + List newCache = new ArrayList(c.size() + 1); + newCache.addAll(c); + + // Insert the value in the cache. + int pos = 0; + if (newCache.size() == 0) + newCache.add(pvt); + else { + //Find where in the cache we want to place the new value + while (pos < newCache.size() && newCache.get(pos).getTime() > pvt.getTime()) + pos++; + if (pos < maxSize) + newCache.add(pos, pvt); + } + + // Check if we need to clean up the list (Probably don't need to do this here now that there is a callback) + while (newCache.size() > maxSize) { + //If the value has not been saved we are not discarding any data + if(newCache.get(newCache.size() - 1).getTime() > latestSavedValueTime.get()) + break; + newCache.remove(newCache.size() - 1); + } + + cache = newCache; } - - cache = newCache; } void savePointValueAsync(PointValueTime pvt, SetPointSource source) { @@ -172,42 +164,60 @@ public List getLatestPointValues(int limit) { * @param size */ private void refreshCache(int size) { + + //TODO Ensure we don't discard values that are not in the DB yet if (size > maxSize) { maxSize = size; if (size == 1) { // Performance thingy PointValueTime pvt = dao.getLatestPointValue(dataPointId); if (pvt != null) { - List c = new ArrayList(); - c.add(pvt); - cache = c; - latestSavedValueTime.set(pvt.getTime()); - } + synchronized(lock) { + List c = new ArrayList(); + c.add(pvt); + cache = c; + } + latestSavedValueTime.accumulateAndGet(pvt.getTime(), (updated, current) ->{ + return updated > current ? updated : current; + }); + }else + latestSavedValueTime.accumulateAndGet(Long.MIN_VALUE, (updated, current) ->{ + return updated > current ? updated : current; + }); } else { - List c = cache; - List cc = new ArrayList<>(c.size()); - cc.addAll(c); - List nc = new ArrayList(size); - dao.getLatestPointValues(dataPointId, Common.timer.currentTimeMillis() + 1, size, (value, index) -> { - //Cache is in same order as rows - if(nc.size() < size && cc.size() > 0 && cc.get(0).getTime() >= value.getTime()) { - //The cached value is newer so add it - while(nc.size() < size && cc.size() > 0 && cc.get(0).getTime() > value.getTime()) - nc.add(cc.remove(0)); - if(cc.size() > 0 && cc.get(0).getTime() == value.getTime()) - cc.remove(0); - if(nc.size() < size) - nc.add(value); - }else { - //Past cached value times - if(nc.size() < size) - nc.add(value); - } - }); - cache = nc; + List nc; + synchronized(lock) { + List c = cache; + List cc = new ArrayList<>(c.size()); + cc.addAll(c); + nc = new ArrayList(size); + dao.getLatestPointValues(dataPointId, Common.timer.currentTimeMillis() + 1, size, (value, index) -> { + //Cache is in same order as rows + if(nc.size() < size && cc.size() > 0 && cc.get(0).getTime() >= value.getTime()) { + //The cached value is newer so add it + while(nc.size() < size && cc.size() > 0 && cc.get(0).getTime() > value.getTime()) + nc.add(cc.remove(0)); + if(cc.size() > 0 && cc.get(0).getTime() == value.getTime()) + cc.remove(0); + if(nc.size() < size) + nc.add(value); + }else { + //Past cached value times + if(nc.size() < size) + nc.add(value); + } + }); + cache = nc; + } if(nc.size() > 0) - latestSavedValueTime.set(nc.get(0).getTime()); + latestSavedValueTime.accumulateAndGet(nc.get(0).getTime(), (updated, current) ->{ + return updated > current ? updated : current; + }); + else + latestSavedValueTime.accumulateAndGet(Long.MIN_VALUE, (updated, current) ->{ + return updated > current ? updated : current; + }); } } } @@ -219,25 +229,39 @@ public List getCacheContents() { return cache; } + /** + * Reset the cache to default size and reload data from the database + */ public void reset() { + //TODO Ensure we don't discard values that are not in the DB yet + List nc = dao.getLatestPointValues(dataPointId, defaultSize); - maxSize = defaultSize; - cache = nc; + synchronized(lock) { + maxSize = defaultSize; + cache = nc; + } } + /** + * Reset the cache to a + * @param before + */ public void reset(long before) { - List nc = new ArrayList(cache.size()); - nc.addAll(cache); - Iterator iter = nc.iterator(); - while(iter.hasNext()) - if(iter.next().getTime() < before) - iter.remove(); - - if(nc.size() < defaultSize) { - maxSize = 0; - cache = nc; - refreshCache(defaultSize); - } else - cache = nc; + //TODO Ensure we don't discard values that are not in the DB yet + synchronized(lock) { + List nc = new ArrayList(cache.size()); + nc.addAll(cache); + Iterator iter = nc.iterator(); + while(iter.hasNext()) + if(iter.next().getTime() < before) + iter.remove(); + + if(nc.size() < defaultSize) { + maxSize = 0; + cache = nc; + refreshCache(defaultSize); + } else + cache = nc; + } } } From f5c629725354e336453f5551d96374f5875333ce Mon Sep 17 00:00:00 2001 From: terrypacker Date: Tue, 14 May 2019 15:43:27 -1000 Subject: [PATCH 03/10] Adding in unsaved value retention for PointValueCache --- .../rt/dataImage/PointValueCacheTest.java | 43 ++++---- .../m2m2/rt/dataImage/PointValueCache.java | 99 +++++++++++-------- 2 files changed, 84 insertions(+), 58 deletions(-) diff --git a/Core/src-test/com/serotonin/m2m2/rt/dataImage/PointValueCacheTest.java b/Core/src-test/com/serotonin/m2m2/rt/dataImage/PointValueCacheTest.java index 575f368de3..334c49759e 100644 --- a/Core/src-test/com/serotonin/m2m2/rt/dataImage/PointValueCacheTest.java +++ b/Core/src-test/com/serotonin/m2m2/rt/dataImage/PointValueCacheTest.java @@ -4,6 +4,7 @@ package com.serotonin.m2m2.rt.dataImage; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.List; @@ -38,14 +39,14 @@ public class PointValueCacheTest extends MangoTestBase { private MockPointLocatorRT plRt; private DataPointRT rt; - - //TODO Test RT saves callback does prune list properly //TODO Test synchronization //TODO Test public void reset(long before) //TODO Test public PointValueTime getLatestPointValue() + //TODO Test insert backdated values + @Test - public void testSaveCallbackPrune() { + public void testSaveCallbackPrune() throws InterruptedException { setupRuntime(); //Insert some test data @@ -60,15 +61,7 @@ public void testSaveCallbackPrune() { } //Insert another 5 values - for(int i=0; i<5; i++) { - PointValueTime pvt = new PointValueTime(currentValue, timer.currentTimeMillis()); - rt.updatePointValue(pvt); - values.add(pvt); - timer.fastForwardTo(timer.currentTimeMillis() + 1000); - currentValue += 1.0d; - } - - //TODO Sleep/wait/advance timer? + values.addAll(insertValuesIntoRuntime(5)); latest = rt.getLatestPointValues(10); assertEquals(10, latest.size()); @@ -77,15 +70,28 @@ public void testSaveCallbackPrune() { } //Insert another 5 to see the cache > 10 and then trim back to 10 - for(int i=0; i<5; i++) { - PointValueTime pvt = new PointValueTime(currentValue, timer.currentTimeMillis()); - rt.updatePointValue(pvt); - values.add(pvt); - timer.fastForwardTo(timer.currentTimeMillis() + 1000); - currentValue += 1.0d; + values.addAll(insertValuesIntoRuntime(5)); + + int retries = 10; + while(retries > 0) { + //Force refresh so that all saved values get cleared out + rt.resetValues(); + List cacheCopy = rt.getCacheCopy(); + if(cacheCopy.size() == 10) + break; + Thread.sleep(100); + retries--; } + if(retries == 0) + fail("Didn't recieve all values into cache. Cache size is " + rt.getCacheCopy().size()); + + //assert cache contents, there will have been 15 inserted and we will compare the latest 10 which should be + // in the cache List cacheCopy = rt.getCacheCopy(); assertEquals(10, cacheCopy.size()); + for(int i=0; i<10; i++) { + assertEquals(values.get(i + 5), cacheCopy.get(cacheCopy.size() - (i + 1))); + } } @Test @@ -179,7 +185,6 @@ public void beforePointValueCacheTest() { * Setup the runtime, useful to fill database with values to load in cache on initialization of point */ private void setupRuntime() { - //There should only be 5 values dpVo.setDefaultCacheSize(10); plRt = new MockPointLocatorRT(plVo); rt = new DataPointRT(dpVo, plRt, dsVo, null, timer); diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java index bef9257fe0..ea838b3c09 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java @@ -16,6 +16,8 @@ /** * This class maintains an ordered list of the most recent values for a data point. It will mirror values in the * database, but provide a much faster lookup for a limited number of values. + * + * The list is in time descending order with the latest value for a point at position 0 * * This is also used as a store for values that are queued to be saved but may not be saved yet i.e. in a batch * write queue allowing a place to access all saved values reliably. @@ -104,10 +106,10 @@ public void savePointValue(PointValueTime pvt, SetPointSource source, boolean lo newCache.addAll(c); // Insert the value in the cache. - int pos = 0; if (newCache.size() == 0) newCache.add(pvt); else { + int pos = 0; //Find where in the cache we want to place the new value while (pos < newCache.size() && newCache.get(pos).getTime() > pvt.getTime()) pos++; @@ -115,13 +117,7 @@ public void savePointValue(PointValueTime pvt, SetPointSource source, boolean lo newCache.add(pos, pvt); } - // Check if we need to clean up the list (Probably don't need to do this here now that there is a callback) - while (newCache.size() > maxSize) { - //If the value has not been saved we are not discarding any data - if(newCache.get(newCache.size() - 1).getTime() > latestSavedValueTime.get()) - break; - newCache.remove(newCache.size() - 1); - } + safelyTrim(newCache); cache = newCache; } @@ -164,8 +160,6 @@ public List getLatestPointValues(int limit) { * @param size */ private void refreshCache(int size) { - - //TODO Ensure we don't discard values that are not in the DB yet if (size > maxSize) { maxSize = size; if (size == 1) { @@ -173,33 +167,37 @@ private void refreshCache(int size) { PointValueTime pvt = dao.getLatestPointValue(dataPointId); if (pvt != null) { synchronized(lock) { - List c = new ArrayList(); - c.add(pvt); + List c = new ArrayList(cache); + safelyTrim(c); + int pos = 0; + //Find where in the cache we want to place the new value + while (pos < c.size() && c.get(pos).getTime() > pvt.getTime()) + pos++; + if (pos < maxSize) + c.add(pos, pvt); cache = c; } - latestSavedValueTime.accumulateAndGet(pvt.getTime(), (updated, current) ->{ - return updated > current ? updated : current; - }); - }else - latestSavedValueTime.accumulateAndGet(Long.MIN_VALUE, (updated, current) ->{ - return updated > current ? updated : current; - }); + } } else { List nc; synchronized(lock) { - List c = cache; - List cc = new ArrayList<>(c.size()); - cc.addAll(c); + List cc = new ArrayList<>(cache.size()); + cc.addAll(cache); nc = new ArrayList(size); dao.getLatestPointValues(dataPointId, Common.timer.currentTimeMillis() + 1, size, (value, index) -> { //Cache is in same order as rows if(nc.size() < size && cc.size() > 0 && cc.get(0).getTime() >= value.getTime()) { - //The cached value is newer so add it + + //The cached value is newer retain it (this should ensure we keep all the values that haven't been written to the db) while(nc.size() < size && cc.size() > 0 && cc.get(0).getTime() > value.getTime()) nc.add(cc.remove(0)); + + //should we replace this value? if(cc.size() > 0 && cc.get(0).getTime() == value.getTime()) cc.remove(0); + + //add the value from the database if it doesn't make us too big if(nc.size() < size) nc.add(value); }else { @@ -210,14 +208,6 @@ private void refreshCache(int size) { }); cache = nc; } - if(nc.size() > 0) - latestSavedValueTime.accumulateAndGet(nc.get(0).getTime(), (updated, current) ->{ - return updated > current ? updated : current; - }); - else - latestSavedValueTime.accumulateAndGet(Long.MIN_VALUE, (updated, current) ->{ - return updated > current ? updated : current; - }); } } } @@ -230,38 +220,69 @@ public List getCacheContents() { } /** - * Reset the cache to default size and reload data from the database + * Reset the cache. This is used during a point purge to reset our state to + * the database which was just fully purged. Point values may be streaming in during + * this time so we cannot assume the database is empty. */ public void reset() { - //TODO Ensure we don't discard values that are not in the DB yet - List nc = dao.getLatestPointValues(dataPointId, defaultSize); synchronized(lock) { maxSize = defaultSize; + long latestTime = nc.size() > 0 ? nc.get(0).getTime() : Long.MIN_VALUE; + + //fill in backwards + for(int i=cache.size()-1; i>=0; i--) + if(cache.get(i).getTime() > latestTime) + nc.add(0, cache.get(i)); + + safelyTrim(nc); + //Reset our latest time, + latestSavedValueTime.accumulateAndGet(nc.size() > 0 ? nc.get(0).getTime() : Long.MIN_VALUE, (updated, current) ->{ + return updated > current ? updated : current; + }); cache = nc; } } /** - * Reset the cache to a + * Reset the cache by dropping values before 'before', this is used during a point purge. * @param before */ public void reset(long before) { - //TODO Ensure we don't discard values that are not in the DB yet synchronized(lock) { List nc = new ArrayList(cache.size()); nc.addAll(cache); Iterator iter = nc.iterator(); - while(iter.hasNext()) - if(iter.next().getTime() < before) + while(iter.hasNext()) { + long time = iter.next().getTime(); + //Remove old values that have been saved + if(time < before && time <= latestSavedValueTime.get()) iter.remove(); + } if(nc.size() < defaultSize) { maxSize = 0; cache = nc; refreshCache(defaultSize); - } else + } else { + safelyTrim(nc); cache = nc; + } + //TODO Reset our latestSavedValueTime? + } + } + + /** + * Trim a cache and make sure to not lose values that have not been saved. + * @param toTrim + */ + private void safelyTrim(List toTrim) { + while (toTrim.size() > maxSize) { + //If the value has not been saved we will not trim it + // and since we are in time order the remaining values will not have been saved either + if(toTrim.get(toTrim.size() - 1).getTime() > latestSavedValueTime.get()) + break; + toTrim.remove(toTrim.size() - 1); } } } From 25e4c44ede7f7416dc4d93f9fc79930e63a46509 Mon Sep 17 00:00:00 2001 From: terrypacker Date: Wed, 15 May 2019 14:36:52 -1000 Subject: [PATCH 04/10] Fixes from new PointValueCache and DataPointRT query tests --- .../m2m2/H2InMemoryDatabaseProxy.java | 21 + .../AbstractPointValueCacheTestBase.java | 141 ++++++ .../rt/dataImage/PointValueCacheTest.java | 439 +++++++++++++----- .../rt/dataImage/TestDataPointRTQueries.java | 177 +++++++ .../m2m2/rt/dataImage/DataPointRT.java | 30 +- .../rt/dataImage/EnhancedPointValueCache.java | 5 +- .../rt/dataImage/IDataPointValueSource.java | 35 ++ .../m2m2/rt/dataImage/PointValueCache.java | 87 +++- .../m2m2/rt/dataImage/PointValueFacade.java | 5 + .../m2m2/rt/script/AbstractPointWrapper.java | 4 +- 10 files changed, 780 insertions(+), 164 deletions(-) create mode 100644 Core/src-test/com/serotonin/m2m2/rt/dataImage/AbstractPointValueCacheTestBase.java create mode 100644 Core/src-test/com/serotonin/m2m2/rt/dataImage/TestDataPointRTQueries.java diff --git a/Core/src-test/com/serotonin/m2m2/H2InMemoryDatabaseProxy.java b/Core/src-test/com/serotonin/m2m2/H2InMemoryDatabaseProxy.java index daba41ecaf..51a96b3454 100644 --- a/Core/src-test/com/serotonin/m2m2/H2InMemoryDatabaseProxy.java +++ b/Core/src-test/com/serotonin/m2m2/H2InMemoryDatabaseProxy.java @@ -4,6 +4,8 @@ */ package com.serotonin.m2m2; +import static org.junit.Assert.fail; + import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; @@ -27,6 +29,7 @@ import org.springframework.jdbc.datasource.DataSourceUtils; import org.springframework.transaction.PlatformTransactionManager; +import com.infiniteautomation.mango.monitor.IntegerMonitor; import com.serotonin.ShouldNeverHappenException; import com.serotonin.db.DaoUtils; import com.serotonin.db.spring.ConnectionCallbackVoid; @@ -446,10 +449,28 @@ public NoSQLProxy getNoSQLProxy() { */ public void clean() throws Exception { + //If the SQL Batch writer is executing we need to allow it to finish + if(initialized && noSQLProxy == null) { + IntegerMonitor m = (IntegerMonitor)Common.MONITORED_VALUES.getValueMonitor(PointValueDaoSQL.INSTANCES_MONITOR_ID); + if(m != null) { + int retries = 100; + while(retries > 0) { + if(m.getValue() == 0) + break; + retries--; + Thread.sleep(100); + } + if(retries == 0) + fail("SQL Batch write behind still running."); + } + } + ExtendedJdbcTemplate ejt = new ExtendedJdbcTemplate(); ejt.setDataSource(getDataSource()); + //Drop everything runScript(new String[] {"DROP ALL OBJECTS;"}, null); + //Create database runScript(this.getClass().getResourceAsStream("/createTables-" + getType().name() + ".sql"), null); for (DatabaseSchemaDefinition def : ModuleRegistry.getDefinitions(DatabaseSchemaDefinition.class)) diff --git a/Core/src-test/com/serotonin/m2m2/rt/dataImage/AbstractPointValueCacheTestBase.java b/Core/src-test/com/serotonin/m2m2/rt/dataImage/AbstractPointValueCacheTestBase.java new file mode 100644 index 0000000000..db963150a3 --- /dev/null +++ b/Core/src-test/com/serotonin/m2m2/rt/dataImage/AbstractPointValueCacheTestBase.java @@ -0,0 +1,141 @@ +/** + * Copyright (C) 2019 Infinite Automation Software. All rights reserved. + */ +package com.serotonin.m2m2.rt.dataImage; + +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Before; + +import com.serotonin.m2m2.Common; +import com.serotonin.m2m2.DataTypes; +import com.serotonin.m2m2.MangoTestBase; +import com.serotonin.m2m2.db.dao.DataPointDao; +import com.serotonin.m2m2.db.dao.DataSourceDao; +import com.serotonin.m2m2.db.dao.PointValueDao; +import com.serotonin.m2m2.rt.dataSource.MockPointLocatorRT; +import com.serotonin.m2m2.vo.DataPointVO; +import com.serotonin.m2m2.vo.dataPoint.MockPointLocatorVO; +import com.serotonin.m2m2.vo.dataSource.mock.MockDataSourceVO; + +/** + * Base class for testing the point value cache + * @author Terry Packer + * + */ +public class AbstractPointValueCacheTestBase extends MangoTestBase { + + protected int dataSourceId = Common.NEW_ID; + protected int dataPointId = Common.NEW_ID; + protected double currentValue; + protected MockDataSourceVO dsVo; + protected DataPointVO dpVo; + protected MockPointLocatorVO plVo; + protected MockPointLocatorRT plRt; + protected DataPointRT rt; + + @Before + public void beforePointValueCacheTest() { + //Create data source + dsVo = new MockDataSourceVO(); + dsVo.setXid("DS_MOCK_ME"); + dsVo.setName("data source"); + validate(dsVo); + DataSourceDao.getInstance().save(dsVo); + dataSourceId = dsVo.getId(); + + //Create a data point + plVo = new MockPointLocatorVO(DataTypes.NUMERIC, true); + dpVo = new DataPointVO(); + dpVo.setXid("DP_MOCK_ME"); + dpVo.setName("point"); + dpVo.setPointLocator(plVo); + dpVo.setDataSourceId(dataSourceId); + validate(dpVo); + DataPointDao.getInstance().save(dpVo); + dataPointId = dpVo.getId(); + + currentValue = 0; + } + + /** + * Setup the runtime, useful to fill database with values to load in cache on initialization of point + */ + protected void setupRuntime(int cacheSize) { + dpVo.setDefaultCacheSize(cacheSize); + plRt = new MockPointLocatorRT(plVo); + rt = new DataPointRT(dpVo, plRt, dsVo, null, timer); + rt.initialize(); + } + + @After + public void afterPointValueCacheTest() { + if(dataPointId != Common.NEW_ID) + DataPointDao.getInstance().delete(dataPointId); + if(dataSourceId != Common.NEW_ID) + DataSourceDao.getInstance().delete(dataSourceId); + + } + + /** + * Wait for values to be available in the database, wait retries x 100 ms + * @param count + * @param retries + * @throws InterruptedException + */ + protected void ensureValuesInDatabase(int count, int retries) { + PointValueDao dao = Common.databaseProxy.newPointValueDao(); + int trial = retries; + while(trial >= 0) { + if(dao.getLatestPointValues(dataPointId, count).size() >= count) + return; + try{ Thread.sleep(100); }catch(InterruptedException e) { } + trial--; + } + if(trial == 0) + fail("Not enough values in database."); + } + + /** + * Insert values directly into the DataPointRT and its cache. Used to simulate a running point + * saving values via the BWB. Expect a delay between this call returning and values being + * query-able in the database. + * @param count + * @return + */ + protected List insertValuesIntoRuntime(int count) { + List values = new ArrayList<>(); + for(int i=0; i insertValuesIntoDatabase(int count) { + PointValueDao dao = Common.databaseProxy.newPointValueDao(); + List values = new ArrayList<>(); + for(int i=0; i values = insertValuesIntoRuntime(10); + List latest = rt.getLatestPointValues(10); + assertEquals(10, latest.size()); + //Check the order (cache is time descending) + for(int i=0; i<10; i++) { + assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); + } + + //Insert values in the past + PointValueTime value = new PointValueTime(0.0d, 0); + rt.updatePointValue(value); + values.add(0, value); + currentValue++; + + value = new PointValueTime(1.0d, 1000); + rt.updatePointValue(value); + values.add(0, value); + currentValue++; + + value = new PointValueTime(2.0d, 2000); + rt.updatePointValue(value); + values.add(0, value); + currentValue++; + + value = new PointValueTime(3.0d, 3000); + rt.updatePointValue(value); + values.add(0, value); + currentValue++; + + value = new PointValueTime(4.0d, 4000); + rt.updatePointValue(value); + values.add(0, value); + currentValue++; + + //Validate that the cache does not contain these values, the cache should be size 10 + latest = rt.getLatestPointValues(10); + List cache = rt.getCacheCopy(); + assertEquals(10, latest.size()); + assertEquals(10, cache.size()); + for(int i=0; i<10; i++) { + assertEquals(values.get(i + 5), cache.get(cache.size() - (i + 1))); + assertEquals(values.get(i + 5), latest.get(latest.size() - (i + 1))); + } + + //Reset the cache and see that is has the latest 10 values + rt.resetValues(); + latest = rt.getLatestPointValues(10); + cache = rt.getCacheCopy(); + assertEquals(10, latest.size()); + for(int i=0; i<10; i++) { + assertEquals(values.get(i + 5), cache.get(cache.size() - (i + 1))); + assertEquals(values.get(i + 5), latest.get(latest.size() - (i + 1))); + } + + //Test getLatestPointValue via the reset + assertEquals(values.get(values.size() - 1), rt.getPointValue()); + } - private int dataSourceId = Common.NEW_ID; - private int dataPointId = Common.NEW_ID; - private double currentValue; - private MockDataSourceVO dsVo; - private DataPointVO dpVo; - private MockPointLocatorVO plVo; - private MockPointLocatorRT plRt; - private DataPointRT rt; + @Test + public void testFutureDates() { + setupRuntime(10); + + List values = insertValuesIntoRuntime(10); + List latest = rt.getLatestPointValues(10); + assertEquals(10, latest.size()); + //Check the order (cache is time descending) + for(int i=0; i<10; i++) { + assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); + } + + //Insert values in the future + PointValueTime value = new PointValueTime(currentValue, timer.currentTimeMillis()); + rt.updatePointValue(value); + values.add(value); + currentValue++; + + value = new PointValueTime(currentValue, timer.currentTimeMillis() + 1000); + rt.updatePointValue(value); + values.add(value); + currentValue++; + + value = new PointValueTime(currentValue, timer.currentTimeMillis() + 2000); + rt.updatePointValue(value); + values.add(value); + currentValue++; + + value = new PointValueTime(currentValue, timer.currentTimeMillis() + 3000); + rt.updatePointValue(value); + values.add(value); + currentValue++; + + value = new PointValueTime(currentValue, timer.currentTimeMillis() + 4000); + rt.updatePointValue(value); + values.add(value); + currentValue++; + + //Validate that the cache contains these values, note the cache copy may be larger than 10 here + latest = rt.getLatestPointValues(10); + List cache = rt.getCacheCopy(); + int cacheOffset = cache.size() - latest.size(); + assertEquals(10, latest.size()); + for(int i=0; i<10; i++) { + assertEquals(values.get(i + 5), cache.get(cache.size() - (i + 1 + cacheOffset))); + assertEquals(values.get(i + 5), latest.get(latest.size() - (i + 1))); + } + + //Reset the cache and see that is has the latest 10 values + rt.resetValues(); + latest = rt.getLatestPointValues(10); + cache = rt.getCacheCopy(); + assertEquals(10, latest.size()); + for(int i=0; i<10; i++) { + assertEquals(values.get(i + 5), cache.get(cache.size() - (i + 1))); + assertEquals(values.get(i + 5), latest.get(latest.size() - (i + 1))); + } + + //Test getLatestPointValue via the reset + assertEquals(values.get(values.size() - 1), rt.getPointValue()); + + } + + @Test + public void testResetBefore() { + //Insert some test data + List values = insertValuesIntoDatabase(5); - //TODO Test synchronization - //TODO Test public void reset(long before) - //TODO Test public PointValueTime getLatestPointValue() - //TODO Test insert backdated values + setupRuntime(10); + + //There should only be 5 values + List cache = rt.getCacheCopy(); + assertEquals(5, cache.size()); + + //Check the order (cache is time descending) + for(int i=0; i<5; i++) { + assertEquals(values.get(i), cache.get(cache.size() - (i + 1))); + } + + //Insert another 10 values + values.addAll(insertValuesIntoRuntime(5)); + + //Reset cache, this will expand it to max + rt.resetValues((1000 * 3) + 1); + ensureValuesInDatabase(10, 5); + + //Confirm cache is expanded and correct + cache = rt.getCacheCopy(); + assertEquals(10, cache.size()); + for(int i=0; i<10; i++) { + assertEquals(values.get(i), cache.get(cache.size() - (i + 1))); + } + + //Get all cached values + List latest = rt.getLatestPointValues(10); + cache = rt.getCacheCopy(); + assertEquals(10, cache.size()); + for(int i=0; i<10; i++) { + assertEquals(values.get(i), cache.get(cache.size() - (i + 1))); + assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); + } + + //Test getLatestPointValue via the reset + assertEquals(values.get(values.size() - 1), rt.getPointValue()); + + //Totally reset the cache + rt.resetValues((values.size() * 1000) + 1); + //Confirm cache is expanded and correct + cache = rt.getCacheCopy(); + assertEquals(10, cache.size()); + for(int i=0; i<10; i++) { + assertEquals(values.get(i), cache.get(cache.size() - (i + 1))); + } + + //Reset the cache in a way that no values are removed + rt.resetValues(0); + //Confirm cache is expanded and correct + cache = rt.getCacheCopy(); + assertEquals(10, cache.size()); + for(int i=0; i<10; i++) { + assertEquals(values.get(i), cache.get(cache.size() - (i + 1))); + } + + //Test getLatestPointValue via the reset + assertEquals(values.get(values.size() - 1), rt.getPointValue()); + } + + @Test + public void testLatestPointValuesSynchronization() { + setupRuntime(1); + List values = insertValuesIntoRuntime(10); + List latest = rt.getLatestPointValues(10); + assertEquals(10, latest.size()); + //Check the order (cache is time descending) + for(int i=0; i<10; i++) { + assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); + } + + ensureValuesInDatabase(10, 5); + rt.resetValues(); + + //Try expanding the cache + latest = rt.getLatestPointValues(10); + assertEquals(10, latest.size()); + + //Test getLatestPointValue via the reset + assertEquals(values.get(values.size() - 1), rt.getPointValue()); + } + + @Test + public void testCacheSize1() { + setupRuntime(1); + + List values = insertValuesIntoRuntime(10); + List latest = rt.getLatestPointValues(10); + assertEquals(10, latest.size()); + //Check the order (cache is time descending) + for(int i=0; i<10; i++) { + assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); + } + + ensureValuesInDatabase(10, 5); + rt.resetValues(); + + latest = rt.getLatestPointValues(1); + assertEquals(1, latest.size()); + assertEquals(values.get(9), latest.get(0)); + + //Try expanding the cache + latest = rt.getLatestPointValues(10); + assertEquals(10, latest.size()); + + //Check the order (cache is time descending) + for(int i=0; i<10; i++) { + assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); + } + + //Test getLatestPointValue via the reset + assertEquals(values.get(values.size() - 1), rt.getPointValue()); + } + @Test + public void testManyUnsavedInCache() throws InterruptedException { + setupRuntime(10); + + //Insert some test data + int inserted = 1000; + List values = insertValuesIntoRuntime(inserted); + + //Get a snapshot of the cache in an expanded state + List cacheCopy = rt.getCacheCopy(); + //Ensure the order is correct + int valuePos = values.size() - 1; + for(int i=0; i latest = rt.getLatestPointValues(inserted); + assertEquals(inserted, latest.size()); + //Check the order (cache is time descending) + for(int i=0; i values = insertValuesIntoRuntime(5); @@ -72,26 +333,29 @@ public void testSaveCallbackPrune() throws InterruptedException { //Insert another 5 to see the cache > 10 and then trim back to 10 values.addAll(insertValuesIntoRuntime(5)); - int retries = 10; - while(retries > 0) { - //Force refresh so that all saved values get cleared out - rt.resetValues(); - List cacheCopy = rt.getCacheCopy(); - if(cacheCopy.size() == 10) - break; - Thread.sleep(100); - retries--; - } - if(retries == 0) - fail("Didn't recieve all values into cache. Cache size is " + rt.getCacheCopy().size()); + //This cache will very likely be > 10 as the point values are in the batch writer + List cacheCopy = rt.getCacheCopy(); + + rt.resetValues(); + ensureValuesInDatabase(15, 5); //assert cache contents, there will have been 15 inserted and we will compare the latest 10 which should be // in the cache - List cacheCopy = rt.getCacheCopy(); + cacheCopy = rt.getCacheCopy(); assertEquals(10, cacheCopy.size()); for(int i=0; i<10; i++) { assertEquals(values.get(i + 5), cacheCopy.get(cacheCopy.size() - (i + 1))); } + + //Check for all 15 values + latest = rt.getLatestPointValues(15); + assertEquals(15, latest.size()); + for(int i=0; i<15; i++) { + assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); + } + + //Test getLatestPointValue via the reset + assertEquals(values.get(values.size() - 1), rt.getPointValue()); } @Test @@ -99,7 +363,7 @@ public void testCacheReset() { //Insert some test data List values = insertValuesIntoDatabase(5); - setupRuntime(); + setupRuntime(10); //There should only be 5 values List cache = rt.getCacheCopy(); @@ -115,6 +379,7 @@ public void testCacheReset() { //Expand cache by resetting it rt.resetValues(); + ensureValuesInDatabase(10, 5); List latest = rt.getLatestPointValues(10); cache = rt.getCacheCopy(); @@ -123,13 +388,16 @@ public void testCacheReset() { assertEquals(values.get(i), cache.get(cache.size() - (i + 1))); assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); } + + //Test getLatestPointValue via the reset + assertEquals(values.get(values.size() - 1), rt.getPointValue()); } @Test public void testCacheGetLatestPointValues() { - setupRuntime(); + setupRuntime(10); //Insert some test data List values = insertValuesIntoRuntime(5); @@ -146,7 +414,6 @@ public void testCacheGetLatestPointValues() { //Insert another 5 values values.addAll(insertValuesIntoRuntime(5)); - //TODO This revealed a bug? The cache is 5 and won't expand to 10... //Expand cache List latest = rt.getLatestPointValues(10); cache = rt.getCacheCopy(); @@ -155,84 +422,8 @@ public void testCacheGetLatestPointValues() { assertEquals(values.get(i), cache.get(cache.size() - (i + 1))); assertEquals(values.get(i), latest.get(latest.size() - (i + 1))); } - } - - @Before - public void beforePointValueCacheTest() { - //Create data source - dsVo = new MockDataSourceVO(); - dsVo.setXid("DS_MOCK_ME"); - dsVo.setName("data source"); - validate(dsVo); - DataSourceDao.getInstance().save(dsVo); - dataSourceId = dsVo.getId(); - - //Create a data point - plVo = new MockPointLocatorVO(DataTypes.NUMERIC, true); - dpVo = new DataPointVO(); - dpVo.setXid("DP_MOCK_ME"); - dpVo.setName("point"); - dpVo.setPointLocator(plVo); - dpVo.setDataSourceId(dataSourceId); - validate(dpVo); - DataPointDao.getInstance().save(dpVo); - dataPointId = dpVo.getId(); - - currentValue = 0; - } - - /** - * Setup the runtime, useful to fill database with values to load in cache on initialization of point - */ - private void setupRuntime() { - dpVo.setDefaultCacheSize(10); - plRt = new MockPointLocatorRT(plVo); - rt = new DataPointRT(dpVo, plRt, dsVo, null, timer); - rt.initialize(); - } - - @After - public void afterPointValueCacheTest() { - if(dataPointId != Common.NEW_ID) - DataPointDao.getInstance().delete(dataPointId); - if(dataSourceId != Common.NEW_ID) - DataSourceDao.getInstance().delete(dataSourceId); - } - - /** - * Insert values directly into the DataPointRT and its cache. Used to simulate a running point - * saving values. - * @param count - * @return - */ - private List insertValuesIntoRuntime(int count) { - List values = new ArrayList<>(); - for(int i=0; i insertValuesIntoDatabase(int count) { - PointValueDao dao = Common.databaseProxy.newPointValueDao(); - List values = new ArrayList<>(); - for(int i=0; i values = insertValuesIntoRuntime(5); + + //Query for them + PointValueTime before = rt.getPointValueBefore(timer.currentTimeMillis()); + + assertNotNull(before); + assertEquals(values.get(values.size() - 1), before); + + //Insert a bunch to ensure we are querying from the cache + values = insertValuesIntoRuntime(5000); + before = rt.getPointValueBefore(values.get(4500).getTime()); + + assertNotNull(before); + assertEquals(values.get(4499), before); + + //Test the null before + assertNull(rt.getPointValueBefore(0)); + + } + + @Test + public void getPointValueAt() { + setupRuntime(10); + + //Insert some test data + List values = insertValuesIntoRuntime(5); + + //Query for them + PointValueTime at = rt.getPointValueAt(3000); + + assertNotNull(at); + assertEquals(values.get(3), at); + + //Insert a bunch to ensure we are querying from the cache + values = insertValuesIntoRuntime(5000); + at = rt.getPointValueAt(values.get(2500).getTime()); + + assertNotNull(at); + assertEquals(values.get(2500), at); + + //Test a null value + assertNull(rt.getPointValueAt(500)); + } + + @Test + public void getPointValueAfter() { + setupRuntime(10); + + //Insert some test data + List values = insertValuesIntoRuntime(5); + + //Query for them + PointValueTime after = rt.getPointValueAfter(3000); + + assertNotNull(after); + assertEquals(values.get(3), after); + + //Insert a bunch to ensure we are querying from the cache + values = insertValuesIntoRuntime(5000); + after = rt.getPointValueAfter(values.get(2500).getTime()); + + assertNotNull(after); + assertEquals(values.get(2500), after); + + //Test null value after now + assertNull(rt.getPointValueAfter(timer.currentTimeMillis())); + } + + @Test + public void getLatestPointValues() { + setupRuntime(10); + + //Insert some test data + List values = insertValuesIntoRuntime(5); + + //Query for them + List latest = rt.getLatestPointValues(3); + + assertEquals(3, latest.size()); + //Check the order (cache is time descending) + assertEquals(values.get(4), latest.get(0)); + assertEquals(values.get(3), latest.get(1)); + assertEquals(values.get(2), latest.get(2)); + + //Insert a bunch to ensure we are querying from the cache + values.addAll(insertValuesIntoRuntime(5000)); + latest = rt.getLatestPointValues(1000); + + assertEquals(1000, latest.size()); + //Check the order (latest is time descending) + for(int i=0; i values = insertValuesIntoRuntime(5); + + //Query for them + List since = rt.getPointValues(2000); + + assertEquals(3, since.size()); + for(int i=0; i<3; i++) + assertEquals(values.get(i + 2), since.get(i)); + + //Insert a bunch to ensure we are querying from the cache + values.addAll(insertValuesIntoRuntime(5000)); + since = rt.getPointValues(values.get(3500).getTime()); + + assertEquals(1505, since.size()); + for(int i=0; i<1505; i++) { + assertEquals(values.get(i + 3500), since.get(i)); + } + + //Test no values after now + assertEquals(0, rt.getPointValues(timer.currentTimeMillis()).size()); + } + + @Test + public void getPointValuesBetween() { + setupRuntime(10); + + //Insert some test data + List values = insertValuesIntoRuntime(5); + + //Query for them + List between = rt.getPointValuesBetween(2000, 4000); + + assertEquals(2, between.size()); + for(int i=0; i<2; i++) + assertEquals(values.get(i + 2), between.get(i)); + + //Insert a bunch to ensure we are querying from the cache + values.addAll(insertValuesIntoRuntime(5000)); + between = rt.getPointValuesBetween(values.get(3500).getTime(), values.get(4500).getTime()); + + assertEquals(1000, between.size()); + for(int i=0; i<1000; i++) { + assertEquals(values.get(i + 3500), between.get(i)); + } + + //Test no values between some future date + assertEquals(0, rt.getPointValuesBetween(timer.currentTimeMillis(), timer.currentTimeMillis() + 1000000).size()); + } + +} diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/DataPointRT.java b/Core/src/com/serotonin/m2m2/rt/dataImage/DataPointRT.java index 8849e64c8f..bf5a96d937 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/DataPointRT.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/DataPointRT.java @@ -87,16 +87,7 @@ public class DataPointRT implements IDataPointValueSource, ILifecycle { * @param initialCache */ public DataPointRT(DataPointVO vo, PointLocatorRT pointLocator, DataSourceVO dsVo, List initialCache) { - this.vo = vo; - this.dsVo = dsVo; - this.pointLocator = pointLocator; - if (enhanced) { - valueCache = new EnhancedPointValueCache(vo, dsVo, vo.getDefaultCacheSize(), initialCache); - } else { - valueCache = new PointValueCache(vo.getId(), vo.getDefaultCacheSize(), initialCache); - } - if(vo.getIntervalLoggingType() == DataPointVO.IntervalLoggingTypes.AVERAGE) - averagingValues = new ArrayList(); + this(vo, pointLocator, dsVo, initialCache, Common.timer); } /** @@ -108,8 +99,18 @@ public DataPointRT(DataPointVO vo, PointLocatorRT pointLocator, DataSourceVO< * @param timer */ public DataPointRT(DataPointVO vo, PointLocatorRT pointLocator, DataSourceVO dsVo, List initialCache, AbstractTimer timer) { - this(vo, pointLocator, dsVo, initialCache); + this.vo = vo; + this.dsVo = dsVo; + this.pointLocator = pointLocator; this.timer = timer; + if (enhanced) { + valueCache = new EnhancedPointValueCache(vo, dsVo, vo.getDefaultCacheSize(), initialCache, timer); + } else { + valueCache = new PointValueCache(vo.getId(), vo.getDefaultCacheSize(), initialCache, timer); + } + if(vo.getIntervalLoggingType() == DataPointVO.IntervalLoggingTypes.AVERAGE) + averagingValues = new ArrayList(); + } // @@ -136,6 +137,9 @@ public PointValueTime getPointValueAt(long time) { return Common.databaseProxy.newPointValueDao().getPointValueAt(vo.getId(), time); } + /** + * Get the point value at or just after this time + */ @Override public PointValueTime getPointValueAfter(long time) { @@ -170,8 +174,6 @@ public PointValueTime getPointValueAfter(long time) { // @Override public List getLatestPointValues(int limit) { - //TODO This can expand the cache, perhaps we want to not expand the cache and then fill to limit - // from the db. return valueCache.getLatestPointValues(limit); } @@ -910,8 +912,6 @@ public void initialize() { pedRT.initialize(); Common.runtimeManager.addDataPointListener(vo.getId(), pedRT); } - - //initializeIntervalLogging(); } @Override diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/EnhancedPointValueCache.java b/Core/src/com/serotonin/m2m2/rt/dataImage/EnhancedPointValueCache.java index be0c59fda1..92617ed30c 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/EnhancedPointValueCache.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/EnhancedPointValueCache.java @@ -8,14 +8,15 @@ import com.serotonin.m2m2.db.dao.EnhancedPointValueDao; import com.serotonin.m2m2.vo.DataPointVO; import com.serotonin.m2m2.vo.dataSource.DataSourceVO; +import com.serotonin.timer.AbstractTimer; public class EnhancedPointValueCache extends PointValueCache { private final DataPointVO dataPoint; private final DataSourceVO dataSource; private static final EnhancedPointValueDao enhancedDao = (EnhancedPointValueDao)PointValueCache.dao; //See PointValueCache.dao - public EnhancedPointValueCache(DataPointVO dataPoint, DataSourceVO dataSource, int defaultSize, List cache) { - super(dataPoint.getId(), defaultSize, cache); + public EnhancedPointValueCache(DataPointVO dataPoint, DataSourceVO dataSource, int defaultSize, List cache, AbstractTimer timer) { + super(dataPoint.getId(), defaultSize, cache, timer); this.dataPoint = dataPoint; this.dataSource = dataSource; } diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/IDataPointValueSource.java b/Core/src/com/serotonin/m2m2/rt/dataImage/IDataPointValueSource.java index e6255266d6..dea94738c6 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/IDataPointValueSource.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/IDataPointValueSource.java @@ -20,18 +20,53 @@ public interface IDataPointValueSource { void updatePointValue(PointValueTime newValue, boolean async); + /** + * Set the value, optionally supply a source for the annotation + * @param newValue + * @param source + */ void setPointValue(PointValueTime newValue, SetPointSource source); + /** + * Get the current value + * @return + */ PointValueTime getPointValue(); + /** + * Get the nearest point value before time + * @param time + * @return + */ PointValueTime getPointValueBefore(long time); + /** + * Get the point value at or just after this time + * @param time + * @return + */ PointValueTime getPointValueAfter(long time); + /** + * Get values >= since + * @param since + * @return + */ List getPointValues(long since); + /** + * Get point values >= from < to + * @param from + * @param to + * @return + */ List getPointValuesBetween(long from, long to); + /** + * Get the point value exactly at this time or return null if there isn't one + * @param time + * @return + */ PointValueTime getPointValueAt(long time); int getDataTypeId(); diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java index ea838b3c09..973b6fb8a9 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java @@ -12,6 +12,7 @@ import com.serotonin.m2m2.Common; import com.serotonin.m2m2.db.dao.PointValueDao; +import com.serotonin.timer.AbstractTimer; /** * This class maintains an ordered list of the most recent values for a data point. It will mirror values in the @@ -26,6 +27,8 @@ */ public class PointValueCache { private final Object lock = new Object(); + //Simulation Timer, or any timer implementation + private final AbstractTimer timer; private final int dataPointId; private final int defaultSize; private int maxSize = 0; @@ -51,7 +54,8 @@ public class PointValueCache { * @param defaultSize - the base size of the cache, it will never be smaller than this * @param cache - initial cache, if null then the cache is populated to default size from the database contents */ - public PointValueCache(int dataPointId, int defaultSize, List cache) { + public PointValueCache(int dataPointId, int defaultSize, List cache, AbstractTimer timer) { + this.timer = timer; this.dataPointId = dataPointId; this.defaultSize = defaultSize; this.latestSavedValueTime = new AtomicLong(); @@ -102,24 +106,23 @@ public void savePointValue(PointValueTime pvt, SetPointSource source, boolean lo synchronized (lock) { List c = cache; - List newCache = new ArrayList(c.size() + 1); - newCache.addAll(c); + List nc = new ArrayList(c.size() + 1); + nc.addAll(c); // Insert the value in the cache. - if (newCache.size() == 0) - newCache.add(pvt); + if (nc.size() == 0) + nc.add(pvt); else { int pos = 0; //Find where in the cache we want to place the new value - while (pos < newCache.size() && newCache.get(pos).getTime() > pvt.getTime()) + while (pos < nc.size() && nc.get(pos).getTime() > pvt.getTime()) pos++; - if (pos < maxSize) - newCache.add(pos, pvt); + //TODO logValue with respect to Interval Logging + nc.add(pos, pvt); } - safelyTrim(newCache); - - cache = newCache; + safelyTrim(nc); + cache = nc; } } @@ -131,6 +134,10 @@ PointValueTime savePointValueSync(PointValueTime pvt, SetPointSource source) { return dao.savePointValueSync(dataPointId, pvt, source); } + /** + * Get the latest point value or null. This will expand the cache to 1 if it is empty + * @return + */ public PointValueTime getLatestPointValue() { if (maxSize == 0) refreshCache(1); @@ -142,6 +149,12 @@ public PointValueTime getLatestPointValue() { return null; } + /** + * Get the latest limit number of point values. This call can expand the + * cache if limit is larger than the current cache size + * @param limit + * @return + */ public List getLatestPointValues(int limit) { if (maxSize < limit) refreshCache(limit); @@ -161,20 +174,32 @@ public List getLatestPointValues(int limit) { */ private void refreshCache(int size) { if (size > maxSize) { - maxSize = size; if (size == 1) { // Performance thingy PointValueTime pvt = dao.getLatestPointValue(dataPointId); if (pvt != null) { synchronized(lock) { List c = new ArrayList(cache); - safelyTrim(c); int pos = 0; //Find where in the cache we want to place the new value while (pos < c.size() && c.get(pos).getTime() > pvt.getTime()) pos++; if (pos < maxSize) c.add(pos, pvt); + //Reset our latest time, + latestSavedValueTime.accumulateAndGet(pvt.getTime(), (updated, current) ->{ + return updated > current ? updated : current; + }); + safelyTrim(c); + maxSize = c.size(); + cache = c; + } + }else { + synchronized(lock) { + List c = new ArrayList(cache); + safelyTrim(c); + latestSavedValueTime.set(Long.MIN_VALUE); + maxSize = c.size(); cache = c; } } @@ -185,12 +210,16 @@ private void refreshCache(int size) { List cc = new ArrayList<>(cache.size()); cc.addAll(cache); nc = new ArrayList(size); - dao.getLatestPointValues(dataPointId, Common.timer.currentTimeMillis() + 1, size, (value, index) -> { + dao.getLatestPointValues(dataPointId, timer.currentTimeMillis() + 1, size, (value, index) -> { + //Reset our latest time, + if(index == 0) + latestSavedValueTime.accumulateAndGet(value.getTime(), (updated, current) ->{ + return updated > current ? updated : current; + }); //Cache is in same order as rows if(nc.size() < size && cc.size() > 0 && cc.get(0).getTime() >= value.getTime()) { - - //The cached value is newer retain it (this should ensure we keep all the values that haven't been written to the db) - while(nc.size() < size && cc.size() > 0 && cc.get(0).getTime() > value.getTime()) + //The cached value is newer or unsaved retain it + while((nc.size() < size && cc.size() > 0 && cc.get(0).getTime() > value.getTime()) || (cc.size() > 0 && cc.get(0).getTime() > latestSavedValueTime.get())) nc.add(cc.remove(0)); //should we replace this value? @@ -200,12 +229,27 @@ private void refreshCache(int size) { //add the value from the database if it doesn't make us too big if(nc.size() < size) nc.add(value); + + //TODO REMOVE ME AFTER TEST to see if cache is ever out of order + long lastTime = Long.MAX_VALUE; + for(PointValueTime pvt : nc) { + if(pvt.getTime() >= lastTime) + System.out.print("fail"); + lastTime = pvt.getTime(); + } }else { //Past cached value times if(nc.size() < size) nc.add(value); } }); + //No values in database, make sure we keep the unsaved values + if(nc.size() == 0) { + while((nc.size() < size && cc.size() > 0) || (cc.size() > 0 && cc.get(0).getTime() > latestSavedValueTime.get())) + nc.add(cc.remove(0)); + latestSavedValueTime.set(Long.MIN_VALUE); + } + maxSize = nc.size(); cache = nc; } } @@ -234,12 +278,12 @@ public void reset() { for(int i=cache.size()-1; i>=0; i--) if(cache.get(i).getTime() > latestTime) nc.add(0, cache.get(i)); - - safelyTrim(nc); + //Reset our latest time, latestSavedValueTime.accumulateAndGet(nc.size() > 0 ? nc.get(0).getTime() : Long.MIN_VALUE, (updated, current) ->{ return updated > current ? updated : current; }); + safelyTrim(nc); cache = nc; } } @@ -261,14 +305,15 @@ public void reset(long before) { } if(nc.size() < defaultSize) { - maxSize = 0; + //So we expand the cache if necessary + maxSize = nc.size(); cache = nc; refreshCache(defaultSize); } else { safelyTrim(nc); + maxSize = nc.size(); cache = nc; } - //TODO Reset our latestSavedValueTime? } } diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueFacade.java b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueFacade.java index dc0ebd2382..4f4fe9e864 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueFacade.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueFacade.java @@ -53,6 +53,11 @@ public PointValueTime getPointValueAt(long time) { return pointValueDao.getPointValueAt(dataPointId, time); } + /** + * Get the value at or just after this time + * @param time + * @return + */ public PointValueTime getPointValueAfter(long time) { if ((point != null)&&(useCache)) return point.getPointValueAfter(time); diff --git a/Core/src/com/serotonin/m2m2/rt/script/AbstractPointWrapper.java b/Core/src/com/serotonin/m2m2/rt/script/AbstractPointWrapper.java index aae3e38b14..4c164e4c36 100644 --- a/Core/src/com/serotonin/m2m2/rt/script/AbstractPointWrapper.java +++ b/Core/src/com/serotonin/m2m2/rt/script/AbstractPointWrapper.java @@ -227,7 +227,7 @@ public PointValueTime pointValueBefore(long timestamp, boolean cache){ } /** - * Get the nearest point value after the timestamp, not using cache + * Get the nearest point value after or at the timestamp, not using cache * @param timestamp * @return nearest value OR null */ @@ -236,7 +236,7 @@ public PointValueTime pointValueAfter(long timestamp){ } /** - * Get the nearest point value after the timestamp, optionally using cache + * Get the nearest point value after or at the timestamp, optionally using cache * @param timestamp * @param cache * @return nearest value OR null From 75ba9c98ff613a0763fb77cda269db3dac84ec45 Mon Sep 17 00:00:00 2001 From: terrypacker Date: Wed, 15 May 2019 15:26:34 -1000 Subject: [PATCH 05/10] Rename test and remove test code from Runtime --- ...DataPointRTQueries.java => DataPointRTQueriesTest.java} | 2 +- .../com/serotonin/m2m2/rt/dataImage/PointValueCache.java | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) rename Core/src-test/com/serotonin/m2m2/rt/dataImage/{TestDataPointRTQueries.java => DataPointRTQueriesTest.java} (98%) diff --git a/Core/src-test/com/serotonin/m2m2/rt/dataImage/TestDataPointRTQueries.java b/Core/src-test/com/serotonin/m2m2/rt/dataImage/DataPointRTQueriesTest.java similarity index 98% rename from Core/src-test/com/serotonin/m2m2/rt/dataImage/TestDataPointRTQueries.java rename to Core/src-test/com/serotonin/m2m2/rt/dataImage/DataPointRTQueriesTest.java index ac3317c55e..68308a09bf 100644 --- a/Core/src-test/com/serotonin/m2m2/rt/dataImage/TestDataPointRTQueries.java +++ b/Core/src-test/com/serotonin/m2m2/rt/dataImage/DataPointRTQueriesTest.java @@ -17,7 +17,7 @@ * @author Terry Packer * */ -public class TestDataPointRTQueries extends AbstractPointValueCacheTestBase { +public class DataPointRTQueriesTest extends AbstractPointValueCacheTestBase { @Test public void getPointValueBefore() { diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java index 973b6fb8a9..f3e61251d2 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java @@ -230,13 +230,6 @@ private void refreshCache(int size) { if(nc.size() < size) nc.add(value); - //TODO REMOVE ME AFTER TEST to see if cache is ever out of order - long lastTime = Long.MAX_VALUE; - for(PointValueTime pvt : nc) { - if(pvt.getTime() >= lastTime) - System.out.print("fail"); - lastTime = pvt.getTime(); - } }else { //Past cached value times if(nc.size() < size) From d3804346198f522cde04d0164ed921c7f42ddf67 Mon Sep 17 00:00:00 2001 From: terrypacker Date: Wed, 15 May 2019 16:51:44 -1000 Subject: [PATCH 06/10] Improvements to cleaning/terminating tests for Point Value Cache --- .../com/serotonin/m2m2/H2InMemoryDatabaseProxy.java | 6 +++++- Core/src-test/com/serotonin/m2m2/MangoTestBase.java | 4 ++-- .../rt/dataImage/AbstractPointValueCacheTestBase.java | 8 ++++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/Core/src-test/com/serotonin/m2m2/H2InMemoryDatabaseProxy.java b/Core/src-test/com/serotonin/m2m2/H2InMemoryDatabaseProxy.java index 51a96b3454..1ec81e0589 100644 --- a/Core/src-test/com/serotonin/m2m2/H2InMemoryDatabaseProxy.java +++ b/Core/src-test/com/serotonin/m2m2/H2InMemoryDatabaseProxy.java @@ -50,6 +50,7 @@ import com.serotonin.m2m2.vo.User; import com.serotonin.m2m2.vo.template.DefaultDataPointPropertiesTemplateFactory; import com.serotonin.provider.Providers; +import com.serotonin.timer.SimulationTimer; /** * Using an H2 in memory database we can easily mock the database proxy. @@ -450,7 +451,8 @@ public NoSQLProxy getNoSQLProxy() { public void clean() throws Exception { //If the SQL Batch writer is executing we need to allow it to finish - if(initialized && noSQLProxy == null) { + if(initialized) { + SimulationTimer timer = Common.timer instanceof SimulationTimer ? (SimulationTimer)Common.timer : null; IntegerMonitor m = (IntegerMonitor)Common.MONITORED_VALUES.getValueMonitor(PointValueDaoSQL.INSTANCES_MONITOR_ID); if(m != null) { int retries = 100; @@ -458,6 +460,8 @@ public void clean() throws Exception { if(m.getValue() == 0) break; retries--; + if(timer != null) + timer.fastForwardTo(timer.currentTimeMillis() + 1000); Thread.sleep(100); } if(retries == 0) diff --git a/Core/src-test/com/serotonin/m2m2/MangoTestBase.java b/Core/src-test/com/serotonin/m2m2/MangoTestBase.java index 4299af0559..f907ea685b 100644 --- a/Core/src-test/com/serotonin/m2m2/MangoTestBase.java +++ b/Core/src-test/com/serotonin/m2m2/MangoTestBase.java @@ -122,8 +122,6 @@ public void before() { @After public void after() { - SimulationTimerProvider provider = (SimulationTimerProvider) Providers.get(TimerProvider.class); - provider.reset(); Common.runtimeManager.terminate(); Common.runtimeManager.joinTermination(); H2InMemoryDatabaseProxy proxy = (H2InMemoryDatabaseProxy) Common.databaseProxy; @@ -132,6 +130,8 @@ public void after() { } catch (Exception e) { throw new ShouldNeverHappenException(e); } + SimulationTimerProvider provider = (SimulationTimerProvider) Providers.get(TimerProvider.class); + provider.reset(); } @AfterClass diff --git a/Core/src-test/com/serotonin/m2m2/rt/dataImage/AbstractPointValueCacheTestBase.java b/Core/src-test/com/serotonin/m2m2/rt/dataImage/AbstractPointValueCacheTestBase.java index db963150a3..1ebbdf163a 100644 --- a/Core/src-test/com/serotonin/m2m2/rt/dataImage/AbstractPointValueCacheTestBase.java +++ b/Core/src-test/com/serotonin/m2m2/rt/dataImage/AbstractPointValueCacheTestBase.java @@ -39,7 +39,9 @@ public class AbstractPointValueCacheTestBase extends MangoTestBase { protected DataPointRT rt; @Before - public void beforePointValueCacheTest() { + @Override + public void before() { + super.before(); //Create data source dsVo = new MockDataSourceVO(); dsVo.setXid("DS_MOCK_ME"); @@ -73,11 +75,13 @@ protected void setupRuntime(int cacheSize) { } @After - public void afterPointValueCacheTest() { + @Override + public void after() { if(dataPointId != Common.NEW_ID) DataPointDao.getInstance().delete(dataPointId); if(dataSourceId != Common.NEW_ID) DataSourceDao.getInstance().delete(dataSourceId); + super.after(); } From 8d499db369fa9b129b6130ee70a817658d75626f Mon Sep 17 00:00:00 2001 From: terrypacker Date: Thu, 16 May 2019 08:55:51 -1000 Subject: [PATCH 07/10] Use query for list instead of callback in PointValueCache.refreshCache --- .../m2m2/rt/dataImage/PointValueCache.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java index f3e61251d2..bac41fa048 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java @@ -210,12 +210,14 @@ private void refreshCache(int size) { List cc = new ArrayList<>(cache.size()); cc.addAll(cache); nc = new ArrayList(size); - dao.getLatestPointValues(dataPointId, timer.currentTimeMillis() + 1, size, (value, index) -> { - //Reset our latest time, - if(index == 0) - latestSavedValueTime.accumulateAndGet(value.getTime(), (updated, current) ->{ - return updated > current ? updated : current; - }); + List latest = dao.getLatestPointValues(dataPointId, size, timer.currentTimeMillis() + 1); + //Reset our latest time, + if(latest.size() > 0) + latestSavedValueTime.accumulateAndGet(latest.get(0).getTime(), (updated, current) ->{ + return updated > current ? updated : current; + }); + + for(PointValueTime value : latest) { //Cache is in same order as rows if(nc.size() < size && cc.size() > 0 && cc.get(0).getTime() >= value.getTime()) { //The cached value is newer or unsaved retain it @@ -235,7 +237,7 @@ private void refreshCache(int size) { if(nc.size() < size) nc.add(value); } - }); + } //No values in database, make sure we keep the unsaved values if(nc.size() == 0) { while((nc.size() < size && cc.size() > 0) || (cc.size() > 0 && cc.get(0).getTime() > latestSavedValueTime.get())) From 862659d03de686e5c80bc7d5c41093b48ecf007a Mon Sep 17 00:00:00 2001 From: terrypacker Date: Fri, 17 May 2019 13:30:24 -1000 Subject: [PATCH 08/10] Set max size to cache size at init. --- .../com/serotonin/m2m2/rt/dataImage/PointValueCache.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java index bac41fa048..fa1978bcc4 100644 --- a/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java +++ b/Core/src/com/serotonin/m2m2/rt/dataImage/PointValueCache.java @@ -80,10 +80,10 @@ public PointValueCache(int dataPointId, int defaultSize, List ca } //Set our known state of saved values - if (cache.size() > 0) + if (cache.size() > 0) { this.latestSavedValueTime.set(cache.get(0).getTime()); - - this.maxSize = defaultSize; + this.maxSize = cache.size(); + } } } From 3100539911e5e597c062658eae1b183ac47d3cd5 Mon Sep 17 00:00:00 2001 From: terrypacker Date: Fri, 17 May 2019 13:31:02 -1000 Subject: [PATCH 09/10] Cleanup mock lifecycle a little --- .../serotonin/m2m2/MockMangoLifecycle.java | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/Core/src-test/com/serotonin/m2m2/MockMangoLifecycle.java b/Core/src-test/com/serotonin/m2m2/MockMangoLifecycle.java index a0a46b3e0a..5fe7da83bf 100644 --- a/Core/src-test/com/serotonin/m2m2/MockMangoLifecycle.java +++ b/Core/src-test/com/serotonin/m2m2/MockMangoLifecycle.java @@ -112,7 +112,7 @@ public void initialize() throws InterruptedException, ExecutionException { Common.MA_HOME = maHome; Common.setModuleClassLoader(Thread.currentThread().getContextClassLoader()); - + //Add in modules for(Module module : modules) ModuleRegistry.addModule(module); @@ -141,7 +141,7 @@ public void initialize() throws InterruptedException, ExecutionException { } freemarkerInitialize(); - + //TODO This must be done only once because we have a static // final referece to the PointValueDao in the PointValueCache class // and so if you try to restart the database it doesn't get the new connection @@ -216,17 +216,17 @@ protected CompletableFuture springRuntimeContextInitialize( private void freemarkerInitialize() { Configuration cfg = new Configuration(Configuration.VERSION_2_3_28); - File baseTemplateDir = new File(Common.MA_HOME, "ftl"); + File baseTemplateDir = Common.MA_HOME_PATH.resolve("ftl").toFile(); if(!baseTemplateDir.exists()) { LOG.info("Not initializing Freemarker, this test is not running in Core source tree. Requires ./ftl directory to initialize."); return; } - + try { List loaders = new ArrayList<>(); // Add the overrides directory. - File override = new File(Common.MA_HOME, "overrides/ftl"); + File override = Common.MA_HOME_PATH.resolve("overrides/ftl").toFile(); if (override.exists()) loaders.add(new FileTemplateLoader(override)); @@ -259,22 +259,16 @@ public boolean isTerminated() { @Override public void terminate() { - // H2InMemoryDatabaseProxy proxy = (H2InMemoryDatabaseProxy) Common.databaseProxy; - // try { - // proxy.clean(); - // } catch (Exception e) { - // throw new ShouldNeverHappenException(e); - // } if(Common.databaseProxy != null) Common.databaseProxy.terminate(true); - + if(Common.serialPortManager != null) { - try { - Common.serialPortManager.terminate(); - } catch (LifecycleException e) { - fail(e.getMessage()); - } - Common.serialPortManager.joinTermination(); + try { + Common.serialPortManager.terminate(); + } catch (LifecycleException e) { + fail(e.getMessage()); + } + Common.serialPortManager.joinTermination(); } } @@ -311,14 +305,14 @@ public void loadLic() { // TODO Auto-generated method stub } - + @Override public Integer dataPointLimit() { return Integer.MAX_VALUE; } @Override - public Thread scheduleShutdown(Long timeout, boolean b, User user) { + public Thread scheduleShutdown(long timeout, boolean b, User user) { return null; } From 1daf7711083bc4771ed81a334a3fa52736e500da Mon Sep 17 00:00:00 2001 From: terrypacker Date: Mon, 24 Jun 2019 13:34:35 -1000 Subject: [PATCH 10/10] Fixes from rebase conflict --- .../com/serotonin/m2m2/MockMangoLifecycle.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/Core/src-test/com/serotonin/m2m2/MockMangoLifecycle.java b/Core/src-test/com/serotonin/m2m2/MockMangoLifecycle.java index 5fe7da83bf..0c08763557 100644 --- a/Core/src-test/com/serotonin/m2m2/MockMangoLifecycle.java +++ b/Core/src-test/com/serotonin/m2m2/MockMangoLifecycle.java @@ -262,13 +262,13 @@ public void terminate() { if(Common.databaseProxy != null) Common.databaseProxy.terminate(true); - if(Common.serialPortManager != null) { - try { - Common.serialPortManager.terminate(); - } catch (LifecycleException e) { - fail(e.getMessage()); - } - Common.serialPortManager.joinTermination(); + if (Common.serialPortManager != null) { + try { + Common.serialPortManager.terminate(); + } catch (LifecycleException e) { + fail(e.getMessage()); + } + Common.serialPortManager.joinTermination(); } } @@ -312,8 +312,7 @@ public Integer dataPointLimit() { } @Override - public Thread scheduleShutdown(long timeout, boolean b, User user) { - + public Thread scheduleShutdown(Long timeout, boolean b, User user) { return null; }