Использование Akka в Spring-приложении

в 9:32, , рубрики: akka, java, jmock, multithreading, spring

Фреймворк Akka позволяет реализовать многопоточность в Java-приложении, используя концепцию акторов, взаимодействующих посредством посылки друг другу сообщений. Создав несколько копий акторов одного и того же типа, мы можем таким образом распределить нагрузку в приложении между несколькими потоками.

В данной статье приводится пример использования Akka в Spring-приложении, что представляет некоторую сложность, поскольку в силу ее особенностей, акторов нельзя создавать посредством простого вызова new.

Создадим вначале некую модельную проблему

Сгенерируем обычное spring-boot приложение:

AkkaProjectApplication

@SpringBootApplication
public class AkkaProjectApplication {
    public static void main(String[] args) {
        SpringApplication.run(AkkaProjectApplication.class, args);
    }
}

С тестом на то, что контекст стартует без проблем:

AkkaProjectApplicationTests

@RunWith(SpringRunner.class)
@SpringBootTest
public class AkkaProjectApplicationTests {
    @Test
    public void contextLoads() {
    }
}

Предположим, что у нас есть некий внешний сервис ExternalService с довольно-таки небыстрой операцией:

ExternalServiceFakeImpl

@Service
public class ExternalServiceFakeImpl implements ExternalService {
    @Value("${delay.base:300}")
    long delayBase;

    @Value("${delay.spread:300}")
    long delaySpread;

    @Override
    public ServiceResponse timeConsumingOperation(ServiceRequest request) {
        try {
            sleep(delayBase + (int) (delaySpread * random()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return buildServiceResponse(SUCCESS);
    }

    private ServiceResponse buildServiceResponse(StatusCodeType statusCode) {
        ServiceResponse result = new ServiceResponse();
        result.setStatusCode(statusCode);
        return result;
    }
}

И нам требуется доставать данные из некоторого хранилища и отправлять на обработку в этот небыстрый сервис. Данные предоставляет DataProvider:

DataProvider

@Component
public class DataProvider implements Dao {
    @Override
    public List<DataItem> retrieveItems(int maxSize) {
        List<DataItem> result = new ArrayList<>();
        for (int i = 0; i < maxSize * random(); i++) {
            result.add(buildDataItem());
        }
        return result;
    }

    private DataItem buildDataItem() {
        DataItem dataItem = new DataItem();
        dataItem.setTime(LocalDateTime.now());
        dataItem.setValue(random());
        return dataItem;
    }
}

Где класс DataItem используется как контейнер данных:

DataItem

public class DataItem {
    private LocalDateTime time;
    private Double value;

    public LocalDateTime getTime() {
        return time;
    }

    public void setTime(LocalDateTime time) {
        this.time = time;
    }

    public Double getValue() {
        return value;
    }

    public void setValue(Double value) {
        this.value = value;
    }
}

При условии, что инстансов ExternalService у нас много/несколько, и они стоят за балансером, есть смысл слать запросы в него в несколько потоков. Собственно для этого и используем Akka

Подготовка инфраструктуры для использования Akka в Spring-приложении

Добавляем нужные зависимости:

Akka зависимости

<properties>
  <akka.version>2.5.4</akka.version>
</properties>

<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-actor_2.12</artifactId>
  <version>${akka.version}</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-testkit_2.12</artifactId>
  <version>${akka.version}</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-slf4j_2.12</artifactId>
  <version>${akka.version}</version>
</dependency>

Для интеграции фреймворка со Spring, необходимо использовать свой класс SpringActorProducer, имплементирующий IndirectActorProducer:

SpringActorProducer

public class SpringActorProducer implements IndirectActorProducer {
    private final ApplicationContext applicationContext;
    private final String actorBeanName;

    public SpringActorProducer(ApplicationContext applicationContext, 
    String actorBeanName) {
        this.applicationContext = applicationContext;
        this.actorBeanName = actorBeanName;
    }

    @Override
    public Actor produce() {
        return (Actor) applicationContext.getBean(actorBeanName);
    }

    @Override
    public Class<? extends Actor> actorClass() {
        return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
    }
}

а также SpringExtension, имплементирующий Extension:

SpringExtension

@Component
public class SpringExtension implements Extension {
    private ApplicationContext applicationContext;

    public void initialize(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    public Props props(String actorBeanName) {
        return Props.create(SpringActorProducer.class, applicationContext, 
        actorBeanName);
    }
}

Для создания Actor system, создадим отдельный конфиг AkkaConfig:

AkkaConfig

@Configuration
@Lazy
public class AkkaConfig {
    @Autowired
    ApplicationContext applicationContext;

    @Autowired
    SpringExtension springExtension;

    @Bean
    public ActorSystem actorSystem() {
        ActorSystem system = ActorSystem.create("KMT", akkaConfiguration());

        // Initialize the application context in the Akka Spring Extension
        springExtension.initialize(applicationContext);
        return system;
    }

    /**
     * Read configuration from application.conf file
     */
    @Bean
    public Config akkaConfiguration() {
        return ConfigFactory.load();
    }
}

С конфигом

akka {
# Loggers to register at boot time (akka.event.Logging$DefaultLogger logs to STDOUT)
loggers = [«akka.event.slf4j.Slf4jLogger»]

# Log level used by the configured loggers (see «loggers») as soon
# as they have been started; before that, see «stdout-loglevel»
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = «INFO»

# Log level for the very basic logger activated during ActorSystem startup.
# This logger prints the log messages to stdout (System.out).
# Options: OFF, ERROR, WARNING, INFO, DEBUG
stdout-loglevel = «INFO»
}

И логгинг конфигурацией

<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%date{ISO8601} %-5level %X{akkaSource} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>

{akkaSource} позволяет видеть в логах, какой именно поток делает конкретное действие

Теперь акторы в сприн-бинах можем создавать при помощи props метода у SpringExtension. Это и сделаем в методе, аннотированном @PostConstruct в классе Scheduler, который достает записи при помощи DataProvider и шлет акторам:

Scheduler

@Component
public class Scheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(Scheduler.class);
    private final long DELAY = 10_000;

    @Value("${scheduler.batchSize:5}")
    int batchSize;

    @Autowired
    ApplicationContext context;

    @Autowired
    Dao dataProvider;

    ActorRef migrationActor;

    @Scheduled(fixedDelay = DELAY)
    public void performRegularAction() {
        LOGGER.info("---------------------------------------------------");
        LOGGER.info("Try to retrieve {} records from DB...", batchSize);

        List<DataItem> dataItems = dataProvider.retrieveItems(batchSize);
        LOGGER.info("{} records retrieved", dataItems.size());

        int itemNumber = 0;
        for (DataItem dataItem : dataItems) {
            LOGGER.info("Send to actor item №{}...", itemNumber++);

            migrationActor.tell(new MigrationActor.Send(dataItem),
            ActorRef.noSender());
        }
    }

    @PostConstruct
    public void postConstructMethod() {
        ActorSystem system = context.getBean(ActorSystem.class);
        SpringExtension springExtension = context.getBean(SpringExtension.class);

        // Use the Spring Extension to create props for a named actor bean
        migrationActor = system.actorOf(
                springExtension.props("migrationActor")
                .withRouter(new RoundRobinPool(4)));
    }
}

Метод postConstructMethod создает reference на migrationActor, сообщения к которому будут попадать через роутер RoundRobinPool длиной 4, в итоге одновременно будут работать 4 инстанса MigrationActor, каждый в своем потоке. В итоге сообщения в коде шлем одному и тому же объекту — а роутер раскидывает их по разным инстансам

Пишем тест для Scheduler-а (извините, на JMock):

SchedulerTest

public class SchedulerTest {
    @org.junit.Rule
    public final JUnitRuleMockery mockery = new JUnitRuleMockery() {{
        setImposteriser(ClassImposteriser.INSTANCE);
    }};

    Scheduler scheduler;

    static ActorSystem system;
    static TestProbe probe;

    @BeforeClass
    public static void setup() {
        system = ActorSystem.create("TestSystem");
        probe = TestProbe.apply(system);
    }

    @AfterClass
    public static void tearDown() {
        TestKit.shutdownActorSystem(system);
        system = null;
    }

    @SuppressWarnings("unchecked")
    @Before
    public void setUp() {
        scheduler = new Scheduler();

        scheduler.dataProvider = mockery.mock(Dao.class);
        scheduler.migrationActor = probe.ref();
        scheduler.batchSize = 5;
    }

    @Test
    public void performRegularAction() throws Exception {
        DataItem dataItem = new DataItem();
        List<DataItem> dataItems = Arrays.asList(dataItem);

        mockery.checking(new Expectations() {
            {
                oneOf(scheduler.dataProvider).retrieveItems(scheduler.batchSize);
                will(returnValue(dataItems));
            }
        });

        scheduler.performRegularAction();

        MigrationActor.Send send = probe.expectMsgClass(MigrationActor.Send.class);
        assertThat("Wrong message in Send", send.dataItem, is(dataItem));
    }
}

Наконец описываем сам актор:

MigrationActor

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MigrationActor extends AbstractActor {
    private final LoggingAdapter LOGGER = 
    Logging.getLogger(getContext().getSystem(), this);

    @Autowired
    @Qualifier("externalServiceFakeImpl")
    ExternalService externalService;

    public MigrationActor() {
    }

    public MigrationActor(ExternalService externalService) {
        this.externalService = externalService;
    }

    public static Props props(ExternalService externalService) {
        return Props.create(MigrationActor.class, externalService);
    }

    @Override
    public void preStart() throws Exception {
        super.preStart();
        LOGGER.info("Migration actor started");
    }

    @Override
    public void postStop() throws Exception {
        LOGGER.info("Migration actor stopped");
        super.postStop();
    }

    public static class Send {
        public final DataItem dataItem;

        public Send(DataItem dataItem) {
            this.dataItem = dataItem;
        }
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Send.class, this::onSend)
                .build();
    }

    private void onSend(Send send) {
        LOGGER.info("Start call service by onSend...");

        ServiceRequest request = new ServiceRequest();
        request.setDataItem(send.dataItem);
        externalService.timeConsumingOperation(request);

        LOGGER.info("Finish call to service by onSend");
    }
}

На что здесть стоит обратить внимание:
— актор имеет scope=prototype, поскольку стандартный scope=singleton не подходит для стратегии «актор-на-поток»
— как создается логгер
— классы месседжей, передаваемые актору, определяются прямо внутри него, т.к. по сути это — часть его контракта. Вообще — месседжи, как используемые для обмена между потоками, должны быть immutable
— определяем статический метод props, чтобы создавать Акторы, используя Props.create(). Это понадобится для теста на актор
— как следствие — нам приходится определить конструктор с сервисом в виде параметра
— т.к. акторы создаются как спринг-бины, а конструктор с параметрами уже есть — также объявляем и пустой конструктор
— наличие методов preStart и postStop, где можно отследить моменты старта/запуска актора
— метод createReceive, где собственно и происходит матчинг получаемых сообщений на методы, которые вызываются как реакция на сообщения

Пишем тест для актора:

MigrationActorTest

public class MigrationActorTest {
    @org.junit.Rule
    public final JUnitRuleMockery mockery = new JUnitRuleMockery() {{
        setImposteriser(ClassImposteriser.INSTANCE);
        setThreadingPolicy(new Synchroniser());
    }};

    static ActorSystem system;

    @BeforeClass
    public static void setup() {
        system = ActorSystem.create("TestSystem");
    }

    @AfterClass
    public static void tearDown() {
        TestKit.shutdownActorSystem(system);
        system = null;
    }

    @Test
    public void testMigrationActorSend() {
        ExternalService externalService = mockery.mock(ExternalService.class);
        DataItem dataItem = new DataItem();

        mockery.checking(new Expectations() {
            {
                oneOf(externalService).timeConsumingOperation(with(allOf(
                        any(ServiceRequest.class),
                        hasProperty("dataItem", is(dataItem))
                )));
            }
        });

        TestActorRef<MigrationActor> ref = TestActorRef.create(system, 
        MigrationActor.props(externalService), "migrationActor");

        ref.tell(new MigrationActor.Send(dataItem), ActorRef.noSender());
    }
}

Тут понадобилось для JUnitRuleMockery указать ThreadingPolicy, т.к. иначе mockery была бы non-ThreadSafe.

Вот вкратце и все. Стоит только заметить, что в случае необходимости также можно передавать результаты работы одного актора — другому или самому отправителю, если он — тоже актор. Для тестирования таких акторов стоит использовать класс TestKit

Исходный код приведен здесь

Автор: Андрей

Источник

Поделиться

* - обязательные к заполнению поля