Skip to content

Latest commit

 

History

History
executable file
·
224 lines (190 loc) · 7.49 KB

README.md

File metadata and controls

executable file
·
224 lines (190 loc) · 7.49 KB

aioRestFramework

REST Framework for aiohttp

Usage example

Create some authentication middleware that patch default request and add .user attribute

import re
from xml.etree import ElementTree as etree
from json.decoder import JSONDecodeError
from multidict import MultiDict, MultiDictProxy

from aiohttp import web
from aiohttp.hdrs import (
    METH_POST, METH_PUT, METH_PATCH, METH_DELETE
)

from aiorestframework import exceptions
from aiorestframework import serializers
from aiorestframework import Response
from aiorestframework.views import BaseViewSet
from aiorestframework.permissions import set_permissions, AllowAny
from aiorestframework.app import APIApplication

from my_project.settings import Settings  # Generated by aiohttp-devtools


TOKEN_RE = re.compile(r'^\s*BEARER\s{,3}(\S{64})\s*$')

async def token_authentication(app, handler):
    """
    Authorization middleware
    Catching Authorization: BEARER <token> from request headers
    Found user in Tarantool by token and bind User or AnonymousUser to request
    """
    async def middleware_handler(request):
        # Check that `Authorization` header exists
        if 'authorization' in request.headers:
            authorization = request.headers['authorization']
            # Check matches in header value
            match = TOKEN_RE.match(authorization)
            if not match:
                setattr(request, 'user', AnonymousUser())
                return await handler(request)
            else:
                token = match[1]
        elif 'authorization_token' in request.query:
            token = request.query['authorization_token']
        else:
            setattr(request, 'user', AnonymousUser())
            return await handler(request)

        # Try select user auth record from Tarantool by token index
        res = await app['tnt'].select('auth', [token, ])
        cached = res.body
        if not cached:
            raise exceptions.AuthenticationFailed()

        # Build basic user data and bind it to User instance
        record = cached[0]
        user = User()
        user.bind_cached_tarantool(record)

        # Add User to request
        setattr(request, 'user', user)

        return await handler(request)
    return middleware_handler

And for data extraction from request:

DATA_METHODS = [METH_POST, METH_PUT, METH_PATCH, METH_DELETE]
JSON_CONTENT = ['application/json', ]
XML_CONTENT = ['application/xml', ]
FORM_CONTENT = ['application/x-www-form-urlencoded', 'multipart/form-data']

async def request_data_handler(app, handler):
    """
    Request .data middleware
    Try extract POST data or application/json from request body
    """
    async def middleware_handler(request):
        data = None
        if request.method in DATA_METHODS:
            if request.has_body:
                if request.content_type in JSON_CONTENT:
                    # If request has body - try to decode it to JSON
                    try:
                        data = await request.json()
                    except JSONDecodeError:
                        raise exceptions.ParseError()
                elif request.content_type in XML_CONTENT:
                    if request.charset is not None:
                        encoding = request.charset
                    else:
                        encoding = api_settings.DEFAULT_CHARSET
                    parser = etree.XMLParser(encoding=encoding)
                    try:
                        text = await request.text()
                        tree = etree.XML(text, parser=parser)
                    except (etree.ParseError, ValueError) as exc:
                        raise exceptions.ParseError(
                            detail='XML parse error - %s' % str(exc))
                    data = tree
                elif request.content_type in FORM_CONTENT:
                    data = await request.post()

        if data is None:
            # If not catch any data create empty MultiDictProxy
            data = MultiDictProxy(MultiDict())

        # Attach extracted data to request
        setattr(request, 'data', data)

        return await handler(request)
    return middleware_handler

Create few serializers:

class UserRegisterSerializer(s.Serializer):
    """Register new user"""
    email = s.EmailField(max_length=256)
    password = s.CharField(min_length=8, max_length=64)
    first_name = s.CharField(min_length=2, max_length=64)
    middle_name = s.CharField(default='', min_length=2, max_length=64, 
                              required=False, allow_blank=True)
    last_name = s.CharField(min_length=2, max_length=64)
    phone = s.CharField(max_length=32, required=False,
                        allow_blank=True, default='')

    async def register_user(self, app):
        user = User()
        data = self.validated_data
        try:
            await user.register_user(data, app)
        except Error as e:
            resolve_db_exception(e, self)
       return user

And few ViewSets. It may be nested in bindings['custom']

class UserViewSet(BaseViewSet):
    name = 'user'
    lookup_url_kwarg = '{user_id:[0-9a-f]{32}}'
    permission_classes = [AllowAny, ]
    bindings = {
        'list': {
            'retrieve': 'get',
            'update': 'put'
        },
        'custom': {
            'list': {
                'set_status': 'post',
                'create_new_sip_password': 'post',
                'get_registration_domain': 'get',
                'report': UserReportViewSet
            }
        }
    }

    @staticmethod
    async def resolve_sip_host(data, user, app):
        sip_host = await resolve_registration_switch(user, app)
        data.update({'sip_host': sip_host})

    async def retrieve(self, request):
        user = User()
        await user.load_from_db(request.match_info['user_id'], request.app)

        serializer = user_ser.UserProfileSerializer(instance=user)
        data = serializer.data
        await self.resolve_sip_host(data, user, request.app)
        return Response(data=data)

    @atomic
    @set_permissions([AuthenticatedOnly, IsCompanyMember, CompanyIsEnabled])
    async def update(self, request):
        serializer = user_ser.UserProfileSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        user = await serializer.update_user(user_id=request.user.id,
                                            app=request.app)

        serializer = user_ser.UserProfileSerializer(instance=user)
        data = serializer.data
        await self.resolve_sip_host(data, user, request.app)

        return Response(data=data)

Register Viewsets and run Application:

def setup_routes(app: APIApplication):
    """Add app routes here"""
    # Auth API
    app.router.register_viewset('/auth', auth_vws.AuthViewSet())
    # User API
    app.router.register_viewset('/user', user_vws.UserViewSet())
    # Root redirection to Swagger
    redirect = app.router.add_resource('/', name='home_redirect')
    redirect.add_route('*', swagger_redirect)


def create_api_app():
    sentry = get_sentry_middleware(settings.SENTRY_CONNECT_STRING, settings.SENTRY_ENVIRONMENT)
    middlewares = [sentry, token_authentication, request_data_handler]
    api_app = APIApplication(name='api', middlewares=middlewares, 
client_max_size=10*(1024**2))

    api_app.on_startup.append(startup.startup_api)
    api_app.on_shutdown.append(startup.shutdown_api)
    api_app.on_cleanup.append(startup.cleanup_api)

    setup_routes(api_app)

if __name__ == '__main__':
    app = create_api_app()
    web.run_app(app)