Flink

Поддерживается только производитель

Эта страница документации посвящена компоненту Apache Flink (opens in a new tab) для Apache Camel. Компонент camel-flink обеспечивает связь между компонентами Camel и задачами Flink. Этот компонент позволяет маршрутизировать сообщения из различных транспортных протоколов, динамически выбирая задачу Flink для выполнения, используя входящие сообщения в качестве входных данных для задачи и, наконец, возвращая результаты в конвейер Camel.

Пользователям Maven необходимо добавить следующую зависимость для pom.xmlэтого компонента:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-flink</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

URI Format

В настоящее время компонент Flink поддерживает только производителей. Можно создавать задания DataSet и DataStream.

flink:dataset?dataset=#myDataSet&amp;dataSetCallback=#dataSetCallback flink:datastream?datastream=#myDataStream&amp;dataStreamCallback=#dataStreamCallback

Configuring Options

Компоненты Camel настраиваются на двух отдельных уровнях:

  • уровень компонентов

  • уровень конечной точки

Configuring Component Options

На уровне компонентов задаются общие и общие конфигурации, которые затем наследуются конечными точками. Это самый высокий уровень конфигурации.

Например, компонент может иметь настройки безопасности, учетные данные для аутентификации, URL-адреса для сетевого подключения и т. д.

У некоторых компонентов всего несколько параметров, а у других -- много. Поскольку компоненты обычно имеют предустановленные значения по умолчанию, которые широко используются, зачастую вам может потребоваться настроить лишь несколько параметров компонента, а то и вовсе ни одного.

Вы можете настроить компоненты, используя:

  • Компонент DSL (opens in a new tab) .

  • в файле конфигурации ( application.propertiesфайлы *.yaml, и т.д.).

  • непосредственно в коде Java.

Configuring Endpoint Options

Настройка конечных точек обычно занимает больше времени, поскольку у них есть множество параметров. Эти параметры помогают вам настроить функции конечной точки. Параметры также классифицируются по тому, используется ли конечная точка как потребитель ( от ), как производитель ( от ) или и то, и другое.

Конечные точки чаще всего настраиваются непосредственно в URI конечной точки в виде параметров пути и запроса . Вы также можете использовать Endpoint DSL (opens in a new tab) и DataFormat DSL (opens in a new tab) в качестве типобезопасного способа настройки конечных точек и форматов данных в Java.

Хорошей практикой при настройке параметров является использование заполнителей свойств (opens in a new tab) .

Заполнители свойств обеспечивают несколько преимуществ:

  • Они помогают предотвратить использование жестко запрограммированных URL-адресов, номеров портов, конфиденциальной информации и других настроек.

  • Они позволяют вынести конфигурацию за пределы кода.

  • Они помогают коду стать более гибким и пригодным для повторного использования.

В следующих двух разделах перечислены все параметры, сначала для компонента, а затем для конечной точки.

Component Options

Компонент Flink поддерживает 5 опций, которые перечислены ниже.

dataSetCallback (producer)

  • Функция, выполняющая действие с DataSet.

  • По умолчанию:

  • Тип: DataSetCallback

dataStream (producer)

  • DataStream для вычислений.

  • По умолчанию:

  • Тип: DataStream

dataStreamCallback (producer)

  • Тип: Функция, выполняющая действие с DataStream.

  • По умолчанию:

  • Тип: DataStreamCallback

lazyStartProducer (producer)

  • Следует ли запускать производитель в режиме ленивого запуска (при первом сообщении). Запуск в режиме ленивого запуска позволяет разрешить запуск CamelContext и маршрутов в ситуациях, когда производитель может выйти из строя при запуске, что приведет к сбою запуска маршрута. Отложив запуск в режим ленивого запуска, можно обработать сбой запуска во время маршрутизации сообщений с помощью обработчиков ошибок маршрутизации Camel. Имейте в виду, что после обработки первого сообщения создание и запуск производителя могут занять некоторое время и увеличить общее время обработки..

  • По умолчанию: false

  • Тип: boolean

autowiredEnabled (advanced)

  • Включено ли автоматическое связывание. Это используется для параметров автоматического связывания (параметр должен быть помечен как autowired) путём поиска в реестре экземпляра соответствующего типа, который затем настраивается в компоненте. Это может использоваться для автоматической настройки источников данных JDBC, фабрик JMS-подключений, клиентов AWS и т. д.

  • По умолчанию: true

  • Тип: boolean

Endpoint Options

Конечная точка Flink настраивается с использованием синтаксиса URI:

flink:endpointType

Со следующими параметрами пути и запроса :

Path Parameters (1 parameters)

endpointType (producer)

  • Требуемый тип конечной точки (набор данных, поток данных).

    Значения перечисления:

    • dataset

    • datastream

  • По умолчанию:

  • Тип: EndpointType

Query Parameters (6 parameters)

collect (producer)

  • Указывает, следует ли собирать или подсчитывать результаты.

  • По умолчанию: true

  • Тип: boolean

dataSet (producer)

  • DataSet для вычисления.

  • По умолчанию:

  • Тип: DataSet

dataSetCallback (producer)

  • Функция, выполняющая действие с DataSet.

  • По умолчанию:

  • Тип: DataSetCallback

dataStream (producer)

  • Тип: DataStream для вычислений.

  • По умолчанию:

  • Тип: DataStream

dataStreamCallback (producer)

  • Функция, выполняющая действие с DataStream.

  • По умолчанию:

  • Тип: DataStreamCallback

lazyStartProducer (producer (advanced))

  • Тип: Следует ли запускать производитель в режиме ленивого запуска (при первом сообщении). Запуск в режиме ленивого запуска позволяет разрешить запуск CamelContext и маршрутов в ситуациях, когда производитель может выйти из строя при запуске, что приведет к сбою запуска маршрута. Отложив запуск в режим ленивого запуска, можно обработать сбой запуска во время маршрутизации сообщений с помощью обработчиков ошибок маршрутизации Camel. Имейте в виду, что после обработки первого сообщения создание и запуск производителя могут занять некоторое время и увеличить общее время обработки.

  • По умолчанию: false

  • Тип: boolean

Message Headers

Компонент Flink поддерживает 4 заголовка сообщения, которые перечислены ниже:

CamelFlinkDataSet (producer)

Constant: FLINK_DATASET_HEADER (opens in a new tab)

  • Набор данных.

  • По умолчанию:

  • Тип: Object

CamelFlinkDataSetCallback (producer)

Constant: FLINK_DATASET_CALLBACK_HEADER (opens in a new tab)

  • Обратный вызов набора данных.

  • По умолчанию:

  • Тип: DataSetCallback

CamelFlinkDataStream (producer)

Constant: FLINK_DATASTREAM_HEADER (opens in a new tab)

  • Тип: Поток данных.

  • По умолчанию:

  • Тип: Object

CamelFlinkDataStreamCallback (producer)

Constant: FLINK_DATASTREAM_CALLBACK_HEADER (opens in a new tab)

  • Обратный вызов потока данных.

  • По умолчанию:

  • Тип: DataStreamCallback

Examples

Flink DataSet Callback

@Bean
public DataSetCallback<Long> dataSetCallback() {
    return new DataSetCallback<Long>() {
        public Long onDataSet(DataSet dataSet, Object... objects) {
            try {
                 dataSet.print();
                 return new Long(0);
            } catch (Exception e) {
                 return new Long(-1);
            }
        }
    };
}

Flink DataStream Callback

@Bean
public VoidDataStreamCallback dataStreamCallback() {
    return new VoidDataStreamCallback() {
        @Override
        public void doOnDataStream(DataStream dataStream, Object... objects) throws Exception {
            dataStream.flatMap(new Splitter()).print();

            environment.execute("data stream test");
        }
    };
}

Camel-Flink Producer call

CamelContext camelContext = new SpringCamelContext(context);

String pattern = "foo";

try {
    ProducerTemplate template = camelContext.createProducerTemplate();
    camelContext.start();
    Long count = template.requestBody("flink:dataSet?dataSet=#myDataSet&dataSetCallback=#countLinesContaining", pattern, Long.class);
    } finally {
        camelContext.stop();
    }

Spring Boot Auto-Configuration

При использовании flink с Spring Boot обязательно используйте следующую зависимость Maven для поддержки автоматической настройки:

<dependency>
  <groupId>org.apache.camel.springboot</groupId>
  <artifactId>camel-flink-starter</artifactId>
  <version>x.x.x</version>
  <!-- use the same version as your Camel core version -->
</dependency>

Компонент поддерживает 6 опций, которые перечислены ниже.

camel.component.flink.autowired-enabled

  • Тип: Включено ли автоматическое связывание. Это используется для параметров автоматического связывания (параметр должен быть помечен как autowired) путём поиска в реестре экземпляра соответствующего типа, который затем настраивается в компоненте. Это может использоваться для автоматической настройки источников данных JDBC, фабрик JMS-подключений, клиентов AWS и т. д.

  • По умолчанию: true

  • Тип: Boolean

camel.component.flink.data-set-callback

  • Функция, выполняющая действие с набором данных. Параметр имеет тип org.apache.camel.component.flink.DataSetCallback.

  • По умолчанию:

  • Тип: DataSetCallback

camel.component.flink.data-stream

  • DataStream для вычислений. Параметр имеет тип org.apache.flink.streaming.api.datastream.DataStream.

  • По умолчанию:

  • Тип: DataStream

camel.component.flink.data-stream-callback

  • Тип: Функция, выполняющая действие с потоком данных. Параметр имеет тип org.apache.camel.component.flink.DataStreamCallback.

  • По умолчанию:

  • Тип: DataStreamCallback

camel.component.flink.enabled

  • Включить ли автоматическую настройку компонента Flink. По умолчанию эта опция включена.

  • По умолчанию:

  • Тип: Boolean

camel.component.flink.lazy-start-producer

  • Тип: Следует ли запускать производитель в режиме ленивого запуска (при первом сообщении). Запуск в режиме ленивого запуска позволяет разрешить запуск CamelContext и маршрутов в ситуациях, когда производитель может выйти из строя при запуске, что приведет к сбою запуска маршрута. Отложив запуск в режим ленивого запуска, можно обработать сбой запуска во время маршрутизации сообщений с помощью обработчиков ошибок маршрутизации Camel. Имейте в виду, что после обработки первого сообщения создание и запуск производителя могут занять некоторое время и увеличить общее время обработки.

  • По умолчанию: false

  • Тип: Boolean