RPM-репозиторий — своими руками

в 22:19, , рубрики: devops, devops (*nix), nginx, python, rpm, Серверное администрирование

Итак, начнём.

При внедрении DevOps-процесса в компании одним из возможных вариантов хранилища артефактов сборки может стать rpm-репозиторий. По существу — это просто веб-сервер, раздающий определённым образом организованное содержимое. Есть, конечно, коммерческие варианты maven-репозиториев, которые имеют плагины для поддержки rpm, но мы же не ищем лёгких путей?

image

Задача

Написать сервис, который будет принимать готовые rpm-пакеты по протоколу HTTP, парсить их метаданные, раскладывать файлы пакетов по каталогам в соответствии с внутренней структурой репозитория и обновлять метаданные репозитория после обработки очередного пакета. Что из этого получилось — описано под катом.

Анализ

В моей голове задача почти мгновенно распалась на несколько частей: первая — принимающая, которая должна принять rpm-пакет по HTTP; вторая — обрабатывающая, которая должна принятый RPM-пакет обработать. Ну и где-то ещё должен быть веб-сервер, который будет раздавать содержимое репозитория.

Принимающая часть

Ввиду того, что с Nginx я знаком давно, выбор веб-сервера для приёма rpm-пакетов и раздачи содержимого репозитория даже не стоял — только Nginx. Приняв это как данность, я нашёл в документации нужные опции и написал

часть конфигурации Nginx, которая принимает файлы

location /upload {
    proxy_http_version 1.1;
    proxy_pass http://127.0.0.1:5000;
    proxy_pass_request_body off;
    proxy_set_header X-Package-Name $request_body_file;
    client_body_in_file_only on;
    client_body_temp_path /tmp/rpms;
    client_max_body_size 128m;
}

Результат данной конфигурации — при приёме файла Nginx сохранит его в заданный каталог и сообщит исходное имя в отдельном заголовке.
Для полноты картины — вторая крохотная

часть конфигурации, которая раздаёт содержимое репозитория

location /repo {
    alias /srv/repo/storage/;
    autoindex on;
}

Итак, у нас есть первая часть, которая умеет принимать файлы и отдавать их.

Обрабатывающая часть

Обрабатывающая часть напиcана на Python без особых премудростей и выглядит

вот так:

#!/usr/bin/env python
import argparse
import collections
import pprint
import shutil
import subprocess
import threading
import os
import re
import yaml
from flask import Flask, request
from pyrpm import rpmdefs
from pyrpm.rpm import RPM

# Сервис для поддержания репозиториев (С) Sergey Pechenko, 2017
# Лицензия - GPL v2.0. Никаких дополнительных гарантий или прав не предоставляется. 
# Для лицензирования использования кода в коммерческом продукте свяжитесь с автором.

class LoggingMiddleware(object):
    # Вспомогательный класс для логирования запросов и отладки
    def __init__(self, app):
        self._app = app

    def __call__(self, environ, resp):
        errorlog = environ['wsgi.errors']
        pprint.pprint(('REQUEST', environ), stream=errorlog)

        def log_response(status, headers, *args):
            pprint.pprint(('RESPONSE', status, headers), stream=errorlog)
            return resp(status, headers, *args)

        return self._app(environ, log_response)

def parse_package_info(rpm):
    # Обработка метаданных пакета
    os_name_rel = rpm[rpmdefs.RPMTAG_RELEASE]
    os_data = re.search('^(d+).(w+)(d+)$', os_name_rel)
    package = {
        'filename': "%s-%s-%s.%s.rpm" % (rpm[rpmdefs.RPMTAG_NAME],
                                         rpm[rpmdefs.RPMTAG_VERSION],
                                         rpm[rpmdefs.RPMTAG_RELEASE],
                                         rpm[rpmdefs.RPMTAG_ARCH]),
        'os_abbr': os_data.group(2),
        'os_release': os_data.group(3),
        'os_arch': rpm[rpmdefs.RPMTAG_ARCH]
    }
    return package

# Объект приложения и его настройки
app = Flask(__name__)
settings = {}

# Тестовый обработчик - пригодится в начале настройки
@app.route('/')
def hello_world():
    return 'Hello from repo!'

# Обработчик конкретного маршрута в URL
@app.route('/upload', methods=['PUT'])
def upload():
    # Ответ по умолчанию
    status = 503
    headers = []
    # Этот нестандартный заголовок мы добавили в конфигурацию Nginx ранее
    curr_package = request.headers.get('X-Package-Name')
    rpm = RPM(file(unicode(curr_package)))
    rpm_data = parse_package_info(rpm)
    try:
        new_req_queue_element = '%s/%s' % (rpm_data['os_release'], rpm_data['os_arch'])
        dest_dirname = '%s/%s/Packages' % (
            app.settings['repo']['top_dir'],
            new_req_queue_element)
        # Перемещаем файл в нужный каталог
        shutil.move(curr_package, dest_dirname)
        src_filename = '%s/%s' % (dest_dirname, os.path.basename(curr_package))
        dest_filename = '%s/%s' % (dest_dirname, rpm_data['filename'])
        # Переименовываем файл
        shutil.move(src_filename, dest_filename)
        # Готовим ответ, который получит загружавший клиент
        response = 'OK - Accessible as %s' % dest_filename
        status = 200
        if new_req_queue_element not in req_queue:
            # Кладём запрос на обработку этого пакета в очередь
            req_queue.append(new_req_queue_element)
        event_timeout.set()
        event_request.set()
    except BaseException as E:
        response = E.message
    return response, status, headers

def update_func(evt_upd, evt_exit):
    # Ждёт события, затем запускает обновление метаданных
    while not evt_exit.is_set():
        if evt_upd.wait():
            # Выбираем следующий доступный запрос из очереди
            curr_elem = req_queue.popleft()
            p = subprocess.Popen([app.settings['index_updater']['executable'],
                                  app.settings['index_updater']['cmdline'],
                                  '%s/%s' % (app.settings['repo']['top_dir'], curr_elem)],
                                 shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            res_stdout, res_stderr = p.communicate(None)
            pprint.pprint(res_stdout)
            pprint.pprint(res_stderr)
            # Сбрасываем событие обновления
            evt_upd.clear()
    return

def update_enable_func(evt_req, evt_tmout, evt_upd, evt_exit):
    while not evt_exit.is_set():
        # Ожидаем запрос
        evt_req.wait()
        # OK, дождались
        # Теперь выдерживаем 30 секунд, а если в это время пришёл запрос....
        while evt_tmout.wait(30) and (not evt_exit.is_set()):
            evt_tmout.clear()
        if evt_exit.is_set():
            break
        evt_upd.set()
        evt_tmout.clear()
        evt_req.clear()
    return

def parse_command_line():
    # Разбор агрументов командной строки
    parser = argparse.ArgumentParser(description='This is a repository update helper')
    parser.prog_name = 'repo_helper'
    parser.add_argument('-c', '--conf', action='store', default='%.yml' % prog_name, type='file', required='false',
                        help='Name of the config file', dest='configfile')
    parser.epilog('This is an example of Nginx configuration:
  location /repo {
      alias /srv/repo/storage/;
      autoindex on;
  }

  location /upload {
      client_body_in_file_only on;
      client_body_temp_path /tmp/rpms;
      client_max_body_size 128m;
      proxy_http_version 1.1;
      proxy_pass http://localhost:5000;
      proxy_pass_request_body off;
      proxy_set_header X-Package-Name $request_body_file;
  }
')
    parser.parse_args()
    return parser

def load_config(fn):
    with open(fn, 'r') as f:
        config = yaml.safe_load(f)
    return config

def load_hardcoded_defaults():
    # Прибитые гвоздями настройки "по умолчанию"
    config = {
        'index_updater': {
            'executable': '/bin/createrepo',
            'cmdline': '--update'
        },
        'repo': {
            'top_dir': '/srv/repo/storage'
        },
        'server': {
            'address': '127.0.0.1',
            'port': '5000',
            'prefix_url': 'upload',
            'upload_header': ''
        },
        'log': {
            'name': 'syslog',
            'level': 'INFO'
        }
    }
    return config

if __name__ == '__main__':
    try:
        cli_args = parse_command_line()
        settings = load_config(cli_args['configfile'])
    except BaseException as E:
        settings = load_hardcoded_defaults()
    req_queue = collections.deque()
    # Application-level specific stuff
    # Exit flag
    exit_flag = False
    # Событие, сигналящее о пришедшем запросе
    event_request = threading.Event()
    # Событие, сигналящее об окончании задержки
    event_timeout = threading.Event()
    # Событие, сигналящее о запуске обновления метаданных
    event_update = threading.Event()
    # Событие, сигналящее о завершении вспомогательных потоков
    event_exit = threading.Event()
    # Готовим начальное состояние событий
    event_request.clear()
    event_timeout.clear()
    event_update.clear()
    # Поток, который запускает обновление метаданных репозитория
    update_thread = threading.Thread(name='update_worker', target=update_func, args=(event_update, event_exit))
    update_thread.start()
    # Поток, отсчитывающий время задержки, и начинающий отсчёт сначала, если задержка прервана
    # Если задержка прервана - начинаем отсчёт сначала
    delay_thread = threading.Thread(name='delay_worker', target=update_enable_func,
                                    args=(event_request, event_timeout, event_update, event_exit))
    delay_thread.start()
    # Его Величество Приложение
    app.wsgi_app = LoggingMiddleware(app.wsgi_app)
    app.run(host=settings['server']['address'], port=settings['server']['port'])
    # Это событие заставит оба дополнительных потока завершиться
    event_exit.clear()

Важный и, скорее всего, непонятный с первого взгляда момент — зачем же здесь нужны потоки, события и очередь.
Они нужны для передачи данных между асинхронными процессами. Вот смотрите, ведь HTTP-клиент не обязан ждать какого-то разрешения, чтобы загрузить пакет? Правильно, он может начать загрузку в любой момент. Соответственно, в основном потоке приложения мы должны сообщить клиенту об успешности/неуспешности загрузки, и, если загрузка удалась, передать данные через очередь другому потоку, который выполняет вычитывание метаданных пакета, а затем перемещение его в файловой системе. При этом отдельный поток следит, прошло ли 30 секунд с момента загрузки последнего пакета, или нет. Если прошло — метаданные репозитория будут обновлены. Если же время ещё не вышло, а уже пришёл следющий запрос — сбрасываем и перезапускаем таймер. Таким образом, всякая загрузка пакета будет отодвигать обновление метаданных на 30 секунд.

Как пользоваться

Сначала нужно

установить пакеты Python по списку:

appdirs==1.4.3
click==6.7
Flask==0.12.1
itsdangerous==0.24
Jinja2==2.9.6
MarkupSafe==1.0
packaging==16.8
pyparsing==2.2.0
pyrpm==0.3
PyYAML==3.12
six==1.10.0
uWSGI==2.0.15
Werkzeug==0.12.1

К сожалению, я не могу гарантировать, что это минимально возможный список — команда pip freeze просто берёт список доступных пакетов Python и механически переносит его в файл, не рассматривая, используется ли конкретный пакет в конкретном проекте или нет.

Затем нужно установить пакеты с nginx и c createrepo:

yum install -y nginx createrepo

Запуск проекта выглядит вот так:

nohup python app.py

После того, как всё будет запущено, можно пробовать загрузить rpm-пакет в репозиторий вот такой командой:

curl http://hostname.example.com/upload -T <packagename-1.0.rpm>

Я понимаю, что описанный сервис далёк от совершенства и являет собой скорее прототип, нежели полноценное приложение, но, с другой стороны, он может быть легко дополнен/расширен.

Для удобства желающих код выложен на GitHub. Предложения по дополнению сервиса, а ещё лучше — pull-request'ы горячо приветствуются!

Надеюсь, этот прототип окажется кому-то полезным. Спасибо за внимание!

P.S.

Ну и для тех, кому очень нужно, небольшой сниппет для укрощения SELinux:

#!/bin/bash
semanage fcontext -a -t httpd_sys_rw_content_t "/srv/repo/storage(/.*)?"
restorecon -R -v /srv/repo/storage
setsebool -P httpd_can_network_connect 1

Автор: tnt4brain

Источник

Поделиться

* - обязательные к заполнению поля