diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index c4a0a1e70..142a908fd 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -2,6 +2,9 @@
+
+
+
\ No newline at end of file
diff --git a/sdk-java/src/main/java/ly/count/sdk/java/internal/CountlyTimer.java b/sdk-java/src/main/java/ly/count/sdk/java/internal/CountlyTimer.java
index 01d98b3fd..08b912d84 100644
--- a/sdk-java/src/main/java/ly/count/sdk/java/internal/CountlyTimer.java
+++ b/sdk-java/src/main/java/ly/count/sdk/java/internal/CountlyTimer.java
@@ -8,6 +8,7 @@ public class CountlyTimer {
private final Log L;
private ScheduledExecutorService timerService;
+ protected static int TIMER_DELAY_MS = 0; // for testing purposes
protected CountlyTimer(Log logger) {
L = logger;
@@ -34,11 +35,19 @@ protected void stopTimer() {
protected void startTimer(long timerDelay, Runnable runnable) {
L.i("[CountlyTimer] startTimer, Starting global timer timerDelay: [" + timerDelay + "]");
+ timerDelay = timerDelay * 1000;
- if (timerDelay < 1) {
- timerDelay = 1;
+ if (timerDelay < 1000) {
+ timerDelay = 1000;
}
- timerService.scheduleWithFixedDelay(runnable, timerDelay, timerDelay, TimeUnit.SECONDS);
+ long startTime = timerDelay;
+
+ if (TIMER_DELAY_MS > 0) {
+ timerDelay = TIMER_DELAY_MS;
+ startTime = 0;
+ }
+
+ timerService.scheduleWithFixedDelay(runnable, startTime, timerDelay, TimeUnit.MILLISECONDS);
}
}
diff --git a/sdk-java/src/main/java/ly/count/sdk/java/internal/EventQueue.java b/sdk-java/src/main/java/ly/count/sdk/java/internal/EventQueue.java
index ececc5622..7e6cfae42 100644
--- a/sdk-java/src/main/java/ly/count/sdk/java/internal/EventQueue.java
+++ b/sdk-java/src/main/java/ly/count/sdk/java/internal/EventQueue.java
@@ -1,10 +1,9 @@
package ly.count.sdk.java.internal;
-import ly.count.sdk.java.Countly;
-import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import javax.annotation.Nonnull;
public class EventQueue {
@@ -12,6 +11,8 @@ public class EventQueue {
Log L;
List eventQueueMemoryCache;
+ protected final Object lockEQ = new Object();
+
protected EventQueue() {
}
@@ -24,7 +25,15 @@ protected EventQueue(@Nonnull Log logger, int eventThreshold) {
* Returns the number of events currently stored in the queue.
*/
protected int eqSize() {
- return eventQueueMemoryCache.size();
+ synchronized (lockEQ) {
+ return eventQueueMemoryCache.size();
+ }
+ }
+
+ protected List getEQ() {
+ synchronized (lockEQ) {
+ return new ArrayList<>(eventQueueMemoryCache);
+ }
}
void addEvent(@Nonnull final EventImpl event) {
@@ -33,8 +42,10 @@ void addEvent(@Nonnull final EventImpl event) {
return;
}
L.d("[EventQueue] Adding event: " + event.key);
- eventQueueMemoryCache.add(event);
- writeEventQueueToStorage();
+ synchronized (lockEQ) {
+ eventQueueMemoryCache.add(event);
+ writeEventQueueToStorage();
+ }
}
/**
@@ -56,20 +67,22 @@ void writeEventQueueToStorage() {
* Restores events from disk
*/
void restoreFromDisk() {
- L.d("[EventQueue] Restoring events from disk");
- eventQueueMemoryCache.clear();
-
- final String[] array = getEvents();
- for (String s : array) {
-
- final EventImpl event = EventImpl.fromJSON(s, (ev) -> {
- }, L);
- if (event != null) {
- eventQueueMemoryCache.add(event);
+ synchronized (lockEQ) {
+ L.d("[EventQueue] Restoring events from disk");
+ eventQueueMemoryCache.clear();
+
+ final String[] array = getEvents();
+ for (String s : array) {
+
+ final EventImpl event = EventImpl.fromJSON(s, (ev) -> {
+ }, L);
+ if (event != null) {
+ eventQueueMemoryCache.add(event);
+ }
}
+ // order the events from least to most recent
+ eventQueueMemoryCache.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp));
}
- // order the events from least to most recent
- eventQueueMemoryCache.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp));
}
@Nonnull String joinEvents(@Nonnull final Collection collection) {
@@ -91,6 +104,8 @@ void restoreFromDisk() {
public void clear() {
SDKCore.instance.sdkStorage.storeEventQueue("");
- eventQueueMemoryCache.clear();
+ synchronized (lockEQ) {
+ eventQueueMemoryCache.clear();
+ }
}
}
diff --git a/sdk-java/src/main/java/ly/count/sdk/java/internal/ModuleEvents.java b/sdk-java/src/main/java/ly/count/sdk/java/internal/ModuleEvents.java
index d49262e7f..0ab8909c5 100644
--- a/sdk-java/src/main/java/ly/count/sdk/java/internal/ModuleEvents.java
+++ b/sdk-java/src/main/java/ly/count/sdk/java/internal/ModuleEvents.java
@@ -69,7 +69,7 @@ public void stop(InternalConfig config, final boolean clear) {
private synchronized void addEventsToRequestQ(String deviceId) {
L.d("[ModuleEvents] addEventsToRequestQ");
- if (eventQueue.eventQueueMemoryCache.isEmpty()) {
+ if (eventQueue.getEQ().isEmpty()) {
L.d("[ModuleEvents] addEventsToRequestQ, eventQueueMemoryCache is empty, skipping");
return;
}
@@ -78,7 +78,7 @@ private synchronized void addEventsToRequestQ(String deviceId) {
if (deviceId != null) {
request.params.add("device_id", deviceId);
}
- request.params.arr("events").put(eventQueue.eventQueueMemoryCache).add();
+ request.params.arr("events").put(eventQueue.getEQ()).add();
request.own(ModuleEvents.class);
eventQueue.clear();
diff --git a/sdk-java/src/test/java/ly/count/sdk/java/internal/MultiThreadingTest.java b/sdk-java/src/test/java/ly/count/sdk/java/internal/MultiThreadingTest.java
new file mode 100644
index 000000000..42993ee02
--- /dev/null
+++ b/sdk-java/src/test/java/ly/count/sdk/java/internal/MultiThreadingTest.java
@@ -0,0 +1,196 @@
+package ly.count.sdk.java.internal;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import ly.count.sdk.java.Config;
+import ly.count.sdk.java.Countly;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+@RunWith(JUnit4.class)
+public class MultiThreadingTest {
+ @After
+ public void stop() {
+ Countly.instance().halt();
+ }
+
+ @Before
+ public void beforeTest() {
+ TestUtils.createCleanTestState();
+ }
+
+ AtomicInteger feedbackWidgetCounter = new AtomicInteger(0);
+ AtomicInteger crashCounter = new AtomicInteger(0);
+ AtomicInteger viewCounter = new AtomicInteger(0);
+ AtomicInteger eventCounter = new AtomicInteger(0);
+ AtomicInteger locationCounter = new AtomicInteger(0);
+
+ /**
+ * Test that all modules are thread safe, and called at the desired count
+ *
+ * @throws BrokenBarrierException BrokenBarrierException
+ * @throws InterruptedException InterruptedException
+ */
+ @Test
+ public void multiThread() throws BrokenBarrierException, InterruptedException {
+ CountlyTimer.TIMER_DELAY_MS = 1;
+ Countly.instance().init(getAllConfig());
+
+ int rqSize = TestUtils.getCurrentRQ().length;
+ List events = new ArrayList<>();
+ for (int rqIdx = 0; rqIdx < rqSize; rqIdx++) {
+ events.addAll(TestUtils.readEventsFromRequest(rqIdx, TestUtils.DEVICE_ID));
+ }
+ events.addAll(TestUtils.getCurrentEQ());
+
+ Assert.assertEquals(0, feedbackWidgetCounter.get());
+ Assert.assertEquals(0, crashCounter.get());
+ Assert.assertEquals(0, viewCounter.get());
+ Assert.assertEquals(0, eventCounter.get());
+ Assert.assertEquals(0, locationCounter.get());
+ //print(events);
+ int eventThreads = 50;
+ int viewThreads = 50;
+ int locationThreads = 50;
+ int crashThreads = 50;
+ int feedbackThreads = 50;
+ final CyclicBarrier gate = new CyclicBarrier(eventThreads + viewThreads + crashThreads + locationThreads + feedbackThreads + 1);
+ List runs = new ArrayList<>();
+
+ submitEvents(eventThreads, runs, gate);
+ submitViews(viewThreads, runs, gate);
+ submitCrashes(crashThreads, runs, gate);
+ submitLocations(locationThreads, runs, gate);
+ submitFeedbackWidget(feedbackThreads, runs, gate);
+
+ for (Thread t : runs) {
+ t.start();
+ }
+
+ gate.await();
+ Storage.await(Mockito.mock(Log.class));
+
+ for (Thread t : runs) {
+ t.join();
+ }
+
+ rqSize = TestUtils.getCurrentRQ().length;
+ events = new ArrayList<>();
+
+ for (int rqIdx = 0; rqIdx < rqSize; rqIdx++) {
+ events.addAll(TestUtils.readEventsFromRequest(rqIdx, TestUtils.DEVICE_ID));
+ }
+
+ events.addAll(TestUtils.getCurrentEQ());
+ //print(events);
+
+ Arrays.stream(TestUtils.getCurrentRQ()).filter(r -> r.containsKey("crash") && !r.get("crash").contains("java.lang.Exception")).forEach(r -> {
+ Assert.assertNull(r.get("crash")); // validate that there is no unhandled sdk crash occurs
+ });
+ Assert.assertEquals(feedbackThreads, feedbackWidgetCounter.get());
+ Assert.assertEquals(crashThreads, crashCounter.get());
+ Assert.assertEquals(viewThreads, viewCounter.get());
+ Assert.assertEquals(eventThreads, eventCounter.get());
+ Assert.assertEquals(locationThreads, locationCounter.get());
+ }
+
+ private void print(List events) {
+ System.out.println(events.stream().filter(e -> e.key.equals("[CLY]_survey")).count());
+ System.out.println(events.stream().filter(e -> e.key.equals("[CLY]_view")).count());
+ System.out.println(events.stream().filter(e -> !e.key.equals("[CLY]_view") && !e.key.equals("[CLY]_survey")).count());
+ System.out.println((int) Arrays.stream(TestUtils.getCurrentRQ()).filter(r -> r.containsKey("crash")).count());
+ Arrays.stream(TestUtils.getCurrentRQ()).filter(r -> r.containsKey("crash") && !r.get("crash").contains("java.lang.Exception")).forEach(r -> {
+ System.out.println(r.get("crash"));
+ });
+ System.out.println("-------------------- CALL COUNTS -----------------");
+ System.out.println("feedbackWidgetCounter: " + feedbackWidgetCounter.get());
+ System.out.println("crashCounter: " + crashCounter.get());
+ System.out.println("viewCounter: " + viewCounter.get());
+ System.out.println("eventCounter: " + eventCounter.get());
+ System.out.println("locationCounter: " + locationCounter.get());
+ }
+
+ private void submitFeedbackWidget(int feedbackThreads, List runs, CyclicBarrier gate) {
+ for (int a = 0; a < feedbackThreads; a++) {
+ int finalA = a;
+ runs.add(new Thread(() -> {
+ gateAwait(gate);
+ CountlyFeedbackWidget feedbackWidget = new CountlyFeedbackWidget();
+ feedbackWidget.widgetId = "testThreadFeedbackWidget_" + finalA;
+ feedbackWidget.type = FeedbackWidgetType.survey;
+ feedbackWidget.name = "testThreadFeedbackWidget_" + finalA;
+ feedbackWidget.tags = new String[] { "testThreadFeedbackWidget_" + finalA };
+ Countly.instance().feedback().reportFeedbackWidgetManually(feedbackWidget, null, null);
+ feedbackWidgetCounter.incrementAndGet();
+ }));
+ }
+ }
+
+ private void gateAwait(CyclicBarrier gate) {
+ try {
+ gate.await();
+ } catch (BrokenBarrierException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void submitLocations(int locationThreads, List runs, CyclicBarrier gate) {
+ for (int a = 0; a < locationThreads; a++) {
+ int finalA = a;
+ runs.add(new Thread(() -> {
+ gateAwait(gate);
+ Countly.instance().addLocation(finalA, finalA + 1);
+ locationCounter.incrementAndGet();
+ }));
+ }
+ }
+
+ private void submitCrashes(int crashThreads, List runs, CyclicBarrier gate) {
+ for (int a = 0; a < crashThreads; a++) {
+ int finalA = a;
+ runs.add(new Thread(() -> {
+ gateAwait(gate);
+ Countly.instance().addCrashReport(new Exception("testThreadCrash_" + finalA), true);
+ crashCounter.incrementAndGet();
+ }));
+ }
+ }
+
+ private void submitViews(int viewThreads, List runs, CyclicBarrier gate) {
+ for (int a = 0; a < viewThreads; a++) {
+ int finalA = a;
+ runs.add(new Thread(() -> {
+ gateAwait(gate);
+ Countly.instance().view("testThreadView_" + finalA).start(false);
+ viewCounter.incrementAndGet();
+ }));
+ }
+ }
+
+ private void submitEvents(int eventThreads, List runs, CyclicBarrier gate) {
+ for (int a = 0; a < eventThreads; a++) {
+ int finalA = a;
+ runs.add(new Thread(() -> {
+ gateAwait(gate);
+ Countly.instance().events().recordEvent("testThreadEvent_" + finalA, finalA);
+ eventCounter.incrementAndGet();
+ }));
+ }
+ }
+
+ private Config getAllConfig() {
+ Config config = TestUtils.getBaseConfig();
+ config.enableFeatures(Config.Feature.Events, Config.Feature.Sessions, Config.Feature.Location, Config.Feature.CrashReporting, Config.Feature.Feedback, Config.Feature.UserProfiles, Config.Feature.Views, Config.Feature.RemoteConfig);
+ config.enableRemoteConfigValueCaching().setRequiresConsent(false);
+ return config;
+ }
+}
diff --git a/sdk-java/src/test/java/ly/count/sdk/java/internal/TestUtils.java b/sdk-java/src/test/java/ly/count/sdk/java/internal/TestUtils.java
index 456ab91fe..4b33e43d9 100644
--- a/sdk-java/src/test/java/ly/count/sdk/java/internal/TestUtils.java
+++ b/sdk-java/src/test/java/ly/count/sdk/java/internal/TestUtils.java
@@ -324,8 +324,12 @@ static List readEventsFromRequest() {
static List readEventsFromRequest(int requestIndex, String deviceId) {
Map request = getCurrentRQ()[requestIndex];
validateRequiredParams(request, deviceId);
- JSONArray array = new JSONArray(request.get("events"));
+ String events = request.get("events");
List result = new ArrayList<>();
+ if (events == null) {
+ return result;
+ }
+ JSONArray array = new JSONArray(events);
array.forEach(value -> {
result.add(EventImpl.fromJSON(value.toString(), (ev) -> {