Поддержка очередей в Hangfire

в 16:47, , рубрики: .net, .net core, asynchronous task, C#, dotnet core, hangfire, queue

Hangfire — это библиотека для .net (core), позволяющая асинхронно выполнять некоторый код по принципу "fire and forget". Примером такого кода может быть отправка E-Mail, обработка видео, синхронизация с другой системой и т.д. Помимо "fire and forget" есть поддержка отложенных задач, а также задач по расписанию в формате Cron.

В настоящее время существует масса подобных библиотек. Несколько преимуществ, говорящих в пользу Hangfire:

  • Простая конфигурация, удобный API
  • Надежность. Hangfire гарантирует, что созданная задача будет выполнена хотя бы один раз
  • Возможность параллельного выполнения задач и отличная производительность
  • Расширяемость (вот ей-то мы и воспользуемся ниже)
  • Достаточно полная и понятная документация
  • Dashboard, на котором можно видеть всю статистику о задачах

Не буду слишком вдаваться в детали, поскольку существует немало хороших статей о Hangfire и способах его применения. В этой статье я разберу, как воспользоваться поддержкой нескольких очередей (или пулов задач), как починить стандартную retry-функциональность и сделать так, чтобы каждая очередь имела индивидуальную конфигурацию.

Существующая поддержка (псевдо)-очередей

Важное замечание: в заголовке я использовал термин псевдо-очередь, потому что Hangfire не гарантирует выполнение задач в определенном порядке. Т.е. принцип "First In First Out" не действует и мы не будем на него опираться. Более того автор библиотеки рекомендует делать задачи идемпотентными, т.е. устоичивыми к непредвиденному многократному выполнению. Далее я буду использовать просто слово "очередь", т.к. в Hangfire используется термин "Queue".

В Hangfire заложена простая поддержка очередей. Хотя он и не предлагает гибкости Message Queue Systems, таких как rabbitMQ или Azure Service Bus, но этого часто вполне достаточно для решения широкого спектра задач.

Каждая задача имеет свойство "Queue", то есть имя очереди, в которой она должна выполняться. По умолчанию, задача отправляется в очередь с именем "default", если не указано иное. Поддержка нескольких очередей нужна для того, чтобы раздельно управлять выполнением задач разных типов. Например, мы можем захотеть, чтобы задачи по обработке видео попадали в очередь "video_queue", а рассылка E-Mail'ов в очередь "email_queue". Таким образом мы получаем возможность независимо выполнять эти два типа задач. Если мы захотим вынести обработку видео на выделенный сервер, то мы легко сможем это сделать, запустив отдельный Hangfire-сервер как консольное приложение, которое будет обрабатывать очередь "video_queue".

Перейдем к практике

Настройка Hangfire-сервера в asp.net core выглядит следующим образом:

public void Configure(IApplicationBuilder app)
{
    app.UseHangfireServer(new BackgroundJobServerOptions
    {
        WorkerCount = 2,
        Queues = new[] { "email_queue", "video_queue" }
    });
}

Проблема 1

Как я уже упоминал выше, в Hangfire существует очередь по умолчанию, которая называется "default". Если задача, положенная в очередь, например, "video_queue", завершилась с ошибкой и нуждается в повторе, то на повторное выполнение она будет отправлена в очередь "default", а не "video_queue" и, как следствие, наша задача будет выполняться совсем не тем экземпляром Hangfire-сервера, которым нам бы хотелось, если вообще будет. Такое поведение было мной установлено опытным путем и возможно является багом в самом Hangfire.

Job Filters

Hangfire предоставляет нам возможность расширения функционала с помощью так называемых фильтров (Job Filters), которые по принципу работы похожи на Actions Filters в ASP.NET MVC. Дело в том, что внутренняя логика Hangfire реализована как State Machine. Это движок, который поочередно переводит имеющиеся в пуле задачи из одного состояния в другое (например, created -> enqueued -> processing -> succeeded), а фильтры позволяют нам "перехватывать" выполняемую задачу при каждом изменении её состояния и производить манипуляции с ней. Фильтр реализуется как аттрибут, который может быть применен к отдельному методу, классу или глобально.

Job Parameters

В качестве аргумента в метод фильтра передается объект ElectStateContext. Этот объект содержит полную информацию о выполняемой в данный момент задаче. Среди прочего он имеет методы GetJobParameter<>(...) и SettJobParameter<>(...). Job Parameters позволяют сохранять связанную с задачей информацию в базе данных. Именно в Job Parameters и хранится имя очереди, в которую была изначально отправлена задача, только почему-то эта информация игнорируется при последующем повторе.

Решение

Итак, у нас есть задача, которая завершилась с ошибкой и должна быть отправлена на повторное выполнение в нужную очередь (в ту самую, которая была ей присвоена в момент первоначального создания). Повторение завершившейся с ошибкой задачи — это переход из состояния "failed" в состояние "enqueued". Для решения проблемы создадим фильтр, который при переходе задачи в состояние "enqueued", будет проверять в какую очередь задача была отправлена изначально и проставлять параметр "QueueName" в нужное значение:

public class HangfireUseCorrectQueueFilter 
    : JobFilterAttribute, IElectStateFilter
{
    public void OnStateElection(ElectStateContext context)
    {
        if (context.CandidateState is EnqueuedState enqueuedState)
        {
            var queueName = context.GetJobParameter<string>("QueueName");

            if (string.IsNullOrWhiteSpace(queueName))
            {
                context.SetJobParameter("QueueName", enqueuedState.Queue);
            }
            else
            {
                enqueuedState.Queue = queueName;
            }
        }
    }
}

Для того, чтобы применть фильтр по умолчанию ко всем задачам (то есть глобально), добавим следующий код в нашу конфигурацию:

GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });

Еще одна небольшая загвоздка заключается в том, что коллекция GlobalJobFilters по умолчанию содержит экземпляр класса AutomaticRetryAttribute. Это стандартный фильтр, который отвечает за повторное выполнение неудачно завершенных задач. Он же и отправляет задачу в очередь "default", игнорируя изначальную очередь. Для того, чтобы наш велосипед поехал нужно удалить этот фильтр из коллекции и позволить нашему фильтру взять на себя ответственность за повторное выполнение задач. В результате конфигурационный код будет выглядеть так:

var defaultRetryFilter = GlobalJobFilters.Filters
    .FirstOrDefault(f => f.Instance is AutomaticRetryAttribute);

if (defaultRetryFilter != null && defaultRetryFilter.Instance != null)
{
    GlobalJobFilters.Filters.Remove(defaultRetryFilter.Instance);
}

GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });

Необходимо отметить, что AutomaticRetryAttribute реализует логику автоматического увеличения интервала между попытками (с каждой последующей попыткой интервал увеличивается), и удаляя AutomaticRetryAttribute из коллекции GlobalJobFilters, мы отказываемся от этой функциональности (см. реализацию метода ScheduleAgainLater)

Итак, мы добились того, что наши задачи могут выполняться в разных очередях и это позволяет нам независимо управлять их выполнением, в том числе обрабатывать разные очереди на разных машинах. Только теперь мы не знаем сколько раз и с каким интервалом наши задачи будут повторяться в случае ошибки, поскольку удалили AutomaticRetryAttribute из коллекции фильтров.

Проблема 2

Мы хотим иметь возможность конфигурировать интервал и количество повторений отдельно для каждой очереди, а также, если для какой-то очереди мы не указали значения явно, то хотим, чтобы применялись значения по умолчанию. Для этого мы реализуем еще один фильтр и назовем его HangfireRetryJobFilter.

В идеале, конфигурационный код должен выглядть примерно так:

GlobalJobFilters.Filters.Add(new HangfireRetryJobFilter
{
    Order = 2,
    ["email_queue"] = new HangfireQueueSettings
    {
        DelayInSeconds = 120,
        RetryAttempts = 3
    },
    ["video_queue"] = new HangfireQueueSettings
    {
        DelayInSeconds = 60,
        RetryAttempts = 5
    }
});

Решение

Для этого сначала добавим класс HangfireQueueSettings, который будет служить контейнером для наших настроек.

public sealed class HangfireQueueSettings
{
    public int RetryAttempts { get; set; }
    public int DelayInSeconds { get; set; }
}

Затем добавим реализацию самого фильтра, который при повторном выполнении задач после ошибки будет применять настройки в зависимости от конфигурации очереди и следить за количеством повторов:

public class HangfireRetryJobFilter 
        : JobFilterAttribute, IElectStateFilter, IApplyStateFilter
{
    private readonly HangfireQueueSettings _defaultQueueSettings = 
        new HangfireQueueSettings { RetryAttempts = 3, DelayInSeconds = 10 };

    private readonly IDictionary<string, HangfireQueueSettings> _settings 
        = new Dictionary<string, HangfireQueueSettings>();

    public HangfireQueueSettings this[string queueName]
    {
        get
        {
            return _settings.TryGetValue(queueName, out HangfireQueueSettings queueSettings) 
                ? queueSettings 
                : _defaultQueueSettings;
        }
        set
        {
            _settings[queueName] = value;
        }
    }

    public void OnStateElection(ElectStateContext context)
    {
        if (!(context.CandidateState is FailedState failedState))
        {
            // This filter accepts only failed job state.
            return;
        }

        var retryAttempt = context.GetJobParameter<int>("RetryCount") + 1;
        var queueName = context.GetJobParameter<string>("QueueName");

        if (retryAttempt <= this[queueName].RetryAttempts)
        {
            ScheduleAgainLater(context, retryAttempt, failedState, queueName);
        }
        else
        {
            TransitionToDeleted(context, failedState, queueName);
        }
    }

    public void OnStateApplied(
        ApplyStateContext context, 
        IWriteOnlyTransaction transaction)
    {
        if (context.NewState is ScheduledState &&
            context.NewState.Reason != null &&
            context.NewState.Reason.StartsWith("Retry attempt"))
        {
            transaction.AddToSet("retries", context.BackgroundJob.Id);
        }
    }

    public void OnStateUnapplied(
        ApplyStateContext context, 
        IWriteOnlyTransaction transaction)
    {
        if (context.OldStateName == ScheduledState.StateName)
        {
            transaction.RemoveFromSet("retries", context.BackgroundJob.Id);
        }
    }

    private void ScheduleAgainLater(
        ElectStateContext context, 
        int retryAttempt, 
        FailedState failedState, 
        string queueName)
    {
        context.SetJobParameter("RetryCount", retryAttempt);

        var delay = TimeSpan.FromSeconds(this[queueName].DelayInSeconds);

        const int maxMessageLength = 50;

        var exceptionMessage = failedState.Exception.Message.Length > maxMessageLength
            ? failedState.Exception.Message.Substring(0, maxMessageLength - 1) + "…"
            : failedState.Exception.Message;

        // If attempt number is less than max attempts, we should
        // schedule the job to run again later.

        var reason = $"Retry attempt {retryAttempt} of {this[queueName].RetryAttempts}: {exceptionMessage}";

        context.CandidateState = delay == TimeSpan.Zero
            ? (IState)new EnqueuedState { Reason = reason }
            : new ScheduledState(delay) { Reason = reason };
    }

    private void TransitionToDeleted(
        ElectStateContext context, 
        FailedState failedState, 
        string queueName)
    {
        context.CandidateState = new DeletedState
        {
            Reason = this[queueName].RetryAttempts > 0
                ? "Exceeded the maximum number of retry attempts."
                : "Retries were disabled for this job."
        };
    }
}

Примечание к коду: при реализации класса HangfireRetryJobFilter был взят за основу класс AutomaticRetryAttribute из Hangfire, поэтому реализация некоторых методов частично совпадает с соответствующими методами этого класса.

Заключение

В этой статье мы воспользовались поддержкой нескольких очередей в Hangfire для обработки задач разных типов. Мы реализовали свой механизм повторения неудачно завершившихся задач с возможностью индивидуальной конфигурации для каждой очереди, расширив функциональность Hangfire с помощью Job Filters.

Надеюсь, эта статья окажется кому-нибудь полезной. Буду рад комментариям.

Полезные ссылки

Документация Hangfire
Исходный код Hangfire
Scott Hanselman — How to run Background Tasks in ASP.NET

Автор: dburik

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js