Skip to content

Commit

Permalink
Update dashboard (#255)
Browse files Browse the repository at this point in the history
* use async mode for dashboard

* fix combine
  • Loading branch information
cehbrecht authored Aug 1, 2024
1 parent 7aad3ea commit 1e6f313
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 12 deletions.
11 changes: 6 additions & 5 deletions rook/processes/wps_dashboard.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

from pywps import FORMATS, ComplexOutput, Format, LiteralInput, Process
from pywps import FORMATS, ComplexOutput, LiteralInput, Process
from pywps.app.Common import Metadata
from pywps.app.exceptions import ProcessError

Expand All @@ -23,13 +23,13 @@ def __init__(self):
min_occurs=1,
max_occurs=1,
default="local",
allowed_values=["local", "ceda", "ipsl", "dkrz", "all"],
allowed_values=["local", "ipsl", "dkrz", "all"],
),
LiteralInput(
"time",
"Time Period",
abstract="The time period for usage collection seperated by /"
"Example: 2021-04-01/2021-04-30",
"Example: 2024-06-01/2024-06-30",
data_type="string",
min_occurs=0,
max_occurs=1,
Expand All @@ -53,7 +53,7 @@ def __init__(self):
metadata=[
Metadata("ROOK", "https://github.com/roocs/rook"),
],
version="0.1",
version="0.2",
inputs=inputs,
outputs=outputs,
store_supported=True,
Expand All @@ -69,7 +69,8 @@ def _handler(self, request, response):
time = None
time_start = time_end = None
try:
usage = Combine(site=request.inputs["site"][0].data)
site = request.inputs["site"][0].data
usage = Combine(site=site)
fusage, fdownloads = usage.collect(
time_start=time_start, time_end=time_end, outdir=self.workdir
)
Expand Down
6 changes: 3 additions & 3 deletions rook/processes/wps_usage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

from pywps import FORMATS, ComplexOutput, Format, LiteralInput, Process
from pywps import FORMATS, ComplexOutput, LiteralInput, Process
from pywps.app.Common import Metadata
from pywps.app.exceptions import ProcessError

Expand All @@ -23,7 +23,7 @@ def __init__(self):
"time",
"Time Period",
abstract="The time period for usage collection seperated by /"
"Example: 2021-04-01/2021-04-30",
"Example: 2024-06-01/2024-06-30",
data_type="string",
min_occurs=0,
max_occurs=1,
Expand Down Expand Up @@ -54,7 +54,7 @@ def __init__(self):
metadata=[
Metadata("ROOK", "https://github.com/roocs/rook"),
],
version="0.1",
version="0.2",
inputs=inputs,
outputs=outputs,
store_supported=True,
Expand Down
21 changes: 17 additions & 4 deletions rook/usage/combine.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
import concurrent.futures
import pandas as pd
import time as time_

from pywps import configuration as config

from owslib.wps import WebProcessingService, SYNC
from owslib.wps import WebProcessingService, ASYNC

from .base import Usage

Expand All @@ -18,7 +19,19 @@

def get_usage(site, time):
wps = WebProcessingService(url=URLS[site])
resp = wps.execute(identifier="usage", inputs=[("time", time)], mode=SYNC)
resp = wps.execute(
identifier="usage",
inputs=[("time", time)],
mode=ASYNC,
output=[("wpsusage", True), ("downloads", True)],
)
while resp.isComplete() is False:
time_.sleep(10)
resp.checkStatus()

if not resp.isSucceded():
raise Exception("usage collection failed.")

# requests
df = pd.read_csv(
resp.processOutputs[0].reference, parse_dates=["time_start", "time_end"]
Expand Down Expand Up @@ -48,15 +61,15 @@ class Combine(Usage):
def __init__(self, site=None):
site = site or "local"
if site == "all":
self.sites = ["ceda", "dkrz"]
self.sites = ["ipsl", "dkrz"]
else:
self.sites = [site]

def collect(self, time_start=None, time_end=None, outdir=None):
time = format_time(time_start, time_end)
df_list = []
df_downloads_list = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
jobs = {executor.submit(get_usage, site, time): site for site in self.sites}
for future in concurrent.futures.as_completed(jobs):
site = jobs[future]
Expand Down

0 comments on commit 1e6f313

Please sign in to comment.