Skip to content

Commit

Permalink
[client] Enhance collectors management for security platforms (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelHassine committed Jun 27, 2024
1 parent 2af95a7 commit 8ca89ed
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 9 deletions.
1 change: 1 addition & 0 deletions pyobas/apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .me import * # noqa: F401,F403
from .organization import * # noqa: F401,F403
from .payload import * # noqa: F401,F403
from .security_platform import * # noqa: F401,F403
from .team import * # noqa: F401,F403
from .user import * # noqa: F401,F403

Expand Down
10 changes: 10 additions & 0 deletions pyobas/apis/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,13 @@ def download(self, document_id: str, **kwargs: Any) -> Dict[str, Any]:
path = f"{self.path}/" + document_id + "/file"
result = self.openbas.http_get(path, **kwargs)
return result

@exc.on_http_error(exc.OpenBASUpdateError)
def upsert(
self, document: Dict[str, Any], file: tuple, **kwargs: Any
) -> Dict[str, Any]:
path = f"{self.path}/upsert"
result = self.openbas.http_post(
path, post_data=document, files={"file": file}, **kwargs
)
return result
33 changes: 33 additions & 0 deletions pyobas/apis/security_platform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from typing import Any, Dict

from pyobas import exceptions as exc
from pyobas.base import RESTManager, RESTObject
from pyobas.mixins import CreateMixin, GetMixin, ListMixin, UpdateMixin
from pyobas.utils import RequiredOptional


class SecurityPlatform(RESTObject):
pass


class SecurityPlatformManager(
GetMixin, ListMixin, CreateMixin, UpdateMixin, RESTManager
):
_path = "/security_platforms"
_obj_cls = SecurityPlatform
_create_attrs = RequiredOptional(
required=("asset_name", "security_platform_type"),
optional=(
"asset_description",
"security_platform_logo_light",
"security_platform_logo_dark",
),
)

@exc.on_http_error(exc.OpenBASUpdateError)
def upsert(
self, security_platform: Dict[str, Any], **kwargs: Any
) -> Dict[str, Any]:
path = f"{self.path}/upsert"
result = self.openbas.http_post(path, post_data=security_platform, **kwargs)
return result
1 change: 1 addition & 0 deletions pyobas/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __init__(
self.user = apis.UserManager(self)
self.inject_expectation = apis.InjectExpectationManager(self)
self.payload = apis.PayloadManager(self)
self.security_platform = apis.SecurityPlatformManager(self)

@staticmethod
def _check_redirects(result: requests.Response) -> None:
Expand Down
37 changes: 28 additions & 9 deletions pyobas/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,28 +307,47 @@ def get_conf(self, variable, is_number=None, default=None, required=None):


class OpenBASCollectorHelper:
def __init__(self, config: OpenBASConfigHelper, icon) -> None:
def __init__(
self, config: OpenBASConfigHelper, icon, security_platform_type=None
) -> None:
self.config_helper = config
self.api = OpenBAS(
url=config.get_conf("openbas_url"),
token=config.get_conf("openbas_token"),
)

self.config = {
"collector_id": config.get_conf("collector_id"),
"collector_name": config.get_conf("collector_name"),
"collector_type": config.get_conf("collector_type"),
"collector_period": config.get_conf("collector_period"),
}

self.logger_class = utils.logger(
config.get_conf("collector_log_level", default="info").upper(),
config.get_conf("collector_json_logging", default=True),
)
self.collector_logger = self.logger_class(config.get_conf("collector_name"))

icon_name = config.get_conf("collector_id") + ".png"
collector_icon = (icon_name, icon, "image/png")

security_platform_id = None
if security_platform_type is not None:
collector_icon = (icon_name, open(icon, "rb"), "image/png")
document = self.api.document.upsert(document={}, file=collector_icon)
security_platform = self.api.security_platform.upsert(
{
"asset_name": config.get_conf("collector_name"),
"asset_external_reference": config.get_conf("collector_id"),
"security_platform_type": security_platform_type,
"security_platform_logo_light": document.get("document_id"),
"security_platform_logo_dark": document.get("document_id"),
}
)
security_platform_id = security_platform.get("asset_id")

self.config = {
"collector_id": config.get_conf("collector_id"),
"collector_name": config.get_conf("collector_name"),
"collector_type": config.get_conf("collector_type"),
"collector_period": config.get_conf("collector_period"),
"collector_security_platform": security_platform_id,
}

collector_icon = (icon_name, open(icon, "rb"), "image/png")
self.api.collector.create(self.config, collector_icon)
self.connect_run_and_terminate = False
# self.api.injector.create(self.config)
Expand Down
1 change: 1 addition & 0 deletions pyobas/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def create(
self._create_attrs.validate_attrs(data=data)
# Handle specific URL for creation
path = kwargs.pop("path", self.path)

if icon:
files = {"icon": icon}
elif icon is False:
Expand Down

0 comments on commit 8ca89ed

Please sign in to comment.