Flink
Поддерживается только производитель
Эта страница документации посвящена компоненту Apache Flink (opens in a new tab) для Apache Camel. Компонент camel-flink обеспечивает связь между компонентами Camel и задачами Flink. Этот компонент позволяет маршрутизировать сообщения из различных транспортных протоколов, динамически выбирая задачу Flink для выполнения, используя входящие сообщения в качестве входных данных для задачи и, наконец, возвращая результаты в конвейер Camel.
Пользователям Maven необходимо добавить следующую зависимость для pom.xmlэтого компонента:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-flink</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>URI Format
В настоящее время компонент Flink поддерживает только производителей. Можно создавать задания DataSet и DataStream.
flink:dataset?dataset=#myDataSet&dataSetCallback=#dataSetCallback flink:datastream?datastream=#myDataStream&dataStreamCallback=#dataStreamCallback
Configuring Options
Компоненты Camel настраиваются на двух отдельных уровнях:
-
уровень компонентов
-
уровень конечной точки
Configuring Component Options
На уровне компонентов задаются общие и общие конфигурации, которые затем наследуются конечными точками. Это самый высокий уровень конфигурации.
Например, компонент может иметь настройки безопасности, учетные данные для аутентификации, URL-адреса для сетевого подключения и т. д.
У некоторых компонентов всего несколько параметров, а у других -- много. Поскольку компоненты обычно имеют предустановленные значения по умолчанию, которые широко используются, зачастую вам может потребоваться настроить лишь несколько параметров компонента, а то и вовсе ни одного.
Вы можете настроить компоненты, используя:
-
Компонент DSL (opens in a new tab) .
-
в файле конфигурации (
application.propertiesфайлы*.yaml, и т.д.). -
непосредственно в коде Java.
Configuring Endpoint Options
Настройка конечных точек обычно занимает больше времени, поскольку у них есть множество параметров. Эти параметры помогают вам настроить функции конечной точки. Параметры также классифицируются по тому, используется ли конечная точка как потребитель ( от ), как производитель ( от ) или и то, и другое.
Конечные точки чаще всего настраиваются непосредственно в URI конечной точки в виде параметров пути и запроса . Вы также можете использовать Endpoint DSL (opens in a new tab) и DataFormat DSL (opens in a new tab) в качестве типобезопасного способа настройки конечных точек и форматов данных в Java.
Хорошей практикой при настройке параметров является использование заполнителей свойств (opens in a new tab) .
Заполнители свойств обеспечивают несколько преимуществ:
-
Они помогают предотвратить использование жестко запрограммированных URL-адресов, номеров портов, конфиденциальной информации и других настроек.
-
Они позволяют вынести конфигурацию за пределы кода.
-
Они помогают коду стать более гибким и пригодным для повторного использования.
В следующих двух разделах перечислены все параметры, сначала для компонента, а затем для конечной точки.
Component Options
Компонент Flink поддерживает 5 опций, которые перечислены ниже.
dataSetCallback (producer)
-
Функция, выполняющая действие с DataSet.
-
По умолчанию:
-
Тип: DataSetCallback
dataStream (producer)
-
DataStream для вычислений.
-
По умолчанию:
-
Тип: DataStream
dataStreamCallback (producer)
-
Тип: Функция, выполняющая действие с DataStream.
-
По умолчанию:
-
Тип: DataStreamCallback
lazyStartProducer (producer)
-
Следует ли запускать производитель в режиме ленивого запуска (при первом сообщении). Запуск в режиме ленивого запуска позволяет разрешить запуск CamelContext и маршрутов в ситуациях, когда производитель может выйти из строя при запуске, что приведет к сбою запуска маршрута. Отложив запуск в режим ленивого запуска, можно обработать сбой запуска во время маршрутизации сообщений с помощью обработчиков ошибок маршрутизации Camel. Имейте в виду, что после обработки первого сообщения создание и запуск производителя могут занять некоторое время и увеличить общее время обработки..
-
По умолчанию: false
-
Тип: boolean
autowiredEnabled (advanced)
-
Включено ли автоматическое связывание. Это используется для параметров автоматического связывания (параметр должен быть помечен как autowired) путём поиска в реестре экземпляра соответствующего типа, который затем настраивается в компоненте. Это может использоваться для автоматической настройки источников данных JDBC, фабрик JMS-подключений, клиентов AWS и т. д.
-
По умолчанию: true
-
Тип: boolean
Endpoint Options
Конечная точка Flink настраивается с использованием синтаксиса URI:
flink:endpointType
Со следующими параметрами пути и запроса :
Path Parameters (1 parameters)
endpointType (producer)
-
Требуемый тип конечной точки (набор данных, поток данных).
Значения перечисления:
-
dataset
-
datastream
-
-
По умолчанию:
-
Тип: EndpointType
Query Parameters (6 parameters)
collect (producer)
-
Указывает, следует ли собирать или подсчитывать результаты.
-
По умолчанию: true
-
Тип: boolean
dataSet (producer)
-
DataSet для вычисления.
-
По умолчанию:
-
Тип: DataSet
dataSetCallback (producer)
-
Функция, выполняющая действие с DataSet.
-
По умолчанию:
-
Тип: DataSetCallback
dataStream (producer)
-
Тип: DataStream для вычислений.
-
По умолчанию:
-
Тип: DataStream
dataStreamCallback (producer)
-
Функция, выполняющая действие с DataStream.
-
По умолчанию:
-
Тип: DataStreamCallback
lazyStartProducer (producer (advanced))
-
Тип: Следует ли запускать производитель в режиме ленивого запуска (при первом сообщении). Запуск в режиме ленивого запуска позволяет разрешить запуск CamelContext и маршрутов в ситуациях, когда производитель может выйти из строя при запуске, что приведет к сбою запуска маршрута. Отложив запуск в режим ленивого запуска, можно обработать сбой запуска во время маршрутизации сообщений с помощью обработчиков ошибок маршрутизации Camel. Имейте в виду, что после обработки первого сообщения создание и запуск производителя могут занять некоторое время и увеличить общее время обработки.
-
По умолчанию: false
-
Тип: boolean
Message Headers
Компонент Flink поддерживает 4 заголовка сообщения, которые перечислены ниже:
CamelFlinkDataSet (producer)
Constant: FLINK_DATASET_HEADER (opens in a new tab)
-
Набор данных.
-
По умолчанию:
-
Тип: Object
CamelFlinkDataSetCallback (producer)
Constant: FLINK_DATASET_CALLBACK_HEADER (opens in a new tab)
-
Обратный вызов набора данных.
-
По умолчанию:
-
Тип: DataSetCallback
CamelFlinkDataStream (producer)
Constant: FLINK_DATASTREAM_HEADER (opens in a new tab)
-
Тип: Поток данных.
-
По умолчанию:
-
Тип: Object
CamelFlinkDataStreamCallback (producer)
Constant: FLINK_DATASTREAM_CALLBACK_HEADER (opens in a new tab)
-
Обратный вызов потока данных.
-
По умолчанию:
-
Тип: DataStreamCallback
Examples
Flink DataSet Callback
@Bean
public DataSetCallback<Long> dataSetCallback() {
return new DataSetCallback<Long>() {
public Long onDataSet(DataSet dataSet, Object... objects) {
try {
dataSet.print();
return new Long(0);
} catch (Exception e) {
return new Long(-1);
}
}
};
}Flink DataStream Callback
@Bean
public VoidDataStreamCallback dataStreamCallback() {
return new VoidDataStreamCallback() {
@Override
public void doOnDataStream(DataStream dataStream, Object... objects) throws Exception {
dataStream.flatMap(new Splitter()).print();
environment.execute("data stream test");
}
};
}Camel-Flink Producer call
CamelContext camelContext = new SpringCamelContext(context);
String pattern = "foo";
try {
ProducerTemplate template = camelContext.createProducerTemplate();
camelContext.start();
Long count = template.requestBody("flink:dataSet?dataSet=#myDataSet&dataSetCallback=#countLinesContaining", pattern, Long.class);
} finally {
camelContext.stop();
}Spring Boot Auto-Configuration
При использовании flink с Spring Boot обязательно используйте следующую зависимость Maven для поддержки автоматической настройки:
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-flink-starter</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>Компонент поддерживает 6 опций, которые перечислены ниже.
camel.component.flink.autowired-enabled
-
Тип: Включено ли автоматическое связывание. Это используется для параметров автоматического связывания (параметр должен быть помечен как autowired) путём поиска в реестре экземпляра соответствующего типа, который затем настраивается в компоненте. Это может использоваться для автоматической настройки источников данных JDBC, фабрик JMS-подключений, клиентов AWS и т. д.
-
По умолчанию: true
-
Тип: Boolean
camel.component.flink.data-set-callback
-
Функция, выполняющая действие с набором данных. Параметр имеет тип org.apache.camel.component.flink.DataSetCallback.
-
По умолчанию:
-
Тип: DataSetCallback
camel.component.flink.data-stream
-
DataStream для вычислений. Параметр имеет тип org.apache.flink.streaming.api.datastream.DataStream.
-
По умолчанию:
-
Тип: DataStream
camel.component.flink.data-stream-callback
-
Тип: Функция, выполняющая действие с потоком данных. Параметр имеет тип org.apache.camel.component.flink.DataStreamCallback.
-
По умолчанию:
-
Тип: DataStreamCallback
camel.component.flink.enabled
-
Включить ли автоматическую настройку компонента Flink. По умолчанию эта опция включена.
-
По умолчанию:
-
Тип: Boolean
camel.component.flink.lazy-start-producer
-
Тип: Следует ли запускать производитель в режиме ленивого запуска (при первом сообщении). Запуск в режиме ленивого запуска позволяет разрешить запуск CamelContext и маршрутов в ситуациях, когда производитель может выйти из строя при запуске, что приведет к сбою запуска маршрута. Отложив запуск в режим ленивого запуска, можно обработать сбой запуска во время маршрутизации сообщений с помощью обработчиков ошибок маршрутизации Camel. Имейте в виду, что после обработки первого сообщения создание и запуск производителя могут занять некоторое время и увеличить общее время обработки.
-
По умолчанию: false
-
Тип: Boolean