From 3cec3b1abf20ab0a84697f61e47ea160de95df98 Mon Sep 17 00:00:00 2001 From: Zahid Zafar Date: Wed, 29 Jun 2022 15:27:49 +0500 Subject: [PATCH] [SDK 851] Concurrent thread issue. (#18) * thream sample app * Synchronized Backend mode methods * - thread lock added - version number updated - changelog.md updated * version update in gradle properties * lock object rename to 'lockBRQStorage' --- CHANGELOG.md | 3 + .../count/java/demo/BackendModeExample.java | 346 ++++++++++++++---- gradle.properties | 2 +- .../main/java/ly/count/sdk/java/Config.java | 2 +- .../sdk/java/internal/ModuleBackendMode.java | 20 +- .../ly/count/sdk/java/internal/SDKCore.java | 16 +- .../count/sdk/java/internal/ConfigTests.java | 2 +- 7 files changed, 298 insertions(+), 93 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9e95d9a4..bae652156 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +20.11.3 +* Fixed a threading issue in the backend mode feature. + 20.11.2 * Added backend mode feature and a new configuration field to enable it. diff --git a/app-java/src/main/java/ly/count/java/demo/BackendModeExample.java b/app-java/src/main/java/ly/count/java/demo/BackendModeExample.java index b3b79afcf..4b1119a1b 100644 --- a/app-java/src/main/java/ly/count/java/demo/BackendModeExample.java +++ b/app-java/src/main/java/ly/count/java/demo/BackendModeExample.java @@ -7,12 +7,271 @@ import java.util.HashMap; import java.util.Map; import java.util.Scanner; +import java.util.concurrent.CountDownLatch; public class BackendModeExample { final static String DEVICE_ID = "device-id"; final static String COUNTLY_APP_KEY = "YOUR_APP_KEY"; final static String COUNTLY_SERVER_URL = "https://try.count.ly/"; + + private static void recordUserDetailAndProperties() { + Map userDetail = new HashMap<>(); + userDetail.put("name", "Full Name"); + userDetail.put("username", "username1"); + userDetail.put("email", "user@gmail.com"); + userDetail.put("organization", "Countly"); + userDetail.put("phone", "000-111-000"); + userDetail.put("gender", "M"); + userDetail.put("byear", "1991"); + //custom detail + userDetail.put("hair", "black"); + userDetail.put("height", 5.9); + userDetail.put("fav-colors", "{$push: black}"); + userDetail.put("marks", "{$inc: 1}"); + + Countly.backendMode().recordUserProperties(DEVICE_ID, userDetail, null); + } + + private static void recordView() { + Map segmentation = new HashMap() {{ + put("visit", "1"); + put("segment", "Windows"); + put("start", "1"); + }}; + + Countly.backendMode().recordView(DEVICE_ID, "SampleView", segmentation, 1646640780130L); + } + + private static void recordEvent() { + Map segment = new HashMap() {{ + put("Time Spent", 60); + put("Retry Attempts", 60); + }}; + + Countly.backendMode().recordEvent(DEVICE_ID, "Event Key", 1, 0.1, 5.0, segment, null); + } + + private static void recordExceptionWithThrowableAndSegmentation() { + Map segmentation = new HashMap() {{ + put("login page", "authenticate request"); + }}; + Map crashDetails = new HashMap() {{ + put("_os", "Windows 11"); + put("_os_version", "11.202"); + put("_logs", "main page"); + }}; + try { + int a = 10 / 0; + } catch (Exception e) { + Countly.backendMode().recordException(DEVICE_ID, e, segmentation, crashDetails, null); + } + } + + private static void recordExceptionWithMessageAndSegmentation() { + Map segmentation = new HashMap() {{ + put("login page", "authenticate request"); + }}; + + Map crashDetails = new HashMap() {{ + put("_os", "Windows 11"); + put("_os_version", "11.202"); + put("_logs", "main page"); + }}; + try { + int a = 10 / 0; + } catch (Exception e) { + Countly.backendMode().recordException(DEVICE_ID, "Divided By Zero", "stack traces", segmentation, crashDetails, null); + } + } + + private static void recordDirectRequest() { + Map requestData = new HashMap<>(); + requestData.put("device_id", "id"); + requestData.put("timestamp", "1646640780130"); + requestData.put("end_session", "1"); + requestData.put("session_duration", "20.5"); + Countly.backendMode().recordDirectRequest(DEVICE_ID, requestData, null); + } + + private static void startSession() { + Map metrics = new HashMap() {{ + put("_os", "Android"); + put("_os_version", "10"); + put("_app_version", "1.2"); + }}; + + Map location = new HashMap() {{ + put("ip_address", "192.168.1.1"); + put("city", "Lahore"); + put("country_code", "PK"); + put("location", "31.5204,74.3587"); + }}; + + Countly.backendMode().sessionBegin(DEVICE_ID, metrics, location, null); + } + + static void testWithMultipleThreads() { + + int participants = 13; + CountDownLatch latch = new CountDownLatch(1); + + Thread[] threads = new Thread[participants]; + + threads[0] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[00] executing at: " + System.currentTimeMillis()); + recordEvent(); + System.out.println("Thread[00] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + }); + + + threads[1] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[01] executing at: " + System.currentTimeMillis()); + recordView(); + System.out.println("Thread[01] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + }); + + threads[2] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[02] executing at: " + System.currentTimeMillis()); + recordUserDetailAndProperties(); + System.out.println("Thread[02] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + }); + + threads[3] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[03] executing at: " + System.currentTimeMillis()); + recordExceptionWithThrowableAndSegmentation(); + System.out.println("Thread[03] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + }); + + threads[4] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[04] executing at: " + System.currentTimeMillis()); + recordDirectRequest(); + System.out.println("Thread[04] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + threads[5] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[05] executing at: " + System.currentTimeMillis()); + recordExceptionWithMessageAndSegmentation(); + System.out.println("Thread[05] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + threads[6] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[06] executing at: " + System.currentTimeMillis()); + recordDirectRequest(); + System.out.println("Thread[06] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + threads[7] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[07] executing at: " + System.currentTimeMillis()); + recordView(); + System.out.println("Thread[07] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + threads[8] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[08] executing at: " + System.currentTimeMillis()); + recordUserDetailAndProperties(); + System.out.println("Thread[08] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + threads[9] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[09] executing at: " + System.currentTimeMillis()); + recordUserDetailAndProperties(); + System.out.println("Thread[09] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + threads[10] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[10] executing at: " + System.currentTimeMillis()); + startSession(); + System.out.println("Thread[10] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + threads[11] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[11] executing at: " + System.currentTimeMillis()); + recordView(); + System.out.println("Thread[11] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + threads[12] = new Thread(() -> { + try { + latch.await(); + System.out.println("Thread[12] executing at: " + System.currentTimeMillis()); + recordUserDetailAndProperties(); + System.out.println("Thread[12] finished at: " + System.currentTimeMillis()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + for (int i = 0; i < participants; i++) { + threads[i].start(); + } + + latch.countDown(); + + // System.out.println("All threads completed at: " + System.currentTimeMillis()); + } + static void recordDataWithLegacyCalls() { // Record Event Countly.api().event("Event With Sum And Count") @@ -57,7 +316,6 @@ public static void main(String[] args) throws Exception { // Countly needs persistent storage for requests, configuration storage, user profiles and other temporary data, // therefore requires a separate data folder to run - File targetFolder = new File("d:\\__COUNTLY\\java_test\\"); // Main initialization call, SDK can be used after this one is done @@ -76,6 +334,7 @@ public static void main(String[] args) throws Exception { System.out.println("7) Update session"); System.out.println("8) End session"); System.out.println("9) Record a direct request"); + System.out.println("10) Run Multiple Threads"); System.out.println("99) Record data with legacy calls"); System.out.println("0) Exit "); @@ -85,90 +344,27 @@ public static void main(String[] args) throws Exception { running = false; break; case 1: { // Record an event with key, count, sum, duration and segmentation - Map segment = new HashMap() {{ - put("Time Spent", 60); - put("Retry Attempts", 60); - }}; - - Countly.backendMode().recordEvent(DEVICE_ID, "Event Key", 1, 0.1, 5.0, segment, null); + recordEvent(); } break; case 2: { // Record a view - Map segmentation = new HashMap() {{ - put("visit", "1"); - put("segment", "Windows"); - put("start", "1"); - }}; - - Countly.backendMode().recordView(DEVICE_ID, "SampleView", segmentation, 1646640780130L); + recordView(); } break; case 3: { // record user detail and properties - Map userDetail = new HashMap<>(); - userDetail.put("name", "Full Name"); - userDetail.put("username", "username1"); - userDetail.put("email", "user@gmail.com"); - userDetail.put("organization", "Countly"); - userDetail.put("phone", "000-111-000"); - userDetail.put("gender", "M"); - userDetail.put("byear", "1991"); - //custom detail - userDetail.put("hair", "black"); - userDetail.put("height", 5.9); - userDetail.put("fav-colors", "{$push: black}"); - userDetail.put("marks", "{$inc: 1}"); - - Countly.backendMode().recordUserProperties(DEVICE_ID, userDetail, null); + recordUserDetailAndProperties(); } break; case 4: { // record an exception with throwable and segmentation - Map segmentation = new HashMap() {{ - put("login page", "authenticate request"); - }}; - Map crashDetails = new HashMap() {{ - put("_os", "Windows 11"); - put("_os_version", "11.202"); - put("_logs", "main page"); - }}; - try { - int a = 10 / 0; - } catch (Exception e) { - Countly.backendMode().recordException(DEVICE_ID, e, segmentation, crashDetails, null); - } + recordExceptionWithThrowableAndSegmentation(); } break; case 5: { // record an exception with message, stacktrace and segmentation - Map segmentation = new HashMap() {{ - put("login page", "authenticate request"); - }}; - - Map crashDetails = new HashMap() {{ - put("_os", "Windows 11"); - put("_os_version", "11.202"); - put("_logs", "main page"); - }}; - try { - int a = 10 / 0; - } catch (Exception e) { - Countly.backendMode().recordException(DEVICE_ID, "Divided By Zero", "stack traces", segmentation, crashDetails, null); - } + recordExceptionWithMessageAndSegmentation(); } break; case 6: { // start a session - Map metrics = new HashMap() {{ - put("_os", "Android"); - put("_os_version", "10"); - put("_app_version", "1.2"); - }}; - - Map location = new HashMap() {{ - put("ip_address", "192.168.1.1"); - put("city", "Lahore"); - put("country_code", "PK"); - put("location", "31.5204,74.3587"); - }}; - - Countly.backendMode().sessionBegin(DEVICE_ID, metrics, location, null); + startSession(); break; } case 7: // update session @@ -178,14 +374,12 @@ public static void main(String[] args) throws Exception { Countly.backendMode().sessionEnd(DEVICE_ID, 20, null); break; case 9: { // record a direct request - Map requestData = new HashMap<>(); - requestData.put("device_id", "id"); - requestData.put("timestamp", "1646640780130"); - requestData.put("end_session", "1"); - requestData.put("session_duration", "20.5"); - Countly.backendMode().recordDirectRequest(DEVICE_ID, requestData, null); + recordDirectRequest(); break; } + case 10: + testWithMultipleThreads(); + break; case 99: // record data with legacy call recordDataWithLegacyCalls(); break; diff --git a/gradle.properties b/gradle.properties index c9db11b04..3904766f4 100644 --- a/gradle.properties +++ b/gradle.properties @@ -18,7 +18,7 @@ # org.gradle.parallel=true # RELEASE FIELD SECTION -VERSION_NAME=20.11.2 +VERSION_NAME=20.11.3 GROUP=ly.count.sdk POM_URL=https://github.com/Countly/countly-sdk-java diff --git a/sdk-java/src/main/java/ly/count/sdk/java/Config.java b/sdk-java/src/main/java/ly/count/sdk/java/Config.java index a5ee12d51..be422404f 100644 --- a/sdk-java/src/main/java/ly/count/sdk/java/Config.java +++ b/sdk-java/src/main/java/ly/count/sdk/java/Config.java @@ -265,7 +265,7 @@ public boolean restore(byte[] data) { /** * Countly SDK version to be sent in HTTP requests */ - protected String sdkVersion = "20.11.2"; + protected String sdkVersion = "20.11.3"; /** * Countly SDK name to be sent in HTTP requests diff --git a/sdk-java/src/main/java/ly/count/sdk/java/internal/ModuleBackendMode.java b/sdk-java/src/main/java/ly/count/sdk/java/internal/ModuleBackendMode.java index 71796138b..1b7092215 100644 --- a/sdk-java/src/main/java/ly/count/sdk/java/internal/ModuleBackendMode.java +++ b/sdk-java/src/main/java/ly/count/sdk/java/internal/ModuleBackendMode.java @@ -294,7 +294,7 @@ private void addTimeInfoIntoRequest(Request request, Long timestamp) { request.params.add("tz", DeviceCore.dev.getTimezoneOffset()); } - private void addEventsAgainstDeviceIdToRequestQ(String deviceID) { + private synchronized void addEventsAgainstDeviceIdToRequestQ(String deviceID) { JSONArray events = eventQueues.get(deviceID); if (events == null || events.isEmpty()) { return; @@ -310,7 +310,7 @@ private void addEventsAgainstDeviceIdToRequestQ(String deviceID) { addRequestToRequestQ(request); } - private void addEventsToRequestQ() { + private synchronized void addEventsToRequestQ() { L.d("addEventsToRequestQ"); for (String s : eventQueues.keySet()) { @@ -323,14 +323,16 @@ private void addEventsToRequestQ() { } private void addRequestToRequestQ(Request request) { - L.d("addRequestToRequestQ"); - if (internalConfig.getRequestQueueMaxSize() == SDKCore.instance.requestQueueMemory.size()) { - L.d("addRequestToRequestQ: In Memory request queue is full, dropping oldest request: " + request.params.toString()); - SDKCore.instance.requestQueueMemory.remove(); - } + synchronized(SDKCore.instance.lockBRQStorage) { + L.d("addRequestToRequestQ"); + if (internalConfig.getRequestQueueMaxSize() == SDKCore.instance.requestQueueMemory.size()) { + L.d("addRequestToRequestQ: In Memory request queue is full, dropping oldest request: " + request.params.toString()); + SDKCore.instance.requestQueueMemory.remove(); + } - SDKCore.instance.requestQueueMemory.add(request); - SDKCore.instance.networking.check(ctx); + SDKCore.instance.requestQueueMemory.add(request); + SDKCore.instance.networking.check(ctx); + } } protected Map removeInvalidDataFromSegments(Map segments) { diff --git a/sdk-java/src/main/java/ly/count/sdk/java/internal/SDKCore.java b/sdk-java/src/main/java/ly/count/sdk/java/internal/SDKCore.java index 818ec5bb4..3520b0972 100644 --- a/sdk-java/src/main/java/ly/count/sdk/java/internal/SDKCore.java +++ b/sdk-java/src/main/java/ly/count/sdk/java/internal/SDKCore.java @@ -16,6 +16,8 @@ public abstract class SDKCore extends SDKModules { protected Networking networking; protected Queue requestQueueMemory = null; + protected final Object lockBRQStorage = new Object(); + public enum Signal { DID(1), Crash(2), @@ -99,16 +101,20 @@ public void run(int feature, Module module) { networking.init(ctx, new IStorageForRequestQueue() { @Override public Request getNextRequest() { - if(requestQueueMemory.isEmpty()) { - return null; - } + synchronized(SDKCore.instance.lockBRQStorage) { + if (requestQueueMemory.isEmpty()) { + return null; + } - return requestQueueMemory.element(); + return requestQueueMemory.element(); + } } @Override public Boolean removeRequest(Request request) { - return requestQueueMemory.remove(request); + synchronized(SDKCore.instance.lockBRQStorage) { + return requestQueueMemory.remove(request); + } } }); } else { diff --git a/sdk-java/src/test/java/ly/count/sdk/java/internal/ConfigTests.java b/sdk-java/src/test/java/ly/count/sdk/java/internal/ConfigTests.java index 7b5a29b3a..438e1156f 100644 --- a/sdk-java/src/test/java/ly/count/sdk/java/internal/ConfigTests.java +++ b/sdk-java/src/test/java/ly/count/sdk/java/internal/ConfigTests.java @@ -81,7 +81,7 @@ public void testSDKName() { @Test public void testSDKVersion() { - String versionName = "20.11.2"; + String versionName = "20.11.3"; Assert.assertEquals(versionName, internalConfig.getSdkVersion()); internalConfig.setSdkVersion(null);