From b9f572faa5f61d28cb9213a575bc05173e1519ca Mon Sep 17 00:00:00 2001 From: Carlos Del Real Date: Wed, 20 Nov 2024 11:38:08 -0500 Subject: [PATCH] Fullfill automation (#333) * feat: automation implemented until upload to GEE, pending update layer * feat: update layer from automation * feat: add envs in cron * feat: fix automation * feat: install python dependencies * feat: implemented mask automation * feat: update pip reqs * feat: pyramidingPolycy: MODE --- .github/workflows/cron_update_SPI24.yml | 27 +- .github/workflows/cron_update_SPI24_mask.yml | 62 +++++ .github/workflows/cron_update_SPI3.yml | 30 +- .gitignore | 3 + Makefile | 8 + {python => automation}/.run-act.sh | 0 automation/.run-ee.example.sh | 24 ++ automation/automation.py | 62 +++++ {python => automation}/docker/Act.Dockerfile | 0 automation/mask_automation.py | 36 +++ automation/month_24_automation.py | 34 +++ automation/month_3_automation.py | 34 +++ {python => automation}/readme.md | 8 +- automation/requirements.txt | 6 + automation/src/__init__.py | 0 automation/src/services/__init__.py | 0 automation/src/services/cartosql.py | 275 +++++++++++++++++++ automation/src/services/cloudstorage.py | 86 ++++++ automation/src/services/dates.py | 16 ++ automation/src/services/mask.py | 32 +++ automation/src/services/path.py | 13 + automation/src/services/project.py | 6 + automation/src/services/resourcewatch.py | 74 +++++ automation/src/test/__init__.py | 0 automation/src/test/services/__init__.py | 0 automation/src/test/services/test_dates.py | 13 + python/.run-ee.sh | 5 - python/earthengine.py | 9 - python/getFilename24month.py | 24 -- python/getFilename3month.py | 24 -- python/project24month.py | 21 -- python/project3month.py | 21 -- python/requirements.txt | 3 - 33 files changed, 828 insertions(+), 128 deletions(-) create mode 100644 .github/workflows/cron_update_SPI24_mask.yml create mode 100644 Makefile rename {python => automation}/.run-act.sh (100%) create mode 100755 automation/.run-ee.example.sh create mode 100644 automation/automation.py rename {python => automation}/docker/Act.Dockerfile (100%) create mode 100644 automation/mask_automation.py create mode 100644 automation/month_24_automation.py create mode 100644 automation/month_3_automation.py rename {python => automation}/readme.md (87%) create mode 100644 automation/requirements.txt create mode 100644 automation/src/__init__.py create mode 100644 automation/src/services/__init__.py create mode 100644 automation/src/services/cartosql.py create mode 100644 automation/src/services/cloudstorage.py create mode 100644 automation/src/services/dates.py create mode 100644 automation/src/services/mask.py create mode 100644 automation/src/services/path.py create mode 100644 automation/src/services/project.py create mode 100644 automation/src/services/resourcewatch.py create mode 100644 automation/src/test/__init__.py create mode 100644 automation/src/test/services/__init__.py create mode 100644 automation/src/test/services/test_dates.py delete mode 100755 python/.run-ee.sh delete mode 100644 python/earthengine.py delete mode 100644 python/getFilename24month.py delete mode 100644 python/getFilename3month.py delete mode 100644 python/project24month.py delete mode 100644 python/project3month.py delete mode 100644 python/requirements.txt diff --git a/.github/workflows/cron_update_SPI24.yml b/.github/workflows/cron_update_SPI24.yml index dfb25dd1..91e9a4cc 100644 --- a/.github/workflows/cron_update_SPI24.yml +++ b/.github/workflows/cron_update_SPI24.yml @@ -17,6 +17,15 @@ jobs: contents: "read" id-token: "write" + env: + RW_API_KEY: ${{ secrets.RW_API_KEY }} + BUCKET: ${{ vars.BUCKET }} + IMAGES_PREFIX_PATH: ${{ vars.IMAGES_PREFIX_PATH }} + GS_STAGING_PREFIX: ${{ vars.GS_STAGING_PREFIX }} + GEE_PROJECT_FOLDER: ${{ vars.GEE_PROJECT_FOLDER }} + MONTH_3_LAYER_ID: ${{ vars.MONTH_3_LAYER_ID }} + MONTH_24_LAYER_ID: ${{ vars.MONTH_24_LAYER_ID }} + steps: - name: Checkout Repository uses: actions/checkout@v4 @@ -28,7 +37,7 @@ jobs: run: | pip3 install --upgrade pip==22.0 echo "Installing Python dependencies" - if [ -f ./python/requirements.txt ]; then pip install -r ./python/requirements.txt; fi + if [ -f ./automation/requirements.txt ]; then pip install -r ./automation/requirements.txt; fi pip install rasterio pip install earthengine-api pip install earthengine-api --upgrade @@ -39,11 +48,11 @@ jobs: # Get file source name and put it on env - name: Run Python Script on File run: | - python python/getFilename24month.py - - name: Copy from Google storage to local - run: | - gcloud storage cp $WPS_24_MONTH_GS $WPS_24_MONTH_FILENAME.tif - # Get file source name and put it on env - - name: Project downloaded file - run: | - python python/project24month.py \ No newline at end of file + echo "ENVS TO USE" + echo "BUCKET" + echo "IMAGES_PREFIX_PATH" + echo "GS_STAGING_PREFIX" + echo "GEE_PROJECT_FOLDER" + echo "RW_API_KEY" + echo "MONTH_24_LAYER_ID" + python automation/month_24_automation.py \ No newline at end of file diff --git a/.github/workflows/cron_update_SPI24_mask.yml b/.github/workflows/cron_update_SPI24_mask.yml new file mode 100644 index 00000000..cc8bc7dc --- /dev/null +++ b/.github/workflows/cron_update_SPI24_mask.yml @@ -0,0 +1,62 @@ +name: Update 3-month SPI Anomaly (Long-term) + +on: + push: + branches: + - earthengine-actions + workflow_dispatch: + # Allows manual triggering of the action + schedule: + - cron: "0 12 * * *" # Runs daily at 12:00 UTC, adjust as needed + +jobs: + spi-24-month: + runs-on: ubuntu-latest + + permissions: + contents: "read" + id-token: "write" + + env: + RW_API_KEY: ${{ secrets.RW_API_KEY }} + BUCKET: ${{ vars.BUCKET }} + IMAGES_PREFIX_PATH: ${{ vars.IMAGES_PREFIX_PATH }} + GS_STAGING_PREFIX: ${{ vars.GS_STAGING_PREFIX }} + GEE_PROJECT_FOLDER: ${{ vars.GEE_PROJECT_FOLDER }} + MONTH_3_LAYER_ID: ${{ vars.MONTH_3_LAYER_ID }} + MONTH_24_LAYER_ID: ${{ vars.MONTH_24_LAYER_ID }} + CARTO_MASK_TABLE_NAME: ${{ vars.CARTO_MASK_TABLE_NAME }} + + steps: + - name: Checkout Repository + uses: actions/checkout@v4 + - uses: actions/setup-python@v3 + with: + python-version: "3.10" + + - name: Upgrade pip & Install dependencies + run: | + pip3 install --upgrade pip==22.0 + echo "Installing Python dependencies" + if [ -f ./automation/requirements.txt ]; then pip install -r ./automation/requirements.txt; fi + pip install rasterio + pip install earthengine-api + pip install earthengine-api --upgrade + + - uses: "google-github-actions/auth@v2" + with: + credentials_json: "${{ secrets.GOOGLE_CREDENTIALS }}" + + # Get file source name and put it on env + - name: Run Python Script on File + run: | + echo "ENVS TO USE" + echo "BUCKET" + echo "IMAGES_PREFIX_PATH" + echo "GS_STAGING_PREFIX" + echo "GEE_PROJECT_FOLDER" + echo "RW_API_KEY" + echo "MONTH_24_LAYER_ID" + echo "CARTO_MASK_TABLE_NAME" + python automation/mask_automation.py + diff --git a/.github/workflows/cron_update_SPI3.yml b/.github/workflows/cron_update_SPI3.yml index b96630ca..30554173 100644 --- a/.github/workflows/cron_update_SPI3.yml +++ b/.github/workflows/cron_update_SPI3.yml @@ -17,6 +17,15 @@ jobs: contents: "read" id-token: "write" + env: + RW_API_KEY: ${{ secrets.RW_API_KEY }} + BUCKET: ${{ vars.BUCKET }} + IMAGES_PREFIX_PATH: ${{ vars.IMAGES_PREFIX_PATH }} + GS_STAGING_PREFIX: ${{ vars.GS_STAGING_PREFIX }} + GEE_PROJECT_FOLDER: ${{ vars.GEE_PROJECT_FOLDER }} + MONTH_3_LAYER_ID: ${{ vars.MONTH_3_LAYER_ID }} + MONTH_24_LAYER_ID: ${{ vars.MONTH_24_LAYER_ID }} + steps: - name: Checkout Repository uses: actions/checkout@v4 @@ -28,7 +37,7 @@ jobs: run: | pip3 install --upgrade pip==22.0 echo "Installing Python dependencies" - if [ -f ./python/requirements.txt ]; then pip install -r ./python/requirements.txt; fi + if [ -f ./automation/requirements.txt ]; then pip install -r ./automation/requirements.txt; fi pip install rasterio pip install earthengine-api pip install earthengine-api --upgrade @@ -40,13 +49,12 @@ jobs: # Get file source name and put it on env - name: Run Python Script on File run: | - python python/getFilename3month.py - - - name: Copy from Google storage to local - run: | - gcloud storage cp $WPS_3_MONTH_GS $WPS_3_MONTH_FILENAME.tif - - # Get file source name and put it on env - - name: Project downloaded file - run: | - python python/project3month.py + echo "ENVS TO USE" + echo "BUCKET" + echo "IMAGES_PREFIX_PATH" + echo "GS_STAGING_PREFIX" + echo "GEE_PROJECT_FOLDER" + echo "RW_API_KEY" + echo "MONTH_3_LAYER_ID" + python automation/month_3_automation.py + diff --git a/.gitignore b/.gitignore index a5511e8e..d6b4e9ff 100644 --- a/.gitignore +++ b/.gitignore @@ -57,3 +57,6 @@ dev_docs/nginx-proxy/certs/*.crt automationvenv gcloud-key.json +*.pyc +__pycache__ +automation/.run-ee.sh diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..24af95fe --- /dev/null +++ b/Makefile @@ -0,0 +1,8 @@ +.PHONY: setup install + +setup: + python3 -m venv automationvenv + source ./automationvenv/bin/activate + +install: + pip install -r automation/requirements.txt diff --git a/python/.run-act.sh b/automation/.run-act.sh similarity index 100% rename from python/.run-act.sh rename to automation/.run-act.sh diff --git a/automation/.run-ee.example.sh b/automation/.run-ee.example.sh new file mode 100755 index 00000000..79c840a1 --- /dev/null +++ b/automation/.run-ee.example.sh @@ -0,0 +1,24 @@ +# obligatories for automation +export GEE_SERVICE_ACCOUNT= +export GOOGLE_APPLICATION_CREDENTIALS= +export CLOUDSDK_CORE_PROJECT= +export BUCKET="wps_pillar1a" +export IMAGES_PREFIX_PATH="data_sources/Deltares/Data20" +export GS_STAGING_PREFIX="wps-staging-delete-me" +export GEE_PROJECT_FOLDER="projects/wpsi-208318/assets/wpsi" +export RW_API_KEY= + +# depending on the script +export MONTH_24_LAYER_ID="cdd0000b-34a9-4b3d-9640-8f574321223d" +export MONTH_3_LAYER_ID="cdd0000b-34a9-4b3d-9640-8f57422f264d" + +## to inspect data +# python automation/earthengine.py +# python automation/automation.py listBlobs +python automation/automation.py latestFiles +# python automation/automation.py updateLayer cdd0000b-34a9-4b3d-9640-8f57422f264d projects/ee-lizsaccoccia/assets/spi3_112024 +# python automation/automation.py updateLayer cdd0000b-34a9-4b3d-9640-8f57422f264d helo + +## automations +# python automation/month_24_automation.py +# python automation/month_3_automation.py diff --git a/automation/automation.py b/automation/automation.py new file mode 100644 index 00000000..5f755def --- /dev/null +++ b/automation/automation.py @@ -0,0 +1,62 @@ +import sys + +def main(): + """ + Main function to handle command line arguments and execute the appropriate command. + This function expects three command line arguments: + 1. command: The command to execute (e.g., 'updateLayer'). + 2. layerId: The ID of the layer to be updated. + 3. assetId: The path of the asset id (e.g., projects/ee-lizsaccoccia/assets/spi3_112024). + The function performs the following steps: + 1. Parses the command line arguments. + 2. Validates that both layerId and assetId are provided. + 3. Prints a success message if the arguments are valid. + 4. Prints an error message and usage example if the arguments are missing or invalid. + 5. Executes the 'updateLayer' command by calling the layer_set_asset_id function with the provided layerId and assetId. + Usage example: + python /home/carlos/wri/wri-wpsi/automation/rw-api.py updateLayer cdd0000b-34a9-4b3d-9640-8f57422f264d path/to/assetId + """ + try: + command = sys.argv[1] + assert(command) + except: + print('command was not passed as argument') + print('example: python automation.py ...args') + return + + if command == 'updateLayer': + from src.services.resourcewatch import layer_set_asset_id + try: + layerId = sys.argv[2] + assetId = sys.argv[3] + assert(layerId and assetId) + print(f'layer id: {layerId} and assetId {assetId} successfully loaded') + except: + print('layer id or assetId where not passed as arguments') + print(r'example: python automation.py updateLayer cdd0000b-34a9-4b3d-9640-8f57422f264d path/to/assetId') + return + layer_set_asset_id(layerId, assetId) + + if command == 'listBlobs': + from src.services.cloudstorage import print_all_blobs + print_all_blobs() + + if command == 'latestFiles': + from src.services.cloudstorage import latest_24_month, latest_3_month + print(latest_3_month(), latest_24_month()) + if command == 'layerSql': + from src.services.resourcewatch import layer_set_sql + try: + layerId = sys.argv[2] + sql = sys.argv[3] + assert(layerId and sql) + + except: + print('sql was not passed as argument') + print(r'example: python automation.py layerSql cdd0000b-34a9-4b3d-9640-8f57422f264d "SELECT * FROM wps_spi24_mask WHERE value > -1.5"') + return + layer_set_sql(layerId, sql) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/python/docker/Act.Dockerfile b/automation/docker/Act.Dockerfile similarity index 100% rename from python/docker/Act.Dockerfile rename to automation/docker/Act.Dockerfile diff --git a/automation/mask_automation.py b/automation/mask_automation.py new file mode 100644 index 00000000..ea1dc668 --- /dev/null +++ b/automation/mask_automation.py @@ -0,0 +1,36 @@ +import os +import eeUtil as eu +from src.services.mask import mask +from src.services.cloudstorage import latest_24_month, download +from src.services.project import project +from src.services.cartosql import init, upload_to_carto, deleteRows + +# define constants +BUCKET=os.environ["BUCKET"] +GS_STAGING_PREFIX=os.environ["GS_STAGING_PREFIX"] +MONTH_3_LAYER_ID=os.environ["MONTH_3_LAYER_ID"] +GEE_PROJECT_FOLDER=os.environ["GEE_PROJECT_FOLDER"] +CARTO_MASK_TABLE_NAME=os.environ["CARTO_MASK_TABLE_NAME"] + +# Login to gcloud and gee properly + +eu.init(bucket=BUCKET) + +# get the last 24 month image +filename, blob_name = latest_24_month() +print("****Mask automation****") +print(filename, blob_name) + +# download the file from google storage +download(blob_name, filename) + +# run the projection +project(filename) + +# upload it to GEE and make it public +masked_df = mask(filename) + +# carto +init() +deleteRows(CARTO_MASK_TABLE_NAME, "true") +upload_to_carto(CARTO_MASK_TABLE_NAME, masked_df) \ No newline at end of file diff --git a/automation/month_24_automation.py b/automation/month_24_automation.py new file mode 100644 index 00000000..35c8d838 --- /dev/null +++ b/automation/month_24_automation.py @@ -0,0 +1,34 @@ +import os +import eeUtil as eu +from src.services.resourcewatch import layer_set_asset_id +from src.services.path import strip_extension +from src.services.cloudstorage import latest_24_month, download +from src.services.project import project + +# define constants +BUCKET=os.environ["BUCKET"] +GS_STAGING_PREFIX=os.environ["GS_STAGING_PREFIX"] +MONTH_24_LAYER_ID=os.environ["MONTH_24_LAYER_ID"] +GEE_PROJECT_FOLDER=os.environ["GEE_PROJECT_FOLDER"] + +# Login to gcloud and gee properly + +eu.init(bucket=BUCKET) + +# get the last 24 month image +filename, blob_name = latest_24_month() +print("****latest 24 month****") +print(filename, blob_name) + +# download the file from google storage +download(blob_name, filename) + +# run the projection +project(filename) + +# upload it to GEE and make it public +imageId = f"{GEE_PROJECT_FOLDER}/{strip_extension(filename)}" +eu.upload(filename, imageId, gs_prefix=GS_STAGING_PREFIX, public=True, clean=False) + +# update the layerConfig.assetId in the given layerId from resourcewatch +layer_set_asset_id(MONTH_24_LAYER_ID, imageId) \ No newline at end of file diff --git a/automation/month_3_automation.py b/automation/month_3_automation.py new file mode 100644 index 00000000..13981390 --- /dev/null +++ b/automation/month_3_automation.py @@ -0,0 +1,34 @@ +import os +import eeUtil as eu +from src.services.resourcewatch import layer_set_asset_id +from src.services.path import strip_extension +from src.services.cloudstorage import latest_3_month, download +from src.services.project import project + +# define constants +BUCKET=os.environ["BUCKET"] +GS_STAGING_PREFIX=os.environ["GS_STAGING_PREFIX"] +MONTH_3_LAYER_ID=os.environ["MONTH_3_LAYER_ID"] +GEE_PROJECT_FOLDER=os.environ["GEE_PROJECT_FOLDER"] + +# Login to gcloud and gee properly + +eu.init(bucket=BUCKET) + +# get the last 24 month image +filename, blob_name = latest_3_month() +print("****latest 3 month****") +print(filename, blob_name) + +# download the file from google storage +download(blob_name, filename) + +# run the projection +project(filename) + +# upload it to GEE and make it public +imageId = f"{GEE_PROJECT_FOLDER}/{strip_extension(filename)}" +eu.upload(filename, imageId, gs_prefix=GS_STAGING_PREFIX, public=True, clean=False, ingest_params={"pyramidingPolicy": "MODE"}) + +# update the layerConfig.assetId in the given layerId from resourcewatch +layer_set_asset_id(MONTH_3_LAYER_ID, imageId) \ No newline at end of file diff --git a/python/readme.md b/automation/readme.md similarity index 87% rename from python/readme.md rename to automation/readme.md index 4b5dc725..9d1ff8d9 100644 --- a/python/readme.md +++ b/automation/readme.md @@ -32,4 +32,10 @@ pip install -r python/requirements.txt ## Documentation - pipreqs to update requirements.txt [pipreqs](https://github.com/bndr/pipreqs) -- pyenv [pyenv](https://github.com/pyenv/pyenv?tab=readme-ov-file#set-up-your-shell-environment-for-pyenv) \ No newline at end of file +- pyenv [pyenv](https://github.com/pyenv/pyenv?tab=readme-ov-file#set-up-your-shell-environment-for-pyenv) + +## Tests + +```bash +python -m unittest discover -s automation +``` \ No newline at end of file diff --git a/automation/requirements.txt b/automation/requirements.txt new file mode 100644 index 00000000..a8deea9c --- /dev/null +++ b/automation/requirements.txt @@ -0,0 +1,6 @@ +eeUtil==0.3.0 +eeUtil==0.3.0 +geopandas==1.0.1 +protobuf==5.28.3 +rasterio==1.4.2 +Requests==2.32.3 diff --git a/automation/src/__init__.py b/automation/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/automation/src/services/__init__.py b/automation/src/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/automation/src/services/cartosql.py b/automation/src/services/cartosql.py new file mode 100644 index 00000000..196fe47a --- /dev/null +++ b/automation/src/services/cartosql.py @@ -0,0 +1,275 @@ +import requests +import os +import logging +import json +from collections import OrderedDict +import time + +string_types = str + +CARTO_URL = 'https://{}.carto.com/api/v2/sql' +CARTO_USER = os.environ.get('CARTO_USER') +CARTO_KEY = os.environ.get('CARTO_KEY') + +CARTO_SCHEMA =OrderedDict([ + ('value', "numeric"), + ("the_geom", "geometry"), + ]) + +session = requests.Session() + +def init(user=None, key=None, auth=True): + '''Set user and key''' + global CARTO_USER, CARTO_KEY + CARTO_USER = user or os.environ.get('CARTO_USER') + CARTO_KEY = key or os.environ.get('CARTO_KEY') + if auth: + try: + get('SELECT * FROM CDB_UserTables() LIMIT 1') + return True + except requests.HTTPError as e: + logging.warning('Failed to authenticate') + logging.warning(e) + return False + +def sendSql(sql, user=None, key=None, f='', post=True): + '''Send arbitrary sql and return response object or False''' + user = user or CARTO_USER + key = key or CARTO_KEY + url = CARTO_URL.format(user) + payload = { + 'api_key': key, + 'q': sql, + } + if len(f): + payload['format'] = f + logging.debug((url, payload)) + if post: + r = requests.post(url, json=payload) + else: + r = requests.get(url, params=payload) + r.raise_for_status() + return r + + +def get(sql, user=None, key=None, f=''): + '''Send arbitrary sql and return response object or False''' + return sendSql(sql, user, key, f, False) + + +def post(sql, user=None, key=None, f=''): + '''Send arbitrary sql and return response object or False''' + return sendSql(sql, user, key, f) + + +def getFields(fields, table, where='', order='', limit='', user=None, + key=None, f='', post=True): + '''Select fields from table''' + fields = fields.split(',') if isinstance(fields, string_types) else fields + where = 'WHERE {}'.format(where) if where else '' + order = 'ORDER BY {}'.format(order) if order else '' + limit = 'LIMIT {}'.format(limit) if limit else '' + sql = 'SELECT {} FROM "{}" {} {} {}'.format( + ','.join(fields), table, where, order, limit) + return sendSql(sql, user, key, f, post) + +def getTables(user=None, key=None, f='csv'): + '''Get the list of tables''' + r = get('SELECT * FROM CDB_UserTables()', user, key, f) + if f == 'csv': + return r.text.splitlines()[1:] + return r + + +def tableExists(table, user=None, key=None): + '''Check if table exists''' + return table in getTables(user, key) + + +def createTable(table, schema, user=None, key=None): + ''' + Create table with schema and CartoDBfy table + + `schema` should be a dict or list of tuple pairs with + - keys as field names and + - values as field types + ''' + items = schema.items() if isinstance(schema, dict) else schema + defslist = ['{} {}'.format(k, v) for k, v in items] + sql = 'CREATE TABLE "{}" ({})'.format(table, ','.join(defslist)) + if post(sql, user, key): + return _cdbfyTable(table, user, key) + return False + +def createTableFromQuery(table, query, user=None, key=None): + ''' + Create table with from query and CartoDBfy table + ''' + sql = 'CREATE TABLE "{}" AS {}'.format(table, query) + if post(sql, user, key): + return _cdbfyTable(table, user, key) + return False + + +def _cdbfyTable(table, user=None, key=None): + '''CartoDBfy table so that it appears in Carto UI''' + user = user or CARTO_USER + sql = "SELECT cdb_cartodbfytable('{}','\"{}\"')".format(user, table) + return post(sql, user, key) + + +def createIndex(table, fields, unique='', using='', user=None, + key=None): + '''Create index on table on field(s)''' + fields = (fields,) if isinstance(fields, string_types) else fields + f_underscore = '_'.join(fields) + f_comma = ','.join(fields) + unique = 'UNIQUE' if unique else '' + using = 'USING {}'.format(using) if using else '' + sql = 'CREATE {} INDEX idx_{}_{} ON {} {} ({})'.format( + unique, table, f_underscore, table, using, f_comma) + return post(sql, user, key) + + +def _escapeValue(value, dtype): + ''' + Escape value for SQL based on field type + + TYPE Escaped + None -> NULL + geometry -> string as is; obj dumped as GeoJSON + text -> single quote escaped + timestamp -> single quote escaped + varchar -> single quote escaped + else -> as is + ''' + if value is None: + return "NULL" + if dtype == 'geometry': + # if not string assume GeoJSON and assert WKID + if isinstance(value, string_types): + return value + else: + value = str(value) + return "ST_SetSRID(ST_GeomFromText('{}'),4326)".format(value) + elif dtype in ('text', 'timestamp', 'varchar'): + # quote strings, escape quotes, and drop nbsp + return "'{}'".format( + str(value).replace("'", "''")) + else: + return str(value) + + +def _dumpRows(rows, dtypes): + '''Escapes rows of data to SQL strings''' + dumpedRows = [] + for row in rows: + escaped = [_escapeValue(row[i], dtypes[i]) for i in range(len(dtypes))] + dumpedRows.append('({})'.format(','.join(escaped))) + return ','.join(dumpedRows) + + +def _insertRows(table, fields, dtypes, rows, user=None, key=None): + values = _dumpRows(rows, tuple(dtypes)) + sql = 'INSERT INTO "{}" ({}) VALUES {}'.format( + table, ', '.join(fields), values) + return post(sql, user, key) + + +def insertRows(table, fields, dtypes, rows, user=None, + key=None, blocksize=1000): + ''' + Insert rows into table + + `rows` must be a list of lists containing the data to be inserted + `fields` field names for the columns in `rows` + `dtypes` field types for the columns in `rows` + + Automatically breaks into multiple requests at `blocksize` rows + ''' + # iterate in blocks + while len(rows): + if not _insertRows(table, fields, dtypes, rows[:blocksize], user, key): + return False + rows = rows[blocksize:] + return True + +# Alias insertRows +blockInsertRows = insertRows + + +def deleteRows(table, where, user=None, key=None): + '''Delete rows from table''' + sql = 'DELETE FROM "{}" WHERE {}'.format(table, where) + return post(sql,user, key) + + +def deleteRowsByIDs(table, ids, id_field='cartodb_id', dtype='', + user=None, key=None): + '''Delete rows from table by IDs''' + if dtype: + ids = [_escapeValue(i, dtype) for i in ids] + where = '{} in ({})'.format(id_field, ','.join(ids)) + return deleteRows(table, where, user, key) + + +def dropTable(table, user=None, key=None): + '''Delete table''' + sql = 'DROP TABLE "{}"'.format(table) + return post(sql, user, key) + +def truncateTable(table, user=None, key=None): + '''Delete table''' + sql = 'TRUNCATE TABLE "{}"'.format(table) + return post(sql,user, key) + +def upload_to_carto(carto_table, gdf): + ''' + Function to upload data to the Carto table + INPUT row: the geopandas dataframe of data we want to upload (geopandas dataframe) + RETURN the wdpa_pid of the row just uploaded + ''' + # replace all null values with None + gdf = gdf.where(gdf.notnull(), None) + # convert the geometry in the geometry column to geojsons + # row['geometry'] = convert_geometry(row['geometry']) + # construct the sql query to upload the row to the carto table + fields = CARTO_SCHEMA.keys() + + # maximum attempts to make + n_tries = 4 + # sleep time between each attempt + retry_wait_time = 6 + values = _dumpRows(gdf.values.tolist(), tuple(CARTO_SCHEMA.values())) + + insert_exception = None + payload = { + 'api_key': CARTO_KEY, + 'q': 'INSERT INTO "{}" ({}) VALUES {}'.format(carto_table, ', '.join(fields), values) + } + del values + for i in range(n_tries): + try: + # send the sql query to the carto API + r = session.post('https://{}.carto.com/api/v2/sql'.format(CARTO_USER), json=payload) + r.raise_for_status() + except Exception as e: # if there's an exception do this + insert_exception = e + if r.status_code != 429: + logging.error(r.content) + logging.warning('Attempt #{} to upload row #{} unsuccessful. Trying again after {} seconds'.format(i, gdf['WDPA_PID'], retry_wait_time)) + logging.debug('Exception encountered during upload attempt: '+ str(e)) + time.sleep(retry_wait_time) + else: # if no exception do this + return gdf + else: + # this happens if the for loop completes, ie if it attempts to insert row n_tries times + logging.error('Upload of row #{} has failed after {} attempts'.format(gdf['WDPA_PID'], n_tries)) + logging.error('Problematic row: '+ str(gdf)) + logging.error('Raising exception encountered during last upload attempt') + logging.error(insert_exception) + raise insert_exception + +if __name__ == '__main__': + from . import cli + cli.main() \ No newline at end of file diff --git a/automation/src/services/cloudstorage.py b/automation/src/services/cloudstorage.py new file mode 100644 index 00000000..c12984c1 --- /dev/null +++ b/automation/src/services/cloudstorage.py @@ -0,0 +1,86 @@ +import os +import eeUtil as eu +from google.cloud import storage + +# define constants +BUCKET=os.environ["BUCKET"] +IMAGES_PREFIX_PATH=os.environ["IMAGES_PREFIX_PATH"] + +# Login to gcloud and gee properly +eu.init(bucket=BUCKET) + +client = storage.Client() + +def _latest_file(file_type, prefix=IMAGES_PREFIX_PATH): + """ + Retrieves the latest file of a specific type from a cloud storage bucket. + This function lists all blobs in the specified bucket with a given prefix, + filters out non-tif files, and identifies the latest file based on the filename. + + Parameters: + file_type (str): The file type to filter (e.g., "ERA5_SPI24" or "SEAS5_SPI3"). + prefix (str): The prefix to filter files. + + Returns: + tuple: A tuple containing the latest filename and the blob name. + + Raises: + ValueError: If no valid file is found or if the latest filename is not a string or is an empty string. + """ + + blobs = client.list_blobs(BUCKET, prefix=prefix) + + latestFile = '' + latestBlob = '' + + for blob in blobs: + filename = blob.name.split("/")[-1] + if filename[-3:] != 'tif': + continue + if filename[:10] == file_type: + if filename > latestFile: + latestBlob = blob.name + latestFile = filename + + if not isinstance(latestFile, str) or latestFile == '': + raise ValueError("latestFile is either not a string or is an empty string") + + return latestFile, latestBlob + +def latest_24_month(): + """ + Retrieves the latest ERA5_SPI24 file from a cloud storage bucket. + Calls the general _get_latest_file function with the appropriate prefix and file type. + + Returns: + tuple: The filename of the latest ERA5_SPI24 file and the blob name. + """ + + return _latest_file("ERA5_SPI24") + +def latest_3_month(): + """ + Retrieves the latest SEAS5 image filename from a cloud storage bucket. + Calls the general _get_latest_file function with the appropriate prefix and file type. + + Returns: + tuple: The filename of the latest SEAS5 image and the blob name. + """ + + return _latest_file("SEAS5_SPI3") + +def print_all_blobs(): + """ + Prints the names of all blobs in the specified cloud storage bucket. + This function lists all blobs in the specified bucket with a given prefix + and prints their names. + """ + blobs = client.list_blobs(BUCKET, prefix=IMAGES_PREFIX_PATH) + + for blob in blobs: + print(blob.name) + +def download(blob_name, filename): + bucket = client.bucket(BUCKET) + blob = bucket.blob(blob_name) + blob.download_to_filename(filename) \ No newline at end of file diff --git a/automation/src/services/dates.py b/automation/src/services/dates.py new file mode 100644 index 00000000..df309916 --- /dev/null +++ b/automation/src/services/dates.py @@ -0,0 +1,16 @@ + + +def prev_month(month, delta): + """ + Calculate the previous month based on the given month and delta. + Args: + month (int): The current month as an integer (1-12). + delta (int): The number of months to go back. Can be negative or positive. + Returns: + int: The resulting month after applying the delta. The result is always in the range 1-12. + """ + + if month+delta <= 0: + return month+delta+12 + else: + return (month+delta)%12 \ No newline at end of file diff --git a/automation/src/services/mask.py b/automation/src/services/mask.py new file mode 100644 index 00000000..70bb9be7 --- /dev/null +++ b/automation/src/services/mask.py @@ -0,0 +1,32 @@ +import rasterio +from rasterio.crs import CRS +from rasterio.features import shapes +import geopandas as gpd + +def mask(filename): + with rasterio.open(filename) as src: + # https://gis.stackexchange.com/questions/410885/defining-the-crs-using-rasterio-when-reading-nongeoreferenced-raster + # Read the raster band you want to vectorize + band = src.read(1) + + # Extract vector features from the raster + shapes_generator = shapes(band, mask=band != 0, transform=src.transform) + # Create a list of features with their geometries + + features = [ + {"geometry": geometry, "properties": { 'value': value } } for geometry, value in shapes_generator + ] + + collection = { + "type": "FeatureCollection", + "features": features + } + + gdf = gpd.GeoDataFrame.from_features(features=collection, crs=CRS.from_epsg(4326)) + + gdf['mask'] = gdf['value'].apply(lambda x: 0 if x < -1.5 else 1) + + gdf = gdf.dissolve('mask') + gdf = gdf.explode() + + return gdf \ No newline at end of file diff --git a/automation/src/services/path.py b/automation/src/services/path.py new file mode 100644 index 00000000..98b6af04 --- /dev/null +++ b/automation/src/services/path.py @@ -0,0 +1,13 @@ +import os + +def strip_extension(filename: str) -> str: + """ + Removes the file extension from the given filename. + + Parameters: + filename (str): The name of the file, including the extension. + + Returns: + str: The filename without the extension. + """ + return os.path.splitext(filename)[0] \ No newline at end of file diff --git a/automation/src/services/project.py b/automation/src/services/project.py new file mode 100644 index 00000000..98d1977b --- /dev/null +++ b/automation/src/services/project.py @@ -0,0 +1,6 @@ +import rasterio +from rasterio.crs import CRS + +def project(filename): + with rasterio.open(filename, "r+") as rds: + rds.crs = CRS.from_epsg(4326) \ No newline at end of file diff --git a/automation/src/services/resourcewatch.py b/automation/src/services/resourcewatch.py new file mode 100644 index 00000000..8e67cf12 --- /dev/null +++ b/automation/src/services/resourcewatch.py @@ -0,0 +1,74 @@ +import requests +import json +import sys +import os + + +# print(config) +API_TOKEN = os.environ["RW_API_KEY"] + +if API_TOKEN: + print('RW_API_KEY successfully loaded!') +else: + print('Please check the path to your .env file and make sure you have a key called RW_API_KEY in your .env file.') + +def create_headers(): + return { + 'content-type': "application/json", + 'authorization': "{}".format(API_TOKEN), + } + +def getLayerData(layerId): + try: + rw_api_url = 'https://api.resourcewatch.org/v1/layer/{}'.format(layerId) + data = requests.request("GET", rw_api_url, headers=create_headers()).json()["data"] + return data + except: + raise ValueError('Failed getting layer data') + +def updateLayer(datasetId, layerId, layerData): + try: + rw_api_url = 'https://api.resourcewatch.org/v1/dataset/{}/layer/{}'.format(datasetId, layerId) + print(rw_api_url) + res = requests.patch(rw_api_url, data=json.dumps(layerData), headers=create_headers()) + print(res.json()["data"]["attributes"]["layerConfig"]) + except: + raise ValueError('Failed to update layer data') + +def layer_set_asset_id(layerId, assetIdContent): + """ + Updates the asset ID of a specified layer. + Args: + layerId (str): The ID of the layer to be updated. + assetIdContent (str): The new asset ID to be set for the layer. + Returns: + None + Raises: + KeyError: If the layer data does not contain the expected keys. + Exception: If there is an error updating the layer. + Example: + layer_set_asset_id("cdd0000b-34a9-4b3d-9640-8f57422f2aad", "projects/wpsi-208318/assets/wpsi/ERA5_SPI24_202409") + """ + + layerData = getLayerData(layerId) + + datasetId = layerData["attributes"]["dataset"] + layerId = layerData["id"] + print(layerData["attributes"]["layerConfig"]["assetId"]) + + ### updated assetId + layerData["attributes"]["layerConfig"]["assetId"] = assetIdContent + + updateLayer(datasetId, layerId, layerData["attributes"]) + +def layer_set_sql(layerId, sql): + layerData = getLayerData(layerId) + + datasetId = layerData["attributes"]["dataset"] + layerId = layerData["id"] + print(layerData["attributes"]["layerConfig"]["body"]["layers"][0]["options"]["sql"]) + + layerData["attributes"]["layerConfig"]["body"]["layers"][0]["options"]["sql"] = sql + + print(layerData["attributes"]["layerConfig"]["body"]["layers"][0]["options"]["sql"]) + updateLayer(datasetId, layerId, layerData["attributes"]) \ No newline at end of file diff --git a/automation/src/test/__init__.py b/automation/src/test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/automation/src/test/services/__init__.py b/automation/src/test/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/automation/src/test/services/test_dates.py b/automation/src/test/services/test_dates.py new file mode 100644 index 00000000..827c583f --- /dev/null +++ b/automation/src/test/services/test_dates.py @@ -0,0 +1,13 @@ +import unittest +from automation.src.services.dates import prev_month + +class TestDates(unittest.TestCase): + def test_prev_month(self): + self.assertEqual(prev_month(12, -1), 11) + self.assertEqual(prev_month(1, -2), 11) + self.assertEqual(prev_month(11, 2), 1) + self.assertEqual(prev_month(12, 1), 1) + self.assertEqual(prev_month(1, 3), 4) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/python/.run-ee.sh b/python/.run-ee.sh deleted file mode 100755 index 0f833f6d..00000000 --- a/python/.run-ee.sh +++ /dev/null @@ -1,5 +0,0 @@ -export GEE_SERVICE_ACCOUNT="wps-automation@wpsi-208318.iam.gserviceaccount.com" -export GOOGLE_APPLICATION_CREDENTIALS="gcloud-key.json" -export CLOUDSDK_CORE_PROJECT="wpsi-208318" - -python python/earthengine.py \ No newline at end of file diff --git a/python/earthengine.py b/python/earthengine.py deleted file mode 100644 index d7b611d5..00000000 --- a/python/earthengine.py +++ /dev/null @@ -1,9 +0,0 @@ -import eeUtil as eu - -eu.init() - -for asset in eu.ls('wpsi'): - print(str(asset)) - eu.setAcl("wpsi/"+ asset, "public") - - diff --git a/python/getFilename24month.py b/python/getFilename24month.py deleted file mode 100644 index dbf5ff43..00000000 --- a/python/getFilename24month.py +++ /dev/null @@ -1,24 +0,0 @@ -from datetime import datetime -import os - -# 24 month - -def getPrevMonth(month, delta): - if month+delta <= 0: - return month+delta+12 - else: - return (month+delta)%12 - -current_year_month = f"{datetime.now().year}{getPrevMonth(datetime.now().month, -1):02}" -prev_year_month = f"{datetime.now().year}{getPrevMonth(datetime.now().month, -2):02}" - -file = 'ERA5_SPI24_{year_month}' - -env_file = os.getenv('GITHUB_ENV') # Get the path of the runner file - -# write to the file -with open(env_file, "a") as env_file: - env_file.write(f"WPS_24_MONTH_GS=gs://wps_pillar1a/data_sources/Deltares/Data{current_year_month}/{file.format(year_month=prev_year_month)}.tif\n") - env_file.write(f"WPS_24_MONTH_GS_PROJ=gs://wps_pillar1a/data_sources/Deltares/Data{current_year_month}/proj_{file.format(year_month=prev_year_month)}.tif\n") - env_file.write(f"WPS_24_MONTH_FILENAME={file.format(year_month=prev_year_month)}") - \ No newline at end of file diff --git a/python/getFilename3month.py b/python/getFilename3month.py deleted file mode 100644 index 22bf7a51..00000000 --- a/python/getFilename3month.py +++ /dev/null @@ -1,24 +0,0 @@ -from datetime import datetime -import os - -# 24 month - -def getPrevMonth(month, delta): - if month+delta <= 0: - return month+delta+12 - else: - return (month+delta)%12 - -current_year_month = f"{datetime.now().year}{getPrevMonth(datetime.now().month, -1):02}" -prev_year_month = f"{datetime.now().year}{getPrevMonth(datetime.now().month, -2):02}" - -file = 'SEAS5_SPI3_{year_month}' - -env_file = os.getenv('GITHUB_ENV') # Get the path of the runner file - -# write to the file -with open(env_file, "a") as env_file: - env_file.write(f"WPS_3_MONTH_GS=gs://wps_pillar1a/data_sources/Deltares/Data{current_year_month}/{file.format(year_month=current_year_month)}.tif\n") - env_file.write(f"WPS_3_MONTH_GS_PROJ=gs://wps_pillar1a/data_sources/Deltares/Data{current_year_month}/proj_{file.format(year_month=current_year_month)}.tif\n") - env_file.write(f"WPS_3_MONTH_FILENAME={file.format(year_month=current_year_month)}") - \ No newline at end of file diff --git a/python/project24month.py b/python/project24month.py deleted file mode 100644 index a9e75295..00000000 --- a/python/project24month.py +++ /dev/null @@ -1,21 +0,0 @@ -import rasterio -from rasterio.crs import CRS -import os -import eeUtil as eu - -FILE = os.environ["WPS_24_MONTH_FILENAME"] -EXTENTION = ".tif" -BUCKET='wps_pillar1a' -GEE_FILE = f"projects/wpsi-208318/assets/wpsi/{FILE}" - -if FILE: - print('The filename evn WPS_24_MONTH_FILENAME={FILE} successfully loaded!') -else: - print('Please check the env WPS_24_MONTH_FILENAME was not loaded') - -with rasterio.open(f"{FILE}{EXTENTION}", "r+") as rds: - rds.crs = CRS.from_epsg(4326) - -eu.init(bucket=BUCKET) - -eu.upload([f"{FILE}{EXTENTION}"], [GEE_FILE], gs_prefix='wpsstaging', public=True, clean=False) ## clean=True fails \ No newline at end of file diff --git a/python/project3month.py b/python/project3month.py deleted file mode 100644 index 13f3dff5..00000000 --- a/python/project3month.py +++ /dev/null @@ -1,21 +0,0 @@ -import rasterio -from rasterio.crs import CRS -import os -import eeUtil as eu - -FILE = os.environ["WPS_3_MONTH_FILENAME"] -EXTENTION = ".tif" -BUCKET='wps_pillar1a' - - -if FILE: - print('The filename evn WPS_3_MONTH_FILENAME={FILE} successfully loaded!') -else: - print('Please check the env WPS_3_MONTH_FILENAME was not loaded') - -with rasterio.open(f"{FILE}{EXTENTION}", "r+") as rds: - rds.crs = CRS.from_epsg(4326) - -eu.init(bucket=BUCKET) - -eu.upload(f"{FILE}{EXTENTION}", f"projects/wpsi-208318/assets/wpsi/{FILE}", gs_prefix='wpsstaging', public=True, clean=False) \ No newline at end of file diff --git a/python/requirements.txt b/python/requirements.txt deleted file mode 100644 index 5df586b5..00000000 --- a/python/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -eeUtil==0.3.0 -eeUtil==0.3.0 -rasterio==1.4.2