Update starter sample templates to v3.10.0 #596

Open · wants to merge 4 commits into main
14 changes: 10 additions & 4 deletions python/destinations/starter_destination/README.md
@@ -1,10 +1,10 @@
# Starter destination

[This code sample](https://github.com/quixio/quix-samples/tree/main/python/destinations/starter_destination) demonstrates how to consume data from a Kafka topic, perform an operation on that data, then persist or publish that data to an external destination.
[This code sample](https://github.com/quixio/quix-samples/tree/main/python/destinations/starter_destination) demonstrates how to use the Quix Streams Sink framework to
consume and alter data from a Kafka topic, and publish the results to an external
destination.

The simple boilerplate code consumes data from the source topic, prints the content to the console output, and publishes to any destination based on code that you must add yourself.

To use the sample, first modify the Python code to publish to your chosen destination(s).
This is just a template, so add your own operations as required.

## How to run

@@ -19,6 +19,12 @@ The code sample uses the following environment variables:
- **input**: Name of the input topic to listen to.
- **output**: Name of the output topic to write to.

## Using Premade Sinks

Quix Streams has numerous prebuilt sinks available to use out of the box, so be
sure to [check them out!](https://quix.io/docs/quix-streams/connectors/sinks/index.html)
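
For example, attaching a prebuilt sink follows the same `sdf.sink()` pattern used in this template. The sketch below is illustrative only: it assumes the `CSVSink` connector and import path described in the docs linked above, and the file name is a placeholder.

```python
# Minimal sketch (assumptions noted above): write the topic's data to a local
# CSV file with a prebuilt sink instead of a custom one.
import os
from quixstreams import Application
from quixstreams.sinks.core.csv import CSVSink

app = Application(consumer_group="csv_destination", auto_offset_reset="earliest")
sdf = app.dataframe(topic=app.topic(name=os.environ["input"]))
sdf.sink(CSVSink(path="output.csv"))  # attach the prebuilt sink just like a custom one

if __name__ == "__main__":
    app.run()
```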


## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.
93 changes: 77 additions & 16 deletions python/destinations/starter_destination/main.py
@@ -1,28 +1,89 @@
# import the Quix Streams modules for interacting with Kafka.
# For general info, see https://quix.io/docs/quix-streams/introduction.html
# For sinks, see https://quix.io/docs/quix-streams/connectors/sinks/index.html
from quixstreams import Application
from quixstreams.sinks import BatchingSink, SinkBatch, SinkBackpressureError

import os
import time

# for local dev, you can load env vars from a .env file
# from dotenv import load_dotenv
# load_dotenv()


class MyDatabaseSink(BatchingSink):
"""
Sinks are a way of writing data from a Kafka topic to a non-Kafka destination,
often some sort of database or file.

This is a custom placeholder Sink which showcases a simple pattern around
creating your own for a database.

There are numerous pre-built sinks available to use out of the box; see:
https://quix.io/docs/quix-streams/connectors/sinks/index.html
"""
def _write_to_db(self, data):
"""Placeholder for transformations and database write operation"""
...

def write(self, batch: SinkBatch):
"""
Every Sink requires a .write method.

Here is where we attempt to write batches of data (multiple consumed messages,
for the sake of efficiency/speed) to our database.

Sinks have sanctioned patterns around retrying and handling connections.

from dotenv import load_dotenv
load_dotenv()
See https://quix.io/docs/quix-streams/connectors/sinks/custom-sinks.html for
more details.
"""
attempts_remaining = 3
data = [item.value for item in batch]
while attempts_remaining:
try:
return self._write_to_db(data)
except ConnectionError:
# Maybe we just failed to connect, do a short wait and try again
# We can't repeat forever; the consumer will eventually time out
attempts_remaining -= 1
if attempts_remaining:
time.sleep(3)
except TimeoutError:
# Maybe the server is busy, do a sanctioned extended pause
# Always safe to do, but will require re-consuming the data.
raise SinkBackpressureError(
retry_after=30.0,
topic=batch.topic,
partition=batch.partition,
)
raise Exception("Error while writing to database")
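
# --- Illustrative sketch, not part of this sample ---
# One way the _write_to_db placeholder could be filled in, assuming a local
# SQLite database is an acceptable stand-in for "your database"; the table
# name and single-column schema are invented purely for demonstration.
import json
import sqlite3

class SQLiteEventSink(MyDatabaseSink):
    def _write_to_db(self, data):
        # One connection per batch keeps the sketch short; a real sink would
        # typically create the connection once (e.g. in __init__) and reuse it.
        with sqlite3.connect("events.db") as conn:  # commits on clean exit
            conn.execute("CREATE TABLE IF NOT EXISTS events (payload TEXT)")
            conn.executemany(
                "INSERT INTO events (payload) VALUES (?)",
                [(json.dumps(item),) for item in data],
            )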

# you decide what happens here!
def sink(message):
value = message['mykey']
# write_to_db(value) # implement your logic to write data or send alerts etc

# for more help using QuixStreams see the docs:
# https://quix.io/docs/quix-streams/introduction.html
def main():
""" Here we will set up our Application. """

app = Application(consumer_group="destination-v1", auto_offset_reset = "latest")
# Setup necessary objects
app = Application(
consumer_group="my_db_destination",
auto_create_topics=True,
auto_offset_reset="earliest"
)
my_db_sink = MyDatabaseSink()
input_topic = app.topic(name=os.environ["input"])
sdf = app.dataframe(topic=input_topic)

input_topic = app.topic(os.environ["input"])
# Do SDF operations/transformations
sdf = sdf.apply(lambda row: row).print(metadata=True)

sdf = app.dataframe(input_topic)
# Finish by calling StreamingDataFrame.sink()
sdf.sink(my_db_sink)

# call the sink function for every message received.
sdf = sdf.update(sink)
# With our pipeline defined, now run the Application
app.run()

# you can print the data row if you want to see what's going on.
sdf.print(metadata=True)

# It is recommended to execute Applications under a conditional main
if __name__ == "__main__":
app.run()
main()
2 changes: 1 addition & 1 deletion python/destinations/starter_destination/requirements.txt
@@ -1,2 +1,2 @@
quixstreams==3.2.1
quixstreams==3.10.0
python-dotenv
12 changes: 10 additions & 2 deletions python/sources/starter_source/README.md
@@ -1,6 +1,9 @@
# Starter data source

[This code sample](https://github.com/quixio/quix-samples/tree/main/python/sources/starter_source) demonstrates how to publish hard-coded lines of JSON data to a Kafka topic in Quix.
[This code sample](https://github.com/quixio/quix-samples/tree/main/python/sources/starter_source) demonstrates how to use the Quix Streams Source framework to publish
hard-coded lines of JSON data to a Kafka topic.

This boilerplate runs in Quix Cloud as-is, with no alterations needed.

## How to run

@@ -14,7 +17,12 @@ Clicking `Edit code` on the Sample forks the project to your own Git repo so you

The code sample uses the following environment variables:

- **Topic**: Name of the output topic to write into.
- **output**: Name of the output topic to write into.

## Using Premade Sources

Quix Streams has numerous prebuilt sources available to use out of the box, so be
sure to [check them out!](https://quix.io/docs/quix-streams/connectors/sources/index.html)
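
For example, a prebuilt source plugs into the same `app.add_source()` call used in this template's `main.py`. The sketch below is illustrative only: it assumes the `CSVSource` connector, import path, and constructor arguments described in the docs linked above, and `data.csv` is a placeholder file name.

```python
# Minimal sketch (assumptions noted above): replay rows from a local CSV file
# instead of the hardcoded memory-usage data.
import os
from quixstreams import Application
from quixstreams.sources.core.csv import CSVSource

app = Application(consumer_group="csv_producer", auto_create_topics=True)
csv_source = CSVSource(path="data.csv", name="csv-source")
app.add_source(source=csv_source, topic=app.topic(name=os.environ["output"]))

if __name__ == "__main__":
    app.run()
```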

## Contribute

111 changes: 59 additions & 52 deletions python/sources/starter_source/main.py
@@ -1,80 +1,87 @@
from quixstreams import Application # import the Quix Streams modules for interacting with Kafka:
# (see https://quix.io/docs/quix-streams/v2-0-latest/api-reference/quixstreams.html for more details)
# import the Quix Streams modules for interacting with Kafka.
# For general info, see https://quix.io/docs/quix-streams/introduction.html
# For sources, see https://quix.io/docs/quix-streams/connectors/sources/index.html
from quixstreams import Application
from quixstreams.sources import Source

# import additional modules as needed
import os
import json

# for local dev, load env vars from a .env file
from dotenv import load_dotenv
load_dotenv()
# for local dev, you can load env vars from a .env file
# from dotenv import load_dotenv
# load_dotenv()

app = Application(consumer_group="data_source", auto_create_topics=True) # create an Application

# define the topic using the "output" environment variable
topic_name = os.environ["output"]
topic = app.topic(topic_name)
class MemoryUsageGenerator(Source):
"""
A Quix Streams Source enables Applications to read data from something other
than Kafka and publish it to a desired Kafka topic.

You provide a Source to an Application, which will handle the Source's lifecycle.

# this function loads the file and sends each row to the publisher
def get_data():
"""
A function to generate data from a hardcoded dataset in an endless manner.
It returns a list of tuples with a message_key and rows
In this case, we have built a new Source that reads from a static set of
already-loaded JSON data representing a server's memory usage over time.

There are numerous pre-built sources available to use out of the box; see:
https://quix.io/docs/quix-streams/connectors/sources/index.html
"""

# define the hardcoded dataset
# this data is fake data representing used % of memory allocation over time
# there is one row of data every 1 to 2 seconds
data = [
memory_allocation_data = [
{"m": "mem", "host": "host1", "used_percent": "64.56", "time": "1577836800000000000"},
{"m": "mem", "host": "host2", "used_percent": "71.89", "time": "1577836801000000000"},
{"m": "mem", "host": "host1", "used_percent": "63.27", "time": "1577836803000000000"},
{"m": "mem", "host": "host2", "used_percent": "73.45", "time": "1577836804000000000"},
{"m": "mem", "host": "host1", "used_percent": "62.98", "time": "1577836806000000000"},
{"m": "mem", "host": "host2", "used_percent": "74.33", "time": "1577836808000000000"},
{"m": "mem", "host": "host1", "used_percent": "65.21", "time": "1577836810000000000"},
{"m": "mem", "host": "host2", "used_percent": "70.88", "time": "1577836812000000000"},
{"m": "mem", "host": "host1", "used_percent": "64.61", "time": "1577836814000000000"},
{"m": "mem", "host": "host2", "used_percent": "72.56", "time": "1577836816000000000"},
{"m": "mem", "host": "host1", "used_percent": "63.77", "time": "1577836818000000000"},
{"m": "mem", "host": "host2", "used_percent": "73.21", "time": "1577836820000000000"}
]

# create a list of tuples with row_data
data_with_id = [(row_data) for row_data in data]

return data_with_id
def run(self):
"""
Each Source must have a `run` method.

It will include the logic behind your source, contained within a
"while self.running" block for exiting when its parent Application stops.

There are a few methods on a Source available for producing to Kafka, like
`self.serialize` and `self.produce`.
"""
data = iter(self.memory_allocation_data)
# exit when either the app is stopped or the data is exhausted
while self.running:
try:
event = next(data)
event_serialized = self.serialize(key=event["host"], value=event)
self.produce(key=event_serialized.key, value=event_serialized.value)
print("Source produced event successfully!")
except StopIteration:
print("Source finished producing messages.")
return


def main():
"""
Read data from the hardcoded dataset and publish it to Kafka
"""

# create a pre-configured Producer object.
with app.get_producer() as producer:
# iterate over the data from the hardcoded dataset
data_with_id = get_data()
for row_data in data_with_id:
""" Here we will set up our Application. """

json_data = json.dumps(row_data) # convert the row to JSON
# Setup necessary objects
app = Application(consumer_group="data_producer", auto_create_topics=True)
memory_usage_source = MemoryUsageGenerator(name="memory-usage-producer")
output_topic = app.topic(name=os.environ["output"])

# publish the data to the topic
producer.produce(
topic=topic.name,
key=row_data['host'],
value=json_data,
)
# --- Setup Source ---
# OPTION 1: no additional processing with a StreamingDataFrame
# Generally the recommended approach; no additional operations needed!
app.add_source(source=memory_usage_source, topic=output_topic)

# for more help using QuixStreams see docs:
# https://quix.io/docs/quix-streams/introduction.html
# OPTION 2: additional processing with a StreamingDataFrame
# Useful for consolidating additional data cleanup into 1 Application.
# In this case, do NOT use `app.add_source()`.
# sdf = app.dataframe(source=memory_usage_source)
# <sdf operations here>
# sdf.to_topic(topic=output_topic) # you must do this to output your data!

print("All rows published")
# With our pipeline defined, now run the Application
app.run()


# Sources require execution under a conditional main
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Exiting.")
main()
2 changes: 1 addition & 1 deletion python/sources/starter_source/requirements.txt
@@ -1,2 +1,2 @@
quixstreams==3.2.1
quixstreams==3.10.0
python-dotenv
12 changes: 10 additions & 2 deletions python/transformations/starter_transformation/README.md
@@ -1,8 +1,10 @@
# Starter transformation

[This code sample](https://github.com/quixio/quix-samples/tree/main/python/transformations/starter_transformation) demonstrates how to consume data from a topic, apply a simple transformation to that data and publish the result to an output topic (while printing content to the console output).
[This code sample](https://github.com/quixio/quix-samples/tree/main/python/transformations/starter_transformation) demonstrates how to consume and transform data from a Kafka topic
and publish these results to another Kafka topic, all using our `StreamingDataFrame`.

Modify the Python code to transform your data on the fly.
This boilerplate will run in Quix Cloud but contains mostly placeholder operations, so you
will need to add your own operations to do more than print the data to the console.

## How to run

@@ -17,6 +19,12 @@ The code sample uses the following environment variables:
- **input**: Name of the input topic to listen to.
- **output**: Name of the output topic to write to.

## Possible `StreamingDataFrame` Operations

Many different operations and transformations are available, so
be sure to [explore what's possible](https://quix.io/docs/quix-streams/processing.html)!
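
For a taste of what those operations look like, here is a short sketch; the field names (`used_percent`, `host`) and the threshold are invented for illustration:

```python
# A few common StreamingDataFrame operations (placeholder fields/threshold).
sdf = app.dataframe(topic=input_topic)

# keep only rows where memory usage crosses a threshold
sdf = sdf.filter(lambda row: float(row["used_percent"]) > 80.0)

# derive a new column from an existing one
sdf["alert"] = sdf["host"].apply(lambda host: f"High memory usage on {host}")

# publish the transformed rows to the output topic
sdf.to_topic(output_topic)
```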


## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.
52 changes: 39 additions & 13 deletions python/transformations/starter_transformation/main.py
@@ -1,23 +1,49 @@
import os
# import the Quix Streams modules for interacting with Kafka.
# For general info, see https://quix.io/docs/quix-streams/introduction.html
from quixstreams import Application

import os

# for local dev, load env vars from a .env file
from dotenv import load_dotenv
load_dotenv()
# from dotenv import load_dotenv
# load_dotenv()

app = Application(consumer_group="transformation-v1", auto_offset_reset="earliest")

input_topic = app.topic(os.environ["input"])
output_topic = app.topic(os.environ["output"])
def main():
"""
Transformations generally read from, and produce to, Kafka topics.

sdf = app.dataframe(input_topic)
They are conducted with Applications and their accompanying StreamingDataFrames
which define what transformations to perform on incoming data.

# put transformation logic here
# see docs for what you can do
# https://quix.io/docs/get-started/quixtour/process-threshold.html
Be sure to explicitly produce output to any desired topic(s); it does not happen
automatically!

sdf.print()
sdf.to_topic(output_topic)
To learn about what operations are possible, the best place to start is:
https://quix.io/docs/quix-streams/processing.html
"""

if __name__ == "__main__":
# Setup necessary objects
app = Application(
consumer_group="my_transformation",
auto_create_topics=True,
auto_offset_reset="earliest"
)
input_topic = app.topic(name=os.environ["input"])
output_topic = app.topic(name=os.environ["output"])
sdf = app.dataframe(topic=input_topic)

# Do StreamingDataFrame operations/transformations here
sdf = sdf.apply(lambda row: row).filter(lambda row: True)
sdf = sdf.print(metadata=True)

# Finish off by writing the final result to the output topic
sdf.to_topic(output_topic)

# With our pipeline defined, now run the Application
app.run()


# It is recommended to execute Applications under a conditional main
if __name__ == "__main__":
main()