Skip to content

Commit

Permalink
[Java] Create a multithreading test (#175)
Browse files Browse the repository at this point in the history
* fix: concurrent modification

* feat: multithreading test init

* feat: locking for the EQ

* feat: cyclic barrier

* chore: test printing

* fix: await things

* fix: multi threading test

* feat: validate no sdk error occurs
  • Loading branch information
arifBurakDemiray authored Nov 22, 2023
1 parent 7a05f5f commit 47a9012
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 24 deletions.
3 changes: 3 additions & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public class CountlyTimer {

private final Log L;
private ScheduledExecutorService timerService;
protected static int TIMER_DELAY_MS = 0; // for testing purposes

protected CountlyTimer(Log logger) {
L = logger;
Expand All @@ -34,11 +35,19 @@ protected void stopTimer() {

protected void startTimer(long timerDelay, Runnable runnable) {
L.i("[CountlyTimer] startTimer, Starting global timer timerDelay: [" + timerDelay + "]");
timerDelay = timerDelay * 1000;

if (timerDelay < 1) {
timerDelay = 1;
if (timerDelay < 1000) {
timerDelay = 1000;
}

timerService.scheduleWithFixedDelay(runnable, timerDelay, timerDelay, TimeUnit.SECONDS);
long startTime = timerDelay;

if (TIMER_DELAY_MS > 0) {
timerDelay = TIMER_DELAY_MS;
startTime = 0;
}

timerService.scheduleWithFixedDelay(runnable, startTime, timerDelay, TimeUnit.MILLISECONDS);
}
}
51 changes: 33 additions & 18 deletions sdk-java/src/main/java/ly/count/sdk/java/internal/EventQueue.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package ly.count.sdk.java.internal;

import ly.count.sdk.java.Countly;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nonnull;

public class EventQueue {

static final String DELIMITER = ":::";
Log L;
List<EventImpl> eventQueueMemoryCache;

protected final Object lockEQ = new Object();

protected EventQueue() {
}

Expand All @@ -24,7 +25,15 @@ protected EventQueue(@Nonnull Log logger, int eventThreshold) {
* Returns the number of events currently stored in the queue.
*/
protected int eqSize() {
return eventQueueMemoryCache.size();
synchronized (lockEQ) {
return eventQueueMemoryCache.size();
}
}

protected List<EventImpl> getEQ() {
synchronized (lockEQ) {
return new ArrayList<>(eventQueueMemoryCache);
}
}

void addEvent(@Nonnull final EventImpl event) {
Expand All @@ -33,8 +42,10 @@ void addEvent(@Nonnull final EventImpl event) {
return;
}
L.d("[EventQueue] Adding event: " + event.key);
eventQueueMemoryCache.add(event);
writeEventQueueToStorage();
synchronized (lockEQ) {
eventQueueMemoryCache.add(event);
writeEventQueueToStorage();
}
}

/**
Expand All @@ -56,20 +67,22 @@ void writeEventQueueToStorage() {
* Restores events from disk
*/
void restoreFromDisk() {
L.d("[EventQueue] Restoring events from disk");
eventQueueMemoryCache.clear();

final String[] array = getEvents();
for (String s : array) {

final EventImpl event = EventImpl.fromJSON(s, (ev) -> {
}, L);
if (event != null) {
eventQueueMemoryCache.add(event);
synchronized (lockEQ) {
L.d("[EventQueue] Restoring events from disk");
eventQueueMemoryCache.clear();

final String[] array = getEvents();
for (String s : array) {

final EventImpl event = EventImpl.fromJSON(s, (ev) -> {
}, L);
if (event != null) {
eventQueueMemoryCache.add(event);
}
}
// order the events from least to most recent
eventQueueMemoryCache.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp));
}
// order the events from least to most recent
eventQueueMemoryCache.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp));
}

@Nonnull String joinEvents(@Nonnull final Collection<EventImpl> collection) {
Expand All @@ -91,6 +104,8 @@ void restoreFromDisk() {

public void clear() {
SDKCore.instance.sdkStorage.storeEventQueue("");
eventQueueMemoryCache.clear();
synchronized (lockEQ) {
eventQueueMemoryCache.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void stop(InternalConfig config, final boolean clear) {
private synchronized void addEventsToRequestQ(String deviceId) {
L.d("[ModuleEvents] addEventsToRequestQ");

if (eventQueue.eventQueueMemoryCache.isEmpty()) {
if (eventQueue.getEQ().isEmpty()) {
L.d("[ModuleEvents] addEventsToRequestQ, eventQueueMemoryCache is empty, skipping");
return;
}
Expand All @@ -78,7 +78,7 @@ private synchronized void addEventsToRequestQ(String deviceId) {
if (deviceId != null) {
request.params.add("device_id", deviceId);
}
request.params.arr("events").put(eventQueue.eventQueueMemoryCache).add();
request.params.arr("events").put(eventQueue.getEQ()).add();
request.own(ModuleEvents.class);

eventQueue.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package ly.count.sdk.java.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import ly.count.sdk.java.Config;
import ly.count.sdk.java.Countly;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
public class MultiThreadingTest {
@After
public void stop() {
Countly.instance().halt();
}

@Before
public void beforeTest() {
TestUtils.createCleanTestState();
}

AtomicInteger feedbackWidgetCounter = new AtomicInteger(0);
AtomicInteger crashCounter = new AtomicInteger(0);
AtomicInteger viewCounter = new AtomicInteger(0);
AtomicInteger eventCounter = new AtomicInteger(0);
AtomicInteger locationCounter = new AtomicInteger(0);

/**
* Test that all modules are thread safe, and called at the desired count
*
* @throws BrokenBarrierException BrokenBarrierException
* @throws InterruptedException InterruptedException
*/
@Test
public void multiThread() throws BrokenBarrierException, InterruptedException {
CountlyTimer.TIMER_DELAY_MS = 1;
Countly.instance().init(getAllConfig());

int rqSize = TestUtils.getCurrentRQ().length;
List<EventImpl> events = new ArrayList<>();
for (int rqIdx = 0; rqIdx < rqSize; rqIdx++) {
events.addAll(TestUtils.readEventsFromRequest(rqIdx, TestUtils.DEVICE_ID));
}
events.addAll(TestUtils.getCurrentEQ());

Assert.assertEquals(0, feedbackWidgetCounter.get());
Assert.assertEquals(0, crashCounter.get());
Assert.assertEquals(0, viewCounter.get());
Assert.assertEquals(0, eventCounter.get());
Assert.assertEquals(0, locationCounter.get());
//print(events);
int eventThreads = 50;
int viewThreads = 50;
int locationThreads = 50;
int crashThreads = 50;
int feedbackThreads = 50;
final CyclicBarrier gate = new CyclicBarrier(eventThreads + viewThreads + crashThreads + locationThreads + feedbackThreads + 1);
List<Thread> runs = new ArrayList<>();

submitEvents(eventThreads, runs, gate);
submitViews(viewThreads, runs, gate);
submitCrashes(crashThreads, runs, gate);
submitLocations(locationThreads, runs, gate);
submitFeedbackWidget(feedbackThreads, runs, gate);

for (Thread t : runs) {
t.start();
}

gate.await();
Storage.await(Mockito.mock(Log.class));

for (Thread t : runs) {
t.join();
}

rqSize = TestUtils.getCurrentRQ().length;
events = new ArrayList<>();

for (int rqIdx = 0; rqIdx < rqSize; rqIdx++) {
events.addAll(TestUtils.readEventsFromRequest(rqIdx, TestUtils.DEVICE_ID));
}

events.addAll(TestUtils.getCurrentEQ());
//print(events);

Arrays.stream(TestUtils.getCurrentRQ()).filter(r -> r.containsKey("crash") && !r.get("crash").contains("java.lang.Exception")).forEach(r -> {
Assert.assertNull(r.get("crash")); // validate that there is no unhandled sdk crash occurs
});
Assert.assertEquals(feedbackThreads, feedbackWidgetCounter.get());
Assert.assertEquals(crashThreads, crashCounter.get());
Assert.assertEquals(viewThreads, viewCounter.get());
Assert.assertEquals(eventThreads, eventCounter.get());
Assert.assertEquals(locationThreads, locationCounter.get());
}

private void print(List<EventImpl> events) {
System.out.println(events.stream().filter(e -> e.key.equals("[CLY]_survey")).count());
System.out.println(events.stream().filter(e -> e.key.equals("[CLY]_view")).count());
System.out.println(events.stream().filter(e -> !e.key.equals("[CLY]_view") && !e.key.equals("[CLY]_survey")).count());
System.out.println((int) Arrays.stream(TestUtils.getCurrentRQ()).filter(r -> r.containsKey("crash")).count());
Arrays.stream(TestUtils.getCurrentRQ()).filter(r -> r.containsKey("crash") && !r.get("crash").contains("java.lang.Exception")).forEach(r -> {
System.out.println(r.get("crash"));
});
System.out.println("-------------------- CALL COUNTS -----------------");
System.out.println("feedbackWidgetCounter: " + feedbackWidgetCounter.get());
System.out.println("crashCounter: " + crashCounter.get());
System.out.println("viewCounter: " + viewCounter.get());
System.out.println("eventCounter: " + eventCounter.get());
System.out.println("locationCounter: " + locationCounter.get());
}

private void submitFeedbackWidget(int feedbackThreads, List<Thread> runs, CyclicBarrier gate) {
for (int a = 0; a < feedbackThreads; a++) {
int finalA = a;
runs.add(new Thread(() -> {
gateAwait(gate);
CountlyFeedbackWidget feedbackWidget = new CountlyFeedbackWidget();
feedbackWidget.widgetId = "testThreadFeedbackWidget_" + finalA;
feedbackWidget.type = FeedbackWidgetType.survey;
feedbackWidget.name = "testThreadFeedbackWidget_" + finalA;
feedbackWidget.tags = new String[] { "testThreadFeedbackWidget_" + finalA };
Countly.instance().feedback().reportFeedbackWidgetManually(feedbackWidget, null, null);
feedbackWidgetCounter.incrementAndGet();
}));
}
}

private void gateAwait(CyclicBarrier gate) {
try {
gate.await();
} catch (BrokenBarrierException | InterruptedException e) {
e.printStackTrace();
}
}

private void submitLocations(int locationThreads, List<Thread> runs, CyclicBarrier gate) {
for (int a = 0; a < locationThreads; a++) {
int finalA = a;
runs.add(new Thread(() -> {
gateAwait(gate);
Countly.instance().addLocation(finalA, finalA + 1);
locationCounter.incrementAndGet();
}));
}
}

private void submitCrashes(int crashThreads, List<Thread> runs, CyclicBarrier gate) {
for (int a = 0; a < crashThreads; a++) {
int finalA = a;
runs.add(new Thread(() -> {
gateAwait(gate);
Countly.instance().addCrashReport(new Exception("testThreadCrash_" + finalA), true);
crashCounter.incrementAndGet();
}));
}
}

private void submitViews(int viewThreads, List<Thread> runs, CyclicBarrier gate) {
for (int a = 0; a < viewThreads; a++) {
int finalA = a;
runs.add(new Thread(() -> {
gateAwait(gate);
Countly.instance().view("testThreadView_" + finalA).start(false);
viewCounter.incrementAndGet();
}));
}
}

private void submitEvents(int eventThreads, List<Thread> runs, CyclicBarrier gate) {
for (int a = 0; a < eventThreads; a++) {
int finalA = a;
runs.add(new Thread(() -> {
gateAwait(gate);
Countly.instance().events().recordEvent("testThreadEvent_" + finalA, finalA);
eventCounter.incrementAndGet();
}));
}
}

private Config getAllConfig() {
Config config = TestUtils.getBaseConfig();
config.enableFeatures(Config.Feature.Events, Config.Feature.Sessions, Config.Feature.Location, Config.Feature.CrashReporting, Config.Feature.Feedback, Config.Feature.UserProfiles, Config.Feature.Views, Config.Feature.RemoteConfig);
config.enableRemoteConfigValueCaching().setRequiresConsent(false);
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,12 @@ static List<EventImpl> readEventsFromRequest() {
static List<EventImpl> readEventsFromRequest(int requestIndex, String deviceId) {
Map<String, String> request = getCurrentRQ()[requestIndex];
validateRequiredParams(request, deviceId);
JSONArray array = new JSONArray(request.get("events"));
String events = request.get("events");
List<EventImpl> result = new ArrayList<>();
if (events == null) {
return result;
}
JSONArray array = new JSONArray(events);

array.forEach(value -> {
result.add(EventImpl.fromJSON(value.toString(), (ev) -> {
Expand Down

0 comments on commit 47a9012

Please sign in to comment.