Тестируем aiohttp с помощью простого чата

в 17:16, , рубрики: aiohttp, asyncio, python, python3, метки: ,
Оглавление

  • Введение
  • Структура
  • Routes
  • Handlers, Request and Response
  • Настройки конфигурации
  • Middlewares
  • Базы данных
  • Шаблоны
  • Сессии, авторизация
  • Static
  • WebSocket
  • Выгрузка на Heroku


Введение

Прошлой осенью мне удалось побывать на нескольких python meetups в Киеве.
На одном из них выступал Николай Новик и рассказывал о новом асинхронном фреймворке aiohttp, работающем на библиотеке для асинхронных вызовов asyncio в 3 версии интерпретатора питона. Данный фреймворк заинтересовал меня тем, что он создавался core python разработчиками и позиционировался как концепт python фреймворка для веба.

Сейчас имеется огромное количество разных фреймворков, в каждом из которых своя философия,
синтаксис и реализация общих для веба шаблонов. Надеюсь, что со временем, все это разнообразие
будет на одной основе — aiohttp.

Структура

Чтобы протестировать по максимуму все возможности aiohttp, я попытался разработать простой чат на вебсокетах. Основой aiohttp является бесконечный loop, в котором крутятся handlers. Handler — так называемая coroutine, объект, который не блокирует ввод/вывод(I/O). Данный тип объектов появился в python 3.4 в библиотеке asyncio. Пока не произойдут все вычисления в данном объекте, он как бы засыпает, а в это время интерпретатор может обрабатывать другие объекты. Чтобы было понятно, приведу пример. Зачастую все задержки сервера происходят, когда он ожидает ответа от базы данных и пока этот ответ не придёт и не обработается, другие объекты ждут своей очереди. В данном случае другие объекты будут обрабатываться, пока не придёт ответ из базы. Но для реализации этого нужен асинхронный драйвер.
На данный момент для aiohttp реализованы асинхронные драйвера и обёртки для большинства популярных баз данных (postgresql, mysql, redis)
Для mongodb есть Motor, который используется в чате.

Точкой входа для чата служит файл app.py. В нем создаётся объект app.

import asyncio
from aiohttp import web

loop = asyncio.get_event_loop()

app = web.Application(loop=loop, middlewares=[
    session_middleware(EncryptedCookieStorage(SECRET_KEY)),
    authorize,
    db_handler,
])

Как вы видите, при инициализации в app передаётся loop, а также список middleware, о котором будет рассказано попозже.

Routes

В отличии от flask на который aiohttp очень похож, routes добавляются в уже инициализированное приложение app.

app.router.add_route('GET', '/{name}', handler)

Вот кстати объяснение Андрея Светлова, почему именно так реализовано.

Заполнение routes вынесено в отдельный файл routes.py.

from chat.views import ChatList, WebSocket
from auth.views import Login, SignIn, SignOut

routes = [
    ('GET', '/',        ChatList,  'main'),
    ('GET', '/ws',      WebSocket, 'chat'),
    ('*',   '/login',   Login,     'login'),
    ('*',   '/signin',  SignIn,    'signin'),
    ('*',   '/signout', SignOut,   'signout'),
]

Первый элемент — http метод, далее расположен url, третьим в кортеже идёт объект handler, и напоследок — имя route, чтобы удобно было его вызывать в коде.

Далее импортируется список routes в app.py и они заполняются простым циклом в приложение.

from routes import routes

for route in routes:
        app.router.add_route(route[0], route[1], route[2], name=route[3])

Все просто и логично

Handlers, Request and Response

Я решил обработку запросов сделать по примеру Django фреймворка. В папке auth находиться все, что касается пользователей, авторизации, обработка создания пользователя и его входа. А в папке chat находиться логика работы чата соответственно. В aiohttp можно реализовать handler в качестве как функции, так и класса.
Выбираем реализацию через класс.

class Login(web.View):

    async def get(self):
        session = await get_session(self.request)
        if session.get('user'):
            url = request.app.router['main'].url()
            raise web.HTTPFound(url)
        return b'Please enter login or email'

Про сессии будет написано ниже, а все остальное думаю понятно и так. Хочу заметить, что переадресация происходит либо возвратом(return) либо выбросом исключения в виде объекта web.HTTPFound(), которому передаётся путь параметром. Http методы в классе реализуются через асинхронные функции get, post и тд. Есть некоторые особенности, если нужно работать с параметрами запроса.

data = await self.request.post()

Настройки конфигурации

Все настройки хранятся в файле settings.py. Для хранения секретных данных я использую envparse. Данная утилита позволяет читать данные из переменных окружения, а также парсить специальный файл, где эти переменные хранятся.

if isfile('.env'):
    env.read_envfile('.env')

Во первых, это было необходимо для поднятия проекта на Heroku, а во вторых, это оказалось ещё и очень удобно. Сначала я использовал локальную базу, а потом тестировал на удалённой и переключение состояло из изменения всего одной строки в файле .env.

Middlewares

При инициализации приложения можно задавать middleware. Здесь они вынесены в отдельный файл. Реализация стандартная — функция декоратор, в которой можно делать проверки или любые другие действия с запросом.

Пример проверки на авторизацию

async def authorize(app, handler):
    async def middleware(request):
        def check_path(path):
            result = True
            for r in ['/login', '/static/', '/signin', '/signout', '/_debugtoolbar/']:
                if path.startswith(r):
                    result = False
            return result

        session = await get_session(request)
        if session.get("user"):
            return await handler(request)
        elif check_path(request.path):
            url = request.app.router['login'].url()
            raise web.HTTPFound(url)
            return handler(request)
        else:
            return await handler(request)

    return middleware

Также есть middleware для подключения базы данных.

async def db_handler(app, handler):
    async def middleware(request):
        if request.path.startswith('/static/') or request.path.startswith('/_debugtoolbar'):
            response = await handler(request)
            return response

        request.db = app.db
        response = await handler(request)
        return response
    return middleware

Детали подключения ниже по тексту.

Базы данных

Для чата используется Mongodb и асинхронный драйвер Motor. Подключение к базе происходит при инициализации приложения.

app.client = ma.AsyncIOMotorClient(MONGO_HOST)
app.db = app.client[MONGO_DB_NAME]

А закрытие соединения происходит в специальной функции shutdown.

async def shutdown(server, app, handler):

    server.close()
    await server.wait_closed()
    app.client.close()  # database connection close
    await app.shutdown()
    await handler.finish_connections(10.0)
    await app.cleanup()

Хочу заметить, что в случае асинхронного сервера нужно корректно завершить все параллельные задачи.

Немного подробнее про создание event loop.

loop = asyncio.get_event_loop()
serv_generator, handler, app = loop.run_until_complete(init(loop))
serv = loop.run_until_complete(serv_generator)
log.debug('start server', serv.sockets[0].getsockname())
try:
    loop.run_forever()
except KeyboardInterrupt:
    log.debug(' Stop server begin')
finally:
    loop.run_until_complete(shutdown(serv, app, handler))
    loop.close()
log.debug('Stop server end')

Сам loop создаётся из asyncio.

serv_generator, handler, app = loop.run_until_complete(init(loop))

Метод run_until_complete добавляет corutines в loop. В данном случае он добавляет функцию инициализации приложения.

try:
    loop.run_forever()
except KeyboardInterrupt:
    log.debug(' Stop server begin')
finally:
    loop.run_until_complete(shutdown(serv, app, handler))
    loop.close()

Собственно сама реализация бесконечного цикла, который прерывается в случае исключения. Перед закрытием вызывается функция shutdown, которая завершает все соединения и корректно останавливает сервер.

Теперь нам надо разобраться, как делать запросы, извлекать и изменять данные

class Message():

    def __init__(self, db, **kwargs):
        self.collection = db[MESSAGE_COLLECTION]

    async def save(self, user, msg, **kw):
        result = await self.collection.insert({'user': user, 'msg': msg, 'time': datetime.now()})
        return result

    async def get_messages(self):
        messages = self.collection.find().sort([('time', 1)])
        return await messages.to_list(length=None)

Хотя у меня не задействована ОРМ, запросы к базе удобнее делать в отдельных классах. В папке chat был создан файл models.py, где находится класс Message. В методе get_messages создаётся запрос, который достаёт все сохранённые сообщения, отсортированные по времени. В методе save создаётся запрос на сохранение сообщения в базу.

Шаблоны

Для aiohttp написано несколько асинхронных обёрток для популярных шаблонизаторов, в частности aiohttp_jinja2 и aiohttp_mako. Для чата использую jinja2.

aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates'))

Вот так поддержка шаблонов инициализируется в приложении.
FileSystemLoader('templates') указывает jinja2 что наши шаблоны лежать в папке templates.

class ChatList(web.View):
    @aiohttp_jinja2.template('chat/index.html')
    async def get(self):
        message = Message(self.request.db)
        messages = await message.get_messages()
        return {'messages': messages}

Через декоратор мы указываем, какой шаблон будем использовать во views, а для заполнения контекста, возвращаем словарь с переменными, с которыми потом работаем в шаблоне.

Сессии, авторизация

Для работы с сессиями есть библиотека aiohttp_session. Есть возможность хранить сессии в Redis или в cookies в зашифрованном виде, используя cryptography. Способ хранения указывается ещё при установке библиотеки.

aiohttp_session[secure]

Для инициализации сессии, добавляем её в middleware.

session_middleware(EncryptedCookieStorage(SECRET_KEY)),

Чтобы достать или положить значения в сессию, нужно сначала извлечь её из запроса.

session = await get_session(request)

Для авторизации пользователя, добавляем в сессию его id, а потом в middleware проверяем его наличие. Конечно для безопасности нужно больше проверок, но для тестирования концепции хватит и этого.

Static

Папка с статическим контентом подключается отдельным route при инициализации приложения.

app.router.add_static('/static', 'static', name='static')

Чтобы задействовать её в шаблоне, нужно достать её из app.

<script src="{{ app.router.static.url(filename='js/main.js') }}"></script>

Все просто, ничего сложного нету.

WebSocket

Наконец-то мы добрались до самой вкусной части aiohttp). Реализация socket очень проста. В javascript я добавил минимально необходимый функционал для его работы.

try{
    var sock = new WebSocket('ws://' + window.location.host + '/ws');
}
catch(err){
    var sock = new WebSocket('wss://' + window.location.host + '/ws');
}

// show message in div#subscribe
function showMessage(message) {
    var messageElem = $('#subscribe'),
        height = 0,
        date = new Date();
        options = {hour12: false};
    messageElem.append($('<p>').html('[' + date.toLocaleTimeString('en-US', options) + '] ' + message + 'n'));
    messageElem.find('p').each(function(i, value){
        height += parseInt($(this).height());
    });

    messageElem.animate({scrollTop: height});
}

function sendMessage(){
    var msg = $('#message');
    sock.send(msg.val());
    msg.val('').focus();
}

sock.onopen = function(){
    showMessage('Connection to server started')
}

// send message from form
$('#submit').click(function() {
    sendMessage();
});

$('#message').keyup(function(e){
    if(e.keyCode == 13){
        sendMessage();
    }
});

// income message handler
sock.onmessage = function(event) {
  showMessage(event.data);
};

$('#signout').click(function(){
    window.location.href = "signout"
});

sock.onclose = function(event){
    if(event.wasClean){
        showMessage('Clean connection end')
    }else{
        showMessage('Connection broken')
    }
};

sock.onerror = function(error){
    showMessage(error);
}

Для реализации серверной части я использую class WebSocket

class WebSocket(web.View):
    async def get(self):
        ws = web.WebSocketResponse()
        await ws.prepare(self.request)

        session = await get_session(self.request)
        user = User(self.request.db, {'id': session.get('user')})
        login = await user.get_login()

        for _ws in self.request.app['websockets']:
            _ws.send_str('%s joined' % login)
        self.request.app['websockets'].append(ws)

        async for msg in ws:
            if msg.tp == MsgType.text:
                if msg.data == 'close':
                    await ws.close()
                else:
                    message = Message(self.request.db)
                    result = await message.save(user=login, msg=msg.data)
                    log.debug(result)
                    for _ws in self.request.app['websockets']:
                        _ws.send_str('(%s) %s' % (login, msg.data))
            elif msg.tp == MsgType.error:
                log.debug('ws connection closed with exception %s' % ws.exception())

        self.request.app['websockets'].remove(ws)
        for _ws in self.request.app['websockets']:
            _ws.send_str('%s disconected' % login)
        log.debug('websocket connection closed')

        return ws

Сам socket создаётся используя функцию WebSocketResponse(). Обязательно перед использованием его нужно "приготовить". Список открытых sockets у меня хранится в приложении(чтобы при закрытии сервера их можно было корректно закрыть). При подключении нового пользователя, все участники получают уведомление о том что новый участник присоединился к чату. Далее мы ожидаем сообщения от пользователя. Если оно валидно, мы сохраняем его в базе данных и отсылаем другим участникам чата.
Когда socket закрывается, мы удаляем его из списка и оповещаем чат, что его покинул один из участников. Очень простая реализация, визуально в синхронном стиле, без большого количества callbacks, как в Tornado к примеру. Бери и пользуйся).

Выгрузка на Heroku

Тестовый чат я выложил на Heroku, для наглядной демонстрации. При установке возникло несколько проблем, в частности для использования их внутренней базы mongodb нужно было вносить данные кредитной карты, что делать мне не хотелось, поэтому воспользовался услугами MongoLab и создал там базу. Далее были проблемы с установкой самого приложения. Для установки cryptography нужно было явно указывать его в requirements.txt. Также для указания версии python нужно создавать в корне проекта файл runtime.txt.

Выводы

В целом создание чата, изучение aiohttp, разбор работы sockets и некоторых других технологий, с которыми я до этого не работал, заняло у меня где-то около 3 недель работы по вечерам и редко на выходных.
Документация в aiohttp довольно неплохая, много асинхронных драйверов и обёрток уже готовы для тестирования.
Возможно для production пока не все готово, но развитие идёт очень активно (за 3 недели aiohttp обновилась с версии 0.19 до 0.21).
Если нужно добавить в проект sockets, этот вариант отлично подойдёт, чтобы не добавлять тяжёлую Tornado в зависимости.

Ссылки

Все ошибки и недочеты присылайте пожалуйста в личку :)

Автор: Crandel

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js