UDP и C# async-await

в 9:33, , рубрики: .net, udp, говнокод

Недавно возникла необходимость решить следующую несложную задачку: есть несколько десятков устройств (учебных комплексов), у которых нужно регулярно запрашивать их текущее состояние. Комплексы общаются по протоколу UDP, и хотелось сделать так, чтобы не задумываться о цикле опроса и определении, от какого же устройства пришел ответ, а просто посылать запрос — и когда пришел результат — записывать его. Задачу эту я решал и раньше, но захотелось посмотреть, насколько концепция async/await упростит и сократит код. Оказалось, что финальный результат занимает меньше странички.

Вся логика опроса состоит всего лишь из двух методов — цикла чтения сокета UDP и метода посылки команды на устройство.

Когда посылаем команду, есть две вещи, которые надо принять во внимание — это 1) после посылки команды нам надо ждать ответа от устройства и 2) ответ может не прийти — тогда необходимо вернуть исключение, которое скажет нам о таймауте.

Асинхронный метод посылки команды выглядит следующим образом (*см. Update 1):

        public async Task<byte[]> SendReceiveUdpAsync(byte[] msg, string ip, int port, int timeOut)
        {
            var tokenSource = new CancellationTokenSource(timeOut);
            var token = tokenSource.Token;
 
            var tcs = new TaskCompletionSource<byte[]>();
            var key = new Tuple<string, int>(ip, port);
            _tcsDictionary.Add(key, tcs);

            await _client.SendAsync(msg, msg.Length, ip, port);
            var result = await tcs.Task.WithCancellation(token);

            if(_tcsDictionary.ContainsKey(key)) _tcsDictionary.Remove(key);
            return result;
        }

Здесь _client — это стандартный UdpClient.
Мы посылаем команду и по await ждем результата, который нам должен вернуть Task, сохраненный в словарике с ключом нашего соединения (именно от него мы и ждем ответ). Когда чтение начинается — мы заносим TaskCompletionSource в словарик, когда мы получаем ответ и соединение больше не нужно — удаляем из словарика.

Сам словарик (ConcurrentDictionary используем вместо Dictionary для того, чтобы избежать проблем с кросспоточными вызовами):

private ConcurrentDictionary<Tuple<string,int>, TaskCompletionSource<byte[]>> _tcsDictionary;

Тут есть момент, который заслуживает внимания — это метод-расширение WithCancellation(token). Он нужен для того, чтобы поддержать отмену операции при помощи CancellationToken, и отменяет задачу, возвращая исключение при превышении заданного таймаута.

    static class TaskExtension
    {
        public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
        {
            var tcs = new TaskCompletionSource<bool>();

            using (cancellationToken.Register(
                        s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs))
                if (task != await Task.WhenAny(task, tcs.Task))
                    throw new OperationCanceledException(cancellationToken);
            return await task;
        }
    }

А вот и сам цикл чтения: читаем, пока хватит сил, и если пришедшая датаграмма имеет адресом соединение, ключ с параметрами которого мы уже занесли в словарик, то результат помещается в TaskCompletionSource по этому ключу, и мы переходим обратно в метод посылки сообщения на await tcs.Task, только уже имея на руках нужный результат от устройства, этот результат и вернем в место вызова.

            Task.Run(() =>
            {
                IPEndPoint ipEndPoint = null;
 
                while (true)
                {
                    var receivedBytes = _client.Receive(ref ipEndPoint);
                    var ip = ipEndPoint.Address.ToString();
                    var port = ipEndPoint.Port;
                    var key = new Tuple<string, int>(ip, port);
                    TaskCompletionSource<byte[]> tcs;

                    if(_tcsDictionary.TryGetValue(key,out tcs)) tcs.SetResult(receivedBytes);
                }
            });

Итог радует. Вот так async-await упростил задачу опроса множества устройств по протоколу UDP.

Update 1
Как было справедливо отмечено в комментариях, метод SendReceiveUdpAsync необходимо переписать таким образом, чтобы в случае отмены задачи и выброса исключения удалялось значение из словарика:

public async Task<byte[]> SendReceiveUdpAsync(byte[] msg, string ip, int port, int timeOut)
{
    try{
        ...

        return result;
    }
    finally{
        if(_tcsDictionary.ContainsKey(key)) _tcs.Dictionary.Remove(key);
    }
}

Автор: NeoNN

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js