Merge pull request #211 from spenny-liam/main
Database to API / NoSQL comments
volcan01010 authored Jun 26, 2024
2 parents ec3ca1d + 530f492 commit d0de851
Showing 3 changed files with 32 additions and 7 deletions.
23 changes: 19 additions & 4 deletions docs/code_demos/recipes/database_to_api.py
@@ -5,37 +5,47 @@
import datetime as dt

import aiohttp
from etlhelper import iter_chunks
import etlhelper as etl

# import the DbParams object describing the Oracle connection
from db import ORACLE_DB

logger = logging.getLogger("copy_sensors_async")

# SQL query to get data from Oracle
SELECT_SENSORS = """
SELECT CODE, DESCRIPTION
FROM BGS.DIC_SEN_SENSOR
WHERE date_updated BETWEEN :startdate AND :enddate
ORDER BY date_updated
"""

# URL of API we want to send data to
BASE_URL = "http://localhost:9200/"
# Headers to tell the API we are sending data in JSON format
HEADERS = {"Content-Type": "application/json"}


def copy_sensors(startdate: dt.datetime, enddate: dt.datetime) -> None:
    """Read sensors from Oracle and post to REST API."""
    """Read sensors from Oracle and post to REST API.
    Requires startdate and enddate to filter rows to those updated within the given time period.
    """
    logger.info("Copying sensors with timestamps from %s to %s",
                startdate.isoformat(), enddate.isoformat())
    row_count = 0

    # Connect using the imported DbParams
    with ORACLE_DB.connect("ORACLE_PASSWORD") as conn:
        # chunks is a generator that yields lists of dictionaries
        chunks = iter_chunks(
        # passing in our select query, connection object, bind variable parameters and custom transform function
        chunks = etl.iter_chunks(
            SELECT_SENSORS,
            conn,
            parameters={"startdate": startdate, "enddate": enddate},
            transform=transform_sensors,
        )

        # for each chunk of rows, post them to the API; chunks are processed one at a time
        for chunk in chunks:
            result = asyncio.run(post_chunk(chunk))
            row_count += len(result)
@@ -65,10 +75,14 @@ def transform_sensors(chunk: list[tuple]) -> list[tuple]:

async def post_chunk(chunk: list[tuple]) -> list:
"""Post multiple items to API asynchronously."""
# initialize aiohttp session
async with aiohttp.ClientSession() as session:
# Build list of tasks
tasks = []
# add each row to list of tasks for aiohttp to execute
for item in chunk:
# a task is the instance of a function being executed with distinct arguments
# in this case, the post_one function with argument of a dictionary representing a row of data
tasks.append(post_one(item, session))

# Process tasks in parallel. An exception in any will be raised.
@@ -83,6 +97,7 @@ async def post_one(item: tuple, session: aiohttp.ClientSession) -> int:
    response = await session.post(
        BASE_URL + "sensors/_doc",
        headers=HEADERS,
        # convert Python dict to a JSON string
        data=json.dumps(item),
    )

@@ -108,6 +123,6 @@ async def post_one(item: tuple, session: aiohttp.ClientSession) -> int:
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

    # Copy data from 1 January 2000 to 00:00:00 today
    # Copy data that was updated between 1 January 2000 and 00:00:00 today
    today = dt.datetime.combine(dt.date.today(), dt.time.min)
    copy_sensors(dt.datetime(2000, 1, 1), today)
2 changes: 2 additions & 0 deletions docs/etl_functions/extract.rst
@@ -56,6 +56,8 @@ Keyword arguments
All extract functions are derived from :func:`iter_chunks() <etlhelper.iter_chunks>`
and take the same keyword arguments, which are passed through.
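
For example (a minimal sketch; ``MY_DB``, the SQL and the bind variable name
are placeholders rather than anything from these docs), the ``parameters``
keyword works with any extract function because it is forwarded to
:func:`iter_chunks() <etlhelper.iter_chunks>`:

.. code-block:: python

    import etlhelper as etl

    sql = "SELECT id, name FROM my_table WHERE updated > :since"

    with MY_DB.connect("MY_DB_PASSWORD") as conn:
        # "parameters" is passed through to iter_chunks under the hood
        rows = etl.fetchall(sql, conn, parameters={"since": "2024-01-01"})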

.. _parameters:

parameters
""""""""""

14 changes: 11 additions & 3 deletions docs/recipes/database_to_api.rst
@@ -7,9 +7,13 @@ ETL for posting data from a database into an HTTP API. The API could be
a NoSQL document store (e.g. ElasticSearch, Cassandra) or some other web
service.

This example transfers data from Oracle to ElasticSearch. It uses
``iter_chunks`` to fetch data from the database without loading it all
into memory at once. A custom transform function creates a dictionary
This example posts data from an Oracle database to an HTTP API. It uses
:func:`iter_chunks() <etlhelper.iter_chunks>` to fetch data from the
database without loading it all
into memory at once. :ref:`Parameters <parameters>` are sent with the database query to filter
rows to only those changed within a specified time period. This means that
only data that has changed since the last time the script was run is
transferred. A custom transform function creates a dictionary
structure from each row of data. This is “dumped” into JSON and posted
to the API via ``aiohttp``.
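
As a rough sketch only (the field names are illustrative and this is not
necessarily the demo's actual ``transform_sensors`` implementation), such a
transform function might look like:

.. code-block:: python

    def transform_sensors(chunk):
        """Convert rows of (code, description) into dicts for the API."""
        new_chunk = []
        for code, description in chunk:
            new_chunk.append({"code": code, "description": description})
        return new_chunk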

@@ -23,3 +27,7 @@ transfer as opposed to posting records in series.

In this example, failed rows will fail the whole job. Removing the
``raise_for_status()`` call will let them just be logged instead.
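
As a sketch of that alternative (simplified from the demo code above, not the
shipped implementation), ``post_one`` could log failures and carry on:

.. code-block:: python

    async def post_one(item, session):
        """Post a single item, logging rather than raising on failure."""
        response = await session.post(
            BASE_URL + "sensors/_doc",
            headers=HEADERS,
            data=json.dumps(item),
        )

        if response.status >= 400:
            # record the failure but let the rest of the chunk continue
            logger.error("Failed to post %s: HTTP %s", item, response.status)

        return response.status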

To provide the database connection, a :class:`DbParams <etlhelper.DbParams>` object is
imported from a separate ``db`` file.
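
A minimal ``db`` module might look like the following sketch (host, port and
database name are placeholder values):

.. code-block:: python

    """Define the DbParams object used by the recipe (values are illustrative)."""
    from etlhelper import DbParams

    # the password is not stored here; it is read from the ORACLE_PASSWORD
    # environment variable when ORACLE_DB.connect("ORACLE_PASSWORD") is called
    ORACLE_DB = DbParams(
        dbtype="ORACLE",
        host="localhost",
        port=1521,
        dbname="mydb",
        user="oracle_user",
    )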
