diff --git a/external-import/socradar/README.md b/external-import/socradar/README.md index 47daf97c5c..9fd14f8ba3 100644 --- a/external-import/socradar/README.md +++ b/external-import/socradar/README.md @@ -1,41 +1,72 @@ # OpenCTI SOCRadar Connector -This connector imports threat intelligence feeds from SOCRadar into OpenCTI. It supports various types of indicators including IP addresses, domains, URLs, and file hashes. +OpenCTI connector for importing threat intelligence feeds from SOCRadar platform. ## Description -The SOCRadar connector fetches data from SOCRadar's threat intelligence feeds and imports them into OpenCTI. It supports: -- IP addresses (IPv4 and IPv6) -- Domains -- URLs -- File hashes (MD5, SHA1, SHA256) - -Each indicator is created with proper STIX2 formatting and includes: -- Source attribution -- First/last seen dates -- Confidence levels -- TLP marking -- Kill chain phase information - -## Requirements - -- OpenCTI Platform >= 6.4.2 -- SOCRadar API key -- Python 3.11+ +This connector imports threat intelligence data from SOCRadar into OpenCTI. 
It processes various types of indicators including: +* IP addresses (IPv4 and IPv6) +* Domain names +* URLs +* File hashes (MD5, SHA1, SHA256) ## Configuration | Parameter | Docker envvar | Mandatory | Description | | --- | --- | --- | --- | -| `opencti_url` | `OPENCTI_URL` | Yes | The URL of the OpenCTI platform | -| `opencti_token` | `OPENCTI_TOKEN` | Yes | The default admin token configured in the OpenCTI platform | -| `connector_id` | `CONNECTOR_ID` | Yes | A valid arbitrary UUIDv4 for this connector | -| `connector_type` | `CONNECTOR_TYPE` | Yes | Must be 'EXTERNAL_IMPORT' | -| `connector_name` | `CONNECTOR_NAME` | Yes | Name of the connector | -| `connector_scope` | `CONNECTOR_SCOPE` | Yes | Scope of the connector (socradar) | -| `connector_confidence_level` | `CONNECTOR_CONFIDENCE_LEVEL` | Yes | Default confidence level for created data | -| `connector_log_level` | `CONNECTOR_LOG_LEVEL` | Yes | Logging level (debug, info, warn, error) | -| `radar_base_feed_url` | `RADAR_BASE_FEED_URL` | Yes | SocRadar API base URL | -| `radar_format_type` | `RADAR_FORMAT_TYPE` | Yes | Response format (.json) | -| `radar_socradar_key` | `RADAR_SOCRADAR_KEY` | Yes | Your SocRadar API key | -| `radar_interval` | `RADAR_INTERVAL` | Yes | Interval between runs in seconds | +| `opencti.url` | `OPENCTI_URL` | Yes | The URL of your OpenCTI platform | +| `opencti.token` | `OPENCTI_TOKEN` | Yes | Your OpenCTI admin token | +| `radar.radar_base_feed_url` | `RADAR_BASE_FEED_URL` | Yes | SOCRadar API base URL | +| `radar.radar_socradar_key` | `RADAR_SOCRADAR_KEY` | Yes | Your SOCRadar API key | +| `radar.radar_run_interval` | `RADAR_RUN_INTERVAL` | No | Time between runs (in seconds, default: 600) | +| `radar.radar_collections_uuid` | `RADAR_COLLECTIONS_UUID` | Yes | Collection IDs to fetch | + +The `radar_collections_uuid` parameter should contain the collection IDs you want to fetch from SOCRadar. When it is supplied through the `RADAR_COLLECTIONS_UUID` environment variable, the value must be a JSON-encoded object with the same structure; a value that is not valid JSON is treated as an empty collection set. 
Example configuration: + +```yaml +radar_collections_uuid: + collection_1: + id: ["YOUR_COLLECTION_ID"] + name: ["YOUR_COLLECTION_NAME"] + collection_2: + id: ["YOUR_COLLECTION_ID"] + name: ["YOUR_COLLECTION_NAME"] +``` + +## Installation + +1. Clone the repository: +```bash +git clone https://github.com/OpenCTI-Platform/connectors +cd connectors/external-import/socradar +``` + +2. Configure the connector: +```bash +cp src/config.yml.sample src/config.yml +``` +Edit `src/config.yml` with your OpenCTI and SOCRadar configurations. + +3. Add your connector to the `docker-compose.yml`: +```yaml + connector-socradar: + build: ./external-import/socradar + container_name: docker-connector-socradar + environment: + - OPENCTI_URL=http://opencti:8080 + - OPENCTI_TOKEN=${OPENCTI_ADMIN_TOKEN} + restart: always + depends_on: + opencti: + condition: service_healthy +``` + +4. Start with Docker: +```bash +docker-compose up -d connector-socradar +``` + +You can check the connector status and logs in the OpenCTI platform UI or using: +```bash +docker-compose logs -f connector-socradar +``` diff --git a/external-import/socradar/entrypoint.sh b/external-import/socradar/entrypoint.sh index 8f221ba603..0699cab37a 100644 --- a/external-import/socradar/entrypoint.sh +++ b/external-import/socradar/entrypoint.sh @@ -8,4 +8,4 @@ ls -la /opt/opencti-connector-socradar/src # Directly execute python script cd /opt/opencti-connector-socradar -python3 -v src/main.py +python3 src/main.py diff --git a/external-import/socradar/src/config.yml.sample b/external-import/socradar/src/config.yml.sample index c41c2c2cf2..1f6b78de4a 100644 --- a/external-import/socradar/src/config.yml.sample +++ b/external-import/socradar/src/config.yml.sample @@ -12,15 +12,13 @@ connector: update_existing_data: true radar: - base_feed_url: "https://platform.socradar.com/api/threat/intelligence/feed_list/" - format_type: ".json?key=" - socradar_key: "SOCRADAR_KEY" - run_interval: 600 - collections_uuid: + 
radar_base_feed_url: "https://platform.socradar.com/api/threat/intelligence/feed_list/" + radar_socradar_key: "SOCRADAR_KEY" + radar_run_interval: 600 + radar_collections_uuid: collection_1: id: ["COLLECTION_UUID"] name: ["COLLECTION_NAME"] - default_marking: 'TLP:WHITE' - create_observables: true - create_indicators: true - + collection_2: + id: ["COLLECTION_UUID"] + name: ["COLLECTION_NAME"] diff --git a/external-import/socradar/src/lib/radar.py b/external-import/socradar/src/lib/radar.py index 9bb5d9208c..d5f612a78b 100644 --- a/external-import/socradar/src/lib/radar.py +++ b/external-import/socradar/src/lib/radar.py @@ -1,58 +1,99 @@ -# Standard library imports +# =============================================================================== +# Imports: System and Third-Party Libraries +# =============================================================================== +import json import os import re -import time -import uuid +import sys from datetime import datetime, timedelta +from typing import Dict -# Third-party imports import requests import yaml -from pycti import OpenCTIConnectorHelper, get_config_variable -from stix2 import ( - TLP_WHITE, - URL, - Bundle, - DomainName, - File, - Identity, - Indicator, - IPv4Address, - IPv6Address, - KillChainPhase, - Relationship, -) - +# =============================================================================== +# Imports: OpenCTI Libraries +# =============================================================================== +# PyCTI +from pycti import Identity as PyctiIdentity +from pycti import Indicator as PyctiIndicator +from pycti import OpenCTIConnectorHelper, StixCoreRelationship, get_config_variable + +# STIX2 +from stix2 import TLP_WHITE, Bundle +from stix2 import Identity as Stix2Identity +from stix2 import Indicator as Stix2Indicator + +# =============================================================================== +# Constants +# 
=============================================================================== +BATCH_SIZE = 1000 +DEFAULT_INTERVAL = 600 +DEFAULT_CONFIDENCE = 75 +TLP_MARKING = TLP_WHITE.id + + +# =============================================================================== +# Main Operator: RadarConnector +# =============================================================================== class RadarConnector: - - def __init__(self): - # Step 1.0: Initialize connector from config - config_path = os.path.dirname(os.path.abspath(__file__)) + "/../config.yml" - config = yaml.load(open(config_path), Loader=yaml.SafeLoader) + """ + OpenCTI connector for SOCRadar threat intelligence feeds. + Processes indicators in batches and creates STIX2 objects. + """ + + def __init__(self) -> None: + """Initialize RadarConnector with configuration and helpers""" + # Step 1.0: Set up configuration paths + base_dir = os.path.dirname(os.path.abspath(__file__)) + config_path = os.path.join(base_dir, "..", "config.yml") + + # Step 1.1: Load configuration file + if os.path.isfile(config_path): + with open(config_path, "r") as f: + config = yaml.safe_load(f) + else: + config = {} + + # Step 2.0: Initialize OpenCTI helper self.helper = OpenCTIConnectorHelper(config) - # Step 1.1: Get radar-specific configurations + # Step 3.0: Configure feed parameters + # Step 3.1: Set base URL and API key self.base_url = get_config_variable( - "RADAR_BASE_FEED_URL", ["radar", "base_feed_url"], config - ) - self.format_type = get_config_variable( - "RADAR_FORMAT_TYPE", ["radar", "format_type"], config + "RADAR_BASE_FEED_URL", ["radar", "radar_base_feed_url"], config ) self.socradar_key = get_config_variable( - "RADAR_SOCRADAR_KEY", ["radar", "socradar_key"], config - ) - self.collections = get_config_variable( - "RADAR_COLLECTIONS_UUID", ["radar", "collections_uuid"], config + "RADAR_SOCRADAR_KEY", ["radar", "radar_socradar_key"], config ) + + # Step 3.2: Set run interval self.interval = get_config_variable( - 
"RADAR_INTERVAL", ["radar", "run_interval"], config, True + "RADAR_RUN_INTERVAL", + ["radar", "radar_run_interval"], + config, + default=DEFAULT_INTERVAL, ) + if isinstance(self.interval, str): + self.interval = int(self.interval) - # Initialize empty identity mapping - self.identity_mapping = {} + # Step 3.3: Configure collections + raw_collections = get_config_variable( + "RADAR_COLLECTIONS_UUID", ["radar", "radar_collections_uuid"], config + ) + if isinstance(raw_collections, str): + try: + self.collections = json.loads(raw_collections) + except Exception: + self.collections = {} + else: + self.collections = raw_collections or {} - # Step 1.2: Initialize regex patterns for value classification + # Step 3.4: Set format type for API requests + self.format_type = ".json?key=" + + # Step 4.0: Initialize caches and patterns + self.identity_cache: Dict[str, Stix2Identity] = {} self.regex_patterns = { "md5": r"^[a-fA-F\d]{32}$", "sha1": r"^[a-fA-F\d]{40}$", @@ -63,292 +104,285 @@ def __init__(self): "url": r"^(https?|ftp):\/\/[^\s/$.?#].[^\s]*$", } - def _get_indicator_type(self, feed_type): - """Map feed types to STIX indicator types""" - type_mapping = { - "url": ["url-watchlist"], - "domain": ["domain-watchlist"], - "ip": ["ip-watchlist"], - "hash": ["file-hash-watchlist"], + # =============================================================================== + # Utility Methods + # =============================================================================== + def _matches_pattern(self, value: str, pattern_name: str) -> bool: + """Match value against regex pattern""" + return bool(re.match(self.regex_patterns[pattern_name], value)) + + def _validate_dates(self, first_seen: str, last_seen: str): + """Validate and convert date strings to datetime objects""" + # Step 1.0: Set datetime format + dt_format = "%Y-%m-%d %H:%M:%S" + + # Step 1.1: Convert strings to datetime objects + valid_from = datetime.strptime(first_seen, dt_format) + valid_until = 
datetime.strptime(last_seen, dt_format) + + # Step 1.2: Ensure valid time range + if valid_until <= valid_from: + valid_until = valid_from + timedelta(hours=1) + + return valid_from, valid_until + + def _create_stix_pattern(self, value: str, feed_type: str) -> str: + """ + Build a STIX pattern from feed_type or fallback detection + (handles ip, domain, url, hash, etc.) + """ + # If feed_type is "ip", check IPv4 or IPv6 + if feed_type == "ip": + if self._matches_pattern(value, "ipv4"): + return f"[ipv4-addr:value = '{value}']" + elif self._matches_pattern(value, "ipv6"): + return f"[ipv6-addr:value = '{value}']" + + known_patterns = { + "url": lambda v: f"[url:value = '{v}']", + "domain": lambda v: f"[domain-name:value = '{v}']", + "ipv4": lambda v: f"[ipv4-addr:value = '{v}']", + "ipv6": lambda v: f"[ipv6-addr:value = '{v}']", + "md5": lambda v: f"[file:hashes.'MD5' = '{v}']", + "sha1": lambda v: f"[file:hashes.'SHA-1' = '{v}']", + "sha256": lambda v: f"[file:hashes.'SHA-256' = '{v}']", } - return type_mapping.get(feed_type, ["malicious-activity"]) - def _get_or_create_identity(self, maintainer_name): - """Get existing identity or create new one for maintainer""" - try: - if maintainer_name in self.identity_mapping: - return self.identity_mapping[maintainer_name] + if feed_type in known_patterns: + return known_patterns[feed_type](value) - current_time = datetime.utcnow() + # Fallback detection + for ptype, regex in self.regex_patterns.items(): + if re.match(regex, value): + # e.g. ptype=md5 => "[file:hashes.'MD5' = '...']" + if ptype in known_patterns: + return known_patterns[ptype](value) + + # Otherwise, custom + return f"[x-custom:value = '{value}']" - # Create new identity for maintainer - identity = Identity( - id=f"identity--{str(uuid.uuid4())}", + def _get_or_create_identity(self, maintainer_name: str): + """ + Use pycti.Identity.generate_id(...) 
for stable dedup + Return a stix2.Identity w/ that ID + """ + if maintainer_name in self.identity_cache: + return self.identity_cache[maintainer_name] + + try: + identity_id = PyctiIdentity.generate_id( name=maintainer_name, identity_class="organization", - description=f"Feed Provider: {maintainer_name}", - sectors=["technology"], - created=current_time, - modified=current_time, ) - - # Store in mapping for reuse - self.identity_mapping[maintainer_name] = identity - self.helper.log_info( - f"Created new identity for maintainer: {maintainer_name}" + now = datetime.utcnow() + identity = Stix2Identity( + id=identity_id, + name=maintainer_name, + identity_class="organization", + description=f"Feed Provider: {maintainer_name}", + created=now, + modified=now, ) + self.identity_cache[maintainer_name] = identity return identity - except Exception as e: self.helper.log_error( - f"Error creating identity for {maintainer_name}: {str(e)}" + f"Error creating Identity for {maintainer_name}: {str(e)}" ) return None - def _process_feed_item(self, item, work_id): - try: - value = item["feed"] - feed_type = item.get("feed_type", "").lower() - maintainer = item.get("maintainer_name", "Unknown") - first_seen = datetime.strptime(item["first_seen_date"], "%Y-%m-%d %H:%M:%S") - last_seen = datetime.strptime(item["latest_seen_date"], "%Y-%m-%d %H:%M:%S") - - if last_seen <= first_seen: - last_seen = first_seen + timedelta(days=365) - - # Get or create identity for this maintainer - maintainer_identity = self._get_or_create_identity(maintainer) - if not maintainer_identity: - self.helper.log_error( - f"Could not create identity for maintainer: {maintainer}" - ) - return - - pattern = self._create_stix_pattern(value, feed_type) - if not pattern: - self.helper.log_error( - f"Could not create pattern for: {value} ({feed_type})" - ) - return - - # Create kill chain phase - kill_chain_phase = KillChainPhase( - kill_chain_name="lockheed-martin-cyber-kill-chain", - phase_name="reconnaissance", - 
) - - indicator = Indicator( - id=f"indicator--{str(uuid.uuid4())}", - name=f"{feed_type.upper()}: {value}", - description=f"Type: {feed_type}\nValue: {value}\nSource: {maintainer}", - pattern_type="stix", - pattern=pattern, - valid_from=first_seen, - valid_until=last_seen, - labels=[feed_type, "malicious-activity"], - confidence=75, - indicator_types=self._get_indicator_type(feed_type), - kill_chain_phases=[kill_chain_phase], - created=first_seen, - modified=first_seen, - created_by_ref=maintainer_identity.id, # Use maintainer's identity - object_marking_refs=[TLP_WHITE], - ) - - # Create relationship between indicator and maintainer identity - relationship = Relationship( - id=f"relationship--{str(uuid.uuid4())}", - relationship_type="created-by", - source_ref=indicator.id, - target_ref=maintainer_identity.id, - description=f"This indicator was created by {maintainer}", - created=first_seen, - modified=first_seen, - confidence=75, - object_marking_refs=[TLP_WHITE], - ) - - # Create bundle with all objects - bundle = Bundle(objects=[maintainer_identity, indicator, relationship]) - - # Send to OpenCTI - self.helper.send_stix2_bundle(bundle.serialize(), work_id=work_id) - - self.helper.log_info( - f"Created {feed_type} indicator for: {value} from {maintainer}" + ######################################################################## + # Feed Processing + ######################################################################## + + def _process_feed_item(self, item: dict): + """Process single feed item into STIX objects""" + # Step 1.0: Initialize empty list for STIX objects + stix_objects = [] + + # Step 2.0: Extract core fields from feed item + # Step 2.1: Get primary indicator value + value = item.get("feed") + # Step 2.2: Get indicator type (default to IP if not specified) + feed_type = item.get("feed_type", "ip").lower() + # Step 2.3: Get source/maintainer information + maintainer = item.get("maintainer_name", "Unknown") + + # Step 3.0: Extract and validate 
timestamp fields + # Step 3.1: Get first seen date + first_seen_str = item.get("first_seen_date") + # Step 3.2: Get last seen date + last_seen_str = item.get("latest_seen_date") + # Step 3.3: Validate required fields exist + if not (value and first_seen_str and last_seen_str): + self.helper.log_error(f"Item missing fields: {item}") + return stix_objects + + # Step 4.0: Convert and validate dates + valid_from, valid_until = self._validate_dates(first_seen_str, last_seen_str) + + # Step 5.0: Create or get cached identity object + identity_obj = self._get_or_create_identity(maintainer) + if not identity_obj: + return stix_objects + + # Step 6.0: Generate STIX pattern for indicator + pattern = self._create_stix_pattern(value, feed_type) + if not pattern: + self.helper.log_error( + f"Could not determine pattern for: {value} / {feed_type}" ) + return stix_objects - except Exception as e: - self.helper.log_error(f"Error processing item {str(item)}: {str(e)}") - - def _create_observable(self, value, feed_type): - """Create appropriate observable based on value type with proper STIX ID""" + # Step 7.0: Generate stable indicator ID try: - if feed_type == "url" or self._matches_pattern(value, "url"): - return URL( - id=f"url--{str(uuid.uuid4())}", - value=value, - type="url", - defanged=False, - ) - elif feed_type == "domain" or self._matches_pattern(value, "domain"): - return DomainName( - id=f"domain-name--{str(uuid.uuid4())}", - value=value, - type="domain-name", - defanged=False, - ) - elif feed_type == "ip" or self._matches_pattern(value, "ipv4"): - return IPv4Address( - id=f"ipv4-addr--{str(uuid.uuid4())}", - value=value, - type="ipv4-addr", - defanged=False, - ) - elif self._matches_pattern(value, "ipv6"): - return IPv6Address( - id=f"ipv6-addr--{str(uuid.uuid4())}", - value=value, - type="ipv6-addr", - defanged=False, - ) - elif feed_type == "hash": - if self._matches_pattern(value, "md5"): - return File( - id=f"file--{str(uuid.uuid4())}", - type="file", - 
hashes={"MD5": value}, - ) - elif self._matches_pattern(value, "sha1"): - return File( - id=f"file--{str(uuid.uuid4())}", - type="file", - hashes={"SHA-1": value}, - ) - elif self._matches_pattern(value, "sha256"): - return File( - id=f"file--{str(uuid.uuid4())}", - type="file", - hashes={"SHA-256": value}, - ) - return None + indicator_id = PyctiIndicator.generate_id(pattern) except Exception as e: - self.helper.log_error(f"Error creating observable for {value}: {str(e)}") - return None + self.helper.log_error(f"Indicator ID generation error: {str(e)}") + return stix_objects + + # Step 8.0: Create STIX2 Indicator object + indicator = Stix2Indicator( + id=indicator_id, + name=f"{feed_type.upper()}: {value}", + description=f"Source: {maintainer}\nValue: {value}", + pattern=pattern, + pattern_type="stix", + valid_from=valid_from, + valid_until=valid_until, + created_by_ref=identity_obj.id, + object_marking_refs=[TLP_WHITE.id], + labels=["malicious-activity", feed_type], + confidence=75, + created=valid_from, + modified=valid_from, + ) - def _matches_pattern(self, value, pattern_name): - """Step 3.2: Check if value matches a regex pattern""" - import re - - return re.match(self.regex_patterns[pattern_name], value) is not None - - def _create_stix_pattern(self, value, feed_type): - """Create STIX pattern based on feed type""" - # Direct mapping for feed types - if feed_type == "url": - return f"[url:value = '{value}']" - elif feed_type == "domain": - return f"[domain-name:value = '{value}']" - elif feed_type == "ip": # Add handling for IP type - if re.match(self.regex_patterns["ipv4"], value): - return f"[ipv4-addr:value = '{value}']" - elif re.match(self.regex_patterns["ipv6"], value): - return f"[ipv6-addr:value = '{value}']" - elif feed_type == "hash": - if re.match(self.regex_patterns["md5"], value): - return f"[file:hashes.'MD5' = '{value}']" - elif re.match(self.regex_patterns["sha1"], value): - return f"[file:hashes.'SHA-1' = '{value}']" - elif 
re.match(self.regex_patterns["sha256"], value): - return f"[file:hashes.'SHA-256' = '{value}']" - - # Fallback to pattern detection if feed_type doesn't match - for pattern_type, regex in self.regex_patterns.items(): - if re.match(regex, value): - if pattern_type == "url": - return f"[url:value = '{value}']" - elif pattern_type == "domain": - return f"[domain-name:value = '{value}']" - elif pattern_type == "ipv4": - return f"[ipv4-addr:value = '{value}']" - elif pattern_type == "ipv6": - return f"[ipv6-addr:value = '{value}']" - - self.helper.log_error( - f"Could not determine pattern for value: {value} (type: {feed_type})" + # Step 9.0: Create relationship between indicator and identity + # Step 9.1: Generate stable relationship ID + relationship_id = StixCoreRelationship.generate_id( + "created-by", indicator.id, identity_obj.id ) - return None + # Step 9.2: Format timestamps for relationship + created_str = valid_from.strftime("%Y-%m-%dT%H:%M:%SZ") + modified_str = valid_until.strftime("%Y-%m-%dT%H:%M:%SZ") + + # Step 9.3: Build relationship object + relationship_obj = { + "id": relationship_id, + "type": "relationship", + "spec_version": "2.1", + "x_opencti_type": "stix-core-relationship", + "relationship_type": "created-by", + "source_ref": indicator.id, + "target_ref": identity_obj.id, + "created": created_str, + "modified": modified_str, + "confidence": 75, + "object_marking_refs": [TLP_WHITE.id], + } - def run(self): - self.helper.log_info("Starting RadarConnector...") + # Step 10.0: Combine all STIX objects + stix_objects.extend([identity_obj, indicator, relationship_obj]) + # Step 10.1: Log success + self.helper.log_info( + f"Created {feed_type} indicator => {value} from {maintainer}" + ) + # Step 10.2: Return combined objects + return stix_objects - # Force first run by setting last_run to None - self.helper.set_state(None) + def _process_feed(self, work_id: str): + """ + Batched feed ingestion in chunks of 1000 items + """ + 
self.helper.log_info("RadarConnector: Starting feed collection...") - while True: + for collection_name, collection_data in self.collections.items(): try: - timestamp = int(time.time()) - current_state = self.helper.get_state() - last_run = current_state.get("last_run") if current_state else None - - if last_run is None or ((timestamp - last_run) > self.interval): - self.helper.log_info("Starting feed collection...") - - for collection_name, collection_data in self.collections.items(): - try: - collection_id = collection_data["id"][0] - feed_url = f"{self.base_url}{collection_id}{self.format_type}{self.socradar_key}&v=2" - - self.helper.log_info( - f"Fetching data from: {collection_name}" - ) - response = requests.get(feed_url) - response.raise_for_status() - - items = response.json() - total_items = len(items) - self.helper.log_info( - f"Processing {total_items} items from {collection_name}" - ) - - # Process in smaller batches - batch_size = 10 # Reduced from 100 - total_batches = (total_items + batch_size - 1) // batch_size - - for i in range(0, total_items, batch_size): - batch = items[i : i + batch_size] - current_batch = i // batch_size + 1 - - self.helper.log_info( - f"Processing batch {current_batch}/{total_batches}" - ) - - work_id = self.helper.api.work.initiate_work( - self.helper.connect_id, - f"Processing batch {current_batch}/{total_batches} of {collection_name}", - ) - - for item in batch: - self._process_feed_item(item, work_id) - time.sleep(0.5) # Add small delay between items - - self.helper.api.work.to_processed( - work_id, "Batch processing completed" - ) - time.sleep(2) # Increased delay between batches - - except Exception as e: - self.helper.log_error( - f"Error processing collection {collection_name}: {str(e)}" - ) - - self.helper.set_state({"last_run": timestamp}) + # Build feed URL + coll_id = collection_data["id"][0] + feed_url = ( + f"{self.base_url}{coll_id}{self.format_type}{self.socradar_key}&v=2" + ) + + self.helper.log_info( + 
f"Fetching data from {collection_name} => {feed_url}" + ) + resp = requests.get(feed_url, timeout=30) + resp.raise_for_status() + items = resp.json() + self.helper.log_info(f"Got {len(items)} items from {collection_name}") + + stix_batch = [] + total_sent = 0 + + for idx, item in enumerate(items, start=1): + new_objs = self._process_feed_item(item) + if new_objs: + stix_batch.extend(new_objs) + + # If we reached a batch boundary + if idx % BATCH_SIZE == 0: + bundle = Bundle(objects=stix_batch, allow_custom=True) + self.helper.send_stix2_bundle( + bundle.serialize(), work_id=work_id + ) + total_sent += len(stix_batch) + self.helper.log_info( + f"Sent batch of {len(stix_batch)} objects (total: {total_sent})" + ) + stix_batch = [] + + # Final leftover + if stix_batch: + bundle = Bundle(objects=stix_batch, allow_custom=True) + self.helper.send_stix2_bundle(bundle.serialize(), work_id=work_id) + total_sent += len(stix_batch) self.helper.log_info( - f"Feed collection complete. Next run in {self.interval} seconds" + f"Sent final batch of {len(stix_batch)} objects (total: {total_sent})" ) - time.sleep(60) - except Exception as e: - self.helper.log_error(str(e)) - time.sleep(60) + self.helper.log_error(f"Failed to process {collection_name}: {str(e)}") + + ######################################################################## + # Connector Workflow + ######################################################################## + def process_message(self): + """ + Called each run. Create "Work", process feed, finalize. 
+ """ + self.helper.log_info("RadarConnector: process_message started.") + now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + friendly_name = f"SOCRadar Connector run @ {now_str}" + work_id = self.helper.api.work.initiate_work( + self.helper.connect_id, friendly_name + ) + + try: + self._process_feed(work_id) + message = "Radar feed import complete" + self.helper.api.work.to_processed(work_id, message) + self.helper.log_info(message) + except (KeyboardInterrupt, SystemExit): + self.helper.log_info("RadarConnector interrupted. Stopping.") + sys.exit(0) + except Exception as e: + self.helper.log_error(f"process_message error: {str(e)}") + + def run(self): + """ + Run in start-up. + Mainly runs with OpenCTI's schedule_iso. + """ + # Step 1.0: Run immediately on startup + self.helper.log_info("Running initial collection...") + self.process_message() + + # Step 2.0: Schedule recurring runs + duration_period = f"PT{self.interval}S" # e.g., PT600S for 10 minutes + self.helper.log_info(f"Scheduling recurring runs every {self.interval} seconds") + + self.helper.schedule_iso( + message_callback=self.process_message, duration_period=duration_period + ) diff --git a/external-import/socradar/src/main.py b/external-import/socradar/src/main.py index e1960d0a36..1051054e50 100644 --- a/external-import/socradar/src/main.py +++ b/external-import/socradar/src/main.py @@ -1,5 +1,9 @@ from lib.radar import RadarConnector if __name__ == "__main__": - connector = RadarConnector() - connector.run() + try: + connector = RadarConnector() + connector.run() + except Exception as e: + print(f"Error running connector: {str(e)}") + exit(1) diff --git a/socradar/Dockerfile b/socradar/Dockerfile deleted file mode 100644 index 8377304822..0000000000 --- a/socradar/Dockerfile +++ /dev/null @@ -1,23 +0,0 @@ -FROM python:3.11 - -# Install system dependencies -RUN apt-get update && apt-get install -y \ - libmagic1 \ - libmagic-dev \ - file \ - && rm -rf /var/lib/apt/lists/* - -# Copy the 
connector -COPY src /opt/opencti-connector-socradar/src/ -COPY requirements.txt /opt/opencti-connector-socradar/ - -# Set the Python path -ENV PYTHONPATH="/opt/opencti-connector-socradar/src" - -# Install Python packages -RUN pip3 install --no-cache-dir -r /opt/opencti-connector-socradar/requirements.txt - -# Expose and entrypoint -COPY entrypoint.sh / -RUN chmod +x /entrypoint.sh -ENTRYPOINT ["/entrypoint.sh"] diff --git a/socradar/README.md b/socradar/README.md deleted file mode 100644 index 8c8c23554a..0000000000 --- a/socradar/README.md +++ /dev/null @@ -1,40 +0,0 @@ -# OpenCTI SOCRadar Connector - -The connector imports threat intelligence feeds from SOCRadar into OpenCTI. It processes various types of indicators including IP addresses, domains, URLs, and file hashes. - -SOCRadar provides comprehensive threat intelligence feeds that can be used to detect and prevent various types of cyber threats. The connector fetches these feeds and converts them into standardized STIX2 format for use in OpenCTI, enabling organizations to enhance their threat detection and response capabilities. 
- -## Installation - -### Requirements - -- OpenCTI Platform >= 6.4.5 -- SOCRadar API key -- Python 3.11+ - -### Configuration - -| Parameter | Docker envvar | Mandatory | Description | -| --- | --- | --- | --- | -| `opencti.url` | `OPENCTI_URL` | Yes | The URL of your OpenCTI platform | -| `opencti.token` | `OPENCTI_TOKEN` | Yes | Your OpenCTI admin token | -| `radar.radar_base_feed_url` | `RADAR_BASE_FEED_URL` | Yes | SOCRadar API base URL | -| `radar.radar_socradar_key` | `RADAR_SOCRADAR_KEY` | Yes | Your SOCRadar API key | -| `radar.radar_run_interval` | `RADAR_RUN_INTERVAL` | Yes | Time between runs (in seconds, default: 600) | -| `radar.radar_collections_uuid` | `RADAR_COLLECTIONS_UUID` | Yes | Collection IDs to fetch | - -### Debugging - -- Set log level to `debug` for detailed connector operations -- Check API key validity if feed collection fails -- Verify network connectivity to SOCRadar API -- Ensure OpenCTI platform is accessible -- Monitor memory usage for large feed processing - -### Additional Information - -The connector processes the following data: -* IP addresses (IPv4 and IPv6) -* Domain names -* URLs -* File hashes (MD5, SHA1, SHA256) diff --git a/socradar/entrypoint.sh b/socradar/entrypoint.sh deleted file mode 100644 index 0699cab37a..0000000000 --- a/socradar/entrypoint.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/sh - -# Add debugging information -echo "Current directory: $(pwd)" -echo "Python path: $PYTHONPATH" -echo "Directory contents:" -ls -la /opt/opencti-connector-socradar/src - -# Directly execute python script -cd /opt/opencti-connector-socradar -python3 src/main.py diff --git a/socradar/requirements.txt b/socradar/requirements.txt deleted file mode 100644 index 318d5347ed..0000000000 --- a/socradar/requirements.txt +++ /dev/null @@ -1,5 +0,0 @@ -pycti==6.4.2 -python-dateutil==2.8.2 -PyYAML==6.0.1 -requests~=2.32.2 -stix2==3.0.1 diff --git a/socradar/src/config.yml.sample b/socradar/src/config.yml.sample deleted file mode 100644 index 
0eeb3e2de0..0000000000 --- a/socradar/src/config.yml.sample +++ /dev/null @@ -1,21 +0,0 @@ -opencti: - url: 'http://localhost:8080' - token: 'OPENCTI_TOKEN' - -connector: - id: 'CONNECTOR_ID' - type: 'EXTERNAL_IMPORT' - name: 'SOCRadar' - scope: 'socradar' - confidence_level: 75 - log_level: 'info' - update_existing_data: true - -radar: - radar_base_feed_url: "https://platform.socradar.com/api/threat/intelligence/feed_list/" - radar_socradar_key: "SOCRADAR_KEY" - radar_run_interval: 600 - radar_collections_uuid: - collection_1: - id: ["COLLECTION_UUID"] - name: ["COLLECTION_NAME"] \ No newline at end of file diff --git a/socradar/src/lib/radar.py b/socradar/src/lib/radar.py deleted file mode 100644 index ea480c77d9..0000000000 --- a/socradar/src/lib/radar.py +++ /dev/null @@ -1,369 +0,0 @@ -# =============================================================================== -# Imports: System and Third-Party Libraries -# =============================================================================== -import json -import os -import re -import sys -from datetime import datetime, timedelta - -import requests -import yaml - -# =============================================================================== -# Imports: OpenCTI Libraries -# =============================================================================== -# PyCTI -from pycti import Identity as PyctiIdentity -from pycti import Indicator as PyctiIndicator -from pycti import OpenCTIConnectorHelper, StixCoreRelationship, get_config_variable - -# STIX2 -from stix2 import TLP_WHITE, Bundle -from stix2 import Identity as Stix2Identity -from stix2 import Indicator as Stix2Indicator - - -# =============================================================================== -# Main Operator: RadarConnector -# =============================================================================== -class RadarConnector: - def __init__(self): - """Initialize RadarConnector with configuration and helpers""" - # Step 1.0: Set up 
configuration paths - base_dir = os.path.dirname(os.path.abspath(__file__)) - config_path = os.path.join(base_dir, "..", "config.yml") - - # Step 1.1: Load configuration file - if os.path.isfile(config_path): - with open(config_path, "r") as f: - config = yaml.safe_load(f) - else: - config = {} - - # Step 2.0: Initialize OpenCTI helper - self.helper = OpenCTIConnectorHelper(config) - - # Step 3.0: Configure feed parameters - # Step 3.1: Set base URL and API key - self.base_url = get_config_variable( - "RADAR_BASE_FEED_URL", ["radar", "radar_base_feed_url"], config - ) - self.socradar_key = get_config_variable( - "RADAR_SOCRADAR_KEY", ["radar", "radar_socradar_key"], config - ) - - # Step 3.2: Set run interval - self.interval = get_config_variable( - "RADAR_RUN_INTERVAL", ["radar", "radar_run_interval"], config, default=600 - ) - if isinstance(self.interval, str): - self.interval = int(self.interval) - - # Step 3.3: Configure collections - raw_collections = get_config_variable( - "RADAR_COLLECTIONS_UUID", ["radar", "radar_collections_uuid"], config - ) - if isinstance(raw_collections, str): - try: - self.collections = json.loads(raw_collections) - except Exception: - self.collections = {} - else: - self.collections = raw_collections or {} - - # Step 3.4: Set format type for API requests - self.format_type = ".json?key=" - - # Step 4.0: Initialize caches and patterns - self.identity_cache = {} - self.regex_patterns = { - "md5": r"^[a-fA-F\d]{32}$", - "sha1": r"^[a-fA-F\d]{40}$", - "sha256": r"^[a-fA-F\d]{64}$", - "ipv4": r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$", - "ipv6": r"^(?:[a-fA-F\d]{1,4}:){7}[a-fA-F\d]{1,4}$", - "domain": r"^(?=.{1,255}$)(?:[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,6}$", - "url": r"^(https?|ftp):\/\/[^\s/$.?#].[^\s]*$", - } - - # =============================================================================== - # Utility Methods - # =============================================================================== - def 
_matches_pattern(self, value: str, pattern_name: str) -> bool: - """Match value against regex pattern""" - return bool(re.match(self.regex_patterns[pattern_name], value)) - - def _validate_dates(self, first_seen: str, last_seen: str): - """Validate and convert date strings to datetime objects""" - # Step 1.0: Set datetime format - dt_format = "%Y-%m-%d %H:%M:%S" - - # Step 1.1: Convert strings to datetime objects - valid_from = datetime.strptime(first_seen, dt_format) - valid_until = datetime.strptime(last_seen, dt_format) - - # Step 1.2: Ensure valid time range - if valid_until <= valid_from: - valid_until = valid_from + timedelta(hours=1) - - return valid_from, valid_until - - def _create_stix_pattern(self, value: str, feed_type: str) -> str: - """ - Build a STIX pattern from feed_type or fallback detection - (handles ip, domain, url, hash, etc.) - """ - # If feed_type is "ip", check IPv4 or IPv6 - if feed_type == "ip": - if self._matches_pattern(value, "ipv4"): - return f"[ipv4-addr:value = '{value}']" - elif self._matches_pattern(value, "ipv6"): - return f"[ipv6-addr:value = '{value}']" - - known_patterns = { - "url": lambda v: f"[url:value = '{v}']", - "domain": lambda v: f"[domain-name:value = '{v}']", - "ipv4": lambda v: f"[ipv4-addr:value = '{v}']", - "ipv6": lambda v: f"[ipv6-addr:value = '{v}']", - "md5": lambda v: f"[file:hashes.'MD5' = '{v}']", - "sha1": lambda v: f"[file:hashes.'SHA-1' = '{v}']", - "sha256": lambda v: f"[file:hashes.'SHA-256' = '{v}']", - } - - if feed_type in known_patterns: - return known_patterns[feed_type](value) - - # Fallback detection - for ptype, regex in self.regex_patterns.items(): - if re.match(regex, value): - # e.g. ptype=md5 => "[file:hashes.'MD5' = '...']" - if ptype in known_patterns: - return known_patterns[ptype](value) - - # Otherwise, custom - return f"[x-custom:value = '{value}']" - - def _get_or_create_identity(self, maintainer_name: str): - """ - Use pycti.Identity.generate_id(...) 
for stable dedup - Return a stix2.Identity w/ that ID - """ - if maintainer_name in self.identity_cache: - return self.identity_cache[maintainer_name] - - try: - identity_id = PyctiIdentity.generate_id( - name=maintainer_name, - identity_class="organization", - ) - now = datetime.utcnow() - identity = Stix2Identity( - id=identity_id, - name=maintainer_name, - identity_class="organization", - description=f"Feed Provider: {maintainer_name}", - created=now, - modified=now, - ) - self.identity_cache[maintainer_name] = identity - return identity - except Exception as e: - self.helper.log_error( - f"Error creating Identity for {maintainer_name}: {str(e)}" - ) - return None - - ######################################################################## - # Feed Processing - ######################################################################## - - def _process_feed_item(self, item: dict): - """Process single feed item into STIX objects""" - # Step 1.0: Initialize empty list for STIX objects - stix_objects = [] - - # Step 2.0: Extract core fields from feed item - # Step 2.1: Get primary indicator value - value = item.get("feed") - # Step 2.2: Get indicator type (default to IP if not specified) - feed_type = item.get("feed_type", "ip").lower() - # Step 2.3: Get source/maintainer information - maintainer = item.get("maintainer_name", "Unknown") - - # Step 3.0: Extract and validate timestamp fields - # Step 3.1: Get first seen date - first_seen_str = item.get("first_seen_date") - # Step 3.2: Get last seen date - last_seen_str = item.get("latest_seen_date") - # Step 3.3: Validate required fields exist - if not (value and first_seen_str and last_seen_str): - self.helper.log_error(f"Item missing fields: {item}") - return stix_objects - - # Step 4.0: Convert and validate dates - valid_from, valid_until = self._validate_dates(first_seen_str, last_seen_str) - - # Step 5.0: Create or get cached identity object - identity_obj = self._get_or_create_identity(maintainer) - if not 
identity_obj: - return stix_objects - - # Step 6.0: Generate STIX pattern for indicator - pattern = self._create_stix_pattern(value, feed_type) - if not pattern: - self.helper.log_error( - f"Could not determine pattern for: {value} / {feed_type}" - ) - return stix_objects - - # Step 7.0: Generate stable indicator ID - try: - indicator_id = PyctiIndicator.generate_id(pattern) - except Exception as e: - self.helper.log_error(f"Indicator ID generation error: {str(e)}") - return stix_objects - - # Step 8.0: Create STIX2 Indicator object - indicator = Stix2Indicator( - id=indicator_id, - name=f"{feed_type.upper()}: {value}", - description=f"Source: {maintainer}\nValue: {value}", - pattern=pattern, - pattern_type="stix", - valid_from=valid_from, - valid_until=valid_until, - created_by_ref=identity_obj.id, - object_marking_refs=[TLP_WHITE.id], - labels=["malicious-activity", feed_type], - confidence=75, - created=valid_from, - modified=valid_from, - ) - - # Step 9.0: Create relationship between indicator and identity - # Step 9.1: Generate stable relationship ID - relationship_id = StixCoreRelationship.generate_id( - "created-by", indicator.id, identity_obj.id - ) - # Step 9.2: Format timestamps for relationship - created_str = valid_from.strftime("%Y-%m-%dT%H:%M:%SZ") - modified_str = valid_until.strftime("%Y-%m-%dT%H:%M:%SZ") - - # Step 9.3: Build relationship object - relationship_obj = { - "id": relationship_id, - "type": "relationship", - "spec_version": "2.1", - "x_opencti_type": "stix-core-relationship", - "relationship_type": "created-by", - "source_ref": indicator.id, - "target_ref": identity_obj.id, - "created": created_str, - "modified": modified_str, - "confidence": 75, - "object_marking_refs": [TLP_WHITE.id], - } - - # Step 10.0: Combine all STIX objects - stix_objects.extend([identity_obj, indicator, relationship_obj]) - # Step 10.1: Log success - self.helper.log_info( - f"Created {feed_type} indicator => {value} from {maintainer}" - ) - # Step 10.2: Return 
combined objects - return stix_objects - - def _process_feed(self, work_id: str): - """ - Batched feed ingestion in chunks of 50 items - """ - self.helper.log_info("RadarConnector: Starting feed collection...") - - for collection_name, collection_data in self.collections.items(): - try: - # Build feed URL - coll_id = collection_data["id"][0] - feed_url = ( - f"{self.base_url}{coll_id}{self.format_type}{self.socradar_key}&v=2" - ) - - self.helper.log_info( - f"Fetching data from {collection_name} => {feed_url}" - ) - resp = requests.get(feed_url, timeout=30) - resp.raise_for_status() - items = resp.json() - self.helper.log_info(f"Got {len(items)} items from {collection_name}") - - BATCH_SIZE = 50 - stix_batch = [] - - for idx, item in enumerate(items, start=1): - new_objs = self._process_feed_item(item) - if new_objs: - stix_batch.extend(new_objs) - - # If we reached a batch boundary - if idx % BATCH_SIZE == 0: - bundle = Bundle(objects=stix_batch, allow_custom=True) - self.helper.send_stix2_bundle( - bundle.serialize(), work_id=work_id - ) - self.helper.log_info( - f"Sent {len(stix_batch)} objects for items {idx - BATCH_SIZE + 1}..{idx}" - ) - stix_batch = [] - - # Final leftover - if stix_batch: - bundle = Bundle(objects=stix_batch, allow_custom=True) - self.helper.send_stix2_bundle(bundle.serialize(), work_id=work_id) - self.helper.log_info( - f"Sent {len(stix_batch)} leftover objects for {collection_name}" - ) - - except Exception as e: - self.helper.log_error(f"Failed to process {collection_name}: {str(e)}") - - ######################################################################## - # Connector Workflow - ######################################################################## - def process_message(self): - """ - Called each run. Create "Work", process feed, finalize. 
- """ - self.helper.log_info("RadarConnector: process_message started.") - now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - friendly_name = f"SOCRadar Connector run @ {now_str}" - work_id = self.helper.api.work.initiate_work( - self.helper.connect_id, friendly_name - ) - - try: - self._process_feed(work_id) - message = "Radar feed import complete" - self.helper.api.work.to_processed(work_id, message) - self.helper.log_info(message) - except (KeyboardInterrupt, SystemExit): - self.helper.log_info("RadarConnector interrupted. Stopping.") - sys.exit(0) - except Exception as e: - self.helper.log_error(f"process_message error: {str(e)}") - - def run(self): - """ - Run in start-up. - Mainly runs with OpenCTI's schedule_iso. - """ - # Step 1.0: Run immediately on startup - self.helper.log_info("Running initial collection...") - self.process_message() - - # Step 2.0: Schedule recurring runs - duration_period = f"PT{self.interval}S" # e.g., PT600S for 10 minutes - self.helper.log_info(f"Scheduling recurring runs every {self.interval} seconds") - - self.helper.schedule_iso( - message_callback=self.process_message, duration_period=duration_period - ) diff --git a/socradar/src/main.py b/socradar/src/main.py deleted file mode 100644 index e1960d0a36..0000000000 --- a/socradar/src/main.py +++ /dev/null @@ -1,5 +0,0 @@ -from lib.radar import RadarConnector - -if __name__ == "__main__": - connector = RadarConnector() - connector.run()