diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml index c4a0a1e70..142a908fd 100644 --- a/.idea/inspectionProfiles/Project_Default.xml +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -2,6 +2,9 @@ \ No newline at end of file diff --git a/sdk-java/src/main/java/ly/count/sdk/java/internal/CountlyTimer.java b/sdk-java/src/main/java/ly/count/sdk/java/internal/CountlyTimer.java index 01d98b3fd..08b912d84 100644 --- a/sdk-java/src/main/java/ly/count/sdk/java/internal/CountlyTimer.java +++ b/sdk-java/src/main/java/ly/count/sdk/java/internal/CountlyTimer.java @@ -8,6 +8,7 @@ public class CountlyTimer { private final Log L; private ScheduledExecutorService timerService; + protected static int TIMER_DELAY_MS = 0; // for testing purposes protected CountlyTimer(Log logger) { L = logger; @@ -34,11 +35,19 @@ protected void stopTimer() { protected void startTimer(long timerDelay, Runnable runnable) { L.i("[CountlyTimer] startTimer, Starting global timer timerDelay: [" + timerDelay + "]"); + timerDelay = timerDelay * 1000; - if (timerDelay < 1) { - timerDelay = 1; + if (timerDelay < 1000) { + timerDelay = 1000; } - timerService.scheduleWithFixedDelay(runnable, timerDelay, timerDelay, TimeUnit.SECONDS); + long startTime = timerDelay; + + if (TIMER_DELAY_MS > 0) { + timerDelay = TIMER_DELAY_MS; + startTime = 0; + } + + timerService.scheduleWithFixedDelay(runnable, startTime, timerDelay, TimeUnit.MILLISECONDS); } } diff --git a/sdk-java/src/main/java/ly/count/sdk/java/internal/EventQueue.java b/sdk-java/src/main/java/ly/count/sdk/java/internal/EventQueue.java index ececc5622..7e6cfae42 100644 --- a/sdk-java/src/main/java/ly/count/sdk/java/internal/EventQueue.java +++ b/sdk-java/src/main/java/ly/count/sdk/java/internal/EventQueue.java @@ -1,10 +1,9 @@ package ly.count.sdk.java.internal; -import ly.count.sdk.java.Countly; -import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import javax.annotation.Nonnull; public class EventQueue { @@ -12,6 +11,8 @@ public class EventQueue { Log L; List eventQueueMemoryCache; + protected final Object lockEQ = new Object(); + protected EventQueue() { } @@ -24,7 +25,15 @@ protected EventQueue(@Nonnull Log logger, int eventThreshold) { * Returns the number of events currently stored in the queue. */ protected int eqSize() { - return eventQueueMemoryCache.size(); + synchronized (lockEQ) { + return eventQueueMemoryCache.size(); + } + } + + protected List getEQ() { + synchronized (lockEQ) { + return new ArrayList<>(eventQueueMemoryCache); + } } void addEvent(@Nonnull final EventImpl event) { @@ -33,8 +42,10 @@ void addEvent(@Nonnull final EventImpl event) { return; } L.d("[EventQueue] Adding event: " + event.key); - eventQueueMemoryCache.add(event); - writeEventQueueToStorage(); + synchronized (lockEQ) { + eventQueueMemoryCache.add(event); + writeEventQueueToStorage(); + } } /** @@ -56,20 +67,22 @@ void writeEventQueueToStorage() { * Restores events from disk */ void restoreFromDisk() { - L.d("[EventQueue] Restoring events from disk"); - eventQueueMemoryCache.clear(); - - final String[] array = getEvents(); - for (String s : array) { - - final EventImpl event = EventImpl.fromJSON(s, (ev) -> { - }, L); - if (event != null) { - eventQueueMemoryCache.add(event); + synchronized (lockEQ) { + L.d("[EventQueue] Restoring events from disk"); + eventQueueMemoryCache.clear(); + + final String[] array = getEvents(); + for (String s : array) { + + final EventImpl event = EventImpl.fromJSON(s, (ev) -> { + }, L); + if (event != null) { + eventQueueMemoryCache.add(event); + } } + // order the events from least to most recent + eventQueueMemoryCache.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp)); } - // order the events from least to most recent - eventQueueMemoryCache.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp)); } @Nonnull String joinEvents(@Nonnull final Collection collection) { @@ -91,6 +104,8 @@ void restoreFromDisk() { public void clear() { SDKCore.instance.sdkStorage.storeEventQueue(""); - eventQueueMemoryCache.clear(); + synchronized (lockEQ) { + eventQueueMemoryCache.clear(); + } } } diff --git a/sdk-java/src/main/java/ly/count/sdk/java/internal/ModuleEvents.java b/sdk-java/src/main/java/ly/count/sdk/java/internal/ModuleEvents.java index d49262e7f..0ab8909c5 100644 --- a/sdk-java/src/main/java/ly/count/sdk/java/internal/ModuleEvents.java +++ b/sdk-java/src/main/java/ly/count/sdk/java/internal/ModuleEvents.java @@ -69,7 +69,7 @@ public void stop(InternalConfig config, final boolean clear) { private synchronized void addEventsToRequestQ(String deviceId) { L.d("[ModuleEvents] addEventsToRequestQ"); - if (eventQueue.eventQueueMemoryCache.isEmpty()) { + if (eventQueue.getEQ().isEmpty()) { L.d("[ModuleEvents] addEventsToRequestQ, eventQueueMemoryCache is empty, skipping"); return; } @@ -78,7 +78,7 @@ private synchronized void addEventsToRequestQ(String deviceId) { if (deviceId != null) { request.params.add("device_id", deviceId); } - request.params.arr("events").put(eventQueue.eventQueueMemoryCache).add(); + request.params.arr("events").put(eventQueue.getEQ()).add(); request.own(ModuleEvents.class); eventQueue.clear(); diff --git a/sdk-java/src/test/java/ly/count/sdk/java/internal/MultiThreadingTest.java b/sdk-java/src/test/java/ly/count/sdk/java/internal/MultiThreadingTest.java new file mode 100644 index 000000000..42993ee02 --- /dev/null +++ b/sdk-java/src/test/java/ly/count/sdk/java/internal/MultiThreadingTest.java @@ -0,0 +1,196 @@ +package ly.count.sdk.java.internal; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import ly.count.sdk.java.Config; +import ly.count.sdk.java.Countly; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +@RunWith(JUnit4.class) +public class MultiThreadingTest { + @After + public void stop() { + Countly.instance().halt(); + } + + @Before + public void beforeTest() { + TestUtils.createCleanTestState(); + } + + AtomicInteger feedbackWidgetCounter = new AtomicInteger(0); + AtomicInteger crashCounter = new AtomicInteger(0); + AtomicInteger viewCounter = new AtomicInteger(0); + AtomicInteger eventCounter = new AtomicInteger(0); + AtomicInteger locationCounter = new AtomicInteger(0); + + /** + * Test that all modules are thread safe, and called at the desired count + * + * @throws BrokenBarrierException BrokenBarrierException + * @throws InterruptedException InterruptedException + */ + @Test + public void multiThread() throws BrokenBarrierException, InterruptedException { + CountlyTimer.TIMER_DELAY_MS = 1; + Countly.instance().init(getAllConfig()); + + int rqSize = TestUtils.getCurrentRQ().length; + List events = new ArrayList<>(); + for (int rqIdx = 0; rqIdx < rqSize; rqIdx++) { + events.addAll(TestUtils.readEventsFromRequest(rqIdx, TestUtils.DEVICE_ID)); + } + events.addAll(TestUtils.getCurrentEQ()); + + Assert.assertEquals(0, feedbackWidgetCounter.get()); + Assert.assertEquals(0, crashCounter.get()); + Assert.assertEquals(0, viewCounter.get()); + Assert.assertEquals(0, eventCounter.get()); + Assert.assertEquals(0, locationCounter.get()); + //print(events); + int eventThreads = 50; + int viewThreads = 50; + int locationThreads = 50; + int crashThreads = 50; + int feedbackThreads = 50; + final CyclicBarrier gate = new CyclicBarrier(eventThreads + viewThreads + crashThreads + locationThreads + feedbackThreads + 1); + List runs = new ArrayList<>(); + + submitEvents(eventThreads, runs, gate); + submitViews(viewThreads, runs, gate); + submitCrashes(crashThreads, runs, gate); + submitLocations(locationThreads, runs, gate); + submitFeedbackWidget(feedbackThreads, runs, gate); + + for (Thread t : runs) { + t.start(); + } + + gate.await(); + Storage.await(Mockito.mock(Log.class)); + + for (Thread t : runs) { + t.join(); + } + + rqSize = TestUtils.getCurrentRQ().length; + events = new ArrayList<>(); + + for (int rqIdx = 0; rqIdx < rqSize; rqIdx++) { + events.addAll(TestUtils.readEventsFromRequest(rqIdx, TestUtils.DEVICE_ID)); + } + + events.addAll(TestUtils.getCurrentEQ()); + //print(events); + + Arrays.stream(TestUtils.getCurrentRQ()).filter(r -> r.containsKey("crash") && !r.get("crash").contains("java.lang.Exception")).forEach(r -> { + Assert.assertNull(r.get("crash")); // validate that there is no unhandled sdk crash occurs + }); + Assert.assertEquals(feedbackThreads, feedbackWidgetCounter.get()); + Assert.assertEquals(crashThreads, crashCounter.get()); + Assert.assertEquals(viewThreads, viewCounter.get()); + Assert.assertEquals(eventThreads, eventCounter.get()); + Assert.assertEquals(locationThreads, locationCounter.get()); + } + + private void print(List events) { + System.out.println(events.stream().filter(e -> e.key.equals("[CLY]_survey")).count()); + System.out.println(events.stream().filter(e -> e.key.equals("[CLY]_view")).count()); + System.out.println(events.stream().filter(e -> !e.key.equals("[CLY]_view") && !e.key.equals("[CLY]_survey")).count()); + System.out.println((int) Arrays.stream(TestUtils.getCurrentRQ()).filter(r -> r.containsKey("crash")).count()); + Arrays.stream(TestUtils.getCurrentRQ()).filter(r -> r.containsKey("crash") && !r.get("crash").contains("java.lang.Exception")).forEach(r -> { + System.out.println(r.get("crash")); + }); + System.out.println("-------------------- CALL COUNTS -----------------"); + System.out.println("feedbackWidgetCounter: " + feedbackWidgetCounter.get()); + System.out.println("crashCounter: " + crashCounter.get()); + System.out.println("viewCounter: " + viewCounter.get()); + System.out.println("eventCounter: " + eventCounter.get()); + System.out.println("locationCounter: " + locationCounter.get()); + } + + private void submitFeedbackWidget(int feedbackThreads, List runs, CyclicBarrier gate) { + for (int a = 0; a < feedbackThreads; a++) { + int finalA = a; + runs.add(new Thread(() -> { + gateAwait(gate); + CountlyFeedbackWidget feedbackWidget = new CountlyFeedbackWidget(); + feedbackWidget.widgetId = "testThreadFeedbackWidget_" + finalA; + feedbackWidget.type = FeedbackWidgetType.survey; + feedbackWidget.name = "testThreadFeedbackWidget_" + finalA; + feedbackWidget.tags = new String[] { "testThreadFeedbackWidget_" + finalA }; + Countly.instance().feedback().reportFeedbackWidgetManually(feedbackWidget, null, null); + feedbackWidgetCounter.incrementAndGet(); + })); + } + } + + private void gateAwait(CyclicBarrier gate) { + try { + gate.await(); + } catch (BrokenBarrierException | InterruptedException e) { + e.printStackTrace(); + } + } + + private void submitLocations(int locationThreads, List runs, CyclicBarrier gate) { + for (int a = 0; a < locationThreads; a++) { + int finalA = a; + runs.add(new Thread(() -> { + gateAwait(gate); + Countly.instance().addLocation(finalA, finalA + 1); + locationCounter.incrementAndGet(); + })); + } + } + + private void submitCrashes(int crashThreads, List runs, CyclicBarrier gate) { + for (int a = 0; a < crashThreads; a++) { + int finalA = a; + runs.add(new Thread(() -> { + gateAwait(gate); + Countly.instance().addCrashReport(new Exception("testThreadCrash_" + finalA), true); + crashCounter.incrementAndGet(); + })); + } + } + + private void submitViews(int viewThreads, List runs, CyclicBarrier gate) { + for (int a = 0; a < viewThreads; a++) { + int finalA = a; + runs.add(new Thread(() -> { + gateAwait(gate); + Countly.instance().view("testThreadView_" + finalA).start(false); + viewCounter.incrementAndGet(); + })); + } + } + + private void submitEvents(int eventThreads, List runs, CyclicBarrier gate) { + for (int a = 0; a < eventThreads; a++) { + int finalA = a; + runs.add(new Thread(() -> { + gateAwait(gate); + Countly.instance().events().recordEvent("testThreadEvent_" + finalA, finalA); + eventCounter.incrementAndGet(); + })); + } + } + + private Config getAllConfig() { + Config config = TestUtils.getBaseConfig(); + config.enableFeatures(Config.Feature.Events, Config.Feature.Sessions, Config.Feature.Location, Config.Feature.CrashReporting, Config.Feature.Feedback, Config.Feature.UserProfiles, Config.Feature.Views, Config.Feature.RemoteConfig); + config.enableRemoteConfigValueCaching().setRequiresConsent(false); + return config; + } +} diff --git a/sdk-java/src/test/java/ly/count/sdk/java/internal/TestUtils.java b/sdk-java/src/test/java/ly/count/sdk/java/internal/TestUtils.java index 456ab91fe..4b33e43d9 100644 --- a/sdk-java/src/test/java/ly/count/sdk/java/internal/TestUtils.java +++ b/sdk-java/src/test/java/ly/count/sdk/java/internal/TestUtils.java @@ -324,8 +324,12 @@ static List readEventsFromRequest() { static List readEventsFromRequest(int requestIndex, String deviceId) { Map request = getCurrentRQ()[requestIndex]; validateRequiredParams(request, deviceId); - JSONArray array = new JSONArray(request.get("events")); + String events = request.get("events"); List result = new ArrayList<>(); + if (events == null) { + return result; + } + JSONArray array = new JSONArray(events); array.forEach(value -> { result.add(EventImpl.fromJSON(value.toString(), (ev) -> {