Мониторинг акторов в Akka.Net, но на F#

в 16:51, , рубрики: .net, C#, F#, функциональное программирование

Сразу скажу, хаба для F# на хабре нет, поэтому пишу в C#.

Для тех кто не знаком с F#, но знаком с C#, рекомендую наисвежайшую статью от Microsoft.
Она поможет Вам испытывать меньше WTF моментов при прочтении, т.к. моя статья не туториал к синтаксису.

Контекст задачи

Есть сервис, написанный на Akka.NET, он вываливает в разные текстовые логи кучу инфы. Отдел эксплуатации грепает эти логи, жарит по ним регекспами, чтобы узнать о кол-ве ошибок (бизнесовых и не очень), о кол-ве входящих в сервис сообщений и кол-ве исходящих. Далее эта информация заливается в ElasticDB, InfluxDB и показывается в Grafana и Kibana в разных срезах и агрегациях.

Звучит сложно, да и парсить текстовые логи сервиса, который генерит несколько десятков ГБ текстового мусора в день — занятие неблагодарное. Поэтому встала задача — сервис должен быть способен поднять ендпоинт, который можно дёрнуть и получить сразу всю инфу о нём.

Решать задачу будем так:

  1. Напишем доменную модель для метрик
  2. Замапим доменную модель метрик на реализацию App.Metrics и поднимем апишечку
  3. Сделаем структурированный доменный логгер, который натянем на внутренний логгер Akka
  4. Сделаем обёртку для функциональных акторов, которая спрячет работу с метриками и логгером
  5. Соберём всё вместе и запустим

Доменная модель для метрик

В App.Metrics есть 6 типов представлений:

  • Counters
  • Apdex
  • Gauges
  • Histograms
  • Meters
  • Timers

В первой итерации нам вполне хватит счётчиков, таймеров и… метров :)
В начале опишем типы и интерфейсы (все приводить не буду, можно посмотреть в репозитории, ссылка в конце).

Так же условимся, что все наши сообщения для метрик будут приходить к особому актору (его мы определим позже) через EventStream (шина сообщений в самой Akka.Net).

Например таймер, который должен уметь замерить некоторое количество времени для какого-то объекта:

    type IMetricsTimer = 
        abstract member Measure : Amount        -> unit
        abstract member Measure : Amount * Item -> unit

Или счётчик, который должен уметь увеличиваться/уменьшаться как с указанием количества, так и без:

    type IMetricsCounter = 
        abstract member Decrement : unit          -> unit
        abstract member Decrement : Amount        -> unit
        abstract member Decrement : Amount * Item -> unit
        abstract member Increment : unit          -> unit
        abstract member Increment : Amount        -> unit
        abstract member Increment : Amount * Item -> unit

И пара примеров команд для шины:

    type DecrementCounterCommand = 
        { CounterId       : CounterId
          DecrementAmount : Amount
          Item            : Item }

    type CreateCounterCommand = 
        { CounterId             : CounterId
          Context               : ContextName
          Name                  : MetricName
          MeasurementUnit       : MeasurementUnit
          ReportItemPercentages : bool
          ReportSetItems        : bool
          ResetOnReporting      : bool }

Самое главное — определим возможные сообщения, которые могут ходить по шине, и на которые будет реагировать наш метрик-актор. Для этого воспользуемся Discriminated Union:

    type MetricsMessage =
        | DecrementCounter of DecrementCounterCommand
        | IncrementCounter of IncrementCounterCommand
        | MarkMeter        of MarkMeterCommand
        | MeasureTime      of MeasureTimeCommand
        | CreateCounter    of CreateCounterCommand
        | CreateMeter      of CreateMeterCommand
        | CreateTimer      of CreateTimerCommand

Теперь надо реализовать интерфейсы и на этом закончить первый пункт. Реализовывать мы их будем в функциональном стиле, т.е. через функции.

Пример создания метра:

    let private createMeter (evtStream: EventStream) meterId = 
        { new IMetricsMeter with

              member this.Mark amount = 
                  this.Mark (amount, Item None)

              member this.Mark item = 
                  this.Mark (Amount 1L, item)

              member this.Mark (amount, item) = 
                  evtStream.Publish <| MarkMeter { MeterId = meterId; Amount = amount; Item = item }

Для людей из мира C# даю аналог:

        private IMetricsMeter createMeter(EventStream evtStream, MeterId meterId)
        {
            private class TempClass : IMetricsMeter
            {
                public void Mark(long amount)
                {
                    Mark(amount, "");
                }

                public void Mark(string item)
                {
                    Mark(1, item);
                }

                public void Mark(long amount, string item)
                {
                    evtStream.Publish(new MarkMeter {...});//omitted
                }
            }
            return new TempClass();
        }

Пусть вас не смущает что аналог не компилируется, это нормально, т.к. приватный класс в теле метода смущает компилятор. А вот в F# вы можете вернуть анонимный класс через интерфейс.

Основное на что надо обратить внимание — мы кидаем в шину сообщение что надо подвинуть измеритель, который определяется через MeterId.

Аналогично поступаем с IMetricsAdapter, но т.к. методов у него много приведу один:

            member this.CreateMeter (name, measureUnit, rateUnit) = 
                let cmd = 
                    { MeterId         = MeterId (toId name)
                      Context         = context
                      Name            = name
                      MeasurementUnit = measureUnit
                      RateUnit        = rateUnit }
                evtStream.Publish <| CreateMeter cmd
                createMeter evtStream cmd.MeterId

При запросе на создание таймера мы отправляем в шину сообщение о создании, а вызывающему возвращаем результат метода createMeter с аргументами evtStream и cmd.MeterId.
Результат её, как видно выше — IMetricsMeter.

После этого создадим расширение для ActorSystem, чтобы можно было вызывать наш IMetricsAdapter откуда угодно:

    type IActorContext with
        member x.GetMetricsProducer context = 
            createAdapter x.System.EventStream context

Акторы для метрик и апишечка

Нам понадобятся два актора:

  • Первый будет слушать шину на наличие в ней MetricsMessage и создавать/писать в метрики.
  • Второй актор будет держать WebApi с одним методом, который будет отгружать по GET запросу всю собранную инфу.

Сразу сообразим ApiController, он тривиален:

    type public MetricController(metrics: IMetrics) = 
        inherit ApiController()

        [<HttpGet>]
        [<Route("metrics")>]
        member __.GetMetrics() =
            __.Ok(metrics.Snapshot.Get())

Далее объявим функцию актора, который будет считывать все MetricsMessage из EventStream и что-то с ними делать. В функцию внедрим зависимость IMetrics через аргументы, внутри создадим кэши для всех метрик через обычные Dictionary.

Почему не ConcurrentDictionary, спросите Вы? А потому что актор обрабатывает сообщения по очереди. Чтобы словить внутри актора race condition, надо целенаправленно стрелять себе в ногу.

    let createRecorder (metrics: IMetrics) (mailbox: Actor<_>) = 
        let self = mailbox.Self

        let counters = new Dictionary<CounterId, ICounter>()
        let meters   = new Dictionary<MeterId,   IMeter>()
        let timers   = new Dictionary<TimerId,   ITimer * TimeUnit>()
        //Часть кода для мапинга пропущена...

        let handle = function
            | DecrementCounter evt ->
                match counters.TryGetValue evt.CounterId with
                | (false, _) -> ()
                | (true,  c) ->
                    let (Amount am) = evt.DecrementAmount
                    match evt.Item with
                    | Item (Some i) -> c.Decrement (i, am)
                    | Item None     -> c.Decrement (am)
            | CreateMeter cmd ->
                match meters.TryGetValue cmd.MeterId with
                | (false, _) ->
                    let (ContextName ctxName) = cmd.Context
                    let (MetricName name)     = cmd.Name
                    let options = new MeterOptions(
                                        Context         = ctxName, 
                                        MeasurementUnit = toUnit cmd.MeasurementUnit, 
                                        Name            = name,
                                        RateUnit        = toTimeUnit cmd.RateUnit)
                    let m = metrics.Provider.Meter.Instance options
                    meters.Add(cmd.MeterId, m)
                | _ -> ()
           //Остальные случае в этом match пропущены

        subscribe typedefof<MetricsMessage> self mailbox.Context.System.EventStream |> ignore

        let rec loop() = actor {
            let! msg = mailbox.Receive()
            handle msg
            return! loop()
        }
        loop()

Краткий смысл — объявили внутреннее состояние в виде словарей разных метрик, объявили функцию обработки сообщения MetricsMessage, подписались на MetricsMessage и вернули рекурсивную функцию обработки сообщения из мейлбокса.

Сообщения для работы с метриками обрабатывается так:

  1. Смотрим какое именно сообщение (через паттерн матчинг)
  2. Ищем в соответствующем словаре метрику с этим Id (для этого есть прекрасный паттерн через пару (bool, obj), который возвращает TryGetValue в F#
  3. Если это запрос на создание метрики и её нет — создаём, добавляем в словарь
  4. Если это запрос на использование метрики и она есть — используем

Так же нам понадобится актор, который поднимает Owin хост с контроллером выше.
Для этого напишем функцию, которая принимает зависимость в виде конфига и IDependencyResolver. Чтобы не завалиться на старте, актор сам себе посылает сообщение, которое инициирует возможный Dispose() старого API и создание нового. И опять таки, т.к. актор внутри себя синхронен, мы можем использовать mutable state.

    type IMetricApiConfig = 
        abstract member Host: string
        abstract member Port: int
    
    type ApiMessage = ReStartApiMessage

    let createReader (config: IMetricApiConfig) resolver (mailbox: Actor<_>) =
        let startUp (app: IAppBuilder) = 
            let httpConfig = new HttpConfiguration(DependencyResolver = resolver)
            httpConfig.Formatters.JsonFormatter.SerializerSettings.Converters.Add(new MetricDataConverter())
            httpConfig.Formatters.JsonFormatter.Indent <- true
            httpConfig.MapHttpAttributeRoutes()
            httpConfig.EnsureInitialized()
            app.UseWebApi(httpConfig) |> ignore

        let uri = sprintf "http://%s:%d" config.Host config.Port
        let mutable api = {new IDisposable with member this.Dispose() = ()}

        let handleMsg (ReStartApiMessage) = 
            api.Dispose()
            api <- WebApp.Start(uri, startUp)

        mailbox.Defer api.Dispose
        mailbox.Self <! ReStartApiMessage

        let rec loop() = actor {
            let! msg = mailbox.Receive()
            handleMsg msg
            return! loop()
        }
        loop()

Так же мы кидаем метод api.Dispose в отложенные задачи при окончательной остановке актора с помощью mailbox.Defer. А для начального состояния переменной api используем заглушку через object expression, которое конструирует пустой IDisposable объект.

Делаем структурированный логгер

Смысл задачи — сделать обёртку для логгера из Akka.Net (он представлен через интерфейс ILoggingAdapter), которую можно будет использовать для замера времени операции и типизированного заноса инфы (не просто стринги, а внятные бизнесовые случаи).

Вся типизация логгера заключена в одном union.

type Fragment =
    | OperationName     of string
    | OperationDuration of TimeSpan
    | TotalDuration     of TimeSpan
    | ReceivedOn        of DateTimeOffset
    | MessageType       of Type
    | Exception         of exn

А сам логгер будет работать по такому интерфейсу:

type ILogBuilder = 
    abstract OnOperationBegin:     unit     -> unit
    abstract OnOperationCompleted: unit     -> unit
    abstract Set:                  LogLevel -> unit
    abstract Set:                  Fragment -> unit
    abstract Fail:                 exn      -> unit
    abstract Supress:              unit     -> unit
    abstract TryGet:               Fragment -> Fragment option

Создавать его будем через обычный класс:

type LogBuilder(logger: ILoggingAdapter) = 
    let logFragments = new Dictionary<System.Type, Fragment>()
    let stopwatch    = new Stopwatch()
    let mutable logLevel = LogLevel.DebugLevel
    interface ILogBuilder with
        //Реализация интерфейса

Возможно Вы спросите, почему обычный Dictionary? Как было сказано выше, данный LogBuilder предназначен для использования внутри актора при обработке одной операции. Нет смысла использовать конкурентную структуру данных.

Приведу пример методов реализации интерфейса:

        let set fragment = 
            logFragments.[fragment.GetType()] <- fragment

        member x.OnOperationBegin() =   
            stopwatch.Start()

        member this.Fail e = 
            logLevel <- LogLevel.ErrorLevel
            set <| Exception e

        member this.OnOperationCompleted() = 
            stopwatch.Stop()
            set <| OperationDuration stopwatch.Elapsed

            match tryGet <| ReceivedOn DateTimeOffset.MinValue with
            | Some (ReceivedOn date) -> set <| TotalDuration (DateTimeOffset.UtcNow - date)
            | _ -> ()

            match status with
            | Active ->
                match (logLevel) with
                | LogLevel.DebugLevel   -> logger.Debug(message())
                | LogLevel.InfoLevel    -> logger.Info(message())
                | LogLevel.WarningLevel -> logger.Warning(message())
                | LogLevel.ErrorLevel   -> logger.Error(message())
                | x                     -> failwith(sprintf "Log level %s is not supported" <| string x)
            | Supressed -> ()

Самое интересное это логика работы OnOperationCompleted():

  • Останавливаем таймер и пишем прошедшее время в логгер через фрагмент OperationDuration
  • Если у нас есть в логе фрагмент ReceivedOn (который в моей модели означает время прихода сообщения в сервис ВООБЩЕ), то пишем в лог общее время нахождения сообщения в сервисе через TotalDuration
  • Если логгер не был выключен (через метод Supress()), то пишем инфу в Akka логгер через метод message(), который я не привёл, но он просто как-то собирает все Fragments в строку с учётом типов сообщений

Создаём обёртку для функциональных акторов

Самая магическая часть, которая позволит нам писать простые функции обработки сообщений без бойлерплейта, который несёт за собой тотальное логирование.

Чего мы хотим добиться? Для начала мы хотим залогировать:

  • Что мы делаем. В нашем случае — это название операции, т.к. функциональные акторы имеют один тип FuncActor
  • Тип обрабатываемого сообщения
  • Сколько таких функций (по сути акторов) живёт в системе
  • Показать время, которое ушло на выполнение операции
  • Показать суммарное время, которое прошло с момента получения данного сообщения на входе в сервис
  • Залогировать ошибку, если она возникла, особым образом
  • иметь возможность писать простые функции обработки сообщений, не думая обо всём выше

Сделать всё выше перечисленное нам помогут Linq.Expressions. Как сделать это через QuotationExpressions из F# я не знаю, т.к. не нашёл простого способа скомпилировать их. Буду рад, если кто предложит варианты.

И так, для начала объявим пару вспомогательных типов и один метод:

type Expr<'T,'TLog when 'TLog :> ILogBuilder> = Expression<System.Action<Actor<'T>, 'T, 'TLog>>

type Wrap =
    static member Handler(e:  Expression<System.Action<Actor<'T>, 'T, #ILogBuilder>>) = e

let toExprName (expr: Expr<_,_>) = 
    match expr.Body with
    | :? MethodCallExpression as methodCall -> methodCall.Method.Name
    | x -> x.ToString()

Expr — это выражение, которое содержит Action от мейлбокса (на случай если надо наплодить детей, остановить себя или детей и вообще), обрабатываемого сообщения и логгера (если надо делать с ним какие-то особые действия).

Wrap.Handler(Expr) — позволит нам писать в него обычные F# выражения вида «fun mb msg log -> ()», а на выходе получать Linq.Expressions.

toExprName — метод, который получает название метода, если выражение является вызовом метода (MethodCallExpression) или просто пытается привести наше выражение к строке.
Для выражения вида «fun mb msg log -> handleMsg msg» — toExprName вернёт «handleMsg».

Теперь напишем обёртку для создания функциональных акторов. Начало объявления выглядит так:

let loggerActor<'TMsg> (handler: Expr<'TMsg,_>) (mailbox: Actor<'TMsg>) =
    let exprName = handler |> toExprName
    let metrics  = mailbox.Context.GetMetricsProducer (ContextName exprName)
    let logger   = mailbox.Log.Value

На вход мы будем подавать только handler, т.к. mailbox потом докинет сама Akka (partial application).

С помощью написанного нами расширения к ActorSystem получим экземпляр IMetricsAdapter в значение metrics. Так же получим логгер Akka в значение logger.

Затем мы создадим для данного актора все необходимые метрики и тут же ими воспользуемся:

    let errorMeter      = metrics.CreateMeter   (MetricName "Error Rate",              Errors)
    let instanceCounter = metrics.CreateCounter (MetricName "Instances Counter",       Items)
    let messagesMeter   = metrics.CreateMeter   (MetricName "Message Processing Rate", Items)
    let operationsTimer = metrics.CreateTimer   (MetricName "Operation Durations",     Requests, MilliSeconds, MilliSeconds)

    instanceCounter.Increment()
    mailbox.Defer instanceCounter.Decrement

Как видите, мы увеличиваем значение instanceCounter и закладываем уменьшение этого счётчика на остановке актора.

Нам понадобятся ещё пара методов, которые будут заполнять известные нам параметры в логгер и дёргать нужные метрики.

В этом куске кода мы кидаем в логгер название операции, вызываем завершение её логирования, закидываем в метрику таймеров время операции, а в метрику мессаджей — тип сообщения:

    let completeOperation (msgType: Type) (logger: #ILogBuilder) =
        logger.Set (OperationName exprName)
        logger.OnOperationCompleted()

        match logger.TryGet(OperationDuration TimeSpan.Zero) with
        | Some(OperationDuration dur) -> 
            operationsTimer.Measure(Amount (int64 dur.TotalMilliseconds), Item (Some exprName))
        | _ -> ()

        messagesMeter.Mark(Item (Some msgType.Name))

В обработке исключений внутри актора нам поможет следующий метод:

    let registerExn (msgType: Type) e (logger: #ILogBuilder) = 
        errorMeter.Mark(Item (Some msgType.Name))
        logger.Fail e

Осталось немного чтобы всё заработало. Свяжем всё вместе через обёртку над обработчиком:

    let wrapHandler handler mb (logBuilder: unit -> #ILogBuilder) =
        let innherHandler mb msg  =
            let logger = logBuilder()
            let msgType = msg.GetType()
            logger.Set (MessageType msgType)
            try
                try
                    logger.OnOperationBegin()
                    handler mb msg logger
                with
                | e -> registerExn msgType e logger; reraise()
            finally
                completeOperation msgType logger
        innherHandler mb

wrapHandler обладает сложной сигнатурой. На языке C# это выглядело бы так:

Func<TMsg, TResult> wrapHandler<Tmsg, TResult, TLogBuilder, TMailbox>(
    Func<TMailbox, TMsg, TLogBuilder, TResult> handler, 
    TMailbox mb, 
    Action<TLogBuilder> logBuilder) 
where TLogBuilder: ILogBuilder

При этом на все остальные типы никаких ограничений нет.

По смыслу wrapHandler должен на выходе дать функцию, которая получает TMsg и выдаёт TResults. Порядок действий в этой функции будет следующий:

  • Начинаем логирование операции
  • Выполняем операцию
  • В случае возникновения необработанного в handler исключения, логируем его и пробрасываем выше (родителю данного актора)
  • Завершаем логирование

Для преобразования Expression в Action и подачи в каждое действие актора нового экземпляра логгера сделаем ещё одну вспомогательную функцию:

    let wrapExpr (expr: Expr<_,_>) mailbox logger = 
        let action = expr.Compile()
        wrapHandler 
            (fun mb msg log -> action.Invoke(mailbox, msg, log))
            mailbox
            (fun () -> new LogBuilder(logger))

В ней мы как раз получаем наш Expression, компилим его и подаём в wrapHandler выше, вместе с мейлбоксом и функцией на получение нового LogBuilder().

Сигнатура данного метода так же непростая. На C# это выглядело бы так:

Action<TMsg> wrapExpr<TMsg>(
    Expr<TMsg, LogBuilder> expr, 
    Actor<TMsg> mb, 
    ILoggingAdapterlogger)

Ограничений на TMsg всё ещё нет.

Осталось только создать рекурсивную функцию :)

    let rec loop() = 
        actor {
            let! msg = mailbox.Receive()
            wrapExpr handler mailbox akkaLogger msg
            return! loop()
        }
    loop()

Вот это выражение «wrapExpr handler mailbox akkaLogger», как видно из объяснения выше, возвращает Action, т.е. метод, в который можно подать любой тип на вход и получить unit (void в c#).

Дописав в конце выражения «msg» мы кидаем в эту функцию аргумент msg и выполняем наше действие над полученным сообщением.

На этом мы закончили с кодированием нашей задачи и перейдём к примерам!

Как всё это запустить?

Чтобы это всё работало необязательно писать много кода.
Можно вообще писать исключительно обработчики сообщений без знания о том, что нам нужны мейлбоксы, логгеры или обработка ошибок.

Простой случай может выглядеть так:

type ActorMessages =
    | Wait of int
    | Stop

let waitProcess = function
    | Wait d -> Async.Sleep d |> Async.RunSynchronously
    | Stop   -> ()

А чтобы завернуть эту функцию в loggerActor и получить все плюшки ради которых мы так старались можно написать так:

let spawnWaitWorker() =
    loggerActor <| Wrap.Handler(fun mb msg log -> waitProcess msg)

let waitWorker = spawn system "worker-wait"  <| spawnWaitWorker()
waitWorker <! Wait 1000 //Будет залогировано действие длительностью ~1000мс
waitWorker <! Wait 500

Если у вас сложная логика и нужен доступ к мейлбоксу и логеру:

let failOrStopProcess (mailbox: Actor<_>) msg (log: ILogBuilder) =
    try
        match msg with
        | Wait d -> failwith "can't wait!"
        | Stop   -> mailbox.Context.Stop mailbox.Self
    with
        | e -> log.Fail e

let spawnFailOrStopWorker() =
    loggerActor <| Wrap.Handler(fun mb msg log -> failOrStopProcess mb msg log)

let failOrStopWorker = spawn system "worker-vocal"  <| spawnFailOrStopWorker()
failOrStopWorker <! Wait 1000 //Будет залогирована ошибка "can't wait!"
failOrStopWorker <! Wait 500 //Будет залогирована ошибка "can't wait!"
failOrStopWorker <! Stop
failOrStopWorker <! Wait 500 //Данное сообщение уже уйдёт в DeadLetters

EntryPoint самой программы, создание ActorSystem, поднятие метрик и акторов можно посмотреть под спойлером, там ничего примечательного нет.

Program.fs

open Akka.FSharp
open SimpleInjector
open App.Metrics;
open Microsoft.Extensions.DependencyInjection
open SimpleInjector.Integration.WebApi
open System.Reflection
open System
open Metrics.MetricActors
open ExampleActors

let createSystem = 
    let configStr = System.IO.File.ReadAllText("system.json")
    System.create "system-for-metrics" (Configuration.parse(configStr))

let createMetricActors system container = 
    let dependencyResolver = new SimpleInjectorWebApiDependencyResolver(container)
    let apiConfig = 
        { new IMetricApiConfig with
            member x.Host = "localhost"
            member x.Port = 10001 }
    
    let metricsReaderSpawner = createReader apiConfig dependencyResolver
    let metricsReader = spawn system "metrics-reader" metricsReaderSpawner

    let metricsRecorderSpawner = createRecorder (container.GetInstance<IMetrics>())
    let metricsRecorder = spawn system "metrics-recorder" metricsRecorderSpawner
    ()

type Container with
    member x.AddMetrics() = 
        let serviceCollection  = new ServiceCollection()
        let entryAssemblyName  = Assembly.GetEntryAssembly().GetName()
        let metricsHostBuilder = serviceCollection.AddMetrics(entryAssemblyName)

        serviceCollection.AddLogging() |> ignore
        let provider = serviceCollection.BuildServiceProvider()

        x.Register(fun () -> provider.GetRequiredService<IMetrics>())

[<EntryPoint>]
let main argv = 
    let container = new Container()
    let system = createSystem

    container.RegisterSingleton system
    container.AddMetrics()
    container.Verify()

    createMetricActors system container

    let waitWorker1      = spawn system "worker-wait1"  <| spawnWaitWorker()
    let waitWorker2      = spawn system "worker-wait2"  <| spawnWaitWorker()
    let waitWorker3      = spawn system "worker-wait3"  <| spawnWaitWorker()
    let waitWorker4      = spawn system "worker-wait4"  <| spawnWaitWorker()

    let failWorker       = spawn system "worker-fail"   <| spawnFailWorker()
    let waitOrStopWorker = spawn system "worker-silent" <| spawnWaitOrStopWorker()
    let failOrStopWorker = spawn system "worker-vocal"  <| spawnFailOrStopWorker()

    waitWorker1 <! Wait 1000
    waitWorker2 <! Wait 500
    waitWorker3 <! Wait 5000
    waitWorker4 <! Wait 8000

    failWorker  <! Wait 5000

    waitOrStopWorker <! Wait 1000
    waitOrStopWorker <! Wait 500
    waitOrStopWorker <! Stop
    waitOrStopWorker <! Wait 500

    failOrStopWorker <! Wait 1000
    failOrStopWorker <! Wait 500
    failOrStopWorker <! Stop
    failOrStopWorker <! Wait 500

    Console.ReadKey() |> ignore

    0

Самое главное — метрики!

Если во время работы зайти по ссылке localhost:10001/metrics, увидим достаточно большой json, в котором будет много информации. Приведу кусок для функции waitProcess:

Скрытый текст

{
      "Context": "waitProcess",
      "Counters": [
        {
          "Name": "Instances Counter",
          "Unit": "items",
          "Count": 4
        }
      ],
      "Meters": [
        {
          "Name": "Message Processing Rate",
          "Unit": "items",
          "Count": 4,
          "FifteenMinuteRate": 35.668327519112893,
          "FiveMinuteRate": 35.01484385742755,
          "Items": [
            {
              "Count": 4,
              "FifteenMinuteRate": 0.0,
              "FiveMinuteRate": 0.0,
              "Item": "Wait",
              "MeanRate": 13.082620551464204,
              "OneMinuteRate": 0.0,
              "Percent": 100.0
            }
          ],
          "MeanRate": 13.082613248856632,
          "OneMinuteRate": 31.356094372926623,
          "RateUnit": "min"
        }
      ],
      "Timers": [
        {
          "Name": "Operation Durations",
          "Unit": "req",
          "ActiveSessions": 0,
          "Count": 4,
          "DurationUnit": "ms",
          "Histogram": {
            "LastUserValue": "waitProcess",
            "LastValue": 8001.0,
            "Max": 8001.0,
            "MaxUserValue": "waitProcess",
            "Mean": 3927.1639786164278,
            "Median": 5021.0,
            "Min": 1078.0,
            "MinUserValue": "waitProcess",
            "Percentile75": 8001.0,
            "Percentile95": 8001.0,
            "Percentile98": 8001.0,
            "Percentile99": 8001.0,
            "Percentile999": 8001.0,
            "SampleSize": 4,
            "StdDev": 2932.0567172627871,
            "Sum": 15190.0
          },
          "Rate": {
            "FifteenMinuteRate": 0.00059447212531854826,
            "FiveMinuteRate": 0.00058358073095712587,
            "MeanRate": 0.00021824579927905906,
            "OneMinuteRate": 0.00052260157288211038
          }
        }
      ]
    }

Из него можно узнать, что:

  • У нас сейчас активно 4 инстанса workProcess
  • Они обработали 4 сообщения типа Wait
  • Иедианное время обработки сообщений 5021 мс

В консоли будет примерно следующее.

Заключение

В данной статье много кода и, скорее всего, мало пояснений (отвечу в коментах, если что непонятно), но это потому что статья призвана показать решение нескольких рутинных задач из реального проекта.

Возможно кому-то пригодится, тем более этот код был изначально написан для акторов на C#, так что при желании можно всё это перенести (дам хинт, можно сделать свою версию Receive() с теми же экспрешнами внутри).

Рекомендую изучить F# тем кто занимается моделированием сложных доменных моделей, т.к. его система типов намного богаче, отсутствие null и проектирование в типах позволяет сделать модель устойчивой к ошибкам программиста.

Репозиторий с примером лежит тут.

Спасибо за внимание!

Автор: Szer

Источник

Поделиться

* - обязательные к заполнению поля