kafka sink
arikalon1 committed Jun 21, 2021
1 parent cec7061 commit 4b1e0fd
Showing 12 changed files with 151 additions and 4 deletions.
25 changes: 22 additions & 3 deletions playbooks/deployment_babysitter.py
@@ -10,13 +10,14 @@


 class DeploymentBabysitterConfig(BaseModel):
-    slack_channel: str
+    slack_channel: str = ""
     fields_to_monitor: Tuple[str] = (
         "status.readyReplicas",
         "message",
         "reason",
         "spec"
     )
+    sinks: List[SinkConfigBase] = None


 # TODO: filter out all the managed fields crap
@@ -40,6 +41,7 @@ def babysitter_get_blocks(diffs: List[DiffDetail]):
 @on_deployment_all_changes
 def deployment_babysitter(event: DeploymentEvent, config: DeploymentBabysitterConfig):
     """Track changes to a deployment and send the changes in slack."""
+    filtered_diffs = None
     if event.operation == K8sOperationType.UPDATE:
         all_diffs = event.obj.diff(event.old_obj)
         filtered_diffs = list(filter(lambda x: babysitter_should_include_diff(x, config), all_diffs))
@@ -48,5 +50,22 @@ def deployment_babysitter(event: DeploymentEvent, config: DeploymentBabysitterConfig):
         event.report_attachment_blocks.extend(babysitter_get_blocks(filtered_diffs))

     event.report_title = f"Deployment {event.obj.metadata.name} {event.operation.value}d in namespace {event.obj.metadata.namespace}"
-    event.slack_channel = config.slack_channel
-    send_to_slack(event)
+    if config.slack_channel:
+        event.slack_channel = config.slack_channel
+        send_to_slack(event)
+
+    if config.sinks:
+        data = {
+            "deployment": event.obj.metadata.name,
+            "deployment_namespace": event.obj.metadata.namespace,
+            "message": "Deployment properties change",
+            "changed_properties": [{
+                "property": ".".join(diff.path),
+                "old": diff.other_value,
+                "new": diff.value
+            } for diff in filtered_diffs]
+        }
+        for sink_config in config.sinks:
+            SinkFactory.get_sink(sink_config).write(data)
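For illustration, a minimal sketch of how the new `sinks` field could be wired up in code. The import paths, broker address, and topic name below are assumptions for the example, not values taken from this commit:

```python
# Sketch only: import paths, broker address, and topic are placeholders.
from robusta.api import SinkConfigBase                                   # re-exported below in api/__init__.py
from playbooks.deployment_babysitter import DeploymentBabysitterConfig   # assumed importable for the example

config = DeploymentBabysitterConfig(
    slack_channel="",  # slack_channel is now optional; an empty string skips send_to_slack
    sinks=[
        SinkConfigBase(
            sink_type="kafka",
            params={"kafka_url": "localhost:9092", "topic": "deployment-changes"},
        )
    ],
)
```

However the config is actually loaded in practice, each entry in `sinks` carries a `sink_type` plus an untyped `params` dict that the factory validates later.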


37 changes: 37 additions & 0 deletions playbooks/grafana_enrichment.py
@@ -29,6 +29,43 @@ def add_deployment_lines_to_grafana(event: DeploymentEvent, action_params: Param
     grafana.add_line_to_dashboard(action_params.grafana_dashboard_uid, msg, tags=[event.obj.metadata.name])


+class ImageChangesParams(BaseModel):
+    sinks: List[SinkConfigBase]
+
+@on_deployment_update
+def report_image_changes(event: DeploymentEvent, action_params: ImageChangesParams):
+    """
+    Report image changed whenever a new application version is deployed so that you can easily see changes.
+    """
+    new_images = event.obj.get_images()
+    old_images = event.old_obj.get_images()
+    if new_images == old_images:
+        return
+
+    msg = ""
+    changed_properties = []
+    if new_images.keys() != old_images.keys():
+        msg = f"number or names of images changed: new - {new_images} old - {old_images}"
+    else:
+        for name in new_images:
+            if new_images[name] != old_images[name]:
+                msg += f"image name: {name} new tag: {new_images[name]} old tag {old_images[name]}"
+                changed_properties.append({
+                    "property": "image",
+                    "old": f"{name}:{old_images[name]}",
+                    "new": f"{name}:{new_images[name]}"
+                })
+
+    data = {
+        "deployment": event.obj.metadata.name,
+        "deployment_namespace": event.obj.metadata.namespace,
+        "message": msg,
+        "changed_properties": changed_properties
+    }
+    for sink_config in action_params.sinks:
+        SinkFactory.get_sink(sink_config).write(data)


 @on_pod_create
 def test_pod_orm(event : PodEvent):
     logging.info('running test_pod_orm')
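For illustration, the payload that report_image_changes above would write to each configured sink for a single tag bump. The deployment name, namespace, and tags are made-up values, and get_images() is assumed here to return simple container-name-to-tag pairs:

```python
# Illustrative payload only; "my-app", "default", "v1" and "v2" are placeholders.
data = {
    "deployment": "my-app",
    "deployment_namespace": "default",
    "message": "image name: my-app new tag: v2 old tag v1",
    "changed_properties": [
        {"property": "image", "old": "my-app:v1", "new": "my-app:v2"}
    ],
}
```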
17 changes: 16 additions & 1 deletion src/poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/pyproject.toml
@@ -27,6 +27,7 @@ dulwich = "^0.20.23"
 better-exceptions = "^0.3.3"
 CairoSVG = "^2.5.2"
 tabulate = "^0.8.9"
+kafka-python = "^2.0.2"

 [tool.poetry.dev-dependencies]
 hikaru = {git = "https://github.com/aantn/hikaru.git", rev = "fix_datetimes"}
4 changes: 4 additions & 0 deletions src/robusta/api/__init__.py
@@ -14,6 +14,10 @@
 from ..integrations.manual.triggers import *
 from ..integrations.scheduled.triggers import *
 from ..integrations.git.git_repo_manager import *
+from ..integrations.sinks.sink_base import *
+from ..integrations.sinks.sink_config import *
+from ..integrations.sinks.sink_factory import *
+from ..integrations.sinks.kafka import *
 from ..core.persistency.in_memory import get_persistent_data
 from ..utils.rate_limiter import RateLimiter
 from ..runner.object_updater import *
Empty file.
14 changes: 14 additions & 0 deletions src/robusta/integrations/sinks/kafka/kafka_sink.py
@@ -0,0 +1,14 @@
import json

from ..sink_base import SinkBase
from kafka import KafkaProducer


class KafkaSink(SinkBase):

    def __init__(self, producer: KafkaProducer, topic: str):
        self.producer = producer
        self.topic = topic

    def write(self, data: dict):
        self.producer.send(self.topic, value=json.dumps(data).encode("utf-8"))
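A hedged usage sketch for the new sink; the broker address and topic are placeholders, and the import path simply mirrors the file layout added in this commit. Since KafkaProducer.send only queues the record asynchronously, the sketch calls flush() to force delivery before exiting:

```python
# Sketch only: assumes a Kafka broker is reachable at the placeholder address.
from kafka import KafkaProducer

from robusta.integrations.sinks.kafka.kafka_sink import KafkaSink

producer = KafkaProducer(bootstrap_servers="localhost:9092")
sink = KafkaSink(producer, "deployment-changes")
sink.write({"deployment": "my-app", "message": "Deployment properties change"})
producer.flush()  # send() only queues the record; flush() blocks until it is delivered
```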
6 changes: 6 additions & 0 deletions src/robusta/integrations/sinks/kafka/kafka_sink_config.py
@@ -0,0 +1,6 @@
from pydantic import BaseModel


class KafkaSinkConfig(BaseModel):
    kafka_url: str
    topic: str
23 changes: 23 additions & 0 deletions src/robusta/integrations/sinks/kafka/kafka_sink_manager.py
@@ -0,0 +1,23 @@
import threading
from collections import defaultdict

from kafka import KafkaProducer

from .kafka_sink import KafkaSink
from ..sink_base import SinkBase


class KafkaSinkManager:

    manager_lock = threading.Lock()
    producers_map = defaultdict(None)

    @staticmethod
    def get_kafka_sink(kafka_url : str, topic: str) -> SinkBase:
        with KafkaSinkManager.manager_lock:
            producer = KafkaSinkManager.producers_map.get(kafka_url)
            if producer is not None:
                return KafkaSink(producer, topic)
            producer = KafkaProducer(bootstrap_servers=kafka_url)
            KafkaSinkManager.producers_map[kafka_url] = producer
            return KafkaSink(producer, topic)
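The manager keeps one producer per broker URL behind a lock, so every sink pointed at the same kafka_url shares a single connection. (Note that producers_map is a defaultdict with no default factory, which behaves like a plain dict here because lookups go through .get().) A small sketch of that reuse, again with a placeholder broker address:

```python
# Sketch only: assumes a broker is reachable at the placeholder address.
from robusta.integrations.sinks.kafka.kafka_sink_manager import KafkaSinkManager

sink_a = KafkaSinkManager.get_kafka_sink("localhost:9092", "deployment-changes")
sink_b = KafkaSinkManager.get_kafka_sink("localhost:9092", "image-changes")

# Both sinks wrap the same cached KafkaProducer; only the topic differs.
assert sink_a.producer is sink_b.producer
```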
5 changes: 5 additions & 0 deletions src/robusta/integrations/sinks/sink_base.py
@@ -0,0 +1,5 @@

class SinkBase:

    def write(self, data: dict):
        pass
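SinkBase is the whole sink interface: implement write(data) and playbooks can fan out to it. As an illustration only (not part of this commit), a hypothetical sink that logs each payload would look like this:

```python
import logging

from robusta.integrations.sinks.sink_base import SinkBase


class LoggingSink(SinkBase):
    """Hypothetical example sink: writes every payload to the Python log."""

    def write(self, data: dict):
        logging.info("sink payload: %s", data)
```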
8 changes: 8 additions & 0 deletions src/robusta/integrations/sinks/sink_config.py
@@ -0,0 +1,8 @@
from typing import Dict

from pydantic import BaseModel


class SinkConfigBase(BaseModel):
    sink_type: str
    params: Dict
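SinkConfigBase deliberately keeps the sink-specific settings in an untyped params dict; the factory validates them against a concrete model later. A minimal example instance with placeholder values:

```python
from robusta.integrations.sinks.sink_config import SinkConfigBase

# Placeholder values; params is re-validated as KafkaSinkConfig by SinkFactory.
sink_config = SinkConfigBase(
    sink_type="kafka",
    params={"kafka_url": "localhost:9092", "topic": "robusta-events"},
)
```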
15 changes: 15 additions & 0 deletions src/robusta/integrations/sinks/sink_factory.py
@@ -0,0 +1,15 @@
from .kafka.kafka_sink_config import KafkaSinkConfig
from .kafka.kafka_sink_manager import KafkaSinkManager
from .sink_config import SinkConfigBase
from .sink_base import SinkBase


class SinkFactory:

    @staticmethod
    def get_sink(sink_config: SinkConfigBase) -> SinkBase:
        if sink_config.sink_type == "kafka":
            kafka_sink_config = KafkaSinkConfig(**sink_config.params)
            return KafkaSinkManager.get_kafka_sink(kafka_sink_config.kafka_url, kafka_sink_config.topic)

        raise Exception(f"Sink not supported {sink_config.sink_type}")
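Putting the pieces together, a hedged end-to-end sketch: the factory validates the raw params as KafkaSinkConfig, asks KafkaSinkManager for a sink backed by a cached producer, and any unknown sink_type raises. Broker, topic, and payload are placeholders, and a reachable broker is assumed:

```python
from robusta.integrations.sinks.sink_config import SinkConfigBase
from robusta.integrations.sinks.sink_factory import SinkFactory

sink_config = SinkConfigBase(
    sink_type="kafka",
    params={"kafka_url": "localhost:9092", "topic": "robusta-events"},
)

sink = SinkFactory.get_sink(sink_config)  # -> KafkaSink wrapping a cached KafkaProducer
sink.write({"message": "Deployment properties change"})

# SinkFactory.get_sink(SinkConfigBase(sink_type="webhook", params={}))
# would raise: Exception("Sink not supported webhook")
```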
