From 4d641226a9e7521dcbb4c769047d849711cfd8ac Mon Sep 17 00:00:00 2001 From: msm Date: Sun, 27 Oct 2024 19:51:07 +0100 Subject: [PATCH 1/2] Add index queues and indexer --- src/daemon.py | 37 +++++++++------- src/db.py | 22 ++++++++++ src/migrations/env.py | 1 + .../versions/85ed22b90d72_add_queuedfile.py | 32 ++++++++++++++ src/models/queuedfile.py | 14 +++++++ src/tasks.py | 42 +++++++++++++++++++ 6 files changed, 133 insertions(+), 15 deletions(-) create mode 100644 src/migrations/versions/85ed22b90d72_add_queuedfile.py create mode 100644 src/models/queuedfile.py diff --git a/src/daemon.py b/src/daemon.py index cc1e7ee1..cdcca11e 100644 --- a/src/daemon.py +++ b/src/daemon.py @@ -10,14 +10,14 @@ from .config import app_config -def start_worker(args: argparse.Namespace, process_index: int) -> None: +def start_worker(group_id: str, process_index: int) -> None: setup_logging() logging.info( - "Agent [%s] running (process %s)...", args.group_id, process_index + "Agent [%s] running (process %s)...", group_id, process_index ) with Connection(Redis(app_config.redis.host, app_config.redis.port)): - w = Worker([args.group_id]) + w = Worker([group_id]) w.work() @@ -34,11 +34,18 @@ def main() -> None: default="default", ) parser.add_argument( + "-s", "--scale", type=int, - help="Specifies the number of concurrent processes to use.", + help="Specifies the number of worker processes to start.", default=1, ) + parser.add_argument( + "-i", + "--with-indexer", + action="store_true", + help="Also start the index worker. Must run in the same filesystem as ursadb.", + ) args = parser.parse_args() @@ -46,17 +53,17 @@ def main() -> None: # The goal is to make the web UI aware of this worker and its configuration. tasks.make_agent(args.group_id).register() - if args.scale > 1: - children = [ - Process(target=start_worker, args=(args, i)) - for i in range(args.scale) - ] - for child in children: - child.start() - for child in children: - child.join() - else: - start_worker(args, 0) + children = [ + Process(target=start_worker, args=(args.group_id, i)) + for i in range(args.scale) + ] + if args.with_indexer: + indexer_name = args.group_id + ":indexer" + children.append(Process(target=start_worker, args=(indexer_name, 0))) + for child in children: + child.start() + for child in children: + child.join() if __name__ == "__main__": diff --git a/src/db.py b/src/db.py index 4c85d1be..806cf074 100644 --- a/src/db.py +++ b/src/db.py @@ -25,6 +25,7 @@ from .models.job import Job, JobStatus from .models.jobagent import JobAgent from .models.match import Match +from .models.queuedfile import QueuedFile from .schema import MatchesSchema, ConfigSchema from .config import app_config @@ -412,6 +413,27 @@ def set_config_key(self, plugin_name: str, key: str, value: str) -> None: session.add(entry) session.commit() + def get_from_index_queue(self, ursadb_id: str, limit: int) -> List[str]: + """Get next `limit` files from the specified ursadb. + This function does not mark files as `in progress` in any way, so + subsequent executions may return the same set of files until + remove_from_index_queue is called.""" + with self.session() as session: + statement = select(QueuedFile).where( + QueuedFile.ursadb_id == ursadb_id, + ).limit(limit) + files = session.exec(statement).all() + return [f.path for f in files] + + def remove_from_index_queue(self, paths: List[str]) -> None: + """Delete given paths from the index queue of the specified ursadb. + For performance reasons avoid huge lists of paths, since they + must fit into a single SQL query.""" + with self.session() as session: + delete_query = delete(QueuedFile).where(QueuedFile.path in paths) + session.execute(delete_query) + session.commit() + def alembic_upgrade(self) -> None: config_file = Path(__file__).parent / "alembic.ini" alembic_cfg = Config(str(config_file)) diff --git a/src/migrations/env.py b/src/migrations/env.py index 68b19a69..9ae1abfe 100644 --- a/src/migrations/env.py +++ b/src/migrations/env.py @@ -8,6 +8,7 @@ from mquery.models.configentry import ConfigEntry # type: ignore # noqa from mquery.models.job import Job # type: ignore # noqa from mquery.models.match import Match # type: ignore # noqa +from mquery.models.queuedfile import QueuedFile # type: ignore # noqa target_metadata = SQLModel.metadata diff --git a/src/migrations/versions/85ed22b90d72_add_queuedfile.py b/src/migrations/versions/85ed22b90d72_add_queuedfile.py new file mode 100644 index 00000000..8ebc90b2 --- /dev/null +++ b/src/migrations/versions/85ed22b90d72_add_queuedfile.py @@ -0,0 +1,32 @@ +"""add queuedfile +Revision ID: 85ed22b90d72 +Revises: 6b495d5a4855 +Create Date: 2024-10-27 18:35:05.991934 +""" +from alembic import op +import sqlalchemy as sa +import sqlmodel + + +# revision identifiers, used by Alembic. +revision = '85ed22b90d72' +down_revision = '6b495d5a4855' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('queuedfile', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('ursadb_id', sqlmodel.sql.sqltypes.AutoString(), nullable=False), + sa.Column('path', sqlmodel.sql.sqltypes.AutoString(), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('queuedfile') + # ### end Alembic commands ### diff --git a/src/models/queuedfile.py b/src/models/queuedfile.py new file mode 100644 index 00000000..d89a16c0 --- /dev/null +++ b/src/models/queuedfile.py @@ -0,0 +1,14 @@ +from sqlmodel import SQLModel, Field +from typing import Union + + +class QueuedFile(SQLModel, table=True): + """Represents a file queued to be indexed.""" + + id: Union[int, None] = Field(default=None, primary_key=True) + + # ID of the ursadb ("agent group") this file belongs to. + ursadb_id: str + + # A file path on one of the daemons + path: str diff --git a/src/tasks.py b/src/tasks.py index 4a0a88bf..dcd397bf 100644 --- a/src/tasks.py +++ b/src/tasks.py @@ -166,6 +166,12 @@ def make_agent(group_override: Optional[str] = None): group_id = group_override else: group_id = get_current_job().origin # type: ignore + if ':' in group_id: + # Slight hack: `:` delimits worker type, so "default" is a queue + # for the default group, but same goes for "default:indexer" + group_id = group_id[:group_id.rfind(":")] + if ':' in group_id: + raise RuntimeError("Group ID can't contain a special character ':'") return Agent(group_id) @@ -290,3 +296,39 @@ def run_yara_batch(job_id: JobId, iterator: str, batch_size: int) -> None: agent.execute_yara(job, pop_result.files) agent.add_tasks_in_progress(job, -1) + + +def index_queue(batch_size: int) -> None: + agent = make_agent() + # while True: - later + for i in range(1): + files = agent.db.get_from_index_queue(agent.group_id, batch_size) + if not files: + logging.info("no files to index %s", agent.group_id) + continue + + to_index = [] + for orig_name in files: + try: + path = agent.plugins.filter(orig_name) + if path: + to_index.append(path) + except Exception: + logging.exception("Unknown error (plugin?): %s", orig_name) + + logging.exception("Indexing %s files", len(to_index)) + + # todo refactor this + mounted_names = [] + for fname in to_index: + fname = fname.replace('"', '\\"') + mounted_names.append(fname) + mounted_list = " ".join(f'"{fpath}"' for fpath in mounted_names) + + #agent.ursa.execute_command( + logging.info( + f"index {mounted_list} with [gram3, text4, wide8, hash4];" + ) + + agent.db.remove_from_index_queue(files) + agent.plugins.cleanup() From 9acf489bf2d9e5a5de8809b6d49b8cef0e9836e7 Mon Sep 17 00:00:00 2001 From: msm Date: Wed, 30 Oct 2024 22:48:49 +0100 Subject: [PATCH 2/2] WIP on indexing --- src/plugins/requirements-s3.txt | 1 + src/tasks.py | 71 ++++++++++++++++++--------------- 2 files changed, 40 insertions(+), 32 deletions(-) create mode 100644 src/plugins/requirements-s3.txt diff --git a/src/plugins/requirements-s3.txt b/src/plugins/requirements-s3.txt new file mode 100644 index 00000000..36f9d86d --- /dev/null +++ b/src/plugins/requirements-s3.txt @@ -0,0 +1 @@ +minio diff --git a/src/tasks.py b/src/tasks.py index dcd397bf..9c06745b 100644 --- a/src/tasks.py +++ b/src/tasks.py @@ -4,6 +4,8 @@ from redis import Redis from contextlib import contextmanager import yara # type: ignore +import os +import shutil from .db import Database, JobId from .util import make_sha256_tag @@ -300,35 +302,40 @@ def run_yara_batch(job_id: JobId, iterator: str, batch_size: int) -> None: def index_queue(batch_size: int) -> None: agent = make_agent() - # while True: - later - for i in range(1): - files = agent.db.get_from_index_queue(agent.group_id, batch_size) - if not files: - logging.info("no files to index %s", agent.group_id) - continue - - to_index = [] - for orig_name in files: - try: - path = agent.plugins.filter(orig_name) - if path: - to_index.append(path) - except Exception: - logging.exception("Unknown error (plugin?): %s", orig_name) - - logging.exception("Indexing %s files", len(to_index)) - - # todo refactor this - mounted_names = [] - for fname in to_index: - fname = fname.replace('"', '\\"') - mounted_names.append(fname) - mounted_list = " ".join(f'"{fpath}"' for fpath in mounted_names) - - #agent.ursa.execute_command( - logging.info( - f"index {mounted_list} with [gram3, text4, wide8, hash4];" - ) - - agent.db.remove_from_index_queue(files) - agent.plugins.cleanup() + files = agent.db.get_from_index_queue(agent.group_id, batch_size) + if not files: + logging.info("no files to index %s", agent.group_id) + return + + to_index = [] + for orig_name in files: + try: + path = agent.plugins.filter(orig_name) + if path: + if path != orig_name: + assert not os.path.isfile(orig_name) + shutil.copyfile(path, orig_name) + to_index.append(orig_name) + except Exception: + logging.exception("Unknown error (plugin?): %s", orig_name) + + logging.exception("Indexing %s files", len(to_index)) + + if not to_index: + return + + # todo refactor this + mounted_names = [] + for fname in to_index: + fname = fname.replace('"', '\\"') + mounted_names.append(fname) + mounted_list = " ".join(f'"{fpath}"' for fpath in mounted_names) + + #agent.ursa.execute_command( + logging.error( + f"index {mounted_list} with [gram3, text4, wide8, hash4];" + ) + logging.error("ok") + + agent.db.remove_from_index_queue(files) + agent.plugins.cleanup()