Debezium DB2 Connector

Поддерживается только потребитель.

Компонент Debezium db2 представляет собой оболочку вокруг Debezium, использующую Debezium Engine , которая позволяет осуществлять сбор измененных данных из базы данных db2 с помощью Debezium без необходимости использования Kafka или Kafka Connect.

Примечание по обработке сбоев: согласно документации Debezium Embedded Engine , движки активно записывают смещения источника и периодически сбрасывают эти смещения в постоянное хранилище. Поэтому при перезапуске или сбое приложения движок возобновит работу с последнего записанного смещения. Это означает, что при нормальной работе ваши нисходящие маршруты будут получать каждое событие ровно один раз. Однако в случае сбоя приложения (без корректного завершения работы) приложение возобновит работу с последнего записанного смещения, что может привести к получению дублирующихся событий сразу после перезапуска. Поэтому ваши нисходящие маршруты должны быть достаточно терпимы к такому случаю и при необходимости дедуплицировать события.

Пользователям Maven необходимо добавить следующую зависимость в свой pom.xml для этого компонента:

<dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-debezium-db2</artifactId>   <version>x.x.x</version>    <!-- use the same version as your Camel core version -->  

Формат URI

<span style="color: rgb(0,0,0);">debezium-db2:имя[?опции]

</span>

Configuring Options

Компоненты настраиваются на двух отдельных уровнях:

  • component level

  • edpoint level

Configuring Component Options

На уровне компонента задаются основные и общие настройки, которые впоследствии наследуются endpoints. Это самый высокий уровень конфигурации.

Например, компонент может иметь настройки безопасности, учетные данные для аутентификации, URL-адреса для сетевых подключений и т. д.

Некоторые компоненты имеют всего несколько параметров, в то время как у других их может быть много. Поскольку компоненты обычно имеют предварительно настроенные значения по умолчанию, которые используются в большинстве случаев, вам может понадобиться настроить только несколько параметров компонента или вообще ничего не настраивать.

Компоненты можно настроить с использованием:

  • Component DSL.

  • Конфигурационного файла (application.properties, *.yaml files, etc).

  • Напрямую в Java code.

Configuring Endpoint Options

Обычно больше времени уходит на настройку endpoints, так как они обладают множеством параметров. Эти параметры позволяют настроить поведение endpoint в соответствии с вашими потребностями. Кроме того, параметры делятся на категории в зависимости от того, используется ли endpoint как потребитель (from), как производитель (to) или в обеих ролях.

Настройка endpoints чаще всего выполняется непосредственно в URI endpoint в виде параметров пути и запроса. Также вы можете использовать Endpoint DSL и DataFormat DSL как типобезопасный способ настройки endpoints и форматов данных в Java.

Хорошей практикой при настройке параметров является использование Property Placeholders.

Property placeholders предоставляют несколько преимуществ:

  • Помогают избежать использования жестко закодированных URL-адресов, номеров портов, конфиденциальной информации и других настроек.

  • Позволяют вынести конфигурацию за пределы кода.

  • Делают код более гибким и удобным для повторного использования.

Component properties

Additional Properties

  • Дополнительные параметры для компонентов Debezium

    Эти параметры используются в случаях, когда невозможно настроить свойства напрямую через конфигурации Camel. Например, для настройки свойств Kafka Connect, необходимых для работы движка Debezium (таких как KafkaOffsetBackingStore), свойства должны указываться с префиксом additionalProperties.. Пример:

    additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro (opens in a new tab).

  • По умолчанию:

  • Тип: Map

Column Exclude List

  • Шаблоны регулярных выражений для исключения столбцов из событий изменения.

  • По умолчанию:

  • Тип: String

Column Include List

  • Тип: Шаблоны регулярных выражений для включения столбцов в события изменения.

  • По умолчанию:

  • Тип: String

Column Propagate Source Type

  • Список регулярных выражений через запятую, определяющих полные имена столбцов для добавления исходного типа и длины в параметры схем полей в записях изменений.

  • По умолчанию:

  • Тип: String

Converters

  • Тип: Необязательный список пользовательских converters, заменяющих стандартные. Определяются через параметр '.type' и настраиваются с помощью опций через '.'.

  • По умолчанию:

  • Тип: String

Database Dbname

  • Имя database, для которой коннектор должен отслеживать изменения.

  • По умолчанию:

  • Тип: String

Database Hostname

  • Тип: Hostname или IP-адрес сервера database, доступные для разрешения.

  • По умолчанию:

  • Тип: String

Database Password

  • Обязательный параметр. Пароль пользователя database, используемый для подключения к database.

  • По умолчанию:

  • Тип: String

Database Port

  • Тип: Порт сервера database.

  • По умолчанию: 50000

  • Тип: int

Database User

  • Имя пользователя database, которое будет использоваться для подключения к database.

  • По умолчанию:

  • Тип: String

Datatype Propagate Source Type

  • Список регулярных выражений, разделённых запятыми, которые соответствуют названиям типов данных, специфичным для database. Эти выражения добавляют исходный тип и исходную длину типа данных в качестве параметров в соответствующие схемы полей в создаваемых записях об изменениях.

  • По умолчанию:

  • Тип: String

Decimal Handling Mode

  • Тип: Укажите, как столбцы DECIMAL и NUMERIC должны быть представлены в событиях изменений. Доступные варианты:

    • 'precise' (по умолчанию) -- использует java.math.BigDecimal для представления значений, которые кодируются в событиях изменений с использованием двоичного представления и типа 'org.apache.kafka.connect.data.Decimal' в Kafka Connect.

    • 'string' -- использует строки для представления значений.

    • 'double' -- представляет значения с использованием Java 'double', что может не обеспечивать точности, но будет гораздо проще для использования в потребителях.

  • По умолчанию: precise

  • Тип: String

Event Processing Failure Handling Mode

  • Укажите, как следует обрабатывать ошибки, возникающие во время обработки событий (например, при обнаружении повреждённого события). Доступные варианты:

    • 'fail' (по умолчанию) -- вызывается исключение с указанием проблемного события и его позиции, что приводит к остановке коннектора.

    • 'warn' -- проблемное событие и его позиция будут зафиксированы в логах, а событие будет пропущено.

    • 'ignore' -- проблемное событие будет пропущено без уведомлений.

  • По умолчанию: fail

  • Тип: String

Heartbeat Interval Ms

  • Длина интервала в миллисекундах, через который коннектор периодически отправляет heartbeat-сообщения в тему heartbeat. Используйте 0, чтобы отключить отправку heartbeat-сообщений. По умолчанию отключено.

  • По умолчанию: 0ms

  • Тип: int

Heartbeat Topics Prefix

  • Префикс, который используется для названия тем heartbeat. По умолчанию используется __debezium-heartbeat.

  • По умолчанию: __debezium-heartbeat

  • Тип: String

Include Schema Changes

  • Определяет, должен ли коннектор публиковать изменения в схеме database в тему Kafka с тем же именем, что и идентификатор сервера database. Каждое изменение схемы будет записано с использованием ключа, содержащего имя database, а значение будет включать логическое описание новой схемы и, опционально, DDL-запрос(ы). По умолчанию значение равно 'true'. Это не зависит от того, как коннектор внутренне записывает историю изменений схемы database.

  • По умолчанию: true

  • Тип: boolean

Incremental Snapshot Chunk Size

  • Максимальный размер chunk (количество документов/строк) для incremental snapshotting.

  • По умолчанию: 1024

  • Тип: int

Internal Key Converter

  • Класс конвертера, который должен использоваться для сериализации и десериализации данных ключей для смещений. По умолчанию используется JSON-конвертер.

  • По умолчанию: org.apache.kafka.connect.json.JsonConverter

  • Тип: String

Internal Value Converter

  • Класс конвертера, который должен использоваться для сериализации и десериализации данных значений для смещений. По умолчанию используется JSON-конвертер.

  • По умолчанию: org.apache.kafka.connect.json.JsonConverter

  • Тип: String

Max Batch Size

  • Максимальный размер каждого пакета исходных записей. По умолчанию равен 2048.

  • По умолчанию: 2048

  • Тип: int

Max Queue Size

  • Максимальный размер очереди для событий изменений, считанных из журнала database, но ещё не записанных или переданных. По умолчанию равен 8192 и всегда должен быть больше максимального размера пакета.

  • По умолчанию: 8192

  • Тип: int

Max Queue Size In Bytes

  • Максимальный размер очереди в байтах для событий изменений, считанных из журнала database, но ещё не записанных или переданных. По умолчанию равен 0, что означает, что функция отключена.

  • По умолчанию: 0

  • Тип: long

Message Key Columns

  • Список выражений, разделённых точкой с запятой, которые соответствуют полностью квалифицированным таблицам и столбцам, используемым в качестве ключа сообщения. Каждое выражение должно соответствовать шаблону ':', где имена таблиц могут быть определены как (DB_NAME.TABLE_NAME) или (SCHEMA_NAME.TABLE_NAME) в зависимости от конкретного коннектора, а ключевые столбцы представляют собой список столбцов, разделённых запятыми, которые образуют пользовательский ключ. Для любой таблицы без явной настройки ключа в качестве ключа сообщения будут использоваться столбцы первичного ключа таблицы.

    Пример: dbserver1.inventory.orderlines:orderId,orderLineId;dbserver1.inventory.orders:id.

  • По умолчанию:

  • Тип: String

Name

  • Обязательное уникальное имя для коннектора. Попытка повторной регистрации с тем же именем завершится ошибкой.

  • По умолчанию:

  • Тип: String

Notification Enabled Channels

  • Тип: Список названий каналов уведомлений, которые включены.

  • По умолчанию:

  • Тип: String

Notification Sink Topic Name

  • Название топика для уведомлений. Это поле обязательно, если 'sink' находится в списке включённых каналов.

  • По умолчанию:

  • Тип: String

Offset Commit Policy

  • Тип: Имя Java-класса commit policy. Оно определяет, когда должна выполняться offsets commit на основе количества обработанных событий и времени, прошедшего с момента последнего commit. Этот класс должен реализовывать интерфейс 'OffsetCommitPolicy'. По умолчанию используется периодическая commit policy, основанная на временных интервалах.

  • По умолчанию:

  • Тип: String

Offset Commit Timeout Ms

  • Максимальное время (в миллисекундах) ожидания завершения сброса записей и фиксации смещения разделов в хранилище, после чего процесс будет отменён, а смещения восстановлены для следующей попытки. Значение по умолчанию 5 секунд.

  • По умолчанию: 5000

  • Тип: long

Offset Flush Interval Ms

  • Интервал, с которым выполняется попытка фиксации смещений. По умолчанию равен 1 минуте.

  • По умолчанию: 60000

  • Тип: long

Offset Storage

  • Имя Java-класса, который отвечает за сохранение смещений коннектора.

  • По умолчанию: org.apache.kafka.connect.storage.FileOffsetBackingStore

  • Тип: String

Offset Storage File Name

  • Указывает путь к файлу, где сохраняются смещения. Требуется при настройке offset.storage на FileOffsetBackingStore.

  • По умолчанию:

  • Тип: String

Offset Storage Partitions

  • Определяет количество партиций топика, используемого для хранения смещений. Необходимо указать при использовании KafkaOffsetBackingStore.

  • По умолчанию:

  • Тип: int

Offset Storage Replication Factor

  • Тип: Количество реплик, создаваемых для топика хранения смещений. Необходимо при настройке offset.storage на KafkaOffsetBackingStore.

  • По умолчанию:

  • Тип: int

Offset Storage Topic

  • Название Kafka-топика, в котором будут храниться смещения. Обязательно, если offset.storage установлен в KafkaOffsetBackingStore.

  • По умолчанию:

  • Тип: String

Poll Interval Ms

  • Тип: Время задержки перед повторной проверкой новых событий изменений в случае их отсутствия, в миллисекундах. По умолчанию 500 мс.

  • По умолчанию: 500ms

  • Тип: long

Provide Transaction Metadata

  • Позволяет извлекать метаданные транзакций и одновременно вести учёт событий.

  • По умолчанию: false

  • Тип: boolean

Query Fetch Size

  • Определяет максимальное число записей, загружаемых в оперативную память при потоковой обработке. При значении 0 применяется стандартный размер выборки JDBC. По умолчанию 10000 мс.

  • По умолчанию: 10000

  • Тип: int

Retriable Restart Connector Wait Ms

  • Задержка перед повторным запуском коннектора при возникновении временной ошибки. Значение по умолчанию 10000 мс.

  • По умолчанию: 10s

  • Тип: long

Schema History Internal

  • Название класса SchemaHistory, который будет использоваться для хранения и восстановления изменений схемы базы данных. Конфигурационные свойства истории должны иметь префикс "schema.history.internal.".

  • По умолчанию: io.debezium.storage.kafka.history.KafkaSchemaHistory

  • Тип: String

Schema History Internal File Filename

  • Путь к файлу для сохранения истории изменений схемы database.

  • По умолчанию:

  • Тип: String

Schema History Internal Skip Unparseable Ddl

  • Определяет, как Debezium будет поступать при встрече с DDL-оператором в binlog, который не может быть обработан. По умолчанию коннектор останавливается, но с включением этого параметра можно пропускать такие операторы, что может привести к пропуску изменений метаданных.

  • По умолчанию: false

  • Тип: boolean

Schema History Internal Store Only Captured Databases Ddl

  • Управляет тем, какие DDL-операторы Debezium будет сохранять в истории схемы базы данных. По умолчанию (true) сохраняются только те DDL-операторы, которые изменяют таблицы из отслеживаемой базы данных. Если установлено значение false, Debezium будет сохранять все DDL-операторы.

  • По умолчанию: false

  • Тип: boolean

Schema History Internal Store Only Captured Tables Ddl

  • Определяет, какие DDL-запросы Debezium будет записывать в историю схемы базы данных. По умолчанию (false) Debezium сохраняет все входящие DDL-запросы. При установке значения true, будут сохраняться только DDL-запросы, изменяющие захваченную таблицу.

  • По умолчанию: false

  • Тип: boolean

Schema Name Adjustment Mode

  • Указывает, как имена схем должны быть изменены для совместимости с преобразователем сообщений, используемым коннектором, включая следующие варианты:

    • avro -- заменяет символы, которые не могут быть использованы в имени типа Avro, на подчеркивание.

    • avro_unicode -- заменяет подчеркивание или символы, которые не могут быть использованы в имени типа Avro, на соответствующие юникодные символы, например, _uxxxx. Примечание: _ -- это escape-последовательность, как обратная косая черта в Java.

    • none -- не применяется никаких изменений (по умолчанию).

  • По умолчанию: none

  • Тип: String

Signal Data Collection

  • Название data collection, которая используется для отправки сигналов/команд в Debezium. Сигнализация отключена, если не задано.

  • По умолчанию:

  • Тип: String

Signal Enabled Channels

  • Список имен каналов, которые включены. Канал источника включен по умолчанию.

  • По умолчанию: source

  • Тип: String

Signal Poll Interval Ms

  • Интервал для поиска новых сигналов в зарегистрированных каналах, указанный в миллисекундах. По умолчанию -- 5 секунд.

  • По умолчанию: 5s

  • Тип: long

Skipped Operations

  • Список операций, которые нужно пропустить при стриминге, разделённый запятыми:

    • c для вставок/созданий

    • u для обновлений

    • d для удалений

    • t для усечений

    • none для отсутствия пропуска.

    По умолчанию пропускаются только операции усечения.

  • По умолчанию: t

  • Тип: String

Snapshot Delay Ms

  • Задержка перед началом создания snapshot, в миллисекундах. По умолчанию 0 мс.

  • По умолчанию: 0ms

  • Тип: long

Snapshot Fetch Size

  • Максимальное количество записей, которые можно загрузить в память при выполнении snapshot.

  • По умолчанию:

  • Тип: int

Snapshot Include Collection List

  • Этот параметр должен быть задан, чтобы указать список таблиц/коллекций, для которых необходимо выполнить snapshot при создании или перезапуске коннектора.

  • По умолчанию:

  • Тип: String

Snapshot Lock Timeout Ms

  • Тип: Максимальное количество миллисекунд ожидания блокировок таблиц в начале выполнения snapshot. Если блокировки не могут быть получены в течение этого времени, snapshot будет прерван. По умолчанию равно 10 секундам.

  • По умолчанию: 10s

  • Тип: long

Snapshot Mode

  • Критерии для выполнения snapshot при запуске коннектора. Доступные варианты:

    • initial (по умолчанию) -- указывает, что коннектор должен выполнить snapshot только в случае отсутствия смещений для логического имени сервера;

    • schema_only -- указывает, что коннектор должен выполнить snapshot схемы, если смещения для логического имени сервера отсутствуют.

  • По умолчанию: initial

  • Тип: String

Snapshot Select Statement Overrides

  • Это свойство содержит список fully-qualified таблиц (DB_NAME.TABLE_NAME) или (SCHEMA_NAME.TABLE_NAME), разделённых запятыми, в зависимости от конкретного коннектора. SELECT-запросы для отдельных таблиц указываются в дополнительных параметрах конфигурации, по одному для каждой таблицы, с идентификаторами

    • snapshot.select.statement.overrides.DB_NAME.TABLE_NAME

    • snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME

    Значением этих свойств является SELECT-запрос, который используется для извлечения данных из конкретной таблицы во время snapshotting. Возможный пример использования -- указание конкретной точки, с которой следует начать (или продолжить) snapshotting для больших таблиц, работающих только на добавление, в случае, если предыдущее snapshotting было прервано.

  • По умолчанию:

  • Тип: String

Snapshot Tables Order By Row Count

  • Определяет порядок обработки таблиц при начальном snapshot. Значение descending упорядочивает таблицы по убыванию количества строк. Значение ascending упорядочивает таблицы по возрастанию количества строк. Значение disabled (по умолчанию) отключает сортировку по количеству строк.

  • По умолчанию: disabled

  • Тип: String

Sourceinfo Struct Maker

  • Имя класса SourceInfoStructMaker, который возвращает схему и структуру SourceInfo.

  • По умолчанию: io.debezium.connector.db2.Db2SourceInfoStructMaker

  • Тип: String

Table Exclude List

  • Список таблиц (в формате регулярных выражений), которые должны быть исключены из мониторинга.

  • По умолчанию:

  • Тип: String

Table Ignore Builtin

  • Флаг для пропуска встроенных таблиц при обработке.

  • По умолчанию: true

  • Тип: boolean

Table Include List

  • Таблицы, для которых должны фиксироваться изменения.

  • По умолчанию:

  • Тип: String

Time Precision Mode

  • Время, дата и метки времени могут иметь разную степень точности, включая:

    • adaptive (по умолчанию) – определяет точность значений времени, даты и метки времени на основе точности столбца базы данных.

    • daptive_time_microsecond – аналогично режиму adaptive, но поля TIME всегда используют микросекундную точность.

    • connect – всегда представляет значения времени, даты и метки времени, используя встроенные представления Kafka Connect’s для Time, Date и Timestamp, что означает миллисекундную точность независимо от точности столбцов базы данных.

  • По умолчанию: adaptive

  • Тип: String

Tombstones On Delete

  • Определяет, должны ли операции удаления представляться событием удаления и последующим событием tombstone (true) или только событием удаления (false). Генерация события tombstone (поведение по умолчанию) позволяет Kafka полностью удалить все события, связанные с данным ключом, после удаления исходной записи.

  • По умолчанию: false

  • Тип: boolean

Topic Naming Strategy

  • Имя класса TopicNamingStrategy, отвечающего за определение топика для изменений данных, схемы, транзакций, heartbeat-событий и др.

  • По умолчанию: io.debezium.schema.SchemaTopicNamingStrategy

  • Тип: String

Topic Prefix

  • Требуемый префикс топика, который идентифицирует и задаёт пространство имён для конкретного сервера или кластера базы данных, фиксирующего изменения.

    Примечание: Префикс топика должен быть уникальным среди всех других коннекторов, так как он используется в качестве префикса для всех Kafka-топиков, принимающих события от данного коннектора. Разрешены только буквенно-цифровые символы, дефисы, точки и подчеркивания.

  • По умолчанию:

  • Тип: String

Component advanced properties

Bridge Error Handler

  • Предоставляет возможность связать потребителя с маршрутизатором Error Handler Camel. Это позволяет обрабатывать исключения (если возможно), возникающие при получении входящих сообщений потребителем Camel, в виде сообщений, передаваемых в маршрутизатор Error Handler.

    Важно: Это возможно только в том случае, если сторонний компонент позволяет Camel получать уведомления о возникших исключениях. Некоторые компоненты обрабатывают исключения только внутренне, из-за чего использование bridgeErrorHandler становится невозможным. В некоторых случаях мы можем доработать компонент Camel, чтобы интегрировать его со сторонним компонентом и сделать такую возможность доступной в будущих выпусках. По умолчанию потребитель использует org.apache.camel.spi.ExceptionHandler для обработки исключений, которые логируются с уровнем WARN или ERROR и игнорируются.

  • По умолчанию: false

  • Тип: boolean

Exception Handler

  • Позволяет потребителю использовать собственный ExceptionHandler.

    Обратите внимание, что если включена опция bridgeErrorHandler, то эта настройка не используется. По умолчанию потребитель обрабатывает исключения, которые логируются с уровнем WARN или ERROR и игнорируются.

  • По умолчанию:

  • Тип: ExceptionHandler

Exchange Pattern

  • Определяет шаблон взаимодействия, который применяется, когда потребитель создает новый обмен (exchange).

    Возможные значения:

    • InOnly

    • InOut

  • По умолчанию:

  • Тип: ExchangePattern