From b7a51c2e914347e1c81ab6b261259ee588a61bc7 Mon Sep 17 00:00:00 2001
From: marcosschroh
Date: Mon, 1 Jul 2024 12:38:18 +0000
Subject: [PATCH] deploy: 32e362dc068b571a4a149bb59e4965f334d564e0

---
 engine/index.html        |  44 ++++++++----
 middleware/index.html    |   4 +-
 monitoring/index.html    |  48 +++++--------
 serialization/index.html |   8 ++-
 sitemap.xml.gz           | Bin 127 -> 127 bytes
 stream/index.html        | 150 +++++++++++++++------------------
 6 files changed, 110 insertions(+), 144 deletions(-)

diff --git a/engine/index.html b/engine/index.html
index dd2f605c..68764b8f 100644
--- a/engine/index.html
+++ b/engine/index.html
@@ -619,8 +619,7 @@
 Source code in kstreams/engine.py
@@ -843,7 +842,17 @@
class StreamEngine:
     """
     Attributes:
         backend kstreams.backends.Kafka: Backend to connect. Default `Kafka`
@@ -953,13 +962,12 @@
         return metadata
 
     async def start(self) -> None:
-        await self.start_producer()
-        await self.start_streams()
-
         # add the producer and streams to the Monitor
         self.monitor.add_producer(self._producer)
         self.monitor.add_streams(self._streams)
-        self.monitor.start()
+
+        await self.start_producer()
+        await self.start_streams()
 
     async def stop(self) -> None:
         await self.monitor.stop()

@@ -967,9 +975,9 @@
         await self.stop_streams()
 
     async def stop_producer(self):
-        logger.info("Waiting Producer to STOP....")
         if self._producer is not None:
             await self._producer.stop()
+            logger.info("Producer has STOPPED....")
 
     async def start_producer(self, **kwargs) -> None:
         if self.producer_class is None:

@@ -987,13 +995,23 @@
             for stream in self._streams
             if not inspect.isasyncgenfunction(stream.func)
         ]
+
+        await self._start_streams_on_background_mode(streams)
+
+    async def _start_streams_on_background_mode(
+        self, streams: typing.List[Stream]
+    ) -> None:
+        # start all the streams
         for stream in streams:
-            await stream.start()
+            asyncio.create_task(stream.start())
+
+        # start monitoring
+        asyncio.create_task(self.monitor.start())
 
     async def stop_streams(self) -> None:
-        logger.info("Waiting for Streams to STOP....")
         for stream in self._streams:
             await stream.stop()
+        logger.info("Streams have STOPPED....")
 
     async def clean_streams(self):
         await self.stop_streams()
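To put this reordering in context, here is a rough sketch of the typical engine lifecycle around `start()`/`stop()`. The `create_engine` call, topic name, and payload follow the library's usual examples rather than anything in this patch:

```python
import asyncio

from kstreams import ConsumerRecord, create_engine

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream("local--kstream")
async def consume(cr: ConsumerRecord) -> None:
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def main():
    # with this change, start() registers the producer and streams with
    # the monitor first, then starts them as background asyncio tasks
    await stream_engine.start()
    try:
        await stream_engine.send("local--kstream", value=b"hello world")
        await asyncio.sleep(5)
    finally:
        # stop() stops the monitor, the producer, and the streams
        await stream_engine.stop()


if __name__ == "__main__":
    asyncio.run(main())
```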

@@ -1208,8 +1226,7 @@
 Source code in kstreams/engine.py
@@ -1259,7 +1276,8 @@
async def send(
     self,
     topic: str,
     value: typing.Any = None,
diff --git a/middleware/index.html b/middleware/index.html
index e70b98d9..5efe207b 100644
--- a/middleware/index.html
+++ b/middleware/index.html
@@ -769,8 +769,7 @@
 Middleware by default
class ExceptionMiddleware(BaseMiddleware):
     async def __call__(self, cr: ConsumerRecord) -> typing.Any:
         try:
             return await self.next_call(cr)
@@ -783,7 +782,6 @@
 Middleware by default
                 f"Stream consuming from topics {self.stream.topics} CRASHED!!! \n\n "
             )
             if sys.version_info >= (3, 11):
-                exc.add_note(f"Task: {self.stream._consumer_task}")
                 exc.add_note(f"Handler: {self.stream.func}")
                 exc.add_note(f"Topics: {self.stream.topics}")

diff --git a/monitoring/index.html b/monitoring/index.html
index f67f5465..39e23740 100644
--- a/monitoring/index.html
+++ b/monitoring/index.html
@@ -763,15 +763,7 @@

class PrometheusMonitor:
     """
     Metrics monitor to keep track of Producers and Consumers.
 
@@ -817,24 +809,16 @@
         self.running = False
         self._producer = None
         self._streams: List[Stream] = []
-        self._task: Optional[asyncio.Task] = None
 
-    def start(self) -> None:
-        logger.info("Starting Prometheus metrics...")
+    async def start(self) -> None:
         self.running = True
-        self._task = asyncio.create_task(self._metrics_task())
+        logger.info("Prometheus Monitoring started...")
+        await self._metrics_task()
 
     async def stop(self) -> None:
-        logger.info("Stopping Prometheus metrics...")
         self.running = False
-
-        if self._task is not None:
-            # we need to make sure that the task is `done`
-            # to clean up properly
-            while not self._task.done():
-                await asyncio.sleep(0.1)
-        self._clean_consumer_metrics()
+        logger.info("Prometheus Monitoring stopped...")
 
     def add_topic_partition_offset(
         self, topic: str, partition: int, offset: int

@@ -977,7 +961,7 @@
     async def _metrics_task(self) -> None:
         """
-        Asyncio Task that runs in `backgroud` to generate
+        Task that runs in `background` to generate
         consumer metrics.
 
         When self.running is False the task will finish and it
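The docstring above describes a simple cooperative polling loop: the coroutine keeps generating metrics while `self.running` is true and returns once the flag is flipped. A generic sketch of that pattern, with hypothetical names (`PollingMonitor`, `collect_metrics`) rather than the library's internals:

```python
import asyncio


class PollingMonitor:
    """Hypothetical stand-in showing the running-flag loop pattern."""

    def __init__(self, interval: float = 10.0) -> None:
        self.running = False
        self.interval = interval

    async def start(self) -> None:
        self.running = True
        while self.running:
            self.collect_metrics()
            # yield control so consumers keep making progress
            await asyncio.sleep(self.interval)

    async def stop(self) -> None:
        # flipping the flag makes the loop exit on its next iteration
        self.running = False

    def collect_metrics(self) -> None:
        ...  # gather and export metrics here
```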

@@ -1057,7 +1041,15 @@
 Source code in kstreams/prometheus/monitor.py
@@ -1101,15 +1093,7 @@
async def generate_consumer_metrics(self, consumer: Consumer):
     """
     Generate Consumer Metrics for Prometheus
 
diff --git a/serialization/index.html b/serialization/index.html
index d95ab40f..2c900018 100644
--- a/serialization/index.html
+++ b/serialization/index.html
@@ -575,12 +575,16 @@

Serialization

Kafka's job is to move bytes from producer to consumers, through a topic.

By default, this is what kstreams does.

-from kstreams import ConsumerRecord, stream
+import logging
+
+from kstreams import ConsumerRecord, stream
+
+logger = logging.getLogger(__name__)
 
 
 @stream("local--hello-world", group_id="example-group")
 async def consume(cr: ConsumerRecord) -> None:
-    print(f"showing bytes: {cr.value}")
+    logger.info(f"showing bytes: {cr.value}")
 

As you can see, the ConsumerRecord's value is bytes.
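Without a deserializer, every handler has to turn those bytes back into a Python object by hand. A minimal sketch, assuming JSON-encoded values (the JSON handling is illustrative, not part of kstreams):

```python
import json
import logging

from kstreams import ConsumerRecord, stream

logger = logging.getLogger(__name__)


@stream("local--hello-world", group_id="example-group")
async def consume(cr: ConsumerRecord) -> None:
    # cr.value arrives as raw bytes; decode it manually
    payload = json.loads(cr.value) if cr.value is not None else None
    logger.info(f"decoded payload: {payload}")
```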

In order to keep your code pythonic, we provide a mechanism to serialize/deserialize

diff --git a/sitemap.xml.gz b/sitemap.xml.gz
index 97d68662a7bce074b3a4df5bface6378e9048796..2d1fbf6b57b55b6eaaba40d0959a8a3977914768 100644
GIT binary patch
delta 13
Ucmb=gXP58h;ILfWG?Bdm02~+uE&u=k

delta 13
Ucmb=gXP58h;Be9?naExN02!|Xt^fc4

diff --git a/stream/index.html b/stream/index.html
index fb8385a5..f1a13ed9 100644
--- a/stream/index.html
+++ b/stream/index.html
@@ -1286,22 +1286,7 @@

class Stream:
     """
     Attributes:
         name str: Stream name
@@ -1371,7 +1356,6 @@
         self.consumer_class = consumer_class
         self.consumer: typing.Optional[Consumer] = None
         self.config = config or {}
-        self._consumer_task: typing.Optional[asyncio.Task] = None
         self.name = name or str(uuid.uuid4())
         self.deserializer = deserializer
         self.running = False

@@ -1404,20 +1388,14 @@
         if self.consumer is not None:
             await self.consumer.stop()
-            if self._consumer_task is not None:
-                self._consumer_task.cancel()
-
-        logger.info(
-            f"Stream consuming from topics {self.topics} has stopped!!! \n\n"
-        )
-
-    async def _subscribe(self) -> None:
-        # Always create a consumer on stream.start
-        self.consumer = self._create_consumer()
+            logger.info(
+                f"Stream consuming from topics {self.topics} has stopped!!! \n\n"
+            )
 
-        # add the chech tp avoid `mypy` complains
-        if self.consumer is not None:
-            await self.consumer.start()
+    def subscribe(self) -> None:
+        if self.consumer is None:
+            # Always create a consumer on stream.start
+            self.consumer = self._create_consumer()
         self.consumer.subscribe(
             topics=self.topics, listener=self.rebalance_listener
         )

@@ -1488,36 +1466,28 @@
         if self.running:
             return None
 
-        await self._subscribe()
-        self.running = True
+        self.subscribe()
 
-        if self.udf_handler.type == UDFType.NO_TYPING:
-            # normal use case
-            logging.warn(
-                "Streams with `async for in` loop approach might be deprecated. "
-                "Consider migrating to a typing approach."
-            )
+        if self.consumer is not None:
+            await self.consumer.start()
+            self.running = True
 
-            func = self.udf_handler.handler(self)
-            # create an asyncio.Task with func
-            self._consumer_task = asyncio.create_task(self.func_wrapper(func))
-        else:
-            # Typing cases
-            if not inspect.isasyncgenfunction(self.udf_handler.handler):
-                # Is not an async_generator, then create an asyncio.Task with func
-                self._consumer_task = asyncio.create_task(
-                    self.func_wrapper_with_typing()
+            if self.udf_handler.type == UDFType.NO_TYPING:
+                # normal use case
+                logging.warn(
+                    "Streams with `async for in` loop approach might be deprecated. "
+                    "Consider migrating to a typing approach."
                 )
-        return None
 
-    async def func_wrapper(self, func: typing.Awaitable) -> None:
-        try:
-            # await for the end user coroutine
-            # we do this to show a better error message to the user
-            # when the coroutine fails
-            await func
-        except Exception as e:
-            logger.exception(f"CRASHED Stream!!! Task {self._consumer_task} \n\n {e}")
+                func = self.udf_handler.handler(self)
+                await func
+            else:
+                # Typing cases
+                if not inspect.isasyncgenfunction(self.udf_handler.handler):
+                    # Is not an async_generator, then `await` the func
+                    await self.func_wrapper_with_typing()
+
+        return None
 
     async def func_wrapper_with_typing(self) -> None:
         while self.running:
@@ -1927,7 +1897,14 @@

Get many

 Source code in kstreams/streams.py
@@ -1961,14 +1938,7 @@
 Get many
async def getmany(
     self,
     partitions: typing.Optional[typing.List[TopicPartition]] = None,
     timeout_ms: int = 0,
@@ -2080,9 +2050,7 @@
class MetricsRebalanceListener(RebalanceListener):
     async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
         """
         Coroutine to be called *before* a rebalance operation starts and
@@ -2097,7 +2065,8 @@
         # lock all asyncio Tasks so no new metrics will be added to the Monitor
         if revoked and self.engine is not None:
             async with asyncio.Lock():
-                await self.engine.monitor.stop()
+                if self.stream is not None:
+                    self.engine.monitor.clean_stream_consumer_metrics(self.stream)
 
     async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
         """

@@ -2113,11 +2082,8 @@
         # lock all asyncio Tasks so no new metrics will be added to the Monitor
         if assigned and self.engine is not None:
             async with asyncio.Lock():
-                self.engine.monitor.start()
-
-                stream = self.stream
-                if stream is not None:
-                    stream.seek_to_initial_offsets()
+                if self.stream is not None:
+                    self.stream.seek_to_initial_offsets()

@@ -2184,8 +2150,7 @@
 Source code in kstreams/rebalance_listener.py
@@ -2200,9 +2165,7 @@
async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
     """
     Coroutine to be called *after* partition re-assignment completes
     and *before* the consumer starts fetching data again.
@@ -2216,11 +2179,8 @@
         # lock all asyncio Tasks so no new metrics will be added to the Monitor
         if assigned and self.engine is not None:
             async with asyncio.Lock():
-                self.engine.monitor.start()
-
-                stream = self.stream
-                if stream is not None:
-                    stream.seek_to_initial_offsets()
+                if self.stream is not None:
+                    self.stream.seek_to_initial_offsets()

@@ -2292,7 +2252,8 @@
async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
     """
     Coroutine to be called *before* a rebalance operation starts and
     *after* the consumer stops fetching data.
@@ -2306,7 +2267,8 @@
         # lock all asyncio Tasks so no new metrics will be added to the Monitor
         if revoked and self.engine is not None:
             async with asyncio.Lock():
-                await self.engine.monitor.stop()
+                if self.stream is not None:
+                    self.engine.monitor.clean_stream_consumer_metrics(self.stream)

@@ -2342,7 +2304,9 @@
 Source code in kstreams/rebalance_listener.py
@@ -2366,9 +2330,7 @@
class ManualCommitRebalanceListener(MetricsRebalanceListener):
     async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
         """
         Coroutine to be called *before* a rebalance operation starts and
@@ -2463,7 +2425,9 @@
 Source code in kstreams/rebalance_listener.py
@@ -2486,9 +2450,7 @@
async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
     """
     Coroutine to be called *before* a rebalance operation starts and
     *after* the consumer stops fetching data.
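These hooks make it straightforward to plug custom behaviour into a rebalance. A minimal sketch of a custom listener, assuming `RebalanceListener` and `TopicPartition` are importable from `kstreams` as the library's docs suggest (the logging is illustrative):

```python
import logging
from typing import Set

from kstreams import RebalanceListener, TopicPartition

logger = logging.getLogger(__name__)


class LoggingRebalanceListener(RebalanceListener):
    async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
        # runs *before* the rebalance starts and *after* the consumer
        # stops fetching data
        logger.info(f"Partitions revoked: {revoked}")

    async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
        # runs *after* re-assignment completes and *before* the consumer
        # starts fetching data again
        logger.info(f"Partitions assigned: {assigned}")
```

Such a listener can then be attached to a stream via the `rebalance_listener` parameter of the `@stream` decorator, per the library's docs.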