From c156e36429b341250c62d55f46db2af6272a4d6d Mon Sep 17 00:00:00 2001
From: Matthias Veit
Date: Tue, 9 Apr 2024 20:02:33 +0200
Subject: [PATCH] [feat] Use posthog analytics sender (#384)

Co-authored-by: Nikita Melkozerov
---
 .../analytics/analytics_event_sender.py       | 99 ++++++++++++++++++-
 .../analytics/domain_event_to_analytics.py    | 27 +++--
 fixbackend/analytics/events.py                |  3 +
 fixbackend/config.py                          |  2 +
 fixbackend/domain_events/events.py            |  2 +
 fixbackend/workspaces/repository.py           |  2 +-
 poetry.lock                                   | 51 +++++++++-
 pyproject.toml                                |  1 +
 tests/fixbackend/conftest.py                  |  1 +
 .../dispatcher/dispatcher_service_test.py     |  2 +-
 tests/fixbackend/domain_events/test_events.py |  2 +-
 .../inventory/inventory_service_test.py       |  9 +-
 12 files changed, 182 insertions(+), 19 deletions(-)

diff --git a/fixbackend/analytics/analytics_event_sender.py b/fixbackend/analytics/analytics_event_sender.py
index afa81e3d..dc8e5fbe 100644
--- a/fixbackend/analytics/analytics_event_sender.py
+++ b/fixbackend/analytics/analytics_event_sender.py
@@ -11,19 +11,27 @@
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
+from __future__ import annotations
+
+import asyncio
 import logging
 import uuid
 from asyncio import Lock
+from collections import deque
 from datetime import timedelta
 from typing import List, Optional, Any
+from typing import MutableSequence
 
 from async_lru import alru_cache
 from fixcloudutils.asyncio.periodic import Periodic
 from fixcloudutils.types import Json
+from fixcloudutils.util import uuid_str, utc
 from httpx import AsyncClient
+from posthog.client import Client
 from prometheus_client import Counter
 
 from fixbackend.analytics import AnalyticsEventSender
+from fixbackend.analytics.events import AEWorkspaceCreated, AEUserRegistered
 from fixbackend.analytics.events import AnalyticsEvent
 from fixbackend.ids import WorkspaceId, UserId
 from fixbackend.utils import group_by, md5
@@ -42,6 +50,21 @@ async def user_id_from_workspace(self, workspace_id: WorkspaceId) -> UserId:
         return UserId(uuid.uuid5(uuid.NAMESPACE_DNS, "fixbackend"))
 
 
+class MultiAnalyticsEventSender(AnalyticsEventSender):
+    def __init__(self, senders: List[AnalyticsEventSender]) -> None:
+        self.senders = senders
+        self.event_handler: Optional[Any] = None
+
+    async def send(self, event: AnalyticsEvent) -> None:
+        for sender in self.senders:
+            await sender.send(event)
+
+    async def user_id_from_workspace(self, workspace_id: WorkspaceId) -> UserId:
+        for sender in self.senders:
+            return await sender.user_id_from_workspace(workspace_id)
+        raise ValueError("No senders configured")
+
+
 class GoogleAnalyticsEventSender(AnalyticsEventSender):
     def __init__(
         self, client: AsyncClient, measurement_id: str, api_secret: str, workspace_repo: WorkspaceRepository
@@ -54,7 +77,6 @@ def __init__(
         self.events: List[AnalyticsEvent] = []
         self.lock = Lock()
         self.sender = Periodic("send_events", self.send_events, timedelta(seconds=30))
-        self.event_handler: Optional[Any] = None
 
     async def start(self) -> None:
         await self.sender.start()
@@ -75,7 +97,7 @@ def event_to_json(event: AnalyticsEvent) -> Json:
             ev.pop("user_id", None)
             return dict(name=event.kind, params=ev)
 
-        # return early, if there are no events to send
+        # return early if there are no events to send
         if not self.events:
             return
 
@@ -109,7 +131,76 @@ def event_to_json(event: AnalyticsEvent) -> Json:
 
     @alru_cache(maxsize=1024)
     async def user_id_from_workspace(self, workspace_id: WorkspaceId) -> UserId:
-        if (workspace := await self.workspace_repo.get_workspace(workspace_id)) and (workspace.all_users()):
-            return workspace.all_users()[0]
+        if workspace := await self.workspace_repo.get_workspace(workspace_id):
+            return workspace.owner_id
         else:
             return UserId(uuid.uuid5(uuid.NAMESPACE_DNS, "fixbackend"))
+
+
+class PostHogEventSender(AnalyticsEventSender):
+    def __init__(
+        self,
+        api_key: str,
+        workspace_repo: WorkspaceRepository,
+        flush_at: int = 100,
+        interval: timedelta = timedelta(minutes=1),
+        host: str = "https://eu.posthog.com",
+    ) -> None:
+        super().__init__()
+        self.client = Client(  # type: ignore
+            project_api_key=api_key, host=host, flush_interval=0.5, max_retries=3, gzip=True
+        )
+        self.workspace_repo = workspace_repo
+        self.run_id = uuid_str()  # create a unique id for this instance run
+        self.queue: MutableSequence[AnalyticsEvent] = deque()
+        self.flush_at = flush_at
+        self.flusher = Periodic("flush_analytics", self.flush, interval)
+        self.lock = asyncio.Lock()
+
+    async def send(self, event: AnalyticsEvent) -> None:
+        async with self.lock:
+            self.queue.append(event)
+
+        if len(self.queue) >= self.flush_at:
+            await self.flush()
+
+    @alru_cache(maxsize=1024)
+    async def user_id_from_workspace(self, workspace_id: WorkspaceId) -> UserId:
+        if workspace := await self.workspace_repo.get_workspace(workspace_id):
+            return workspace.owner_id
+        else:
+            raise ValueError(f"Workspace with id {workspace_id} not found")
+
+    async def flush(self) -> None:
+        async with self.lock:
+            for event in self.queue:
+                # when a user is registered, identify it as user
+                if isinstance(event, AEUserRegistered):
+                    self.client.identify(  # type: ignore
+                        distinct_id=str(event.user_id),
+                        properties={"email": event.email},
+                    )
+                # when a workspace is created, identify it as a group
+                if isinstance(event, AEWorkspaceCreated):
+                    self.client.group_identify(  # type: ignore
+                        "workspace_id", str(event.workspace_id), properties={"name": event.name, "slug": event.slug}
+                    )
+                # if the event has a workspace_id, use it to define the group
+                groups = {"workspace_id": str(ws)} if (ws := getattr(event, "workspace_id", None)) else None
+                self.client.capture(  # type: ignore
+                    distinct_id=uuid_str(),
+                    event=event.kind,
+                    properties=event.to_json(),
+                    timestamp=utc(),
+                    groups=groups,
+                )
+            self.queue.clear()
+
+    async def start(self) -> PostHogEventSender:
+        await self.flusher.start()
+        return self
+
+    async def stop(self) -> None:
+        await self.flusher.stop()
+        await self.flush()
+        self.client.shutdown()  # type: ignore
diff --git a/fixbackend/analytics/domain_event_to_analytics.py b/fixbackend/analytics/domain_event_to_analytics.py
index 30c86c97..4a2156f4 100644
--- a/fixbackend/analytics/domain_event_to_analytics.py
+++ b/fixbackend/analytics/domain_event_to_analytics.py
@@ -12,11 +12,17 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 import logging
+from typing import List
 
 from httpx import AsyncClient
 
 from fixbackend.analytics import AnalyticsEventSender
-from fixbackend.analytics.analytics_event_sender import GoogleAnalyticsEventSender, NoAnalyticsEventSender
+from fixbackend.analytics.analytics_event_sender import (
+    GoogleAnalyticsEventSender,
+    NoAnalyticsEventSender,
+    PostHogEventSender,
+    MultiAnalyticsEventSender,
+)
 from fixbackend.analytics.events import (
     AEUserRegistered,
     AEAccountDiscovered,
@@ -78,7 +84,7 @@ def __init__(self, domain_event_subscriber: DomainEventSubscriber, sender: Analy
     async def handle(self, event: Event) -> None:
         match event:
             case UserRegistered() as event:
-                await self.sender.send(AEUserRegistered(event.user_id, event.tenant_id))
+                await self.sender.send(AEUserRegistered(event.user_id, event.tenant_id, event.email))
             case AwsAccountDiscovered() as event:
                 user_id = await self.sender.user_id_from_workspace(event.tenant_id)
                 await self.sender.send(AEAccountDiscovered(user_id, event.tenant_id, "aws"))
@@ -94,7 +100,7 @@ async def handle(self, event: Event) -> None:
                 user_id = await self.sender.user_id_from_workspace(event.tenant_id)
                 await self.sender.send(AEAccountNameChanged(user_id, event.tenant_id, event.cloud))
             case WorkspaceCreated() as event:
-                await self.sender.send(AEWorkspaceCreated(event.user_id, event.workspace_id))
+                await self.sender.send(AEWorkspaceCreated(event.user_id, event.workspace_id, event.name, event.slug))
             case InvitationAccepted() as event:
                 user_id = event.user_id or await self.sender.user_id_from_workspace(event.workspace_id)
                 await self.sender.send(AEInvitationAccepted(user_id, event.workspace_id))
@@ -127,11 +133,16 @@ def analytics(
     domain_event_subscriber: DomainEventSubscriber,
     workspace_repo: WorkspaceRepository,
 ) -> AnalyticsEventSender:
+    senders: List[AnalyticsEventSender] = []
     if (measurement_id := config.google_analytics_measurement_id) and (secret := config.google_analytics_api_secret):
         log.info("Use Google Analytics Event Sender.")
-        sender = GoogleAnalyticsEventSender(client, measurement_id, secret, workspace_repo)
-        sender.event_handler = DomainEventToAnalyticsEventHandler(domain_event_subscriber, sender)
-        return sender
-    else:
+        senders.append(GoogleAnalyticsEventSender(client, measurement_id, secret, workspace_repo))
+    if api_key := config.posthog_api_key:
+        log.info("Use Posthog Event Sender.")
+        senders.append(PostHogEventSender(api_key, workspace_repo))
+    if len(senders) == 0:
         log.info("Analytics turned off.")
-        return NoAnalyticsEventSender()
+        senders.append(NoAnalyticsEventSender())
+    sender = MultiAnalyticsEventSender(senders)
+    sender.event_handler = DomainEventToAnalyticsEventHandler(domain_event_subscriber, sender)
+    return sender
diff --git a/fixbackend/analytics/events.py b/fixbackend/analytics/events.py
index 59edabba..5e663377 100644
--- a/fixbackend/analytics/events.py
+++ b/fixbackend/analytics/events.py
@@ -47,6 +47,7 @@ def to_json(self) -> Json:
 class AEUserRegistered(AnalyticsEvent):
     kind: ClassVar[str] = "fix_user_registered"
     workspace_id: WorkspaceId
+    email: str
 
 
 @frozen
@@ -109,6 +110,8 @@ class AEFirstAccountCollectFinished(AnalyticsEvent):
 class AEWorkspaceCreated(AnalyticsEvent):
     kind: ClassVar[str] = "fix_workspace_created"
     workspace_id: WorkspaceId
+    name: str
+    slug: str
 
 
 @frozen
diff --git a/fixbackend/config.py b/fixbackend/config.py
index b54a0e6c..76d5f0e3 100644
--- a/fixbackend/config.py
+++ b/fixbackend/config.py
@@ -74,6 +74,7 @@ class Config(BaseSettings):
     profiling_interval: float
     google_analytics_measurement_id: Optional[str]
     google_analytics_api_secret: Optional[str]
+    posthog_api_key: Optional[str]
     aws_marketplace_url: str
     billing_period: Literal["month", "day"]
     discord_oauth_client_id: str
@@ -176,6 +177,7 @@ def parse_args(argv: Optional[Sequence[str]] = None) -> Namespace:
     parser.add_argument("--profiling-enabled", action="store_true", default=os.environ.get("PROFILING_ENABLED", False))
     parser.add_argument("--profiling-interval", type=float, default=os.environ.get("PROFILING_INTERVAL", 0.001))
     parser.add_argument("--google-analytics-measurement-id", default=os.environ.get("GOOGLE_ANALYTICS_MEASUREMENT_ID"))
+    parser.add_argument("--posthog-api-key", default=os.environ.get("POSTHOG_API_KEY"))
     parser.add_argument("--google-analytics-api-secret", default=os.environ.get("GOOGLE_ANALYTICS_API_SECRET"))
     parser.add_argument("--aws-marketplace-url", default=os.environ.get("AWS_MARKETPLACE_URL", ""))
     parser.add_argument("--service-base-url", default=os.environ.get("SERVICE_BASE_URL", ""))
diff --git a/fixbackend/domain_events/events.py b/fixbackend/domain_events/events.py
index 0f48d930..922cd9eb 100644
--- a/fixbackend/domain_events/events.py
+++ b/fixbackend/domain_events/events.py
@@ -195,6 +195,8 @@ class WorkspaceCreated(Event):
     kind: ClassVar[str] = "workspace_created"
     workspace_id: WorkspaceId
+    name: str
+    slug: str
     user_id: UserId
 
 
diff --git a/fixbackend/workspaces/repository.py b/fixbackend/workspaces/repository.py
index eb385c02..01b8edcd 100644
--- a/fixbackend/workspaces/repository.py
+++ b/fixbackend/workspaces/repository.py
@@ -147,7 +147,7 @@ async def create_workspace(self, name: str, slug: str, owner: User) -> Workspace
             # create a database access object for this organization in the same transaction
             await self.graph_db_access_manager.create_database_access(workspace_id, session=session)
             await self.role_repository.add_roles(owner.id, workspace_id, Roles.workspace_owner, session=session)
-            await self.domain_event_sender.publish(WorkspaceCreated(workspace_id, owner.id))
+            await self.domain_event_sender.publish(WorkspaceCreated(workspace_id, name, slug, owner.id))
             await session.commit()
             await session.refresh(organization)
diff --git a/poetry.lock b/poetry.lock
index 0bd907a3..5a22a6fa 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -157,6 +157,17 @@ tests = ["attrs[tests-no-zope]", "zope-interface"]
 tests-mypy = ["mypy (>=1.6)", "pytest-mypy-plugins"]
 tests-no-zope = ["attrs[tests-mypy]", "cloudpickle", "hypothesis", "pympler", "pytest (>=4.3.0)", "pytest-xdist[psutil]"]
 
+[[package]]
+name = "backoff"
+version = "2.2.1"
+description = "Function decoration for backoff and retry"
+optional = false
+python-versions = ">=3.7,<4.0"
+files = [
+    {file = "backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8"},
+    {file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"},
+]
+
 [[package]]
 name = "bcrypt"
 version = "4.0.1"
@@ -1468,6 +1479,17 @@ files = [
     {file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"},
 ]
 
+[[package]]
+name = "monotonic"
+version = "1.6"
+description = "An implementation of time.monotonic() for Python 2 & < 3.3"
+optional = false
+python-versions = "*"
+files = [
+    {file = "monotonic-1.6-py2.py3-none-any.whl", hash = "sha256:68687e19a14f11f26d140dd5c86f3dba4bf5df58003000ed467e0e2a69bca96c"},
+    {file = "monotonic-1.6.tar.gz", hash = "sha256:3a55207bcfed53ddd5c5bae174524062935efed17792e9de2ad0205ce9ad63f7"},
+]
+
 [[package]]
 name = "mypy"
 version = "1.9.0"
@@ -1637,6 +1659,29 @@ files = [
 dev = ["pre-commit", "tox"]
 testing = ["pytest", "pytest-benchmark"]
 
+[[package]]
+name = "posthog"
+version = "3.5.0"
+description = "Integrate PostHog into any python application."
+optional = false
+python-versions = "*"
+files = [
+    {file = "posthog-3.5.0-py2.py3-none-any.whl", hash = "sha256:3c672be7ba6f95d555ea207d4486c171d06657eb34b3ce25eb043bfe7b6b5b76"},
+    {file = "posthog-3.5.0.tar.gz", hash = "sha256:8f7e3b2c6e8714d0c0c542a2109b83a7549f63b7113a133ab2763a89245ef2ef"},
+]
+
+[package.dependencies]
+backoff = ">=1.10.0"
+monotonic = ">=1.5"
+python-dateutil = ">2.1"
+requests = ">=2.7,<3.0"
+six = ">=1.5"
+
+[package.extras]
+dev = ["black", "flake8", "flake8-print", "isort", "pre-commit"]
+sentry = ["django", "sentry-sdk"]
+test = ["coverage", "flake8", "freezegun (==0.3.15)", "mock (>=2.0.0)", "pylint", "pytest", "pytest-timeout"]
+
 [[package]]
 name = "prometheus-client"
 version = "0.20.0"
@@ -2067,11 +2112,11 @@ files = [
 ]
 
 [package.dependencies]
-pytest = ">=7.0.0,<9"
+pytest = ">=7.0.0"
 
 [package.extras]
 docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"]
-testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"]
+testing = ["coverage (>=6.2)", "flaky (>=3.5.0)", "hypothesis (>=5.7.1)", "mypy (>=0.931)", "pytest-trio (>=0.7.0)"]
 
 [[package]]
 name = "pytest-cov"
@@ -3060,4 +3105,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.12,<4.0"
-content-hash = "e2a6b19dfdceeced874b76982438ab817d544e4cf9a51592b1361667b70047f4"
+content-hash = "0c6560fab51d489293a9844d9278ffcb19742a134b2c2c9ffe82a649ce5d1dbf"
diff --git a/pyproject.toml b/pyproject.toml
index a6b49a9a..d26e35dc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,6 +28,7 @@ pyinstrument = ">=4.6.1"
cattrs = ">=23.2.3"
 bcrypt = "4.0.1"
 pyotp = "^2.9.0"
+posthog = "^3.5.0"
 
 
 [tool.poetry.group.dev.dependencies]
diff --git a/tests/fixbackend/conftest.py b/tests/fixbackend/conftest.py
index 5af0fe51..28156731 100644
--- a/tests/fixbackend/conftest.py
+++ b/tests/fixbackend/conftest.py
@@ -175,6 +175,7 @@ def default_config() -> Config:
         slack_oauth_client_secret="",
         service_base_url="http://localhost:8000",
         push_gateway_url=None,
+        posthog_api_key=None,
     )
 
 
diff --git a/tests/fixbackend/dispatcher/dispatcher_service_test.py b/tests/fixbackend/dispatcher/dispatcher_service_test.py
index a9018597..92ab4ade 100644
--- a/tests/fixbackend/dispatcher/dispatcher_service_test.py
+++ b/tests/fixbackend/dispatcher/dispatcher_service_test.py
@@ -99,7 +99,7 @@ async def test_receive_workspace_created(
     )
     # signal to the dispatcher that the new workspace was created
     await dispatcher.process_workspace_created(
-        WorkspaceCreated(workspace.id, user.id),
+        WorkspaceCreated(workspace.id, workspace.name, workspace.slug, user.id),
     )
     # check that a new entry was created in the next_run table
     next_run = await session.get(NextTenantRun, workspace.id)
diff --git a/tests/fixbackend/domain_events/test_events.py b/tests/fixbackend/domain_events/test_events.py
index 462e695e..850c86d7 100644
--- a/tests/fixbackend/domain_events/test_events.py
+++ b/tests/fixbackend/domain_events/test_events.py
@@ -45,7 +45,7 @@
     AwsAccountDeleted(user_id, fix_cloud_account_id, workspace_id, cloud_account_id),
     AwsAccountDegraded(fix_cloud_account_id, workspace_id, cloud_account_id, "aws final name", "some error"),
     TenantAccountsCollected(workspace_id, {fix_cloud_account_id: collect_info}, now + timedelta(hours=1)),
-    WorkspaceCreated(workspace_id, user_id),
+    WorkspaceCreated(workspace_id, "name", "slug", user_id),
 ]
 
 # CHANGING THE JSON STRUCTURE HERE MEANS BREAKING THE EVENT CONTRACT!
diff --git a/tests/fixbackend/inventory/inventory_service_test.py b/tests/fixbackend/inventory/inventory_service_test.py
index 27cf3ce0..d2dde95d 100644
--- a/tests/fixbackend/inventory/inventory_service_test.py
+++ b/tests/fixbackend/inventory/inventory_service_test.py
@@ -367,7 +367,14 @@ async def inventory_call(request: Request) -> Response:
        raise ValueError(f"Unexpected request: {request.url}: {content}")
 
     request_handler_mock.append(inventory_call)
-    await inventory_service._process_workspace_created(WorkspaceCreated(workspace.id, user.id))
+    await inventory_service._process_workspace_created(
+        WorkspaceCreated(
+            workspace.id,
+            workspace.name,
+            workspace.slug,
+            user.id,
+        )
+    )
     assert len(inventory_requests) == 2
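
For orientation, a minimal usage sketch of the PostHogEventSender introduced in this patch. It relies only on the constructor and lifecycle shown above; the API key, the ids, and the demo wrapper are placeholders, and a real WorkspaceRepository instance has to be supplied by the caller:

    import uuid

    from fixbackend.analytics.analytics_event_sender import PostHogEventSender
    from fixbackend.analytics.events import AEUserRegistered
    from fixbackend.ids import UserId, WorkspaceId
    from fixbackend.workspaces.repository import WorkspaceRepository


    async def demo(workspace_repo: WorkspaceRepository) -> None:
        # start() launches the periodic flusher; send() only queues the event,
        # delivery happens on the interval (1 minute) or once flush_at (100) events accumulate
        sender = await PostHogEventSender("phc_placeholder_key", workspace_repo).start()
        user_id = UserId(uuid.uuid4())  # placeholder ids for illustration
        workspace_id = WorkspaceId(uuid.uuid4())
        # AEUserRegistered now carries the email, which flush() uses to identify() the user
        await sender.send(AEUserRegistered(user_id, workspace_id, "jane@example.com"))
        # stop() flushes whatever is still queued and shuts the posthog client down
        await sender.stop()

In production the sender is not constructed directly: the analytics(...) factory wraps all configured senders in a MultiAnalyticsEventSender and attaches the DomainEventToAnalyticsEventHandler, so domain events are translated and fanned out to every configured backend automatically.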