Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java] Create a multithreading test #175

Merged
merged 9 commits into from
Nov 22, 2023
Merged
3 changes: 3 additions & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public class CountlyTimer {

private final Log L;
private ScheduledExecutorService timerService;
protected static int TIMER_DELAY_MS = 0; // for testing purposes

protected CountlyTimer(Log logger) {
L = logger;
Expand All @@ -34,11 +35,19 @@ protected void stopTimer() {

protected void startTimer(long timerDelay, Runnable runnable) {
L.i("[CountlyTimer] startTimer, Starting global timer timerDelay: [" + timerDelay + "]");
timerDelay = timerDelay * 1000;

if (timerDelay < 1) {
timerDelay = 1;
if (timerDelay < 1000) {
timerDelay = 1000;
}

timerService.scheduleWithFixedDelay(runnable, timerDelay, timerDelay, TimeUnit.SECONDS);
long startTime = timerDelay;

if (TIMER_DELAY_MS > 0) {
timerDelay = TIMER_DELAY_MS;
startTime = 0;
}

timerService.scheduleWithFixedDelay(runnable, startTime, timerDelay, TimeUnit.MILLISECONDS);
}
}
51 changes: 33 additions & 18 deletions sdk-java/src/main/java/ly/count/sdk/java/internal/EventQueue.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package ly.count.sdk.java.internal;

import ly.count.sdk.java.Countly;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nonnull;

public class EventQueue {

static final String DELIMITER = ":::";
Log L;
List<EventImpl> eventQueueMemoryCache;

protected final Object lockEQ = new Object();

protected EventQueue() {
}

Expand All @@ -24,7 +25,15 @@ protected EventQueue(@Nonnull Log logger, int eventThreshold) {
* Returns the number of events currently stored in the queue.
*/
protected int eqSize() {
return eventQueueMemoryCache.size();
synchronized (lockEQ) {
return eventQueueMemoryCache.size();
}
}

protected List<EventImpl> getEQ() {
synchronized (lockEQ) {
return new ArrayList<>(eventQueueMemoryCache);
}
}

void addEvent(@Nonnull final EventImpl event) {
Expand All @@ -33,8 +42,10 @@ void addEvent(@Nonnull final EventImpl event) {
return;
}
L.d("[EventQueue] Adding event: " + event.key);
eventQueueMemoryCache.add(event);
writeEventQueueToStorage();
synchronized (lockEQ) {
eventQueueMemoryCache.add(event);
writeEventQueueToStorage();
}
}

/**
Expand All @@ -56,20 +67,22 @@ void writeEventQueueToStorage() {
* Restores events from disk
*/
void restoreFromDisk() {
L.d("[EventQueue] Restoring events from disk");
eventQueueMemoryCache.clear();

final String[] array = getEvents();
for (String s : array) {

final EventImpl event = EventImpl.fromJSON(s, (ev) -> {
}, L);
if (event != null) {
eventQueueMemoryCache.add(event);
synchronized (lockEQ) {
L.d("[EventQueue] Restoring events from disk");
eventQueueMemoryCache.clear();

final String[] array = getEvents();
for (String s : array) {

final EventImpl event = EventImpl.fromJSON(s, (ev) -> {
}, L);
if (event != null) {
eventQueueMemoryCache.add(event);
}
}
// order the events from least to most recent
eventQueueMemoryCache.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp));
}
// order the events from least to most recent
eventQueueMemoryCache.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp));
}

@Nonnull String joinEvents(@Nonnull final Collection<EventImpl> collection) {
Expand All @@ -91,6 +104,8 @@ void restoreFromDisk() {

public void clear() {
SDKCore.instance.sdkStorage.storeEventQueue("");
eventQueueMemoryCache.clear();
synchronized (lockEQ) {
eventQueueMemoryCache.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void stop(InternalConfig config, final boolean clear) {
private synchronized void addEventsToRequestQ(String deviceId) {
L.d("[ModuleEvents] addEventsToRequestQ");

if (eventQueue.eventQueueMemoryCache.isEmpty()) {
if (eventQueue.getEQ().isEmpty()) {
L.d("[ModuleEvents] addEventsToRequestQ, eventQueueMemoryCache is empty, skipping");
return;
}
Expand All @@ -78,7 +78,7 @@ private synchronized void addEventsToRequestQ(String deviceId) {
if (deviceId != null) {
request.params.add("device_id", deviceId);
}
request.params.arr("events").put(eventQueue.eventQueueMemoryCache).add();
request.params.arr("events").put(eventQueue.getEQ()).add();
request.own(ModuleEvents.class);

eventQueue.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package ly.count.sdk.java.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import ly.count.sdk.java.Config;
import ly.count.sdk.java.Countly;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
public class MultiThreadingTest {
@After
public void stop() {
Countly.instance().halt();
}

@Before
public void beforeTest() {
TestUtils.createCleanTestState();
}

AtomicInteger feedbackWidgetCounter = new AtomicInteger(0);
AtomicInteger crashCounter = new AtomicInteger(0);
AtomicInteger viewCounter = new AtomicInteger(0);
AtomicInteger eventCounter = new AtomicInteger(0);
AtomicInteger locationCounter = new AtomicInteger(0);

/**
* Test that all modules are thread safe, and called at the desired count
*
* @throws BrokenBarrierException BrokenBarrierException
* @throws InterruptedException InterruptedException
*/
@Test
public void multiThread() throws BrokenBarrierException, InterruptedException {
CountlyTimer.TIMER_DELAY_MS = 1;
Countly.instance().init(getAllConfig());

int rqSize = TestUtils.getCurrentRQ().length;
List<EventImpl> events = new ArrayList<>();
for (int rqIdx = 0; rqIdx < rqSize; rqIdx++) {
events.addAll(TestUtils.readEventsFromRequest(rqIdx, TestUtils.DEVICE_ID));
}
events.addAll(TestUtils.getCurrentEQ());

Assert.assertEquals(0, feedbackWidgetCounter.get());
Assert.assertEquals(0, crashCounter.get());
Assert.assertEquals(0, viewCounter.get());
Assert.assertEquals(0, eventCounter.get());
Assert.assertEquals(0, locationCounter.get());
//print(events);
int eventThreads = 50;
int viewThreads = 50;
int locationThreads = 50;
int crashThreads = 50;
int feedbackThreads = 50;
final CyclicBarrier gate = new CyclicBarrier(eventThreads + viewThreads + crashThreads + locationThreads + feedbackThreads + 1);
List<Thread> runs = new ArrayList<>();

submitEvents(eventThreads, runs, gate);
submitViews(viewThreads, runs, gate);
submitCrashes(crashThreads, runs, gate);
submitLocations(locationThreads, runs, gate);
submitFeedbackWidget(feedbackThreads, runs, gate);

for (Thread t : runs) {
t.start();
}

gate.await();
Storage.await(Mockito.mock(Log.class));

for (Thread t : runs) {
t.join();
}

rqSize = TestUtils.getCurrentRQ().length;
events = new ArrayList<>();

for (int rqIdx = 0; rqIdx < rqSize; rqIdx++) {
events.addAll(TestUtils.readEventsFromRequest(rqIdx, TestUtils.DEVICE_ID));
}

events.addAll(TestUtils.getCurrentEQ());
//print(events);

Arrays.stream(TestUtils.getCurrentRQ()).filter(r -> r.containsKey("crash") && !r.get("crash").contains("java.lang.Exception")).forEach(r -> {
Assert.assertNull(r.get("crash")); // validate that there is no unhandled sdk crash occurs
});
Assert.assertEquals(feedbackThreads, feedbackWidgetCounter.get());
Assert.assertEquals(crashThreads, crashCounter.get());
Assert.assertEquals(viewThreads, viewCounter.get());
Assert.assertEquals(eventThreads, eventCounter.get());
Assert.assertEquals(locationThreads, locationCounter.get());
}

private void print(List<EventImpl> events) {
System.out.println(events.stream().filter(e -> e.key.equals("[CLY]_survey")).count());
System.out.println(events.stream().filter(e -> e.key.equals("[CLY]_view")).count());
System.out.println(events.stream().filter(e -> !e.key.equals("[CLY]_view") && !e.key.equals("[CLY]_survey")).count());
System.out.println((int) Arrays.stream(TestUtils.getCurrentRQ()).filter(r -> r.containsKey("crash")).count());
Arrays.stream(TestUtils.getCurrentRQ()).filter(r -> r.containsKey("crash") && !r.get("crash").contains("java.lang.Exception")).forEach(r -> {
System.out.println(r.get("crash"));
});
System.out.println("-------------------- CALL COUNTS -----------------");
System.out.println("feedbackWidgetCounter: " + feedbackWidgetCounter.get());
System.out.println("crashCounter: " + crashCounter.get());
System.out.println("viewCounter: " + viewCounter.get());
System.out.println("eventCounter: " + eventCounter.get());
System.out.println("locationCounter: " + locationCounter.get());
}

private void submitFeedbackWidget(int feedbackThreads, List<Thread> runs, CyclicBarrier gate) {
for (int a = 0; a < feedbackThreads; a++) {
int finalA = a;
runs.add(new Thread(() -> {
gateAwait(gate);
CountlyFeedbackWidget feedbackWidget = new CountlyFeedbackWidget();
feedbackWidget.widgetId = "testThreadFeedbackWidget_" + finalA;
feedbackWidget.type = FeedbackWidgetType.survey;
feedbackWidget.name = "testThreadFeedbackWidget_" + finalA;
feedbackWidget.tags = new String[] { "testThreadFeedbackWidget_" + finalA };
Countly.instance().feedback().reportFeedbackWidgetManually(feedbackWidget, null, null);
feedbackWidgetCounter.incrementAndGet();
}));
}
}

private void gateAwait(CyclicBarrier gate) {
try {
gate.await();
} catch (BrokenBarrierException | InterruptedException e) {
e.printStackTrace();
}
}

private void submitLocations(int locationThreads, List<Thread> runs, CyclicBarrier gate) {
for (int a = 0; a < locationThreads; a++) {
int finalA = a;
runs.add(new Thread(() -> {
gateAwait(gate);
Countly.instance().addLocation(finalA, finalA + 1);
locationCounter.incrementAndGet();
}));
}
}

private void submitCrashes(int crashThreads, List<Thread> runs, CyclicBarrier gate) {
for (int a = 0; a < crashThreads; a++) {
int finalA = a;
runs.add(new Thread(() -> {
gateAwait(gate);
Countly.instance().addCrashReport(new Exception("testThreadCrash_" + finalA), true);
crashCounter.incrementAndGet();
}));
}
}

private void submitViews(int viewThreads, List<Thread> runs, CyclicBarrier gate) {
for (int a = 0; a < viewThreads; a++) {
int finalA = a;
runs.add(new Thread(() -> {
gateAwait(gate);
Countly.instance().view("testThreadView_" + finalA).start(false);
viewCounter.incrementAndGet();
}));
}
}

private void submitEvents(int eventThreads, List<Thread> runs, CyclicBarrier gate) {
for (int a = 0; a < eventThreads; a++) {
int finalA = a;
runs.add(new Thread(() -> {
gateAwait(gate);
Countly.instance().events().recordEvent("testThreadEvent_" + finalA, finalA);
eventCounter.incrementAndGet();
}));
}
}

private Config getAllConfig() {
Config config = TestUtils.getBaseConfig();
config.enableFeatures(Config.Feature.Events, Config.Feature.Sessions, Config.Feature.Location, Config.Feature.CrashReporting, Config.Feature.Feedback, Config.Feature.UserProfiles, Config.Feature.Views, Config.Feature.RemoteConfig);
config.enableRemoteConfigValueCaching().setRequiresConsent(false);
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,12 @@ static List<EventImpl> readEventsFromRequest() {
static List<EventImpl> readEventsFromRequest(int requestIndex, String deviceId) {
Map<String, String> request = getCurrentRQ()[requestIndex];
validateRequiredParams(request, deviceId);
JSONArray array = new JSONArray(request.get("events"));
String events = request.get("events");
List<EventImpl> result = new ArrayList<>();
if (events == null) {
return result;
}
JSONArray array = new JSONArray(events);

array.forEach(value -> {
result.add(EventImpl.fromJSON(value.toString(), (ev) -> {
Expand Down
Loading