Расширяем функционал Apache NiFi 2.0: руководство к написанию своего процессора

в 16:31, , рубрики: apache nifi, big data, etl, java

Привет!

Потоки данных между системами стабильно увеличиваются, и в обозримом будущем эта тенденция вряд ли изменится, что создает постоянную потребность в инструментах для работы с данными.

Apache NiFi — программный продукт с открытым исходным кодом, написанный на языке Java, предназначенный для автоматизации потоков данных между системами. Главная его задача: организовывать ETL‑процессы. На GitHub у Apache NiFi имеется 5.9 тысяч звезд.

Для тех, кто не знает, что такое Apache NiFi советую прочитать отличную статью.

Моя статья посвящена написанию кастомного процессора для Apache NiFi на Java и требует базовых знаний в области Apache NiFi, опыт программирования на Java и IDE на борту компьютера.

Ну, начнем!

Для тех, кто немного подзабыл, что такое процессор в Apache NiFi

Процессор — это механизм, с помощью которого NiFi предоставляет доступ к FlowFiles, их атрибутам и содержимому. В Apache NiFi процессоры являются основными компоновочными блоками и используются для построения пайплайнов обработки.

В качестве примера я покажу, как создать процессор, выполняющий базовую задачу Word Count на основе существующего атрибута. Разумеется, в реальной практике такая задача вам вряд ли встретится, но в качестве обучающего примера этого более чем достаточно.

Мы будем разрабатывать процессор под Apache NiFi 2.4.0. Это практически самая свежая и стабильная версия на данный момент.

Первым делом стоит создать проект на основе Maven Archetype. В Intellij Idea для этого нужно выбрать New Project в верхней панели инструментов. Откроется окно, в котором в области Generators следует выбрать Maven Archetype.

В нашем случае в поле Name мы записываем название нашего проекта. Оно влияет лишь на имя директории, созданной в IDE.

В качестве JDK выбран Amazon Corretto 21, так как Apache NiFi рекомендует использовать Java 21 для работы с этой версией. В поле Catalog выбираем Maven Central, чтобы увидеть архетип для NiFi. После этого выбираем: org.apache.nifi:nifi-processor-bundle-archetype.

Версию архетипа выбираем в соответствии с версией Apache NiFi — в нашем случае это 2.4.0.

Также в additionalProperties следует обязательно указать artifactBaseName. Иначе там по умолчанию установится значение true, что сломает структуру пакетов. В artifactId мы устанавливаем идентичное значение. При желании можно также поправить package, если вы понимаете, как работает этот параметр. В поле Version изначально указано значение 1.0-SNAPSHOT. Однако при работе со SNAPSHOT‑версией у вас просто не пройдет сборка, потому что это не релизная версия вашего процессора.

Расширяем функционал Apache NiFi 2.0: руководство к написанию своего процессора - 1
Не собирающийся модуль
Можно игнорировать enforce, но это будет не совсем правильно
Можно игнорировать enforce, но это будет не совсем правильно

После этого жмем Create и получаем готовую структуру проекта. Выглядит она следующим образом:

Расширяем функционал Apache NiFi 2.0: руководство к написанию своего процессора - 3

Таким образом, мы получаем два модуля: один для сборки NAR и JAR. Для добавления модуля в NiFi будет использоваться как раз таки NAR.

Откроем основной файл с главный нашим классом, а именно MyProcessor (при желании его можно переименовать, это название влияет на нейминг в UI)

Код шаблонного класса
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package ru.kotletkin.dev.processors.demohabrprocessor;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.List;
import java.util.Set;

@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder()
            .name("My Property")
            .displayName("My Property")
            .description("Example Property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Example success relationship")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        descriptors = List.of(MY_PROPERTY);

        relationships = Set.of(REL_SUCCESS);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // TODO implement

        session.transfer(flowFile, REL_SUCCESS);
    }
}

Видно, что у класса присутствует много аннотаций и методов, которые переопределяют родительский AbstractProcessor. AbstractProcessor реализует интерфейс Processor и предоставляет готовые реализации большинства методов, а также позволяет сосредоточиться на бизнес‑логике, а не на написании boilerplate‑кода.

Про все существующие аннотации и методы в рамках одной статьи очень тяжело уложиться, поэтому про них лучше почитать в Developer Guide. Описание тех аннотаций, которые уже существуют в нашем коде, мы поговорим. Их описание в таблице ниже:

Название аннотации

Описание аннотации

@Tags

Используется для добавление тэгов для поиска и категоризации процессора в UI

@CapabilityDescription

Используется для описания процессора и производимого действия им. Помогает понять назначение без чтения документации

@ReadsAttributes

Сообщает системе и пользователям, какие атрибуты FlowFile читает процессор

@WritesAttributes

Сообщает системе и пользователям какие новые атрибуты FlowFile добавляет процессор

Для нашего тестового процессора, определим следующие параметры для этих аннотаций:

@Tags({"word", "count", "statistics", "habr"})
@CapabilityDescription("Counts the number of words in a specified FlowFile attribute")
@ReadsAttributes({
        @ReadsAttribute(attribute = "habr.wc.target", description = "The attribute that will be analyzed for word count")
})
@WritesAttributes({
        @WritesAttribute(attribute = "habr.wc.result", description = "The number of words found in the specified attribute")
})

Мы добавили тэги, описание, указали, что значение целевого атрибута по‑умолчанию для подсчета слов будет habr.wc.target, а результат запишется в атрибут habr.wc.result

Пример отображения значений аннотаций
Пример отображения заполненных аннотаций: тэги, описание

Пример отображения заполненных аннотаций: тэги, описание

Далее, рассмотрим настройки процессора. Примером в нашем коде в данном случае является параметр MY_PROPERTY с типом PropertyDescriptor. PropertyDescriptor определяет свойство, которое будет использоваться в Processor. Объект такого типа включает его имя, описание свойства, необязательное значение по умолчанию, логику проверки (валидатор) и указание на то, является ли свойство обязательным для корректной работы Processor. PropertyDescriptors создаются путем создания экземпляра класса PropertyDescriptor.Builder, вызова соответствующих методов для заполнения сведений о свойстве и, наконец, вызова метода build, то есть реализуют классический паттерн «Строитель».

Часть кода из шаблона
    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder()
            .name("My Property")
            .displayName("My Property")
            .description("Example Property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

Мы добавим в наш пример три атрибута: исходный атрибут (для перезаписи нашего дефолтного), результирующий, а также разделитель для слов. Примеры описания структуры практически идентичны в базовом шаблоне. Относительно валидаторов - они бывают разные и перечислены в классе StandardValidators.

   public static final PropertyDescriptor SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder()
            .name("Source Attribute")
            .displayName("Source Attribute")
            .description("Name of the attribute that contains text for word counting")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("habr.wc.target")
            .build();

    public static final PropertyDescriptor OUTPUT_ATTRIBUTE = new PropertyDescriptor.Builder()
            .name("Output Attribute")
            .displayName("Output Attribute")
            .description("Name of the attribute to store the word count result")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("habr.wc.result")
            .build();

    public static final PropertyDescriptor WORD_DELIMITER = new PropertyDescriptor.Builder()
            .name("Word Delimiter")
            .displayName("Word Delimiter")
            .description("Regular expression pattern used to split text into words (default: whitespace characters)")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("\s+")
            .build();

Геттеры getRelationships и getSupportedPropertyDescriptors в нашем примере, как и в большинстве случаев изменять вам не придется.

Однако, в конструкторе стоит поменять список descriptors и relationships, если они отличаются от базовых. В нашем случае это выглядит так:

  @Override
  protected void init(final ProcessorInitializationContext context) {
     descriptors = List.of(SOURCE_ATTRIBUTE, WORD_DELIMITER, OUTPUT_ATTRIBUTE);
     relationships = Set.of(REL_SUCCESS, REL_FAILURE);
  }

ProcessorInitializationContext в нашем случае не нужен, он предназначен для задач, когда в моменте инициализации процессора требуется выполнить какую‑либо задачу (логирование, доступ к ControllerServices). Например, вывести идентификатор процессора при инициализации можно следующим образом:

@Override
protected void init(final ProcessorInitializationContext context) {
    String processorId = context.getIdentifier();
    getLogger().info("Processor ID: {}", processorId);
    // Выведет что-то вроде: "Processor ID: a1b2c3d4-1234-5678-90ab-cdef12345678"
}

Метод помеченный аннотацией@OnScheduled используется для реализации логики, которая будет выполняться при запуске процессора.

Теперь перейдем к самому главному, реализацию метода onTrigger, который отвечает за работу, выполняемую с каждым FlowFile.

Сразу же, нас встречает небольшая часть кода, который позволяет прекратить выполнение, если FlowFile не существует. Выглядит это как простейшая защита от NPE:

FlowFile flowFile = session.get();
if (flowFile == null) {
    return;
}

Далее, нам уже необходимо реализовать наш код по подсчету слов.

Объяснять алгоритм по подсчету слов выглядит как не самая интересная часть статьи, поэтому подсчет будет вынесен в отдельный метод кода. Разберем код метода onTrigger:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    try {

        // Получаем значения свойств
        final String sourceAttribute = context.getProperty(SOURCE_ATTRIBUTE).getValue();
        final String wordDelimiter = context.getProperty(WORD_DELIMITER).getValue();
        final String outputAttribute = context.getProperty(OUTPUT_ATTRIBUTE).getValue();

        // Получаем значение атрибута, в котором нужно совершить Word Count
        final String attributeValue = flowFile.getAttribute(sourceAttribute);

        // Простейшие проверки
        if (attributeValue == null || attributeValue.trim().isEmpty()) {
            getLogger().warn("Attribute '{}' does not exist or is empty for FlowFile {}",
                    sourceAttribute, flowFile.getId());
            // Отправки файла в нужный RelationShip
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // Подсчитываем слова
        final int wordCount = countWords(attributeValue, wordDelimiter);

        getLogger().debug("Found {} words in attribute '{}'", wordCount, sourceAttribute);

        // Добавляем новый атрибут
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(outputAttribute, String.valueOf(wordCount));

        // Кладем все атрибуты
        flowFile = session.putAllAttributes(flowFile, attributes);

        // Перенаправляем в Success
        session.transfer(flowFile, REL_SUCCESS);
        session.getProvenanceReporter().modifyAttributes(flowFile);

        getLogger().info("Successfully counted {} words for FlowFile {}", wordCount, flowFile.getId());

    } catch (final Exception e) {
        getLogger().error("Error processing FlowFile {}: {}", flowFile.getId(), e.getMessage(), e);
        session.transfer(flowFile, REL_FAILURE);
    }
}

private int countWords(final String text, final String delimiterPattern) {

    if (text == null || text.trim().isEmpty()) {
        return 0;
    }

    try {
        Pattern pattern;
        if (delimiterPattern != null && !delimiterPattern.trim().isEmpty()) {
            pattern = Pattern.compile(delimiterPattern);
        } else {
            pattern = Pattern.compile("\s+");
        }

        String[] words = pattern.split(text.trim());
        return (int) Arrays.stream(words)
                .filter(word -> !word.trim().isEmpty())
                .count();

    } catch (PatternSyntaxException e) {
        getLogger().warn("Invalid regex pattern '{}', using default whitespace pattern", delimiterPattern);
        Pattern defaultPattern = Pattern.compile("\s+");
        String[] words = defaultPattern.split(text.trim());
        return (int) Arrays.stream(words)
                .filter(word -> !word.trim().isEmpty())
                .count();
    }
}

В самом начале метод принимает два параметра с типами ProcessContext и ProcessSession.

ProcessContext предоставляет доступ к настройкам и конфигурации процессора. Позволяет получать значения свойств, идентификатор процессора или любую другую информацию о нем.

ProcessSession предоставляет методы для манипуляции FlowFiles и управления транзакциями. Позволяет получать FlowFile из входной очереди, создавать новые, читать содержимые, добавлять атрибуты и так далее

Так, в следующем участке кода мы получаем значения параметров, которые определяли ранее:

final String sourceAttribute = context.getProperty(SOURCE_ATTRIBUTE).getValue();
final String wordDelimiter = context.getProperty(WORD_DELIMITER).getValue();
final String outputAttribute = context.getProperty(OUTPUT_ATTRIBUTE).getValue();

В UI значения параметров выглядят так:

Атрибуты которые мы задали, отображаются в UI

Атрибуты которые мы задали, отображаются в UI

Затем, получаем текст, который содержится в нашем атрибуте, название которого содержится в параметре Source Attribute

final String attributeValue = flowFile.getAttribute(sourceAttribute);

Далее, проводим проверки в коде и в случае, если какая‑либо из них не проходит (обычное условие if), отправляем файл в Failure Relation. За это отвечает session.transfer()

// Простейшие проверки
if (attributeValue == null || attributeValue.trim().isEmpty()) {
    getLogger().warn("Attribute '{}' does not exist or is empty for FlowFile {}",
            sourceAttribute, flowFile.getId());
    // Отправки файла в нужный RelationShip
    session.transfer(flowFile, REL_FAILURE);
    return;
}

В случае, если ошибок, то продолжаем выполнять логику и отправляем FlowFile в REL_SUCCESS:

// Подсчитываем слова
final int wordCount = countWords(attributeValue, wordDelimiter);

getLogger().debug("Found {} words in attribute '{}'", wordCount, sourceAttribute);

// Добавляем новый атрибут
final Map<String, String> attributes = new HashMap<>();
attributes.put(outputAttribute, String.valueOf(wordCount));

// Кладем все атрибуты
flowFile = session.putAllAttributes(flowFile, attributes);

// Перенаправляем в Success
session.transfer(flowFile, REL_SUCCESS);
// Зафиксировать в журнале аудита, что атрибуты этого FlowFile были изменены
session.getProvenanceReporter().modifyAttributes(flowFile);

getLogger().info("Successfully counted {} words for FlowFile {}", wordCount, flowFile.getId());

Таким образом, у нас получается полноценный код для нашего процессора. Подведем итог готового кода.

Результат кода для процессора
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package ru.kotletkin.dev.processors.demohabrprocessor;

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.*;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

@Tags({"word", "count", "statistics", "habr"})
@CapabilityDescription("Counts the number of words in a specified FlowFile attribute")
@ReadsAttributes({
        @ReadsAttribute(attribute = "habr.wc.target", description = "The attribute that will be analyzed for word count")
})
@WritesAttributes({
        @WritesAttribute(attribute = "habr.wc.result", description = "The number of words found in the specified attribute")
})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder()
            .name("Source Attribute")
            .displayName("Source Attribute")
            .description("Name of the attribute that contains text for word counting")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("habr.wc.target")
            .build();

    public static final PropertyDescriptor OUTPUT_ATTRIBUTE = new PropertyDescriptor.Builder()
            .name("Output Attribute")
            .displayName("Output Attribute")
            .description("Name of the attribute to store the word count result")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("habr.wc.result")
            .build();


    public static final PropertyDescriptor WORD_DELIMITER = new PropertyDescriptor.Builder()
            .name("Word Delimiter")
            .displayName("Word Delimiter")
            .description("Regular expression pattern used to split text into words (default: whitespace characters)")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("\s+")
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles with successfully word count")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles with not successfully word count")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        descriptors = List.of(SOURCE_ATTRIBUTE, WORD_DELIMITER, OUTPUT_ATTRIBUTE);
        relationships = Set.of(REL_SUCCESS, REL_FAILURE);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        try {

            // Получаем значения свойств
            final String sourceAttribute = context.getProperty(SOURCE_ATTRIBUTE).getValue();
            final String wordDelimiter = context.getProperty(WORD_DELIMITER).getValue();
            final String outputAttribute = context.getProperty(OUTPUT_ATTRIBUTE).getValue();

            // Получаем значение атрибута, в котором нужно совершить Word Count
            final String attributeValue = flowFile.getAttribute(sourceAttribute);

            // Простейшие проверки
            if (attributeValue == null || attributeValue.trim().isEmpty()) {
                getLogger().warn("Attribute '{}' does not exist or is empty for FlowFile {}",
                        sourceAttribute, flowFile.getId());
                // Отправки файла в нужный RelationShip
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            // Подсчитываем слова
            final int wordCount = countWords(attributeValue, wordDelimiter);

            getLogger().debug("Found {} words in attribute '{}'", wordCount, sourceAttribute);

            // Добавляем новый атрибут
            final Map<String, String> attributes = new HashMap<>();
            attributes.put(outputAttribute, String.valueOf(wordCount));

            // Кладем все атрибуты
            flowFile = session.putAllAttributes(flowFile, attributes);

            // Перенаправляем в Success
            session.transfer(flowFile, REL_SUCCESS);
            // Зафиксировать в журнале аудита, что атрибуты этого FlowFile были изменены
            session.getProvenanceReporter().modifyAttributes(flowFile);

            getLogger().info("Successfully counted {} words for FlowFile {}", wordCount, flowFile.getId());

        } catch (final Exception e) {
            getLogger().error("Error processing FlowFile {}: {}", flowFile.getId(), e.getMessage(), e);
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    private int countWords(final String text, final String delimiterPattern) {

        if (text == null || text.trim().isEmpty()) {
            return 0;
        }

        try {
            Pattern pattern;
            if (delimiterPattern != null && !delimiterPattern.trim().isEmpty()) {
                pattern = Pattern.compile(delimiterPattern);
            } else {
                pattern = Pattern.compile("\s+");
            }

            String[] words = pattern.split(text.trim());
            return (int) Arrays.stream(words)
                    .filter(word -> !word.trim().isEmpty())
                    .count();

        } catch (PatternSyntaxException e) {
            getLogger().warn("Invalid regex pattern '{}', using default whitespace pattern", delimiterPattern);
            Pattern defaultPattern = Pattern.compile("\s+");
            String[] words = defaultPattern.split(text.trim());
            return (int) Arrays.stream(words)
                    .filter(word -> !word.trim().isEmpty())
                    .count();
        }
    }
}

Помимо основного кода для процессора, Apache NiFi предлагает решение в виде Mock Framework, а именно TestRunner Class. С помощью него, можно не запуская Apache NiFi (добавление нового NAR требует перезагрузки) тестировать различные кейсы и покрывать свой код тестами как в привычном бэкенде или другой области разработки. Пример кода для тестирования представлен ниже:

Пример тестов для процессора
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package ru.kotletkin.dev.processors.demohabrprocessor;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

class MyProcessorTest {

    private TestRunner testRunner;

    @BeforeEach
    void init() {
        testRunner = TestRunners.newTestRunner(MyProcessor.class);
    }

    @Test
    void testProcessorWithDefaultAttribute() {
        // Устанавливаем свойства
        testRunner.setProperty(MyProcessor.SOURCE_ATTRIBUTE, "text");
        testRunner.setProperty(MyProcessor.OUTPUT_ATTRIBUTE, "word.count");

        // СОЗДАЕМ АТРИБУТЫ
        Map<String, String> attributes = new HashMap<>();
        attributes.put("text", "Hello world this is Apache NiFi for Habr");

        // Добавляем FlowFile с атрибутами
        testRunner.enqueue("dummy content", attributes);

        // Запускаем процессор
        testRunner.run(1);

        // Проверяем результаты
        testRunner.assertTransferCount(MyProcessor.REL_SUCCESS, 1);
        testRunner.assertTransferCount(MyProcessor.REL_FAILURE, 0);

        // Проверяем атрибут 'word.count'
        MockFlowFile result = testRunner.getFlowFilesForRelationship(MyProcessor.REL_SUCCESS).getFirst();
        result.assertAttributeEquals("word.count", "8");
        result.assertAttributeExists("filename"); // Стандартный атрибут
    }
}

Далее, выполним mvn clean install в консоли.

Результат процесса сборки

Результат процесса сборки
Результат сборки

Результат сборки

Нам необходим собранный файл с расширением nar. Именно его мы и добавим в директорию lib.

После того, как вы добавили в директорию lib вновь собранный NAR, запустите Apache NiFi (или перезапустите его).

Далее, среди процессоров должен появиться наш новый процессор.

Самостоятельно реализованный процессор

Самостоятельно реализованный процессор

И в нем будут видны наши заранее реализованные параметры:

Расширяем функционал Apache NiFi 2.0: руководство к написанию своего процессора - 9

Создадим небольшой пайплайн для теста процессора:

Простейший пайплайн для тестирования

Простейший пайплайн для тестирования

Настроим процессор GenerateFlowFile, чтобы он генерировал файлы с уже имеющимся атрибутом:

Добавление атрибута для Apache NiFi

Добавление атрибута для Apache NiFi

Запустим все процессоры через Run Once и увидим результат в одной из очередей перед Funnel:

Очередь с тестовым FlowFile

Очередь с тестовым FlowFile

Проверим атрибуты:

Наши атрибуты с подсчитанным количеством слов

Наши атрибуты с подсчитанным количеством слов

Таким образом, видно, что процессор штатно отработал и мы прошли все этапы, от написания кода, до сборки и добавления нашего процессора в Apache NiFi.

Результат статьи в виде кодовой базы можно найти тут.

Итоги

Мы рассмотрели, как создать простейший процессор в Apache NiFi, реализовать логику, как использовать основные методы различных классов. Естественно, этой статьей разработка модулей не ограничивается ни в коем случае, и есть много различных направлений в этой области. Для полного погружения рекомендуется изучать документацию и тестировать на практике, желательно на большой нагрузке.

Комьюнити Apache NiFi в РФ: telegram

Автор: vkotletkin

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js