Debezium MongoDB Connector

Поддерживается только потребитель

Компонент Debezium MongoDB представляет собой оболочку вокруг Debezium , использующую Debezium Engine , которая позволяет осуществлять сбор измененных данных из базы данных MongoDB с помощью Debezium без необходимости использования Kafka или Kafka Connect.

Коннектор Debezium MongoDB использует oplog MongoDB для захвата изменений. Коннектор работает только с наборами реплик MongoDB или с шардированными кластерами, где каждый шард является отдельным набором реплик. Поэтому вам нужно будет запустить экземпляр MongoDB либо в режиме набора реплик, либо в режиме шардированных кластеров.

Примечание по обработке сбоев: согласно документации Debezium Embedded Engine , движки активно записывают смещения источника и периодически сбрасывают эти смещения в постоянное хранилище. Поэтому при перезапуске или сбое приложения движок возобновит работу с последнего записанного смещения. Это означает, что при нормальной работе ваши нисходящие маршруты будут получать каждое событие ровно один раз. Однако в случае сбоя приложения (без корректного завершения работы) приложение возобновит работу с последнего записанного смещения, что может привести к получению дублирующихся событий сразу после перезапуска. Поэтому ваши нисходящие маршруты должны быть достаточно терпимы к такому случаю и при необходимости дедуплицировать события.

Пользователям Maven необходимо добавить следующую зависимость в свой pom.xml для этого компонента:

<dependency>

<groupId>org.apache.camel</groupId>

<artifactId>camel-debezium-mongodb</artifactId>

<version>x.x.x</version>

<!-- use the same version as your Camel core version ->

</dependency>

Формат URI

debezium-mongodb:имя[?опции]

Configuring Options

Компоненты настраиваются на двух отдельных уровнях:

  • component level

  • edpoint level

Configuring Component Options

На уровне компонента задаются основные и общие настройки, которые впоследствии наследуются endpoints. Это самый высокий уровень конфигурации.

Например, компонент может иметь настройки безопасности, учетные данные для аутентификации, URL-адреса для сетевых подключений и т. д.

Некоторые компоненты имеют всего несколько параметров, в то время как у других их может быть много. Поскольку компоненты обычно имеют предварительно настроенные значения по умолчанию, которые используются в большинстве случаев, вам может понадобиться настроить только несколько параметров компонента или вообще ничего не настраивать.

Компоненты можно настроить с использованием:

  • Component DSL.

  • Конфигурационного файла (application.properties, *.yaml files, etc).

  • Напрямую в Java code.

Configuring Endpoint Options

Обычно больше времени уходит на настройку endpoints, так как они обладают множеством параметров. Эти параметры позволяют настроить поведение endpoint в соответствии с вашими потребностями. Кроме того, параметры делятся на категории в зависимости от того, используется ли endpoint как потребитель (from), как производитель (to) или в обеих ролях.

Настройка endpoints чаще всего выполняется непосредственно в URI endpoint в виде параметров пути и запроса. Также вы можете использовать Endpoint DSL и DataFormat DSL как типобезопасный способ настройки endpoints и форматов данных в Java.

Хорошей практикой при настройке параметров является использование Property Placeholders.

Property placeholders предоставляют несколько преимуществ:

  • Помогают избежать использования жестко закодированных URL-адресов, номеров портов, конфиденциальной информации и других настроек.

  • Позволяют вынести конфигурацию за пределы кода.

  • Делают код более гибким и удобным для повторного использования.

Component properties

Additional Properties

  • Дополнительные свойства для компонентов Debezium, если они не могут быть заданы напрямую в конфигурациях Camel (например, настройка свойств Kafka Connect, необходимых для движка Debezium, таких как KafkaOffsetBackingStore). Эти свойства должны иметь префикс additionalProperties.

  • По умолчанию:

  • Тип: Map

Capture Mode

  • Метод, используемый для захвата изменений с сервера MongoDB. Доступные варианты:

    • change_streams -- захват изменений через MongoDB Change Streams, события обновления не содержат полных документов.

    • change_streams_update_full (по умолчанию) -- захват изменений через MongoDB Change Streams, события обновления содержат полные документы.

  • По умолчанию: change_streams_update_full

  • Тип: String

Collection Exclude List

  • Максимальный размер каждого пакета исходных записей. По умолчанию равен 2048.

  • По умолчанию:

  • Тип: String

Collection Include List

  • Шаблоны регулярных выражений для включения столбцов в события изменения.

  • По умолчанию:

  • Тип: String

Converters

  • Тип: Необязательный список пользовательских converters, заменяющих стандартные. Определяются через параметр '.type' и настраиваются с помощью опций через '.'.

  • По умолчанию:

  • Тип: String

Cursor Max Await Time Ms

  • Максимальное время обработки в миллисекундах, которое отводится на ожидание обработки курсором oplog одного запроса на опрос.

  • По умолчанию:

  • Тип: int

Database Exclude List

  • Тип: Список регулярных выражений или литералов, разделённых запятой, которые соответствуют именам database, изменения в которых должны быть исключены.

  • По умолчанию:

  • Тип: String

Database Include List

  • Список регулярных выражений или литералов, разделённых запятой, которые соответствуют именам database, изменения в которых должны быть зафиксированы.

  • По умолчанию:

  • Тип: String

Event Processing Failure Handling Mode

  • Тип: Укажите, как следует обрабатывать ошибки, возникающие во время обработки событий (например, при обнаружении повреждённого события). Доступные варианты:

    • fail (по умолчанию) -- вызывается исключение с указанием проблемного события и его позиции, что приводит к остановке коннектора.

    • warn -- проблемное событие и его позиция будут зафиксированы в логах, а событие будет пропущено.

    • ignore -- проблемное событие будет пропущено без уведомлений.

  • По умолчанию: fail

  • Тип: String

Field Exclude List

  • Список fully-qualified имён полей, разделённых запятыми, которые должны быть исключены из значений сообщений о событиях изменений.

  • По умолчанию:

  • Тип: String

Field Renames

  • Список fully-qualified замен полей, разделённых запятыми, которые должны использоваться для переименования полей в значениях сообщений о событиях изменений. Полные замены для полей имеют вид databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, где databaseName и collectionName могут содержать символы подстановки (), которые соответствуют любым символам, а символ двоеточия (:) используется для определения отображения переименования поля.

  • По умолчанию:

  • Тип: String

Heartbeat Interval Ms

  • Тип: Длина интервала в миллисекундах, через который коннектор периодически отправляет heartbeat-сообщения в тему heartbeat. Используйте 0, чтобы отключить отправку heartbeat-сообщений. По умолчанию отключено.

  • По умолчанию: 0ms

  • Тип: int

Heartbeat Topics Prefix

  • Префикс, который используется для названия тем heartbeat. По умолчанию используется __debezium-heartbeat.

  • По умолчанию: __debezium-heartbeat

  • Тип: String

Internal Key Converter

  • Класс конвертера, который должен использоваться для сериализации и десериализации данных ключей для смещений (offsets). По умолчанию используется JSON-конвертер.

  • По умолчанию: org.apache.kafka.connect.json.JsonConverter

  • Тип: String

Internal Value Converter

  • Класс конвертера, который должен использоваться для сериализации и десериализации данных значений для смещений. По умолчанию используется JSON-конвертер.

  • По умолчанию: org.apache.kafka.connect.json.JsonConverter

  • Тип: String

Max Batch Size

  • Максимальный размер каждого пакета исходных записей. По умолчанию равен 2048.

  • По умолчанию: 2048

  • Тип: int

Max Queue Size

  • Максимальный размер очереди для событий изменений, считанных из журнала database, но ещё не записанных или переданных. По умолчанию равен 8192 и всегда должен быть больше максимального размера пакета.

  • По умолчанию: 8192

  • Тип: int

Max Queue Size In Bytes

  • Максимальный размер очереди в байтах для событий изменений, считанных из журнала database, но ещё не записанных или переданных. По умолчанию равен 0, что означает, что функция отключена.

  • По умолчанию: 0

  • Тип: long

Mongodb Authsource

  • Database, содержащая учётные данные пользователей.

  • По умолчанию: admin

  • Тип: String

Mongodb Connect Timeout Ms

  • Время ожидания подключения в миллисекундах. По умолчанию 10 000 мс (10 секунд).

  • По умолчанию: 10s

  • Тип: int

Mongodb Connection Mode

Mongodb Connection String

  • Строка подключения к database.

  • По умолчанию:

  • Тип: String

Mongodb Heartbeat Frequency Ms

  • Тип: Частота, с которой мониторинг кластера пытается связаться с каждым сервером. По умолчанию -- 10 секунд (10 000 мс).

  • По умолчанию: 10s

  • Тип: int

Mongodb Password

  • Пароль, используемый для подключения к MongoDB (при необходимости).

  • По умолчанию:

  • Тип: String

Mongodb Poll Interval Ms

  • Частота проверки изменений в наборах реплик, указанная в миллисекундах. По умолчанию -- 30 секунд (30 000 мс).

  • По умолчанию: 30s

  • Тип: long

Mongodb Server Selection Timeout Ms

  • Время ожидания выбора сервера в миллисекундах. По умолчанию -- 10 секунд (10 000 мс).

  • По умолчанию: 30s

  • Тип: int

Mongodb Socket Timeout Ms

  • Время ожидания socket в миллисекундах. Значение по умолчанию -- 0 мс.

  • По умолчанию: 0ms

  • Тип: int

Mongodb Ssl Enabled

  • Определяет необходим ли SSL для подключения коннектора к экземплярам MongoDB

  • По умолчанию: false

  • Тип: boolean

Mongodb Ssl Invalid Hostname Allowed

  • Определяет можно ли использовать недействительные имена хостов при SSL-соединении. Если да, соединение будет подвержено атакам "man-in-the-middle".

  • По умолчанию: false

  • Тип: boolean

Mongodb User

  • Имя пользователя database для подключения к MongoDB, при необходимости.

  • По умолчанию:

  • Тип: String

Name

  • Обязательное уникальное имя для коннектора. Попытка повторной регистрации с тем же именем завершится ошибкой.

  • По умолчанию:

  • Тип: String

Notification Enabled Channels

  • Тип: Список названий каналов уведомлений, которые включены.

  • По умолчанию:

  • Тип: String

Notification Sink Topic Name

  • Название топика для уведомлений. Это поле обязательно, если 'sink' находится в списке включённых каналов.

  • По умолчанию:

  • Тип: String

Offset Commit Policy

  • Тип: Имя Java-класса commit policy. Оно определяет, когда должна выполняться offsets commit на основе количества обработанных событий и времени, прошедшего с момента последнего commit. Этот класс должен реализовывать интерфейс 'OffsetCommitPolicy'. По умолчанию используется периодическая commit policy, основанная на временных интервалах.

  • По умолчанию:

  • Тип: String

Offset Commit Timeout Ms

  • Максимальное время (в миллисекундах) ожидания завершения сброса записей и фиксации смещения разделов в хранилище, после чего процесс будет отменён, а смещения восстановлены для следующей попытки. Значение по умолчанию 5 секунд.

  • По умолчанию: 5000

  • Тип: long

Offset Flush Interval Ms

  • Интервал, с которым выполняется попытка фиксации смещений. По умолчанию равен 1 минуте.

  • По умолчанию: 60000

  • Тип: long

Offset Storage

  • Имя Java-класса, который отвечает за сохранение смещений коннектора.

  • По умолчанию: org.apache.kafka.connect.storage.FileOffsetBackingStore

  • Тип: String

Offset Storage File Name

  • Указывает путь к файлу, где сохраняются смещения. Требуется при настройке offset.storage на FileOffsetBackingStore.

  • По умолчанию:

  • Тип: String

Offset Storage Partitions

  • Определяет количество партиций топика, используемого для хранения смещений. Необходимо указать при использовании KafkaOffsetBackingStore.

  • По умолчанию:

  • Тип: int

Offset Storage Replication Factor

  • Тип: Количество реплик, создаваемых для топика хранения смещений. Необходимо при настройке offset.storage на KafkaOffsetBackingStore.

  • По умолчанию:

  • Тип: int

Offset Storage Topic

  • Название Kafka-топика, в котором будут храниться смещения. Обязательно, если offset.storage установлен в KafkaOffsetBackingStore.

  • По умолчанию:

  • Тип: String

Poll Interval Ms

  • Тип: Время задержки перед повторной проверкой новых событий изменений в случае их отсутствия, в миллисекундах. По умолчанию 500 мс.

  • По умолчанию: 500ms

  • Тип: long

Provide Transaction Metadata

  • Позволяет извлекать метаданные транзакций и одновременно вести учёт событий.

  • По умолчанию: false

  • Тип: boolean

Query Fetch Size

  • Определяет максимальное число записей, загружаемых в оперативную память при потоковой обработке. При значении 0 применяется стандартный размер выборки JDBC.

  • По умолчанию: 0

  • Тип: int

Retriable Restart Connector Wait Ms

  • Время ожидания перед перезапуском коннектора после возникновения повторяемой ошибки. По умолчанию 10000 мс.

  • По умолчанию: 10s

  • Тип: long

Schema History Internal File Filename

  • Путь к файлу для сохранения истории изменений схемы database.

  • По умолчанию:

  • Тип: String

Schema Name Adjustment Mode

  • Указывает, как имена схем должны быть изменены для совместимости с преобразователем сообщений, используемым коннектором, включая следующие варианты:

    • avro -- заменяет символы, которые не могут быть использованы в имени типа Avro, на подчеркивание.

    • avro_unicode -- заменяет подчеркивание или символы, которые не могут быть использованы в имени типа Avro, на соответствующие юникодные символы, например, _uxxxx. Примечание: _ -- это escape-последовательность, как обратная косая черта в Java.

    • none -- не применяется никаких изменений (по умолчанию).

  • По умолчанию: none

  • Тип: String

Signal Data Collection

  • Название data collection, которая используется для отправки сигналов/команд в Debezium. Сигнализация отключена, если не задано.

  • По умолчанию:

  • Тип: String

Signal Enabled Channels

  • Список имен каналов, которые включены. Канал источника включен по умолчанию.

  • По умолчанию: source

  • Тип: String

Signal Poll Interval Ms

  • Интервал для поиска новых сигналов в зарегистрированных каналах, указанный в миллисекундах. По умолчанию -- 5 секунд.

  • По умолчанию: 5s

  • Тип: long

Skipped Operations

  • Список операций, которые нужно пропустить при стриминге, разделённый запятыми:

    • c для вставок/созданий

    • u для обновлений

    • d для удалений

    • t для усечений

    • none для отсутствия пропуска.

    По умолчанию пропускаются только операции усечения.

  • По умолчанию: t

  • Тип: String

Snapshot Collection Filter Overrides

  • Это свойство содержит список, разделённый запятыми, для которого snapshot может быть подмножеством данных из источника данных. Это подмножество будет определяться фильтром MongoDB, который указывается как значение для свойства snapshot.collection.filter.override..

  • По умолчанию:

  • Тип: String

Snapshot Delay Ms

  • Задержка перед началом создания snapshot, в миллисекундах. По умолчанию 0 мс.

  • По умолчанию: 0ms

  • Тип: long

Snapshot Fetch Size

  • Максимальное количество записей, которые можно загрузить в память при выполнении snapshot.

  • По умолчанию:

  • Тип: int

Snapshot Include Collection List

  • Этот параметр должен быть задан, чтобы указать список таблиц/коллекций, для которых необходимо выполнить snapshot при создании или перезапуске коннектора.

  • По умолчанию:

  • Тип: String

Snapshot Max Threads

  • Тип: Максимальное количество потоков, используемых для выполнения snapshot. По умолчанию 1.

  • По умолчанию: 1

  • Тип: int

Snapshot Mode

  • Критерии для выполнения snapshot при запуске коннектора. Выберите одну из следующих опций snapshot:

    • initial (по умолчанию) - Если коннектор не обнаруживает смещения для логического имени сервера, он выполняет snapshot, который захватывает текущее полное состояние настроенных таблиц. После завершения snapshot коннектор начинает потоковую передачу изменений из oplog.

    • never - Коннектор не выполняет snapshot. При первом запуске коннектор сразу начинает читать с начала oplog.

  • По умолчанию: initial

  • Тип: String

Sourceinfo Struct Maker

  • Имя класса SourceInfoStructMaker, который возвращает схему и структуру SourceInfo.

  • По умолчанию: io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker

  • Тип: String

Tombstones On Delete

  • Определяет, должны ли операции удаления представляться событием удаления и последующим событием tombstone (true) или только событием удаления (false). Генерация события tombstoneя (поведение по умолчанию) позволяет Kafka полностью удалить все события, связанные с данным ключом, после удаления исходной записи.

  • По умолчанию: false

  • Тип: boolean

Topic Naming Strategy

  • Имя класса TopicNamingStrategy, отвечающего за определение топика для изменений данных, схемы, транзакций, heartbeat-событий и др.

  • По умолчанию: io.debezium.schema.SchemaTopicNamingStrategy

  • Тип: String

Topic Prefix

  • Требуемый префикс топика, который идентифицирует и задаёт пространство имён для конкретного сервера или кластера базы данных, фиксирующего изменения.

    Примечание: Префикс топика должен быть уникальным среди всех других коннекторов, так как он используется в качестве префикса для всех Kafka-топиков, принимающих события от данного коннектора. Разрешены только буквенно-цифровые символы, дефисы, точки и подчеркивания.

  • По умолчанию:

  • Тип: String

Component advanced properties

Bridge Error Handler

  • Предоставляет возможность связать потребителя с маршрутизатором Error Handler Camel. Это позволяет обрабатывать исключения (если возможно), возникающие при получении входящих сообщений потребителем Camel, в виде сообщений, передаваемых в маршрутизатор Error Handler.

    Важно: Это возможно только в том случае, если сторонний компонент позволяет Camel получать уведомления о возникших исключениях. Некоторые компоненты обрабатывают исключения только внутренне, из-за чего использование bridgeErrorHandler становится невозможным. В некоторых случаях мы можем доработать компонент Camel, чтобы интегрировать его со сторонним компонентом и сделать такую возможность доступной в будущих выпусках. По умолчанию потребитель использует org.apache.camel.spi.ExceptionHandler для обработки исключений, которые логируются с уровнем WARN или ERROR и игнорируются.

  • По умолчанию: false

  • Тип: boolean

Exception Handler

  • Позволяет потребителю использовать собственный ExceptionHandler.

    Обратите внимание, что если включена опция bridgeErrorHandler, то эта настройка не используется. По умолчанию потребитель обрабатывает исключения, которые логируются с уровнем WARN или ERROR и игнорируются.

  • По умолчанию:

  • Тип: ExceptionHandler

Exchange Pattern

  • Определяет шаблон взаимодействия, который применяется, когда потребитель создает новый обмен (exchange).

    Возможные значения:

    • InOnly

    • InOut

  • По умолчанию:

  • Тип: ExchangePattern