Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Add dispatcher service #61

Merged
merged 7 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fixbackend/all_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@
from fixbackend.organizations.models.orm import Organization, OrganizationInvite # noqa
from fixbackend.graph_db.service import GraphDatabaseAccessEntity # noqa
from fixbackend.cloud_accounts.models.orm import CloudAccount # noqa
from fixbackend.dispatcher.next_run_repository import NextRun # noqa
31 changes: 29 additions & 2 deletions fixbackend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,24 @@
from fastapi.exception_handlers import http_exception_handler
from fastapi.staticfiles import StaticFiles
from prometheus_fastapi_instrumentator import Instrumentator
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine
from starlette.exceptions import HTTPException

from fixbackend import config, dependencies
from fixbackend.auth.oauth import github_client, google_client
from fixbackend.auth.router import auth_router, users_router
from fixbackend.cloud_accounts.repository import CloudAccountRepositoryImpl
from fixbackend.cloud_accounts.router import cloud_accounts_router, cloud_accounts_callback_router
from fixbackend.collect.collect_queue import RedisCollectQueue
from fixbackend.config import Config
from fixbackend.dependencies import FixDependencies
from fixbackend.dependencies import ServiceNames as SN
from fixbackend.dispatcher.dispatcher_service import DispatcherService
from fixbackend.dispatcher.next_run_repository import NextRunRepository
from fixbackend.events.router import websocket_router
from fixbackend.graph_db.service import GraphDatabaseAccessManager
from fixbackend.inventory.inventory_client import InventoryClient
from fixbackend.inventory.inventory_service import InventoryService
from fixbackend.inventory.router import inventory_router
Expand All @@ -53,8 +59,14 @@ def fast_api_app(cfg: Config) -> FastAPI:
@asynccontextmanager
async def setup_teardown_application(_: FastAPI) -> AsyncIterator[None]:
arq_redis = deps.add(SN.arg_redis, await create_pool(RedisSettings.from_dsn(cfg.redis_queue_url)))
deps.add(SN.async_engine, create_async_engine(cfg.database_url, pool_size=10))
deps.add(SN.readonly_redis, Redis.from_url(cfg.redis_readonly_url))
deps.add(SN.readwrite_redis, Redis.from_url(cfg.redis_readwrite_url))
engine = deps.add(SN.async_engine, create_async_engine(cfg.database_url, pool_size=10))
session_maker = deps.add(SN.session_maker, async_sessionmaker(engine))
deps.add(SN.cloud_account_repo, CloudAccountRepositoryImpl(session_maker))
deps.add(SN.next_run_repo, NextRunRepository(session_maker))
deps.add(SN.collect_queue, RedisCollectQueue(arq_redis))
deps.add(SN.graph_db_access, GraphDatabaseAccessManager(cfg, session_maker))
client = deps.add(SN.inventory_client, InventoryClient(cfg.inventory_url))
deps.add(SN.inventory, InventoryService(client))
if not cfg.static_assets:
Expand All @@ -67,7 +79,22 @@ async def setup_teardown_application(_: FastAPI) -> AsyncIterator[None]:

@asynccontextmanager
async def setup_teardown_dispatcher(_: FastAPI) -> AsyncIterator[None]:
yield None
arq_redis = deps.add(SN.arg_redis, await create_pool(RedisSettings.from_dsn(cfg.redis_queue_url)))
deps.add(SN.readonly_redis, Redis.from_url(cfg.redis_readonly_url))
rw_redis = deps.add(SN.readwrite_redis, Redis.from_url(cfg.redis_readwrite_url))
engine = deps.add(SN.async_engine, create_async_engine(cfg.database_url, pool_size=10))
session_maker = deps.add(SN.session_maker, async_sessionmaker(engine))
cloud_accounts = deps.add(SN.cloud_account_repo, CloudAccountRepositoryImpl(session_maker))
next_run_repo = deps.add(SN.next_run_repo, NextRunRepository(session_maker))
collect_queue = deps.add(SN.collect_queue, RedisCollectQueue(arq_redis))
db_access = deps.add(SN.graph_db_access, GraphDatabaseAccessManager(cfg, session_maker))
deps.add(SN.dispatching, DispatcherService(rw_redis, cloud_accounts, next_run_repo, collect_queue, db_access))

async with deps:
log.info("Application services started.")
yield None
await arq_redis.close()
log.info("Application services stopped.")

app = FastAPI(
title="Fix Backend",
Expand Down
8 changes: 3 additions & 5 deletions fixbackend/auth/current_user_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from fixbackend.auth.jwt import get_auth_backend
from fixbackend.auth.models import User
from fixbackend.config import get_config
from fixbackend.graph_db.dependencies import GraphDatabaseAccessManagerDependency
from fixbackend.dependencies import FixDependency
from fixbackend.graph_db.models import GraphDatabaseAccess
from fixbackend.ids import TenantId
from fixbackend.organizations.dependencies import OrganizationServiceDependency
Expand Down Expand Up @@ -82,10 +82,8 @@ async def get_tenant(
TenantDependency = Annotated[TenantId, Depends(get_tenant)]


async def get_current_graph_db(
manager: GraphDatabaseAccessManagerDependency, tenant: TenantDependency
) -> GraphDatabaseAccess:
access = await manager.get_database_access(tenant)
async def get_current_graph_db(fix: FixDependency, tenant: TenantDependency) -> GraphDatabaseAccess:
access = await fix.graph_database_access.get_database_access(tenant)
if access is None:
raise AttributeError("No database access found for tenant")
return access
Expand Down
7 changes: 2 additions & 5 deletions fixbackend/cloud_accounts/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from sqlalchemy import select

from fixbackend.cloud_accounts.models import orm, CloudAccount, AwsCloudAccess
from fixbackend.db import AsyncSessionMaker
from fixbackend.ids import CloudAccountId, TenantId
from fixbackend.types import AsyncSessionMaker
from abc import ABC, abstractmethod


Expand All @@ -41,10 +41,7 @@ async def delete(self, id: CloudAccountId) -> None:


class CloudAccountRepositoryImpl(CloudAccountRepository):
def __init__(
self,
session_maker: AsyncSessionMaker,
) -> None:
def __init__(self, session_maker: AsyncSessionMaker) -> None:
self.session_maker = session_maker

async def create(self, cloud_account: CloudAccount) -> CloudAccount:
Expand Down
5 changes: 2 additions & 3 deletions fixbackend/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import AsyncGenerator, Annotated, Callable
from typing import AsyncGenerator, Annotated

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from fixbackend.dependencies import FixDependency

AsyncSessionMaker = Callable[[], AsyncSession]
from fixbackend.types import AsyncSessionMaker


async def get_async_session_maker(fix: FixDependency) -> AsyncSessionMaker:
Expand Down
22 changes: 22 additions & 0 deletions fixbackend/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,29 @@
from arq import ArqRedis
from fastapi.params import Depends
from fixcloudutils.service import Dependencies
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncEngine

from fixbackend.collect.collect_queue import RedisCollectQueue
from fixbackend.graph_db.service import GraphDatabaseAccessManager
from fixbackend.inventory.inventory_client import InventoryClient
from fixbackend.inventory.inventory_service import InventoryService
from fixbackend.types import AsyncSessionMaker


class ServiceNames:
arg_redis = "arq_redis"
readonly_redis = "readonly_redis"
readwrite_redis = "readwrite_redis"
collect_queue = "collect_queue"
async_engine = "async_engine"
session_maker = "session_maker"
cloud_account_repo = "cloud_account_repo"
next_run_repo = "next_run_repo"
graph_db_access = "graph_db_access"
inventory = "inventory"
inventory_client = "inventory_client"
dispatching = "dispatching"


class FixDependencies(Dependencies):
Expand All @@ -44,6 +54,10 @@ def collect_queue(self) -> RedisCollectQueue:
def async_engine(self) -> AsyncEngine:
return self.service(ServiceNames.async_engine, AsyncEngine)

@property
def session_maker(self) -> AsyncSessionMaker:
return self.service(ServiceNames.async_engine, AsyncSessionMaker) # type: ignore

@property
def inventory(self) -> InventoryService:
return self.service(ServiceNames.inventory, InventoryService)
Expand All @@ -52,6 +66,14 @@ def inventory(self) -> InventoryService:
def inventory_client(self) -> InventoryClient:
return self.service(ServiceNames.inventory, InventoryClient)

@property
def readonly_redis(self) -> Redis:
return self.service(ServiceNames.readonly_redis, Redis)

@property
def graph_database_access(self) -> GraphDatabaseAccessManager:
return self.service(ServiceNames.graph_db_access, GraphDatabaseAccessManager)


# placeholder for dependencies, will be replaced during the app initialization
def fix_dependencies() -> FixDependencies:
Expand Down
13 changes: 13 additions & 0 deletions fixbackend/dispatcher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2023. Some Engineering
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
121 changes: 121 additions & 0 deletions fixbackend/dispatcher/dispatcher_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Copyright (c) 2023. Some Engineering
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
from datetime import timedelta, datetime
from typing import Any, Optional

from fixcloudutils.asyncio.periodic import Periodic
from fixcloudutils.redis.event_stream import RedisStreamListener, Json, MessageContext
from fixcloudutils.service import Service
from fixcloudutils.util import utc
from redis.asyncio import Redis

from fixbackend.cloud_accounts.models import AwsCloudAccess, CloudAccount
from fixbackend.cloud_accounts.repository import CloudAccountRepository
from fixbackend.collect.collect_queue import CollectQueue, AccountInformation, AwsAccountInformation
from fixbackend.dispatcher.next_run_repository import NextRunRepository
from fixbackend.graph_db.service import GraphDatabaseAccessManager
from fixbackend.ids import CloudAccountId, TenantId

log = logging.getLogger(__name__)


class DispatcherService(Service):
def __init__(
self,
readwrite_redis: Redis,
cloud_account_repo: CloudAccountRepository,
next_run_repo: NextRunRepository,
collect_queue: CollectQueue,
access_manager: GraphDatabaseAccessManager,
) -> None:
self.cloud_account_repo = cloud_account_repo
self.next_run_repo = next_run_repo
self.collect_queue = collect_queue
self.access_manager = access_manager
self.periodic = Periodic("schedule_next_runs", self.schedule_next_runs, timedelta(minutes=1))
self.listener = RedisStreamListener(
readwrite_redis,
"fixbackend::cloudaccount",
group="dispatching",
listener="dispatching",
message_processor=self.process_message,
consider_failed_after=timedelta(minutes=5),
batch_size=1,
)

async def start(self) -> Any:
await self.listener.start()
await self.periodic.start()

async def stop(self) -> None:
await self.periodic.stop()
await self.listener.stop()

async def process_message(self, message: Json, context: MessageContext) -> None:
match context.kind:
case "cloud_account_created":
await self.cloud_account_created(CloudAccountId(message["id"]))
case "cloud_account_deleted":
await self.cloud_account_deleted(CloudAccountId(message["id"]))
case _:
log.error(f"Don't know how to handle messages of kind {context.kind}")

async def cloud_account_created(self, cid: CloudAccountId) -> None:
if account := await self.cloud_account_repo.get(cid):
await self.trigger_collect(account)
# store an entry in the next_run table
next_run_at = await self.compute_next_run(account.tenant_id)
await self.next_run_repo.create(cid, next_run_at)
else:
log.error("Received a message, that a cloud account is created, but it does not exist in the database")

async def cloud_account_deleted(self, cid: CloudAccountId) -> None:
# delete the entry from the scheduler table
await self.next_run_repo.delete(cid)

async def compute_next_run(self, tenant: TenantId) -> datetime:
# compute next run time dependent on the tenant.
result = datetime.now() + timedelta(hours=1)
log.info(f"Next run for tenant: {tenant} is {result}")
return result

async def trigger_collect(self, account: CloudAccount) -> None:
def account_information() -> Optional[AccountInformation]:
match account.access:
case AwsCloudAccess(account_id=account_id, role_name=role_name, external_id=external_id):
return AwsAccountInformation(
aws_account_id=account_id,
aws_account_name=None,
aws_role_arn=f"arn:aws:iam::{account_id}:role/{role_name}",
external_id=str(external_id),
)
case _:
log.error(f"Don't know how to handle this cloud access {account.access}. Ignore it.")
return None

if (ai := account_information()) and (db := await self.access_manager.get_database_access(account.tenant_id)):
await self.collect_queue.enqueue(db, ai) # TODO: create a unique identifier for this run

async def schedule_next_runs(self) -> None:
now = utc()
async for cid in self.next_run_repo.older_than(now):
if account := await self.cloud_account_repo.get(cid):
await self.trigger_collect(account)
next_run_at = await self.compute_next_run(account.tenant_id)
await self.next_run_repo.update_next_run_at(cid, next_run_at)
else:
log.error("Received a message, that a cloud account is created, but it does not exist in the database")
continue
59 changes: 59 additions & 0 deletions fixbackend/dispatcher/next_run_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright (c) 2023. Some Engineering
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
from typing import AsyncIterator

from fastapi_users_db_sqlalchemy.generics import GUID
from sqlalchemy import DATETIME, Index, select
from sqlalchemy.orm import Mapped, mapped_column

from fixbackend.base_model import Base
from fixbackend.ids import CloudAccountId
from fixbackend.types import AsyncSessionMaker


class NextRun(Base):
__tablename__ = "next_run"
__table_args__ = (Index("idx_at", "at"),)

cloud_account_id: Mapped[CloudAccountId] = mapped_column(GUID, primary_key=True)
at: Mapped[datetime] = mapped_column(DATETIME, nullable=False)
aquamatthias marked this conversation as resolved.
Show resolved Hide resolved


class NextRunRepository:
def __init__(self, session_maker: AsyncSessionMaker) -> None:
self.session_maker = session_maker

async def create(self, cid: CloudAccountId, next_run: datetime) -> None:
async with self.session_maker() as session:
session.add(NextRun(cloud_account_id=cid, at=next_run))
await session.commit()

async def update_next_run_at(self, cid: CloudAccountId, next_run: datetime) -> None:
async with self.session_maker() as session:
if nxt := await session.get(NextRun, cid):
nxt.at = next_run
await session.commit()

async def delete(self, cid: CloudAccountId) -> None:
async with self.session_maker() as session:
results = await session.execute(select(NextRun).where(NextRun.cloud_account_id == cid))
if run := results.unique().scalar():
await session.delete(run)
await session.commit()

async def older_than(self, at: datetime) -> AsyncIterator[CloudAccountId]:
async with self.session_maker() as session:
async for (entry,) in await session.stream(select(NextRun).where(NextRun.at < at)):
yield entry.cloud_account_id
Loading