Skip to content

Commit

Permalink
deploy: b2461b6
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosschroh committed Dec 6, 2023
1 parent 0b68723 commit 120ae87
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 144 deletions.
19 changes: 8 additions & 11 deletions engine/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -594,9 +594,8 @@ <h2 id="kstreams.engine.StreamEngine" class="doc doc-heading">
<span class="p">)</span>

<span class="nd">@kstreams</span><span class="o">.</span><span class="n">stream</span><span class="p">(</span><span class="s2">&quot;local--hello-world&quot;</span><span class="p">,</span> <span class="n">group_id</span><span class="o">=</span><span class="s2">&quot;example-group&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">def</span> <span class="nf">consume</span><span class="p">(</span><span class="n">stream</span><span class="p">:</span> <span class="n">kstreams</span><span class="o">.</span><span class="n">Stream</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">async</span> <span class="k">for</span> <span class="n">cr</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;showing bytes: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">value</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">def</span> <span class="nf">consume</span><span class="p">(</span><span class="n">stream</span><span class="p">:</span> <span class="n">kstreams</span><span class="o">.</span><span class="n">ConsumerRecord</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;showing bytes: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">value</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>


<span class="k">await</span> <span class="n">stream_engine</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
Expand Down Expand Up @@ -814,8 +813,7 @@ <h2 id="kstreams.engine.StreamEngine" class="doc doc-heading">
<span class="normal">228</span>
<span class="normal">229</span>
<span class="normal">230</span>
<span class="normal">231</span>
<span class="normal">232</span></pre></div></td><td class="code"><div><pre><span></span><code><span class="k">class</span> <span class="nc">StreamEngine</span><span class="p">:</span>
<span class="normal">231</span></pre></div></td><td class="code"><div><pre><span></span><code><span class="k">class</span> <span class="nc">StreamEngine</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Attributes:</span>
<span class="sd"> backend kstreams.backends.Kafka: Backend to connect. Default `Kafka`</span>
Expand All @@ -842,9 +840,8 @@ <h2 id="kstreams.engine.StreamEngine" class="doc doc-heading">
<span class="sd"> )</span>

<span class="sd"> @kstreams.stream(&quot;local--hello-world&quot;, group_id=&quot;example-group&quot;)</span>
<span class="sd"> async def consume(stream: kstreams.Stream) -&gt; None:</span>
<span class="sd"> async for cr in stream:</span>
<span class="sd"> print(f&quot;showing bytes: {cr.value}&quot;)</span>
<span class="sd"> async def consume(stream: kstreams.ConsumerRecord) -&gt; None:</span>
<span class="sd"> print(f&quot;showing bytes: {cr.value}&quot;)</span>


<span class="sd"> await stream_engine.start()</span>
Expand Down Expand Up @@ -1166,7 +1163,8 @@ <h3 id="kstreams.engine.StreamEngine.send" class="doc doc-heading">

<details class="quote">
<summary>Source code in <code>kstreams/engine.py</code></summary>
<div class="highlight"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre><span></span><span class="normal"> 79</span>
<div class="highlight"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre><span></span><span class="normal"> 78</span>
<span class="normal"> 79</span>
<span class="normal"> 80</span>
<span class="normal"> 81</span>
<span class="normal"> 82</span>
Expand Down Expand Up @@ -1216,8 +1214,7 @@ <h3 id="kstreams.engine.StreamEngine.send" class="doc doc-heading">
<span class="normal">126</span>
<span class="normal">127</span>
<span class="normal">128</span>
<span class="normal">129</span>
<span class="normal">130</span></pre></div></td><td class="code"><div><pre><span></span><code><span class="k">async</span> <span class="k">def</span> <span class="nf">send</span><span class="p">(</span>
<span class="normal">129</span></pre></div></td><td class="code"><div><pre><span></span><code><span class="k">async</span> <span class="k">def</span> <span class="nf">send</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">topic</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">value</span><span class="p">:</span> <span class="n">Any</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
Expand Down
21 changes: 9 additions & 12 deletions getting_started/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -488,15 +488,14 @@ <h1 id="getting-started">Getting Started</h1>
<p>You can starting using <code>kstreams</code> with simple <code>producers</code> and <code>consumers</code> and/or integrated it with any <code>async</code> framework like <code>FastAPI</code></p>
<h2 id="simple-consumer-and-producer">Simple consumer and producer</h2>
<div class="highlight"><span class="filename">Simple use case</span><pre><span></span><code><span class="kn">import</span> <span class="nn">asyncio</span>
<span class="kn">from</span> <span class="nn">kstreams</span> <span class="kn">import</span> <span class="n">create_engine</span><span class="p">,</span> <span class="n">Stream</span>
<span class="kn">from</span> <span class="nn">kstreams</span> <span class="kn">import</span> <span class="n">create_engine</span><span class="p">,</span> <span class="n">ConsumerRecord</span>

<span class="n">stream_engine</span> <span class="o">=</span> <span class="n">create_engine</span><span class="p">(</span><span class="n">title</span><span class="o">=</span><span class="s2">&quot;my-stream-engine&quot;</span><span class="p">)</span>


<span class="nd">@stream_engine</span><span class="o">.</span><span class="n">stream</span><span class="p">(</span><span class="s2">&quot;local--py-stream&quot;</span><span class="p">,</span> <span class="n">group_id</span><span class="o">=</span><span class="s2">&quot;de-my-partition&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">def</span> <span class="nf">consume</span><span class="p">(</span><span class="n">stream</span><span class="p">:</span> <span class="n">Stream</span><span class="p">):</span>
<span class="k">async</span> <span class="k">for</span> <span class="n">cr</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Event consumed: headers: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">headers</span><span class="si">}</span><span class="s2">, payload: </span><span class="si">{</span><span class="n">value</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">def</span> <span class="nf">consume</span><span class="p">(</span><span class="n">cr</span><span class="p">:</span> <span class="n">ConsumerRecord</span><span class="p">):</span>
<span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Event consumed: headers: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">headers</span><span class="si">}</span><span class="s2">, payload: </span><span class="si">{</span><span class="n">value</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>


<span class="k">async</span> <span class="k">def</span> <span class="nf">produce</span><span class="p">():</span>
Expand Down Expand Up @@ -531,15 +530,14 @@ <h2 id="recommended-usage">Recommended usage</h2>
<p>In the previous example you can see some boiler plate regarding how to start the program. We recommend to use <a href="https://github.com/cjrh/aiorun">aiorun</a>,
so you want have to worry about <code>set signal handlers</code>, <code>shutdown callbacks</code>, <code>graceful shutdown</code> and <code>close the event loop</code>.</p>
<div class="highlight"><span class="filename">Usage with aiorun</span><pre><span></span><code><span class="kn">import</span> <span class="nn">aiorun</span>
<span class="kn">from</span> <span class="nn">kstreams</span> <span class="kn">import</span> <span class="n">create_engine</span><span class="p">,</span> <span class="n">Stream</span>
<span class="kn">from</span> <span class="nn">kstreams</span> <span class="kn">import</span> <span class="n">create_engine</span><span class="p">,</span> <span class="n">ConsumerRecord</span>

<span class="n">stream_engine</span> <span class="o">=</span> <span class="n">create_engine</span><span class="p">(</span><span class="n">title</span><span class="o">=</span><span class="s2">&quot;my-stream-engine&quot;</span><span class="p">)</span>


<span class="nd">@stream_engine</span><span class="o">.</span><span class="n">stream</span><span class="p">(</span><span class="s2">&quot;local--py-stream&quot;</span><span class="p">,</span> <span class="n">group_id</span><span class="o">=</span><span class="s2">&quot;de-my-partition&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">def</span> <span class="nf">consume</span><span class="p">(</span><span class="n">stream</span><span class="p">:</span> <span class="n">Stream</span><span class="p">):</span>
<span class="k">async</span> <span class="k">for</span> <span class="n">cr</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Event consumed: headers: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">headers</span><span class="si">}</span><span class="s2">, payload: </span><span class="si">{</span><span class="n">value</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">def</span> <span class="nf">consume</span><span class="p">(</span><span class="n">cr</span><span class="p">:</span> <span class="n">ConsumerRecord</span><span class="p">):</span>
<span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Event consumed: headers: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">headers</span><span class="si">}</span><span class="s2">, payload: </span><span class="si">{</span><span class="n">value</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>


<span class="k">async</span> <span class="k">def</span> <span class="nf">produce</span><span class="p">():</span>
Expand Down Expand Up @@ -577,13 +575,12 @@ <h2 id="fastapi">FastAPI</h2>
<p>Define the <code>streams</code>:</p>
<div class="highlight"><span class="filename">Application stream</span><pre><span></span><code><span class="c1"># streaming.streams.py</span>
<span class="kn">from</span> <span class="nn">.engine</span> <span class="kn">import</span> <span class="n">stream_engine</span>
<span class="kn">from</span> <span class="nn">kstreams</span> <span class="kn">import</span> <span class="n">Stream</span>
<span class="kn">from</span> <span class="nn">kstreams</span> <span class="kn">import</span> <span class="n">ConsumerRecord</span>


<span class="nd">@stream_engine</span><span class="o">.</span><span class="n">stream</span><span class="p">(</span><span class="s2">&quot;local--kstream&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">def</span> <span class="nf">stream</span><span class="p">(</span><span class="n">stream</span><span class="p">:</span> <span class="n">Stream</span><span class="p">):</span>
<span class="k">async</span> <span class="k">for</span> <span class="n">cr</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Event consumed: headers: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">headers</span><span class="si">}</span><span class="s2">, payload: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">payload</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">def</span> <span class="nf">stream</span><span class="p">(</span><span class="n">cr</span><span class="p">:</span> <span class="n">ConsumerRecord</span><span class="p">):</span>
<span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Event consumed: headers: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">headers</span><span class="si">}</span><span class="s2">, payload: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">payload</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>
</code></pre></div>
<p>Create the <code>FastAPI</code>:</p>
<div class="highlight"><span class="filename">FastAPI</span><pre><span></span><code><span class="c1"># app.py</span>
Expand Down
7 changes: 3 additions & 4 deletions index.html
Original file line number Diff line number Diff line change
Expand Up @@ -539,15 +539,14 @@ <h2 id="installation">Installation</h2>
</code></pre></div>
<h2 id="usage">Usage</h2>
<div class="highlight"><pre><span></span><code><span class="kn">import</span> <span class="nn">aiorun</span>
<span class="kn">from</span> <span class="nn">kstreams</span> <span class="kn">import</span> <span class="n">create_engine</span><span class="p">,</span> <span class="n">Stream</span>
<span class="kn">from</span> <span class="nn">kstreams</span> <span class="kn">import</span> <span class="n">create_engine</span><span class="p">,</span> <span class="n">ConsumerRecord</span>


<span class="n">stream_engine</span> <span class="o">=</span> <span class="n">create_engine</span><span class="p">(</span><span class="n">title</span><span class="o">=</span><span class="s2">&quot;my-stream-engine&quot;</span><span class="p">)</span>

<span class="nd">@stream_engine</span><span class="o">.</span><span class="n">stream</span><span class="p">(</span><span class="s2">&quot;local--kstream&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">def</span> <span class="nf">consume</span><span class="p">(</span><span class="n">stream</span><span class="p">:</span> <span class="n">Stream</span><span class="p">):</span>
<span class="k">async</span> <span class="k">for</span> <span class="n">cr</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Event consumed: headers: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">headers</span><span class="si">}</span><span class="s2">, payload: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">value</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">def</span> <span class="nf">consume</span><span class="p">(</span><span class="n">cr</span><span class="p">:</span> <span class="n">ConsumerRecord</span><span class="p">):</span>
<span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Event consumed: headers: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">headers</span><span class="si">}</span><span class="s2">, payload: </span><span class="si">{</span><span class="n">cr</span><span class="o">.</span><span class="n">value</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>


<span class="k">async</span> <span class="k">def</span> <span class="nf">produce</span><span class="p">():</span>
Expand Down
11 changes: 5 additions & 6 deletions monitoring/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -1208,12 +1208,11 @@ <h2 id="custom-business-metrics">Custom Business Metrics</h2>
<div class="highlight"><pre><span></span><code><span class="n">stream_engine</span> <span class="o">=</span> <span class="n">create_engine</span><span class="p">(</span><span class="n">title</span><span class="o">=</span><span class="s2">&quot;my-engine&quot;</span><span class="p">,</span> <span class="n">monitor</span><span class="o">=</span><span class="n">MyAppPrometheusMonitor</span><span class="p">())</span>

<span class="nd">@stream_engine</span><span class="o">.</span><span class="n">stream</span><span class="p">(</span><span class="s2">&quot;my-special-orders&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">def</span> <span class="nf">consume_orders_received</span><span class="p">(</span><span class="n">consumer</span><span class="p">):</span>
<span class="k">for</span> <span class="n">cr</span><span class="p">,</span> <span class="n">value</span><span class="p">,</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">consumer</span><span class="p">:</span>
<span class="k">if</span> <span class="n">value</span><span class="o">.</span><span class="n">status</span> <span class="o">==</span> <span class="s2">&quot;NEW&quot;</span><span class="p">:</span>
<span class="n">stream_engine</span><span class="o">.</span><span class="n">monitor</span><span class="o">.</span><span class="n">increase_received</span><span class="p">()</span>
<span class="k">elif</span> <span class="n">value</span><span class="o">.</span><span class="n">status</span> <span class="o">==</span> <span class="s2">&quot;SHIPPED&quot;</span><span class="p">:</span>
<span class="n">stream_engine</span><span class="o">.</span><span class="n">monitor</span><span class="o">.</span><span class="n">increase_shipped</span><span class="p">()</span>
<span class="k">async</span> <span class="k">def</span> <span class="nf">consume_orders_received</span><span class="p">(</span><span class="n">cr</span><span class="p">:</span> <span class="n">ConsumerRecord</span><span class="p">):</span>
<span class="k">if</span> <span class="n">cr</span><span class="o">.</span><span class="n">value</span><span class="o">.</span><span class="n">status</span> <span class="o">==</span> <span class="s2">&quot;NEW&quot;</span><span class="p">:</span>
<span class="n">stream_engine</span><span class="o">.</span><span class="n">monitor</span><span class="o">.</span><span class="n">increase_received</span><span class="p">()</span>
<span class="k">elif</span> <span class="n">cr</span><span class="o">.</span><span class="n">value</span><span class="o">.</span><span class="n">status</span> <span class="o">==</span> <span class="s2">&quot;SHIPPED&quot;</span><span class="p">:</span>
<span class="n">stream_engine</span><span class="o">.</span><span class="n">monitor</span><span class="o">.</span><span class="n">increase_shipped</span><span class="p">()</span>
</code></pre></div>
<p>Your app's prometheus would display this data, which you might utilize to build a stylish ✨dashboard✨ interface.</p>
<p>For further details, see the <a href="https://github.com/prometheus/client">Prometheus python client</a> documentation.</p>
Expand Down
Loading

0 comments on commit 120ae87

Please sign in to comment.