Skip to content

Commit

Permalink
Fixing logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Ark2307 committed Feb 8, 2025
1 parent 5fe65df commit edf087d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
19 changes: 9 additions & 10 deletions apps/mini-testing/src/main/java/com/akto/testing/TestExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.akto.testing.workflow_node_executor.Utils;

public class TestExecutor {
Expand Down Expand Up @@ -256,11 +258,12 @@ public void apiWiseInit(TestingRun testingRun, ObjectId summaryId, boolean debug
}

final int maxRunTime = tempRunTime;
AtomicInteger totalRecords = new AtomicInteger(0);
for (ApiInfo.ApiInfoKey apiInfoKey: apiInfoKeyList) {
List<String> messages = testingUtil.getSampleMessages().get(apiInfoKey);
if(Constants.IS_NEW_TESTING_ENABLED){
for (String testSubCategory: testingRunSubCategories) {
Future<Void> future = threadPool.submit(() ->insertRecordInKafka(accountId, testSubCategory, apiInfoKey, messages, summaryId, syncLimit, apiInfoKeyToHostMap, subCategoryEndpointMap, testConfigMap, testLogs, testingRun, new AtomicBoolean(false)));
Future<Void> future = threadPool.submit(() ->insertRecordInKafka(accountId, testSubCategory, apiInfoKey, messages, summaryId, syncLimit, apiInfoKeyToHostMap, subCategoryEndpointMap, testConfigMap, testLogs, testingRun, new AtomicBoolean(false), totalRecords));
testingRecords.add(future);
}
latch.countDown();
Expand All @@ -272,33 +275,28 @@ public void apiWiseInit(TestingRun testingRun, ObjectId summaryId, boolean debug
}

try {
//boolean awaitResult = latch.await(maxRunTime, TimeUnit.SECONDS);
int waitTs = Context.now();
while(latch.getCount() > 0 && GetRunningTestsStatus.getRunningTests().isTestRunning(summaryId)
&& (Context.now() - waitTs < maxRunTime)) {
loggerMaker.infoAndAddToDb("waiting for tests to finish", LogDb.TESTING);
Thread.sleep(10000);
}
loggerMaker.infoAndAddToDb("test is completed", LogDb.TESTING);
//awaitResult = latch.getCount() > 0 && GetRunningTestsStatus.getRunningTests().isTestRunning(summaryId);
//loggerMaker.infoAndAddToDb("Await result: " + awaitResult, LogDb.TESTING);

for (Future<Void> future : testingRecords) {
future.cancel(true);
future.cancel(!Constants.IS_NEW_TESTING_ENABLED);
}
loggerMaker.infoAndAddToDb("Canceled all running future tasks due to timeout.", LogDb.TESTING);

} catch (Exception e) {
throw new RuntimeException(e);
}
if(!shouldInitOnly && Constants.IS_NEW_TESTING_ENABLED){
loggerMaker.infoAndAddToDb("Finished inserting records in kafka, Total records: " + totalRecords.get(), LogDb.TESTING);
dbObject.put("PRODUCER_RUNNING", false);
dbObject.put("CONSUMER_RUNNING", true);
writeJsonContentInFile(Constants.TESTING_STATE_FOLDER_PATH, Constants.TESTING_STATE_FILE_NAME, dbObject);
loggerMaker.infoAndAddToDb("Finished inserting records in kafka", LogDb.TESTING);
}
}
loggerMaker.infoAndAddToDb("Finished testing", LogDb.TESTING);
}

public static void updateTestSummary(ObjectId summaryId){
Expand Down Expand Up @@ -505,7 +503,7 @@ public Void startWithLatch(
try {
for (String testSubCategory: testingRunSubCategories) {
if(GetRunningTestsStatus.getRunningTests().isTestRunning(summaryId)){
insertRecordInKafka(accountId, testSubCategory, apiInfoKey, messages, summaryId, syncLimit, apiInfoKeyToHostMap, subCategoryEndpointMap, testConfigMap, testLogs, testingRun, isApiInfoTested);
insertRecordInKafka(accountId, testSubCategory, apiInfoKey, messages, summaryId, syncLimit, apiInfoKeyToHostMap, subCategoryEndpointMap, testConfigMap, testLogs, testingRun, isApiInfoTested, new AtomicInteger(0));
}else{
logger.info("Test stopped for id: " + testingRun.getHexId());
break;
Expand Down Expand Up @@ -606,7 +604,7 @@ public void insertResultsAndMakeIssues(List<TestingRunResult> testingRunResults,
private Void insertRecordInKafka(int accountId, String testSubCategory, ApiInfo.ApiInfoKey apiInfoKey,
List<String> messages, ObjectId summaryId, SyncLimit syncLimit, Map<ApiInfoKey, String> apiInfoKeyToHostMap,
ConcurrentHashMap<String, String> subCategoryEndpointMap, Map<String, TestConfig> testConfigMap,
List<TestingRunResult.TestLog> testLogs, TestingRun testingRun, AtomicBoolean isApiInfoTested) {
List<TestingRunResult.TestLog> testLogs, TestingRun testingRun, AtomicBoolean isApiInfoTested, AtomicInteger totalRecords) {
Context.accountId.set(accountId);
TestConfig testConfig = testConfigMap.get(testSubCategory);

Expand All @@ -633,6 +631,7 @@ private Void insertRecordInKafka(int accountId, String testSubCategory, ApiInfo.
SingleTestPayload singleTestPayload = new SingleTestPayload(
testingRun.getId(), summaryId, apiInfoKey, testSubType, testLogs, accountId
);
totalRecords.incrementAndGet();
logger.info("Inserting record for apiInfoKey: " + apiInfoKey.toString() + " subcategory: " + testSubType);
try {
Producer.pushMessagesToKafka(Arrays.asList(singleTestPayload));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.*;
import org.bson.types.ObjectId;
Expand Down Expand Up @@ -116,6 +117,9 @@ public void init(int maxRunTimeInSeconds) {
final ObjectId summaryObjectId = new ObjectId(summaryIdForTest);
final int startTime = Context.now();
AtomicBoolean firstRecordRead = new AtomicBoolean(false);
AtomicInteger maxRetries = new AtomicInteger(0);
AtomicInteger countVal = new AtomicInteger(0);

boolean isConsumerRunning = false;
if(currentTestInfo != null){
isConsumerRunning = currentTestInfo.getBoolean("CONSUMER_RUNNING");
Expand Down Expand Up @@ -176,6 +180,10 @@ public void init(int maxRunTimeInSeconds) {
});

while (parallelConsumer != null) {
if(countVal.get() % 100 == 0){
countVal.set(0);
logger.info("Total work remaining now is: " + parallelConsumer.workRemaining());
}
if(!GetRunningTestsStatus.getRunningTests().isTestRunning(summaryObjectId)){
logger.info("Tests have been marked stopped.");
executor.shutdownNow();
Expand All @@ -186,10 +194,14 @@ else if ((Context.now() - startTime > maxRunTimeInSeconds)) {
executor.shutdownNow();
break;
}else if(firstRecordRead.get() && parallelConsumer.workRemaining() == 0){
logger.info("Records are empty now, thus executing final tests");
executor.shutdown();
executor.awaitTermination(maxRunTimeInSeconds, TimeUnit.SECONDS);
break;
if(maxRetries.get() < 3){
maxRetries.incrementAndGet();
}else{
logger.info("Records are empty now, thus executing final tests");
executor.shutdown();
executor.awaitTermination(maxRunTimeInSeconds, TimeUnit.SECONDS);
break;
}
}
Thread.sleep(100);
}
Expand Down

0 comments on commit edf087d

Please sign in to comment.