Тестирование многопоточного и асинхронного кода

в 10:13, , рубрики: asynchronous, debug, java, multithreading, testing, translation

Привет!
На неделе встала задача написать интеграционный тест для Spring Boot приложения, использующего асинхронное взаимодействие с внешними системами.
Освежил много материала про отладку многопоточного кода.
Привлекла внимание статья «Testing Multi-Threaded and Asynchronous Code» by Jonathan Halterman, мой перевод которой приведен ниже.

Если вы пишете код достаточно долго а может даже и нет, то, вероятно, столкнулись со сценарием, в котором нужно протестировать многопоточный код. Обычно считается, что потоки и тесты не должны смешиваться. Обычно это получается, т.к. то, что подлежит тестированию, как раз запускается внутри многопоточной системы и может быть протестировано индивидуально без использования потоков. Но что делать, если вы не можете их разделить или более того, если многопоточность — это тот аспект кода, который вы тестируете?
Я здесь, чтобы сказать вам, что, хотя потоки в тестах не сильно распространены, но вполне используются. Программная полиция не арестует вас за запуск потока в модульном тесте, хотя, как на самом деле тестировать многопоточный код — это другой вопрос. Некоторые превосходные асинхронные технологии, такие как Akka и Vert.x, предоставляют тестовые наборы для облегчения этого бремени. Но помимо этого, тестирование многопоточного кода обычно требует иного подхода, чем типичный синхронный модульный тест.

Идем параллельно

Первым шагом является запуск любого многопоточного действия, для которого требуется проверить результат. Например, давайте использовать гипотетический API для регистрации обработчика сообщений на шине сообщений и публикации на шине сообщения, которое будет доставлено нашему обработчику асинхронно в отдельном потоке:

messageBus.registerHandler(message - > {
    System.out.println("Received " + message);
});
messageBus.publish("test");

Выглядит неплохо. Когда запускается тест, шина должна доставить наше сообщение обработчику в другом потоке, но это не очень полезно, так как мы ничего не проверяем. Давайте обновим наш тест, чтобы подтвердить, что шина сообщений доставляет наше сообщение так, как ожидалось:

String msg = "test";
messageBus.registerHandler(message -> {
  System.out.println("Received " + message);
  assertEquals(message, msg);
};
messageBus.publish(msg);

Выглядит лучше. Запускаем наш тест и он зеленый. Круто! Но сообщение Received нигде не напечаталось, что-то где-то неправильно.

Подождите секундочку

В тесте выше, когда сообщение публикуется на шине сообщений, оно доставляется шиной обработчику в другом потоке. Но когда инструмент модульного тестирования, такой как JUnit, выполняет тест, он ничего не знает о потоках шины сообщений. JUnit знает только о главном потоке, в котором он выполняет тест. Таким образом, пока шина сообщений занята, пытаясь доставить сообщение, тест завершает выполнение в основном тестовом потоке и JUnit сообщает об успехе. Как это решить? Нам нужно, чтобы основной тестовый поток ждал, пока шина сообщений доставит наше сообщение. Поэтому давайте добавим оператор sleep:

String msg = "test";
messageBus.registerHandler(message -> {
  System.out.println("Received " + message);
  assertEquals(message, msg);
};
messageBus.publish(msg);
Thread.sleep(1000);

Наш тест зеленый и выражение Received печатается, как и ожидалось. Круто! Но одна секунда сна означает, что наш тест выполняется по меньшей мере одну секунду, и в этом нет ничего хорошего. Мы могли бы уменьшить время сна, но тогда мы рискуем закончить тест до получения сообщения. Нам нужен способ координации между основным тестовым потоком и потоком обработчика сообщений. Глядя на пакет java.util.concurrent, мы обязательно найдем то, что можем использовать. Как насчет CountDownLatch?

String msg = "test";
CountDownLatch latch = new CountDownLatch(1);
messageBus.registerHandler(message -> {
  System.out.println("Received " + message);
  assertEquals(message, msg);
  latch.countDown();
};
messageBus.publish(msg);
latch.await();

В этом подходе мы разделяем (share) CountDownLatch между основным тестовым потоком и потоком обработчика сообщений. Основной поток вынужден ждать на блокере. Тестовый поток освобождает ожидающий основной поток, вызывая countDown() на блокере после получения сообщения. Нам больше не нужно спать одну секунду. Наш тест занимает ровно столько времени, сколько нужно.

Так устраивает?

С нашей новой прелестью CountDownLatch мы начинаем писать многопоточные тесты, как последние модницы. Но довольно быстро мы замечаем, что один из наших тест-кейсов блокируется навсегда и не завершается. Что же происходит? Рассмотрим сценарий шины сообщений: блокер заставляет ждать, но он освобождается только после получения сообщения. Если шина не работает и сообщение никогда не будет доставлено, то тест никогда не завершится. Поэтому давайте добавим таймаут к блокеру:

latch.await(1, TimeUnit.SECONDS);

Тест, который блокировался, завершается неуспешно через 1 секунду с исключением TimeoutException. В конце концов мы найдем проблему и исправим тест, но решаем оставить тайм-ауты на месте. В случае, если это когда-нибудь повторится, мы предпочли бы, чтобы наш тест заблокировался на секунду и упал, чем заблокировался навсегда и не был завершен вообще.
Еще одна проблема, которую мы замечаем при написании тестов, заключается в том, что все они, похоже, проходят даже тогда, когда они, вероятно, не должны этого делать. Как это возможно? Рассмотрим тест обработки сообщений еще раз:

messageBus.registerHandler(message -> {
  assertEquals(message, msg);
  latch.countDown();
};

Мы должны были использовать CountDownLatch для координации завершения нашего теста с основным тестовым потоком, но как насчет проверок (asserts)? Если проверка не удастся, узнает ли об этом JUnit? Оказывается, поскольку мы не выполняем проверку в основном тестовом потоке, любые зафейленные проверки остаются полностью незамеченными JUnit. Давайте попробуем небольшой сценарий, чтобы проверить это:

CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
  assertTrue(false);
  latch.countDown();
}).start();
latch.await();

Тест зеленый! Так что нам теперь делать? Нам нужен способ передачи любых ошибок тестирования из потока обработчика сообщений обратно в основной тестовый поток. Если в потоке обработчика сообщений происходит сбой, нам нужно, чтобы он повторно появился в основном потоке, чтобы тест зафейлился, как и ожидалось. Давайте попробуем это сделать:

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<AssertionError> failure = new AtomicReference<>();
new Thread(() -> {
  try {
    assertTrue(false);
  } catch (AssertionError e) {
    failure.set(e);
  }
  latch.countDown();
}).start();
latch.await();
if (failure.get() != null)
  throw failure.get();

Быстрый запуск и да, тест не проходит, как и должен! Теперь мы можем вернуться и добавить CountDownLatches, блоки try/catch и AtomicReference ко всем нашим тестовым случаям. Круто! На самом деле, не круто, выглядит как бойлерплейт.

Вырезаем хлам

В идеале нам нужен API, который позволяет нам координировать ожидание, проверку и возобновление выполнения между потоками, чтобы модульные тесты могли проходить или фейлиться, как ожидалось, независимо от того, где происходит сбой проверки. К счастью, ConcurrentUnit предоставляет облегченную структуру, которая делает именно это: Waiter. Давайте адаптируем тест обработки сообщений выше в последний раз и посмотрим, что Waiter из ConcurrentUnit может сделать для нас:

String msg = "test";
Waiter waiter = new Waiter();
messageBus.registerHandler(message -> {
  waiter.assertEquals(message, msg);
  waiter.resume();
};
messageBus.publish(msg);
waiter.await(1, TimeUnit.SECONDS);

В этом тесте мы видим, что Waiter занял место нашего CountDownLatch и AtomicReference. С помощью Waiter мы блокируем основной тестовый поток, выполняем проверку, затем возобновляем основной тестовый поток, чтобы тест мог завершиться. Если проверка фейлится, то вызов waiter.await автоматически снимет блокировку и выбросит сбой, что приведет к тому, что тест пройдет или зафейлится, как должен, даже если проверка осуществлялась из другого потока.

Еще более параллельно

Теперь, когда мы стали сертифицированными многопоточными тестерами, мы можем захотеть подтвердить, что происходит несколько асинхронных действий. Waiter из ConcurrentUnit делает это просто:

Waiter waiter = new Waiter();
messageBus.registerHandler(message -> {
  waiter.resume();
};
messageBus.publish("one");
messageBus.publish("two");
waiter.await(1, TimeUnit.SECONDS);

Здесь мы публикуем два сообщения в шину и проверяем, что оба сообщения доставлены, заставляя Waiter ждать, пока resume() не будет вызван 2 раза. Если сообщения не доставляются и resume не вызывается дважды в течение 1 секунды, то тест зафейлится с ошибкой TimeoutException.
Один общий совет с этим подходом заключается в том, чтобы убедиться, что ваши тайм-ауты достаточно долгие для завершения любых параллельных действий. В нормальных условиях, когда тестируемая система работает так, как ожидалось, тайм-аут не имеет значения и вступает в действие только в случае отказа системы по какой-либо причине.

Резюме

В этой статье мы узнали, что многопоточное модульное тестирование не является злом и его довольно легко провести. Мы узнали об общем подходе, когда мы блокируем основной тестовый поток, выполняем проверки из некоторых других потоков, а затем возобновляем основной поток. И мы узнали о ConcurrentUnit, который может облегчить эту задачу.
Счастливого тестирования!

Перевод выполнен @middle_java

Автор: middle_java

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js