Skip to content

Commit

Permalink
implemented socketio for console log in apps
Browse files Browse the repository at this point in the history
  • Loading branch information
adpham95 committed Oct 28, 2019
1 parent a30f511 commit 11286e1
Show file tree
Hide file tree
Showing 27 changed files with 140 additions and 141 deletions.
2 changes: 2 additions & 0 deletions api/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ motor
aiofiles
python-multipart
pyjwt
python-socketio
websocket-client
minio
aiodocker == 0.14.0
docker
Expand Down
1 change: 0 additions & 1 deletion api/server/endpoints/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ async def push_to_action_stream_queue(node_statuses: NodeStatus, event):
# TODO SIO: this will become sio.on('workflow_status_update', namespaces=['/results'])
# TODO: maybe make an internal user for the worker/umpire?
# @router.put("/workflow_status/{execution_id}")
@sio
async def update_workflow_status(body: JSONPatch, event: str, execution_id: str,
workflow_col: AsyncIOMotorCollection = Depends((get_mongo_c)), close: str = None):
old_workflow = workflow_status_getter(execution_id, workflow_col)
Expand Down
7 changes: 2 additions & 5 deletions app_sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ class HelloWorld(AppBase):
"""
__version__ = "1.0.0"
def __init__(self, redis, logger, console_logger=None):
def __init__(self, redis, logger):
"""
Each app should make a call to super().__init__ to set up Redis and logging.
"""
super().__init__(redis, logger, console_logger)
super().__init__(redis, logger)
# Define all desired functions as asyncio couroutines using the "async" keyword
async def hello_world(self):
Expand All @@ -47,9 +47,6 @@ class HelloWorld(AppBase):
# This logs to the docker or local log
self.logger.info(message)
# This sends a log message to the frontend
await self.console_logger.info(message)
# Returns the message as an action result to the frontend
return message
Expand Down
3 changes: 3 additions & 0 deletions app_sdk/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ asteval
cryptography
six
tenacity
python-socketio
requests
websocket-client
4 changes: 3 additions & 1 deletion app_sdk/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@
author_email='',
license='',
packages=find_packages(),
install_requires=["aiohttp", "aioredis", "pyyaml"])
install_requires=["aiohttp", "pyyaml", "asteval", "cryptography", "six",
"tenacity", "python-socketio", "requests", "websocket-client"]
)
46 changes: 24 additions & 22 deletions app_sdk/walkoff_app_sdk/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,22 @@ def __init__(self, sio=None):
super().__init__()
self.sio = sio
self.execution_id = None
self.workflow_id = None

def set_execution_id(self, channel):
self.execution_id = channel

async def flush(self):
def flush(self):
pass

@retry(stop=stop_after_attempt(5), wait=wait_exponential(min=1, max=10))
async def write(self, message):
def write(self, message):

data = {
"workflow_id": self.workflow_id,
"execution_id": self.execution_id,
"message": message
}
await self.sio.emit("console update", data, "console")
self.sio.emit("log", data, "/console")

async def close(self):
def close(self):
pass


Expand All @@ -47,7 +46,7 @@ class AppBase:
__version__ = None
app_name = None

def __init__(self, redis=None, logger=None, console_logger=None):#, docker_client=None):
def __init__(self, redis=None, logger=None):
if self.app_name is None or self.__version__ is None:
logger.error(("App name or version not set. Please ensure self.app_name is set to match the "
"docker-compose service name and self.__version__ is set to match the api.yaml."))
Expand All @@ -62,8 +61,8 @@ def __init__(self, redis=None, logger=None, console_logger=None):#, docker_clien
# Creates redis keys of format "{AppName}:{Version}:{Priority}"
self.redis: aioredis.Redis = redis
self.logger = logger if logger is not None else logging.getLogger("AppBaseLogger")
self.console_logger = console_logger if console_logger is not None else logging.getLogger("ConsoleBaseLogger")
self.current_execution_id = None
self.current_workflow_id = None

async def get_actions(self):
""" Continuously monitors the action queue and asynchronously executes actions """
Expand Down Expand Up @@ -108,9 +107,13 @@ async def get_actions(self):

async def execute_action(self, action: Action):
""" Execute an action, and push its result to Redis. """
# TODO: Is there a better way to do this?
self.logger.handlers[0].stream.execution_id = action.execution_id
self.logger.handlers[0].stream.workflow_id = action.workflow_id

self.logger.debug(f"Attempting execution of: {action.label}-{action.execution_id}")
self.console_logger.handlers[0].stream.set_execution_id(action.execution_id)
self.current_execution_id = action.execution_id
self.current_workflow_id = action.workflow_id

results_stream = f"{action.execution_id}:results"

Expand Down Expand Up @@ -163,18 +166,17 @@ async def execute_action(self, action: Action):
@classmethod
async def run(cls):
""" Connect to Redis and HTTP session, await actions """
async with connect_to_aioredis_pool(config.REDIS_URI) as redis, \
connect_to_socketio(config.SOCKETIO_URI, "console") as sio:
# TODO: Migrate to the common log config
logging.basicConfig(format="{asctime} - {name} - {levelname}:{message}", style='{')
logger = logging.getLogger(f"{cls.__name__}")
logger.setLevel(logging.DEBUG)
async with connect_to_aioredis_pool(config.REDIS_URI) as redis:
with connect_to_socketio(config.SOCKETIO_URI, ["/console"]) as sio:
# TODO: Migrate to the common log config
logging.basicConfig(format="{asctime} - {name} - {levelname}:{message}", style='{')
logger = logging.getLogger(f"{cls.__name__}")
logger.setLevel(logging.DEBUG)

console_logger = AsyncLogger(f"{cls.__name__}", level=logging.DEBUG)
handler = AsyncHandler(stream=SIOStream(sio))
handler.setFormatter(logging.Formatter(fmt="{asctime} - {name} - {levelname}:{message}", style='{'))
console_logger.addHandler(handler)
handler = logging.StreamHandler(stream=SIOStream(sio))
handler.setFormatter(logging.Formatter(fmt="{asctime} - {name} - {levelname}:{message}", style='{'))
logger.addHandler(handler)

app = cls(redis=redis, logger=logger, console_logger=console_logger)
app = cls(redis=redis, logger=logger)

await app.get_actions()
await app.get_actions()
24 changes: 12 additions & 12 deletions apps/adversary_hunting/1.0.0/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class AdversaryHunting(AppBase):
__version__ = "1.0.0"
app_name = "adversary_hunting"

def __init__(self, redis, logger, console_logger=None):
super().__init__(redis, logger, console_logger)
def __init__(self, redis, logger):
super().__init__(redis, logger)

async def set_timestamp(self):
timestamp = '{:%Y-%m-%d_%H-%M-%S}'.format(datetime.datetime.now())
Expand Down Expand Up @@ -284,7 +284,7 @@ async def get_services(self, hosts, username, password, transport, server_cert_v
return results

async def get_memory_kansa(self, hosts, username, password, transport, server_cert_validation,
message_encryption):
message_encryption):
"""
Execute a list of remote commands on a list of hosts.
:param hosts: List of host ips to run command on
Expand Down Expand Up @@ -316,7 +316,7 @@ async def get_memory_kansa(self, hosts, username, password, transport, server_ce
return results

async def get_dns_cache_kansa(self, hosts, username, password, transport, server_cert_validation,
message_encryption):
message_encryption):
"""
Execute a list of remote commands on a list of hosts.
:param hosts: List of host ips to run command on
Expand Down Expand Up @@ -348,7 +348,7 @@ async def get_dns_cache_kansa(self, hosts, username, password, transport, server
return results

async def get_netstat_kansa(self, hosts, username, password, transport, server_cert_validation,
message_encryption):
message_encryption):
"""
Execute a list of remote commands on a list of hosts.
:param hosts: List of host ips to run command on
Expand Down Expand Up @@ -378,9 +378,9 @@ async def get_netstat_kansa(self, hosts, username, password, transport, server_c
results[host] = {"stdout": "", "stderr": f"{e}"}

return results

async def get_arp_kansa(self, hosts, username, password, transport, server_cert_validation,
message_encryption):
message_encryption):
"""
Execute a list of remote commands on a list of hosts.
:param hosts: List of host ips to run command on
Expand Down Expand Up @@ -412,7 +412,7 @@ async def get_arp_kansa(self, hosts, username, password, transport, server_cert_
return results

async def get_proc_dump_kansa(self, hosts, username, password, transport, server_cert_validation,
message_encryption):
message_encryption):
"""
Execute a list of remote commands on a list of hosts.
:param hosts: List of host ips to run command on
Expand Down Expand Up @@ -442,9 +442,9 @@ async def get_proc_dump_kansa(self, hosts, username, password, transport, server
results[host] = {"stdout": "", "stderr": f"{e}"}

return results

async def get_procs_n_modules_kansa(self, hosts, username, password, transport, server_cert_validation,
message_encryption):
message_encryption):
"""
Execute a list of remote commands on a list of hosts.
:param hosts: List of host ips to run command on
Expand Down Expand Up @@ -474,9 +474,9 @@ async def get_procs_n_modules_kansa(self, hosts, username, password, transport,
results[host] = {"stdout": "", "stderr": f"{e}"}

return results

async def get_procs_wmi_kansa(self, hosts, username, password, transport, server_cert_validation,
message_encryption):
message_encryption):
"""
Execute a list of remote commands on a list of hosts.
:param hosts: List of host ips to run command on
Expand Down
13 changes: 3 additions & 10 deletions apps/basics/1.0.0/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ class Basics(AppBase):
__version__ = "1.0.0"
app_name = "basics" # this needs to match "name" in api.yaml

def __init__(self, redis, logger, console_logger=None):
def __init__(self, redis, logger):
"""
Each app should have this __init__ to set up Redis and logging.
:param redis:
:param logger:
:param console_logger:
"""
super().__init__(redis, logger, console_logger)
super().__init__(redis, logger)

async def hello_world(self):
"""
Expand All @@ -31,12 +30,9 @@ async def hello_world(self):
"""
message = f"Hello World from {socket.gethostname()} in workflow {self.current_execution_id}!"

# This logs to the docker logs
# This logs to both the container's stdout and to the UI console in the workflow editor
self.logger.info(message)

# This sends a log message to the frontend workflow editor
await self.console_logger.info(message)

return message

async def string_to_json(self, call):
Expand All @@ -58,18 +54,15 @@ async def random_number(self):

async def echo_array(self, data):
self.logger.info(f"Echoing array: {data}")
await self.console_logger.info(f"Echoing array: {data}")
return data

async def echo_json(self, data):
self.logger.info(f"Echoing JSON: {data}")
await self.console_logger.info(f"Echoing JSON: {data}")
return data

async def sample_report_data(self):
message = f"Alpha,Beta,Charlie\n1,2,3\n4,5,6\n1,2,3\n4,5,6\n1,2,3\n4,5,6\n1,2,3\n4,5,6\n1,2,3\n4,5,6\n1,2,3\n4,5,6\n1,2,3\n4,5,6\n1,2,3\n4,5,6"
self.logger.info(message)
await self.console_logger.info(message)
return message


Expand Down
6 changes: 3 additions & 3 deletions apps/hive/1.0.0/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class Hive(AppBase):
__version__ = "1.0.0"
app_name = "hive"

def __init__(self, redis, logger, console_logger=None):
super().__init__(redis, logger, console_logger)
def __init__(self, redis, logger):
super().__init__(redis, logger)

async def create_case(self, url, api_key, title, description="", tlp=2, severity=1, tags=None):
tags = tags if tags else []
Expand Down Expand Up @@ -110,7 +110,7 @@ async def create_case_task(self, case_id, url, api_key, data=None):
else:
raise IOError(r.text)
except Exception as e:
self.console_logger.info(f"Failed to create task with input {item} because: {e}")
self.logger.info(f"Failed to create task with input {item} because: {e}")

return results

Expand Down
4 changes: 2 additions & 2 deletions apps/ip_addr_utils/1.0.0/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ class IPAddrUtils(AppBase):
__version__ = "1.0.0"
app_name = "ip_addr_utils"

def __init__(self, redis, logger, console_logger=None):
super().__init__(redis, logger, console_logger)
def __init__(self, redis, logger):
super().__init__(redis, logger)

async def set_timestamp(self):
timestamp = '{:%Y-%m-%d_%H-%M-%S}'.format(datetime.datetime.now())
Expand Down
4 changes: 2 additions & 2 deletions apps/mitre_attack/1.0.0/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class MitreAttack(AppBase):
__version__ = "1.0.0"
app_name = "mitre_attack"

def __init__(self, redis, logger, console_logger=None):
super().__init__(redis, logger, console_logger)
def __init__(self, redis, logger):
super().__init__(redis, logger)

async def set_timestamp(self):
timestamp = '{:%Y-%m-%d_%H-%M-%S}'.format(datetime.datetime.now())
Expand Down
11 changes: 6 additions & 5 deletions apps/nmap/1.0.0/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ RUN pip install --no-warn-script-location --prefix="/install" -r /requirements.t
# Stage - Copy pip packages and source files
FROM base

COPY --from=127.0.0.1:5000/walkoff_app_sdk /usr/local /usr/local
COPY --from=127.0.0.1:5000/walkoff_app_sdk /app /app
COPY --from=builder /install /usr/local
COPY src /app

# Install any OS packages dependencies needed in the final image for our app to run
# Removing package cache reduces final image size since we won't apt-get install anything else in this stage

RUN apt-get update \
&& apt-get install -y --no-install-recommends nmap \
&& rm -rf /var/lib/apt/lists/*

COPY --from=127.0.0.1:5000/walkoff_app_sdk /usr/local /usr/local
COPY --from=127.0.0.1:5000/walkoff_app_sdk /app /app
COPY --from=builder /install /usr/local
COPY src /app


# Finally, set the working directory and how our app will run.
WORKDIR /app
CMD python app.py --log-level DEBUG
4 changes: 2 additions & 2 deletions apps/nmap/1.0.0/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class Nmap(AppBase):
__version__ = "1.0.0"
app_name = "nmap"

def __init__(self, redis, logger, console_logger=None):
super().__init__(redis, logger, console_logger)
def __init__(self, redis, logger):
super().__init__(redis, logger)

async def run_scan(self, targets, options):
results = []
Expand Down
4 changes: 2 additions & 2 deletions apps/power_shell/1.0.0/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class PowerShell(AppBase):
__version__ = "1.0.0"
app_name = "power_shell"

def __init__(self, redis, logger, console_logger=None):
super().__init__(redis, logger, console_logger)
def __init__(self, redis, logger):
super().__init__(redis, logger)

async def set_timestamp(self):
timestamp = '{:%Y-%m-%d_%H-%M-%S}'.format(datetime.datetime.now())
Expand Down
4 changes: 2 additions & 2 deletions apps/ssh/1.0.0/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class SSH(AppBase):
__version__ = "1.0.0"
app_name = "ssh"

def __init__(self, redis, logger, console_logger=None):
super().__init__(redis, logger, console_logger)
def __init__(self, redis, logger):
super().__init__(redis, logger)

async def exec_local_command(self, command):
proc = await asyncio.create_subprocess_shell(command, stdout=asyncio.subprocess.PIPE,
Expand Down
Loading

0 comments on commit 11286e1

Please sign in to comment.