From 8ea2230d21af945102355f51c42a4bffd7db47d0 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Mon, 16 Sep 2024 16:49:49 +0300 Subject: [PATCH] Improve versioning for timeseries and attributes --- .../server/cache/VersionedCaffeineTbCache.java | 2 +- .../server/cache/VersionedRedisTbCache.java | 4 ++-- .../thingsboard/server/cache/VersionedTbCache.java | 2 +- .../dao/attributes/AttributeCaffeineCache.java | 11 +++++++++++ .../server/dao/timeseries/TsLatestRedisCache.java | 12 ++++++++++++ 5 files changed, 27 insertions(+), 4 deletions(-) diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbCache.java index ade628b769..535fa64cd9 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbCache.java @@ -50,7 +50,7 @@ private void doPut(K key, V value, Long version) { lock.lock(); try { TbPair versionValuePair = doGet(key); - if (versionValuePair == null || version >= versionValuePair.getFirst()) { + if (versionValuePair == null || version > versionValuePair.getFirst()) { failAllTransactionsByKey(key); cache.put(key, wrapValue(value, version)); } diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java index ccc229289c..e9b64822e9 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java @@ -51,7 +51,7 @@ local function setNewValue() if currentVersionBytes and #currentVersionBytes == 8 then local currentVersion = struct.unpack(">I8", currentVersionBytes) - if newVersion >= currentVersion then + if newVersion > currentVersion then setNewValue() end else @@ -59,7 +59,7 @@ local function setNewValue() setNewValue() end """); - static final byte[] SET_VERSIONED_VALUE_SHA = StringRedisSerializer.UTF_8.serialize("5e02631c0d9df032e769e9b7d0fc20b74e7e104b"); + static final byte[] SET_VERSIONED_VALUE_SHA = StringRedisSerializer.UTF_8.serialize("0453cb1814135b706b4198b09a09f43c9f67bbfe"); public VersionedRedisTbCache(String cacheName, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory, TBRedisCacheConfiguration configuration, TbRedisSerializer valueSerializer) { super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer); diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedTbCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedTbCache.java index 8fb70520fb..f42aa81b8a 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedTbCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedTbCache.java @@ -51,7 +51,7 @@ default V get(K key, Supplier supplier, boolean putToCache) { void evict(K key, Long version); default Long getVersion(V value) { - return 0L; + return null; /* version on edge is static to update cache correctly if (value == null) { return 0L; diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java index 140cfbc03e..5183367b27 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java @@ -30,4 +30,15 @@ public AttributeCaffeineCache(CacheManager cacheManager) { super(cacheManager, CacheConstants.ATTRIBUTES_CACHE); } + @Override + public Long getVersion(AttributeKvEntry value) { + if (value == null) { + return 0L; + } else if (value.getVersion() != null) { + return value.getVersion(); + } else { + return null; + } + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java index 241132b4c8..264d79d904 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java @@ -52,4 +52,16 @@ public TsKvEntry deserialize(TsLatestCacheKey key, byte[] bytes) throws Serializ } }); } + + @Override + public Long getVersion(TsKvEntry value) { + if (value == null) { + return 0L; + } else if (value.getVersion() != null) { + return value.getVersion(); + } else { + return null; + } + } + }