Ни одно задание не будет провалено: Spring Boot и Quartz в режиме cluster

в 21:20, , рубрики: cluster, cluster-mode, java, kotlin, kubernetes, openshift, quartz, spring, spring boot, spring framework

Цель данного мини-туториала

Ниже будет краткий обзор настройки приложения, написанного на Kotlin + Spring Boot, которое развернуто в кластере в нескольких экземплярах и использует библиотеку Quartz для выполнения запланированных по cron заданий только на одном из инстансов этого сервиса.

Пример: в OpenShift крутится несколько подов микросервиса, один из которых раз в сутки должен осуществлять формирование отчета. Если под упал во время работы по каким-либо причинам, данную задачу должен перехватить и выполнить другой под. Если формирование отчета было неуспешным, нужно попытаться запустить джобу создания отчета еще несколько раз в течениепары ближайших часов. После N неудачных попыток, необходимо восстановить изначальный cron для данного задания. Конфигурация всех подов микросервиса должна быть одинаковой.

Краткое введение

Перед прочтением данной статьи, очень советую ознакомиться с этим замечательным обширным обзором библиотеки Quartz

Альтернативы

Возможно, вам не потребуется весь функционал, который предоставляет Quartz.
В таком случае, если вы используете в проекте Spring, советую посмотреть на библиотеку ShedLock. Ссылка на репозиторий в GitHub.
Если вкратце ShedLock - это простая библиотека, которая гарантирует, что любое задание будет выполнено не более одного раза.
Реализация построена на локах, хранящихся в базе - буквально пару таблиц. Очень удобно, что всю настройку можно произвести только с помощью аннотаций в стиле Spring аннотации @Scheduled.
Однако главный минус данной библиотеки -
ShedLock не отслеживает жизненный цикл задания (нет возможности проконтролировать, что задание было выполнено, перенести задание, если необходимо).

Если вы рассматриваете альтернативные механизмы синхронизации нескольких инстансов вашего приложения, советую почитать эту ветку на StackOverflow
(здесь есть мой пост с содержанием данной статьи)

Реализация

Описание задачи

Ниже приведен пример настройки приложения на Spring Boot, которое одновременно крутится на нескольких серверах и смотрит на одну базу данных. В каждом инстансе приложения есть бин - задание, которое выполняется по cron. Это здание должно быть выполнено только единожды (на каком-то одном из инстансов).
В случае падения пода, который выполнял джобу, задание должно быть перезапущено на любом другом рабочем поде. Если же под не упал во время выполнения задания, но
задание не было выполнено (получили exception во время исполнения), задание необходимо перезапустить еще 2 раза с задержкой в 5 часов * количество попыток.
Если 2-я попытка перезапуска была неуспешной, то необходимо установить дефолтный cron
для нашего задания:
0 0 4 L-1 * ? * - выполнение в 4 утра предпоследнего дня каждого месяца.

Теперь мы определились, что точно будем использовать Quartz и будем использовать его в режиме кластера

Подключаем зависимость:

Gradle
implementation("org.springframework.boot:spring-boot-starter-quartz")

Maven
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

Наполняем базу:

Перед тем как приступить к написанию конфигураций, необходимо наполнить нашу базу таблицами, с которыми будет работать Quartz. Я использовал liquibase и официальные скрипты - вот отсюда.
Под задачи Quartz лучше завести отдельную схему в базе.

Задание, которое будет выполняться по cron:

Имитируем поведение, при котором джоба или один из сервисов, которые использует данное задание могут вернуть ошибку в 50% случаев.

@Component
@Profile("quartz")
class SomeJob(
    private val someService: SomeService
) : QuartzJobBean() {
    private val log: Logger = LoggerFactory.getLogger(SomeJob::class.java)
    
    override fun executeInternal(jobExecutionContext: JobExecutionContext) {
        try {
            log.info("Doing awesome work...")
            someService.work()
            if ((1..10).random() > 5) throw RuntimeException("Something went wrong...")
        } catch (e: Exception) {
            throw JobExecutionException(e)
        }
    }
}

Настройка конфигурации

(больше информации здесь):

@Configuration
@Profile("quartz")
class JobConfig {
    //JobDetail дла задания выше
    @Bean
    fun someJobDetail(): JobDetail {
        return JobBuilder
            .newJob(SomeJob::class.java).withIdentity("SomeJob")
            .withDescription("Some job")
            // Устанавливаем данное значение в true, если хотим, чтобы джоба была перезапущена
            // в случае падения пода
            .requestRecovery(true)
            // не удаляем задание из базы даже в случае, если ни один из триггеров на задание не укаывает
            .storeDurably().build()
    }

    //Trigger
    @Bean
    fun someJobTrigger(someJobDetail: JobDetail): Trigger {
        return TriggerBuilder.newTrigger().forJob(someJobDetail)
            .withIdentity("SomeJobTrigger")
            .withSchedule(CronScheduleBuilder.cronSchedule("0 0 4 L-1 * ? *"))
            .build()

    }

    // Необходимо также при старте пересоздавать уже имеющиеся задания 
    // (нужно на случай, если вы заходите изменить cron выражение для какого-либо из ваших заданий,
    // которые уже были созданы ранее, в противном случае в базе сохранится старое cron выражение)
    @Bean
    fun scheduler(triggers: List<Trigger>, jobDetails: List<JobDetail>, factory: SchedulerFactoryBean): Scheduler {
        factory.setWaitForJobsToCompleteOnShutdown(true)
        val scheduler = factory.scheduler
        factory.setOverwriteExistingJobs(true)
        //https://stackoverflow.com/questions/39673572/spring-quartz-scheduler-race-condition
        factory.setTransactionManager(JdbcTransactionManager())
        rescheduleTriggers(triggers, scheduler)
        scheduler.start()
        return scheduler
    }

    private fun rescheduleTriggers(triggers: List<Trigger>, scheduler: Scheduler) {
        triggers.forEach {
            if (!scheduler.checkExists(it.key)) {
                scheduler.scheduleJob(it)
            } else {
                scheduler.rescheduleJob(it.key, it)
            }
        }
    }
}
    

Создание слушателя, который будет следить за выполнением нашего задания:

Чтобы listener заработал, необходимо его зарегистрировать в шедулере.

@Component
@Profile("quartz")
class JobListenerConfig(
    private val schedulerFactory: SchedulerFactoryBean,
    private val jobListener: JobListener
) {
    @PostConstruct
    fun addListener() {
        schedulerFactory.scheduler.listenerManager.addJobListener(jobListener, KeyMatcher.keyEquals(jobKey("SomeJob")))
    }
}

Основная логика обработки жизненного цикла задания:

За статусом выполнения задания следим с помощью слушателя, которого мы зарегистрировали в шедулере ранее. У листенера есть 2 метода:
jobToBeExecuted(context: JobExecutionContext)
и
jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?),
которые выполняются до старта и после исполнения задания (вне зависимости от того, было ли выполнено задание успешно или нет)

Вся логика представлена ниже. Добавлю лишь пару комментариев:

  • Шудулер понимает, сколько раз триггер был перезапущен с помощью информации в jobDataMap. Причем эти данные хранятся в бд, поэтому в случае рестарта инстанса, предыдущее значение количества неуспешных запусков триггера будет вычитано.

  • В случае падения приложения во время выполнения задания, в базе может остаться невыполненный триггер, который в свою очередь во время рестарта приложения может быть преобразован в recovery триггер (его название будет начинаться с recovery_ и триггер будет иметь группу RECOVERING_JOBS)

@Profile("quartz")
class JobListener(
    //можно вытащить из контекста выполнения, либо заинжектить напрямую из application контекста
    private val scheduler: Scheduler,
    private val triggers: List<Trigger>
): JobListenerSupport() {

    private lateinit var triggerCronMap: Map<String, String>

    @PostConstruct
    fun post(){
        //В лист триггеров будут помещены только самописные задания, recover триггеры (если 
        //они существуют на момент старта приложения в этот лист внедрены не будут)
        triggerCronMap = triggers.associate {
            it.key.name to (it as CronTrigger).cronExpression
        }
    }

    override fun getName(): String {
        return "myJobListener"
    }


    override fun jobToBeExecuted(context: JobExecutionContext) {
        log.info("Job: ${context.jobDetail.key.name} ready to start by trigger: ${context.trigger.key.name}")
    }


    override fun jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?) {
        //либо можно использовать context.mergedJobDataMap
        val dataMap = context.trigger.jobDataMap
        val count = if (dataMap["count"] != null) dataMap.getIntValue("count") else {
            dataMap.putAsString("count", 1)
            1
        }
        //В этот блок if можно добавить следующее условие: && !context.trigger.key.name.startsWith("recover_")
        // в этом случае шедулер не будет будет перезапускать recover триггеры, которые могут образоваться
        // в случае падения приложения во время выполнения задания.
        if (jobException != null ){
            if (count < 3) {
                log.warn("Job: ${context.jobDetail.key.name} filed while execution. Restart attempts count: $count ")
                val oldTrigger = context.trigger
                var newTriggerName = context.trigger.key.name + "_retry"
                //на случай, если триггер с таким именем уже существует (остался в бд после падения инстанса)
                context.scheduler.getTriggersOfJob(context.jobDetail.key)
                    .map { it.key.name }
                    .takeIf { it.contains(newTriggerName) }
                    ?.apply { newTriggerName += "_retry" }
                val newTrigger = TriggerBuilder.newTrigger()
                    .forJob(context.jobDetail)
                    .withIdentity(newTriggerName, context.trigger.key.group)
                    //заменяем наш cron триггер simple триггером, который будет запущен 
                    // через 5 часов * количество попыток перезапуска задания
                    .startAt(Date.from(Instant.now().plus((5 * count).toLong(), ChronoUnit.HOURS)))
                    .usingJobData("count", count + 1 )
                    .build()
                val date = scheduler.rescheduleJob(oldTrigger.key, newTrigger)
                log.warn("Rescheduling trigger: ${oldTrigger.key} to trigger: ${newTrigger.key}")
            } else {
                log.warn("The maximum number of restarts has been reached. Restart attempts: $count")
                recheduleWithDefaultTrigger(context)
            }
        } else if (count > 1) {
            recheduleWithDefaultTrigger(context)
        }
        else {
            log.info("Job: ${context.jobDetail.key.name} completed successfully")
        }
        context.scheduler.getTriggersOfJob(context.trigger.jobKey).forEach {
            log.info("Trigger with key: ${it.key} for job: ${context.trigger.jobKey.name} will start at ${it.nextFireTime ?: it.startTime}")
        }
    }

    private fun recheduleWithDefaultTrigger(context: JobExecutionContext) {
        val clone = context.jobDetail.clone() as JobDetail
        val defaultTriggerName = context.trigger.key.name.split("_")[0]
        //Recovery триггеры не должны быть перешедулены
        if (!triggerCronMap.contains(defaultTriggerName)) {
            log.warn("This trigger: ${context.trigger.key.name} for job: ${context.trigger.jobKey.name} is not self-written trigger. It can be recovery trigger or whatever. This trigger must not be recheduled.")
            return
        }
        log.warn("Remove all triggers for job: ${context.trigger.jobKey.name} and schedule default trigger for it: $defaultTriggerName")
        scheduler.deleteJob(clone.key)
        scheduler.addJob(clone, true)
        scheduler.scheduleJob(
            TriggerBuilder.newTrigger()
                .forJob(clone)
                .withIdentity(defaultTriggerName)
                .withSchedule(CronScheduleBuilder.cronSchedule(triggerCronMap[defaultTriggerName]))
                .usingJobData("count", 1)
                .startAt(Date.from(Instant.now().plusSeconds(5)))
                .build()
        )
    }
}

Стоит обратить внимание на пару методов:
jobException.setRefireImmediately(true), который можно использовать совместно с context.refireCount, если у вас нет необходимости в переносе задания после получения ошибки во время выполнения. Задание будет перезапущено немедленно.
В одном из ответов на StackOverflow рекомендовали использовать в джобе
Thread.sleep(N-seconds) вместо переназначения задания в случае падения - это явно не лучшая идея ☺

Файл application-quartz.yaml

И последнее вещь, которую осталось сделать - написать конфигурационный yaml файл для профиля quartz, который мы будем использовать. Комментарии в файле оставлю без перевода:

spring:
  quartz:
    job-store-type: jdbc #Database Mode
    jdbc:
      initialize-schema: never #Do not initialize table structure
    properties:
      org:
        quartz:
          scheduler:
            instanceId: AUTO #Default hostname and timestamp generate instance ID, which can be any string, but must be the only corresponding qrtz_scheduler_state INSTANCE_NAME field for all dispatchers
            #instanceName: clusteredScheduler #quartzScheduler
          jobStore:
#            a few problems with the two properties below: https://github.com/spring-projects/spring-boot/issues/28758#issuecomment-974628989 & https://github.com/quartz-scheduler/quartz/issues/284
#            class: org.springframework.scheduling.quartz.LocalDataSourceJobStore #Persistence Configuration
            driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate #We only make database-specific proxies for databases
#            useProperties: true #Indicates that JDBC JobStore stores all values in JobDataMaps as strings, so more complex objects can be stored as name-value pairs rather than serialized in BLOB columns.In the long run, this is safer because you avoid serializing non-String classes to BLOB class versions.
            tablePrefix: quartz_schema.QRTZ_  #Database Table Prefix
            misfireThreshold: 60000 #The number of milliseconds the dispatcher will "tolerate" a Trigger to pass its next startup time before being considered a "fire".The default value (if you do not enter this property in the configuration) is 60000 (60 seconds).
            clusterCheckinInterval: 5000 #Set the frequency (in milliseconds) of this instance'checkin'* with other instances of the cluster.Affects the speed of detecting failed instances.
            isClustered: true #Turn on Clustering
          threadPool: #Connection Pool
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 3
            threadPriority: 1
            threadsInheritContextClassLoaderOfInitializingThread: true

Локальная отладка проводилась следующим образом: написал пару docker compose файлов, в которых поднял базу и несколько инстансов приложения, которые "натравил" на поднятую базу. Если интересно, могу отдельно это описать.

Дополнительная информация:

Вот еще несколько интересных статей по теме, которые советую к прочтению:
About quartz
Spring boot using quartz in cluster mode
Интересная статья от коллег из OTUS
Cluster effectively quartz

P.S. Буду рад конструктивной критике предложенного выше решения и с удовольствием изучу альтернативы.

Благодарю за уделенное время!

Автор:
gearbase

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js