О том, как я качал музыку с VK

от автора

Предистория

Каждое утро я езжу на работу и это занимает N-ое количество времени от 15 минут (на машине) до 40 минут (на общественном транспорте). К сожалению, утром по радио крутят совсем не музыку, а разные «развлекательные» программы. Очень долго я ездил либо с выключенным магнитофоном, либо всю дорогу искал радиостанцию, либо врубал наушники (пока не раздавил свой телефон).

И вот мне это надоело. Магнитола у меня из дешевых, но умеет читать с флешек. В один прекрасный день, по дороге на работу, я взял и купил SD-карточку (удобней всего ибо не выпирает). Все хорошо, но теперь вопрос стал иначе: «Где взять музыку?». Не долго думая решил, что мне хватит плейлиста с VK. Всего-то 400+ песен, но их нужно выкачать.

Посмотрев на решения, которые можно найти в интернете, решил написать свое. Создал проект на django, настроил ее на работу с couchdb и принялся писать.

Причины

Несколько причин, по которым я решил написать свое, а не использовать готовое решение.
— не хотел устанавливать какой-то плагин/програму для скачивания
— качать вручную по одному файлу
— да и вообще хотелось что-то свое

Что я хотел получить

Ответ на этот вопрос довольно прост. Минимальный набор требований: зашел на сайт, что-то нажал, увидел аудиозаписи, нажал кнопку, скачал их на компьютер.

Далее о том, как это происходило (пытался восстановить реальный ход событий).

Получение аудиозаписей

Для получения доступа к аудиозаписям взял за основу VK Api.

Этап №1. Сначала авторизация и получение токена. (не буду описывать API VK ибо все это можно найти у них на сайте).
Через несколько минут в папке с django-аппликацией был создан файл vkapi.py и добавлено примерно следующее содержимое:

vkapi.py

def authorize():     payload = {         'client_id': settings.APP_ID,         'scope': ','.join(settings.APP_PERMISSIONS),         # TODO: сменить нах         'redirect_uri': 'http://127.0.0.1:8000/vkapi/authorize',         'response_type': 'code',         'v': settings.APP_API_VERSION     }     return 'https://oauth.vk.com/authorize?%s' % urllib.urlencode(payload) 

А в файл views.py добавлена вьюха:

views.py

def vk_authorize(request):     return redirect(authorize()) 

Итак мы получили code, который передается параметром на redirect_url. Теперь нам нужно получить access_token.

На данном этапе меня волновал вопрос где его хранить. Изначально думал сделать регистрацию и возможность подключения VK только для зарегистрированных пользователей, а access_token писать в документ (документ, ибо couchdb) пользователя. Но что, если я не хочу входить или регистрироваться… Хватит сессии. Не вижу смысла чего-то большего для своих нужд.

Так как лень застала меня врасплох, я решил не разделять URL для авторизации и получения access_token’a и вьюха vk_authorize приобрела следующий, не особо красивый вид:

vk_authorize

def vk_authorize(request):     # подумать как перенести в мидлварь     if request.GET.get('code'):         code = request.GET['code']         r = access_token_get(code)         print r.text         data = r.json()                  if data.get('error'):             raise Http404("Error: %s. Desc: %s" % (data['error'], data['error_description']))                  data['date'] = datetime.now()         request.session['vkapi'] = data         return redirect('main')              elif request.GET.get('error'):         error = request.GET['error']         error_description = request.GET.get('error_description')         raise Http404("Error: %s. Desc: %s" % (error, error_description)) 

а в vkapi.py дописана функция для получения access_token’a

access_token

def access_token_get(code):     payload = {         'client_id': settings.APP_ID,         'client_secret': settings.APP_SECRET,         'code': code,         'redirect_uri': 'http://127.0.0.1:8000/vkapi/authorize',     }          return requests.get('https://oauth.vk.com/access_token', params=payload) 

Этап №2. У нас уже есть access_token и пишем его в сессию. Можно начать доставать аудиозаписи. Так в файл vkapi.py дописана еще две функции. Одна общая для запросов api вконтакте, а вторая для получения аудиозаписей пользователя.

vkapi.py

def request_api(method, access_token, params):     """     Для того чтобы вызвать метод API ВКонтакте, Вам необходимо осуществить     POST или GET запрос по протоколу HTTPS на указанный URL:     https://api.vk.com/method/'''METHOD_NAME'''?'''PARAMETERS'''&access_token='''ACCESS_TOKEN'''          METHOD_NAME – название метода из списка функций API (http://vk.com/dev/methods),     PARAMETERS – параметры соответствующего метода API,     ACCESS_TOKEN – ключ доступа, полученный в результате успешной авторизации приложения.     """     payload = {         'access_token': access_token,         'v': settings.APP_API_VERSION,     }     payload.update(params)     r = requests.get('https://api.vk.com/method/%s' % method, params=payload)     return r.json().get('response', {})   def audio_get(session):          params = {         'owner_id': session['user_id'],         'count': 6000,     }     return request_api('audio.get', session['access_token'], params) 

Файл views.py в свою очередь пополнился еще одной вьюхой:

vk_audios

@render_to("downloader/vk_audios.html") def vk_audios(request):     audios = []     if request.session.get('vkapi'):         # TODO: мидлварь, которая будет обновлять access_token         audios = audio_get(request.session['vkapi'])              return {         'audios': audios,     } 

Отлично, я получил список всех своих аудиозаписей. Было написано еще немного кода на получение списка альбомов и отображения их песен, а также для поиска аудиозаписей. Можно увидеть, что я возвращаю только ‘response’. Так вот, я решил просто не заморачиватся, если запрос ошибочный 🙂

К сожалению, оставалось еще это: "# TODO: мидлварь, которая будет обновлять access_token". Была написана мидлварь access_token.py со следующим содержимым:

access_token

 # *  coding: utf8 * from datetime import datetime, timedelta from django.conf import settings from django.shortcuts import redirect from downloader.vkapi import access_token_get   class AccessTokenMiddleware(object):      def process_request(self, request):                  if request.session.get('vkapi'):             data = request.session['vkapi']             expired = data['date']  timedelta(seconds=int(data['expires_in']))             if (expired  datetime.now()).seconds < 3600:                 return redirect('vk_authorize')                  return None 

Но тут, видимо, я протупил и описал process_request вместо process_response и меня постоянно редиректило на авторизацию. Не долго думаю мидлварь была переписана в декоратор (решил, что можно получать новый токен за час до того как он станет просроченным, считаю не принципиальным).

Почему декоратор? Ну тут это… про process_response подумалось аж на следующий день, а переделывать что-то работающее не хотелось.

authorize_require decorator

def authorize_require(view_func):          def check_session(request, *args, **kwargs):         if request.session.get('vkapi'):             data = request.session['vkapi']             expired = data['date']  timedelta(seconds=int(data['expires_in']))             if (expired - datetime.now()).seconds < 3600:                 return redirect('vk_authorize')         else:             return redirect('vk_authorize')         return view_func(request, *args, **kwargs)      return check_session 

Теперь у меня был список аудиозаписей и автоматическое обновление токена (там, где это нужно просто вначале фун-ции дописывался декоратор). Еще позже была добавлена простенькая регистрация (не знаю зачем).

Регистрация (отступление)

Обычно использую такую регистрацию, когда пишу для себя. Быстро и дешево, да и хватает с головой.

В urls.py добавляю строку:

url(r'^registration/$', 'downloader.views.registration', name='registration'), 

views.py пополняется такой вьюхой:

@render_to("registration/registration.html") def registration(request):          form = RegistrationForm()          if request.method == "POST":         form = RegistrationForm(data=request.POST)         if form.is_valid():             user = User(_db=request.db,                         is_active=True,                         is_superuser=False,                         type='user',                         permissions=[],                         groups=[], )             username = form.cleaned_data['username']             password = form.cleaned_data['password']             user.update({"username": username, 'password': make_password(password)})             user.create('u_%s' % slugify(username))                          auth_user = authenticate(username=username, password=password)             login(request, auth_user)             return redirect('main')                  return {         "form": form     } 

Вьюха создает нового пользователя в CouchDB и сразу же его авторизирует, после чего кидает на 1 страницу.

Форма RegistrationForm выглядит вот так:
forms.py

class RegistrationForm(forms.Form):          username = forms.EmailField(label=_('Username'), max_length=30)     password = forms.CharField(widget=forms.PasswordInput(), label=_('Password'))      def clean_username(self):         username = self.cleaned_data['username']         user = django_couch.db().view('auth/auth', key=username).rows         if user:             raise forms.ValidationError(_("Username already in use"))         return username 

registration/registration.html

{% extends "base.html" %} {% load i18n %} {% load bootstrap_toolkit %}  {% block title %}  - {% trans "Registration" %} {% endblock %}  {% block lib %}     <link rel="stylesheet" href="/media/css/authentification.css" /> {% endblock %}   {% block body %} <div class="container">     <div id="register-form">         <form action="" method="post" class="form-horizontal">             <legend>                 <h2>{% trans 'Registration' %}</h2>             </legend>             {{ form|as_bootstrap }}{% csrf_token %}             <div class="form-actions">                 <button class="btn btn-primary" type="submit">{% trans 'Register' %}</button>                   <small>                     <a href="{% url login %}"> {% trans 'Login' %}</a>                 </small>             </div>         </form>     </div> </div> {% endblock %} 

Скачивание

Этап №3. Итак мы уже получаем список аудиозаписей. Теперь нужно их скачать. Естественно можно пройтись каким-то ботом по каждой ссылке и скачать, но мне нужно было получить на выходе либо папку с аудиозаписями, либо архив (что бы скачать все сразу).

Пагинация (отступление)

Осенило меня в общем… если у пользователя будет over100500 аудиозаписей, то браузер просто загнется при рендеринге и было решено добавить пагинацию.

Функция audio_get преобразилась примерно до такого вида, что дало возможность сделать пагинацию:

def audio_get(session, album_id='', offset=0, count=100):          params = {         'owner_id': session['user_id'],         'offset': offset,         'count': count,         'album_id': album_id,     }     return request_api('audio.get', session['access_token'], params) 

vk_audios в файле view.py приобрела примерно такой вид:

@authorize_require @render_to("downloader/vk_audios.html") def vk_audios(request, album_id=''):            try:         page = int(request.GET.get('page', 1))     except:         raise Http404("Error page param: %s" % request.GET['page'])          offset = 100 * (int(page) - 1)          response = audio_get(request.session['vkapi'], album_id=album_id, offset=offset)          audios = response.get('items', [])     audios_count = response.get('count')           return {         'album_id': album_id,         'audios_count': audios_count,         'page': page,         'offset': offset,         'audios': audios,      } 

Был добавлен inclusion_tag, который принимал количество аудиозаписей, страницу на которой находится пользователь и id альбома, что бы рендерить страницы.

@register.inclusion_tag('snippets/pagination.html') def render_pagination(audios_count, page, album_id=False):          pages_count = int(math.ceil(audios_count / 100.0))  1          pages = range(1, pages_count)          return {         "pages": pages,         "page": page,         "album_id": album_id,     } 

И добавлен html-файл (snippets/pagination.html):

{% load i18n %}  {% if pages|length > 1 %} <div class="pagination pagination-right">     <ul>     {% for p in pages %}         <li {% ifequal p page %}class="active"{% endifequal %}>             {% if album_id %}             <a href="{% url vk_audios album_id %}?page={{ p }}">{{ p }}</a>             {% else %}             <a href="{% url vk_audios %}?page={{ p }}">{{ p }}</a>             {% endif %}         </li>     {% endfor %}     </ul> </div> {% endif %} 

Итого я ограничил себя скачиванием по 100 файлов. Осталось их скачать.

Нужно скачать файлы… но как? Пользователь нажал кнопку и ждет, пока ему сервер отдаст архив? Хм… Решать задачу принялся так:
Этап №3.1 — Создание запроса на скачивание. На странице с аудиозаписями вывел форму, в которую нужно ввести свой email и создать запрос на скачивание.

form.py Пополнился новой формой.

forms.py

class RequestsForm(forms.Form):          username = forms.EmailField(label=_('E-mail'),                                 help_text=_("Archive with audios will be send to this email")) 

Почему поле username? Все по причине регистрации на email. Пользователь создается с username = email, указанный при регистрации. Так, если пользователь вошел на сайт, мы можем подставить его email, а он если захочет поменяет.

Теперь пользователь тыкает в кнопку и мы создаем документ со следующей структурой, после чего ложим его id в nsq:

структура документа в couchdb

* _id - r_<hash> * status - new * username - test@test.com * is_active - true * audios - [    {        "url": "<url>",        "processed": true,        "title": "Three Days Grace - I Hate Everything About You"    } ] * date_created - 2013-10-20 11:27:21.208492 * type - request 

Поле status может принимать еще несколько значений: «processing», «error», «processed», «deleted».

Для документа couch’a была добавлена моделька:

models.py

class DownloadRequest(django_couch.Document):          def __init__(self, *args, **kwargs):         self._db = django_couch.db('db_requests')         self.type = 'request'         self.is_active = True         self.status = 'new'         self.date_created = datetime.now().isoformat(' ')         super(DownloadRequest, self).__init__(*args, **kwargs)              @staticmethod     def load(resp_id):         db = django_couch.db('db_requests')                  if resp_id in db:             doc = DownloadRequest(db[resp_id])             assert doc.type == 'request', _("Invalid data loaded")                          return doc                      else:             raise Http404(_("Can't find download request with id '%s'") % id)              @staticmethod     def get_list(email):         pass 

Что бы ложить в nsq скопирована с других мест функция:

nsq_push

def nsq_push(topic, message, fmt='json'):     url = "http://%s/put?topic=%s" % (random.choice(settings.NSQD_HTTP_ADDRESSES), topic)          if fmt == 'json':         message_encoded = json.dumps(message)     elif fmt == 'raw':         message_encoded = message     else:         raise Exception("Unsupported message encode format: %s" % fmt)          r = requests.post(url, data=message_encoded)          return r.ok 

А вьюха vk_audios приобрела следующий вид:

vk_audios

@authorize_require @render_to("downloader/vk_audios.html") def vk_audios(request, album_id=''):          try:         page = int(request.GET.get('page', 1))     except:         page = 1         messages.error(request, _("Error page: %s. Changed to 1") % request.GET.get('page'))          offset = 100 * (int(page) - 1)          response = audio_get(request.session['vkapi'], album_id=album_id, offset=offset)          audios = response.get('items', [])     audios_count = response.get('count')          if request.user.is_authenticated():         initial_data = request.user     else:         initial_data = {'username': ''}          form = RequestsForm(request.POST or None, initial=initial_data)          if form.is_valid():         request_doc = DownloadRequest()         request_doc.update(form.cleaned_data)                  formated_audios = []         for audio in audios:             formated_data = {                 'title': "%s - %s" % (audio['artist'], audio['title']),                 'url': audio['url'],                 'processed': False,             }             formated_audios.append(formated_data)                  request_doc.update({'audios': formated_audios})         request_doc.create('r', force_random_suffix=True)         messages.success(request, _("Download request successfully created"))         nsq_push('download_requests', request_doc.id, fmt="raw")              return {         'album_id': album_id,         'audios_count': audios_count,         'page': page,         'offset': offset,         'audios': audios,         'form': form,     } 

Теперь у нас есть список аудиозаписей, мы создаем запрос на скачивание и ложим id документа в nsq. НО, захотелось видеть список своих запросов и их статусы…

Вывод запросов (отступление):

И принялся я писать вьюху для их отображения. Была использована таже форма, что и выше, для отбора по email’у. В CouchDB был создан дизайн док, который строит индекс с ключем [email, дата создания]:

function(doc) {     if (doc.type == 'request' && doc.is_active) {         emit([doc.username, doc.date_created])     } } 

А в аппликацию добавлена вьюха requests_list:

@render_to("downloader/requests.html") def requests_list(request):          requests = []          if request.user.is_authenticated():         initial_data = request.user     else:         initial_data = {'username': ''}          form = RequestsForm(request.GET or None, initial=initial_data)          if form.is_valid():         requests = DownloadRequest.get_list(form.cleaned_data['username'])          return {         'form': form,         'requests': requests,     } 

И дописана функция get_list в модель DownloadRequest:

@staticmethod def get_list(email):     db = django_couch.db('db_requests')          requests = db.view('requests/list', include_docs=True, startkey=[email], endkey=[email, {}]).rows     return [DownloadRequest(request) for request in requests] 

Хэх! Теперь я еще вижу и статусы, осталось написать nsq-обработчик, который собственно будет скачивать…

Этап №3.2 — Скачивание. Через некоторое время появились наброски обработчика nsq:

management/commands/download_request_worker.py

#!/usr/bin/env python # -*- coding: utf-8 -*- import os import sys import nsq import signal import requests import django_couch  from django.conf import settings from django.core.management.base import BaseCommand from logger import logger  from downloader.models import DownloadRequest           class Command(BaseCommand):          def handle(self, *args, **options):                  self.log = logger('download_request', int(options['verbosity']) > 1, settings.LOG_DIR)                  signal.signal(signal.SIGINT, self.signal_callback)         signal.signal(signal.SIGTERM, self.signal_callback)                  self.db = django_couch.db('db_requests')                  nsq.Reader({"message_callback": self.message_callback},                    "download_requests",                    "download_requests",                    nsqd_tcp_addresses=settings.NSQD_TCP_ADDRESSES)         self.log.debug("Starting NSQ...")         nsq.run()              def process_request(self, request):         self.log.debug("Setting status '%s' to 'processing.'" % request.status)                  request.status = 'processing'         request.save()                  user_path = os.path.join(settings.DOWNLOAD_AUDIOS_DIR, request.username)         if not os.path.exists(user_path):             os.mkdir(user_path)                  self.log.debug("User dir: %s" % user_path)                      request_path = os.path.join(user_path, request.id)         if not os.path.exists(request_path):             os.mkdir(request_path)                  self.log.debug("Download request dir: %s" % request_path)                  for audio in request.audios:             self.log.debug("Title: %s. Url: %s", audio['title'], audio['url'])             if audio.get('processed', False):                 self.log.debug("Already processed")                 continue                          self.log.debug("Downloading file..")             response = requests.get(audio['url'])             self.log.debug("Downloaded")                          filename = os.path.join(request_path, "%s.mp3" % audio['title'])             self.log.debug("Writing to filename: %s" % filename)                          with open(filename, 'wb') as f:                 f.write(response.content)             self.log.debug("Setting audio to processed")             audio['processed'] = True                  request.save()      def message_callback(self, message):         self.log.debug("Processing message id: %s", message.id)                  self.log.debug("Message data: %s", message.body)                  try:             request = DownloadRequest.load(message.body)             self.log.info("Document loaded. Audios count: %s" % len(request.audios))             self.process_request(request)             self.log.debug("Setting status '%s' to 'processed.'" % request.status)             request.status = 'processed'             request.save()             self.log.debug("Request successfullly processed.")         except:             return False         return True      def signal_callback(self, signal_number, stack_frame):         self.log.critical("Signal %d received. Shutting down", signal_number)                  sys.exit(-1) 

Итак что он умел:
— получал id документа с очереди
— создавал папку, если ее не было
— скачивал туда аудиофайлы

Далее было дописано еще архивирование и отправка email’a пользователю. Формат сообщения, который ложится в nsq немного изменился, ибо для того, что бы построить url на скачивание, нужно знать host, для этого в django есть функция request.get_host(), но нет доступа к request’у внутри менеджмент команды (возможно кто знает что можно сделать в этом случае), из-за чего я решил ложить в nsq меседж следующего вида: {‘host’: request.get_host(), ‘id’: <id документ запроса на скачивание>}.

Но все еще это были наброски. Причина тому — nsq. У nsq есть несколько ограничений:
— каждых N секунд он шлет heartbeat подключенным воркерам и если не получает ответ 2 раза, коннект закрывается. Т.е. если наш обработчик будет скачивать много файлов, коннект будет закрыть.
— если nsq не получает, что сообщение обработано в течении N секунд, сообщение отдается другому обработчику. Т.е. если я запущу 2 обработчика, то скачивание начнется как минимум 2 раза.

Немного посмотрев в pynsq решил использовать async mode и также обрабатывать скачивание в отдельных процессах. Возможно не самое хорошее решение и не самый красивый код у меня получился.

Функция обработки nsq-сообщений приобрела следующий вид:

management/commands/download_request_worker.py

#!/usr/bin/env python # -*- coding: utf-8 -*- import os import sys import nsq import json import time import shutil import signal import requests import django_couch import multiprocessing  from logger import logger from datetime import datetime  from django.conf import settings from django.core.mail import send_mail from django.core.urlresolvers import reverse from django.template.loader import render_to_string from django.utils.translation import ugettext_lazy as _ from django.core.management.base import BaseCommand  from downloader.models import DownloadRequest from downloader.views import nsq_push   class DownloadRequestProcess(multiprocessing.Process):          def __init__(self, log, message, *args, **kwargs):         self.log = log         self.message = message         super(DownloadRequestProcess, self).__init__(*args, **kwargs)              def process_request(self, drequest):         self.log.debug("Setting status '%s' to 'processing'. For doc: %s" % (drequest.status, drequest.id))                  drequest.status = 'processing'         drequest.save()                      request_path = os.path.join(settings.DOWNLOAD_AUDIOS_DIR, drequest.id)         if not os.path.exists(request_path):             os.mkdir(request_path)                  self.log.debug("Download request dir: %s" % request_path)                  for audio in drequest.audios:             self.log.debug("Title: %s. Url: %s", audio['title'], audio['url'])             if audio.get('processed', False):                 self.log.debug("Audio already processed")                 continue                          filename = os.path.join(request_path, "%s.mp3" % audio['title'])             self.log.debug("Filename: %s" % filename)                          self.log.debug("Downloading file...")             response = requests.get(audio['url'])             self.log.debug("Downloaded")                          with open(filename, 'wb') as f:                 f.write(response.content)             self.log.debug("Setting audio to processed")             audio['processed'] = True                  archive_path = None         if drequest.get('archive'):             archive_path = os.path.exists(os.path.join(request_path, drequest.archive))                  if not drequest.get('archive') and not archive_path:             self.log.debug("Writing archive")             archive = shutil.make_archive(request_path, 'gztar', settings.DOWNLOAD_AUDIOS_DIR, drequest.id)             self.log.debug("Archive: %s" % archive)             drequest['archive'] = os.path.basename(archive)                  self.log.debug("Deleting download path dir")         shutil.rmtree(request_path)                  self.log.debug("Setting status '%s' to 'processed.'" % drequest.status)         drequest['date_processed'] = datetime.now().isoformat(' ')         drequest.status = 'processed'         drequest.save()              def run(self):         self.log.debug("Message data: %s", self.message.body)                  data = json.loads(self.message.body)                  attempts = data.get('attempts', 1)         if (attempts > 5):             self.log.debug("Attempts limit reached, dropping this request")             return                  drequest = DownloadRequest.load(data['id'])         self.log.info("Document loaded. Audios count: %s" % len(drequest.audios))                  if drequest.get('processed'):             self.log.debug("Download request already processed")             return                  try:             self.process_request(drequest)             self.log.debug("Download request successfullly processed. Sending mail.")                          if drequest.get('archive'):                 archive_link = 'http://%s%s' % (data['host'], reverse('archive_link', args=[drequest.archive]))                 self.log.debug("Link to archive: %s" % archive_link)                 send_mail(_("Take you archive"),                           render_to_string("mail/archive_mail.html", {'archive_link': archive_link}),                           settings.SERVER_EMAIL,                           [drequest.username])                 self.log.debug("Mail sent")         except:             self.log.debug("Error occured: %s. Setting status to error" % sys.exc_info()[1])             drequest.status = 'error'             drequest.status_verbose = "%s" % sys.exc_info()[1]             drequest.save()                          sleep_time = 30 * attempts             self.log.debug("Pushing it back to nsq in %s seconds. Topic: download_requests" % sleep_time)             time.sleep(sleep_time)             nsq_push('download_requests', {"host": data['host'], 'id': drequest.id, 'attempts': attempts + 1})               class Command(BaseCommand):          def handle(self, *args, **options):                  self.log = logger('download_request', int(options['verbosity']) > 1, settings.LOG_DIR)                  signal.signal(signal.SIGINT, self.signal_callback)         signal.signal(signal.SIGTERM, self.signal_callback)                  self.db = django_couch.db('db_requests')                  nsq.Reader({"message_callback": self.message_callback},                    "download_requests",                    "download_requests",                    nsqd_tcp_addresses=settings.NSQD_TCP_ADDRESSES)         self.log.debug("Starting NSQ...")         self.processes = []         nsq.run()          def message_callback(self, message):         self.log.debug("Processing message id: %s", message.id)                  message.enable_async()                  process = DownloadRequestProcess(self.log, message)         process.start()                  self.log.debug("Process: %s", process)         message.finish()                  self.processes.append(process)      def signal_callback(self, signal_number, stack_frame):         self.log.critical("Signal %d received. Shutting down", signal_number)                  for process in self.processes:             if process.is_alive():                 process.join()                 process.terminate()                  sys.exit(-1) 

Итак что делает данный обработчик:
— ловит сообщение с nsq
— создает новый процесс
— отмечает nsq-сообщение как обработанное
— в отдельном процессе скачиваются аудиозаписи и создается архив
— отсылается emal
— в случае ошибки при обработке было решено ложить это сообщение в nsq повторно и при этом вести свой собственный счетчик неудачных обработок. За 6 разом не обрабатывать (возможно есть другие пути — не искал, хватило этого).

Формат сообщение в nsq приобрел следующий вид: {‘host’: , ‘id’: <id запроса на скачивание>, ‘attempts’: <№ попытки>}). Сообщение ложилось после небольшой задержки. Вычислял ее по формуле 30 сек умноженных на № попытки.

Конец

Я успешно скачал все свои аудиозаписи. Посмотрев, что архив в среднем весит 650Мб решил, что их нужно удалять через некоторое время. Была написана еще одна менеджмент команда, которая достает все успешно обработанные запросы и удаляет архив, а также меняет статус на «deleted». Тоже не самое изящное решение, но хотелось быстрее закончить 🙂

management/commands/remove_in_24.py

#!/usr/bin/env python # -*- coding: utf-8 -*- import os import django_couch from datetime import datetime, timedelta  from logger import logger  from django.conf import settings from django.core.management.base import BaseCommand   class Command(BaseCommand):      help = u'prepare and send fax document'          def handle(self, *args, **options):                  self.log = logger('delete_in_24', int(options['verbosity']) > 1, settings.LOG_DIR)         self.db = django_couch.db("db_requests")                  requests = self.db.view("request_processed/list", include_docs=True).rows         self.log.debug("Founded %s processed download requests", len(requests))         for req in requests:             self.log.debug("%s", req.value)             self.log.debug("ID: %s. Archive: %s", req.id, req.value)                          now = datetime.now()             date_expired = datetime.strptime(req.key, settings.DATETIME_FMT) + timedelta(hours=24)             self.log.debug("Now: %s. Expired: %s", now.strftime(settings.DATETIME_FMT), date_expired.strftime(settings.DATETIME_FMT))                          if now < date_expired:                 self.log.debug("Passing this doc")                 continue                           archive_path = os.path.join(settings.DOWNLOAD_AUDIOS_DIR, req.value)             self.log.debug("Archive path: %s", archive_path)                            if os.path.exists(archive_path):                 self.log.debug("Deleting file: %s", archive_path)                 os.unlink(archive_path)             else:                 self.log.warning("Path doesn't exists")                            doc = req.doc             self.log.debug("Settings status '%s' to 'deleted'", doc.status)             doc.status = 'deleted'             doc.save(self.db) 

Couchdb дизайн док request_processed/list, который достает все обработанные аудиозаписи

request_processed/list

function(doc) {     if (doc.type == 'request' && doc.archive     && doc.date_processed && doc.status != 'deleted') {         emit(doc.date_processed.slice(0, 19), doc.archive);     } } 

Ссылка на bitbucket: bitbucket.org/Kolyanu4/vkdownloader/src

ссылка на оригинал статьи http://habrahabr.ru/post/198258/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *