diff --git a/.env-example b/.env-example index 2875061..5f0be1d 100644 --- a/.env-example +++ b/.env-example @@ -8,6 +8,7 @@ MONGODB_CONNECTION_URI="" # Hookdeck Project API Key # Hookdeck Dashboard -> Settings -> Secrets HOOKDECK_PROJECT_API_KEY="" +HOOKDECK_WEBHOOK_SECRET= # Replicate API Token REPLICATE_API_TOKEN="" diff --git a/allthethings/generators.py b/allthethings/generators.py index 187fca5..24b6041 100644 --- a/allthethings/generators.py +++ b/allthethings/generators.py @@ -23,7 +23,6 @@ def generate(self, id, text): class SyncEmbeddingsGenerator: - def generate(self, text): payload = { "version": "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305", @@ -35,7 +34,7 @@ def generate(self, text): "https://api.replicate.com/v1/predictions", headers={**Config.REPLICATE_API_AUTH_HEADERS, "Prefer": "wait"}, json=payload, - timeout=60, + timeout=180, ) return response.json() diff --git a/app.py b/app.py index 9544730..6c52a37 100644 --- a/app.py +++ b/app.py @@ -1,12 +1,16 @@ import httpx from urllib.parse import urlparse from bson import ObjectId +import hmac +import hashlib +import base64 from flask import Flask, jsonify, request, render_template, redirect, url_for, flash from config import Config from allthethings.mongo import Database from allthethings.processors import get_asset_processor + from allthethings.generators import ( AsyncEmbeddingsGenerator, SyncEmbeddingsGenerator, @@ -136,18 +140,19 @@ def process(): @app.route("/search", methods=["GET"]) def search(): - return render_template("search.html", results=[]) - - -@app.route("/search", methods=["POST"]) -def search_post(): - query = request.form["query"] + query = request.args.get("query") + if query is None: + return render_template("search.html", results=[]) app.logger.info("Query submitted") app.logger.debug(query) results = query_vector_search(query) + if results is None: + flash("Search embeddings generation failed") + return redirect(url_for("search")) + results = 
format_results(results) # TODO: look into warning logged here @@ -194,8 +199,17 @@ def query_vector_search(q): # Because the search is user-driven, we use the synchronous generator generator = SyncEmbeddingsGenerator() - generator_response = generator.generate(q) - app.logger.debug(generator_response) + try: + generator_response = generator.generate(q) + app.logger.debug(generator_response) + except Exception as e: + app.logger.error("Error generating embeddings: %s", e) + return None + + if generator_response["output"] is None: + app.logger.debug("Embeddings generation timed out") + return None + query_embedding = generator_response["output"][0]["embedding"] app.logger.info("Query embedding generated") @@ -205,8 +219,8 @@ def query_vector_search(q): "index": "vector_index", "path": "embedding", "queryVector": query_embedding, - "numCandidates": 10, - "limit": 2, + "numCandidates": 100, + "limit": 10, } new_search_query = {"$vectorSearch": vs_query} @@ -235,8 +249,30 @@ def query_vector_search(q): return res +def verify_webhook(request): + if Config.HOOKDECK_WEBHOOK_SECRET is None: + app.logger.error("No HOOKDECK_WEBHOOK_SECRET found.") + return False + + hmac_header = request.headers.get("x-hookdeck-signature") + + hash = base64.b64encode( + hmac.new( + Config.HOOKDECK_WEBHOOK_SECRET.encode(), request.data, hashlib.sha256 + ).digest() + ).decode() + + verified = hash == hmac_header + app.logger.debug("Webhook signature verification: %s", verified) + return verified + + @app.route("/webhooks/audio/", methods=["POST"]) def webhook_audio(id): + if not verify_webhook(request): + app.logger.error("Webhook signature verification failed") + return jsonify({"error": "Webhook signature verification failed"}), 401 + payload = request.json app.logger.info("Audio payload received for id %s", id) app.logger.debug(payload) @@ -267,13 +303,17 @@ def webhook_audio(id): app.logger.info("Transcription updated") app.logger.debug(result) - request_embeddings(result["_id"]) + 
request_embeddings(id) return "OK" @app.route("/webhooks/embedding/", methods=["POST"]) def webhook_embeddings(id): + if not verify_webhook(request): + app.logger.error("Webhook signature verification failed") + return jsonify({"error": "Webhook signature verification failed"}), 401 + payload = request.json app.logger.info("Embeddings payload recieved") app.logger.debug(payload) diff --git a/config.py b/config.py index f40d221..04617bd 100644 --- a/config.py +++ b/config.py @@ -11,6 +11,7 @@ class Config: DB_NAME = "iaat" COLLECTION_NAME = "assets" + HOOKDECK_WEBHOOK_SECRET = os.getenv("HOOKDECK_WEBHOOK_SECRET") HOOKDECK_QUEUE_API_KEY_HEADER_NAME = "x-iaat-queue-api-key" HOOKDECK_REPLICATE_API_QUEUE_API_KEY = os.getenv( "HOOKDECK_REPLICATE_API_QUEUE_API_KEY" diff --git a/templates/search.html b/templates/search.html index 461edaa..7cdf0bf 100644 --- a/templates/search.html +++ b/templates/search.html @@ -24,7 +24,7 @@ {% block content %}
-
+
diff --git a/tutorial/README.md b/tutorial/README.md index dadfa30..c5811b9 100644 --- a/tutorial/README.md +++ b/tutorial/README.md @@ -75,7 +75,8 @@ Update the values within `.env` as follows: - `SECRET_KEY`: See the [`SECRET_KEY` Flask docs](https://flask.palletsprojects.com/en/stable/config/#SECRET_KEY). - `MONGODB_CONNECTION_URI`: Populate with a MongoDB Atlas connection string with a **Read and write to any database** role. See the [Get Connection String docs](https://www.mongodb.com/docs/guides/atlas/connection-string/). -- `HOOKDECK_PROJECT_API_KEY`: Get an API Key from the **Project** -> **Settings** -> **Secrets** section of the [Hookdeck Dashboard](https://dashboard.hookdeck.com?ref=mongodb-iatt). +- `HOOKDECK_PROJECT_API_KEY`: Get an **API Key** from the **Project** -> **Settings** -> **Secrets** section of the [Hookdeck Dashboard](https://dashboard.hookdeck.com?ref=mongodb-iatt). +- `HOOKDECK_WEBHOOK_SECRET`: Get a **Signing Secret** from the **Project** -> **Settings** -> **Secrets** section of the [Hookdeck Dashboard](https://dashboard.hookdeck.com?ref=mongodb-iatt). - `REPLICATE_API_TOKEN`: [Create an API token](https://replicate.com/account/api-tokens) in the Replicate dashboard. - `REPLICATE_WEBHOOKS_SECRET`: Go to the [Webhooks section](https://replicate.com/account/webhook) of the Replicate dashboard and click the **Show signing key** button. - `HOOKDECK_REPLICATE_API_QUEUE_API_KEY`, `HOOKDECK_REPLICATE_API_QUEUE_URL`, `AUDIO_WEBHOOK_URL` and `EMBEDDINGS_WEBHOOK_URL` will be automatically populated in the next step. @@ -636,25 +637,50 @@ Flash a success message to the user and redirect them to the index page: At this point, the Flask application has offloaded all the work to Replicate and, from a data journey perspective, we're waiting for the predication completed webhook. 
- +### H3: Handle Audio to Text Prediction Completion Webhook -### H3: Handle Prediction Completion Webhook +Once Replicate completes the prediction, it makes a webhook callback to Hookdeck. Hookdeck instantly ingests the webhook, verifies the event came from Replicate, and pushes the data onto a queue for processing and delivery. Based on the current Hookdeck Connection setup, the webhook event is delivered to the CLI and then to the `/webhooks/audio/` endpoint of the Flask application. Let's look at the code that handles the `/webhooks/audio/` request. -Once Replicate completes the predication, it makes a webhook callback to Hookdeck. Hookdeck instantly verifies the event came from Replicate, ingests the webhook, pushing the data onto a queue for processing and delivery. Based on the current Hookdeck Connection setup, the webhook event is delivered to the CLI and then to the `/webhooks/audio` endpoint of the Flask application. Let's look at the code that handles the `/webhooks/audio` request. - -First, define the `/webhooks/audio` route in `app.py`: +Here's the `/webhooks/audio/` route definition in `app.py`: ```py -@app.route("/webhooks/audio", methods=["POST"]) -def webhook_audio(): +@app.route("/webhooks/audio/", methods=["POST"]) +def webhook_audio(id): + if not verify_webhook(request): + app.logger.error("Webhook signature verification failed") + return jsonify({"error": "Webhook signature verification failed"}), 401 + payload = request.json - app.logger.info("Audio payload received") + app.logger.info("Audio payload received for id %s", id) app.logger.debug(payload) ``` -This route handles `POST` requests to the `/webhooks/audio` endpoint. It retrieves the JSON payload from the webhook callback from Replicate. +This route handles `POST` requests to the `/webhooks/audio/` endpoint. The `id` path parameter represents the asset in the MongoDB database that the audio callback is for. The JSON payload is retrieved from the Replicate webhook callback. 
-Next, determine the processing status based on the presence of an error in the payload: +Before handling the webhook, we check that the webhook came from Hookdeck via a `verify_webhook` function. If the verification fails a `401` response is returned. Here's the code to verify the webhook: + +```py +def verify_webhook(request): + if Config.HOOKDECK_WEBHOOK_SECRET is None: + app.logger.error("No HOOKDECK_WEBHOOK_SECRET found.") + return False + + hmac_header = request.headers.get("x-hookdeck-signature") + + hash = base64.b64encode( + hmac.new( + Config.HOOKDECK_WEBHOOK_SECRET.encode(), request.data, hashlib.sha256 + ).digest() + ).decode() + + verified = hash == hmac_header + app.logger.debug("Webhook signature verification: %s", verified) + return verified +``` + +This reads the Hookdeck webhook secret stored in the `HOOKDECK_WEBHOOK_SECRET` environment variable, generates a hash using the secret from the inbound webhook data, and compares it with the hash that was sent in the `x-hookdeck-signature` header. If they match, the webhook is verified. + +Next, the processing status is determined based on the presence of an error in the payload: ```py database = Database() @@ -667,11 +693,11 @@ Next, determine the processing status based on the presence of an error in the p If an error is present, the status is set to `PROCESSING_ERROR`; otherwise, it is set to `PROCESSED`. -Update the database with the transcription results and the processing status: +The database is updated with the transcription results and the processing status: ```py result = collection.find_one_and_update( - filter={"replicate_process_id": payload["id"]}, + filter={"_id": ObjectId(id)}, update={ "$set": { "status": status, @@ -683,9 +709,9 @@ Update the database with the transcription results and the processing status: ) ``` -This finds the document in the database with the matching `replicate_process_id` and updates it with the new status, transcription `text`, and the entire payload. 
+This finds the document in the database with the matching `id` and updates it with the new status, transcription `text`, and the entire Replicate response payload. -If the document wasn't found, log an error and return a `404` response: +Next, we check to ensure the document was found: ```py if result is None: @@ -695,7 +721,7 @@ If the document wasn't found, log an error and return a `404` response: return jsonify({"error": "No document found to add audio transcript"}), 404 ``` -If no document is found for the given `replicate_process_id`, an error is logged, and a JSON response with an error message is returned. The `404` response will inform Hookdeck that although the request did not succeed, the request should not be retried. +If no document is found for the given `id`, an error is logged, and a JSON response with an error message is returned. The `404` response will inform Hookdeck that although the request did not succeed, the request should not be retried. With the audio converted to text and stored, the data journey moves to generating embeddings via Replicate: @@ -703,20 +729,16 @@ With the audio converted to text and stored, the data journey moves to generatin app.logger.info("Transcription updated") app.logger.debug(result) - request_embeddings(result["_id"]) + request_embeddings(id) return "OK" ``` -This code logs that the transcription has been updated and calls the `request_embeddings` function to generate embeddings for the processed audio. The endpoint returns an `OK` response to inform Hookdeck the webhook has been successfully processed. - -In summary, this route updates the database with transcription results and requests embeddings for the processed audio. +Next, the `request_embeddings` function is called to generate embeddings for the processed audio. The endpoint returns an `OK` response to inform Hookdeck the webhook has been successfully processed. 
## H2: Generate Embedding -The `request_embeddings` function triggers the generation of embeddings for the textual representation of any indexed assets. - -Begin by retrieving the asset representation from MongoDB: +The `request_embeddings` function triggers the generation of embeddings for the textual representation of an indexed asset: ```py def request_embeddings(id): @@ -729,103 +751,104 @@ def request_embeddings(id): if asset is None: raise RuntimeError("Asset not found") -``` - -This code finds the document in the database with the matching ID. If no document is found, a `RuntimeError` is raised. -Check if the asset has been processed: - -```py if asset["status"] != "PROCESSED": raise RuntimeError("Asset has not been processed") ``` -This code checks if the status of the asset is `PROCESSED`, indicating that a textual representation has been created. If the asset has not been processed, a `RuntimeError` is raised. +If this asset with the passed `id` is not found or the status of the asset is not `PROCESSED`, which indicates that a textual representation has been created, a `RuntimeError` is raised. ### H3: Trigger Embedding Generation with Webhook Callback -Next, generate the embeddings for the processed asset using the `AsyncEmbeddingsGenerator`: +Next, the embeddings are generated for the processed asset using the `AsyncEmbeddingsGenerator`: ```py generator = AsyncEmbeddingsGenerator() - generate_request = generator.generate(asset["text"]) + try: + response = generator.generate(id, asset["text"]) + except Exception as e: + app.logger.error("Error generating embeddings for %s: %s", id, e) + raise ``` -This code initializes the `AsyncEmbeddingsGenerator` and calls the `generate` function on the instance, passing the textual representation of the asset. +This initializes the `AsyncEmbeddingsGenerator` and calls the `generate` function on the instance, passing the ID of the asset being indexed and the textual representation. 
The `AsyncEmbeddingsGenerator` definition in `allthethings/generators.py` follows a similar pattern to the previously used processor: ```py -import replicate +import httpx from config import Config + class AsyncEmbeddingsGenerator: - def __init__(self): - self.WEBHOOK_URL = Config.EMBEDDINGS_WEBHOOK_URL - self.model = replicate.models.get("replicate/all-mpnet-base-v2") - self.version = self.model.versions.get( - "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305" + def generate(self, id, text): + payload = { + "version": "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305", + "input": {"text": text}, + "webhook": f"{Config.EMBEDDINGS_WEBHOOK_URL}/{id}", + "webhook_events_filter": ["completed"], + } + + response = httpx.request( + "POST", + f"{Config.HOOKDECK_REPLICATE_API_QUEUE_URL}/predictions", + headers=Config.HOOKDECK_QUEUE_AUTH_HEADERS, + json=payload, ) -``` -This class initializes the `AsyncEmbeddingsGenerator` with the `EMBEDDINGS_WEBHOOK_URL` webhook URL passed to receive the asynchronous response. The [replicate/all-mpnet-base-v2](replicate/all-mpnet-base-v2) model us used to generate embeddings. + return response.json() +``` -Next, the `generate` method within the `AsyncEmbeddingsGenerator` class: +The `generate` method receives the asset `id` and the `text` that embeddings are to be generated for. -```py - def generate(self, text): - input = {"text": text} +A request `payload` is created containing a `version` that identifies the [replicate/all-mpnet-base-v2](https://replicate.com/replicate/all-mpnet-base-v2) model used to generate embeddings, and the `text` for the embedding is passed within an `input` parameter. - prediction = replicate.predictions.create( - version=self.version, - input=input, - webhook=self.WEBHOOK_URL, - webhook_events_filter=["completed"], - ) +The `webhook` property is set to `Config.EMBEDDINGS_WEBHOOK_URL` with an appended path (`/{id}`) that indicates which asset the callback is for. 
As before, the use of the `webhook_events_filter=["completed"]` filter informs Replicate to only send a webhook when the prediction is completed. - return prediction -``` +Since this is an asynchronous call, Hookdeck is again used to queue the Replicate API request via a call to the `HOOKDECK_REPLICATE_API_QUEUE_URL` endpoint with the `/predictions` path. -This method generates embeddings for the provided text by creating a prediction request to the Replicate model. As before, the use of the `webhook_events_filter=["completed"]` filter informs Replicate to only send a webhook when the prediction is completed. The method returns the prediction object, which contains the details of the embedding generation request. +The method returns the Hookdeck response. Back in `app.py`, update the database with the status and embedding request ID: ```py collection.update_one( - filter={"_id": id}, + filter={"_id": ObjectId(id)}, update={ "$set": { "status": "GENERATING_EMBEDDINGS", - "replicate_embedding_id": generate_request.id, + "generator_response": response, } }, ) ``` -Update the document in the database with the new status `GENERATING_EMBEDDINGS` and the ID of the embedding request. +Update the document in the database with the new status `GENERATING_EMBEDDINGS` and the Hookdeck queue response. The request to asynchronously generate the embeddings has been triggered, and the work offloaded to Replicate. When the result is read, a webhook will be triggered with the result. ### H3: Handle Embedding Generation Webhook Callback -### Step 16: Handling Embedding Webhooks +Once Replicate has generated the embedding, a webhook callback is made to the `/webhooks/embedding/` route in our Flask application. This route receives the webhook payload, verifies it came from Hookdeck, updates the database with the embedding results, and sets the appropriate status. -The `/webhooks/embedding` route in our Flask application handles the webhooks for embedding generation. 
This route receives the webhook payload, updates the database with the embedding results, and sets the appropriate status. - -First, the `/webhooks/embedding` route definition: +Here's the route definition: ```py -@app.route("/webhooks/embedding", methods=["POST"]) -def webhook_embeddings(): +@app.route("/webhooks/embedding/", methods=["POST"]) +def webhook_embeddings(id): + if not verify_webhook(request): + app.logger.error("Webhook signature verification failed") + return jsonify({"error": "Webhook signature verification failed"}), 401 + payload = request.json - app.logger.info("Embeddings payload received") + app.logger.info("Embeddings payload received") app.logger.debug(payload) ``` -This route handles POST requests to the `/webhooks/embedding` endpoint. It retrieves the JSON payload from the request. +This route handles `POST` requests to the `/webhooks/embedding/` endpoint and is passed the `id` path parameter. It verifies the request came from Hookdeck, and if so, retrieves the JSON payload from the request. Otherwise, it returns a `401` response. -Determine the processing status based on the presence of an error in the payload: +Next, it checks for errors: ```py status = ( @@ -833,9 +856,9 @@ Determine the processing status based on the presence of an error in the payload ) ``` -This code checks if there is an error in the payload. If an error is present, the status is set to `EMBEDDINGS_ERROR`; otherwise, it is set to `SEARCHABLE`. +If an error is present in the payload, the status is set to `EMBEDDINGS_ERROR`; otherwise, it is set to `SEARCHABLE`. 
-Next, extract the vector embedding from the payload and update the database with the embedding details and the new status: +Next, the vector embedding is extracted from the payload and the database is updated with the embedding details and the new status: ```py embedding = payload["output"][0]["embedding"] @@ -844,7 +867,7 @@ Next, extract the vector embedding from the payload and update the database with collection = database.get_collection() result = collection.update_one( - filter={"replicate_embedding_id": payload["id"]}, + filter={"_id": ObjectId(id)}, update={ "$set": { "status": status, @@ -855,7 +878,7 @@ Next, extract the vector embedding from the payload and update the database with ) ``` -This finds the document in the database with the matching `replicate_embedding_id` and updates it with the new status, embedding, and the entire payload. +This finds the document in the database with the matching `id` and updates it with the new status, embedding, and the entire payload. Check if the document was found and updated: @@ -869,24 +892,26 @@ Check if the document was found and updated: return "OK" ``` -If no document is found for the given `replicate_embedding_id`, an error is logged, and a JSON response with an error message is returned with a `404` status. If the update was success, return an `OK` to inform Hookdeck the webhook has been processed. +If no document is found for the given `id`, an error is logged, and a JSON response with an error message is returned with a `404` status. If the update was successful, return an `OK` to inform Hookdeck the webhook has been processed. With the vector embedding stored in the `embedding` property, it's now searchable with MongoDB due to the previously defined vector search index. ## H2: Searching using Atlas Vector Search -Search is user-driven. The user enters a search term and submits a form. That search query is handled, processed and the resulted returned and displayed. 
Let's walk through through each of those steps. +Search is user-driven. The user enters a search term and submits a form. That search query is handled, processed and the resulted returned and displayed. Ideally this is a real-time experience, so operations are performed synchronously. -![Search results](