From 5dd6da39e85093c6b82b05b18f6879c24eff61ae Mon Sep 17 00:00:00 2001 From: ip2location Date: Fri, 8 Sep 2023 12:44:12 +0800 Subject: [PATCH 1/4] Create ip2locationio.py --- bbot/modules/ip2locationio.py | 63 +++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 bbot/modules/ip2locationio.py diff --git a/bbot/modules/ip2locationio.py b/bbot/modules/ip2locationio.py new file mode 100644 index 000000000..8460fb54a --- /dev/null +++ b/bbot/modules/ip2locationio.py @@ -0,0 +1,63 @@ +from .shodan_dns import shodan_dns + + +class IP2Locationio(shodan_dns): + ''' + IP2Location.io Geolocation API. + ''' + + watched_events = ["IP_ADDRESS"] + produced_events = ["GEOLOCATION"] + flags = ["passive", "safe"] + meta = {"description": "Query IP2location.io's API for geolocation information. ", "auth_required": True} + options = {"api_key": "", "lang": ""} + options_desc = {"api_key": "IP2location.io API Key", "lang": "Translation information(ISO639-1). The translation is only applicable for continent, country, region and city name."} + scope_distance_modifier = 1 + _priority = 2 + suppress_dupes = False + + base_url = "http://api.ip2location.io/" + + async def filter_event(self, event): + return True + + async def handle_event(self, event): + try: + # url = f"{self.base_url}/?key={self.api_key}&ip={event.data}&format=json&source=bbot" + # if self.lang != "": + if self.config.get("lang") != "": + url = f"{self.base_url}/?key={self.api_key}&ip={event.data}&lang={self.config.get('lang')}&format=json&source=bbot" + else: + url = f"{self.base_url}/?key={self.api_key}&ip={event.data}&format=json&source=bbot" + result = await self.request_with_fail_count(url) + if result: + j = result.json() + if not j: + self.verbose(f"No JSON response from {url}") + else: + self.verbose(f"No response from {url}") + # except Exception: + except Exception as err: + self.verbose(f"Error retrieving results for {event.data}", trace=True) + # print(f"Unexpected {err=}, {type(err)=}") + return + ''' + geo_data = { + "ip": j.get("ip"), + "country": j.get("country_name"), + "city": j.get("city_name"), + "zip_code": j.get("zip_code"), + "region": j.get("region_name"), + "latitude": j.get("latitude"), + "longitude": j.get("longitude"), + }''' + geo_data = j + geo_data = {k: v for k, v in geo_data.items() if v is not None} + if geo_data: + # event_data = ", ".join(f"{k.capitalize()}: {v}" for k, v in geo_data.items()) + event_data = ", ".join(f"{k}: {v}" for k, v in geo_data.items()) + self.emit_event(event_data, "GEOLOCATION", event) + elif "error" in j: + error_msg = j.get("error").get("error_message", "") + if error_msg: + self.warning(error_msg) \ No newline at end of file From d224aa2a5bfe010d2dc6bd73719552dfe7534a23 Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Tue, 12 Sep 2023 10:42:28 -0400 Subject: [PATCH 2/4] blacked, code cleanup --- bbot/modules/ip2location.py | 54 ++++++++++++++++++++++++++++++ bbot/modules/ip2locationio.py | 63 ----------------------------------- 2 files changed, 54 insertions(+), 63 deletions(-) create mode 100644 bbot/modules/ip2location.py delete mode 100644 bbot/modules/ip2locationio.py diff --git a/bbot/modules/ip2location.py b/bbot/modules/ip2location.py new file mode 100644 index 000000000..851bf1e04 --- /dev/null +++ b/bbot/modules/ip2location.py @@ -0,0 +1,54 @@ +from .shodan_dns import shodan_dns + + +class IP2Location(shodan_dns): + """ + IP2Location.io Geolocation API. + """ + + watched_events = ["IP_ADDRESS"] + produced_events = ["GEOLOCATION"] + flags = ["passive", "safe"] + meta = {"description": "Query IP2location.io's API for geolocation information. ", "auth_required": True} + options = {"api_key": "", "lang": ""} + options_desc = { + "api_key": "IP2location.io API Key", + "lang": "Translation information(ISO639-1). The translation is only applicable for continent, country, region and city name.", + } + scope_distance_modifier = 1 + _priority = 2 + suppress_dupes = False + + base_url = "http://api.ip2location.io/" + + async def filter_event(self, event): + return True + + async def setup(self): + self.lang = self.config.get("lang", "") + return await super().setup() + + async def handle_event(self, event): + try: + url = f"{self.base_url}/?key={self.api_key}&ip={event.data}&format=json&source=bbot" + if self.lang: + url = f"{url}&lang={self.lang}" + result = await self.request_with_fail_count(url) + if result: + geo_data = result.json() + if not geo_data: + self.verbose(f"No JSON response from {url}") + else: + self.verbose(f"No response from {url}") + except Exception: + self.verbose(f"Error retrieving results for {event.data}", trace=True) + return + + geo_data = {k: v for k, v in geo_data.items() if v is not None} + if geo_data: + event_data = ", ".join(f"{k}: {v}" for k, v in geo_data.items()) + self.emit_event(event_data, "GEOLOCATION", event) + elif "error" in geo_data: + error_msg = geo_data.get("error").get("error_message", "") + if error_msg: + self.warning(error_msg) diff --git a/bbot/modules/ip2locationio.py b/bbot/modules/ip2locationio.py deleted file mode 100644 index 8460fb54a..000000000 --- a/bbot/modules/ip2locationio.py +++ /dev/null @@ -1,63 +0,0 @@ -from .shodan_dns import shodan_dns - - -class IP2Locationio(shodan_dns): - ''' - IP2Location.io Geolocation API. - ''' - - watched_events = ["IP_ADDRESS"] - produced_events = ["GEOLOCATION"] - flags = ["passive", "safe"] - meta = {"description": "Query IP2location.io's API for geolocation information. ", "auth_required": True} - options = {"api_key": "", "lang": ""} - options_desc = {"api_key": "IP2location.io API Key", "lang": "Translation information(ISO639-1). The translation is only applicable for continent, country, region and city name."} - scope_distance_modifier = 1 - _priority = 2 - suppress_dupes = False - - base_url = "http://api.ip2location.io/" - - async def filter_event(self, event): - return True - - async def handle_event(self, event): - try: - # url = f"{self.base_url}/?key={self.api_key}&ip={event.data}&format=json&source=bbot" - # if self.lang != "": - if self.config.get("lang") != "": - url = f"{self.base_url}/?key={self.api_key}&ip={event.data}&lang={self.config.get('lang')}&format=json&source=bbot" - else: - url = f"{self.base_url}/?key={self.api_key}&ip={event.data}&format=json&source=bbot" - result = await self.request_with_fail_count(url) - if result: - j = result.json() - if not j: - self.verbose(f"No JSON response from {url}") - else: - self.verbose(f"No response from {url}") - # except Exception: - except Exception as err: - self.verbose(f"Error retrieving results for {event.data}", trace=True) - # print(f"Unexpected {err=}, {type(err)=}") - return - ''' - geo_data = { - "ip": j.get("ip"), - "country": j.get("country_name"), - "city": j.get("city_name"), - "zip_code": j.get("zip_code"), - "region": j.get("region_name"), - "latitude": j.get("latitude"), - "longitude": j.get("longitude"), - }''' - geo_data = j - geo_data = {k: v for k, v in geo_data.items() if v is not None} - if geo_data: - # event_data = ", ".join(f"{k.capitalize()}: {v}" for k, v in geo_data.items()) - event_data = ", ".join(f"{k}: {v}" for k, v in geo_data.items()) - self.emit_event(event_data, "GEOLOCATION", event) - elif "error" in j: - error_msg = j.get("error").get("error_message", "") - if error_msg: - self.warning(error_msg) \ No newline at end of file From 2e2acbe8639ae3138add109c79dd6b7728f4e37c Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Tue, 12 Sep 2023 11:02:32 -0400 Subject: [PATCH 3/4] wrote module tests --- bbot/modules/ip2location.py | 21 +++++++++++++----- bbot/modules/ipstack.py | 22 +++++-------------- .../module_tests/test_module_ipstack.py | 9 +++----- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/bbot/modules/ip2location.py b/bbot/modules/ip2location.py index 851bf1e04..823ede94d 100644 --- a/bbot/modules/ip2location.py +++ b/bbot/modules/ip2location.py @@ -19,7 +19,7 @@ class IP2Location(shodan_dns): _priority = 2 suppress_dupes = False - base_url = "http://api.ip2location.io/" + base_url = "http://api.ip2location.io" async def filter_event(self, event): return True @@ -28,11 +28,21 @@ async def setup(self): self.lang = self.config.get("lang", "") return await super().setup() + async def ping(self): + url = self.build_url("8.8.8.8") + r = await self.request_with_fail_count(url) + resp_content = getattr(r, "text", "") + assert getattr(r, "status_code", 0) == 200, resp_content + + def build_url(self, data): + url = f"{self.base_url}/?key={self.api_key}&ip={data}&format=json&source=bbot" + if self.lang: + url = f"{url}&lang={self.lang}" + return url + async def handle_event(self, event): try: - url = f"{self.base_url}/?key={self.api_key}&ip={event.data}&format=json&source=bbot" - if self.lang: - url = f"{url}&lang={self.lang}" + url = self.build_url(event.data) result = await self.request_with_fail_count(url) if result: geo_data = result.json() @@ -46,8 +56,7 @@ async def handle_event(self, event): geo_data = {k: v for k, v in geo_data.items() if v is not None} if geo_data: - event_data = ", ".join(f"{k}: {v}" for k, v in geo_data.items()) - self.emit_event(event_data, "GEOLOCATION", event) + self.emit_event(geo_data, "GEOLOCATION", event) elif "error" in geo_data: error_msg = geo_data.get("error").get("error_message", "") if error_msg: diff --git a/bbot/modules/ipstack.py b/bbot/modules/ipstack.py index 798065d63..22dce58be 100644 --- a/bbot/modules/ipstack.py +++ b/bbot/modules/ipstack.py @@ -17,7 +17,7 @@ class Ipstack(shodan_dns): _priority = 2 suppress_dupes = False - base_url = "http://api.ipstack.com/" + base_url = "http://api.ipstack.com" async def filter_event(self, event): return True @@ -33,28 +33,18 @@ async def handle_event(self, event): url = f"{self.base_url}/{event.data}?access_key={self.api_key}" result = await self.request_with_fail_count(url) if result: - j = result.json() - if not j: + geo_data = result.json() + if not geo_data: self.verbose(f"No JSON response from {url}") else: self.verbose(f"No response from {url}") except Exception: self.verbose(f"Error retrieving results for {event.data}", trace=True) return - geo_data = { - "ip": j.get("ip"), - "country": j.get("country_name"), - "city": j.get("city"), - "zip_code": j.get("zip"), - "region": j.get("region_name"), - "latitude": j.get("latitude"), - "longitude": j.get("longitude"), - } geo_data = {k: v for k, v in geo_data.items() if v is not None} if geo_data: - event_data = ", ".join(f"{k.capitalize()}: {v}" for k, v in geo_data.items()) - self.emit_event(event_data, "GEOLOCATION", event) - elif "error" in j: - error_msg = j.get("error").get("info", "") + self.emit_event(geo_data, "GEOLOCATION", event) + elif "error" in geo_data: + error_msg = geo_data.get("error").get("info", "") if error_msg: self.warning(error_msg) diff --git a/bbot/test/test_step_2/module_tests/test_module_ipstack.py b/bbot/test/test_step_2/module_tests/test_module_ipstack.py index 0e848ac6f..dea0b2865 100644 --- a/bbot/test/test_step_2/module_tests/test_module_ipstack.py +++ b/bbot/test/test_step_2/module_tests/test_module_ipstack.py @@ -7,7 +7,7 @@ class TestIPStack(ModuleTestBase): async def setup_before_prep(self, module_test): module_test.httpx_mock.add_response( - url="http://api.ipstack.com//check?access_key=asdf", + url="http://api.ipstack.com/check?access_key=asdf", json={ "ip": "1.2.3.4", "type": "ipv4", @@ -34,7 +34,7 @@ async def setup_before_prep(self, module_test): }, ) module_test.httpx_mock.add_response( - url="http://api.ipstack.com//8.8.8.8?access_key=asdf", + url="http://api.ipstack.com/8.8.8.8?access_key=asdf", json={ "ip": "8.8.8.8", "type": "ipv4", @@ -63,8 +63,5 @@ async def setup_before_prep(self, module_test): def check(self, module_test, events): assert any( - e.type == "GEOLOCATION" - and e.data - == "Ip: 8.8.8.8, Country: United States, City: Glenmont, Zip_code: 44628, Region: Ohio, Latitude: 40.5369987487793, Longitude: -82.12859344482422" - for e in events + e.type == "GEOLOCATION" and e.data["ip"] == "8.8.8.8" and e.data["city"] == "Glenmont" for e in events ), "Failed to geolocate IP" From 786f950946003b189a4b518cdb0ad54ec55e1e2b Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Tue, 12 Sep 2023 11:08:46 -0400 Subject: [PATCH 4/4] add test file --- .../module_tests/test_module_ip2location.py | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 bbot/test/test_step_2/module_tests/test_module_ip2location.py diff --git a/bbot/test/test_step_2/module_tests/test_module_ip2location.py b/bbot/test/test_step_2/module_tests/test_module_ip2location.py new file mode 100644 index 000000000..2a6360720 --- /dev/null +++ b/bbot/test/test_step_2/module_tests/test_module_ip2location.py @@ -0,0 +1,31 @@ +from .base import ModuleTestBase + + +class TestIP2Location(ModuleTestBase): + targets = ["8.8.8.8"] + config_overrides = {"modules": {"ip2location": {"api_key": "asdf"}}} + + async def setup_before_prep(self, module_test): + module_test.httpx_mock.add_response( + url="http://api.ip2location.io/?key=asdf&ip=8.8.8.8&format=json&source=bbot", + json={ + "ip": "8.8.8.8", + "country_code": "US", + "country_name": "United States of America", + "region_name": "California", + "city_name": "Mountain View", + "latitude": 37.405992, + "longitude": -122.078515, + "zip_code": "94043", + "time_zone": "-07:00", + "asn": "15169", + "as": "Google LLC", + "is_proxy": False, + }, + ) + + def check(self, module_test, events): + assert any( + e.type == "GEOLOCATION" and e.data["ip"] == "8.8.8.8" and e.data["city_name"] == "Mountain View" + for e in events + ), "Failed to geolocate IP"