Skip to content

Commit

Permalink
clean up new content template
Browse files Browse the repository at this point in the history
  • Loading branch information
pyth0n1c committed Nov 20, 2024
1 parent c647a9f commit 0a910ce
Showing 1 changed file with 82 additions and 65 deletions.
147 changes: 82 additions & 65 deletions contentctl/actions/new_content.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


from dataclasses import dataclass
import questionary
from typing import Any
Expand Down Expand Up @@ -28,64 +26,87 @@ class NewContent:
}
]

def buildDetection(self)->dict[str,Any]:
def buildDetection(self) -> tuple[dict[str, Any], str]:
questions = NewContentQuestions.get_questions_detection()
answers: dict[str,str] = questionary.prompt(
questions,
kbi_msg="User did not answer all of the prompt questions. Exiting...")
answers: dict[str, str] = questionary.prompt(
questions,
kbi_msg="User did not answer all of the prompt questions. Exiting...",
)
if not answers:
raise ValueError("User didn't answer one or more questions!")
answers.update(answers)
answers['name'] = answers['detection_name']
del answers['detection_name']
answers['id'] = str(uuid.uuid4())
answers['version'] = 1
answers['date'] = datetime.today().strftime('%Y-%m-%d')
answers['author'] = answers['detection_author']
del answers['detection_author']
answers['data_source'] = answers['data_source']
answers['type'] = answers['detection_type']
del answers['detection_type']
answers['status'] = "production" #start everything as production since that's what we INTEND the content to become
answers['description'] = 'UPDATE_DESCRIPTION'
file_name = answers['name'].replace(' ', '_').replace('-','_').replace('.','_').replace('/','_').lower()
answers['search'] = answers['detection_search'] + ' | `' + file_name + '_filter`'
del answers['detection_search']
answers['how_to_implement'] = 'UPDATE_HOW_TO_IMPLEMENT'
answers['known_false_positives'] = 'UPDATE_KNOWN_FALSE_POSITIVES'
answers['references'] = ['REFERENCE']
if answers['type'] in ["TTP", "Correlation", "Anomaly", "TTP"]:
answers['drilldown_searches'] = NewContent.DEFAULT_DRILLDOWN_DEF
answers['tags'] = dict()
answers['tags']['analytic_story'] = ['UPDATE_STORY_NAME']
answers['tags']['asset_type'] = 'UPDATE asset_type'
answers['tags']['confidence'] = 'UPDATE value between 1-100'
answers['tags']['impact'] = 'UPDATE value between 1-100'
answers['tags']['message'] = 'UPDATE message'
answers['tags']['mitre_attack_id'] = [x.strip() for x in answers['mitre_attack_ids'].split(',')]
answers['tags']['observable'] = [{'name': 'UPDATE', 'type': 'UPDATE', 'role': ['UPDATE']}]
answers['tags']['product'] = ['Splunk Enterprise','Splunk Enterprise Security','Splunk Cloud']
answers['tags']['security_domain'] = answers['security_domain']
del answers["security_domain"]
answers['tags']['cve'] = ['UPDATE WITH CVE(S) IF APPLICABLE']

#generate the tests section
answers['tests'] = [
{
'name': "True Positive Test",
'attack_data': [
{
'data': "Go to https://github.com/splunk/contentctl/wiki for information about the format of this field",
"sourcetype": "UPDATE SOURCETYPE",
"source": "UPDATE SOURCE"
}
]
}
]
del answers["mitre_attack_ids"]
return answers

def buildStory(self)->dict[str,Any]:
data_source_field = (
answers["data_source"] if len(answers["data_source"]) > 0 else ["UPDATE"]
)
file_name = (
answers["detection_name"]
.replace(" ", "_")
.replace("-", "_")
.replace(".", "_")
.replace("/", "_")
.lower()
)

#Minimum lenght for a mitre tactic is 5 characters: T1000
if len(answers["mitre_attack_ids"]) >= 5:
mitre_attack_ids = [x.strip() for x in answers["mitre_attack_ids"].split(",")]
else:
#string was too short, so just put a placeholder
mitre_attack_ids = ["UPDATE"]

output_file_answers: dict[str, Any] = {
"name": answers["detection_name"],
"id": str(uuid.uuid4()),
"version": 1,
"date": datetime.today().strftime("%Y-%m-%d"),
"author": answers["detection_author"],
"status": "production", # start everything as production since that's what we INTEND the content to become
"type": answers["detection_type"],
"description": "UPDATE_DESCRIPTION",
"data_source": data_source_field,
"search": f"{answers['detection_search']} | `{file_name}_filter`'",
"how_to_implement": "UPDATE_HOW_TO_IMPLEMENT",
"known_false_positives": "UPDATE_KNOWN_FALSE_POSITIVES",
"references": ["REFERENCE"],
"drilldown_searches": NewContent.DEFAULT_DRILLDOWN_DEF,
"tags": {
"analytic_story": ["UPDATE_STORY_NAME"],
"asset_type": "UPDATE asset_type",
"confidence": "UPDATE value between 1-100",
"impact": "UPDATE value between 1-100",
"message": "UPDATE message",
"mitre_attack_id": mitre_attack_ids,
"observable": [
{"name": "UPDATE", "type": "UPDATE", "role": ["UPDATE"]}
],
"product": [
"Splunk Enterprise",
"Splunk Enterprise Security",
"Splunk Cloud",
],
"security_domain": answers["security_domain"],
"cve": ["UPDATE WITH CVE(S) IF APPLICABLE"],
},
"tests": [
{
"name": "True Positive Test",
"attack_data": [
{
"data": "Go to https://github.com/splunk/contentctl/wiki for information about the format of this field",
"sourcetype": "UPDATE SOURCETYPE",
"source": "UPDATE SOURCE",
}
],
}
],
}

if answers["detection_type"] not in ["TTP", "Correlation", "Anomaly", "TTP"]:
del output_file_answers["drilldown_searches"]

return output_file_answers, answers['detection_kind']

def buildStory(self) -> dict[str, Any]:
questions = NewContentQuestions.get_questions_story()
answers = questionary.prompt(
questions,
Expand All @@ -110,12 +131,11 @@ def buildStory(self)->dict[str,Any]:
del answers['usecase']
answers['tags']['cve'] = ['UPDATE WITH CVE(S) IF APPLICABLE']
return answers


def execute(self, input_dto: new) -> None:
if input_dto.type == NewContentType.detection:
content_dict = self.buildDetection()
subdirectory = pathlib.Path('detections') / content_dict.pop('detection_kind')
content_dict, detection_kind = self.buildDetection()
subdirectory = pathlib.Path('detections') / detection_kind
elif input_dto.type == NewContentType.story:
content_dict = self.buildStory()
subdirectory = pathlib.Path('stories')
Expand All @@ -125,23 +145,20 @@ def execute(self, input_dto: new) -> None:
full_output_path = input_dto.path / subdirectory / SecurityContentObject_Abstract.contentNameToFileName(content_dict.get('name'))
YmlWriter.writeYmlFile(str(full_output_path), content_dict)



def writeObjectNewContent(self, object: dict, subdirectory_name: str, type: NewContentType) -> None:
if type == NewContentType.detection:
file_path = os.path.join(self.output_path, 'detections', subdirectory_name, self.convertNameToFileName(object['name'], object['tags']['product']))
output_folder = pathlib.Path(self.output_path)/'detections'/subdirectory_name
#make sure the output folder exists for this detection
# make sure the output folder exists for this detection
output_folder.mkdir(exist_ok=True)

YmlWriter.writeDetection(file_path, object)
print("Successfully created detection " + file_path)

elif type == NewContentType.story:
file_path = os.path.join(self.output_path, 'stories', self.convertNameToFileName(object['name'], object['tags']['product']))
YmlWriter.writeStory(file_path, object)
print("Successfully created story " + file_path)

else:
raise(Exception(f"Object Must be Story or Detection, but is not: {object}"))

0 comments on commit 0a910ce

Please sign in to comment.