Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/index queue #432

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions src/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
from .config import app_config


def start_worker(group_id: str, process_index: int) -> None:
    """Run a single RQ worker that consumes jobs from the `group_id` queue.

    Blocks forever (until the worker is terminated); intended to be used
    as a `multiprocessing.Process` target.

    :param group_id: Name of the worker group, used directly as the queue
        name this worker listens on.
    :param process_index: Ordinal of this process within its group; used
        only for log messages.
    """
    setup_logging()
    logging.info(
        "Agent [%s] running (process %s)...", group_id, process_index
    )

    # Bind the RQ worker to the configured Redis instance and start the
    # blocking work loop.
    with Connection(Redis(app_config.redis.host, app_config.redis.port)):
        w = Worker([group_id])
        w.work()


Expand All @@ -34,29 +34,36 @@ def main() -> None:
default="default",
)
parser.add_argument(
"-s",
"--scale",
type=int,
help="Specifies the number of concurrent processes to use.",
help="Specifies the number of worker processes to start.",
default=1,
)
parser.add_argument(
"-i",
"--with-indexer",
action="store_true",
help="Also start the index worker. Must run in the same filesystem as ursadb.",
)

args = parser.parse_args()

# Initial registration of the worker group.
# The goal is to make the web UI aware of this worker and its configuration.
tasks.make_agent(args.group_id).register()

if args.scale > 1:
children = [
Process(target=start_worker, args=(args, i))
for i in range(args.scale)
]
for child in children:
child.start()
for child in children:
child.join()
else:
start_worker(args, 0)
children = [
Process(target=start_worker, args=(args.group_id, i))
for i in range(args.scale)
]
if args.with_indexer:
indexer_name = args.group_id + ":indexer"
children.append(Process(target=start_worker, args=(indexer_name, 0)))
for child in children:
child.start()
for child in children:
child.join()


if __name__ == "__main__":
Expand Down
22 changes: 22 additions & 0 deletions src/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .models.job import Job, JobStatus
from .models.jobagent import JobAgent
from .models.match import Match
from .models.queuedfile import QueuedFile
from .schema import MatchesSchema, ConfigSchema
from .config import app_config

Expand Down Expand Up @@ -412,6 +413,27 @@ def set_config_key(self, plugin_name: str, key: str, value: str) -> None:
session.add(entry)
session.commit()

def get_from_index_queue(self, ursadb_id: str, limit: int) -> List[str]:
"""Get next `limit` files from the specified ursadb.
This function does not mark files as `in progress` in any way, so
subsequent executions may return the same set of files until
remove_from_index_queue is called."""
with self.session() as session:
statement = select(QueuedFile).where(
QueuedFile.ursadb_id == ursadb_id,
).limit(limit)
files = session.exec(statement).all()
return [f.path for f in files]

def remove_from_index_queue(self, paths: List[str]) -> None:
"""Delete given paths from the index queue of the specified ursadb.
For performance reasons avoid huge lists of paths, since they
must fit into a single SQL query."""
with self.session() as session:
delete_query = delete(QueuedFile).where(QueuedFile.path in paths)
session.execute(delete_query)
session.commit()

def alembic_upgrade(self) -> None:
config_file = Path(__file__).parent / "alembic.ini"
alembic_cfg = Config(str(config_file))
Expand Down
1 change: 1 addition & 0 deletions src/migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from mquery.models.configentry import ConfigEntry # type: ignore # noqa
from mquery.models.job import Job # type: ignore # noqa
from mquery.models.match import Match # type: ignore # noqa
from mquery.models.queuedfile import QueuedFile # type: ignore # noqa


target_metadata = SQLModel.metadata
Expand Down
32 changes: 32 additions & 0 deletions src/migrations/versions/85ed22b90d72_add_queuedfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""add queuedfile
Revision ID: 85ed22b90d72
Revises: 6b495d5a4855
Create Date: 2024-10-27 18:35:05.991934
"""
from alembic import op
import sqlalchemy as sa
import sqlmodel


# revision identifiers, used by Alembic.
revision = '85ed22b90d72'
down_revision = '6b495d5a4855'
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Create the `queuedfile` table used by the file index queue."""
    op.create_table(
        "queuedfile",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column(
            "ursadb_id", sqlmodel.sql.sqltypes.AutoString(), nullable=False
        ),
        sa.Column(
            "path", sqlmodel.sql.sqltypes.AutoString(), nullable=False
        ),
        sa.PrimaryKeyConstraint("id"),
    )


def downgrade() -> None:
    """Revert the migration by dropping the `queuedfile` table."""
    op.drop_table("queuedfile")
14 changes: 14 additions & 0 deletions src/models/queuedfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from sqlmodel import SQLModel, Field
from typing import Union


class QueuedFile(SQLModel, table=True):
    """Represents a file queued to be indexed.

    Rows are inserted into the queue and later consumed in batches by the
    index worker; a row is removed only after its file has been processed.
    """

    # Surrogate primary key; None until assigned by the database on insert.
    id: Union[int, None] = Field(default=None, primary_key=True)

    # ID of the ursadb ("agent group") this file belongs to.
    ursadb_id: str

    # A file path on one of the daemons. NOTE(review): presumably must be
    # readable from the filesystem the indexing worker runs on — confirm.
    path: str
1 change: 1 addition & 0 deletions src/plugins/requirements-s3.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
minio
49 changes: 49 additions & 0 deletions src/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from redis import Redis
from contextlib import contextmanager
import yara # type: ignore
import os
import shutil

from .db import Database, JobId
from .util import make_sha256_tag
Expand Down Expand Up @@ -166,6 +168,12 @@ def make_agent(group_override: Optional[str] = None):
group_id = group_override
else:
group_id = get_current_job().origin # type: ignore
if ':' in group_id:
# Slight hack: `:` delimits worker type, so "default" is a queue
# for the default group, but same goes for "default:indexer"
group_id = group_id[:group_id.rfind(":")]
if ':' in group_id:
raise RuntimeError("Group ID can't contain a special character ':'")
return Agent(group_id)


Expand Down Expand Up @@ -290,3 +298,44 @@ def run_yara_batch(job_id: JobId, iterator: str, batch_size: int) -> None:

agent.execute_yara(job, pop_result.files)
agent.add_tasks_in_progress(job, -1)


def index_queue(batch_size: int) -> None:
    """Index the next `batch_size` queued files for this agent's ursadb.

    Fetches up to `batch_size` paths from the index queue, passes each
    through the plugin filter chain, builds an ursadb `index` command
    (currently stubbed out with log statements in this draft), and
    finally removes the processed paths from the queue.

    :param batch_size: Maximum number of queued files to process.
    """
    agent = make_agent()
    files = agent.db.get_from_index_queue(agent.group_id, batch_size)
    if not files:
        logging.info("no files to index %s", agent.group_id)
        return

    to_index = []
    for orig_name in files:
        try:
            path = agent.plugins.filter(orig_name)
            if path:
                if path != orig_name:
                    # The plugin chain produced a transformed copy; put it
                    # back under the original name so ursadb indexes the
                    # queued path. NOTE(review): `assert` is stripped under
                    # `python -O` — consider raising explicitly instead.
                    assert not os.path.isfile(orig_name)
                    shutil.copyfile(path, orig_name)
                to_index.append(orig_name)
        except Exception:
            logging.exception("Unknown error (plugin?): %s", orig_name)

    # Bug fix: this was `logging.exception(...)`, which is documented for
    # use only inside an exception handler (here it logged a spurious
    # "NoneType: None" traceback at ERROR level); this is an informational
    # progress message.
    logging.info("Indexing %s files", len(to_index))

    if not to_index:
        return

    # todo refactor this
    # Escape embedded double quotes so each path survives quoting in the
    # ursadb command string.
    mounted_names = []
    for fname in to_index:
        fname = fname.replace('"', '\\"')
        mounted_names.append(fname)
    mounted_list = " ".join(f'"{fpath}"' for fpath in mounted_names)

    # TODO(review): draft scaffolding — the real ursadb command is
    # commented out and only logged; re-enable
    # `agent.ursa.execute_command(...)` before merging.
    # agent.ursa.execute_command(
    logging.error(
        f"index {mounted_list} with [gram3, text4, wide8, hash4];"
    )
    logging.error("ok")

    agent.db.remove_from_index_queue(files)
    agent.plugins.cleanup()
Loading