Пишем собственный шлюз для Thrift API

в 6:06, , рубрики: apache thrift, api, java, Microservices, open source, spring boot, spring cloud

Микросервисы, как ни крути, — наше всё. Можно сопротивляться SOAP 2.0 сколь угодно долго, но рано или поздно или они придут за тобой и обратят в свою веру, или ты придёшь к ним сам и попросишь крестить себя огнём и мечом. Как и у любого архитектурного решения, у микросервисов есть свои минусы. Одним из них является необходимость в каждый микросервис включать какую-то логику по авторизации запросов от внешних систем или других микросервисов. Эта логика может быть напрямую «зашита» внутри микросервиса (и не важно, что это отдельная библиотека), делегирована другому микросервису, а может быть объявлена декларативно. Что значит декларативно? Например, можно договориться, что в каждый микросервис приходит особый HTTP-заголовок, или какая-то структура данных, в которой есть информация о пользователе, делающем запрос. И данным в этой структуре необходимо однозначно доверять. У всех трёх вариантов есть свои недостатки, но в рамках статьи мы разберём последний. Для его реализации обычно используется шаблон проектирования API Gateway:
image

Под катом все трудности реализации шаблона в условиях бинарного протокола передачи данных.

В общем случае API Gateway ограничивает количество запросов к внутренним сервисам, авторизует запросы клиентов, производит логирование и аудит, распределяет запросы между клиентами и преобразовывает данные, если это нужно. В качестве примера может быть использован обычный nginx. Рассмотрим функцию авторизации запросов пользователей. Если используется HTTP-протокол, то общепринятой практикой считается добавление некоего токена (не важно как мы его получили) в заголовок Authorization:

Authorization: Bearer <some token>  

На стороне API Gateway этот заголовок каким-то образом проверяется и обменивается на другой заголовок, содержащий некое знание о пользователе, которому токен был выписан, например его идентификатор, и уже его можно пробросить внутренним сервисам:

Customer: <id>  

Всё кажется простым и понятным, но беда в том, что Apache Thrift состоит из нескольких частей:

+-------------------------------------------+
| Server                                    |
| (single-threaded, event-driven etc)       |
+-------------------------------------------+
| Processor                                 |
| (compiler generated)                      |
+-------------------------------------------+
| Protocol                                  |
| (JSON, compact, binary etc)               |
+-------------------------------------------+
| Transport                                 |
| (raw TCP, HTTP etc)                       |
+-------------------------------------------+

В общем случае мы не можем завязаться на протокол или транспорт. Можно конечно выбрать что-то одно, всем договориться, что мы используем только HTTP, но это ограничивает возможности по замене транспорта и заставляет делать некие внешние обработчики/фильтры уже внутри самих Thrift-сервисов (ведь для них то http-заголовки не являются нативными).

Остаётся использовать возможности самого протокола, чтобы в процессе прохождения запроса через шлюз API внешний авторизационный токен подменялся на внутренний.

Convention over configuration

Итак, пусть у нас есть следующий внутренний сервис:

service InternalTestService {  
    SomeReturnData getSomeData(
        1: UserData userData,
        2: RequestData requestData
    ) throws (1: SomeException e);
}

UserData — это некие сведения о пользователе, от лица которого вызывается сервис, чтобы последний мог понять, а чьи данные тянуть. Понятно, что такой сервис выставлять наружу нельзя. А какой можно? Например такой:

service ExternalTestService {  
    SomeReturnData getSomeData(
        1: AuthToken authData,
        2: RequestData requestData
    ) throws (1: SomeException e, 99: UnauthorizedException ue);
}

Вся разница между сервисами — это их первый аргумент и исключение, которое инициируется в случае проблем с авторизацией запроса (я надеюсь, что 98 собственных исключений хватит всем). Наша задача на уровне шлюза проверить авторизационный токен и заменить его на информацию о пользователе.

Кишочки

К сожалению документации по Thrift-у кот наплакал. Почти все гайды, включая, пожалуй, лучший из них, не касаются вопросов внутреннего устройства тех или иных протоколов. И это понятно. В 99% случаев, лезть внутрь протокола разработчику не придётся, но нам то нужно.

Есть три наиболее популярных протокола:

  • Binary — просто бинарный протокол данных (строки, например, передаются как есть в UTF-8)
  • Compact — тот же бинарный только компактный
  • JSON — очень своеобразный JSON

Каждый из представленных протоколов имеет свою реализацию, скрытую за одним и тем же API. Если рассмотреть бинарный протокол, то для нашего сервиса он будет с точки зрения API выглядеть так:
image

TMessage — метаинформация о сообщении. Состоит из имени метода, типа и порядкого номера метода в сервисе. Тип сообщения может быть следующим:

  • CALL = 1 — входящее сообщение
  • REPLY = 2 — ответ
  • EXCEPTION = 3 — в процессе выполнения произошла ошибка
  • ONEWAY = 4 — сообщение не требует ответа

Всё что не TMessage — полезная информация, которая обёрнута в структуру входящего сообщения.
Все представленные протоколы читают пришедший байтовый массив данных последовательно и хранят его текущий индекс, чтобы продолжить чтение с нужного места.

Поэтому наш алгоритм должен быть следующим:

  1. Прочитать TMessage
  2. Прочитать начало общей структуры сообщения
  3. Прочитать метаинформацию о первом поле в сообщении
  4. Запомнить текущую позицию в байтовом массив
  5. Прочитать информацию о токене
  6. Запомнить текущую позицию в байтовом массиве
  7. Обменять токен на данные о пользователе
  8. Сериализовать данные о пользователе
  9. Сформировать новый бинарный массив из трёх частей:
    • От начала исходного сообщения до индекса из пункта 4
    • Байтовый массив структуры данных о пользователе
    • От индекса из пункта 6 до конца оригинального сообщения

Пишем тест

Без тестирования в разведку не ходим, тем более что в случае с бинарным протоколом это самый простой способ проверить работоспособность вашего кода. Для теста нам понадобится следующие thrift-сервисы:

Заголовок спойлера
namespace java ru.aatarasoff.thrift

exception SomeException {
    1: string code
}

exception UnauthorizedException {
    1: string reason
}

service ExternalTestService {
    SomeReturnData getSomeData(
        1: AuthToken authData,
        2: RequestData requestData
    ) throws (1: SomeException e, 99: UnauthorizedException ue);
}

service InternalTestService {
    SomeReturnData getSomeData(
        1: UserData userData,
        2: RequestData requestData
    ) throws (1: SomeException e);
}

struct SomeReturnData {
    1: string someStringField,
    2: i32 someIntField
}

struct RequestData {
    1: string someStringField,
    2: i32 someIntField
}

struct AuthToken {
    1: string token,
    2: i32 checksum
}

struct UserData {
    1: string id
}

Создадим и заполним внешний сервис тестовыми данными:

TMemoryBuffer externalServiceBuffer = new TMemoryBufferWithLength(1024);

ExternalTestService.Client externalServiceClient  
= new ExternalTestService.Client(protocolFactory.getProtocol(externalServiceBuffer));

externalServiceClient.send_getSomeData(  
    new AuthToken().setToken("sometoken").setChecksum(128),
    new RequestData().setSomeStringField("somevalue").setSomeIntField(8)
);

TMemoryBufferWithLength — специально созданный класс, который который устраняет фатальный для нас недостаток транспорта TMemoryBuffer. Последний не умеет отдавать настоящую длину сообщения. Вместо этого можно получить длину всего буфера, которая обычно больше длины сообщения поскольку часть байтового массива резервируется под будущие данные.

Метод send_getSomeData сериализует сообщение в наш буфер.

Аналогичные действия сделаем и со внутренним сервисом:

internalServiceClient.send_getSomeData(  
  new UserData().setId("user1"),
  new RequestData().setSomeStringField("somevalue").setSomeIntField(8)
);

Получим байтовый массив нашего сообщения:

byte[] externalServiceMessage = Arrays.copyOf(  
    externalServiceBuffer.getArray(),
    externalServiceBuffer.length()
);

Введём класс, который будет транслировать наше сообщение из представления для внешнего сервиса в представление для внутреннего: MessageTransalator.

public MessageTransalator(TProtocolFactory protocolFactory, AuthTokenExchanger authTokenExchanger) {  
        this.protocolFactory = protocolFactory;
        this.authTokenExchanger = authTokenExchanger;
    }

public byte[] process(byte[] thriftBody) throws TException {  
    //some actions
}

Реализация обмена токена (AuthTokenExchanger) может быть разной в разных проектах, поэтому сделаем отдельный интерфейс:

public interface AuthTokenExchanger<T extends TBase, U extends  TBase> {  
    T createEmptyAuthToken();
    U process(T authToken) throws TException;
}

createEmptyAuthToken должен вернуть некий объект, который представляет пустой токен, заполненный MessageTransalator-ом. В методе process нужно реализовать обмен авторизационного токена на данные о пользователе. Для нашего теста используем простую реализация:

@Override
public AuthToken createEmptyAuthToken() {  
    return new AuthToken();
}

@Override
public UserData process(AuthToken authToken) {  
    if ("sometoken".equals(authToken.getToken())) {
        return new UserData().setId("user1");
    }
    throw new RuntimeException("token is invalid");
}

Пишем проверку:

assert.assertTrue(  
    "Translated external message must be the same as internal message",
    Arrays.equals(
      new MessageTransalator(
          protocolFactory, 
          new AuthTokenExchanger<AuthToken, UserData>() {}
      ).process(externalServiceMessage),
      internalServiceMessage
    )
)

Запускаем тесты, и ничего не работает. И это хорошо!

Зеленый свет

Реализуем метод process согласно алгоритму:

TProtocol protocol = createProtocol(thriftBody);

int startPosition = findStartPosition(protocol);

TBase userData = authTokenExchanger.process(  
    extractAuthToken(protocol, authTokenExchanger.createEmptyAuthToken())
);

int endPosition = findEndPosition(protocol);

return  ArrayUtils.addAll(  
        ArrayUtils.addAll(
            getSkippedPart(protocol, startPosition),
            serializeUserData(protocolFactory, userData)
        ),
        getAfterTokenPart(protocol, endPosition, thriftBody.length)
);

В качестве протокола используем TMemoryInputTransport, который позволяет читать сообщение напрямую из переданного в него байтового массива.

private TProtocol createProtocol(byte[] thriftBody) {  
    return protocolFactory.getProtocol(new TMemoryInputTransport(thriftBody));
}

Реализуем нахождение границ токена в байтовом массиве:

private int findStartPosition(TProtocol protocol) throws TException {  
    skipMessageInfo(protocol); //пропускаем TMessage
    skipToFirstFieldData(protocol); //ищем начало данных в первом поле
    return protocol.getTransport().getBufferPosition();
}

private int findEndPosition(TProtocol protocol) throws TException {  
    return protocol.getTransport().getBufferPosition();
}

private void skipToFirstFieldData(TProtocol protocol) throws TException {  
    protocol.readStructBegin();
    protocol.readFieldBegin();
}

private void skipMessageInfo(TProtocol protocol) throws TException {  
    protocol.readMessageBegin();
}

Сериализуем пользовательские данные:

TMemoryBufferWithLength memoryBuffer = new TMemoryBufferWithLength(1024);  
TProtocol protocol = protocolFactory.getProtocol(memoryBuffer);

userData.write(protocol);

return Arrays.copyOf(memoryBuffer.getArray(), memoryBuffer.length());  

Запускаем тесты, и…

Включаем Шерлока

Итак, тесты для Binary и Compact проходят, но JSON сопротивляется. Что же не так? Уходим в дебаг и смотрим, какие же массивы мы сравниваем:

//JSON обычного человека
[1,"getSomeData",1,1,{"1":{"rec":{"1":{"str":"user1"}}},"2":{"rec":{"1":{"str":"somevalue"},"2":{"i32":8}}}}]

//JSON курильщика  
[1,"getSomeData",1,1,{"1":{"rec"{"1":{"str":"user1"}}},"2":{"rec":{"1":{"str":"somevalue"},"2":{"i32":8}}}}]

Не заметили разницы? А она есть. После первого «rec» не хватает двоеточия. API используем один и тот же, а результат разный. Разгадка пришла только после внимательного чтения кода класса TJSONProtocol. Протокол содержит контекст, который хранит различные разделители в стеке, когда обходит JSON-структуру для чтения или записи.

TJSONProtocol.JSONBaseContext context_ = new TJSONProtocol.JSONBaseContext();  

При чтении структуры, считывается и символ ":", а вот обратно он не возвращается, потому что контекста в самом объекте нет.

Вставляем костыль в метод seriaizeUserData:

if (protocol instanceof TJSONProtocol) {  
    memoryBuffer.write(COLON, 0, 1); //добавляем ":"
}

Запускаем тесты, и теперь то всё ок.

Выброс исключений

Мы уже близко к финишной черте. Ок, вспомним, что мы должны выкинуть исключение в случае, если авторизация запроса прошла неуспешно:

service ExternalTestService {  
    SomeReturnData getSomeData(
        1: AuthToken authData,
        2: RequestData requestData
    ) throws (1: SomeException e, 99: UnauthorizedException ue);
}

Сделаем обработку исключений в отдельном методе processError.

public byte[] processError(TException exception) throws Exception  

В Thrift-е есть несколько типов исключений, которые могут возникнуть в результате вызова сервиса:

  1. TApplicationException — исключение уровня приложения
  2. TProtocolException — исключение, связанное с протоколом
  3. TTransportException — исключение, связанное с передачей сообщения
  4. TException — базовое исключение, от которого наследуются все остальные типы
  5. YourException extends TException — любое исключение, которое было объявлено в DSL

Интересная деталь. Передать в ответном сообщении клиенту можно либо TApplicationException, либо пользовательское кастомное, в нашем случае это UnauthorizedException. Поэтому мы должны обернуть любые ошибки либо в TApplicationException, либо в UnauthorizedException.

public byte[] processError(TException exception) throws Exception {
    TMemoryBufferWithLength memoryBuffer = new TMemoryBufferWithLength(1024);

    TProtocol protocol = protocolFactory.getProtocol(memoryBuffer);

    try {
        throw exception;
    } catch (TApplicationException e) {
        writeTApplicationException(e, protocol);
    } catch (TProtocolException e) {
        writeTApplicationException(createApplicationException(e), protocol);
    } catch (TTransportException e) {
        writeTApplicationException(createApplicationException(e), protocol);
    } catch (TException e) {
        if (TException.class.equals(e.getClass())) {
            writeTApplicationException(createApplicationException(e), protocol);
        } else {
            writeUserDefinedException(exception, protocol);
        }
    }

    return Arrays.copyOf(memoryBuffer.getArray(), memoryBuffer.length());
}

Реализация записи TApplicationException в ответный пакет данных довольно проста:

private void writeTApplicationException(TApplicationException exception, TProtocol protocol) throws TException {
    protocol.writeMessageBegin(new TMessage(this.methodName, TMessageType.EXCEPTION, this.seqid));
    exception.write(protocol);
    protocol.writeMessageEnd();
}

private TApplicationException createApplicationException(TException e) {
    return new TApplicationException(TApplicationException.INTERNAL_ERROR, e.getMessage());
}

Согласно протоколу, каждое сообщение имеет свой идентификатор последовательности и имя вызываемого метода, которые необходимо вернуть обратно клиенту. Для этого нужно добавить новые поля: seqid и methodName в наш класс MessageTranslator, которые заполняются при чтении начала сообщения. Из-за этого наш класс перестаёт быть потокобезопасным.

Для записи произвольного исключения требуется больше телодвижений:

private static final String ERROR_STRUCT_NAME = "result";
private static final String ERROR_FIELD_NAME = "exception";
private static final short ERROR_FIELD_POSITION = (short) 99;
private static final String WRITE_METHOD_NAME = "write";

private void writeUserDefinedException(TException exception, TProtocol protocol) throws TException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
    TStruct errorStruct = new TStruct(ERROR_STRUCT_NAME);
    TField errorField = new TField(ERROR_FIELD_NAME, TType.STRUCT, ERROR_FIELD_POSITION);

    protocol.writeMessageBegin(new TMessage(this.methodName, TMessageType.REPLY, this.seqid));
    protocol.writeStructBegin(errorStruct);
    protocol.writeFieldBegin(errorField);

    exception.getClass().getMethod(WRITE_METHOD_NAME, TProtocol.class).invoke(exception, protocol);

    protocol.writeFieldEnd();
    protocol.writeFieldStop();
    protocol.writeStructEnd();
    protocol.writeMessageEnd();
}

Здесь интересно то, что для пользовательского исключения тип обратного сообщения не TMessageType.EXCEPTION, а TMessageType.REPLY.

Теперь мы умеем брать входящее сообщение, подменять в нём токен и корректно отдавать клиенту ответ, если во время проверки токена произошла ошибка.

В бар врывается Spring

Ок, мы сделали препарацию бинарных пакетов. Теперь самое время сделать практическую реализацию на популярном фреймворке для создания микросервисов. Например, на Spring Boot. Он хорош тем, что, с одной стороны, для него можно найти уже готовые решения, а с другой — его просто и удобно кастомизировать аннотациями добавляя новые возможности двумя-тремя строчками кода. Для роутинга и обработки HTTP-запросов возьмём Netflix Zuul, который входит в набор расширений Spring Cloud. Схема работы Zuul-а представлена на следующем изображении:

Пишем собственный шлюз для Thrift API - 3

Если совсем просто, то Netflix Zuul представляет из себя обычный сервлет с цепочкой собственных фильтров, которые могут как загружаться динамически, так и быть включёнными в приложение. Каждый фильтр добавляет новое поведение, и даже запись HTTP-ответа тоже реализована фильтром. Существует несколько типов фильтров, которые выполняются последовательно как показано на картинке выше. Внутри каждого типа, фильтры выполняются в порядке, определённом приоритетом конкретного фильтра. Подключить Zuul к приложению на Spring Boot проще простого (ну ещё зависимости добавить):

@SpringBootApplication
@EnableZuulProxy
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Мы хотим того же, но для API-шлюза, чтобы те, кто будет использовать наше решение, могли сконцентрироваться на бизнес-логике авторизации своего приложения, а не на перечисленных в статье проблемах. Для этого создадим аннотацию @EnableThriftGateway:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(ThriftGatewayConfiguration.class)
public @interface EnableThriftGateway {  
}

Конфигурация ThriftGatewayConfiguration будет содержать три бина, которые создаются в случае, если аннотация будет добавлена к основному классу приложения: Application.

@Configuration
public class ThriftGatewayConfiguration {  
    @Bean
    @ConditionalOnMissingBean(AuthTokenExchanger.class)
    AuthTokenExchanger authTokenExchanger() {
        throw new UnsupportedOperationException("You should implement AuthTokenExchanger bean");
    }
    @Bean
    @ConditionalOnMissingBean(TProtocolFactory.class)
    TProtocolFactory thriftProtocolFactory() {
        return new TBinaryProtocol.Factory();
    }
    @Bean
    public AuthenticationZuulFilter authenticationZuulFilter() {
        return new AuthenticationZuulFilter();
    }
}

Аннотация ConditionalOnMissingBean предотвратит создание дефолтного бина в том случае, если в приложении будет объявлен собственный бин этого класса. Ранее созданный интерфейс AuthTokenExchanger должен быть в обязательном порядке реализован разработчиком конкретного проекта. Мы не можем, по причинам безопасности, сделать какую-либо дефолтную реализацию, поэтому в методе создания бина выкидывается исключение. Также, нужно определить протокол, используемый для передачи thrift-сообщений. По-умолчанию, это TBinaryProtocol, но всегда можно использовать нужный для проекта, переопределив бин создания фабрики протокола. Но самой важной частью конфигурации безусловно является бин AuthenticationZuulFilter, который реализует бизнес-логику авторизационного слоя.

public class AuthenticationZuulFilter extends ZuulFilter {
    @Override
    public String filterType() {
        return "pre";
    }
    @Override
    public int filterOrder() {
        return 6;
    }
    @Override
    public boolean shouldFilter() {
        return true;
    }
    @Override
    public Object run() {
        RequestContext ctx = RequestContext.getCurrentContext();
        HttpServletRequestWrapper request = (HttpServletRequestWrapper) ctx.getRequest();
        //здесь ваши действия
        return null;
    }
}

После получения объектов контекста и HTTP-запроса, создадим MessageTransalator.

MessageTransalator messageTransalator = new MessageTransalator(protocolFactory, authTokenExchanger);

Позитивный сценарий состоит из обработки входящего пакета данных, записи нового пакета в поле requestEntity контекста запроса, и указания новой длины сообщения вместо оригинальной:

byte[] processed = messageTransalator.process(request.getContentData());  
ctx.set("requestEntity", new ByteArrayInputStream(processed));  
ctx.setOriginContentLength(processed.length);

Если произошла ошибка, то её необходимо обработать:

ctx.setSendZuulResponse(false);  
ctx.setResponseDataStream(new ByteArrayInputStream(new byte[]{}));
try {  
    ctx.getResponse().getOutputStream().write(messageTransalator.processError(e));
} catch (Exception e1) {
    log.error("unexpected error", e1);
    ctx.setResponseStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
}

Здесь нам пришлось применить несколько неочевидных трюков для предотвращения дальнейшей обработки запроса и попытки отправить пакет внутреннему сервису. Во-первых,

ctx.setSendZuulResponse(false)

не даёт провести GZIP-сжатие исходящего пакета. Не все thrift-клиенты способны выжить после такой переупаковки. А во-вторых,

ctx.setResponseDataStream(new ByteArrayInputStream(new byte[]{}))

позволяет использовать оригинальный фильтр формирования исходящего сообщения, несмотря на установку в предыдущем пункте запрета на передачу данных обратно на клиента.

Соединяем всё вместе

Создадим новое Spring Boot приложение и добавим в него две аннотации @EnableZuulProxy и @EnableThriftGateway:

@SpringBootApplication
@EnableZuulProxy
@EnableThriftGateway
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Реализуем простую логику авторизации:

@Configuration
public class AuthenticationConfiguration {
    @Bean
    AuthTokenExchanger authTokenExchanger() {
        return new AuthTokenExchanger<Token, TName>() {
            @Override
            public Token createEmptyAuthToken() {
                return new Token();
            }

            @Override
            public TName process(Token authToken) throws TException {
                if (authToken.getValue().equals("heisours")) {
                    return new TName("John", "Smith");
                }

                throw new UnauthorizedException(ErrorCode.WRONG_TOKEN);
            }
        };
    }
}

Как видно, если к нам пришёл токен со значением heisours, то мы авторизовываем запрос, а если нет, то выкидываем ошибку. Остаётся только сконфигурировать Zuul:

zuul:
  routes:
    greetings:
      #путь в URL, на котором будет спроксирован сервис
      path: /greetings/**
      #идентификатор сервиса
      serviceId: greetings

greetings: 
  ribbon:    
    listOfServers: localhost:8080  #список серверов, где развёрнут сервис greetings

и API Gateway можно использовать.

Ссылки

Базовая часть для преобразования бинарных пакетов: https://github.com/aatarasoff/thrift-api-gateway-core
Волшебные аннотации для Spring-а: https://github.com/aatarasoff/spring-thrift-api-gateway
Примеры: https://github.com/aatarasoff/spring-thrift-api-gateway/tree/master/examples

Автор: aatarasoff

Источник

Поделиться новостью

* - обязательные к заполнению поля