Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added IP2Location.io Module #689

Merged
merged 11 commits into from
Sep 12, 2023
63 changes: 63 additions & 0 deletions bbot/modules/ip2location.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from .shodan_dns import shodan_dns


class IP2Location(shodan_dns):
"""
IP2Location.io Geolocation API.
"""

watched_events = ["IP_ADDRESS"]
produced_events = ["GEOLOCATION"]
flags = ["passive", "safe"]
meta = {"description": "Query IP2location.io's API for geolocation information. ", "auth_required": True}
options = {"api_key": "", "lang": ""}
options_desc = {
"api_key": "IP2location.io API Key",
"lang": "Translation information(ISO639-1). The translation is only applicable for continent, country, region and city name.",
}
scope_distance_modifier = 1
_priority = 2
suppress_dupes = False

base_url = "http://api.ip2location.io"

async def filter_event(self, event):
return True

async def setup(self):
self.lang = self.config.get("lang", "")
return await super().setup()

async def ping(self):
url = self.build_url("8.8.8.8")
r = await self.request_with_fail_count(url)
resp_content = getattr(r, "text", "")
assert getattr(r, "status_code", 0) == 200, resp_content

def build_url(self, data):
url = f"{self.base_url}/?key={self.api_key}&ip={data}&format=json&source=bbot"
if self.lang:
url = f"{url}&lang={self.lang}"
return url

async def handle_event(self, event):
try:
url = self.build_url(event.data)
result = await self.request_with_fail_count(url)
if result:
geo_data = result.json()
if not geo_data:
self.verbose(f"No JSON response from {url}")
else:
self.verbose(f"No response from {url}")
except Exception:
self.verbose(f"Error retrieving results for {event.data}", trace=True)
return

geo_data = {k: v for k, v in geo_data.items() if v is not None}
if geo_data:
self.emit_event(geo_data, "GEOLOCATION", event)
elif "error" in geo_data:
error_msg = geo_data.get("error").get("error_message", "")
if error_msg:
self.warning(error_msg)
22 changes: 6 additions & 16 deletions bbot/modules/ipstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class Ipstack(shodan_dns):
_priority = 2
suppress_dupes = False

base_url = "http://api.ipstack.com/"
base_url = "http://api.ipstack.com"

async def filter_event(self, event):
return True
Expand All @@ -33,28 +33,18 @@ async def handle_event(self, event):
url = f"{self.base_url}/{event.data}?access_key={self.api_key}"
result = await self.request_with_fail_count(url)
if result:
j = result.json()
if not j:
geo_data = result.json()
if not geo_data:
self.verbose(f"No JSON response from {url}")
else:
self.verbose(f"No response from {url}")
except Exception:
self.verbose(f"Error retrieving results for {event.data}", trace=True)
return
geo_data = {
"ip": j.get("ip"),
"country": j.get("country_name"),
"city": j.get("city"),
"zip_code": j.get("zip"),
"region": j.get("region_name"),
"latitude": j.get("latitude"),
"longitude": j.get("longitude"),
}
geo_data = {k: v for k, v in geo_data.items() if v is not None}
if geo_data:
event_data = ", ".join(f"{k.capitalize()}: {v}" for k, v in geo_data.items())
self.emit_event(event_data, "GEOLOCATION", event)
elif "error" in j:
error_msg = j.get("error").get("info", "")
self.emit_event(geo_data, "GEOLOCATION", event)
elif "error" in geo_data:
error_msg = geo_data.get("error").get("info", "")
if error_msg:
self.warning(error_msg)
31 changes: 31 additions & 0 deletions bbot/test/test_step_2/module_tests/test_module_ip2location.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from .base import ModuleTestBase


class TestIP2Location(ModuleTestBase):
targets = ["8.8.8.8"]
config_overrides = {"modules": {"ip2location": {"api_key": "asdf"}}}

async def setup_before_prep(self, module_test):
module_test.httpx_mock.add_response(
url="http://api.ip2location.io/?key=asdf&ip=8.8.8.8&format=json&source=bbot",
json={
"ip": "8.8.8.8",
"country_code": "US",
"country_name": "United States of America",
"region_name": "California",
"city_name": "Mountain View",
"latitude": 37.405992,
"longitude": -122.078515,
"zip_code": "94043",
"time_zone": "-07:00",
"asn": "15169",
"as": "Google LLC",
"is_proxy": False,
},
)

def check(self, module_test, events):
assert any(
e.type == "GEOLOCATION" and e.data["ip"] == "8.8.8.8" and e.data["city_name"] == "Mountain View"
for e in events
), "Failed to geolocate IP"
9 changes: 3 additions & 6 deletions bbot/test/test_step_2/module_tests/test_module_ipstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class TestIPStack(ModuleTestBase):

async def setup_before_prep(self, module_test):
module_test.httpx_mock.add_response(
url="http://api.ipstack.com//check?access_key=asdf",
url="http://api.ipstack.com/check?access_key=asdf",
json={
"ip": "1.2.3.4",
"type": "ipv4",
Expand All @@ -34,7 +34,7 @@ async def setup_before_prep(self, module_test):
},
)
module_test.httpx_mock.add_response(
url="http://api.ipstack.com//8.8.8.8?access_key=asdf",
url="http://api.ipstack.com/8.8.8.8?access_key=asdf",
json={
"ip": "8.8.8.8",
"type": "ipv4",
Expand Down Expand Up @@ -63,8 +63,5 @@ async def setup_before_prep(self, module_test):

def check(self, module_test, events):
assert any(
e.type == "GEOLOCATION"
and e.data
== "Ip: 8.8.8.8, Country: United States, City: Glenmont, Zip_code: 44628, Region: Ohio, Latitude: 40.5369987487793, Longitude: -82.12859344482422"
for e in events
e.type == "GEOLOCATION" and e.data["ip"] == "8.8.8.8" and e.data["city"] == "Glenmont" for e in events
), "Failed to geolocate IP"