О том, как я качал музыку с VK

в 18:38, , рубрики: django, python, метки: ,

Предистория

Каждое утро я езжу на работу и это занимает N-ое количество времени от 15 минут (на машине) до 40 минут (на общественном транспорте). К сожалению, утром по радио крутят совсем не музыку, а разные «развлекательные» программы. Очень долго я ездил либо с выключенным магнитофоном, либо всю дорогу искал радиостанцию, либо врубал наушники (пока не раздавил свой телефон).

И вот мне это надоело. Магнитола у меня из дешевых, но умеет читать с флешек. В один прекрасный день, по дороге на работу, я взял и купил SD-карточку (удобней всего ибо не выпирает). Все хорошо, но теперь вопрос стал иначе: «Где взять музыку?». Не долго думая решил, что мне хватит плейлиста с VK. Всего-то 400+ песен, но их нужно выкачать.

Посмотрев на решения, которые можно найти в интернете, решил написать свое. Создал проект на django, настроил ее на работу с couchdb и принялся писать.

Причины

Несколько причин, по которым я решил написать свое, а не использовать готовое решение.
— не хотел устанавливать какой-то плагин/програму для скачивания
— качать вручную по одному файлу
— да и вообще хотелось что-то свое

Что я хотел получить

Ответ на этот вопрос довольно прост. Минимальный набор требований: зашел на сайт, что-то нажал, увидел аудиозаписи, нажал кнопку, скачал их на компьютер.

Далее о том, как это происходило (пытался восстановить реальный ход событий).

Получение аудиозаписей

Для получения доступа к аудиозаписям взял за основу VK Api.

Этап №1. Сначала авторизация и получение токена. (не буду описывать API VK ибо все это можно найти у них на сайте).
Через несколько минут в папке с django-аппликацией был создан файл vkapi.py и добавлено примерно следующее содержимое:

vkapi.py

def authorize():
    payload = {
        'client_id': settings.APP_ID,
        'scope': ','.join(settings.APP_PERMISSIONS),
        # TODO: сменить нах
        'redirect_uri': 'http://127.0.0.1:8000/vkapi/authorize',
        'response_type': 'code',
        'v': settings.APP_API_VERSION
    }
    return 'https://oauth.vk.com/authorize?%s' % urllib.urlencode(payload)

А в файл views.py добавлена вьюха:

views.py

def vk_authorize(request):
    return redirect(authorize())

Итак мы получили code, который передается параметром на redirect_url. Теперь нам нужно получить access_token.

На данном этапе меня волновал вопрос где его хранить. Изначально думал сделать регистрацию и возможность подключения VK только для зарегистрированных пользователей, а access_token писать в документ (документ, ибо couchdb) пользователя. Но что, если я не хочу входить или регистрироваться… Хватит сессии. Не вижу смысла чего-то большего для своих нужд.

Так как лень застала меня врасплох, я решил не разделять URL для авторизации и получения access_token'a и вьюха vk_authorize приобрела следующий, не особо красивый вид:

vk_authorize

def vk_authorize(request):
    # подумать как перенести в мидлварь
    if request.GET.get('code'):
        code = request.GET['code']
        r = access_token_get(code)
        print r.text
        data = r.json()
        
        if data.get('error'):
            raise Http404("Error: %s. Desc: %s" % (data['error'], data['error_description']))
        
        data['date'] = datetime.now()
        request.session['vkapi'] = data
        return redirect('main')
        
    elif request.GET.get('error'):
        error = request.GET['error']
        error_description = request.GET.get('error_description')
        raise Http404("Error: %s. Desc: %s" % (error, error_description))

а в vkapi.py дописана функция для получения access_token'a

access_token

def access_token_get(code):
    payload = {
        'client_id': settings.APP_ID,
        'client_secret': settings.APP_SECRET,
        'code': code,
        'redirect_uri': 'http://127.0.0.1:8000/vkapi/authorize',
    }
    
    return requests.get('https://oauth.vk.com/access_token', params=payload)

Этап №2. У нас уже есть access_token и пишем его в сессию. Можно начать доставать аудиозаписи. Так в файл vkapi.py дописана еще две функции. Одна общая для запросов api вконтакте, а вторая для получения аудиозаписей пользователя.

vkapi.py

def request_api(method, access_token, params):
    """
    Для того чтобы вызвать метод API ВКонтакте, Вам необходимо осуществить
    POST или GET запрос по протоколу HTTPS на указанный URL:
    https://api.vk.com/method/'''METHOD_NAME'''?'''PARAMETERS'''&access_token='''ACCESS_TOKEN'''
    
    METHOD_NAME – название метода из списка функций API (http://vk.com/dev/methods),
    PARAMETERS – параметры соответствующего метода API,
    ACCESS_TOKEN – ключ доступа, полученный в результате успешной авторизации приложения.
    """
    payload = {
        'access_token': access_token,
        'v': settings.APP_API_VERSION,
    }
    payload.update(params)
    r = requests.get('https://api.vk.com/method/%s' % method, params=payload)
    return r.json().get('response', {})


def audio_get(session):
    
    params = {
        'owner_id': session['user_id'],
        'count': 6000,
    }
    return request_api('audio.get', session['access_token'], params)

Файл views.py в свою очередь пополнился еще одной вьюхой:

vk_audios

@render_to("downloader/vk_audios.html")
def vk_audios(request):
    audios = []
    if request.session.get('vkapi'):
        # TODO: мидлварь, которая будет обновлять access_token
        audios = audio_get(request.session['vkapi'])
        
    return {
        'audios': audios,
    }

Отлично, я получил список всех своих аудиозаписей. Было написано еще немного кода на получение списка альбомов и отображения их песен, а также для поиска аудиозаписей. Можно увидеть, что я возвращаю только 'response'. Так вот, я решил просто не заморачиватся, если запрос ошибочный :)

К сожалению, оставалось еще это: "# TODO: мидлварь, которая будет обновлять access_token". Была написана мидлварь access_token.py со следующим содержимым:

access_token

 # *  coding: utf8 *
from datetime import datetime, timedelta
from django.conf import settings
from django.shortcuts import redirect
from downloader.vkapi import access_token_get


class AccessTokenMiddleware(object):

    def process_request(self, request):
        
        if request.session.get('vkapi'):
            data = request.session['vkapi']
            expired = data['date']  timedelta(seconds=int(data['expires_in']))
            if (expired  datetime.now()).seconds < 3600:
                return redirect('vk_authorize')
        
        return None

Но тут, видимо, я протупил и описал process_request вместо process_response и меня постоянно редиректило на авторизацию. Не долго думаю мидлварь была переписана в декоратор (решил, что можно получать новый токен за час до того как он станет просроченным, считаю не принципиальным).

Почему декоратор? Ну тут это… про process_response подумалось аж на следующий день, а переделывать что-то работающее не хотелось.

authorize_require decorator

def authorize_require(view_func):
    
    def check_session(request, *args, **kwargs):
        if request.session.get('vkapi'):
            data = request.session['vkapi']
            expired = data['date']  timedelta(seconds=int(data['expires_in']))
            if (expired - datetime.now()).seconds < 3600:
                return redirect('vk_authorize')
        else:
            return redirect('vk_authorize')
        return view_func(request, *args, **kwargs)

    return check_session

Теперь у меня был список аудиозаписей и автоматическое обновление токена (там, где это нужно просто вначале фун-ции дописывался декоратор). Еще позже была добавлена простенькая регистрация (не знаю зачем).

Регистрация (отступление)

Обычно использую такую регистрацию, когда пишу для себя. Быстро и дешево, да и хватает с головой.

В urls.py добавляю строку:

url(r'^registration/$', 'downloader.views.registration', name='registration'),

views.py пополняется такой вьюхой:

@render_to("registration/registration.html")
def registration(request):
    
    form = RegistrationForm()
    
    if request.method == "POST":
        form = RegistrationForm(data=request.POST)
        if form.is_valid():
            user = User(_db=request.db,
                        is_active=True,
                        is_superuser=False,
                        type='user',
                        permissions=[],
                        groups=[], )
            username = form.cleaned_data['username']
            password = form.cleaned_data['password']
            user.update({"username": username, 'password': make_password(password)})
            user.create('u_%s' % slugify(username))
            
            auth_user = authenticate(username=username, password=password)
            login(request, auth_user)
            return redirect('main')
            
    return {
        "form": form
    }

Вьюха создает нового пользователя в CouchDB и сразу же его авторизирует, после чего кидает на 1 страницу.

Форма RegistrationForm выглядит вот так:
forms.py

class RegistrationForm(forms.Form):
    
    username = forms.EmailField(label=_('Username'), max_length=30)
    password = forms.CharField(widget=forms.PasswordInput(), label=_('Password'))

    def clean_username(self):
        username = self.cleaned_data['username']
        user = django_couch.db().view('auth/auth', key=username).rows
        if user:
            raise forms.ValidationError(_("Username already in use"))
        return username

registration/registration.html

{% extends "base.html" %}
{% load i18n %}
{% load bootstrap_toolkit %}

{% block title %}
 - {% trans "Registration" %}
{% endblock %}

{% block lib %}
    <link rel="stylesheet" href="/media/css/authentification.css" />
{% endblock %}


{% block body %}
<div class="container">
    <div id="register-form">
        <form action="" method="post" class="form-horizontal">
            <legend>
                <h2>{% trans 'Registration' %}</h2>
            </legend>
            {{ form|as_bootstrap }}{% csrf_token %}
            <div class="form-actions">
                <button class="btn btn-primary" type="submit">{% trans 'Register' %}</button>  
                <small>
                    <a href="{% url login %}"> {% trans 'Login' %}</a>
                </small>
            </div>
        </form>
    </div>
</div>
{% endblock %}

Скачивание

Этап №3. Итак мы уже получаем список аудиозаписей. Теперь нужно их скачать. Естественно можно пройтись каким-то ботом по каждой ссылке и скачать, но мне нужно было получить на выходе либо папку с аудиозаписями, либо архив (что бы скачать все сразу).

Пагинация (отступление)

Осенило меня в общем… если у пользователя будет over100500 аудиозаписей, то браузер просто загнется при рендеринге и было решено добавить пагинацию.

Функция audio_get преобразилась примерно до такого вида, что дало возможность сделать пагинацию:

def audio_get(session, album_id='', offset=0, count=100):
    
    params = {
        'owner_id': session['user_id'],
        'offset': offset,
        'count': count,
        'album_id': album_id,
    }
    return request_api('audio.get', session['access_token'], params)

vk_audios в файле view.py приобрела примерно такой вид:

@authorize_require
@render_to("downloader/vk_audios.html")
def vk_audios(request, album_id=''):  
    
    try:
        page = int(request.GET.get('page', 1))
    except:
        raise Http404("Error page param: %s" % request.GET['page'])
    
    offset = 100 * (int(page) - 1)
    
    response = audio_get(request.session['vkapi'], album_id=album_id, offset=offset)
    
    audios = response.get('items', [])
    audios_count = response.get('count')
    
     return {
        'album_id': album_id,
        'audios_count': audios_count,
        'page': page,
        'offset': offset,
        'audios': audios,
     }

Был добавлен inclusion_tag, который принимал количество аудиозаписей, страницу на которой находится пользователь и id альбома, что бы рендерить страницы.

@register.inclusion_tag('snippets/pagination.html')
def render_pagination(audios_count, page, album_id=False):
    
    pages_count = int(math.ceil(audios_count / 100.0))  1
    
    pages = range(1, pages_count)
    
    return {
        "pages": pages,
        "page": page,
        "album_id": album_id,
    }

И добавлен html-файл (snippets/pagination.html):

{% load i18n %}

{% if pages|length > 1 %}
<div class="pagination pagination-right">
    <ul>
    {% for p in pages %}
        <li {% ifequal p page %}class="active"{% endifequal %}>
            {% if album_id %}
            <a href="{% url vk_audios album_id %}?page={{ p }}">{{ p }}</a>
            {% else %}
            <a href="{% url vk_audios %}?page={{ p }}">{{ p }}</a>
            {% endif %}
        </li>
    {% endfor %}
    </ul>
</div>
{% endif %}

Итого я ограничил себя скачиванием по 100 файлов. Осталось их скачать.

Нужно скачать файлы… но как? Пользователь нажал кнопку и ждет, пока ему сервер отдаст архив? Хм… Решать задачу принялся так:
Этап №3.1 — Создание запроса на скачивание. На странице с аудиозаписями вывел форму, в которую нужно ввести свой email и создать запрос на скачивание.

form.py Пополнился новой формой.

forms.py

class RequestsForm(forms.Form):
    
    username = forms.EmailField(label=_('E-mail'),
                                help_text=_("Archive with audios will be send to this email"))

Почему поле username? Все по причине регистрации на email. Пользователь создается с username = email, указанный при регистрации. Так, если пользователь вошел на сайт, мы можем подставить его email, а он если захочет поменяет.

Теперь пользователь тыкает в кнопку и мы создаем документ со следующей структурой, после чего ложим его id в nsq:

структура документа в couchdb

* _id - r_<hash>
* status - new
* username - test@test.com
* is_active - true
* audios - [
   {
       "url": "<url>",
       "processed": true,
       "title": "Three Days Grace - I Hate Everything About You"
   }
]
* date_created - 2013-10-20 11:27:21.208492
* type - request

Поле status может принимать еще несколько значений: «processing», «error», «processed», «deleted».

Для документа couch'a была добавлена моделька:

models.py

class DownloadRequest(django_couch.Document):
    
    def __init__(self, *args, **kwargs):
        self._db = django_couch.db('db_requests')
        self.type = 'request'
        self.is_active = True
        self.status = 'new'
        self.date_created = datetime.now().isoformat(' ')
        super(DownloadRequest, self).__init__(*args, **kwargs)
        
    @staticmethod
    def load(resp_id):
        db = django_couch.db('db_requests')
        
        if resp_id in db:
            doc = DownloadRequest(db[resp_id])
            assert doc.type == 'request', _("Invalid data loaded")
            
            return doc
            
        else:
            raise Http404(_("Can't find download request with id '%s'") % id)
        
    @staticmethod
    def get_list(email):
        pass

Что бы ложить в nsq скопирована с других мест функция:

nsq_push

def nsq_push(topic, message, fmt='json'):
    url = "http://%s/put?topic=%s" % (random.choice(settings.NSQD_HTTP_ADDRESSES), topic)
    
    if fmt == 'json':
        message_encoded = json.dumps(message)
    elif fmt == 'raw':
        message_encoded = message
    else:
        raise Exception("Unsupported message encode format: %s" % fmt)
    
    r = requests.post(url, data=message_encoded)
    
    return r.ok

А вьюха vk_audios приобрела следующий вид:

vk_audios

@authorize_require
@render_to("downloader/vk_audios.html")
def vk_audios(request, album_id=''):
    
    try:
        page = int(request.GET.get('page', 1))
    except:
        page = 1
        messages.error(request, _("Error page: %s. Changed to 1") % request.GET.get('page'))
    
    offset = 100 * (int(page) - 1)
    
    response = audio_get(request.session['vkapi'], album_id=album_id, offset=offset)
    
    audios = response.get('items', [])
    audios_count = response.get('count')
    
    if request.user.is_authenticated():
        initial_data = request.user
    else:
        initial_data = {'username': ''}
    
    form = RequestsForm(request.POST or None, initial=initial_data)
    
    if form.is_valid():
        request_doc = DownloadRequest()
        request_doc.update(form.cleaned_data)
        
        formated_audios = []
        for audio in audios:
            formated_data = {
                'title': "%s - %s" % (audio['artist'], audio['title']),
                'url': audio['url'],
                'processed': False,
            }
            formated_audios.append(formated_data)
        
        request_doc.update({'audios': formated_audios})
        request_doc.create('r', force_random_suffix=True)
        messages.success(request, _("Download request successfully created"))
        nsq_push('download_requests', request_doc.id, fmt="raw")
        
    return {
        'album_id': album_id,
        'audios_count': audios_count,
        'page': page,
        'offset': offset,
        'audios': audios,
        'form': form,
    }

Теперь у нас есть список аудиозаписей, мы создаем запрос на скачивание и ложим id документа в nsq. НО, захотелось видеть список своих запросов и их статусы…

Вывод запросов (отступление):

И принялся я писать вьюху для их отображения. Была использована таже форма, что и выше, для отбора по email'у. В CouchDB был создан дизайн док, который строит индекс с ключем [email, дата создания]:

function(doc) {
    if (doc.type == 'request' && doc.is_active) {
        emit([doc.username, doc.date_created])
    }
}

А в аппликацию добавлена вьюха requests_list:

@render_to("downloader/requests.html")
def requests_list(request):
    
    requests = []
    
    if request.user.is_authenticated():
        initial_data = request.user
    else:
        initial_data = {'username': ''}
    
    form = RequestsForm(request.GET or None, initial=initial_data)
    
    if form.is_valid():
        requests = DownloadRequest.get_list(form.cleaned_data['username'])
    
    return {
        'form': form,
        'requests': requests,
    }

И дописана функция get_list в модель DownloadRequest:

@staticmethod
def get_list(email):
    db = django_couch.db('db_requests')
    
    requests = db.view('requests/list', include_docs=True, startkey=[email], endkey=[email, {}]).rows
    return [DownloadRequest(request) for request in requests]

Хэх! Теперь я еще вижу и статусы, осталось написать nsq-обработчик, который собственно будет скачивать...

Этап №3.2 — Скачивание. Через некоторое время появились наброски обработчика nsq:

management/commands/download_request_worker.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import nsq
import signal
import requests
import django_couch

from django.conf import settings
from django.core.management.base import BaseCommand
from logger import logger

from downloader.models import DownloadRequest

        
class Command(BaseCommand):
    
    def handle(self, *args, **options):
        
        self.log = logger('download_request', int(options['verbosity']) > 1, settings.LOG_DIR)
        
        signal.signal(signal.SIGINT, self.signal_callback)
        signal.signal(signal.SIGTERM, self.signal_callback)
        
        self.db = django_couch.db('db_requests')
        
        nsq.Reader({"message_callback": self.message_callback},
                   "download_requests",
                   "download_requests",
                   nsqd_tcp_addresses=settings.NSQD_TCP_ADDRESSES)
        self.log.debug("Starting NSQ...")
        nsq.run()
        
    def process_request(self, request):
        self.log.debug("Setting status '%s' to 'processing.'" % request.status)
        
        request.status = 'processing'
        request.save()
        
        user_path = os.path.join(settings.DOWNLOAD_AUDIOS_DIR, request.username)
        if not os.path.exists(user_path):
            os.mkdir(user_path)
        
        self.log.debug("User dir: %s" % user_path)
            
        request_path = os.path.join(user_path, request.id)
        if not os.path.exists(request_path):
            os.mkdir(request_path)
        
        self.log.debug("Download request dir: %s" % request_path)
        
        for audio in request.audios:
            self.log.debug("Title: %s. Url: %s", audio['title'], audio['url'])
            if audio.get('processed', False):
                self.log.debug("Already processed")
                continue
            
            self.log.debug("Downloading file..")
            response = requests.get(audio['url'])
            self.log.debug("Downloaded")
            
            filename = os.path.join(request_path, "%s.mp3" % audio['title'])
            self.log.debug("Writing to filename: %s" % filename)
            
            with open(filename, 'wb') as f:
                f.write(response.content)
            self.log.debug("Setting audio to processed")
            audio['processed'] = True
        
        request.save()

    def message_callback(self, message):
        self.log.debug("Processing message id: %s", message.id)
        
        self.log.debug("Message data: %s", message.body)
        
        try:
            request = DownloadRequest.load(message.body)
            self.log.info("Document loaded. Audios count: %s" % len(request.audios))
            self.process_request(request)
            self.log.debug("Setting status '%s' to 'processed.'" % request.status)
            request.status = 'processed'
            request.save()
            self.log.debug("Request successfullly processed.")
        except:
            return False
        return True

    def signal_callback(self, signal_number, stack_frame):
        self.log.critical("Signal %d received. Shutting down", signal_number)
        
        sys.exit(-1)

Итак что он умел:
— получал id документа с очереди
— создавал папку, если ее не было
— скачивал туда аудиофайлы

Далее было дописано еще архивирование и отправка email'a пользователю. Формат сообщения, который ложится в nsq немного изменился, ибо для того, что бы построить url на скачивание, нужно знать host, для этого в django есть функция request.get_host(), но нет доступа к request'у внутри менеджмент команды (возможно кто знает что можно сделать в этом случае), из-за чего я решил ложить в nsq меседж следующего вида: {'host': request.get_host(), 'id': <id документ запроса на скачивание>}.

Но все еще это были наброски. Причина тому — nsq. У nsq есть несколько ограничений:
— каждых N секунд он шлет heartbeat подключенным воркерам и если не получает ответ 2 раза, коннект закрывается. Т.е. если наш обработчик будет скачивать много файлов, коннект будет закрыть.
— если nsq не получает, что сообщение обработано в течении N секунд, сообщение отдается другому обработчику. Т.е. если я запущу 2 обработчика, то скачивание начнется как минимум 2 раза.

Немного посмотрев в pynsq решил использовать async mode и также обрабатывать скачивание в отдельных процессах. Возможно не самое хорошее решение и не самый красивый код у меня получился.

Функция обработки nsq-сообщений приобрела следующий вид:

management/commands/download_request_worker.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import nsq
import json
import time
import shutil
import signal
import requests
import django_couch
import multiprocessing

from logger import logger
from datetime import datetime

from django.conf import settings
from django.core.mail import send_mail
from django.core.urlresolvers import reverse
from django.template.loader import render_to_string
from django.utils.translation import ugettext_lazy as _
from django.core.management.base import BaseCommand

from downloader.models import DownloadRequest
from downloader.views import nsq_push


class DownloadRequestProcess(multiprocessing.Process):
    
    def __init__(self, log, message, *args, **kwargs):
        self.log = log
        self.message = message
        super(DownloadRequestProcess, self).__init__(*args, **kwargs)
        
    def process_request(self, drequest):
        self.log.debug("Setting status '%s' to 'processing'. For doc: %s" % (drequest.status, drequest.id))
        
        drequest.status = 'processing'
        drequest.save()
            
        request_path = os.path.join(settings.DOWNLOAD_AUDIOS_DIR, drequest.id)
        if not os.path.exists(request_path):
            os.mkdir(request_path)
        
        self.log.debug("Download request dir: %s" % request_path)
        
        for audio in drequest.audios:
            self.log.debug("Title: %s. Url: %s", audio['title'], audio['url'])
            if audio.get('processed', False):
                self.log.debug("Audio already processed")
                continue
            
            filename = os.path.join(request_path, "%s.mp3" % audio['title'])
            self.log.debug("Filename: %s" % filename)
            
            self.log.debug("Downloading file...")
            response = requests.get(audio['url'])
            self.log.debug("Downloaded")
            
            with open(filename, 'wb') as f:
                f.write(response.content)
            self.log.debug("Setting audio to processed")
            audio['processed'] = True
        
        archive_path = None
        if drequest.get('archive'):
            archive_path = os.path.exists(os.path.join(request_path, drequest.archive))
        
        if not drequest.get('archive') and not archive_path:
            self.log.debug("Writing archive")
            archive = shutil.make_archive(request_path, 'gztar', settings.DOWNLOAD_AUDIOS_DIR, drequest.id)
            self.log.debug("Archive: %s" % archive)
            drequest['archive'] = os.path.basename(archive)
        
        self.log.debug("Deleting download path dir")
        shutil.rmtree(request_path)
        
        self.log.debug("Setting status '%s' to 'processed.'" % drequest.status)
        drequest['date_processed'] = datetime.now().isoformat(' ')
        drequest.status = 'processed'
        drequest.save()
        
    def run(self):
        self.log.debug("Message data: %s", self.message.body)
        
        data = json.loads(self.message.body)
        
        attempts = data.get('attempts', 1)
        if (attempts > 5):
            self.log.debug("Attempts limit reached, dropping this request")
            return
        
        drequest = DownloadRequest.load(data['id'])
        self.log.info("Document loaded. Audios count: %s" % len(drequest.audios))
        
        if drequest.get('processed'):
            self.log.debug("Download request already processed")
            return
        
        try:
            self.process_request(drequest)
            self.log.debug("Download request successfullly processed. Sending mail.")
            
            if drequest.get('archive'):
                archive_link = 'http://%s%s' % (data['host'], reverse('archive_link', args=[drequest.archive]))
                self.log.debug("Link to archive: %s" % archive_link)
                send_mail(_("Take you archive"),
                          render_to_string("mail/archive_mail.html", {'archive_link': archive_link}),
                          settings.SERVER_EMAIL,
                          [drequest.username])
                self.log.debug("Mail sent")
        except:
            self.log.debug("Error occured: %s. Setting status to error" % sys.exc_info()[1])
            drequest.status = 'error'
            drequest.status_verbose = "%s" % sys.exc_info()[1]
            drequest.save()
            
            sleep_time = 30 * attempts
            self.log.debug("Pushing it back to nsq in %s seconds. Topic: download_requests" % sleep_time)
            time.sleep(sleep_time)
            nsq_push('download_requests', {"host": data['host'], 'id': drequest.id, 'attempts': attempts + 1})
            

class Command(BaseCommand):
    
    def handle(self, *args, **options):
        
        self.log = logger('download_request', int(options['verbosity']) > 1, settings.LOG_DIR)
        
        signal.signal(signal.SIGINT, self.signal_callback)
        signal.signal(signal.SIGTERM, self.signal_callback)
        
        self.db = django_couch.db('db_requests')
        
        nsq.Reader({"message_callback": self.message_callback},
                   "download_requests",
                   "download_requests",
                   nsqd_tcp_addresses=settings.NSQD_TCP_ADDRESSES)
        self.log.debug("Starting NSQ...")
        self.processes = []
        nsq.run()
    
    def message_callback(self, message):
        self.log.debug("Processing message id: %s", message.id)
        
        message.enable_async()
        
        process = DownloadRequestProcess(self.log, message)
        process.start()
        
        self.log.debug("Process: %s", process)
        message.finish()
        
        self.processes.append(process)

    def signal_callback(self, signal_number, stack_frame):
        self.log.critical("Signal %d received. Shutting down", signal_number)
        
        for process in self.processes:
            if process.is_alive():
                process.join()
                process.terminate()
        
        sys.exit(-1)

Итак что делает данный обработчик:
— ловит сообщение с nsq
— создает новый процесс
— отмечает nsq-сообщение как обработанное
— в отдельном процессе скачиваются аудиозаписи и создается архив
— отсылается emal
— в случае ошибки при обработке было решено ложить это сообщение в nsq повторно и при этом вести свой собственный счетчик неудачных обработок. За 6 разом не обрабатывать (возможно есть другие пути — не искал, хватило этого).

Формат сообщение в nsq приобрел следующий вид: {'host': , 'id': <id запроса на скачивание>, 'attempts': <№ попытки>}). Сообщение ложилось после небольшой задержки. Вычислял ее по формуле 30 сек умноженных на № попытки.

Конец

Я успешно скачал все свои аудиозаписи. Посмотрев, что архив в среднем весит 650Мб решил, что их нужно удалять через некоторое время. Была написана еще одна менеджмент команда, которая достает все успешно обработанные запросы и удаляет архив, а также меняет статус на «deleted». Тоже не самое изящное решение, но хотелось быстрее закончить :)

management/commands/remove_in_24.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import django_couch
from datetime import datetime, timedelta

from logger import logger

from django.conf import settings
from django.core.management.base import BaseCommand


class Command(BaseCommand):

    help = u'prepare and send fax document'
    
    def handle(self, *args, **options):
        
        self.log = logger('delete_in_24', int(options['verbosity']) > 1, settings.LOG_DIR)
        self.db = django_couch.db("db_requests")
        
        requests = self.db.view("request_processed/list", include_docs=True).rows
        self.log.debug("Founded %s processed download requests", len(requests))
        for req in requests:
            self.log.debug("%s", req.value)
            self.log.debug("ID: %s. Archive: %s", req.id, req.value)
            
            now = datetime.now()
            date_expired = datetime.strptime(req.key, settings.DATETIME_FMT) + timedelta(hours=24)
            self.log.debug("Now: %s. Expired: %s", now.strftime(settings.DATETIME_FMT), date_expired.strftime(settings.DATETIME_FMT))
            
            if now < date_expired:
                self.log.debug("Passing this doc")
                continue
             
            archive_path = os.path.join(settings.DOWNLOAD_AUDIOS_DIR, req.value)
            self.log.debug("Archive path: %s", archive_path)
              
            if os.path.exists(archive_path):
                self.log.debug("Deleting file: %s", archive_path)
                os.unlink(archive_path)
            else:
                self.log.warning("Path doesn't exists")
              
            doc = req.doc
            self.log.debug("Settings status '%s' to 'deleted'", doc.status)
            doc.status = 'deleted'
            doc.save(self.db)

Couchdb дизайн док request_processed/list, который достает все обработанные аудиозаписи

request_processed/list

function(doc) {
    if (doc.type == 'request' && doc.archive
    && doc.date_processed && doc.status != 'deleted') {
        emit(doc.date_processed.slice(0, 19), doc.archive);
    }
}

Ссылка на bitbucket: bitbucket.org/Kolyanu4/vkdownloader/src

Автор: Kolyanu4

Источник

Поделиться

* - обязательные к заполнению поля