Корутины и синхронизация. Лучше не смешивать

в 9:39, , рубрики: concurrency, coroutines, funcorp, kotlin, multithreading, Mutex, synchronized, syncronize, Блог компании FunCorp, Программирование
Корутины и синхронизация. Лучше не смешивать - 1

Предположим, у вас в коде есть критическая секция, которая не должна выполняться более, чем одним потоком одновременно.

В мире Java одним из стандартных решений является добавление ключевого слова synchronized к сигнатуре метода. В Kotlin для получения того же эффекта используется аннотация @Synchronized

repeat(2) {
  thread { criticalSection() }
}

@Synchronized
fun criticalSection() {
  println("Starting!")
  Thread.sleep(10)
  println("Ending!")
}

Данный код выведет следующее:

Starting!
Ending!
Starting!
Ending!

Видно, что оба вызова отработали последовательно, один после другого.

Теперь предположим, что мы хотим использовать корутины. Что у нас получится?

val scope = CoroutineScope(Job())

repeat(2) {
  scope.launch { criticalSectionSuspending() }
}

@Synchronized
suspend fun criticalSectionSuspending() {
  println("Starting!")
  delay(10)
  println("Ending!")
}

А получится, что вызовы критической секции пересекутся, что очень не здорово.

Starting!
Starting!
Ending!
Ending!

Понять, что же происходит, можно, разобравшись, как устроены корутины под капотом. Они реализованы с использованием подхода передачи продолжения. (Краткое объяснение можно посмотреть в моём докладе про корутины Grokking Coroutines, Dan Lew, а для более глубокого понимания рекомендую посмотреть доклад Романа Елизарова Deep Dive into Coroutines.)

В рамках этой статьи всё, что вам надо знать — это то, что останавливаемые функции на самом деле не исполняются последовательно, строка за строкой. Когда останавливаемая функция доходит до точки остановки, она ставится на паузу и передаёт управление какой-то другой функции (позже, когда управление будет возвращено этой функции, — она продолжит выполнение).

Таким образом, во втором примере кода на самом деле происходит следующее:

1. criticalSectionSuspending() стартует, забирает блокировку и печатает Starting!

2. Доходит до delay() (который является точкой остановки), выходит из функции и отдаёт блокировку.

3. Так как блокировка свободна, начинается второй запуск criticalSectionSuspending(), которая забирает блокировку, печатает Starting!, останавливается и тоже отдаёт блокировку.

4. Когда delay() заканчивается, criticalSectionSuspending() запускается снова, но уже с предыдущего места остановки.

Для большей наглядности привожу временную диаграмму выполнения одной функции.

Корутины и синхронизация. Лучше не смешивать - 2

Как видите, время остановки проходит вне критической секции и её блокировка не удерживается. Именно поэтому разные потоки имеют доступ к синхронизированной останавливаемой функции — они выполняют её не одновременно.

Это известная проблема в Kotlin. Фактически, компилятор не позволит использовать synchronized() {} с точкой останова внутри. Такой код не скомпилируется с ошибкой "The 'delay' suspension point is inside a critical section"

Я убеждён, что в случае использования аннотации @Synchronized поведение компилятора должно быть аналогичным. Проблема заведена в YouTrack, но особых подвижек пока нет.

Как же быть?

Во-первых, следует признать, что проблема не в том, что «мы не можем использовать synchronized». synchronized — это просто средство обеспечения работоспособности критических секций. И единственная причина того, что у нас есть эти критические секции в том, что у нас есть общее изменяемое состояние. Соответственно, проблема звучит следующим образом: «нам нужен способ управления общим изменяемым состоянием в многопоточной среде».

К счастью, у нас есть официальное руководство по Kotlin, в котором есть раздел Shared mutable state and concurrency, описывающий несколько неплохих вариантов действий. Для нас более всего подходит секция про мьютексы, так как они наиболее похожи на синхронизацию.

val mutex = Mutex()
val scope = CoroutineScope(Job())

repeat(2) {
  scope.launch { criticalSectionSuspendingLocked() }
}

suspend fun criticalSectionSuspendingLocked() {
  mutex.withLock {
    println("Starting!")
    delay(10)
    println("Ending!")
  }
}

Код выше работает ровно так, как нам нужно, — выводит не перемешанные сообщения.

Я не хочу, чтобы люди считали, что использование мьютексов является универсальным решением при работе с общим изменяемым состоянием. В документации описано несколько подходов, и вы должны самостоятельно оценить, какой из них подходит к вашей конкретной ситуации. Но в случае, когда вы использовали синхронизированную функцию и теперь хотите её приостановить, мьютекс — это самый естественный выбор.

От переводчика:

Эту статью я нашёл во время поисков ответа на вопрос: Почему мьютекс в моём конкретном случае работает сильно медленнее (в полтора - два раза), чем синхронизация? Просто имейте в виду, если у вас нет точек останова в горячей критической секции, то лучше использовать синхронизацию, так как мьютекс может сильно просадить производительность.

Автор: Громов Андрей

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js