Debezium MongoDB Connector
Поддерживается только потребитель
Компонент Debezium MongoDB представляет собой оболочку вокруг Debezium , использующую Debezium Engine , которая позволяет осуществлять сбор измененных данных из базы данных MongoDB с помощью Debezium без необходимости использования Kafka или Kafka Connect.
Коннектор Debezium MongoDB использует oplog MongoDB для захвата изменений. Коннектор работает только с наборами реплик MongoDB или с шардированными кластерами, где каждый шард является отдельным набором реплик. Поэтому вам нужно будет запустить экземпляр MongoDB либо в режиме набора реплик, либо в режиме шардированных кластеров.
Примечание по обработке сбоев: согласно документации Debezium Embedded Engine , движки активно записывают смещения источника и периодически сбрасывают эти смещения в постоянное хранилище. Поэтому при перезапуске или сбое приложения движок возобновит работу с последнего записанного смещения. Это означает, что при нормальной работе ваши нисходящие маршруты будут получать каждое событие ровно один раз. Однако в случае сбоя приложения (без корректного завершения работы) приложение возобновит работу с последнего записанного смещения, что может привести к получению дублирующихся событий сразу после перезапуска. Поэтому ваши нисходящие маршруты должны быть достаточно терпимы к такому случаю и при необходимости дедуплицировать события.
Пользователям Maven необходимо добавить следующую зависимость в свой pom.xml для этого компонента:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-debezium-mongodb</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version ->
</dependency>
Формат URI
debezium-mongodb:имя[?опции]
Configuring Options
Компоненты настраиваются на двух отдельных уровнях:
-
component level
-
edpoint level
Configuring Component Options
На уровне компонента задаются основные и общие настройки, которые впоследствии наследуются endpoints. Это самый высокий уровень конфигурации.
Например, компонент может иметь настройки безопасности, учетные данные для аутентификации, URL-адреса для сетевых подключений и т. д.
Некоторые компоненты имеют всего несколько параметров, в то время как у других их может быть много. Поскольку компоненты обычно имеют предварительно настроенные значения по умолчанию, которые используются в большинстве случаев, вам может понадобиться настроить только несколько параметров компонента или вообще ничего не настраивать.
Компоненты можно настроить с использованием:
-
Component DSL.
-
Конфигурационного файла (application.properties, *.yaml files, etc).
-
Напрямую в Java code.
Configuring Endpoint Options
Обычно больше времени уходит на настройку endpoints, так как они обладают множеством параметров. Эти параметры позволяют настроить поведение endpoint в соответствии с вашими потребностями. Кроме того, параметры делятся на категории в зависимости от того, используется ли endpoint как потребитель (from), как производитель (to) или в обеих ролях.
Настройка endpoints чаще всего выполняется непосредственно в URI endpoint в виде параметров пути и запроса. Также вы можете использовать Endpoint DSL и DataFormat DSL как типобезопасный способ настройки endpoints и форматов данных в Java.
Хорошей практикой при настройке параметров является использование Property Placeholders.
Property placeholders предоставляют несколько преимуществ:
-
Помогают избежать использования жестко закодированных URL-адресов, номеров портов, конфиденциальной информации и других настроек.
-
Позволяют вынести конфигурацию за пределы кода.
-
Делают код более гибким и удобным для повторного использования.
Component properties
Additional Properties
-
Дополнительные свойства для компонентов Debezium, если они не могут быть заданы напрямую в конфигурациях Camel (например, настройка свойств Kafka Connect, необходимых для движка Debezium, таких как KafkaOffsetBackingStore). Эти свойства должны иметь префикс additionalProperties.
-
По умолчанию:
-
Тип: Map
Capture Mode
-
Метод, используемый для захвата изменений с сервера MongoDB. Доступные варианты:
-
change_streams -- захват изменений через MongoDB Change Streams, события обновления не содержат полных документов.
-
change_streams_update_full (по умолчанию) -- захват изменений через MongoDB Change Streams, события обновления содержат полные документы.
-
-
По умолчанию: change_streams_update_full
-
Тип: String
Collection Exclude List
-
Максимальный размер каждого пакета исходных записей. По умолчанию равен 2048.
-
По умолчанию:
-
Тип: String
Collection Include List
-
Шаблоны регулярных выражений для включения столбцов в события изменения.
-
По умолчанию:
-
Тип: String
Converters
-
Тип: Необязательный список пользовательских converters, заменяющих стандартные. Определяются через параметр '.type' и настраиваются с помощью опций через '.'.
-
По умолчанию:
-
Тип: String
Cursor Max Await Time Ms
-
Максимальное время обработки в миллисекундах, которое отводится на ожидание обработки курсором oplog одного запроса на опрос.
-
По умолчанию:
-
Тип: int
Database Exclude List
-
Тип: Список регулярных выражений или литералов, разделённых запятой, которые соответствуют именам database, изменения в которых должны быть исключены.
-
По умолчанию:
-
Тип: String
Database Include List
-
Список регулярных выражений или литералов, разделённых запятой, которые соответствуют именам database, изменения в которых должны быть зафиксированы.
-
По умолчанию:
-
Тип: String
Event Processing Failure Handling Mode
-
Тип: Укажите, как следует обрабатывать ошибки, возникающие во время обработки событий (например, при обнаружении повреждённого события). Доступные варианты:
-
fail (по умолчанию) -- вызывается исключение с указанием проблемного события и его позиции, что приводит к остановке коннектора.
-
warn -- проблемное событие и его позиция будут зафиксированы в логах, а событие будет пропущено.
-
ignore -- проблемное событие будет пропущено без уведомлений.
-
-
По умолчанию: fail
-
Тип: String
Field Exclude List
-
Список fully-qualified имён полей, разделённых запятыми, которые должны быть исключены из значений сообщений о событиях изменений.
-
По умолчанию:
-
Тип: String
Field Renames
-
Список fully-qualified замен полей, разделённых запятыми, которые должны использоваться для переименования полей в значениях сообщений о событиях изменений. Полные замены для полей имеют вид databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, где databaseName и collectionName могут содержать символы подстановки (), которые соответствуют любым символам, а символ двоеточия (:) используется для определения отображения переименования поля.
-
По умолчанию:
-
Тип: String
Heartbeat Interval Ms
-
Тип: Длина интервала в миллисекундах, через который коннектор периодически отправляет heartbeat-сообщения в тему heartbeat. Используйте 0, чтобы отключить отправку heartbeat-сообщений. По умолчанию отключено.
-
По умолчанию: 0ms
-
Тип: int
Heartbeat Topics Prefix
-
Префикс, который используется для названия тем heartbeat. По умолчанию используется __debezium-heartbeat.
-
По умолчанию: __debezium-heartbeat
-
Тип: String
Internal Key Converter
-
Класс конвертера, который должен использоваться для сериализации и десериализации данных ключей для смещений (offsets). По умолчанию используется JSON-конвертер.
-
По умолчанию: org.apache.kafka.connect.json.JsonConverter
-
Тип: String
Internal Value Converter
-
Класс конвертера, который должен использоваться для сериализации и десериализации данных значений для смещений. По умолчанию используется JSON-конвертер.
-
По умолчанию: org.apache.kafka.connect.json.JsonConverter
-
Тип: String
Max Batch Size
-
Максимальный размер каждого пакета исходных записей. По умолчанию равен 2048.
-
По умолчанию: 2048
-
Тип: int
Max Queue Size
-
Максимальный размер очереди для событий изменений, считанных из журнала database, но ещё не записанных или переданных. По умолчанию равен 8192 и всегда должен быть больше максимального размера пакета.
-
По умолчанию: 8192
-
Тип: int
Max Queue Size In Bytes
-
Максимальный размер очереди в байтах для событий изменений, считанных из журнала database, но ещё не записанных или переданных. По умолчанию равен 0, что означает, что функция отключена.
-
По умолчанию: 0
-
Тип: long
Mongodb Authsource
-
Database, содержащая учётные данные пользователей.
-
По умолчанию: admin
-
Тип: String
Mongodb Connect Timeout Ms
-
Время ожидания подключения в миллисекундах. По умолчанию 10 000 мс (10 секунд).
-
По умолчанию: 10s
-
Тип: int
Mongodb Connection Mode
Mongodb Connection String
-
Строка подключения к database.
-
По умолчанию:
-
Тип: String
Mongodb Heartbeat Frequency Ms
-
Тип: Частота, с которой мониторинг кластера пытается связаться с каждым сервером. По умолчанию -- 10 секунд (10 000 мс).
-
По умолчанию: 10s
-
Тип: int
Mongodb Password
-
Пароль, используемый для подключения к MongoDB (при необходимости).
-
По умолчанию:
-
Тип: String
Mongodb Poll Interval Ms
-
Частота проверки изменений в наборах реплик, указанная в миллисекундах. По умолчанию -- 30 секунд (30 000 мс).
-
По умолчанию: 30s
-
Тип: long
Mongodb Server Selection Timeout Ms
-
Время ожидания выбора сервера в миллисекундах. По умолчанию -- 10 секунд (10 000 мс).
-
По умолчанию: 30s
-
Тип: int
Mongodb Socket Timeout Ms
-
Время ожидания socket в миллисекундах. Значение по умолчанию -- 0 мс.
-
По умолчанию: 0ms
-
Тип: int
Mongodb Ssl Enabled
-
Определяет необходим ли SSL для подключения коннектора к экземплярам MongoDB
-
По умолчанию: false
-
Тип: boolean
Mongodb Ssl Invalid Hostname Allowed
-
Определяет можно ли использовать недействительные имена хостов при SSL-соединении. Если да, соединение будет подвержено атакам "man-in-the-middle".
-
По умолчанию: false
-
Тип: boolean
Mongodb User
-
Имя пользователя database для подключения к MongoDB, при необходимости.
-
По умолчанию:
-
Тип: String
Name
-
Обязательное уникальное имя для коннектора. Попытка повторной регистрации с тем же именем завершится ошибкой.
-
По умолчанию:
-
Тип: String
Notification Enabled Channels
-
Тип: Список названий каналов уведомлений, которые включены.
-
По умолчанию:
-
Тип: String
Notification Sink Topic Name
-
Название топика для уведомлений. Это поле обязательно, если 'sink' находится в списке включённых каналов.
-
По умолчанию:
-
Тип: String
Offset Commit Policy
-
Тип: Имя Java-класса commit policy. Оно определяет, когда должна выполняться offsets commit на основе количества обработанных событий и времени, прошедшего с момента последнего commit. Этот класс должен реализовывать интерфейс 'OffsetCommitPolicy'. По умолчанию используется периодическая commit policy, основанная на временных интервалах.
-
По умолчанию:
-
Тип: String
Offset Commit Timeout Ms
-
Максимальное время (в миллисекундах) ожидания завершения сброса записей и фиксации смещения разделов в хранилище, после чего процесс будет отменён, а смещения восстановлены для следующей попытки. Значение по умолчанию 5 секунд.
-
По умолчанию: 5000
-
Тип: long
Offset Flush Interval Ms
-
Интервал, с которым выполняется попытка фиксации смещений. По умолчанию равен 1 минуте.
-
По умолчанию: 60000
-
Тип: long
Offset Storage
-
Имя Java-класса, который отвечает за сохранение смещений коннектора.
-
По умолчанию: org.apache.kafka.connect.storage.FileOffsetBackingStore
-
Тип: String
Offset Storage File Name
-
Указывает путь к файлу, где сохраняются смещения. Требуется при настройке offset.storage на FileOffsetBackingStore.
-
По умолчанию:
-
Тип: String
Offset Storage Partitions
-
Определяет количество партиций топика, используемого для хранения смещений. Необходимо указать при использовании KafkaOffsetBackingStore.
-
По умолчанию:
-
Тип: int
Offset Storage Replication Factor
-
Тип: Количество реплик, создаваемых для топика хранения смещений. Необходимо при настройке offset.storage на KafkaOffsetBackingStore.
-
По умолчанию:
-
Тип: int
Offset Storage Topic
-
Название Kafka-топика, в котором будут храниться смещения. Обязательно, если offset.storage установлен в KafkaOffsetBackingStore.
-
По умолчанию:
-
Тип: String
Poll Interval Ms
-
Тип: Время задержки перед повторной проверкой новых событий изменений в случае их отсутствия, в миллисекундах. По умолчанию 500 мс.
-
По умолчанию: 500ms
-
Тип: long
Provide Transaction Metadata
-
Позволяет извлекать метаданные транзакций и одновременно вести учёт событий.
-
По умолчанию: false
-
Тип: boolean
Query Fetch Size
-
Определяет максимальное число записей, загружаемых в оперативную память при потоковой обработке. При значении 0 применяется стандартный размер выборки JDBC.
-
По умолчанию: 0
-
Тип: int
Retriable Restart Connector Wait Ms
-
Время ожидания перед перезапуском коннектора после возникновения повторяемой ошибки. По умолчанию 10000 мс.
-
По умолчанию: 10s
-
Тип: long
Schema History Internal File Filename
-
Путь к файлу для сохранения истории изменений схемы database.
-
По умолчанию:
-
Тип: String
Schema Name Adjustment Mode
-
Указывает, как имена схем должны быть изменены для совместимости с преобразователем сообщений, используемым коннектором, включая следующие варианты:
-
avro -- заменяет символы, которые не могут быть использованы в имени типа Avro, на подчеркивание.
-
avro_unicode -- заменяет подчеркивание или символы, которые не могут быть использованы в имени типа Avro, на соответствующие юникодные символы, например, _uxxxx. Примечание: _ -- это escape-последовательность, как обратная косая черта в Java.
-
none -- не применяется никаких изменений (по умолчанию).
-
-
По умолчанию: none
-
Тип: String
Signal Data Collection
-
Название data collection, которая используется для отправки сигналов/команд в Debezium. Сигнализация отключена, если не задано.
-
По умолчанию:
-
Тип: String
Signal Enabled Channels
-
Список имен каналов, которые включены. Канал источника включен по умолчанию.
-
По умолчанию: source
-
Тип: String
Signal Poll Interval Ms
-
Интервал для поиска новых сигналов в зарегистрированных каналах, указанный в миллисекундах. По умолчанию -- 5 секунд.
-
По умолчанию: 5s
-
Тип: long
Skipped Operations
-
Список операций, которые нужно пропустить при стриминге, разделённый запятыми:
-
c для вставок/созданий
-
u для обновлений
-
d для удалений
-
t для усечений
-
none для отсутствия пропуска.
По умолчанию пропускаются только операции усечения.
-
-
По умолчанию: t
-
Тип: String
Snapshot Collection Filter Overrides
-
Это свойство содержит список, разделённый запятыми, для которого snapshot может быть подмножеством данных из источника данных. Это подмножество будет определяться фильтром MongoDB, который указывается как значение для свойства snapshot.collection.filter.override..
-
По умолчанию:
-
Тип: String
Snapshot Delay Ms
-
Задержка перед началом создания snapshot, в миллисекундах. По умолчанию 0 мс.
-
По умолчанию: 0ms
-
Тип: long
Snapshot Fetch Size
-
Максимальное количество записей, которые можно загрузить в память при выполнении snapshot.
-
По умолчанию:
-
Тип: int
Snapshot Include Collection List
-
Этот параметр должен быть задан, чтобы указать список таблиц/коллекций, для которых необходимо выполнить snapshot при создании или перезапуске коннектора.
-
По умолчанию:
-
Тип: String
Snapshot Max Threads
-
Тип: Максимальное количество потоков, используемых для выполнения snapshot. По умолчанию 1.
-
По умолчанию: 1
-
Тип: int
Snapshot Mode
-
Критерии для выполнения snapshot при запуске коннектора. Выберите одну из следующих опций snapshot:
-
initial (по умолчанию) - Если коннектор не обнаруживает смещения для логического имени сервера, он выполняет snapshot, который захватывает текущее полное состояние настроенных таблиц. После завершения snapshot коннектор начинает потоковую передачу изменений из oplog.
-
never - Коннектор не выполняет snapshot. При первом запуске коннектор сразу начинает читать с начала oplog.
-
-
По умолчанию: initial
-
Тип: String
Sourceinfo Struct Maker
-
Имя класса SourceInfoStructMaker, который возвращает схему и структуру SourceInfo.
-
По умолчанию: io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker
-
Тип: String
Tombstones On Delete
-
Определяет, должны ли операции удаления представляться событием удаления и последующим событием tombstone (true) или только событием удаления (false). Генерация события tombstoneя (поведение по умолчанию) позволяет Kafka полностью удалить все события, связанные с данным ключом, после удаления исходной записи.
-
По умолчанию: false
-
Тип: boolean
Topic Naming Strategy
-
Имя класса TopicNamingStrategy, отвечающего за определение топика для изменений данных, схемы, транзакций, heartbeat-событий и др.
-
По умолчанию: io.debezium.schema.SchemaTopicNamingStrategy
-
Тип: String
Topic Prefix
-
Требуемый префикс топика, который идентифицирует и задаёт пространство имён для конкретного сервера или кластера базы данных, фиксирующего изменения.
Примечание: Префикс топика должен быть уникальным среди всех других коннекторов, так как он используется в качестве префикса для всех Kafka-топиков, принимающих события от данного коннектора. Разрешены только буквенно-цифровые символы, дефисы, точки и подчеркивания.
-
По умолчанию:
-
Тип: String
Component advanced properties
Bridge Error Handler
-
Предоставляет возможность связать потребителя с маршрутизатором Error Handler Camel. Это позволяет обрабатывать исключения (если возможно), возникающие при получении входящих сообщений потребителем Camel, в виде сообщений, передаваемых в маршрутизатор Error Handler.
Важно: Это возможно только в том случае, если сторонний компонент позволяет Camel получать уведомления о возникших исключениях. Некоторые компоненты обрабатывают исключения только внутренне, из-за чего использование bridgeErrorHandler становится невозможным. В некоторых случаях мы можем доработать компонент Camel, чтобы интегрировать его со сторонним компонентом и сделать такую возможность доступной в будущих выпусках. По умолчанию потребитель использует org.apache.camel.spi.ExceptionHandler для обработки исключений, которые логируются с уровнем WARN или ERROR и игнорируются.
-
По умолчанию: false
-
Тип: boolean
Exception Handler
-
Позволяет потребителю использовать собственный ExceptionHandler.
Обратите внимание, что если включена опция bridgeErrorHandler, то эта настройка не используется. По умолчанию потребитель обрабатывает исключения, которые логируются с уровнем WARN или ERROR и игнорируются.
-
По умолчанию:
-
Тип: ExceptionHandler
Exchange Pattern
-
Определяет шаблон взаимодействия, который применяется, когда потребитель создает новый обмен (exchange).
Возможные значения:
-
InOnly
-
InOut
-
-
По умолчанию:
-
Тип: ExchangePattern