Skip to content

Commit

Permalink
docs: update asyncio http server examples
Browse files Browse the repository at this point in the history
  • Loading branch information
ttu committed Nov 22, 2024
1 parent 2565ba0 commit 5f049ec
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 60 deletions.
75 changes: 37 additions & 38 deletions examples/http_server_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,80 +11,79 @@
aiohttp - pip install aiohttp
"""

# pylint: disable=duplicate-code

import asyncio
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from multiprocessing import Manager

from aiohttp import web

from ruuvitag_sensor.ruuvi import RuuviTagSensor

all_data = {}

tags: dict[str, str] = {
# "F4:A5:74:89:16:57": "kitchen",
# "CC:2C:6A:1E:59:3D": "bedroom",
# "BB:2C:6A:1E:59:3D": "livingroom",
}


def run_get_data_background(macs, queue):
async def run_get_data_background(macs_to_fetch, queue):
"""
Background process from RuuviTag Sensors
"""
async for sensor_data in RuuviTagSensor.get_data_async(macs_to_fetch):
sensor_data[1]["time"] = str(datetime.now())
queue.put(sensor_data)

def callback(data):
data[1]["time"] = str(datetime.now())
queue.put(data)

RuuviTagSensor.get_data(callback, macs)


async def data_update(queue):
async def data_update(queue, shared_data):
"""
Update data sent by the background process to global all_data variable
"""
global all_data # pylint: disable=global-variable-not-assigned
while True:
while not queue.empty():
data = queue.get()
all_data[data[0]] = data[1]
for key, value in tags.items():
if key in all_data:
all_data[key]["name"] = value
mac, sensor_data = queue.get()
shared_data["name"] = tags.get(mac, "unknown")
shared_data[mac] = sensor_data
await asyncio.sleep(0.5)


async def get_all_data(_):
return web.json_response(all_data)
async def get_all_data(_, shared_data):
return web.json_response(dict(shared_data))


async def get_data(request):
async def get_data(request, shared_data):
mac = request.match_info.get("mac")
if mac not in all_data:
if mac not in shared_data:
return web.json_response(status=404)
return web.json_response(all_data[mac])
return web.json_response(dict(shared_data[mac]))


# pylint: disable=redefined-outer-name
def setup_routes(app):
app.router.add_get("/data", get_all_data)
app.router.add_get("/data/{mac}", get_data)
def setup_routes(application, shared_data):
application.router.add_get("/data", lambda request: get_all_data(request, shared_data))
application.router.add_get("/data/{mac}", lambda request: get_data(request, shared_data))


if __name__ == "__main__":
tags = {"F4:A5:74:89:16:57": "kitchen", "CC:2C:6A:1E:59:3D": "bedroom", "BB:2C:6A:1E:59:3D": "livingroom"}

m = Manager()
data = m.dict()
q = m.Queue()

# Start background process
executor = ProcessPoolExecutor(1)
executor.submit(run_get_data_background, list(tags.keys()), q)
macs = list(tags.keys())

loop = asyncio.get_event_loop()
async def start_background_tasks(application):
application["run_get_data"] = asyncio.create_task(run_get_data_background(macs, q))
application["data_updater"] = asyncio.create_task(data_update(q, data))

# Start data updater
loop.create_task(data_update(q))
async def cleanup_background_tasks(application):
application["run_get_data"].cancel()
application["data_updater"].cancel()
await asyncio.gather(app["run_get_data"], app["data_updater"], return_exceptions=True)
print("Background tasks shut down.")

# Setup and start web application
app = web.Application(loop=loop)
setup_routes(app)
web.run_app(app, host="0.0.0.0", port=5000)
app = web.Application()
setup_routes(app, data)
app.on_startup.append(start_background_tasks)
app.on_shutdown.append(cleanup_background_tasks)
web.run_app(app, host="0.0.0.0", port=5500)
65 changes: 43 additions & 22 deletions examples/http_server_asyncio_rx.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,66 @@
aiohttp - pip install aiohttp
"""

# pylint: disable=duplicate-code
import asyncio
from multiprocessing import Manager

from aiohttp import web

from ruuvitag_sensor.ruuvi_rx import RuuviTagReactive

all_data = {}

tags: dict[str, str] = {
# "F4:A5:74:89:16:57": "kitchen",
# "CC:2C:6A:1E:59:3D": "bedroom",
# "BB:2C:6A:1E:59:3D": "livingroom",
}


async def get_all_data(_):
return web.json_response(all_data)
async def get_all_data(_, shared_data):
return web.json_response(dict(shared_data))


async def get_data(request):
async def get_data(request, shared_data):
mac = request.match_info.get("mac")
if mac not in all_data:
if mac not in shared_data:
return web.json_response(status=404)
return web.json_response(all_data[mac])

return web.json_response(dict(shared_data[mac]))

# pylint: disable=redefined-outer-name
def setup_routes(app):
app.router.add_get("/data", get_all_data)
app.router.add_get("/data/{mac}", get_data)


if __name__ == "__main__":
tags = {"F4:A5:74:89:16:57": "kitchen", "CC:2C:6A:1E:59:3D": "bedroom", "BB:2C:6A:1E:59:3D": "livingroom"}
async def run_get_data_background(known_tags, shared_data):
"""
Background process from RuuviTag Sensors
"""

def handle_new_data(data):
global all_data # pylint: disable=global-variable-not-assigned
data[1]["name"] = tags[data[0]]
all_data[data[0]] = data[1]
mac, sensor_data = data
sensor_data["name"] = known_tags.get(mac, "unknown")
shared_data[data[0]] = sensor_data

ruuvi_rx = RuuviTagReactive(list(tags.keys()))
ruuvi_rx = RuuviTagReactive(list(known_tags.keys()))
data_stream = ruuvi_rx.get_subject()
data_stream.subscribe(handle_new_data)


def setup_routes(application, shared_data):
application.router.add_get("/data", lambda request: get_all_data(request, shared_data))
application.router.add_get("/data/{mac}", lambda request: get_data(request, shared_data))


if __name__ == "__main__":
m = Manager()
data = m.dict()

async def start_background_tasks(application):
application["run_get_data"] = asyncio.create_task(run_get_data_background(tags, data))

async def cleanup_background_tasks(application):
application["run_get_data"].cancel()
await asyncio.gather(app["run_get_data"], return_exceptions=True)
print("Background tasks shut down.")

# Setup and start web application
app = web.Application()
setup_routes(app)
web.run_app(app, host="0.0.0.0", port=5000)
setup_routes(app, data)
app.on_startup.append(start_background_tasks)
app.on_shutdown.append(cleanup_background_tasks)
web.run_app(app, host="0.0.0.0", port=5500)

0 comments on commit 5f049ec

Please sign in to comment.