
Commit

Merge pull request #233 from hotosm/enhance/queue-name
Enhance : Queue name and workers name
kshitijrajsharma authored Mar 20, 2024
2 parents 4cab5b8 + 379b3a7 commit 09fff0a
Showing 10 changed files with 320 additions and 295 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/Unit-Test.yml
@@ -77,11 +77,11 @@ jobs:
pip install -r requirements.txt
python setup.py install
- name: Launch Default Worker
- name: Launch ondemand Worker
run: |
celery --app API.api_worker worker --loglevel=INFO --queues='raw_ondemand' &
- name: Launch Special Worker
- name: Launch daemon Worker
run: |
celery --app API.api_worker worker --loglevel=INFO --queues='raw_daemon' &
478 changes: 239 additions & 239 deletions API/data/hdx.sql

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion API/data/tables.sql
@@ -20,7 +20,7 @@ CREATE TABLE if not exists public.hdx (
cid INT NULL,
hdx_upload BOOLEAN DEFAULT true,
dataset JSONB,
queue VARCHAR DEFAULT 'raw_daemon',
queue VARCHAR DEFAULT 'raw_ondemand',
meta BOOLEAN DEFAULT false,
categories JSONB NULL,
geometry public.geometry(MultiPolygon, 4326) NULL
4 changes: 3 additions & 1 deletion API/raw_data.py
@@ -19,14 +19,17 @@

"""[Router Responsible for Raw data API ]
"""
# Standard library imports
import json

# Third party imports
import redis
from area import area
from fastapi import APIRouter, Body, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi_versioning import version

# Reader imports
from src.app import RawData
from src.config import (
ALLOW_BIND_ZIP_FILTER,
@@ -445,7 +448,6 @@ def get_osm_current_snapshot_as_file(
],
)

# queue_name = "raw_daemon" if not params.uuid else "raw_ondemand"
queue_name = DEFAULT_QUEUE_NAME # Everything directs to default now
task = process_raw_data.apply_async(
args=(params.model_dump(),),
7 changes: 5 additions & 2 deletions API/tasks.py
@@ -1,13 +1,16 @@
# Standard library imports
import json
from datetime import datetime

# Third party imports
import redis
from celery.result import AsyncResult
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import JSONResponse
from fastapi_versioning import version

from src.config import CELERY_BROKER_URL, DAEMON_QUEUE_NAME, DEFAULT_QUEUE_NAME
# Reader imports
from src.config import CELERY_BROKER_URL, DEFAULT_QUEUE_NAME, ONDEMAND_QUEUE_NAME
from src.validation.models import SnapshotTaskResponse

from .api_worker import celery
@@ -156,7 +159,7 @@ def discard_all_waiting_tasks(user: AuthUser = Depends(admin_required)):
return JSONResponse({"tasks_discarded": purged})


queues = [DEFAULT_QUEUE_NAME, DAEMON_QUEUE_NAME]
queues = [DEFAULT_QUEUE_NAME, ONDEMAND_QUEUE_NAME]


@router.get("/queue/")
6 changes: 3 additions & 3 deletions docs/src/installation/configurations.md
@@ -62,8 +62,8 @@ The following are the different configuration options that are accepted.
| `EXTRA_README_TXT` | `EXTRA_README_TXT` | `[API_CONFIG]` | `` | Append extra string to export readme.txt | OPTIONAL |
| `ENABLE_TILES` | `ENABLE_TILES` | `[API_CONFIG]` | `false` | Enable Tile Output (Pmtiles and Mbtiles) | OPTIONAL |
| `ENABLE_SOZIP` | `ENABLE_SOZIP` | `[API_CONFIG]` | `false` | Enables sozip compression | OPTIONAL |
| `DEFAULT_QUEUE_NAME` | `DEFAULT_QUEUE_NAME` | `[API_CONFIG]` | `raw_ondemand` | Option to define default queue name| OPTIONAL |
| `DAEMON_QUEUE_NAME` | `DAEMON_QUEUE_NAME` | `[API_CONFIG]` | `raw_daemon` | Option to define daemon queue name for scheduled and long exports | OPTIONAL |
| `DEFAULT_QUEUE_NAME` | `DEFAULT_QUEUE_NAME` | `[API_CONFIG]` | `raw_daemon` | Option to define default queue name| OPTIONAL |
| `ONDEMAND_QUEUE_NAME` | `ONDEMAND_QUEUE_NAME` | `[API_CONFIG]` | `raw_ondemand` | Option to define the on-demand queue name for scheduled and long exports | OPTIONAL |
| `ENABLE_POLYGON_STATISTICS_ENDPOINTS` | `ENABLE_POLYGON_STATISTICS_ENDPOINTS` | `[API_CONFIG]` | `False` | Option to enable endpoints related to polygon statistics (approximate building count and road length within the passed polygon) | OPTIONAL |
| `ENABLE_CUSTOM_EXPORTS` | `ENABLE_CUSTOM_EXPORTS` | `[API_CONFIG]` | False | Enables custom exports endpoint and imports | OPTIONAL |
| `POLYGON_STATISTICS_API_URL` | `POLYGON_STATISTICS_API_URL` | `[API_CONFIG]` | `None` | API URL for polygon statistics, used to fetch the metadata. Currently tested with the GraphQL query endpoint of Kontour. Only required if enabled via ENABLE_POLYGON_STATISTICS_ENDPOINTS | OPTIONAL |
@@ -127,7 +127,7 @@ API Tokens have expiry date, It is `important to update API Tokens manually each
| `EXTRA_README_TXT` | `[API_CONFIG]` | No | Yes |
| `INDEX_THRESHOLD` | `[API_CONFIG]` | No | Yes |
| `DEFAULT_QUEUE_NAME` | `[API_CONFIG]` | Yes | No |
| `DAEMON_QUEUE_NAME` | `[API_CONFIG]` | Yes | No |
| `ONDEMAND_QUEUE_NAME` | `[API_CONFIG]` | Yes | No |
| `ENABLE_POLYGON_STATISTICS_ENDPOINTS` | `[API_CONFIG]` | Yes | Yes |
| `POLYGON_STATISTICS_API_URL` | `[API_CONFIG]` | Yes | Yes |
| `POLYGON_STATISTICS_API_RATE_LIMIT` | `[API_CONFIG]` | Yes | No |
12 changes: 6 additions & 6 deletions docs/src/installation/local.md
@@ -59,20 +59,20 @@ uvicorn API.main:app --reload
### Queues

Currently there are two types of queues implemented:
- "raw_daemon" : Queue for recurring exports which will replace the previous exports if present on the system , can be enabled through uuid:false API Param
- "raw_ondemand" : Queue for default exports which will create each unique id for exports
- "raw_daemon" : Queue for default exports which will create each unique id for exports , This queue is attached to 24/7 available workers
- "raw_ondemand" : Queue for recurring exports which will replace the previous exports if present on the system , can be enabled through uuid:false API Param . This queue will be attached to worker which will only spin up upon request.

### Start Celery Worker

You should be able to start a [celery](https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html#running-the-celery-worker-server) worker by running the following commands, each in a separate shell:

- Start for default queue
- Start for default daemon queue
```
celery --app API.api_worker worker --loglevel=INFO --queues="raw_ondemand" -n 'default_worker'
celery --app API.api_worker worker --loglevel=INFO --queues="raw_daemon" -n 'default_worker'
```
- Start for recurring queue
- Start for on demand queue
```
celery --app API.api_worker worker --loglevel=INFO --queues="raw_daemon" -n 'recurring_worker'
celery --app API.api_worker worker --loglevel=INFO --queues="raw_ondemand" -n 'ondemand_worker'
```

Set the number of requests a worker can handle at a time with `--concurrency`.
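For instance, a daemon worker capped at four concurrent tasks might look like this (the concurrency value here is only an example):
```
celery --app API.api_worker worker --loglevel=INFO --queues="raw_daemon" -n 'default_worker' --concurrency=4
```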
17 changes: 14 additions & 3 deletions src/app.py
@@ -17,6 +17,7 @@
# 1100 13th Street NW Suite 800 Washington, D.C. 20005
# <[email protected]>
"""Page contains Main core logic of app"""
# Standard library imports
import concurrent.futures
import json
import os
@@ -32,6 +33,7 @@
from json import dumps
from json import loads as json_loads

# Third party imports
import boto3
import humanize
import orjson
@@ -44,6 +46,7 @@
from psycopg2.extras import DictCursor
from slugify import slugify

# Reader imports
from src.config import (
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY,
@@ -89,27 +92,35 @@
from src.validation.models import EXPORT_TYPE_MAPPING, RawDataOutputType

if ENABLE_SOZIP:
# Third party imports
import sozipfile.sozipfile as zipfile
else:
# Standard library imports
import zipfile

# import instance for pooling
if use_connection_pooling:
# Reader imports
from src.db_session import database_instance
else:
database_instance = None
# Standard library imports
import logging as log

if ENABLE_CUSTOM_EXPORTS:
if USE_DUCK_DB_FOR_CUSTOM_EXPORTS is True:
# Third party imports
import duckdb

# Reader imports
from src.config import DUCK_DB_MEMORY_LIMIT, DUCK_DB_THREAD_LIMIT

if ENABLE_HDX_EXPORTS:
# Third party imports
from hdx.data.dataset import Dataset
from hdx.data.resource import Resource

# Reader imports
from src.config import HDX_MAINTAINER, HDX_OWNER_ORG, HDX_URL_PREFIX


@@ -1451,7 +1462,7 @@ def process_export_format(export_format):
and PARALLEL_PROCESSING_CATEGORIES is True
):
logging.info(
"Using Parallel Processing for %s Export formats", category_name.lower()
"Using Parallel Processing for %s Export formats with total %s workers", category_name.lower(), os.cpu_count()
)
with concurrent.futures.ThreadPoolExecutor(
max_workers=os.cpu_count()
@@ -1925,7 +1936,7 @@ def create_hdx(self, hdx_data):
hdx_data.get("iso3", None),
hdx_data.get("hdx_upload", True),
json.dumps(hdx_data.get("dataset")),
hdx_data.get("queue", "raw_daemon"),
hdx_data.get("queue", "raw_ondemand"),
hdx_data.get("meta", False),
json.dumps(hdx_data.get("categories", {})),
json.dumps(hdx_data.get("geometry")),
@@ -2055,7 +2066,7 @@ def update_hdx(self, hdx_id: int, hdx_data):
hdx_data.get("iso3", None),
hdx_data.get("hdx_upload", True),
json.dumps(hdx_data.get("dataset")),
hdx_data.get("queue", "raw_daemon"),
hdx_data.get("queue", "raw_ondemand"),
hdx_data.get("meta", False),
json.dumps(hdx_data.get("categories", {})),
json.dumps(hdx_data.get("geometry")),
15 changes: 10 additions & 5 deletions src/config.py
@@ -19,12 +19,13 @@
# 1100 13th Street NW Suite 800 Washington, D.C. 20005
# <[email protected]>

import errno
# Standard library imports
import logging
import os
from configparser import ConfigParser
from distutils.util import strtobool

# Third party imports
from slowapi import Limiter
from slowapi.util import get_remote_address

@@ -198,10 +199,10 @@ def not_raises(func, *args, **kwargs):
# Queue

DEFAULT_QUEUE_NAME = os.environ.get("DEFAULT_QUEUE_NAME") or config.get(
"API_CONFIG", "DEFAULT_QUEUE_NAME", fallback="raw_ondemand"
"API_CONFIG", "DEFAULT_QUEUE_NAME", fallback="raw_daemon"
)
DAEMON_QUEUE_NAME = os.environ.get("DAEMON_QUEUE_NAME") or config.get(
"API_CONFIG", "DAEMON_QUEUE_NAME", fallback="raw_daemon"
ONDEMAND_QUEUE_NAME = os.environ.get("ONDEMAND_QUEUE_NAME") or config.get(
"API_CONFIG", "ONDEMAND_QUEUE_NAME", fallback="raw_ondemand"
)

# Polygon statistics which will deliver the stats of approx buildings/ roads in the area
@@ -296,6 +297,7 @@ def not_raises(func, *args, **kwargs):
)

else:
# Standard library imports
import json

hdx_credentials_json = json.loads(hdx_credentials)
@@ -307,8 +309,8 @@ def not_raises(func, *args, **kwargs):

if None in (HDX_SITE, HDX_API_KEY, HDX_OWNER_ORG, HDX_MAINTAINER):
raise ValueError("HDX Remote Credentials Malformed")
logging.error("HDX Remote Credentials Malformed")

# Third party imports
from hdx.api.configuration import Configuration

try:
@@ -325,6 +327,7 @@ def not_raises(func, *args, **kwargs):
ENABLE_HDX_EXPORTS = False

if ENABLE_HDX_EXPORTS:
# Third party imports
from hdx.data.dataset import Dataset
from hdx.data.vocabulary import Vocabulary

@@ -377,6 +380,7 @@ def get_db_connection_params() -> dict:
)

else:
# Standard library imports
import json

connection_params = json.loads(db_credentials)
@@ -442,6 +446,7 @@ def get_oauth_credentials() -> tuple:
)

else:
# Standard library imports
import json

oauth2_credentials_json = json.loads(oauth2_credentials)
70 changes: 37 additions & 33 deletions src/validation/models.py
@@ -17,16 +17,19 @@
# 1100 13th Street NW Suite 800 Washington, D.C. 20005
# <[email protected]>
"""Page contains validation models for application"""
# Standard library imports
import json
from enum import Enum
from typing import Dict, List, Optional, Union

# Third party imports
from geojson_pydantic import Feature, FeatureCollection, MultiPolygon, Polygon
from geojson_pydantic.types import BBox
from pydantic import BaseModel as PydanticModel
from pydantic import Field, validator
from typing_extensions import TypedDict

# Reader imports
from src.config import (
ALLOW_BIND_ZIP_FILTER,
ENABLE_HDX_EXPORTS,
@@ -35,6 +38,7 @@
)

if ENABLE_HDX_EXPORTS:
# Reader imports
from src.config import ALLOWED_HDX_TAGS, ALLOWED_HDX_UPDATE_FREQUENCIES


@@ -296,22 +300,22 @@ class StatsRequestParams(BaseModel, GeometryValidatorMixin):
max_length=3,
example="NPL",
)
geometry: Optional[
Union[Polygon, MultiPolygon, Feature, FeatureCollection]
] = Field(
default=None,
example={
"type": "Polygon",
"coordinates": [
[
[83.96919250488281, 28.194446860487773],
[83.99751663208006, 28.194446860487773],
[83.99751663208006, 28.214869548073377],
[83.96919250488281, 28.214869548073377],
[83.96919250488281, 28.194446860487773],
]
],
},
geometry: Optional[Union[Polygon, MultiPolygon, Feature, FeatureCollection]] = (
Field(
default=None,
example={
"type": "Polygon",
"coordinates": [
[
[83.96919250488281, 28.194446860487773],
[83.99751663208006, 28.194446860487773],
[83.99751663208006, 28.214869548073377],
[83.96919250488281, 28.214869548073377],
[83.96919250488281, 28.194446860487773],
]
],
},
)
)

@validator("geometry", pre=True, always=True)
@@ -583,7 +587,7 @@ class DynamicCategoriesModel(BaseModel, GeometryValidatorMixin):
default=None, description="Dataset Configurations for HDX Upload"
)
queue: Optional[str] = Field(
default="raw_daemon",
default="raw_ondemand",
description="Lets you decide which queue you wanna place your task, Requires admin access",
)
meta: bool = Field(
@@ -608,22 +612,22 @@
}
],
)
geometry: Optional[
Union[Polygon, MultiPolygon, Feature, FeatureCollection]
] = Field(
default=None,
example={
"type": "Polygon",
"coordinates": [
[
[83.96919250488281, 28.194446860487773],
[83.99751663208006, 28.194446860487773],
[83.99751663208006, 28.214869548073377],
[83.96919250488281, 28.214869548073377],
[83.96919250488281, 28.194446860487773],
]
],
},
geometry: Optional[Union[Polygon, MultiPolygon, Feature, FeatureCollection]] = (
Field(
default=None,
example={
"type": "Polygon",
"coordinates": [
[
[83.96919250488281, 28.194446860487773],
[83.99751663208006, 28.194446860487773],
[83.99751663208006, 28.214869548073377],
[83.96919250488281, 28.214869548073377],
[83.96919250488281, 28.194446860487773],
]
],
},
)
)

@validator("geometry", pre=True, always=True)
