Skip to content

Commit

Permalink
[feat] Use posthog analytics sender (#384)
Browse files Browse the repository at this point in the history
Co-authored-by: Nikita Melkozerov <[email protected]>
  • Loading branch information
aquamatthias and meln1k authored Apr 9, 2024
1 parent a30eca6 commit c156e36
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 19 deletions.
99 changes: 95 additions & 4 deletions fixbackend/analytics/analytics_event_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,27 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

import asyncio
import logging
import uuid
from asyncio import Lock
from collections import deque
from datetime import timedelta
from typing import List, Optional, Any
from typing import MutableSequence

from async_lru import alru_cache
from fixcloudutils.asyncio.periodic import Periodic
from fixcloudutils.types import Json
from fixcloudutils.util import uuid_str, utc
from httpx import AsyncClient
from posthog.client import Client
from prometheus_client import Counter

from fixbackend.analytics import AnalyticsEventSender
from fixbackend.analytics.events import AEWorkspaceCreated, AEUserRegistered
from fixbackend.analytics.events import AnalyticsEvent
from fixbackend.ids import WorkspaceId, UserId
from fixbackend.utils import group_by, md5
Expand All @@ -42,6 +50,21 @@ async def user_id_from_workspace(self, workspace_id: WorkspaceId) -> UserId:
return UserId(uuid.uuid5(uuid.NAMESPACE_DNS, "fixbackend"))


class MultiAnalyticsEventSender(AnalyticsEventSender):
    """Fan out analytics events to a list of underlying senders."""

    def __init__(self, senders: List[AnalyticsEventSender]) -> None:
        """Store the senders that every event is forwarded to.

        :param senders: the delegates, used in list order.
        """
        # Slot for an event handler object; it is assigned from outside this class.
        self.event_handler: Optional[Any] = None
        self.senders = senders

    async def send(self, event: AnalyticsEvent) -> None:
        """Forward the event to each sender, one after the other.

        An exception raised by a sender propagates immediately, so senders
        later in the list do not receive the event in that case.
        """
        for delegate in self.senders:
            await delegate.send(event)

    async def user_id_from_workspace(self, workspace_id: WorkspaceId) -> UserId:
        """Resolve the user id for a workspace using the first sender only.

        :raises ValueError: if no senders are configured.
        """
        if not self.senders:
            raise ValueError("No senders configured")
        first = self.senders[0]
        return await first.user_id_from_workspace(workspace_id)


class GoogleAnalyticsEventSender(AnalyticsEventSender):
def __init__(
self, client: AsyncClient, measurement_id: str, api_secret: str, workspace_repo: WorkspaceRepository
Expand All @@ -54,7 +77,6 @@ def __init__(
self.events: List[AnalyticsEvent] = []
self.lock = Lock()
self.sender = Periodic("send_events", self.send_events, timedelta(seconds=30))
self.event_handler: Optional[Any] = None

async def start(self) -> None:
await self.sender.start()
Expand All @@ -75,7 +97,7 @@ def event_to_json(event: AnalyticsEvent) -> Json:
ev.pop("user_id", None)
return dict(name=event.kind, params=ev)

# return early, if there are no events to send
# return early if there are no events to send
if not self.events:
return

Expand Down Expand Up @@ -109,7 +131,76 @@ def event_to_json(event: AnalyticsEvent) -> Json:

@alru_cache(maxsize=1024)
async def user_id_from_workspace(self, workspace_id: WorkspaceId) -> UserId:
if (workspace := await self.workspace_repo.get_workspace(workspace_id)) and (workspace.all_users()):
return workspace.all_users()[0]
if workspace := await self.workspace_repo.get_workspace(workspace_id):
return workspace.owner_id
else:
return UserId(uuid.uuid5(uuid.NAMESPACE_DNS, "fixbackend"))


class PostHogEventSender(AnalyticsEventSender):
    """Buffer analytics events in memory and hand them to the PostHog client in batches.

    `send` appends to an in-memory queue; `flush` passes every queued event to the
    PostHog client. `flush` is called from `send` once the queue holds at least
    `flush_at` events, from the `Periodic` created in `__init__`, and from `stop`.
    """

    def __init__(
        self,
        api_key: str,
        workspace_repo: WorkspaceRepository,
        flush_at: int = 100,
        interval: timedelta = timedelta(minutes=1),
        host: str = "https://eu.posthog.com",
    ) -> None:
        """Create the PostHog client and the periodic flusher (not started yet).

        :param api_key: PostHog project API key.
        :param workspace_repo: repository used to look up a workspace's owner.
        :param flush_at: queue length at which `send` triggers a flush.
        :param interval: interval handed to the periodic flusher.
        :param host: PostHog host to send the data to.
        """
        super().__init__()
        self.client = Client(  # type: ignore
            project_api_key=api_key, host=host, flush_interval=0.5, max_retries=3, gzip=True
        )
        self.workspace_repo = workspace_repo
        self.run_id = uuid_str()  # create a unique id for this instance run
        self.queue: MutableSequence[AnalyticsEvent] = deque()
        self.flush_at = flush_at
        self.flusher = Periodic("flush_analytics", self.flush, interval)
        # Guards the queue: `send` appends and `flush` iterates/clears under this lock.
        self.lock = asyncio.Lock()

    async def send(self, event: AnalyticsEvent) -> None:
        """Queue the event and flush once the queue has reached `flush_at` entries."""
        async with self.lock:
            self.queue.append(event)

        # Checked outside the lock on purpose: `flush` acquires the same lock and
        # asyncio.Lock is not reentrant.
        if len(self.queue) >= self.flush_at:
            await self.flush()

    @alru_cache(maxsize=1024)
    async def user_id_from_workspace(self, workspace_id: WorkspaceId) -> UserId:
        """Return the owner id of the given workspace.

        :raises ValueError: if the workspace cannot be found.
        """
        if workspace := await self.workspace_repo.get_workspace(workspace_id):
            return workspace.owner_id
        else:
            raise ValueError(f"Workspace with id {workspace_id} not found")

    async def flush(self) -> None:
        """Pass all queued events to the PostHog client and clear the queue."""
        async with self.lock:
            for event in self.queue:
                # when a user is registered, identify it as user
                if isinstance(event, AEUserRegistered):
                    self.client.identify(  # type: ignore
                        distinct_id=str(event.user_id),
                        properties={"email": event.email},
                    )
                # when a workspace is created, identify it as a group
                if isinstance(event, AEWorkspaceCreated):
                    self.client.group_identify(  # type: ignore
                        "workspace_id", str(event.workspace_id), properties={"name": event.name, "slug": event.slug}
                    )
                # if the event has a workspace_id, use it to define the group
                groups = {"workspace_id": str(ws)} if (ws := getattr(event, "workspace_id", None)) else None
                # Capture under the same distinct id that `identify` uses, so the event is
                # attributed to that user. A random id per event would create a new anonymous
                # person for every event. Fall back to a random id only if no user id exists.
                distinct_id = str(uid) if (uid := getattr(event, "user_id", None)) else uuid_str()
                self.client.capture(  # type: ignore
                    distinct_id=distinct_id,
                    event=event.kind,
                    properties=event.to_json(),
                    timestamp=utc(),
                    groups=groups,
                )
            self.queue.clear()

    async def start(self) -> PostHogEventSender:
        """Start the periodic flusher and return this sender."""
        await self.flusher.start()
        return self

    async def stop(self) -> None:
        """Stop the periodic flusher, flush what is still queued and shut the client down."""
        await self.flusher.stop()
        await self.flush()
        self.client.shutdown()  # type: ignore
27 changes: 19 additions & 8 deletions fixbackend/analytics/domain_event_to_analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from typing import List

from httpx import AsyncClient

from fixbackend.analytics import AnalyticsEventSender
from fixbackend.analytics.analytics_event_sender import GoogleAnalyticsEventSender, NoAnalyticsEventSender
from fixbackend.analytics.analytics_event_sender import (
GoogleAnalyticsEventSender,
NoAnalyticsEventSender,
PostHogEventSender,
MultiAnalyticsEventSender,
)
from fixbackend.analytics.events import (
AEUserRegistered,
AEAccountDiscovered,
Expand Down Expand Up @@ -78,7 +84,7 @@ def __init__(self, domain_event_subscriber: DomainEventSubscriber, sender: Analy
async def handle(self, event: Event) -> None:
match event:
case UserRegistered() as event:
await self.sender.send(AEUserRegistered(event.user_id, event.tenant_id))
await self.sender.send(AEUserRegistered(event.user_id, event.tenant_id, event.email))
case AwsAccountDiscovered() as event:
user_id = await self.sender.user_id_from_workspace(event.tenant_id)
await self.sender.send(AEAccountDiscovered(user_id, event.tenant_id, "aws"))
Expand All @@ -94,7 +100,7 @@ async def handle(self, event: Event) -> None:
user_id = await self.sender.user_id_from_workspace(event.tenant_id)
await self.sender.send(AEAccountNameChanged(user_id, event.tenant_id, event.cloud))
case WorkspaceCreated() as event:
await self.sender.send(AEWorkspaceCreated(event.user_id, event.workspace_id))
await self.sender.send(AEWorkspaceCreated(event.user_id, event.workspace_id, event.name, event.slug))
case InvitationAccepted() as event:
user_id = event.user_id or await self.sender.user_id_from_workspace(event.workspace_id)
await self.sender.send(AEInvitationAccepted(user_id, event.workspace_id))
Expand Down Expand Up @@ -127,11 +133,16 @@ def analytics(
domain_event_subscriber: DomainEventSubscriber,
workspace_repo: WorkspaceRepository,
) -> AnalyticsEventSender:
senders: List[AnalyticsEventSender] = []
if (measurement_id := config.google_analytics_measurement_id) and (secret := config.google_analytics_api_secret):
log.info("Use Google Analytics Event Sender.")
sender = GoogleAnalyticsEventSender(client, measurement_id, secret, workspace_repo)
sender.event_handler = DomainEventToAnalyticsEventHandler(domain_event_subscriber, sender)
return sender
else:
senders.append(GoogleAnalyticsEventSender(client, measurement_id, secret, workspace_repo))
if api_key := config.posthog_api_key:
log.info("Use Posthog Event Sender.")
senders.append(PostHogEventSender(api_key, workspace_repo))
if len(senders) == 0:
log.info("Analytics turned off.")
return NoAnalyticsEventSender()
senders.append(NoAnalyticsEventSender())
sender = MultiAnalyticsEventSender(senders)
sender.event_handler = DomainEventToAnalyticsEventHandler(domain_event_subscriber, sender)
return sender
3 changes: 3 additions & 0 deletions fixbackend/analytics/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def to_json(self) -> Json:
class AEUserRegistered(AnalyticsEvent):
kind: ClassVar[str] = "fix_user_registered"
workspace_id: WorkspaceId
email: str


@frozen
Expand Down Expand Up @@ -109,6 +110,8 @@ class AEFirstAccountCollectFinished(AnalyticsEvent):
class AEWorkspaceCreated(AnalyticsEvent):
kind: ClassVar[str] = "fix_workspace_created"
workspace_id: WorkspaceId
name: str
slug: str


@frozen
Expand Down
2 changes: 2 additions & 0 deletions fixbackend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class Config(BaseSettings):
profiling_interval: float
google_analytics_measurement_id: Optional[str]
google_analytics_api_secret: Optional[str]
posthog_api_key: Optional[str]
aws_marketplace_url: str
billing_period: Literal["month", "day"]
discord_oauth_client_id: str
Expand Down Expand Up @@ -176,6 +177,7 @@ def parse_args(argv: Optional[Sequence[str]] = None) -> Namespace:
parser.add_argument("--profiling-enabled", action="store_true", default=os.environ.get("PROFILING_ENABLED", False))
parser.add_argument("--profiling-interval", type=float, default=os.environ.get("PROFILING_INTERVAL", 0.001))
parser.add_argument("--google-analytics-measurement-id", default=os.environ.get("GOOGLE_ANALYTICS_MEASUREMENT_ID"))
parser.add_argument("--posthog-api-key", default=os.environ.get("POSTHOG_API_KEY"))
parser.add_argument("--google-analytics-api-secret", default=os.environ.get("GOOGLE_ANALYTICS_API_SECRET"))
parser.add_argument("--aws-marketplace-url", default=os.environ.get("AWS_MARKETPLACE_URL", ""))
parser.add_argument("--service-base-url", default=os.environ.get("SERVICE_BASE_URL", ""))
Expand Down
2 changes: 2 additions & 0 deletions fixbackend/domain_events/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ class WorkspaceCreated(Event):
kind: ClassVar[str] = "workspace_created"

workspace_id: WorkspaceId
name: str
slug: str
user_id: UserId


Expand Down
2 changes: 1 addition & 1 deletion fixbackend/workspaces/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async def create_workspace(self, name: str, slug: str, owner: User) -> Workspace
# create a database access object for this organization in the same transaction
await self.graph_db_access_manager.create_database_access(workspace_id, session=session)
await self.role_repository.add_roles(owner.id, workspace_id, Roles.workspace_owner, session=session)
await self.domain_event_sender.publish(WorkspaceCreated(workspace_id, owner.id))
await self.domain_event_sender.publish(WorkspaceCreated(workspace_id, name, slug, owner.id))

await session.commit()
await session.refresh(organization)
Expand Down
51 changes: 48 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pyinstrument = ">=4.6.1"
cattrs = ">=23.2.3"
bcrypt = "4.0.1"
pyotp = "^2.9.0"
posthog = "^3.5.0"


[tool.poetry.group.dev.dependencies]
Expand Down
1 change: 1 addition & 0 deletions tests/fixbackend/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def default_config() -> Config:
slack_oauth_client_secret="",
service_base_url="http://localhost:8000",
push_gateway_url=None,
posthog_api_key=None,
)


Expand Down
2 changes: 1 addition & 1 deletion tests/fixbackend/dispatcher/dispatcher_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def test_receive_workspace_created(
)
# signal to the dispatcher that the new workspace was created
await dispatcher.process_workspace_created(
WorkspaceCreated(workspace.id, user.id),
WorkspaceCreated(workspace.id, workspace.name, workspace.slug, user.id),
)
# check that a new entry was created in the next_run table
next_run = await session.get(NextTenantRun, workspace.id)
Expand Down
2 changes: 1 addition & 1 deletion tests/fixbackend/domain_events/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
AwsAccountDeleted(user_id, fix_cloud_account_id, workspace_id, cloud_account_id),
AwsAccountDegraded(fix_cloud_account_id, workspace_id, cloud_account_id, "aws final name", "some error"),
TenantAccountsCollected(workspace_id, {fix_cloud_account_id: collect_info}, now + timedelta(hours=1)),
WorkspaceCreated(workspace_id, user_id),
WorkspaceCreated(workspace_id, "name", "slug", user_id),
]

# CHANGING THE JSON STRUCTURE HERE MEANS BREAKING THE EVENT CONTRACT!
Expand Down
9 changes: 8 additions & 1 deletion tests/fixbackend/inventory/inventory_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,14 @@ async def inventory_call(request: Request) -> Response:
raise ValueError(f"Unexpected request: {request.url}: {content}")

request_handler_mock.append(inventory_call)
await inventory_service._process_workspace_created(WorkspaceCreated(workspace.id, user.id))
await inventory_service._process_workspace_created(
WorkspaceCreated(
workspace.id,
workspace.name,
workspace.slug,
user.id,
)
)
assert len(inventory_requests) == 2


Expand Down

0 comments on commit c156e36

Please sign in to comment.