diff --git a/python/destinations/starter_destination/README.md b/python/destinations/starter_destination/README.md
index c890c98b..551eda7f 100644
--- a/python/destinations/starter_destination/README.md
+++ b/python/destinations/starter_destination/README.md
@@ -1,10 +1,10 @@
 # Starter transformation
 
-[This code sample](https://github.com/quixio/quix-samples/tree/main/python/destinations/starter_destination) demonstrates how to consume data from a Kafka topic, perform an operation on that data, then persist or publish that data to an external destination.
+[This code sample](https://github.com/quixio/quix-samples/tree/main/python/destinations/starter_destination) demonstrates how to use the Quix Streams Sink framework to
+consume and alter data from a Kafka topic, and publish these results to an external
+destination.
 
-The simple boilerplate code consumes to data from the source topic, prints the content to console output and publishes to any destination based on code that that you must add yourself.
-
-To use the sample, first modify the Python code to publish to your chosen destination(s).
+This is just a template, so add your own operations as required.
 
 ## How to run
 
@@ -19,6 +19,12 @@ The code sample uses the following environment variables:
 - **input**: Name of the input topic to listen to.
 - **output**: Name of the output topic to write to.
 
+## Using Premade Sinks
+
+Quix Streams has numerous prebuilt sinks available to use out of the box, so be
+sure to [check them out!](https://quix.io/docs/quix-streams/connectors/sinks/index.html)
+
+
 ## Contribute
 
 Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.
diff --git a/python/destinations/starter_destination/main.py b/python/destinations/starter_destination/main.py
index ed126cbe..ba054ff5 100644
--- a/python/destinations/starter_destination/main.py
+++ b/python/destinations/starter_destination/main.py
@@ -1,28 +1,89 @@
+# import the Quix Streams modules for interacting with Kafka.
+# For general info, see https://quix.io/docs/quix-streams/introduction.html
+# For sinks, see https://quix.io/docs/quix-streams/connectors/sinks/index.html
 from quixstreams import Application
+from quixstreams.sinks import BatchingSink, SinkBatch, SinkBackpressureError
+
 import os
+import time
+
+# for local dev, you can load env vars from a .env file
+# from dotenv import load_dotenv
+# load_dotenv()
+
+
+class MyDatabaseSink(BatchingSink):
+    """
+    Sinks are a way of writing data from a Kafka topic to a non-Kafka destination,
+    often some sort of database or file.
+
+    This is a custom placeholder Sink which showcases a simple pattern around
+    creating your own for a database.
+
+    There are numerous pre-built sinks available to use out of the box; see:
+    https://quix.io/docs/quix-streams/connectors/sinks/index.html
+    """
+    def _write_to_db(self, data):
+        """Placeholder for transformations and database write operation"""
+        ...
+
+    def write(self, batch: SinkBatch):
+        """
+        Every Sink requires a .write method.
+
+        Here is where we attempt to write batches of data (multiple consumed messages,
+        for the sake of efficiency/speed) to our database.
+
+        Sinks have sanctioned patterns around retrying and handling connections.
 
-from dotenv import load_dotenv
-load_dotenv()
+        See https://quix.io/docs/quix-streams/connectors/sinks/custom-sinks.html for
+        more details.
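+
+        As a loose sketch, `_write_to_db` might eventually wrap a database
+        client like so (`SomeDBClient`, its methods, and the env var here are
+        hypothetical stand-ins, not part of Quix Streams):
+
+            client = SomeDBClient(os.environ["DB_CONNECTION_STRING"])  # hypothetical client
+            client.insert_rows(data)  # hypothetical batch insert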
+ """ + attempts_remaining = 3 + data = [item.value for item in batch] + while attempts_remaining: + try: + return self._write_to_db(data) + except ConnectionError: + # Maybe we just failed to connect, do a short wait and try again + # We can't repeat forever; the consumer will eventually time out + attempts_remaining -= 1 + if attempts_remaining: + time.sleep(3) + except TimeoutError: + # Maybe the server is busy, do a sanctioned extended pause + # Always safe to do, but will require re-consuming the data. + raise SinkBackpressureError( + retry_after=30.0, + topic=batch.topic, + partition=batch.partition, + ) + raise Exception("Error while writing to database") -# you decide what happens here! -def sink(message): - value = message['mykey'] - # write_to_db(value) # implement your logic to write data or send alerts etc - # for more help using QuixStreams see the docs: - # https://quix.io/docs/quix-streams/introduction.html +def main(): + """ Here we will set up our Application. """ -app = Application(consumer_group="destination-v1", auto_offset_reset = "latest") + # Setup necessary objects + app = Application( + consumer_group="my_db_destination", + auto_create_topics=True, + auto_offset_reset="earliest" + ) + my_db_sink = MyDatabaseSink() + input_topic = app.topic(name=os.environ["input"]) + sdf = app.dataframe(topic=input_topic) -input_topic = app.topic(os.environ["input"]) + # Do SDF operations/transformations + sdf = sdf.apply(lambda row: row).print(metadata=True) -sdf = app.dataframe(input_topic) + # Finish by calling StreamingDataFrame.sink() + sdf.sink(my_db_sink) -# call the sink function for every message received. -sdf = sdf.update(sink) + # With our pipeline defined, now run the Application + app.run() -# you can print the data row if you want to see what's going on. -sdf.print(metadata=True) +# It is recommended to execute Applications under a conditional main if __name__ == "__main__": - app.run() \ No newline at end of file + main() diff --git a/python/destinations/starter_destination/requirements.txt b/python/destinations/starter_destination/requirements.txt index 365f9bd4..eecebca3 100644 --- a/python/destinations/starter_destination/requirements.txt +++ b/python/destinations/starter_destination/requirements.txt @@ -1,2 +1,2 @@ -quixstreams==3.2.1 +quixstreams==3.10.0 python-dotenv \ No newline at end of file diff --git a/python/sources/starter_source/README.md b/python/sources/starter_source/README.md index d588639b..f9eb512b 100644 --- a/python/sources/starter_source/README.md +++ b/python/sources/starter_source/README.md @@ -1,6 +1,9 @@ # Starter data source -[This code sample](https://github.com/quixio/quix-samples/tree/main/python/sources/starter_source) demonstrates how to publish hard-coded lines of JSON data to a Kafka topic in Quix. +[This code sample](https://github.com/quixio/quix-samples/tree/main/python/sources/starter_source) demonstrates how to use the Quix Streams Source framework to publish +hard-coded lines of JSON data to a Kafka topic. + +This boilerplate runs in Quix Cloud without any necessary alterations. ## How to run @@ -14,7 +17,12 @@ Clicking `Edit code` on the Sample, forks the project to your own Git repo so yo The code sample uses the following environment variables: -- **Topic**: Name of the output topic to write into. +- **output**: Name of the output topic to write into. 
+
+## Using Premade Sources
+
+Quix Streams has numerous prebuilt sources available to use out of the box, so be
+sure to [check them out!](https://quix.io/docs/quix-streams/connectors/sources/index.html)
 
 ## Contribute
diff --git a/python/sources/starter_source/main.py b/python/sources/starter_source/main.py
index 43e3084b..452fca0d 100644
--- a/python/sources/starter_source/main.py
+++ b/python/sources/starter_source/main.py
@@ -1,32 +1,31 @@
-from quixstreams import Application # import the Quix Streams modules for interacting with Kafka:
-# (see https://quix.io/docs/quix-streams/v2-0-latest/api-reference/quixstreams.html for more details)
+# import the Quix Streams modules for interacting with Kafka.
+# For general info, see https://quix.io/docs/quix-streams/introduction.html
+# For sources, see https://quix.io/docs/quix-streams/connectors/sources/index.html
+from quixstreams import Application
+from quixstreams.sources import Source
 
-# import additional modules as needed
 import os
-import json
 
-# for local dev, load env vars from a .env file
-from dotenv import load_dotenv
-load_dotenv()
+# for local dev, you can load env vars from a .env file
+# from dotenv import load_dotenv
+# load_dotenv()
 
-app = Application(consumer_group="data_source", auto_create_topics=True) # create an Application
 
-# define the topic using the "output" environment variable
-topic_name = os.environ["output"]
-topic = app.topic(topic_name)
 
+class MemoryUsageGenerator(Source):
+    """
+    A Quix Streams Source enables Applications to read data from something other
+    than Kafka and publish it to a desired Kafka topic.
+    You provide a Source to an Application, which will handle the Source's lifecycle.
 
-# this function loads the file and sends each row to the publisher
-def get_data():
-    """
-    A function to generate data from a hardcoded dataset in an endless manner.
-    It returns a list of tuples with a message_key and rows
+    In this case, we have built a new Source that reads from a static set of
+    preloaded JSON data representing a server's memory usage over time.
+
+    There are numerous pre-built sources available to use out of the box; see:
+    https://quix.io/docs/quix-streams/connectors/sources/index.html
     """
-    # define the hardcoded dataset
-    # this data is fake data representing used % of memory allocation over time
-    # there is one row of data every 1 to 2 seconds
-    data = [
+    memory_allocation_data = [
         {"m": "mem", "host": "host1", "used_percent": "64.56", "time": "1577836800000000000"},
         {"m": "mem", "host": "host2", "used_percent": "71.89", "time": "1577836801000000000"},
         {"m": "mem", "host": "host1", "used_percent": "63.27", "time": "1577836803000000000"},
@@ -34,47 +33,55 @@
         {"m": "mem", "host": "host1", "used_percent": "62.98", "time": "1577836806000000000"},
         {"m": "mem", "host": "host2", "used_percent": "74.33", "time": "1577836808000000000"},
         {"m": "mem", "host": "host1", "used_percent": "65.21", "time": "1577836810000000000"},
-        {"m": "mem", "host": "host2", "used_percent": "70.88", "time": "1577836812000000000"},
-        {"m": "mem", "host": "host1", "used_percent": "64.61", "time": "1577836814000000000"},
-        {"m": "mem", "host": "host2", "used_percent": "72.56", "time": "1577836816000000000"},
-        {"m": "mem", "host": "host1", "used_percent": "63.77", "time": "1577836818000000000"},
-        {"m": "mem", "host": "host2", "used_percent": "73.21", "time": "1577836820000000000"}
     ]
 
-    # create a list of tuples with row_data
-    data_with_id = [(row_data) for row_data in data]
-
-    return data_with_id
+    def run(self):
+        """
+        Each Source must have a `run` method.
+
+        It will include the logic behind your source, contained within a
+        "while self.running" block for exiting when its parent Application stops.
+
+        There are a few methods on a Source available for producing to Kafka, like
+        `self.serialize` and `self.produce`.
+        """
+        data = iter(self.memory_allocation_data)
+        # either exit when the app is stopped, or return once the data is exhausted
+        while self.running:
+            try:
+                event = next(data)
+                event_serialized = self.serialize(key=event["host"], value=event)
+                self.produce(key=event_serialized.key, value=event_serialized.value)
+                print("Source produced event successfully!")
+            except StopIteration:
+                print("Source finished producing messages.")
+                return
 
 
 def main():
-    """
-    Read data from the hardcoded dataset and publish it to Kafka
-    """
-
-    # create a pre-configured Producer object.
-    with app.get_producer() as producer:
-        # iterate over the data from the hardcoded dataset
-        data_with_id = get_data()
-        for row_data in data_with_id:
+    """ Here we will set up our Application. """
 
-            json_data = json.dumps(row_data) # convert the row to JSON
+    # Setup necessary objects
+    app = Application(consumer_group="data_producer", auto_create_topics=True)
+    memory_usage_source = MemoryUsageGenerator(name="memory-usage-producer")
+    output_topic = app.topic(name=os.environ["output"])
 
-            # publish the data to the topic
-            producer.produce(
-                topic=topic.name,
-                key=row_data['host'],
-                value=json_data,
-            )
+    # --- Setup Source ---
+    # OPTION 1: no additional processing with a StreamingDataFrame
+    # Generally the recommended approach; no additional operations needed!
+    app.add_source(source=memory_usage_source, topic=output_topic)
 
-            # for more help using QuixStreams see docs:
-            # https://quix.io/docs/quix-streams/introduction.html
+    # OPTION 2: additional processing with a StreamingDataFrame
+    # Useful for consolidating additional data cleanup into 1 Application.
+    # In this case, do NOT use `app.add_source()`.
+    # sdf = app.dataframe(source=memory_usage_source)
+    #
+    # sdf.to_topic(topic=output_topic)  # you must do this to output your data!
 
-    print("All rows published")
+    # With our pipeline defined, now run the Application
+    app.run()
 
 
+# Sources require execution under a conditional main
 if __name__ == "__main__":
-    try:
-        main()
-    except KeyboardInterrupt:
-        print("Exiting.")
\ No newline at end of file
+    main()
diff --git a/python/sources/starter_source/requirements.txt b/python/sources/starter_source/requirements.txt
index 365f9bd4..eecebca3 100644
--- a/python/sources/starter_source/requirements.txt
+++ b/python/sources/starter_source/requirements.txt
@@ -1,2 +1,2 @@
-quixstreams==3.2.1
+quixstreams==3.10.0
 python-dotenv
\ No newline at end of file
diff --git a/python/transformations/starter_transformation/README.md b/python/transformations/starter_transformation/README.md
index f53a235e..4a018f28 100644
--- a/python/transformations/starter_transformation/README.md
+++ b/python/transformations/starter_transformation/README.md
@@ -1,8 +1,10 @@
 # Starter transformation
 
-[This code sample](https://github.com/quixio/quix-samples/tree/main/python/transformations/starter_transformation) demonstrates how to consume data from a topic, apply a simple transformation to that data and publish the result to an output topic (while printing content to the console output).
+[This code sample](https://github.com/quixio/quix-samples/tree/main/python/transformations/starter_transformation) demonstrates how to consume and transform data from a Kafka topic
+and publish these results to another Kafka topic, all using our `StreamingDataFrame`.
 
-Modify the Python code to transform your data on the fly.
+This boilerplate will run in Quix Cloud but largely has placeholder operations, so you
+will need to add your own to do something besides printing the data to the console!
 
 ## How to run
 
@@ -17,6 +19,12 @@ The code sample uses the following environment variables:
 - **input**: Name of the input topic to listen to.
 - **output**: Name of the output topic to write to.
 
+## Possible `StreamingDataFrame` Operations
+
+Many different operations and transformations are available, so
+be sure to [explore what's possible](https://quix.io/docs/quix-streams/processing.html)!
+
+
 ## Contribute
 
 Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.
diff --git a/python/transformations/starter_transformation/main.py b/python/transformations/starter_transformation/main.py
index 860b1385..82a0949f 100644
--- a/python/transformations/starter_transformation/main.py
+++ b/python/transformations/starter_transformation/main.py
@@ -1,23 +1,49 @@
-import os
+# import the Quix Streams modules for interacting with Kafka.
+# For general info, see https://quix.io/docs/quix-streams/introduction.html
 from quixstreams import Application
 
+import os
+
 # for local dev, load env vars from a .env file
-from dotenv import load_dotenv
-load_dotenv()
+# from dotenv import load_dotenv
+# load_dotenv()
 
-app = Application(consumer_group="transformation-v1", auto_offset_reset="earliest")
-input_topic = app.topic(os.environ["input"])
-output_topic = app.topic(os.environ["output"])
 
-sdf = app.dataframe(input_topic)
+def main():
+    """
+    Transformations generally read from, and produce to, Kafka topics.
 
+    They are conducted with Applications and their accompanying StreamingDataFrames
+    which define what transformations to perform on incoming data.
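+
+    As a rough sketch, operations might look like this (assuming the incoming
+    values are dicts with a "used_percent" field, as produced by the starter
+    source sample):
+
+        sdf = sdf.apply(lambda row: {**row, "used_percent": float(row["used_percent"])})
+        sdf = sdf.filter(lambda row: row["used_percent"] > 70.0)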
-
-# put transformation logic here
-# see docs for what you can do
-# https://quix.io/docs/get-started/quixtour/process-threshold.html
+    Be sure to explicitly produce output to any desired topic(s); it does not happen
+    automatically!
 
-sdf.print()
-sdf.to_topic(output_topic)
+    To learn about what operations are possible, the best place to start is:
+    https://quix.io/docs/quix-streams/processing.html
+    """
 
-if __name__ == "__main__":
+    # Setup necessary objects
+    app = Application(
+        consumer_group="my_transformation",
+        auto_create_topics=True,
+        auto_offset_reset="earliest"
+    )
+    input_topic = app.topic(name=os.environ["input"])
+    output_topic = app.topic(name=os.environ["output"])
+    sdf = app.dataframe(topic=input_topic)
+
+    # Do StreamingDataFrame operations/transformations here
+    sdf = sdf.apply(lambda row: row).filter(lambda row: True)
+    sdf = sdf.print(metadata=True)
+
+    # Finish off by writing the final result to the output topic
+    sdf.to_topic(output_topic)
+
+    # With our pipeline defined, now run the Application
     app.run()
+
+
+# It is recommended to execute Applications under a conditional main
+if __name__ == "__main__":
+    main()
diff --git a/python/transformations/starter_transformation/requirements.txt b/python/transformations/starter_transformation/requirements.txt
index 365f9bd4..eecebca3 100644
--- a/python/transformations/starter_transformation/requirements.txt
+++ b/python/transformations/starter_transformation/requirements.txt
@@ -1,2 +1,2 @@
-quixstreams==3.2.1
+quixstreams==3.10.0
 python-dotenv
\ No newline at end of file