-
Notifications
You must be signed in to change notification settings - Fork 817
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
85 additions
and
92 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,36 +1,32 @@ | ||
|
||
import pytest | ||
|
||
from keep.api.core.db import ( | ||
add_alerts_to_incident_by_incident_id, | ||
create_incident_from_dict | ||
create_incident_from_dict, | ||
) | ||
|
||
from tests.fixtures.client import client, setup_api_key, test_app | ||
from tests.fixtures.client import setup_api_key | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"test_app", ["NO_AUTH"], indirect=True | ||
) | ||
def test_add_remove_alert_to_incidents(client, db_session, test_app, setup_stress_alerts_no_elastic): | ||
@pytest.mark.parametrize("test_app", ["NO_AUTH"], indirect=True) | ||
def test_add_remove_alert_to_incidents( | ||
db_session, client, test_app, setup_stress_alerts_no_elastic | ||
): | ||
alerts = setup_stress_alerts_no_elastic(14) | ||
incident = create_incident_from_dict("keep", {"user_generated_name": "test", "description": "test"}) | ||
incident = create_incident_from_dict( | ||
"keep", {"user_generated_name": "test", "description": "test"} | ||
) | ||
valid_api_key = "valid_api_key" | ||
setup_api_key(db_session, valid_api_key) | ||
|
||
add_alerts_to_incident_by_incident_id( | ||
"keep", | ||
incident.id, | ||
[a.id for a in alerts] | ||
) | ||
add_alerts_to_incident_by_incident_id("keep", incident.id, [a.id for a in alerts]) | ||
|
||
response = client.get( | ||
"/metrics", | ||
headers={"X-API-KEY": "valid_api_key"} | ||
) | ||
response = client.get("/metrics", headers={"X-API-KEY": "valid_api_key"}) | ||
|
||
# Checking for alert_total metric | ||
assert f"alerts_total{{incident_name=\"test\" incident_id=\"{incident.id}\"}} 14" in response.text.split("\n") | ||
assert ( | ||
f'alerts_total{{incident_name="test" incident_id="{incident.id}"}} 14' | ||
in response.text.split("\n") | ||
) | ||
|
||
# Checking for open_incidents_total metric | ||
assert "open_incidents_total 1" in response.text.split("\n") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,7 @@ | ||
import pytest | ||
|
||
from keep.api.core.dependencies import SINGLE_TENANT_UUID | ||
from keep.api.core.db import create_rule as create_rule_db | ||
|
||
from keep.api.core.dependencies import SINGLE_TENANT_UUID | ||
from tests.fixtures.client import client, setup_api_key, test_app # noqa | ||
|
||
TEST_RULE_DATA = { | ||
|
@@ -18,28 +17,36 @@ | |
"created_by": "[email protected]", | ||
} | ||
|
||
INVALID_DATA_STEPS = [{ | ||
"update": {"sqlQuery": {"sql": "", "params": []}}, | ||
"error": "SQL is required", | ||
}, { | ||
"update": {"sqlQuery": {"sql": "SELECT", "params": []}}, | ||
"error": "Params are required", | ||
}, { | ||
"update": {"celQuery": ""}, | ||
"error": "CEL is required", | ||
}, { | ||
"update": {"ruleName": ""}, | ||
"error": "Rule name is required", | ||
}, { | ||
"update": {"timeframeInSeconds": 0}, | ||
"error": "Timeframe is required", | ||
}, { | ||
"update": {"timeUnit": ""}, | ||
"error": "Timeunit is required", | ||
}] | ||
INVALID_DATA_STEPS = [ | ||
{ | ||
"update": {"sqlQuery": {"sql": "", "params": []}}, | ||
"error": "SQL is required", | ||
}, | ||
{ | ||
"update": {"sqlQuery": {"sql": "SELECT", "params": []}}, | ||
"error": "Params are required", | ||
}, | ||
{ | ||
"update": {"celQuery": ""}, | ||
"error": "CEL is required", | ||
}, | ||
{ | ||
"update": {"ruleName": ""}, | ||
"error": "Rule name is required", | ||
}, | ||
{ | ||
"update": {"timeframeInSeconds": 0}, | ||
"error": "Timeframe is required", | ||
}, | ||
{ | ||
"update": {"timeUnit": ""}, | ||
"error": "Timeunit is required", | ||
}, | ||
] | ||
|
||
|
||
@pytest.mark.parametrize("test_app", ["NO_AUTH"], indirect=True) | ||
def test_get_rules_api(client, db_session, test_app): | ||
def test_get_rules_api(db_session, client, test_app): | ||
rule = create_rule_db(**TEST_RULE_DATA) | ||
|
||
response = client.get( | ||
|
@@ -50,7 +57,7 @@ def test_get_rules_api(client, db_session, test_app): | |
assert response.status_code == 200 | ||
data = response.json() | ||
assert len(data) == 1 | ||
assert data[0]['id'] == str(rule.id) | ||
assert data[0]["id"] == str(rule.id) | ||
|
||
rule2 = create_rule_db(**TEST_RULE_DATA) | ||
|
||
|
@@ -62,27 +69,26 @@ def test_get_rules_api(client, db_session, test_app): | |
assert response2.status_code == 200 | ||
data = response2.json() | ||
assert len(data) == 2 | ||
assert data[0]['id'] == str(rule.id) | ||
assert data[1]['id'] == str(rule2.id) | ||
assert data[0]["id"] == str(rule.id) | ||
assert data[1]["id"] == str(rule2.id) | ||
|
||
|
||
@pytest.mark.parametrize("test_app", ["NO_AUTH"], indirect=True) | ||
def test_create_rule_api(client, db_session, test_app): | ||
def test_create_rule_api(db_session, client, test_app): | ||
|
||
rule_data = { | ||
"ruleName": "test rule", | ||
"sqlQuery": {"sql": "SELECT * FROM alert where severity = %s", "params": ["critical"]}, | ||
"sqlQuery": { | ||
"sql": "SELECT * FROM alert where severity = %s", | ||
"params": ["critical"], | ||
}, | ||
"celQuery": "severity = 'critical'", | ||
"timeframeInSeconds": 300, | ||
"timeUnit": "seconds", | ||
"requireApprove": False, | ||
} | ||
|
||
response = client.post( | ||
"/rules", | ||
headers={"x-api-key": "some-key"}, | ||
json=rule_data | ||
) | ||
response = client.post("/rules", headers={"x-api-key": "some-key"}, json=rule_data) | ||
|
||
assert response.status_code == 200 | ||
data = response.json() | ||
|
@@ -92,9 +98,7 @@ def test_create_rule_api(client, db_session, test_app): | |
invalid_rule_data = {k: v for k, v in rule_data.items() if k != "ruleName"} | ||
|
||
invalid_data_response = client.post( | ||
"/rules", | ||
headers={"x-api-key": "some-key"}, | ||
json=invalid_rule_data | ||
"/rules", headers={"x-api-key": "some-key"}, json=invalid_rule_data | ||
) | ||
|
||
assert invalid_data_response.status_code == 422 | ||
|
@@ -109,7 +113,7 @@ def test_create_rule_api(client, db_session, test_app): | |
invalid_data_response_2 = client.post( | ||
"/rules", | ||
headers={"x-api-key": "some-key"}, | ||
json=dict(rule_data, **invalid_data_step["update"]) | ||
json=dict(rule_data, **invalid_data_step["update"]), | ||
) | ||
|
||
assert invalid_data_response_2.status_code == 400, current_step | ||
|
@@ -119,7 +123,7 @@ def test_create_rule_api(client, db_session, test_app): | |
|
||
|
||
@pytest.mark.parametrize("test_app", ["NO_AUTH"], indirect=True) | ||
def test_delete_rule_api(client, db_session, test_app): | ||
def test_delete_rule_api(db_session, client, test_app): | ||
rule = create_rule_db(**TEST_RULE_DATA) | ||
|
||
response = client.delete( | ||
|
@@ -143,25 +147,25 @@ def test_delete_rule_api(client, db_session, test_app): | |
assert data["detail"] == "Rule not found" | ||
|
||
|
||
|
||
@pytest.mark.parametrize("test_app", ["NO_AUTH"], indirect=True) | ||
def test_update_rule_api(client, db_session, test_app): | ||
def test_update_rule_api(db_session, client, test_app): | ||
|
||
rule = create_rule_db(**TEST_RULE_DATA) | ||
|
||
rule_data = { | ||
"ruleName": "test rule", | ||
"sqlQuery": {"sql": "SELECT * FROM alert where severity = %s", "params": ["critical"]}, | ||
"sqlQuery": { | ||
"sql": "SELECT * FROM alert where severity = %s", | ||
"params": ["critical"], | ||
}, | ||
"celQuery": "severity = 'critical'", | ||
"timeframeInSeconds": 300, | ||
"timeUnit": "seconds", | ||
"requireApprove": False, | ||
} | ||
|
||
response = client.put( | ||
"/rules/{}".format(rule.id), | ||
headers={"x-api-key": "some-key"}, | ||
json=rule_data | ||
"/rules/{}".format(rule.id), headers={"x-api-key": "some-key"}, json=rule_data | ||
) | ||
|
||
assert response.status_code == 200 | ||
|
@@ -174,7 +178,7 @@ def test_update_rule_api(client, db_session, test_app): | |
invalid_data_response_2 = client.put( | ||
"/rules/{}".format(rule.id), | ||
headers={"x-api-key": "some-key"}, | ||
json=dict(rule_data, **invalid_data_step["update"]) | ||
json=dict(rule_data, **invalid_data_step["update"]), | ||
) | ||
|
||
assert invalid_data_response_2.status_code == 400, current_step | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.