MongoDB
MongoDB Sink Connector
Tips
1. If you want to use CDC write features, it is recommended to enable the upsert-enable configuration.
The MongoDB connector provides the ability to read data from and write data to MongoDB. This document describes how to set up the MongoDB connector to write data to MongoDB.
The following table lists the field data type mapping between Nexus data types and MongoDB BSON types.
| Nexus Data Type | MongoDB BSON Type |
| --- | --- |
| STRING | ObjectId |
| STRING | String |
| BOOLEAN | Boolean |
| BINARY | Binary |
| INTEGER | Int32 |
| TINYINT | Int32 |
| SMALLINT | Int32 |
| BIGINT | Int64 |
| DOUBLE | Double |
| FLOAT | Double |
| DECIMAL | Decimal128 |
| DATE | Date |
| TIMESTAMP | Timestamp[Date] |
| ROW | Object |
| ARRAY | Array |
Tips
1. When using Nexus to write Date and Timestamp types to MongoDB, both produce a Date data type in MongoDB, but with different precision: the Nexus Date type yields second-level precision, while the Nexus Timestamp type yields millisecond-level precision.
2. When using the DECIMAL type in Nexus, be aware that the maximum range cannot exceed 34 digits, which means you should use decimal(34, 18).
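For reference, a schema declaring these types might look like the sketch below. This is an illustration only: the exact schema syntax is not shown in this document, and the HOCON-style fields block is an assumption.

```
# Hypothetical schema declaration (syntax assumed, not specified in this document).
schema = {
  fields {
    id         = bigint
    amount     = "decimal(34, 18)"   # maximum supported precision is 34 digits
    created_at = date                # stored as a MongoDB Date with second-level precision
    updated_at = timestamp           # stored as a MongoDB Date with millisecond-level precision
  }
}
```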
| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| uri | String | Yes | - | The MongoDB standard connection URI, e.g. mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true. |
| database | String | Yes | - | The name of the MongoDB database to read or write. |
| collection | String | Yes | - | The name of the MongoDB collection to read or write. |
| schema | String | Yes | - | The mapping between MongoDB's BSON structure and the Nexus data structure. |
| buffer-flush.max-rows | String | No | 1000 | Specifies the maximum number of buffered rows per batch request. |
| buffer-flush.interval | String | No | 30000 | Specifies the maximum flush interval for buffered rows, in milliseconds. |
| retry.max | String | No | 3 | Specifies the maximum number of retries if writing records to the database fails. |
| retry.interval | Duration | No | 1000 | Specifies the interval between retries if writing records to the database fails, in milliseconds. |
| upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. |
| primary-key | List | No | - | The primary keys for upsert/update, given in the form ["id","name",...]. |
| transaction | Boolean | No | false | Whether to use transactions in MongoSink (requires MongoDB 4.2+). |
| common-options | - | No | - | Sink plugin common parameters; refer to the sink common options documentation for details. |
Tips

1. The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: buffer-flush.max-rows, buffer-flush.interval, and checkpoint.interval. Data flushing is triggered when any of these conditions is met.
2. The connector is compatible with the historical parameter upsert-key. If upsert-key is set, do not set primary-key.
The following example demonstrates how to create a data synchronization job that writes randomly generated data to a MongoDB database:
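A sketch of such a job is shown below. The HOCON-style layout, the env block, and the FakeSource random data generator are assumptions used only for illustration; the MongoDB sink options come from the parameter table above.

```
# Sketch of a data synchronization job (layout and block names assumed).
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  # Hypothetical random data generator, for illustration only.
  FakeSource {
    row.num = 100
    schema = {
      fields {
        id     = bigint
        name   = string
        status = boolean
      }
    }
  }
}

sink {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017/mydb"
    database = "mydb"
    collection = "users"
    buffer-flush.max-rows = 1000
    buffer-flush.interval = 30000
    retry.max = 3
    retry.interval = 1000
    schema = {
      fields {
        id     = bigint
        name   = string
        status = boolean
      }
    }
  }
}
```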
The connector supports the standard MongoDB connection scenarios listed below; illustrative URI forms follow the list.

- Unauthenticated single-node connection
- Replica set connection
- Authenticated replica set connection
- Multi-node replica set connection
- Sharded cluster connection
- Multiple mongos connections
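These scenarios map onto standard MongoDB connection-string forms such as the following; the hosts, database names, and credentials are placeholders, not values from this document.

```
# Unauthenticated single node
mongodb://127.0.0.1:27017/mydb

# Replica set
mongodb://127.0.0.1:27017/mydb?replicaSet=rs0

# Authenticated replica set
mongodb://admin:password@127.0.0.1:27017/mydb?replicaSet=rs0&authSource=admin

# Multi-node replica set
mongodb://node1:27017,node2:27017,node3:27017/mydb?replicaSet=rs0

# Sharded cluster (connect through a mongos router)
mongodb://mongos1:27017/mydb

# Multiple mongos routers
mongodb://mongos1:27017,mongos2:27017,mongos3:27017/mydb
```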
Note: The username and password in the URI must be URL-encoded before being concatenated into the connection string.
Although MongoDB has fully supported multi-document transactions since version 4.2, that does not mean they should be used indiscriminately. Transactions bring locking, node coordination, additional overhead, and a performance cost. The guiding principle is to avoid them where possible; in most cases the need for transactions can be designed away with a sensible data model and pipeline.
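If transactions are genuinely required (MongoDB 4.2 or later), the documented transaction option can be enabled on the sink. A minimal sketch, with the plugin block layout assumed as in the example above:

```
sink {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017/mydb"   # placeholder URI
    database = "mydb"
    collection = "orders"
    # Requires MongoDB 4.2+ and adds coordination overhead; use sparingly.
    transaction = true
  }
}
```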
By specifying a clear primary key and using the upsert method, exactly-once write semantics can be achieved.
If primary-key and upsert-enable are defined in the configuration, the MongoDB sink uses upsert semantics instead of regular INSERT statements. The declared primary keys are combined into MongoDB's reserved primary key (_id), and writes are performed in upsert mode to ensure idempotence. In the event of a failure, Nexus jobs recover from the last successful checkpoint and reprocess, which may result in duplicate message processing during recovery. Upsert mode is highly recommended, as it avoids violating database primary key constraints and generating duplicate data when records are reprocessed.
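A minimal sink sketch with upsert enabled (plugin block layout assumed as above; upsert-enable and primary-key are the options being demonstrated):

```
sink {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017/mydb"
    database = "mydb"
    collection = "users"
    # Idempotent writes: documents are upserted, keyed on the declared primary key(s).
    upsert-enable = true
    primary-key = ["id"]
  }
}
```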