Skip to content

Commit

Permalink
Fixing Soda Cloud agent flow after real dev testing
Browse files Browse the repository at this point in the history
  • Loading branch information
tombaeyens committed Feb 22, 2025
1 parent 0105c27 commit d864199
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 50 deletions.
85 changes: 56 additions & 29 deletions soda-core/src/soda_core/common/soda_cloud.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from __future__ import annotations

import base64
import json
import logging
import os
import re
from abc import ABC
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone, time
from decimal import Decimal
from enum import Enum
from operator import index
from tempfile import TemporaryFile
from time import sleep
from typing import Optional
Expand All @@ -20,7 +19,7 @@
from soda_core.common.version import SODA_CORE_VERSION
from soda_core.common.yaml import YamlFileContent, YamlObject
from soda_core.contracts.contract_verification import ContractResult, \
CheckResult, CheckOutcome, Threshold, Contract, DataSourceInfo
CheckResult, CheckOutcome, Threshold, Contract
from soda_core.contracts.impl.contract_yaml import ContractYaml


Expand Down Expand Up @@ -72,8 +71,7 @@ def from_file(cls, soda_cloud_file_content: YamlFileContent) -> SodaCloud | None

soda_cloud_yaml_object: YamlObject | None = soda_cloud_yaml_root_object.read_object_opt("soda_cloud")
if not soda_cloud_yaml_object:
logs.error(f"key 'soda_cloud' is required in a Soda Cloud configuration file")
return None
logs.debug(f"key 'soda_cloud' is required in a Soda Cloud configuration file.")

return SodaCloud(
host=soda_cloud_yaml_object.read_string_opt(
Expand Down Expand Up @@ -173,7 +171,7 @@ def _build_contract_result_json(self, contract_result: ContractResult, skip_publ
# for query in contract_result._queries:
# query_list += query.get_cloud_dicts()

return self.to_jsonnable( # type: ignore
contract_result_json: dict = self.to_jsonnable( # type: ignore
{
"definitionName": contract_result.contract.soda_qualified_dataset_name,
"defaultDataSource": contract_result.data_source_info.name,
Expand Down Expand Up @@ -219,6 +217,10 @@ def _build_contract_result_json(self, contract_result: ContractResult, skip_publ
"skipPublish": skip_publish
}
)
scan_id: Optional[str] = os.environ.get("SCAN_ID")
if scan_id:
contract_result_json["scanId"] = scan_id
return contract_result_json

def _translate_check_outcome_for_soda_cloud(self, outcome: CheckOutcome) -> str:
if outcome == CheckOutcome.PASSED:
Expand Down Expand Up @@ -376,20 +378,32 @@ def execute_contracts_on_agent(self, contract_yamls: list[ContractYaml]) -> list

scan_is_finished: bool = self._poll_remote_scan_finished(scan_id=scan_id)

self.logs.debug(f"Asking Soda Cloud the logs of scan {scan_id}")
logs_response: Response = self._get_scan_logs(scan_id=scan_id)
logs: list[dict] = logs_response.json()
for log in logs:
json_level_str: str = log.get("level")
logging_level: int = logging.getLevelName(json_level_str.upper())
timestamp_str: str = log.get("timestamp")
timestamp: datetime = self.convert_str_to_datetime(timestamp_str)
self.logs.log(
Log(level=logging_level,
message=log.get("message"),
timestamp=timestamp,
index=log.get("index")
)
)
self.logs.debug(f"Soda Cloud responded with {json.dumps(dict(logs_response.headers))}\n{logs_response.text}")

response_json: dict = logs_response.json()
logs: list[dict] = response_json.get("logs")
if isinstance(logs, list):
for log in logs:
if isinstance(log, dict):
json_level_str: str = log.get("level")
logging_level: int = logging.getLevelName(json_level_str.upper())
timestamp_str: str = log.get("timestamp")
timestamp: datetime = self.convert_str_to_datetime(timestamp_str)
self.logs.log(
Log(level=logging_level,
message=log.get("message"),
timestamp=timestamp,
index=log.get("index")
)
)
else:
self.logs.debug(f"Expected dict for logs list element, but was {type(log).__name__}")
elif logs is None:
self.logs.debug(f"No logs in Soda Cloud response")
else:
self.logs.debug(f"Expected dict for logs, but was {type(logs).__name__}")

if not scan_is_finished:
self.logs.error("Max retries exceeded. Contract verification did not finish yet.")
Expand Down Expand Up @@ -420,14 +434,14 @@ def _poll_remote_scan_finished(self, scan_id: str, max_retry: int = 5) -> bool:

self.logs.debug(f"Asking Soda Cloud if scan {scan_id} is already completed. Attempt {attempt}/{max_retry}.")
response = self._get_scan_status(scan_id)
self.logs.debug(f"Soda Cloud responded with {json.dumps(dict(response.headers))}\n{response.text}")
if response:
json: Optional[dict] = response.json() if response else None
scan_status: Optional[dict] = json.get("scanStatus") if json else None
scan_status_value: Optional[str] = scan_status.get("value") if scan_status else None
response_body_dict: Optional[dict] = response.json() if response else None
scan_status: str = response_body_dict.get("state") if response_body_dict else None

self.logs.debug(f"Soda Cloud responded scan {scan_id} status is '{scan_status_value}'")
self.logs.debug(f"Scan {scan_id} status is '{scan_status}'")

if scan_status_value in REMOTE_SCAN_FINAL_STATES:
if scan_status in REMOTE_SCAN_FINAL_STATES:
return True

if attempt < max_retry:
Expand Down Expand Up @@ -455,13 +469,13 @@ def _poll_remote_scan_finished(self, scan_id: str, max_retry: int = 5) -> bool:

def _get_scan_status(self, scan_id: str) -> Response:
return self._execute_rest_get(
relative_url_path=f"v1/scans/{scan_id}",
relative_url_path=f"scans/{scan_id}",
request_log_name="get_scan_status",
)

def _get_scan_logs(self, scan_id: str) -> Response:
return self._execute_rest_get(
relative_url_path=f"v1/scans/{scan_id}/logs",
relative_url_path=f"scans/{scan_id}/logs",
request_log_name="get_scan_logs",
)

Expand Down Expand Up @@ -540,10 +554,21 @@ def _execute_rest_get(
is_retry: bool = True
) -> Response:

credentials_plain = f"{self.api_key_id}:{self.api_key_secret}"
credentials_encoded = base64.b64encode(credentials_plain.encode()).decode()

headers = {
"Authorization": f"Basic {credentials_encoded}",
"Content-Type": "application/octet-stream", # Probably not needed
"Is-V3": "true", # Probably not needed
"Accept": "application/json",
}

url: str = f"{self.api_url}/v1/{relative_url_path}"
self.logs.debug(f"Sending GET {url} request to Soda Cloud")
response: Response = self._http_get(
url=url
url=url,
headers=headers
)

trace_id: str = response.headers.get("X-Soda-Trace-Id")
Expand Down Expand Up @@ -649,5 +674,7 @@ def convert_str_to_datetime(cls, date_string: str) -> datetime:
if date_string.endswith("Z"):
# Z means Zulu time, which is UTC
# Converting timezone to format that fromisoformat understands
date_string = f"{date_string[:-1]}+00:00"
datetime_str_without_z: str = date_string[:-1]
datetime_str_without_z_seconds = re.sub(r"\.(\d+)$", "", datetime_str_without_z)
date_string = f"{datetime_str_without_z_seconds}+00:00"
return datetime.fromisoformat(date_string)
2 changes: 1 addition & 1 deletion soda-core/src/soda_core/contracts/contract_verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def with_soda_cloud_yaml_file(self, soda_cloud_yaml_file_path: str) -> ContractV
def with_soda_cloud_yaml_str(self, soda_cloud_yaml_str: str) -> ContractVerificationBuilder:
if isinstance(soda_cloud_yaml_str, str):
if self.soda_cloud_yaml_source is None:
self.logs.debug(f" ...with soda_cloud_yaml_str '{soda_cloud_yaml_str}'")
self.logs.debug(f" ...with soda_cloud_yaml_str [{len(soda_cloud_yaml_str)}]")
else:
self.logs.debug(
f"{Emoticons.POLICE_CAR_LIGHT} ...with soda_cloud_yaml_str '{soda_cloud_yaml_str}'. "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from textwrap import dedent

from dotenv import load_dotenv

from conftest import configure_logging
from soda_core.contracts.contract_verification import ContractVerification, ContractVerificationResult


def main():
print("Verifying contract on agent")
configure_logging()

project_root_dir = __file__[: -len("/soda-core/tests/soda_core/tests/components/manual_test_agent_flow.py")]
load_dotenv(f"{project_root_dir}/.env", override=True)

contract_yaml_str: str = dedent("""
data_source: milan_throwaway
dataset_prefix: [nyc, public]
dataset: bus_breakdown_and_delays
columns:
- name: reason
valid_values: [ 'Heavy Traffic', 'Other', 'Mechanical Problem', 'Won`t Start', 'Problem Run' ]
checks:
- type: invalid_count
checks:
- type: schema
""").strip()

soda_cloud_yaml_str = dedent("""
soda_cloud:
bla: bla
""").strip()

contract_verification_result: ContractVerificationResult = (
ContractVerification.builder()
.with_soda_cloud_yaml_str(soda_cloud_yaml_str)
.with_contract_yaml_str(contract_yaml_str)
.with_execution_on_soda_agent()
.execute()
)


if __name__ == "__main__":
main()
40 changes: 20 additions & 20 deletions soda-core/tests/soda_core/tests/components/test_soda_cloud.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from unittest import skip

from soda_core.common.soda_cloud import SodaCloud
from soda_core.tests.helpers.data_source_test_helper import DataSourceTestHelper
Expand Down Expand Up @@ -67,38 +68,37 @@ def test_execute_over_agent(data_source_test_helper: DataSourceTestHelper):
},
json_object={
"scanId": "ssscanid",
"scanStatus": {
"value": "running"
}
"state": "running"
}
),
MockResponse(
method=MockHttpMethod.GET,
status_code=200,
json_object={
"scanId": "ssscanid",
"scanStatus": {
"value": "completed"
}
"state": "completed"
}
),
MockResponse(
method=MockHttpMethod.GET,
status_code=200,
json_object=[
{
"level": "debug",
"message": "m1",
"timestamp": "2025-02-21T06:16:58+00:00",
"index": 0,
},
{
"level": "info",
"message": "m2",
"timestamp": "2025-02-21T06:16:59+00:00",
"index": 1,
}
]
json_object={
"scanId": "ssscanid",
"logs": [
{
"level": "debug",
"message": "m1",
"timestamp": "2025-02-21T06:16:58+00:00",
"index": 0,
},
{
"level": "info",
"message": "m2",
"timestamp": "2025-02-21T06:16:59+00:00",
"index": 1,
}
]
}
)
])

Expand Down

0 comments on commit d864199

Please sign in to comment.