Skip to content

Commit

Permalink
Update api_examples.py
Browse files Browse the repository at this point in the history
  • Loading branch information
phutelmyer committed Feb 21, 2024
1 parent d99d2d0 commit 5807586
Showing 1 changed file with 37 additions and 66 deletions.
103 changes: 37 additions & 66 deletions misc/examples/api_examples.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from typing import Optional

import json
import requests
from typing import Optional
from os import getenv

# Set API key and base URL
api_key = "<API_KEY>"
url_base = "http://0.0.0.0:8080/api/strelka"
# Environment variables for configuration
api_base = getenv("STRELKA_API_BASE", "http://localhost:8081/api/strelka")
api_key = getenv("STRELKA_API_KEY", "")

# Define headers
headers = {
Expand All @@ -13,83 +14,53 @@
"Accept": "application/json",
}


def get_scans_list(page: int = 1, per_page: int = 10) -> Optional[str]:
"""
Get list of scans from the Strelka UI API.
Args:
page (int): The page number to retrieve. Defaults to 1.
per_page (int): The number of items per page. Defaults to 10.
Returns:
Optional[str]: The list of scans as a JSON string if the request is successful, None otherwise.
"""
url_route = f"/scans?page={page}&per_page={per_page}"
# Helper function to handle requests
def make_request(method: str, url: str, **kwargs) -> Optional[str]:
try:
response = requests.get(url_base + url_route, headers=headers)
response = requests.request(method, url, headers=headers, **kwargs)
response.raise_for_status()
return response.text
except requests.exceptions.HTTPError as e:
print(f"HTTP Error: {e.response.status_code} for {url}: {e.response.text}")
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
return None
print(f"Request Error: {e}")
return None


def get_scan_by_id(scan_id: str) -> Optional[str]:
"""
Get scan details by ID from the Strelka UI API.
# Endpoints
def get_scans_list(page: int = 1, per_page: int = 10) -> Optional[str]:
return make_request("GET", f"{api_base}/scans?page={page}&per_page={per_page}")

Args:
scan_id (str): The ID of the scan to retrieve.

Returns:
Optional[str]: The scan details as a JSON string if the request is successful, None otherwise.
"""
url_route = f"/scans/{scan_id}"
try:
response = requests.get(url_base + url_route, headers=headers)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
return None
def get_scan_by_id(scan_id: str) -> Optional[str]:
return make_request("GET", f"{api_base}/scans/{scan_id}")


def upload_file(filename: str, description: str) -> Optional[str]:
"""
Upload a file to the Strelka UI API.
with open(filename, "rb") as f:
files = {"file": (filename, f)}
data = {"description": description}
return make_request("POST", f"{api_base}/upload", files=files, data=data)

Args:
filename (str): The file path of the file to upload.
description (str): The description of the file.

Returns:
Optional[str]: The response as a JSON string if the upload is successful, None otherwise.
def upload_file_to_vt(file_hash: str, description: str) -> Optional[str]:
"""
url_route = "/upload"
upload_headers = {
"X-API-KEY": api_key,
Sample Response:
{"file_id":"07538964-b156-4ed3-93b4-85bcd07b5fbc",
"results":[{"enrichment":{"virustotal":-1},"file":{"depth":0,"flavors":{"mime":["application/zip"],"yara":["encrypted_zip","zip_file"]}...
}

try:
with open(filename, "rb") as f:
file_data = f.read()

files = [("file", (filename, file_data))]
data = {"description": description}

response = requests.post(
url_base + url_route, files=files, data=data, headers=upload_headers
)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
return None
"""
payload = json.dumps({"description": description, "hash": file_hash})
return make_request("POST", f"{api_base}/upload", data=payload)


if __name__ == "__main__":
# Example usage
print(get_scans_list())
print(get_scan_by_id("<SCAN_ID>"))
print(upload_file("<FILEPATH_TO_FILE_TO_UPLOAD>", "This is a test file"))
# print(get_scans_list())
# print(get_scan_by_id("<SCAN_ID>"))
# print(upload_file("<FILEPATH_TO_FILE_TO_UPLOAD>", "This is a test file"))
print(
upload_file_to_vt(
"5da8c98136d98dfec4716edd79c7145f", "VirusTotal Upload - Calc.exe"
)
)

0 comments on commit 5807586

Please sign in to comment.