diff --git a/bbot/modules/ip2location.py b/bbot/modules/ip2location.py new file mode 100644 index 0000000000..823ede94da --- /dev/null +++ b/bbot/modules/ip2location.py @@ -0,0 +1,63 @@ +from .shodan_dns import shodan_dns + + +class IP2Location(shodan_dns): + """ + IP2Location.io Geolocation API. + """ + + watched_events = ["IP_ADDRESS"] + produced_events = ["GEOLOCATION"] + flags = ["passive", "safe"] + meta = {"description": "Query IP2location.io's API for geolocation information. ", "auth_required": True} + options = {"api_key": "", "lang": ""} + options_desc = { + "api_key": "IP2location.io API Key", + "lang": "Translation information(ISO639-1). The translation is only applicable for continent, country, region and city name.", + } + scope_distance_modifier = 1 + _priority = 2 + suppress_dupes = False + + base_url = "http://api.ip2location.io" + + async def filter_event(self, event): + return True + + async def setup(self): + self.lang = self.config.get("lang", "") + return await super().setup() + + async def ping(self): + url = self.build_url("8.8.8.8") + r = await self.request_with_fail_count(url) + resp_content = getattr(r, "text", "") + assert getattr(r, "status_code", 0) == 200, resp_content + + def build_url(self, data): + url = f"{self.base_url}/?key={self.api_key}&ip={data}&format=json&source=bbot" + if self.lang: + url = f"{url}&lang={self.lang}" + return url + + async def handle_event(self, event): + try: + url = self.build_url(event.data) + result = await self.request_with_fail_count(url) + if result: + geo_data = result.json() + if not geo_data: + self.verbose(f"No JSON response from {url}") + else: + self.verbose(f"No response from {url}") + except Exception: + self.verbose(f"Error retrieving results for {event.data}", trace=True) + return + + geo_data = {k: v for k, v in geo_data.items() if v is not None} + if geo_data: + self.emit_event(geo_data, "GEOLOCATION", event) + elif "error" in geo_data: + error_msg = geo_data.get("error").get("error_message", "") + if error_msg: + self.warning(error_msg) diff --git a/bbot/modules/ipstack.py b/bbot/modules/ipstack.py index 798065d635..22dce58be5 100644 --- a/bbot/modules/ipstack.py +++ b/bbot/modules/ipstack.py @@ -17,7 +17,7 @@ class Ipstack(shodan_dns): _priority = 2 suppress_dupes = False - base_url = "http://api.ipstack.com/" + base_url = "http://api.ipstack.com" async def filter_event(self, event): return True @@ -33,28 +33,18 @@ async def handle_event(self, event): url = f"{self.base_url}/{event.data}?access_key={self.api_key}" result = await self.request_with_fail_count(url) if result: - j = result.json() - if not j: + geo_data = result.json() + if not geo_data: self.verbose(f"No JSON response from {url}") else: self.verbose(f"No response from {url}") except Exception: self.verbose(f"Error retrieving results for {event.data}", trace=True) return - geo_data = { - "ip": j.get("ip"), - "country": j.get("country_name"), - "city": j.get("city"), - "zip_code": j.get("zip"), - "region": j.get("region_name"), - "latitude": j.get("latitude"), - "longitude": j.get("longitude"), - } geo_data = {k: v for k, v in geo_data.items() if v is not None} if geo_data: - event_data = ", ".join(f"{k.capitalize()}: {v}" for k, v in geo_data.items()) - self.emit_event(event_data, "GEOLOCATION", event) - elif "error" in j: - error_msg = j.get("error").get("info", "") + self.emit_event(geo_data, "GEOLOCATION", event) + elif "error" in geo_data: + error_msg = geo_data.get("error").get("info", "") if error_msg: self.warning(error_msg) diff --git a/bbot/test/test_step_2/module_tests/test_module_ip2location.py b/bbot/test/test_step_2/module_tests/test_module_ip2location.py new file mode 100644 index 0000000000..2a63607207 --- /dev/null +++ b/bbot/test/test_step_2/module_tests/test_module_ip2location.py @@ -0,0 +1,31 @@ +from .base import ModuleTestBase + + +class TestIP2Location(ModuleTestBase): + targets = ["8.8.8.8"] + config_overrides = {"modules": {"ip2location": {"api_key": "asdf"}}} + + async def setup_before_prep(self, module_test): + module_test.httpx_mock.add_response( + url="http://api.ip2location.io/?key=asdf&ip=8.8.8.8&format=json&source=bbot", + json={ + "ip": "8.8.8.8", + "country_code": "US", + "country_name": "United States of America", + "region_name": "California", + "city_name": "Mountain View", + "latitude": 37.405992, + "longitude": -122.078515, + "zip_code": "94043", + "time_zone": "-07:00", + "asn": "15169", + "as": "Google LLC", + "is_proxy": False, + }, + ) + + def check(self, module_test, events): + assert any( + e.type == "GEOLOCATION" and e.data["ip"] == "8.8.8.8" and e.data["city_name"] == "Mountain View" + for e in events + ), "Failed to geolocate IP" diff --git a/bbot/test/test_step_2/module_tests/test_module_ipstack.py b/bbot/test/test_step_2/module_tests/test_module_ipstack.py index 0e848ac6fb..dea0b28657 100644 --- a/bbot/test/test_step_2/module_tests/test_module_ipstack.py +++ b/bbot/test/test_step_2/module_tests/test_module_ipstack.py @@ -7,7 +7,7 @@ class TestIPStack(ModuleTestBase): async def setup_before_prep(self, module_test): module_test.httpx_mock.add_response( - url="http://api.ipstack.com//check?access_key=asdf", + url="http://api.ipstack.com/check?access_key=asdf", json={ "ip": "1.2.3.4", "type": "ipv4", @@ -34,7 +34,7 @@ async def setup_before_prep(self, module_test): }, ) module_test.httpx_mock.add_response( - url="http://api.ipstack.com//8.8.8.8?access_key=asdf", + url="http://api.ipstack.com/8.8.8.8?access_key=asdf", json={ "ip": "8.8.8.8", "type": "ipv4", @@ -63,8 +63,5 @@ async def setup_before_prep(self, module_test): def check(self, module_test, events): assert any( - e.type == "GEOLOCATION" - and e.data - == "Ip: 8.8.8.8, Country: United States, City: Glenmont, Zip_code: 44628, Region: Ohio, Latitude: 40.5369987487793, Longitude: -82.12859344482422" - for e in events + e.type == "GEOLOCATION" and e.data["ip"] == "8.8.8.8" and e.data["city"] == "Glenmont" for e in events ), "Failed to geolocate IP"