Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

init commit #2053

Draft
wants to merge 10 commits into
base: feature/mini-runtime-release
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
node-version: '17'

- name: mvn package command
run: mvn package
run: mvn package -DskipTests

- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1
Expand Down Expand Up @@ -61,7 +61,7 @@ jobs:
IMAGE_TAG: latest
IMAGE_TAG1: testruntime
IMAGE_TAG2: local
IMAGE_TAG3: 1.42.11_local
IMAGE_TAG3: improve-testing-performance
run: |
docker buildx create --use
# Build a docker container and push it to DockerHub
Expand All @@ -86,7 +86,7 @@ jobs:
IMAGE_TAG: latest
IMAGE_TAG1: testruntime
IMAGE_TAG2: local
IMAGE_TAG3: 1.42.11_local
IMAGE_TAG3: improve-testing-performance
run: |
echo $IMAGE_TAG >> $GITHUB_STEP_SUMMARY
docker buildx create --use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,11 @@ public String countTestingRunResultSummaries() {
return Action.SUCCESS.toUpperCase();
}

/**
 * Struts action endpoint: fetches the most recent TestingRunResultSummary
 * matching the {@code filter} criteria and stores it in {@code trrs} so the
 * JSON result serializes it back to the caller.
 *
 * NOTE(review): {@code filter} and {@code trrs} are action fields declared
 * elsewhere in this class — presumably {@code filter} is populated from the
 * request body by the json interceptor; verify against the struts.xml mapping.
 *
 * @return the Struts result name "SUCCESS"
 */
public String findLatestTestingRunResultSummary(){
trrs = DbLayer.findLatestTestingRunResultSummary(filter);
return Action.SUCCESS.toUpperCase();
}

public List<CustomDataTypeMapper> getCustomDataTypes() {
return customDataTypes;
}
Expand Down
11 changes: 11 additions & 0 deletions apps/database-abstractor/src/main/resources/struts.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,17 @@
</result>
</action>

<action name="api/findLatestTestingRunResultSummary" class="com.akto.action.DbAction" method="findLatestTestingRunResultSummary">
<interceptor-ref name="json"/>
<interceptor-ref name="defaultStack" />
<result name="SUCCESS" type="json"/>
<result name="ERROR" type="json">
<param name="statusCode">422</param>
<param name="ignoreHierarchy">false</param>
<param name="includeProperties">^actionErrors.*</param>
</result>
</action>

</package>

</struts>
155 changes: 116 additions & 39 deletions apps/mini-testing/src/main/java/com/akto/testing/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import com.akto.data_actor.DataActor;
import com.akto.data_actor.DataActorFactory;
import com.akto.dto.*;
import com.akto.dto.billing.FeatureAccess;
import com.akto.dto.billing.Organization;
import com.akto.dto.billing.SyncLimit;
import com.akto.dto.test_run_findings.TestingRunIssues;
import com.akto.dto.testing.*;
import com.akto.dto.testing.TestingEndpoints.Operator;
Expand All @@ -16,6 +18,7 @@
import com.akto.dto.testing.rate_limit.GlobalApiRateLimit;
import com.akto.dto.testing.rate_limit.RateLimitHandler;
import com.akto.dto.type.SingleTypeInfo;
import com.akto.dto.usage.MetricTypes;
import com.akto.github.GithubUtils;
import com.akto.log.LoggerMaker;
import com.akto.log.LoggerMaker.LogDb;
Expand All @@ -25,16 +28,22 @@
import com.akto.notifications.slack.NewIssuesModel;
import com.akto.notifications.slack.SlackAlerts;
import com.akto.notifications.slack.SlackSender;
import com.akto.testing.kafka_utils.ConsumerUtil;
import com.akto.testing.kafka_utils.Producer;
import com.akto.util.Constants;
import com.akto.util.DashboardMode;
import com.akto.util.EmailAccountName;
import com.akto.util.enums.GlobalEnums;
import com.mongodb.BasicDBObject;
import com.mongodb.client.model.*;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.akto.testing.Utils.readJsonContentFromFile;

import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -83,6 +92,41 @@ public static Set<Integer> extractApiCollectionIds(List<ApiInfo.ApiInfoKey> apiI
private static final int LAST_TEST_RUN_EXECUTION_DELTA = 5 * 60;
private static final int MAX_RETRIES_FOR_FAILED_SUMMARIES = 3;

/**
 * Checks whether a test was already in progress on this machine (e.g. before a
 * restart) by reading the persisted testing-state file.
 *
 * Returns the persisted state object (not a boolean) when ALL of the following
 * hold, otherwise {@code null}:
 *   - the state file exists and its "CONSUMER_RUNNING" flag is true,
 *   - the recorded summary still exists and is in RUNNING state,
 *   - that summary is still the latest one for its testing run.
 *
 * Side effect: sets {@link Context#accountId} to the account recorded in the
 * state file before querying the data layer.
 */
private static BasicDBObject checkIfAlreadyTestIsRunningOnMachine(){
    try {
        BasicDBObject currentTestInfo = readJsonContentFromFile(Constants.TESTING_STATE_FOLDER_PATH, Constants.TESTING_STATE_FILE_NAME, BasicDBObject.class);
        if(currentTestInfo == null){
            return null;
        }
        if(!currentTestInfo.getBoolean("CONSUMER_RUNNING", false)){
            return null;
        }
        String testingRunId = currentTestInfo.getString("testingRunId");
        String testingRunSummaryId = currentTestInfo.getString("summaryId");

        int accountID = currentTestInfo.getInt("accountId");
        Context.accountId.set(accountID);

        TestingRunResultSummary testingRunResultSummary = dataActor.fetchTestingRunResultSummary(testingRunSummaryId);
        if(testingRunResultSummary == null || testingRunResultSummary.getState() == null || testingRunResultSummary.getState() != State.RUNNING){
            return null;
        }
        Bson filterQ = Filters.eq(TestingRunResultSummary.TESTING_RUN_ID, new ObjectId(testingRunId));
        TestingRunResultSummary latestSummary = dataActor.findLatestTestingRunResultSummary(filterQ);
        // Null-check before dereferencing: the lookup can come back empty, and the
        // original code would NPE on getHexId() in that case.
        if(latestSummary != null && latestSummary.getHexId().equals(testingRunSummaryId)){
            return currentTestInfo;
        }
        return null;
    } catch (Exception e) {
        // Pass the throwable itself so the stack trace is logged, not just the message.
        logger.error("Error in reading the testing state file: " + e.getMessage(), e);
        return null;
    }
}


private static void setTestingRunConfig(TestingRun testingRun, TestingRunResultSummary trrs) {
long timestamp = testingRun.getId().getTimestamp();
long seconds = Context.now() - timestamp;
Expand Down Expand Up @@ -157,8 +201,17 @@ public static void main(String[] args) throws InterruptedException {
}
}

Producer testingProducer = new Producer();
ConsumerUtil testingConsumer = new ConsumerUtil();
TestCompletion testCompletion = new TestCompletion();

loggerMaker.infoAndAddToDb("Starting.......", LogDb.TESTING);

if(Constants.IS_NEW_TESTING_ENABLED){
boolean val = Utils.createFolder(Constants.TESTING_STATE_FOLDER_PATH);
logger.info("Testing info folder status: " + val);
}

schedulerAccessMatrix.scheduleAtFixedRate(new Runnable() {
public void run() {
if (matrixAnalyzerRunning) {
Expand Down Expand Up @@ -187,6 +240,54 @@ public void run() {
Context.accountId.set(accountId);
GetRunningTestsStatus.getRunningTests().getStatusOfRunningTests();

BasicDBObject currentTestInfo = null;
if(Constants.IS_NEW_TESTING_ENABLED){
currentTestInfo = checkIfAlreadyTestIsRunningOnMachine();
}

if(currentTestInfo != null){
try {
loggerMaker.infoAndAddToDb("Tests were already running on this machine, thus resuming the test for account: "+ accountId, LogDb.TESTING);
Organization organization = dataActor.fetchOrganization(accountId);
FeatureAccess featureAccess = UsageMetricUtils.getFeatureAccess(organization, MetricTypes.TEST_RUNS);
SyncLimit syncLimit = featureAccess.fetchSyncLimit();

String testingRunSummaryId = currentTestInfo.getString("summaryId");
TestingRun testingRun = dataActor.findTestingRun(testingRunSummaryId);
TestingRunConfig baseConfig = dataActor.findTestingRunConfig(testingRun.getTestIdConfig());
testingRun.setTestingRunConfig(baseConfig);
ObjectId summaryId = new ObjectId(testingRunSummaryId);
testingProducer.initProducer(testingRun, summaryId, true, syncLimit);
int maxRunTime = testingRun.getTestRunTime() <= 0 ? 30*60 : testingRun.getTestRunTime();
testingConsumer.init(maxRunTime);

// mark the test completed here
testCompletion.markTestAsCompleteAndRunFunctions(testingRun, summaryId, System.currentTimeMillis());

// if (StringUtils.hasLength(AKTO_SLACK_WEBHOOK) ) {
// try {
// CustomTextAlert customTextAlert = new CustomTextAlert("Test completed for accountId=" + accountId + " testingRun=" + testingRun.getHexId() + " summaryId=" + summaryId.toHexString() + " : @Arjun you are up now. Make your time worth it. :)");
// SLACK_INSTANCE.send(AKTO_SLACK_WEBHOOK, customTextAlert.toJson());
// } catch (Exception e) {
// logger.error("Error sending slack alert for completion of test", e);
// }

// }

// deleteScheduler.execute(() -> {
// Context.accountId.set(accountId);
// try {
// deleteNonVulnerableResults();

// } catch (Exception e) {
// loggerMaker.errorAndAddToDb(e, "Error in deleting testing run results");
// }
// });
} catch (Exception e) {
logger.error("Error in running failed tests from file.", e);
}
}

singleTypeInfoInit(accountId);

while (true) {
Expand Down Expand Up @@ -339,52 +440,28 @@ public void run() {

}
}

Organization organization = dataActor.fetchOrganization(accountId);
FeatureAccess featureAccess = UsageMetricUtils.getFeatureAccess(organization, MetricTypes.TEST_RUNS);
SyncLimit syncLimit = featureAccess.fetchSyncLimit();

if(!maxRetriesReached){
testExecutor.init(testingRun, summaryId);
if(Constants.IS_NEW_TESTING_ENABLED){
int maxRunTime = testingRun.getTestRunTime() <= 0 ? 30*60 : testingRun.getTestRunTime();
testingProducer.initProducer(testingRun, summaryId, false, syncLimit);
testingConsumer.init(maxRunTime);
}else{
testExecutor.init(testingRun, summaryId, syncLimit, false);
}
AllMetrics.instance.setTestingRunCount(1);
}
//raiseMixpanelEvent(summaryId, testingRun, accountId);
// raiseMixpanelEvent(summaryId, testingRun, accountId);
} catch (Exception e) {
e.printStackTrace();
loggerMaker.errorAndAddToDb(e, "Error in init " + e);
}
int scheduleTs = 0;

if (testingRun.getPeriodInSeconds() > 0 ) {
scheduleTs = testingRun.getScheduleTimestamp() + testingRun.getPeriodInSeconds();
} else if (testingRun.getPeriodInSeconds() == -1) {
scheduleTs = testingRun.getScheduleTimestamp() + 5 * 60;
}

if(GetRunningTestsStatus.getRunningTests().isTestRunning(testingRun.getId())){
dataActor.updateTestingRunAndMarkCompleted(testingRun.getId().toHexString(), scheduleTs);
}

if(summaryId != null && testingRun.getTestIdConfig() != 1){
TestExecutor.updateTestSummary(summaryId);
}

loggerMaker.infoAndAddToDb("Tests completed in " + (Context.now() - start) + " seconds for account: " + accountId, LogDb.TESTING);
AllMetrics.instance.setTestingRunLatency(System.currentTimeMillis() - startDetailed);

Organization organization = dataActor.fetchOrganization(accountId);

if(organization != null && organization.getTestTelemetryEnabled()){
loggerMaker.infoAndAddToDb("Test telemetry enabled for account: " + accountId + ", sending results", LogDb.TESTING);
ObjectId finalSummaryId = summaryId;
testTelemetryScheduler.execute(() -> {
Context.accountId.set(accountId);
try {
com.akto.onprem.Constants.sendTestResults(finalSummaryId, organization);
loggerMaker.infoAndAddToDb("Test telemetry sent for account: " + accountId, LogDb.TESTING);
} catch (Exception e) {
loggerMaker.errorAndAddToDb(e, "Error in sending test telemetry for account: " + accountId);
}
});

} else {
loggerMaker.infoAndAddToDb("Test telemetry disabled for account: " + accountId, LogDb.TESTING);
}

testCompletion.markTestAsCompleteAndRunFunctions(testingRun, summaryId, startDetailed);

Thread.sleep(1000);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.akto.testing;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.bson.types.ObjectId;
import com.akto.crons.GetRunningTestsStatus;
import com.akto.dao.context.Context;
import com.akto.data_actor.DataActor;
import com.akto.data_actor.DataActorFactory;
import com.akto.dto.billing.Organization;
import com.akto.dto.testing.TestingRun;
import com.akto.log.LoggerMaker;
import com.akto.log.LoggerMaker.LogDb;
import com.akto.metrics.AllMetrics;

public class TestCompletion {

    private static final LoggerMaker loggerMaker = new LoggerMaker(TestCompletion.class);
    public static final ScheduledExecutorService testTelemetryScheduler = Executors.newScheduledThreadPool(2);
    private static final DataActor dataActor = DataActorFactory.fetchInstance();

    /**
     * Runs the post-completion steps for a testing run: computes the next
     * schedule timestamp, marks the run completed if it is still tracked as
     * running, refreshes the run summary, records the run-latency metric, and
     * ships test telemetry asynchronously when the organization has it enabled.
     *
     * @param testingRun    the run being finalized
     * @param summaryId     id of the result summary for this run (may be null)
     * @param startDetailed wall-clock start time in millis, used for the latency metric
     */
    public void markTestAsCompleteAndRunFunctions(TestingRun testingRun, ObjectId summaryId, long startDetailed){
        int accountId = Context.accountId.get();

        // Next schedule: periodic runs advance by their period; a period of -1
        // means "re-run shortly" and advances by five minutes; otherwise 0.
        int period = testingRun.getPeriodInSeconds();
        int scheduleTs = 0;
        if (period > 0) {
            scheduleTs = testingRun.getScheduleTimestamp() + period;
        } else if (period == -1) {
            scheduleTs = testingRun.getScheduleTimestamp() + 5 * 60;
        }

        if (GetRunningTestsStatus.getRunningTests().isTestRunning(testingRun.getId())) {
            dataActor.updateTestingRunAndMarkCompleted(testingRun.getId().toHexString(), scheduleTs);
        }

        // testIdConfig == 1 is skipped here — presumably a special config whose
        // summary is managed elsewhere; TODO confirm against TestExecutor.
        if (summaryId != null && testingRun.getTestIdConfig() != 1) {
            TestExecutor.updateTestSummary(summaryId);
        }

        AllMetrics.instance.setTestingRunLatency(System.currentTimeMillis() - startDetailed);

        Organization organization = dataActor.fetchOrganization(accountId);
        boolean telemetryEnabled = organization != null && organization.getTestTelemetryEnabled();
        if (!telemetryEnabled) {
            loggerMaker.infoAndAddToDb("Test telemetry disabled for account: " + accountId, LogDb.TESTING);
            return;
        }

        loggerMaker.infoAndAddToDb("Test telemetry enabled for account: " + accountId + ", sending results", LogDb.TESTING);
        final ObjectId completedSummaryId = summaryId;
        testTelemetryScheduler.execute(() -> {
            // The worker thread has its own Context; re-bind the account id.
            Context.accountId.set(accountId);
            try {
                com.akto.onprem.Constants.sendTestResults(completedSummaryId, organization);
                loggerMaker.infoAndAddToDb("Test telemetry sent for account: " + accountId, LogDb.TESTING);
            } catch (Exception e) {
                loggerMaker.errorAndAddToDb(e, "Error in sending test telemetry for account: " + accountId);
            }
        });
    }
}
Loading
Loading