Skip to content

Commit

Permalink
wrote module tests
Browse files Browse the repository at this point in the history
  • Loading branch information
TheTechromancer committed Sep 12, 2023
1 parent d224aa2 commit 2e2acbe
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 28 deletions.
21 changes: 15 additions & 6 deletions bbot/modules/ip2location.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class IP2Location(shodan_dns):
_priority = 2
suppress_dupes = False

base_url = "http://api.ip2location.io/"
base_url = "http://api.ip2location.io"

async def filter_event(self, event):
return True
Expand All @@ -28,11 +28,21 @@ async def setup(self):
self.lang = self.config.get("lang", "")
return await super().setup()

async def ping(self):
url = self.build_url("8.8.8.8")
r = await self.request_with_fail_count(url)
resp_content = getattr(r, "text", "")
assert getattr(r, "status_code", 0) == 200, resp_content

def build_url(self, data):
url = f"{self.base_url}/?key={self.api_key}&ip={data}&format=json&source=bbot"
if self.lang:
url = f"{url}&lang={self.lang}"
return url

async def handle_event(self, event):
try:
url = f"{self.base_url}/?key={self.api_key}&ip={event.data}&format=json&source=bbot"
if self.lang:
url = f"{url}&lang={self.lang}"
url = self.build_url(event.data)
result = await self.request_with_fail_count(url)
if result:
geo_data = result.json()
Expand All @@ -46,8 +56,7 @@ async def handle_event(self, event):

geo_data = {k: v for k, v in geo_data.items() if v is not None}
if geo_data:
event_data = ", ".join(f"{k}: {v}" for k, v in geo_data.items())
self.emit_event(event_data, "GEOLOCATION", event)
self.emit_event(geo_data, "GEOLOCATION", event)
elif "error" in geo_data:
error_msg = geo_data.get("error").get("error_message", "")
if error_msg:
Expand Down
22 changes: 6 additions & 16 deletions bbot/modules/ipstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class Ipstack(shodan_dns):
_priority = 2
suppress_dupes = False

base_url = "http://api.ipstack.com/"
base_url = "http://api.ipstack.com"

async def filter_event(self, event):
return True
Expand All @@ -33,28 +33,18 @@ async def handle_event(self, event):
url = f"{self.base_url}/{event.data}?access_key={self.api_key}"
result = await self.request_with_fail_count(url)
if result:
j = result.json()
if not j:
geo_data = result.json()
if not geo_data:
self.verbose(f"No JSON response from {url}")
else:
self.verbose(f"No response from {url}")
except Exception:
self.verbose(f"Error retrieving results for {event.data}", trace=True)
return
geo_data = {
"ip": j.get("ip"),
"country": j.get("country_name"),
"city": j.get("city"),
"zip_code": j.get("zip"),
"region": j.get("region_name"),
"latitude": j.get("latitude"),
"longitude": j.get("longitude"),
}
geo_data = {k: v for k, v in geo_data.items() if v is not None}
if geo_data:
event_data = ", ".join(f"{k.capitalize()}: {v}" for k, v in geo_data.items())
self.emit_event(event_data, "GEOLOCATION", event)
elif "error" in j:
error_msg = j.get("error").get("info", "")
self.emit_event(geo_data, "GEOLOCATION", event)
elif "error" in geo_data:
error_msg = geo_data.get("error").get("info", "")
if error_msg:
self.warning(error_msg)
9 changes: 3 additions & 6 deletions bbot/test/test_step_2/module_tests/test_module_ipstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class TestIPStack(ModuleTestBase):

async def setup_before_prep(self, module_test):
module_test.httpx_mock.add_response(
url="http://api.ipstack.com//check?access_key=asdf",
url="http://api.ipstack.com/check?access_key=asdf",
json={
"ip": "1.2.3.4",
"type": "ipv4",
Expand All @@ -34,7 +34,7 @@ async def setup_before_prep(self, module_test):
},
)
module_test.httpx_mock.add_response(
url="http://api.ipstack.com//8.8.8.8?access_key=asdf",
url="http://api.ipstack.com/8.8.8.8?access_key=asdf",
json={
"ip": "8.8.8.8",
"type": "ipv4",
Expand Down Expand Up @@ -63,8 +63,5 @@ async def setup_before_prep(self, module_test):

def check(self, module_test, events):
assert any(
e.type == "GEOLOCATION"
and e.data
== "Ip: 8.8.8.8, Country: United States, City: Glenmont, Zip_code: 44628, Region: Ohio, Latitude: 40.5369987487793, Longitude: -82.12859344482422"
for e in events
e.type == "GEOLOCATION" and e.data["ip"] == "8.8.8.8" and e.data["city"] == "Glenmont" for e in events
), "Failed to geolocate IP"

0 comments on commit 2e2acbe

Please sign in to comment.