Пишем свой синхронный/асинхронный клиент-сервер

в 8:58, , рубрики: .net, c#.net, метки:

Всем привет.

В этой статье рассмотрим принцип многопоточного TCP сервера приложений в котором реализуем синхронные и асинхронные вызовы, а также разграничение доступа к процедурам и сжатие данных.

С чего все начиналось.
Все началось с непростого выбора с чего начать реализацию взаимодействия между собой нескольких разных приложений работающих через сеть интернет. Казалось-бы WCF способен решить такую задачу, но, к сожалению, он не лишен минусов и некоторых проблем, а его принцип работы сильно сказывается на скорости передачи данных. Нужно было простое решение и в то же время достаточно функциональное.

Начнем наш проект с библиотеки классов, в которой создадим интерфейсы процедур и функций для приложений различных типов.

public interface ICommon
{
    string[] GetAvailableUsers();
    void ChangePrivileges(string Login, string password);
}

public interface IDog
{
    bool TryFindObject(out object obj);
    int Bark(int nTimes);
}

public interface ICat
{
    void CutTheText(ref string Text);
}

Здесь мы описали сигнатуру процедур для трех разных уровней доступа. Как можно догадаться, Common будет содержать процедуры доступные для всех типов удаленных клиентов. Dog и Cat здесь, это наши два типа удаленных клиентов, процедуры каждого из них будут доступны только им самим.

Здесь же создадим класс, с помощью которого мы будем обмениваться данными между сервером и клиентами.

[Serializable]
public class Message
{
    public Message(string Command, object[] Parameters)
    {
        this.Command = Command;
        if (Parameters != null) this.prms = Parameters;
    }

    public bool IsSync;
    public bool IsEmpty = true;
    public readonly string Command;
    public object ReturnValue;
    public object[] prms;
    public Exception Exception;
}

Клиент

Клиент будет реализовать проксирующую связь между методами интерфейса и сервером. Для этого создадим класс реализующий прокси:

private class Proxy<T> : RealProxy where T : class
{
    UniservClient client;

    public Proxy(UniservClient client): base(typeof(T))
    {
        this.client = client;
    }

    public override IMessage Invoke(IMessage msg)
    {
        IMethodCallMessage call = (IMethodCallMessage)msg;
        object[] parameters = call.Args;
        int OutArgsCount = call.MethodBase.GetParameters().Where(x => x.IsOut).Count();

        Message result = client.Execute(call.MethodName, parameters);
        parameters = parameters.Select((x, index) => result.prms[index] ?? x).ToArray();
        return new ReturnMessage(result.ReturnValue, parameters, OutArgsCount, call.LogicalCallContext, call);
    }
}

И создадим свойства для доступа к методам интерфейсов:

public ICommon Common { get; private set; }
public IDog Dog { get; private set; }
public ICat Cat { get; private set; }

Инициализируем прокси и свойства:

CommonProxy = new Proxy<ICommon>(this);
DogProxy = new Proxy<IDog>(this);
CatProxy = new Proxy<ICat>(this);

Common = (ICommon)CommonProxy.GetTransparentProxy();
Dog = (IDog)DogProxy.GetTransparentProxy();
Cat = (ICat)CatProxy.GetTransparentProxy();

Обработка команд сервера:

private void Listener()
{
    while (true)
    {
        try
        {
            if (ListenerToken.IsCancellationRequested) return;

            if (!IsConnected) _Connect();

            while (true)
            {
                if (ListenerToken.IsCancellationRequested) return;

                Message msg = ReceiveData<Message>();
                if (msg.Command == "OnPing")
                {
                    // отражаем пинг
                    SendData(msg);
                    if (Events.OnPing != null) Events.OnPing.BeginInvoke(null, null);
                    continue;
                }

                if (msg.IsSync)
                {  // получен результат синхронной процедуры
                    SyncResult(msg);
                }
                else
                {
                    // асинхронный вызов события
                    try
                    {
                        // ищем соответствующий Action
                        var pi = typeof(IEvents).GetProperty(msg.Command, BindingFlags.Instance | BindingFlags.Public);
                        if (pi == null) throw new Exception(string.Concat("Свойство "", msg.Command, "" не найдено"));
                        var delegateRef = pi.GetValue(this, null) as Delegate;

                        // инициализируем событие
                        if (delegateRef != null) ThreadPool.QueueUserWorkItem(state => delegateRef.DynamicInvoke(msg.prms));
                    }
                    catch (Exception ex)
                    {
                        throw new Exception(string.Concat("Не удалось выполнить делегат "", msg.Command, """), ex);
                    }
                }
            }
        }
        catch (TaskCanceledException)
        {
            return;
        }
        catch (Exception ex)
        {
            if (Events.OnError != null) Events.OnError.BeginInvoke(ex, null, null);
        }
        finally
        {
            _Dicsonnect();
        }

        Thread.Sleep(2000);
    }
}

За выполнение удаленных процедуры отвечают методы:

private Message Execute(string MethodName, object[] parameters)
{
    lock (syncLock)
    {
        _syncResult = new Message(MethodName, parameters);
        _syncResult.IsSync = true;

        _OnResponce.Reset(); 
        SendData(_syncResult);
        _OnResponce.Wait();  // ожидаем ответ сервера

        if (_syncResult.IsEmpty)
        {// произошел дисконект, результат не получен
            throw new Exception(string.Concat("Ошибка при получении результата на команду "", MethodName, """));
        }

        if (_syncResult.Exception != null) throw _syncResult.Exception;  // исключение переданное сервером
        return _syncResult;
    }
}

private void SyncResult(Message msg)
{  // получен результат выполнения процедуры

    _syncResult = msg;
    _syncResult.IsEmpty = false;

    _OnResponce.Set();  // разблокируем поток
}

В общих чертах клиент работает следующим образом:
Когда приложение вызывает метод интерфейса, выполнение переходит к прокси-серверу. Прокси отправляет серверу имя вызываемого метода вместе с его параметрами и блокирует поток в ожидании результата выполнения. Разбор ответов сервера происходит в другом потоке, который разрешает продолжение заблокированного потока.

Сервер

Создадим иерархию классов, которые будут определять доступность тех или иных функций:

public class Cat_Ring0 : Ring2, ICat
{
    public Cat_Ring0(User u) : base(u)
    {
        up.UserType = UserType.Cat;
    }

    public void CutTheText(ref string Text)
    {
        Text = Text.Remove(Text.Length - 1);
    }
}

public class Dog_Ring0 : Dog_Ring1, IDog
{
    public Dog_Ring0(User u) : base(u)
    {
        up.UserType = UserType.Dog;
    }

    public int Bark(int nTimes)
    {
        var ConnectedDogs = ConnectedUsers.ToArray().Where(x => x.UserType == UserType.Dog).Select(x => x.nStream);
        ConnectedDogs.AsParallel().ForAll(nStream =>
        {
            SendMessage(nStream, new Message("OnBark", new object[] { nTimes}));
        });

        return ConnectedDogs.Count();
    }
}

public class Dog_Ring1 : Ring2
{
    public Dog_Ring1(User u): base(u)
    {
        up.UserType = UserType.Dog;
    }

    public bool TryFindObject(out object obj)
    {
        obj = "TheBall";
        return true;
    }
}

public class Ring2 : Ring, ICommon
{
    public Ring2(User u) : base(u) { }

    public string[] GetAvailableUsers()
    {
        return new string[] { "Dog0", "Dog1", "Tom" };
    }

    public void ChangePrivileges(string Animal, string password)
    {
        switch (Animal)
        {
            case "Dog0":
                if (password != "groovy!") throw new Exception("Не верный пароль");
                up.ClassInstance = new Dog_Ring0(up);
                break;
            case "Dog1":
                if (password != "_password") throw new Exception("Не верный пароль");
                up.ClassInstance = new Dog_Ring1(up);
                break;
            case "Tom":
                if (password != "TheCat") throw new Exception("Не верный пароль");
                up.ClassInstance = new Cat_Ring0(up);
                break;
            default:
                throw new Exception("Такого пользователя не существует");
        }
    }
}

public abstract class Ring
{
    public readonly User up;

    public Ring(User up)
    {
        this.up = up;
    }
}

Теперь достаточно поместить процедуру в определенное “кольцо” что бы соответствующий клиент имел к ней доступ. Ring0 это верхний уровень доступа, пользователь этого типа имеет доступ не только к находящимся в нем процедурам, но и процедурам во всех наследуемых классах. Изначально пользователь попадает в Ring2, который реализует только общие методы доступные всем. Далее с помощью ChangePrivileges() пользователь может, пройдя авторизацию, попасть в определенный тип «кольца» с определенным уровнем доступа.

Основная работа сервера сводится к следующему методу:

private void ProcessMessage(Message msg, User u)
{
    string MethodName = msg.Command;
    if (MethodName == "OnPing") return;

    // ищем запрошенный метод в кольце текущего уровня
    MethodInfo method = u.RingType.GetMethod(MethodName, BindingFlags.Instance | BindingFlags.Public | BindingFlags.FlattenHierarchy);

    try
    {
        if (method == null)
        {
            throw new Exception(string.Concat("Метод "", MethodName, "" недоступен"));
        }

        try
        {
            // выполняем метод интерфейса
            msg.ReturnValue = method.Invoke(u.ClassInstance, msg.prms);
        }
        catch (Exception ex)
        {
            throw ex.InnerException;
        }

        // возвращаем ref и out параметры
        msg.prms = method.GetParameters().Select(x => x.ParameterType.IsByRef ? msg.prms[x.Position] : null).ToArray();
    }
    catch (Exception ex)
    {
        msg.Exception = ex;
    }
    finally
    {
        // возвращаем результат выполнения запроса
        SendMessage(u.nStream, msg);
    }
}

Свойство ClassInstance содержит экземпляр “кольца” в котором будет выполняться поиск процедуры по ее имени.

Пример лога выполнения:
image

В результате получилось простое и элегантное решение аналогичное WCF.
Исходник можно взять тут

Автор: vitaliy91

Источник

Поделиться

* - обязательные к заполнению поля