MongoDB CDC
MongoDB CDC source connector
The MongoDB CDC connector reads both snapshot data and incremental data from a MongoDB database.
MongoDB
universal
1. MongoDB version: 4.0 or later.
2. Cluster deployment: replica sets or sharded clusters.
3. Storage engine: WiredTiger.
4. Permissions: changeStream and read.
The following table lists the field data type mapping from MongoDB BSON type to Nexus data type.
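| MongoDB BSON type | Nexus data type |
| --- | --- |
| ObjectId | STRING |
| String | STRING |
| Boolean | BOOLEAN |
| Binary | BINARY |
| Int32 | INTEGER |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Date | DATE |
| Timestamp | TIMESTAMP |
| Object | ROW |
| Array | ARRAY |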
For specific types in MongoDB, we use Extended JSON format to map them to Nexus STRING type.
| MongoDB BSON type | Nexus STRING value (Extended JSON) |
| --- | --- |
| Symbol | {"_value": {"$symbol": "12"}} |
| RegularExpression | {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}} |
| JavaScript | {"_value": {"$code": "function() { return 10; }"}} |
| DbPointer | {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}} |
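A downstream consumer that receives one of these STRING values can unwrap it with any JSON parser. The following is a minimal sketch, assuming the wrapper shape shown above (a single `_value` object holding exactly one `$`-prefixed type key). The helper name `unwrap_extended_json` is hypothetical and not part of the connector.

```python
import json

# Sample STRING values copied from the mapping above.
SAMPLES = {
    "Symbol": '{"_value": {"$symbol": "12"}}',
    "RegularExpression": '{"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}}',
    "JavaScript": '{"_value": {"$code": "function() { return 10; }"}}',
}


def unwrap_extended_json(text):
    """Return (type_key, payload) for one Extended JSON string.

    Assumption: the string has a single "_value" object that holds
    exactly one "$"-prefixed type key, as in the samples above.
    """
    wrapper = json.loads(text)["_value"]
    # Unpack the single (key, value) pair; raises ValueError otherwise.
    (type_key, payload), = wrapper.items()
    return type_key, payload


for name, text in SAMPLES.items():
    print(name, unwrap_extended_json(text))
```

The payload keeps whatever structure the type carries, so a regular expression arrives as a dictionary with `pattern` and `options`, while a symbol arrives as a plain string.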
Tips
1. When using the DECIMAL type in Nexus, note that the precision cannot exceed 34 digits, so declare the type as, for example, decimal(34, 18).
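To make the 34-digit limit concrete: decimal(34, 18) leaves 16 digits before the decimal point and 18 after it. The sketch below checks whether a value fits such a type without rounding. It is an illustration only; the function name `fits_decimal` is hypothetical, and how Nexus itself handles out-of-range values is not covered here.

```python
from decimal import Decimal


def fits_decimal(value, precision=34, scale=18):
    """Check whether value fits DECIMAL(precision, scale) without rounding.

    Digits are counted as written, so trailing zeros after the
    decimal point count toward the scale.
    """
    _sign, digits, exponent = Decimal(value).as_tuple()
    if not isinstance(exponent, int):
        # NaN and Infinity report a non-integer exponent.
        return False
    fractional_digits = max(-exponent, 0)
    integer_digits = max(len(digits) + exponent, 0)
    return fractional_digits <= scale and integer_digits <= precision - scale


print(fits_decimal("1234567890123456.123456789012345678"))  # 16 + 18 digits
print(fits_decimal("12345678901234567.1"))                   # 17 integer digits
print(fits_decimal("1.1234567890123456789"))                 # 19 fractional digits
```

Only the first value fits; the other two exceed the integer part and the scale respectively.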
| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| hosts | String | Yes | - | Comma-separated list of hostname and port pairs of the MongoDB servers, e.g. localhost:27017,localhost:27018. |
| username | String | No | - | Name of the database user to use when connecting to MongoDB. |
| password | String | No | - | Password to use when connecting to MongoDB. |
| database | List | Yes | - | Name of the database to watch for changes. If not set, all databases are captured. Regular expressions are also supported, to monitor every database whose name matches, e.g. db1,db2. |
| collection | List | Yes | - | Name of the collection in the database to watch for changes. If not set, all collections are captured. Regular expressions are also supported, to monitor every collection whose fully-qualified identifier matches, e.g. db1.coll1,db2.coll2. |
| connection.options | String | No | - | Ampersand-separated MongoDB connection options, e.g. replicaSet=test&connectTimeoutMS=300000. |
| batch.size | Long | No | 1024 | The cursor batch size. |
| poll.max.batch.size | Enum | No | 1024 | Maximum number of change stream documents to include in a single batch when polling for new data. |
| poll.await.time.ms | Long | No | 1000 | Time in milliseconds to wait before checking for new results on the change stream. |
| heartbeat.interval.ms | String | No | 0 | Interval in milliseconds between heartbeat messages. Use 0 to disable. |
| incremental.snapshot.chunk.size.mb | Long | No | 64 | Chunk size of the incremental snapshot, in MB. |
| common-options | | No | - | Source plugin common parameters. |
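The hosts, username, password and connection.options values correspond to the parts of a standard MongoDB connection string. The sketch below assembles one from those values to show how they fit together. How the connector builds its connection internally is not stated in this document, so this is an illustration of the URI format only. The function name and the sample credentials are hypothetical.

```python
from urllib.parse import quote_plus


def build_mongodb_uri(hosts, username=None, password=None, connection_options=None):
    """Assemble a standard MongoDB connection string from the option values.

    Assumption: this mirrors the public MongoDB URI format, not
    necessarily what the connector does internally.
    """
    credentials = ""
    if username:
        # Percent-encode reserved characters such as "@" and "/".
        credentials = quote_plus(username)
        if password:
            credentials += ":" + quote_plus(password)
        credentials += "@"
    uri = "mongodb://" + credentials + hosts + "/"
    if connection_options:
        uri += "?" + connection_options
    return uri


print(build_mongodb_uri(
    "localhost:27017,localhost:27018",
    username="cdc_user",    # hypothetical user
    password="p@ss/word",   # hypothetical password
    connection_options="replicaSet=test&connectTimeoutMS=300000",
))
```

Encoding the credentials matters because an unescaped `@` or `/` in a password would otherwise be read as a URI delimiter.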
1. If the collection changes slowly, it is strongly recommended to set heartbeat.interval.ms to an appropriate value greater than 0. When a Nexus job is recovered from a checkpoint or savepoint, heartbeat events push the resumeToken forward so that it does not expire.
2. MongoDB limits a single document to 16MB. Change documents carry additional information, so even if the original document is no larger than 15MB, the change document may exceed the 16MB limit and terminate the Change Stream operation.
3. Immutable shard keys are recommended. MongoDB allows shard keys to be modified once transactions are enabled, but changing a shard key can cause frequent shard migrations and additional performance overhead. Modifying the shard key can also make the Update Lookup feature ineffective, leading to inconsistent results in CDC (Change Data Capture) scenarios.
The following example demonstrates how to create a data synchronization job that reads CDC data from MongoDB and prints it on the local client:
The following example demonstrates how to create a data synchronization job that reads CDC data from MongoDB and writes it to a MySQL database:
The following example demonstrates how to create a data synchronization job that reads CDC data from multiple MongoDB databases and collections and prints it on the local client:
1. CDC synchronization of multiple databases and collections cannot specify a schema and can only output JSON data downstream. MongoDB does not provide metadata for querying, so to support multiple collections, all of them must be read as a single structure.
The following example demonstrates how to create a data synchronization job that uses regular expressions to read data from multiple MongoDB databases and collections and prints it on the local client:
| Matching example | Expression | Description |
| --- | --- | --- |
| Prefix matching | ^(test).* | Match the database name or table name with the prefix test, such as test1, test2, etc. |
| Suffix matching | .*[p$] | Match the database name or table name with the suffix p, such as cdcp, edcp, etc. |
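The two expressions can be tried out against sample names before putting them into a job. This is a minimal sketch using Python's `re` module, under the assumption that the connector requires the whole name to match the pattern; the connector's regex engine is not named in this document, and the sample names are made up.

```python
import re

PREFIX = r"^(test).*"
SUFFIX = r".*[p$]"


def matches(pattern, name):
    # Assumption: the whole name must match the pattern.
    return re.fullmatch(pattern, name) is not None


names = ["test1", "test2", "mytest", "cdcp", "edcp", "cdc"]
print([n for n in names if matches(PREFIX, n)])
print([n for n in names if matches(SUFFIX, n)])
```

Inside a character class `$` is a literal character, so `.*[p$]` also matches a name that ends in a literal `$`. Under whole-name matching, the simpler `.*p` would match only the p suffix.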
For details on the source plugin common parameters (common-options), refer to the source common options documentation.