Skip to content

Commit

Permalink
More MR fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Dor Amram committed Jan 9, 2020
1 parent 2d561cd commit 7f3e6db
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 13 deletions.
3 changes: 1 addition & 2 deletions scottypy/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def link(beam_id_or_tag, url, storage_base, dest):
for beam in scotty.get_beams_by_tag(tag):
_link_beam(storage_base, beam, os.path.join(dest, str(beam.id)))
else:
beam = scotty.get_beam(beam_id_or_tag)
beam = asyncio.run(scotty.get_beam(beam_id_or_tag))
if dest is None:
dest = beam_id_or_tag

Expand Down Expand Up @@ -162,7 +162,6 @@ def down(beam_id_or_tag, dest, url, overwrite, filter): # pylint: disable=W0622

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*beams))
loop.close()


@main.group()
Expand Down
5 changes: 5 additions & 0 deletions scottypy/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ class NotOverwriting(Exception):
def __init__(self, file_):
super(NotOverwriting, self).__init__()
self.file = file_


class HTTPError(Exception):
def __init__(self, *, url, code, text):
super().__init__("Server responded {code} when accessing {url}:\n{text}".format(code=code,url=url,text=text))
28 changes: 17 additions & 11 deletions scottypy/scotty.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import aiohttp
import asyncio
import emport
import json
import os
import requests
import socket
import logging
from aiohttp import ClientSession
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from tempfile import NamedTemporaryFile
from .file import File
from .beam import Beam
from .exc import PathNotExists

from .utils import execute_http

_SLEEP_TIME = 10
_NUM_OF_RETRIES = (60 // _SLEEP_TIME) * 15
Expand All @@ -32,6 +35,7 @@ def __init__(self, url, retry_times=3, backoff_factor=2):
self._session.mount(
url, HTTPAdapter(
max_retries=Retry(total=retry_times, status_forcelist=[502, 504], backoff_factor=backoff_factor)))

self._combadge = None

def prefetch_combadge(self):
Expand Down Expand Up @@ -179,20 +183,16 @@ def remove_tag(self, beam_id, tag):
response = self._session.delete("{0}/beams/{1}/tags/{2}".format(self._url, beam_id, tag), timeout=_TIMEOUT)
response.raise_for_status()

def get_beam(self, beam_id):
async def get_beam(self, beam_id):
"""Retrieve details about the specified beam.
:param int beam_id: Beam ID.
:rtype: :class:`.Beam`"""
response = self._session.get("{0}/beams/{1}".format(self._url, beam_id), timeout=_TIMEOUT)
response.raise_for_status()

json_response = response.json()
json_response = await execute_http("{0}/beams/{1}".format(self._url, beam_id))
return Beam.from_json(self, json_response['beam'])

def get_files(self, beam_id, filter_):
response = self._session.get(
"{0}/files".format(self._url),
response = self._session.get("{0}/files".format(self._url),
params={"beam_id": beam_id, "filter": filter_},
timeout=_TIMEOUT)
response.raise_for_status()
Expand All @@ -215,12 +215,18 @@ def get_beams_by_tag(self, tag):
:param str tag: The name of the tag.
:return: a list of :class:`.Beam` objects.
"""

response = self._session.get("{0}/beams?tag={1}".format(self._url, tag), timeout=_TIMEOUT)
response.raise_for_status()

ids = (b['id'] for b in response.json()['beams'])
return [self.get_beam(id_) for id_ in ids]
beams_id_list = (b['id'] for b in response.json()['beams'])
beams = []
for beam_id in beams_id_list:
beams.append(self.get_beam(beam_id))

loop = asyncio.get_event_loop()
beams_result = loop.run_until_complete(asyncio.gather(*beams))

return beams_result

def sanity_check(self):
"""Check if this instance of Scotty is functioning. Raise an exception if something's wrong"""
Expand Down
38 changes: 38 additions & 0 deletions scottypy/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import aiohttp
import asyncio
import logging
import yarl
from aiohttp import ClientSession
from typing import Optional, Dict, Any
from tenacity import stop_after_attempt, stop_after_delay, retry
from .exc import HTTPError


logger = logging.getLogger("scotty") # type: logging.Logger


class AsyncRequestHelper:
def __init__(self):
self._loop = asyncio.get_event_loop()
self._session = aiohttp.ClientSession(
loop=self._loop, timeout=aiohttp.ClientTimeout(total=30),
headers={"Accept-Encoding": "gzip", 'Content-Type': 'application/json'}
)

@retry(stop=(stop_after_delay(5) | stop_after_attempt(3)))
async def execute_http(
self, url: yarl.URL, *, data: Optional[Dict[str, Any]] = None, params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
method = "GET" if data is None else "POST"
logger.info("Async Calling {} {}", method, url)
async with self._session.request(method, url, params=params, json=data) as response:
if response.status != 200:
raise HTTPError(url=url, code=response.status, text=await response.text())
return await response.json()

def __del__(self):
self._loop.run_until_complete(self._session.close())


_async_request_helper = AsyncRequestHelper()
execute_http = _async_request_helper.execute_http

0 comments on commit 7f3e6db

Please sign in to comment.