diff --git a/Makefile b/Makefile index 2b7b4a2..b5b712b 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ lint: - black --check . + black --diff . isort --check . ruff check . diff --git a/ctfcli/core/challenge.py b/ctfcli/core/challenge.py index 553cc91..517f550 100644 --- a/ctfcli/core/challenge.py +++ b/ctfcli/core/challenge.py @@ -47,7 +47,7 @@ class Challenge(dict): "type", "extra", "image", "protocol", "host", "connection_info", "healthcheck", "attempts", "flags", "files", "topics", "tags", "files", "hints", - "requirements", "state", "version", + "requirements", "next", "state", "version", # fmt: on ] @@ -61,7 +61,7 @@ class Challenge(dict): @staticmethod def load_installed_challenge(challenge_id) -> Dict: api = API() - r = api.get(f"/api/v1/challenges/{challenge_id}") + r = api.get(f"/api/v1/challenges/{challenge_id}?view=admin") if not r.ok: raise RemoteChallengeNotFound(f"Could not load challenge with id={challenge_id}") @@ -103,6 +103,11 @@ def is_default_challenge_property(key: str, value: Any) -> bool: if key in ["tags", "hints", "topics", "requirements", "files"] and value == []: return True + if key == "requirements" and value == {"prerequisites": [], "anonymize": False}: + return True + + if key == "next" and value is None: + return True return False @staticmethod @@ -224,7 +229,9 @@ def _process_challenge_image(self, challenge_image: Optional[str]) -> Optional[I # Check if it's a local pre-built image if ( subprocess.call( - ["docker", "inspect", challenge_image], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ["docker", "inspect", challenge_image], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, ) == 0 ): @@ -407,14 +414,28 @@ def _create_hints(self): def _set_required_challenges(self): remote_challenges = self.load_installed_challenges() required_challenges = [] + anonymize = False + if type(self["requirements"]) == dict: + rc = self["requirements"].get("prerequisites", []) + anonymize = self["requirements"].get("anonymize", False) + else: + rc = self["requirements"] - for required_challenge in self["requirements"]: + for required_challenge in rc: if type(required_challenge) == str: # requirement by name # find the challenge id from installed challenges + found = False for remote_challenge in remote_challenges: if remote_challenge["name"] == required_challenge: required_challenges.append(remote_challenge["id"]) + found = True + break + if found is False: + click.secho( + f'Challenge id cannot be found. Skipping invalid requirement name "{required_challenge}".', + fg="yellow", + ) elif type(required_challenge) == int: # requirement by challenge id @@ -429,11 +450,50 @@ def _set_required_challenges(self): fg="yellow", ) required_challenges.remove(self.challenge_id) + required_challenges.sort() - requirements_payload = {"requirements": {"prerequisites": required_challenges}} + requirements_payload = { + "requirements": { + "prerequisites": required_challenges, + "anonymize": anonymize, + } + } r = self.api.patch(f"/api/v1/challenges/{self.challenge_id}", json=requirements_payload) r.raise_for_status() + def _set_next(self, _next): + if type(_next) == str: + # nid by name + # find the challenge id from installed challenges + remote_challenges = self.load_installed_challenges() + for remote_challenge in remote_challenges: + if remote_challenge["name"] == _next: + _next = remote_challenge["id"] + break + if type(_next) == str: + click.secho( + "Challenge cannot find next challenge. Maybe it is invalid name or id. It will be cleared.", + fg="yellow", + ) + _next = None + elif type(_next) == int and _next > 0: + # nid by challenge id + # trust it and use it directly + _next = remote_challenge["id"] + else: + _next = None + + if self.challenge_id == _next: + click.secho( + "Challenge cannot set next challenge itself. Skipping invalid next challenge.", + fg="yellow", + ) + _next = None + + next_payload = {"next_id": _next} + r = self.api.patch(f"/api/v1/challenges/{self.challenge_id}", json=next_payload) + r.raise_for_status() + # Compare challenge requirements, will resolve all IDs to names def _compare_challenge_requirements(self, r1: List[Union[str, int]], r2: List[Union[str, int]]) -> bool: remote_challenges = self.load_installed_challenges() @@ -451,7 +511,27 @@ def normalize_requirements(requirements): return normalized - return normalize_requirements(r1) == normalize_requirements(r2) + nr1 = normalize_requirements(r1) + nr1.sort() + nr2 = normalize_requirements(r2) + nr2.sort() + return nr1 == nr2 + + # Compare next challenges, will resolve all IDs to names + def _compare_challenge_next(self, r1: Union[str, int, None], r2: Union[str, int, None]) -> bool: + def normalize_next(r): + normalized = None + if type(r) == int: + if r > 0: + remote_challenge = self.load_installed_challenge(r) + if remote_challenge["id"] == r: + normalized = remote_challenge["name"] + else: + normalized = r + + return normalized + + return normalize_next(r1) == normalize_next(r2) # Normalize challenge data from the API response to match challenge.yml # It will remove any extra fields from the remote, as well as expand external references @@ -462,7 +542,15 @@ def normalize_requirements(requirements): def _normalize_challenge(self, challenge_data: Dict[str, Any]): challenge = {} - copy_keys = ["name", "category", "attribution", "value", "type", "state", "connection_info"] + copy_keys = [ + "name", + "category", + "attribution", + "value", + "type", + "state", + "connection_info", + ] for key in copy_keys: if key in challenge_data: challenge[key] = challenge_data[key] @@ -483,9 +571,15 @@ def _normalize_challenge(self, challenge_data: Dict[str, Any]): r.raise_for_status() flags = r.json()["data"] challenge["flags"] = [ - f["content"] - if f["type"] == "static" and (f["data"] is None or f["data"] == "") - else {"content": f["content"].strip().replace("\r\n", "\n"), "type": f["type"], "data": f["data"]} + ( + f["content"] + if f["type"] == "static" and (f["data"] is None or f["data"] == "") + else { + "content": f["content"].strip().replace("\r\n", "\n"), + "type": f["type"], + "data": f["data"], + } + ) for f in flags ] @@ -501,7 +595,7 @@ def _normalize_challenge(self, challenge_data: Dict[str, Any]): hints = r.json()["data"] # skipping pre-requisites for hints because they are not supported in ctfcli challenge["hints"] = [ - {"content": h["content"], "cost": h["cost"]} if h["cost"] > 0 else h["content"] for h in hints + ({"content": h["content"], "cost": h["cost"]} if h["cost"] > 0 else h["content"]) for h in hints ] # Add topics @@ -514,12 +608,25 @@ def _normalize_challenge(self, challenge_data: Dict[str, Any]): r = self.api.get(f"/api/v1/challenges/{self.challenge_id}/requirements") r.raise_for_status() requirements = (r.json().get("data") or {}).get("prerequisites", []) + challenge["requirements"] = {"prerequisites": [], "anonymize": False} if len(requirements) > 0: # Prefer challenge names over IDs - r = self.api.get("/api/v1/challenges") + r2 = self.api.get("/api/v1/challenges?view=admin") + r2.raise_for_status() + challenges = r2.json()["data"] + challenge["requirements"]["prerequisites"] = [c["name"] for c in challenges if c["id"] in requirements] + # Add anonymize flag + challenge["requirements"]["anonymize"] = (r.json().get("data") or {}).get("anonymize", False) + + # Add next + nid = challenge_data.get("next_id", None) + if nid: + # Prefer challenge names over IDs + r = self.api.get(f"/api/v1/challenges/{nid}") r.raise_for_status() - challenges = r.json()["data"] - challenge["requirements"] = [c["name"] for c in challenges if c["id"] in requirements] + challenge["next"] = (r.json().get("data") or {}).get("name", None) + else: + challenge["next"] = None return challenge @@ -528,7 +635,10 @@ def _normalize_remote_files(self, remote_files: List[str]) -> Dict[str, Dict[str normalized = {} for f in remote_files: file_parts = f.split("?token=")[0].split("/") - normalized[file_parts[-1]] = {"url": f, "location": f"{file_parts[-2]}/{file_parts[-1]}"} + normalized[file_parts[-1]] = { + "url": f, + "location": f"{file_parts[-2]}/{file_parts[-1]}", + } return normalized @@ -560,7 +670,13 @@ def sync(self, ignore: Tuple[str] = ()) -> None: remote_challenge = self.load_installed_challenge(self.challenge_id) # if value, category, type or description are ignored, revert them to the remote state in the initial payload - reset_properties_if_ignored = ["value", "category", "type", "description", "attribution"] + reset_properties_if_ignored = [ + "value", + "category", + "type", + "description", + "attribution", + ] for p in reset_properties_if_ignored: if p in ignore: challenge_payload[p] = remote_challenge[p] @@ -614,7 +730,10 @@ def sync(self, ignore: Tuple[str] = ()) -> None: # sha1sum is present in CTFd 3.7+, use it instead of always re-uploading the file if possible remote_file_sha1sum = sha1sums[remote_files[local_file_name]["location"]] if remote_file_sha1sum is not None: - with open(self.challenge_directory / local_files[local_file_name], "rb") as lf: + with open( + self.challenge_directory / local_files[local_file_name], + "rb", + ) as lf: local_file_sha1sum = hash_file(lf) if local_file_sha1sum == remote_file_sha1sum: @@ -634,6 +753,11 @@ def sync(self, ignore: Tuple[str] = ()) -> None: if challenge.get("requirements") and "requirements" not in ignore: self._set_required_challenges() + # Set next + _next = challenge.get("next", None) + if "next" not in ignore: + self._set_next(_next) + make_challenge_visible = False # Bring back the challenge to be visible if: @@ -711,6 +835,11 @@ def create(self, ignore: Tuple[str] = ()) -> None: if challenge.get("requirements") and "requirements" not in ignore: self._set_required_challenges() + # Add next + _next = challenge.get("next", None) + if "next" not in ignore: + self._set_next(_next) + # Bring back the challenge if it's supposed to be visible # Either explicitly, or by assuming the default value (possibly because the state is ignored) if challenge.get("state", "visible") == "visible" or "state" in ignore: @@ -723,7 +852,14 @@ def lint(self, skip_hadolint=False, flag_format="flag{") -> bool: issues = {"fields": [], "dockerfile": [], "hadolint": [], "files": []} # Check if required fields are present - for field in ["name", "author", "category", "description", "attribution", "value"]: + for field in [ + "name", + "author", + "category", + "description", + "attribution", + "value", + ]: # value is allowed to be none if the challenge type is dynamic if field == "value" and challenge.get("type") == "dynamic": continue @@ -858,13 +994,34 @@ def verify(self, ignore: Tuple[str] = ()) -> bool: if self.is_default_challenge_property(key, normalized_challenge[key]): continue + click.secho( + f"{key} is not in challenge.", + fg="yellow", + ) + return False if challenge[key] != normalized_challenge[key]: if key == "requirements": - if self._compare_challenge_requirements(challenge[key], normalized_challenge[key]): + if type(challenge[key]) == dict: + cr = challenge[key]["prerequisites"] + ca = challenge[key].get("anonymize", False) + else: + cr = challenge[key] + ca = False + if self._compare_challenge_requirements(cr, normalized_challenge[key]["prerequisites"]): + if ca == normalized_challenge[key]["anonymize"]: + continue + + if key == "next": + if self._compare_challenge_next(challenge[key], normalized_challenge[key]): continue + click.secho( + f"{key} comparison failed.", + fg="yellow", + ) + return False # Handle a special case for files, unless they are ignored @@ -874,18 +1031,30 @@ def verify(self, ignore: Tuple[str] = ()) -> bool: self._validate_files() local_files = {Path(f).name: f for f in challenge["files"]} except InvalidChallengeFile: + click.secho( + "InvalidChallengeFile", + fg="yellow", + ) return False remote_files = self._normalize_remote_files(remote_challenge["files"]) # Check if there are no extra local files for local_file in local_files: if local_file not in remote_files: + click.secho( + f"{local_file} is not in remote challenge.", + fg="yellow", + ) return False sha1sums = self._get_files_sha1sums() # Check if all remote files are present locally for remote_file_name in remote_files: if remote_file_name not in local_files: + click.secho( + f"{remote_file_name} is not in local challenge.", + fg="yellow", + ) return False # sha1sum is present in CTFd 3.7+, use it instead of downloading the file if possible @@ -895,6 +1064,10 @@ def verify(self, ignore: Tuple[str] = ()) -> bool: local_file_sha1sum = hash_file(lf) if local_file_sha1sum != remote_file_sha1sum: + click.secho( + "sha1sum does not match with remote one.", + fg="yellow", + ) return False return True @@ -906,6 +1079,10 @@ def verify(self, ignore: Tuple[str] = ()) -> bool: local_file_contents = (self.challenge_directory / local_files[remote_file_name]).read_bytes() if remote_file_contents != local_file_contents: + click.secho( + "the file content does not match with the remote one.", + fg="yellow", + ) return False return True diff --git a/ctfcli/spec/challenge-example.yml b/ctfcli/spec/challenge-example.yml index 903b4c3..4c73f13 100644 --- a/ctfcli/spec/challenge-example.yml +++ b/ctfcli/spec/challenge-example.yml @@ -61,6 +61,7 @@ flags: - { type: "static", content: "flag{wat}", + data: "case_sensitive", } # A static case insensitive flag - { @@ -115,6 +116,22 @@ requirements: - "Warmup" - "Are you alive" +# The following format for requirements can also be used. This format can also +# support the "anonymize" flag, which will display the challenge but with all +# the challenge information anonymized if it is not unlocked yet. +#requirements: +# prerequisites: +# - "Warmup" +# - "Are you alive" +# anonymize: true + +# The next is used to display a next recommended challenge to a user when +# the user correctly answers the current challenge. +# Can be removed if unused +# Accepts a challenge name as a string, a challenge ID as an integer, or null +# if you want to remove or disable it. +next: null + # The state of the challenge. # If the field is omitted, the challenge is visible by default. # If provided, the field can take one of two values: hidden, visible. @@ -122,4 +139,4 @@ state: hidden # Specifies what version of the challenge specification was used. # Subject to change until ctfcli v1.0.0 -version: "0.1" \ No newline at end of file +version: "0.1" diff --git a/tests/core/test_challenge.py b/tests/core/test_challenge.py index 261e3df..cbc404d 100644 --- a/tests/core/test_challenge.py +++ b/tests/core/test_challenge.py @@ -125,7 +125,7 @@ def test_load_installed_challenge(self, mock_api: MagicMock): Challenge.load_installed_challenge(1) mock_get = mock_api.return_value.get - mock_get.assert_called_once_with("/api/v1/challenges/1") + mock_get.assert_called_once_with("/api/v1/challenges/1?view=admin") @mock.patch("ctfcli.core.challenge.API") def test_load_installed_challenges(self, mock_api: MagicMock): @@ -211,7 +211,7 @@ def test_updates_simple_properties(self, mock_api_constructor: MagicMock, *args, # expect GET calls loading existing resources to check if something needs to be deleted mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -223,6 +223,8 @@ def test_updates_simple_properties(self, mock_api_constructor: MagicMock, *args, [ call("/api/v1/challenges/1", json=expected_challenge_payload), call().raise_for_status(), + call("/api/v1/challenges/1", json={"next_id": None}), + call().raise_for_status(), call("/api/v1/challenges/1", json={"state": "visible"}), call().raise_for_status(), ] @@ -253,7 +255,7 @@ def test_updates_attempts(self, mock_api_constructor: MagicMock, *args, **kwargs mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -301,7 +303,7 @@ def test_updates_extra_properties(self, mock_api_constructor: MagicMock, *args, mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -373,7 +375,7 @@ def test_updates_flags(self, mock_api_constructor: MagicMock, *args, **kwargs): mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -455,7 +457,7 @@ def test_updates_topics(self, mock_api_constructor: MagicMock, *args, **kwargs): mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -516,7 +518,7 @@ def test_updates_tags(self, mock_api_constructor: MagicMock, *args, **kwargs): mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -570,7 +572,7 @@ def test_updates_files(self, mock_api_constructor: MagicMock, *args, **kwargs): def mock_get(*args, **kwargs): path = args[0] - if path == "/api/v1/challenges/1": + if path == "/api/v1/challenges/1" or path == "/api/v1/challenges/1?view=admin": mock_response = MagicMock() mock_response.json.return_value = {"success": True, "data": self.installed_challenges[0]} return mock_response @@ -595,7 +597,7 @@ def mock_get(*args, **kwargs): mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -682,7 +684,7 @@ def test_updates_hints(self, mock_api_constructor: MagicMock, *args, **kwargs): mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -736,7 +738,7 @@ def test_updates_requirements(self, mock_api_constructor: MagicMock, *args, **kw mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), ] ) @@ -745,7 +747,9 @@ def test_updates_requirements(self, mock_api_constructor: MagicMock, *args, **kw call("/api/v1/challenges/1", json=expected_challenge_payload), call().raise_for_status(), # challenge 2 retrieved by name, and challenge 3 retrieved by id - call("/api/v1/challenges/1", json={"requirements": {"prerequisites": [2, 3]}}), + call("/api/v1/challenges/1", json={"requirements": {"prerequisites": [2, 3], "anonymize": False}}), + call().raise_for_status(), + call("/api/v1/challenges/1", json={"next_id": None}), call().raise_for_status(), ] ) @@ -796,7 +800,7 @@ def mock_get(*args, **kwargs): [ call("/api/v1/challenges/1", json=expected_challenge_payload), call().raise_for_status(), - call("/api/v1/challenges/1", json={"requirements": {"prerequisites": [2]}}), + call("/api/v1/challenges/1", json={"requirements": {"prerequisites": [2], "anonymize": False}}), call().raise_for_status(), ], any_order=True, @@ -810,7 +814,7 @@ def mock_get(*args, **kwargs): mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), ] ) mock_api.post.assert_not_called() @@ -840,7 +844,7 @@ def test_defaults_to_standard_challenge_type(self, mock_api_constructor: MagicMo mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -880,7 +884,7 @@ def test_defaults_to_visible_state(self, mock_api_constructor: MagicMock, *args, mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -892,6 +896,8 @@ def test_defaults_to_visible_state(self, mock_api_constructor: MagicMock, *args, [ call("/api/v1/challenges/1", json=expected_challenge_payload), call().raise_for_status(), + call("/api/v1/challenges/1", json={"next_id": None}), + call().raise_for_status(), # this tests the real assigned state call("/api/v1/challenges/1", json={"state": "visible"}), call().raise_for_status(), @@ -988,7 +994,7 @@ def test_updates_multiple_attributes_at_once(self, mock_api_constructor: MagicMo mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -1002,7 +1008,9 @@ def test_updates_multiple_attributes_at_once(self, mock_api_constructor: MagicMo [ call("/api/v1/challenges/1", json=expected_challenge_payload), call().raise_for_status(), - call("/api/v1/challenges/1", json={"requirements": {"prerequisites": [2]}}), + call("/api/v1/challenges/1", json={"requirements": {"prerequisites": [2], "anonymize": False}}), + call().raise_for_status(), + call("/api/v1/challenges/1", json={"next_id": None}), call().raise_for_status(), call("/api/v1/challenges/1", json={"state": "visible"}), call().raise_for_status(), @@ -1122,6 +1130,8 @@ def test_does_not_update_ignored_attributes(self): [ call("/api/v1/challenges/1", json=expected_challenge_payload), call().raise_for_status(), + call("/api/v1/challenges/1", json={"next_id": None}), + call().raise_for_status(), call("/api/v1/challenges/1", json={"state": "visible"}), call().raise_for_status(), ] @@ -1234,7 +1244,9 @@ def mock_post(*args, **kwargs): mock_api.patch.assert_has_calls( [ - call("/api/v1/challenges/3", json={"requirements": {"prerequisites": [1, 2]}}), + call("/api/v1/challenges/3", json={"requirements": {"prerequisites": [1, 2], "anonymize": False}}), + call().raise_for_status(), + call("/api/v1/challenges/3", json={"next_id": None}), call().raise_for_status(), call("/api/v1/challenges/3", json={"state": "visible"}), call().raise_for_status(), @@ -1529,7 +1541,7 @@ def mock_get(self, *args, **kwargs): mock_response.json.return_value = {"success": True, "data": self.installed_challenges} return mock_response - if path == "/api/v1/challenges/3": + if path == "/api/v1/challenges/3" or path == "/api/v1/challenges/3?view=admin": mock_response = MagicMock() mock_response.json.return_value = { "success": True, @@ -1733,7 +1745,8 @@ def test_normalize_fetches_and_normalizes_challenge(self, mock_api_constructor: "tags": ["tag-1", "tag-2"], "hints": ["free hint", {"content": "paid hint", "cost": 100}], "topics": ["topic-1", "topic-2"], - "requirements": ["First Test Challenge", "Other Test Challenge"], + "next": None, + "requirements": {"prerequisites": ["First Test Challenge", "Other Test Challenge"], "anonymize": False}, "extra": { "initial": 100, "decay": 10, @@ -1795,7 +1808,7 @@ def test_mirror_challenge(self, mock_api_constructor: MagicMock): # and ctfcli will update them to use the name expected_challenge = Challenge( self.full_challenge, - {"requirements": ["First Test Challenge", "Other Test Challenge"]}, + {"requirements": {"prerequisites": ["First Test Challenge", "Other Test Challenge"], "anonymize": False}}, ) # pop keys with default values as they should not be in the loaded data