From 2e2acbe8639ae3138add109c79dd6b7728f4e37c Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Tue, 12 Sep 2023 11:02:32 -0400 Subject: [PATCH] wrote module tests --- bbot/modules/ip2location.py | 21 +++++++++++++----- bbot/modules/ipstack.py | 22 +++++-------------- .../module_tests/test_module_ipstack.py | 9 +++----- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/bbot/modules/ip2location.py b/bbot/modules/ip2location.py index 851bf1e04..823ede94d 100644 --- a/bbot/modules/ip2location.py +++ b/bbot/modules/ip2location.py @@ -19,7 +19,7 @@ class IP2Location(shodan_dns): _priority = 2 suppress_dupes = False - base_url = "http://api.ip2location.io/" + base_url = "http://api.ip2location.io" async def filter_event(self, event): return True @@ -28,11 +28,21 @@ async def setup(self): self.lang = self.config.get("lang", "") return await super().setup() + async def ping(self): + url = self.build_url("8.8.8.8") + r = await self.request_with_fail_count(url) + resp_content = getattr(r, "text", "") + assert getattr(r, "status_code", 0) == 200, resp_content + + def build_url(self, data): + url = f"{self.base_url}/?key={self.api_key}&ip={data}&format=json&source=bbot" + if self.lang: + url = f"{url}&lang={self.lang}" + return url + async def handle_event(self, event): try: - url = f"{self.base_url}/?key={self.api_key}&ip={event.data}&format=json&source=bbot" - if self.lang: - url = f"{url}&lang={self.lang}" + url = self.build_url(event.data) result = await self.request_with_fail_count(url) if result: geo_data = result.json() @@ -46,8 +56,7 @@ async def handle_event(self, event): geo_data = {k: v for k, v in geo_data.items() if v is not None} if geo_data: - event_data = ", ".join(f"{k}: {v}" for k, v in geo_data.items()) - self.emit_event(event_data, "GEOLOCATION", event) + self.emit_event(geo_data, "GEOLOCATION", event) elif "error" in geo_data: error_msg = geo_data.get("error").get("error_message", "") if error_msg: diff --git a/bbot/modules/ipstack.py b/bbot/modules/ipstack.py index 798065d63..22dce58be 100644 --- a/bbot/modules/ipstack.py +++ b/bbot/modules/ipstack.py @@ -17,7 +17,7 @@ class Ipstack(shodan_dns): _priority = 2 suppress_dupes = False - base_url = "http://api.ipstack.com/" + base_url = "http://api.ipstack.com" async def filter_event(self, event): return True @@ -33,28 +33,18 @@ async def handle_event(self, event): url = f"{self.base_url}/{event.data}?access_key={self.api_key}" result = await self.request_with_fail_count(url) if result: - j = result.json() - if not j: + geo_data = result.json() + if not geo_data: self.verbose(f"No JSON response from {url}") else: self.verbose(f"No response from {url}") except Exception: self.verbose(f"Error retrieving results for {event.data}", trace=True) return - geo_data = { - "ip": j.get("ip"), - "country": j.get("country_name"), - "city": j.get("city"), - "zip_code": j.get("zip"), - "region": j.get("region_name"), - "latitude": j.get("latitude"), - "longitude": j.get("longitude"), - } geo_data = {k: v for k, v in geo_data.items() if v is not None} if geo_data: - event_data = ", ".join(f"{k.capitalize()}: {v}" for k, v in geo_data.items()) - self.emit_event(event_data, "GEOLOCATION", event) - elif "error" in j: - error_msg = j.get("error").get("info", "") + self.emit_event(geo_data, "GEOLOCATION", event) + elif "error" in geo_data: + error_msg = geo_data.get("error").get("info", "") if error_msg: self.warning(error_msg) diff --git a/bbot/test/test_step_2/module_tests/test_module_ipstack.py b/bbot/test/test_step_2/module_tests/test_module_ipstack.py index 0e848ac6f..dea0b2865 100644 --- a/bbot/test/test_step_2/module_tests/test_module_ipstack.py +++ b/bbot/test/test_step_2/module_tests/test_module_ipstack.py @@ -7,7 +7,7 @@ class TestIPStack(ModuleTestBase): async def setup_before_prep(self, module_test): module_test.httpx_mock.add_response( - url="http://api.ipstack.com//check?access_key=asdf", + url="http://api.ipstack.com/check?access_key=asdf", json={ "ip": "1.2.3.4", "type": "ipv4", @@ -34,7 +34,7 @@ async def setup_before_prep(self, module_test): }, ) module_test.httpx_mock.add_response( - url="http://api.ipstack.com//8.8.8.8?access_key=asdf", + url="http://api.ipstack.com/8.8.8.8?access_key=asdf", json={ "ip": "8.8.8.8", "type": "ipv4", @@ -63,8 +63,5 @@ async def setup_before_prep(self, module_test): def check(self, module_test, events): assert any( - e.type == "GEOLOCATION" - and e.data - == "Ip: 8.8.8.8, Country: United States, City: Glenmont, Zip_code: 44628, Region: Ohio, Latitude: 40.5369987487793, Longitude: -82.12859344482422" - for e in events + e.type == "GEOLOCATION" and e.data["ip"] == "8.8.8.8" and e.data["city"] == "Glenmont" for e in events ), "Failed to geolocate IP"