Skip to content

Commit

Permalink
deploy: 32e362d
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosschroh committed Jul 1, 2024
1 parent fdd91a7 commit b7a51c2
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 144 deletions.
44 changes: 31 additions & 13 deletions engine/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,7 @@ <h2 id="kstreams.engine.StreamEngine" class="doc doc-heading">

<details class="quote">
<summary>Source code in <code>kstreams/engine.py</code></summary>
<div class="highlight"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre><span></span><span class="normal"> 25</span>
<span class="normal"> 26</span>
<div class="highlight"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre><span></span><span class="normal"> 26</span>
<span class="normal"> 27</span>
<span class="normal"> 28</span>
<span class="normal"> 29</span>
Expand Down Expand Up @@ -843,7 +842,17 @@ <h2 id="kstreams.engine.StreamEngine" class="doc doc-heading">
<span class="normal">246</span>
<span class="normal">247</span>
<span class="normal">248</span>
<span class="normal">249</span></pre></div></td><td class="code"><div><pre><span></span><code><span class="k">class</span> <span class="nc">StreamEngine</span><span class="p">:</span>
<span class="normal">249</span>
<span class="normal">250</span>
<span class="normal">251</span>
<span class="normal">252</span>
<span class="normal">253</span>
<span class="normal">254</span>
<span class="normal">255</span>
<span class="normal">256</span>
<span class="normal">257</span>
<span class="normal">258</span>
<span class="normal">259</span></pre></div></td><td class="code"><div><pre><span></span><code><span class="k">class</span> <span class="nc">StreamEngine</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Attributes:</span>
<span class="sd"> backend kstreams.backends.Kafka: Backend to connect. Default `Kafka`</span>
Expand Down Expand Up @@ -953,23 +962,22 @@ <h2 id="kstreams.engine.StreamEngine" class="doc doc-heading">
<span class="k">return</span> <span class="n">metadata</span>

<span class="k">async</span> <span class="k">def</span> <span class="nf">start</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_producer</span><span class="p">()</span>
<span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_streams</span><span class="p">()</span>

<span class="c1"># add the producer and streams to the Monitor</span>
<span class="bp">self</span><span class="o">.</span><span class="n">monitor</span><span class="o">.</span><span class="n">add_producer</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_producer</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">monitor</span><span class="o">.</span><span class="n">add_streams</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_streams</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">monitor</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>

<span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_producer</span><span class="p">()</span>
<span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_streams</span><span class="p">()</span>

<span class="k">async</span> <span class="k">def</span> <span class="nf">stop</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">monitor</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">stop_producer</span><span class="p">()</span>
<span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">stop_streams</span><span class="p">()</span>

<span class="k">async</span> <span class="k">def</span> <span class="nf">stop_producer</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Waiting Producer to STOP....&quot;</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_producer</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">_producer</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Producer has STOPPED....&quot;</span><span class="p">)</span>

<span class="k">async</span> <span class="k">def</span> <span class="nf">start_producer</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">producer_class</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
Expand All @@ -987,13 +995,23 @@ <h2 id="kstreams.engine.StreamEngine" class="doc doc-heading">
<span class="k">for</span> <span class="n">stream</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_streams</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">inspect</span><span class="o">.</span><span class="n">isasyncgenfunction</span><span class="p">(</span><span class="n">stream</span><span class="o">.</span><span class="n">func</span><span class="p">)</span>
<span class="p">]</span>

<span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">_start_streams_on_background_mode</span><span class="p">(</span><span class="n">streams</span><span class="p">)</span>

<span class="k">async</span> <span class="k">def</span> <span class="nf">_start_streams_on_background_mode</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">streams</span><span class="p">:</span> <span class="n">typing</span><span class="o">.</span><span class="n">List</span><span class="p">[</span><span class="n">Stream</span><span class="p">]</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># start all the streams</span>
<span class="k">for</span> <span class="n">stream</span> <span class="ow">in</span> <span class="n">streams</span><span class="p">:</span>
<span class="k">await</span> <span class="n">stream</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
<span class="n">asyncio</span><span class="o">.</span><span class="n">create_task</span><span class="p">(</span><span class="n">stream</span><span class="o">.</span><span class="n">start</span><span class="p">())</span>

<span class="c1"># start monitoring</span>
<span class="n">asyncio</span><span class="o">.</span><span class="n">create_task</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">monitor</span><span class="o">.</span><span class="n">start</span><span class="p">())</span>

<span class="k">async</span> <span class="k">def</span> <span class="nf">stop_streams</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Waiting for Streams to STOP....&quot;</span><span class="p">)</span>
<span class="k">for</span> <span class="n">stream</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_streams</span><span class="p">:</span>
<span class="k">await</span> <span class="n">stream</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Streams have STOPPED....&quot;</span><span class="p">)</span>

<span class="k">async</span> <span class="k">def</span> <span class="nf">clean_streams</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">stop_streams</span><span class="p">()</span>
Expand Down Expand Up @@ -1208,8 +1226,7 @@ <h3 id="kstreams.engine.StreamEngine.send" class="doc doc-heading">

<details class="quote">
<summary>Source code in <code>kstreams/engine.py</code></summary>
<div class="highlight"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre><span></span><span class="normal"> 81</span>
<span class="normal"> 82</span>
<div class="highlight"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre><span></span><span class="normal"> 82</span>
<span class="normal"> 83</span>
<span class="normal"> 84</span>
<span class="normal"> 85</span>
Expand Down Expand Up @@ -1259,7 +1276,8 @@ <h3 id="kstreams.engine.StreamEngine.send" class="doc doc-heading">
<span class="normal">129</span>
<span class="normal">130</span>
<span class="normal">131</span>
<span class="normal">132</span></pre></div></td><td class="code"><div><pre><span></span><code><span class="k">async</span> <span class="k">def</span> <span class="nf">send</span><span class="p">(</span>
<span class="normal">132</span>
<span class="normal">133</span></pre></div></td><td class="code"><div><pre><span></span><code><span class="k">async</span> <span class="k">def</span> <span class="nf">send</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">topic</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span>
<span class="n">value</span><span class="p">:</span> <span class="n">typing</span><span class="o">.</span><span class="n">Any</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
Expand Down
4 changes: 1 addition & 3 deletions middleware/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,7 @@ <h2 id="middleware-by-default">Middleware by default</h2>
<span class="normal">87</span>
<span class="normal">88</span>
<span class="normal">89</span>
<span class="normal">90</span>
<span class="normal">91</span></pre></div></td><td class="code"><div><pre><span></span><code><span class="k">class</span> <span class="nc">ExceptionMiddleware</span><span class="p">(</span><span class="n">BaseMiddleware</span><span class="p">):</span>
<span class="normal">90</span></pre></div></td><td class="code"><div><pre><span></span><code><span class="k">class</span> <span class="nc">ExceptionMiddleware</span><span class="p">(</span><span class="n">BaseMiddleware</span><span class="p">):</span>
<span class="k">async</span> <span class="k">def</span> <span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">cr</span><span class="p">:</span> <span class="n">ConsumerRecord</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">typing</span><span class="o">.</span><span class="n">Any</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">return</span> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">next_call</span><span class="p">(</span><span class="n">cr</span><span class="p">)</span>
Expand All @@ -783,7 +782,6 @@ <h2 id="middleware-by-default">Middleware by default</h2>
<span class="sa">f</span><span class="s2">&quot;Stream consuming from topics </span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">stream</span><span class="o">.</span><span class="n">topics</span><span class="si">}</span><span class="s2"> CRASHED!!! </span><span class="se">\n\n</span><span class="s2"> &quot;</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">sys</span><span class="o">.</span><span class="n">version_info</span> <span class="o">&gt;=</span> <span class="p">(</span><span class="mi">3</span><span class="p">,</span> <span class="mi">11</span><span class="p">):</span>
<span class="n">exc</span><span class="o">.</span><span class="n">add_note</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Task: </span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">stream</span><span class="o">.</span><span class="n">_consumer_task</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="n">exc</span><span class="o">.</span><span class="n">add_note</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Handler: </span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">stream</span><span class="o">.</span><span class="n">func</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="n">exc</span><span class="o">.</span><span class="n">add_note</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Topics: </span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">stream</span><span class="o">.</span><span class="n">topics</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>

Expand Down
Loading

0 comments on commit b7a51c2

Please sign in to comment.