Selfuel Docs

MongoDB


Last updated 8 months ago

MongoDB Source Connector

Key Features

Description

The MongoDB connector can read data from and write data to MongoDB. This document describes how to set up the MongoDB source connector to read data from MongoDB.

Data Type Mapping

The following table lists the field data type mapping from MongoDB BSON type to Nexus data type.

| MongoDB BSON type | Nexus data type |
| --- | --- |
| ObjectId | STRING |
| String | STRING |
| Boolean | BOOLEAN |
| Binary | BINARY |
| Int32 | INTEGER |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Date | DATE |
| Timestamp | TIMESTAMP |
| Object | ROW |
| Array | ARRAY |

For the following MongoDB types, Nexus uses the Extended JSON format to map the value to the Nexus STRING type:

| MongoDB BSON type | Nexus STRING |
| --- | --- |
| Symbol | {"_value": {"$symbol": "12"}} |
| RegularExpression | {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}} |
| JavaScript | {"_value": {"$code": "function() { return 10; }"}} |
| DbPointer | {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}} |
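The two mapping tables above can be read as a simple lookup. The following Python sketch encodes them as dictionaries. The names BSON_TO_NEXUS, EXTENDED_JSON_KEYS, nexus_type_for and wrap_special are hypothetical and not part of Nexus, and the exact whitespace Nexus uses in the `_value` strings is assumed from the examples in the table.

```python
import json

# BSON type -> Nexus type, as listed in the data type mapping table.
BSON_TO_NEXUS = {
    "ObjectId": "STRING",
    "String": "STRING",
    "Boolean": "BOOLEAN",
    "Binary": "BINARY",
    "Int32": "INTEGER",
    "Int64": "BIGINT",
    "Double": "DOUBLE",
    "Decimal128": "DECIMAL",
    "Date": "DATE",
    "Timestamp": "TIMESTAMP",
    "Object": "ROW",
    "Array": "ARRAY",
}

# BSON types rendered as Extended JSON strings, with the key used for each.
EXTENDED_JSON_KEYS = {
    "Symbol": "$symbol",
    "RegularExpression": "$regularExpression",
    "JavaScript": "$code",
    "DbPointer": "$dbPointer",
}


def nexus_type_for(bson_type: str) -> str:
    """Return the Nexus type for a BSON type name, or raise ValueError."""
    if bson_type in EXTENDED_JSON_KEYS:
        return "STRING"  # special types are stored as Extended JSON strings
    if bson_type in BSON_TO_NEXUS:
        return BSON_TO_NEXUS[bson_type]
    raise ValueError(f"no documented mapping for BSON type {bson_type!r}")


def wrap_special(bson_type: str, payload) -> str:
    """Render a special BSON value in the {"_value": {...}} form shown in the table."""
    return json.dumps({"_value": {EXTENDED_JSON_KEYS[bson_type]: payload}})


print(nexus_type_for("Int64"))       # BIGINT
print(wrap_special("Symbol", "12"))  # {"_value": {"$symbol": "12"}}
```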

Tips

When using the DECIMAL type in Nexus, note that the precision cannot exceed 34 digits (the maximum precision of MongoDB's Decimal128 type), so declare the type as decimal(34, 18).
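To check whether a value fits decimal(34, 18) before declaring it in the schema, count its digits: up to 18 digits may follow the decimal point, which leaves 34 - 18 = 16 digits before it. The helper below is a hypothetical Python sketch of that check using only the standard decimal module; it is not a Nexus API.

```python
from decimal import Decimal


def fits_decimal(value: str, precision: int = 34, scale: int = 18) -> bool:
    """Return True if value can be stored as decimal(precision, scale) without losing digits."""
    d = Decimal(value)  # constructing from a string is exact, no rounding
    if not d.is_finite():
        return False  # NaN and Infinity have no decimal(p, s) representation
    _, digits, exponent = d.as_tuple()
    digits = list(digits)
    # Trailing fractional zeros (as in "1.500") do not need to be stored.
    while exponent < 0 and len(digits) > 1 and digits[-1] == 0:
        digits.pop()
        exponent += 1
    frac_digits = max(0, -exponent)
    int_digits = max(0, len(digits) + exponent)
    return frac_digits <= scale and int_digits <= precision - scale


print(fits_decimal("683265300"))          # True
print(fits_decimal("12345678901234567"))  # False: 17 integer digits
```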

Source Options

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| uri | String | Yes | - | The standard MongoDB connection URI, e.g. mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true. |
| database | String | Yes | - | The name of the MongoDB database to read from. |
| collection | String | Yes | - | The name of the MongoDB collection to read from. |
| schema | String | Yes | - | The mapping between MongoDB's BSON data structure and the Nexus data structure. |
| match.query | String | No | - | A MongoDB filter that selects which documents the query returns. |
| match.projection | String | No | - | A MongoDB projection that controls which fields are included in the query results. |
| partition.split-key | String | No | _id | The field used to split the collection into partitions for parallel reading. |
| partition.split-size | Long | No | 64 * 1024 * 1024 | The size of each partition, in bytes. |
| cursor.no-timeout | Boolean | No | true | The MongoDB server normally times out idle cursors after 10 minutes of inactivity to prevent excess memory use. Set this option to true to prevent that. However, if the application takes longer than 30 minutes to process the current batch of documents, the session is marked as expired and closed. |
| fetch.size | Int | No | 2048 | The number of documents fetched from the server in each batch. An appropriate batch size improves query performance and avoids the memory pressure of fetching a large amount of data at once. |
| max.time-min | Long | No | 600 | The maximum execution time of a query, in minutes. If a query runs longer than this limit, MongoDB terminates the operation and returns an error. |
| flat.sync-string | Boolean | No | true | When enabled and the schema defines exactly one field of type String, each MongoDB document is mapped to that field as a single JSON string. |
| common-options | | No | - | Source plugin common parameters; see Source Common Options for details. |
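Some of the defaults in the table involve unit arithmetic. The Python sketch below works them out: the default partition.split-size is 64 * 1024 * 1024 = 67,108,864 bytes (64 MiB), and the default max.time-min of 600 minutes is 10 hours. The millisecond conversion assumes the value ends up applied like MongoDB's maxTimeMS query option, and batches_needed assumes every batch is filled up to fetch.size documents (MongoDB also caps each batch at 16 MiB). All helper names are hypothetical.

```python
# Defaults from the options table above.
SPLIT_SIZE_DEFAULT = 64 * 1024 * 1024  # partition.split-size, in bytes
MAX_TIME_MIN_DEFAULT = 600             # max.time-min, in minutes
FETCH_SIZE_DEFAULT = 2048              # fetch.size, documents per batch


def minutes_to_millis(minutes: int) -> int:
    """Convert a limit in minutes to milliseconds, the unit of MongoDB's maxTimeMS."""
    return minutes * 60 * 1000


def batches_needed(total_docs: int, fetch_size: int = FETCH_SIZE_DEFAULT) -> int:
    """Number of batches needed to fetch total_docs documents, fetch_size at a time."""
    return (total_docs + fetch_size - 1) // fetch_size  # integer ceiling division


print(SPLIT_SIZE_DEFAULT)                       # 67108864
print(minutes_to_millis(MAX_TIME_MIN_DEFAULT))  # 36000000
print(batches_needed(10_000))                   # 5
```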

Tips

The match.query parameter is equivalent to the legacy parameter matchQuery; the two names can be used interchangeably.

How to Create a MongoDB Data Synchronization Job

The following example shows how to create a data synchronization job that reads data from MongoDB and prints it on the local client:

```hocon
# Set the basic configuration of the job
env {
  parallelism = 1
  job.mode = "BATCH"
}

# Create a source that connects to MongoDB
source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "source_table"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_int = int
        c_bigint = bigint
        c_double = double
        c_bytes = bytes
        c_date = date
        # Decimal128 allows at most 34 digits of precision
        c_decimal = "decimal(34, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_int = int
          c_bigint = bigint
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(34, 18)"
          c_timestamp = timestamp
        }
      }
    }
  }
}

# Print the data read from MongoDB to the console
sink {
  Console {
    parallelism = 1
  }
}
```

Parameter Interpretation

MongoDB Database Connection URI Examples

```
# Unauthenticated single node connection
mongodb://192.168.0.100:27017/mydb

# Replica set connection
mongodb://192.168.0.100:27017/mydb?replicaSet=xxx

# Authenticated replica set connection
mongodb://admin:password@192.168.0.100:27017/mydb?replicaSet=xxx&authSource=admin

# Multi-node replica set connection
mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb?replicaSet=xxx

# Sharded cluster connection
mongodb://192.168.0.100:27017/mydb

# Multiple mongos connections
mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb
```

Note: The username and password in the URI must be URL-encoded before being concatenated into the connection string.
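The URL-encoding requirement matters for any character in the credentials that has a special meaning in a URI, such as @, : or /. Below is a minimal Python sketch using the standard library's urllib.parse.quote_plus, the function PyMongo's documentation recommends for escaping MongoDB credentials. build_mongodb_uri is a hypothetical helper, and query parameters are appended as-is on the assumption that they need no escaping.

```python
from urllib.parse import quote_plus


def build_mongodb_uri(user: str, password: str, hosts: list[str],
                      database: str, **params: str) -> str:
    """Build a mongodb:// URI, percent-encoding the username and password."""
    credentials = f"{quote_plus(user)}:{quote_plus(password)}@"
    uri = f"mongodb://{credentials}{','.join(hosts)}/{database}"
    if params:
        # Parameter values are assumed to be URI-safe already.
        uri += "?" + "&".join(f"{key}={value}" for key, value in params.items())
    return uri


print(build_mongodb_uri("admin", "p@ss:w/rd", ["192.168.0.100:27017"], "mydb",
                        replicaSet="xxx", authSource="admin"))
# mongodb://admin:p%40ss%3Aw%2Frd@192.168.0.100:27017/mydb?replicaSet=xxx&authSource=admin
```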

MatchQuery Scan

In data synchronization scenarios, apply the filter as early as possible with match.query to reduce the number of documents that subsequent operators must process, which improves performance. Here is a simple example of a Nexus job using match.query:

```hocon
source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "orders"
    # Only read documents whose status is "A"
    match.query = "{status: \"A\"}"
    schema = {
      fields {
        id = bigint
        status = string
      }
    }
  }
}
```

Please refer to the MongoDB guide on how to write the syntax of match.query: https://www.mongodb.com/docs/manual/tutorial/query-documents

The following are examples of match.query statements for various data types:

```
# Query a Boolean value
"{c_boolean:true}"
# Query a string value
"{c_string:\"OCzCj\"}"
# Query an integer value
"{c_int:2}"
# Query a date/time value
"{c_date:ISODate(\"2023-06-26T16:00:00.000Z\")}"
# Query a floating-point value
"{c_double:{$gte:1.71763202185342e+308}}"
```
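Because match.query is a quoted string inside the configuration, every double quote in the filter must be escaped by hand, as in the examples above. One way to avoid that is to let a JSON encoder produce both the filter and the quoted configuration value. The Python sketch below assumes that HOCON quoted strings use JSON escaping and that the connector accepts strict JSON filters with quoted keys. match_query_line is a hypothetical helper, and it does not cover shell-only syntax such as ISODate(...).

```python
import json


def match_query_line(filter_doc: dict) -> str:
    """Render a filter document as a HOCON match.query line.

    json.dumps is applied twice: first to turn the filter into compact JSON,
    then to wrap that JSON in a double-quoted string with inner quotes escaped.
    """
    filter_json = json.dumps(filter_doc, separators=(",", ":"))
    return f"match.query = {json.dumps(filter_json)}"


print(match_query_line({"status": "A"}))
# match.query = "{\"status\":\"A\"}"
```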

Projection Scan

In MongoDB, a projection controls which fields are included in the query results by specifying which fields should be returned and which should not. In the find() method, a projection object can be passed as the second argument. Each key of the projection object names a field to include or exclude: a value of 1 means inclusion and 0 means exclusion. Inclusion and exclusion cannot be combined in the same projection, except for the _id field. Here is a simple example, assuming a collection named users:

```javascript
// Returns only the name and email fields (plus _id, which is included by default)
db.users.find({}, { name: 1, email: 1 });
```
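MongoDB rejects a projection that mixes 1 and 0 values, except that _id may be excluded from an inclusion projection. That rule is easy to check before a projection is put into match.projection. A minimal Python sketch, assuming the projection uses only 0/1 (or true/false) flags rather than projection operators; is_valid_projection is a hypothetical helper.

```python
def is_valid_projection(projection: dict) -> bool:
    """Return False if the projection mixes inclusion (1) and exclusion (0).

    The _id field is ignored because MongoDB allows it to be excluded from
    an inclusion projection. Assumes 0/1 or boolean values only.
    """
    flags = {bool(value) for field, value in projection.items() if field != "_id"}
    return len(flags) <= 1


print(is_valid_projection({"name": 1, "email": 1}))  # True
print(is_valid_projection({"name": 1, "email": 0}))  # False
print(is_valid_projection({"name": 1, "_id": 0}))    # True
```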

In data synchronization scenarios, apply the projection as early as possible to reduce the amount of data (the fields of each document) that subsequent operators must process, which improves performance. Here is a simple example of a Nexus job using match.projection:

```hocon
source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    # Only return the name field
    match.projection = "{ name: 1 }"
    schema = {
      fields {
        name = string
      }
    }
  }
}
```

Partitioned Scan

To speed up reading data with parallel source task instances, Nexus provides a partitioned scan feature for MongoDB collections. Users can control how the collection is split into partitions by setting partition.split-key (the field used to split the data) and partition.split-size (the size of each partition).

```hocon
source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    partition.split-key = "id"
    partition.split-size = 1024
    schema = {
      fields {
        id = bigint
        status = string
      }
    }
  }
}
```
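How Nexus computes partition boundaries is not described here. Purely as an illustration of what the two options control, the Python sketch below groups sorted, unique split-key values into key ranges of roughly partition.split-size / average document size documents each. The algorithm, the avg_doc_size input and the split_ranges name are assumptions, not the connector's implementation.

```python
def split_ranges(sorted_keys: list, split_size: int, avg_doc_size: int) -> list[tuple]:
    """Group sorted, unique split-key values into (lower, upper) key ranges.

    Each range covers about split_size // avg_doc_size documents, so a larger
    split size yields fewer, larger partitions. Lower bounds are inclusive and
    upper bounds exclusive; the last range has upper bound None (unbounded).
    """
    docs_per_split = max(1, split_size // avg_doc_size)
    bounds = sorted_keys[::docs_per_split]  # every docs_per_split-th key starts a range
    ranges = []
    for i, lower in enumerate(bounds):
        upper = bounds[i + 1] if i + 1 < len(bounds) else None
        ranges.append((lower, upper))
    return ranges


print(split_ranges(list(range(10)), split_size=1024, avg_doc_size=256))
# [(0, 4), (4, 8), (8, None)]
```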

Flat Sync String

With flat.sync-string, the schema defines only one field, and that field must be of type String. Each MongoDB document is then mapped to that field as a single string.

```hocon
env {
  parallelism = 10
  job.mode = "BATCH"
}
source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    flat.sync-string = true
    schema = {
      fields {
        # The single String field that receives the whole document
        data = string
      }
    }
  }
}
sink {
  Console {}
}
```

A record synchronized with these parameters looks like the following:

```json
{
  "_id":{
    "$oid":"643d41f5fdc6a52e90e59cbf"
  },
  "c_map":{
    "OQBqH":"jllt",
    "rkvlO":"pbfdf",
    "pCMEX":"hczrdtve",
    "DAgdj":"t",
    "dsJag":"voo"
  },
  "c_array":[
    {
      "$numberInt":"-865590937"
    },
    {
      "$numberInt":"833905600"
    },
    {
      "$numberInt":"-1104586446"
    },
    {
      "$numberInt":"2076336780"
    },
    {
      "$numberInt":"-1028688944"
    }
  ],
  "c_string":"bddkzxr",
  "c_boolean":false,
  "c_tinyint":{
    "$numberInt":"39"
  },
  "c_smallint":{
    "$numberInt":"23672"
  },
  "c_int":{
    "$numberInt":"-495763561"
  },
  "c_bigint":{
    "$numberLong":"3768307617923954543"
  },
  "c_float":{
    "$numberDouble":"5.284220288280258E37"
  },
  "c_double":{
    "$numberDouble":"1.1706091642478246E308"
  },
  "c_bytes":{
    "$binary":{
      "base64":"ZWJ4",
      "subType":"00"
    }
  },
  "c_date":{
    "$date":{
      "$numberLong":"1686614400000"
    }
  },
  "c_decimal":{
    "$numberDecimal":"683265300"
  },
  "c_timestamp":{
    "$date":{
      "$numberLong":"1684283772000"
    }
  },
  "c_row":{
    "c_map":{
      "OQBqH":"cbrzhsktmm",
      "rkvlO":"qtaov",
      "pCMEX":"tuq",
      "DAgdj":"jzop",
      "dsJag":"vwqyxtt"
    },
    "c_array":[
      {
        "$numberInt":"1733526799"
      },
      {
        "$numberInt":"-971483501"
      },
      {
        "$numberInt":"-1716160960"
      },
      {
        "$numberInt":"-919976360"
      },
      {
        "$numberInt":"727499700"
      }
    ],
    "c_string":"oboislr",
    "c_boolean":true,
    "c_tinyint":{
      "$numberInt":"-66"
    },
    "c_smallint":{
      "$numberInt":"1308"
    },
    "c_int":{
      "$numberInt":"-1573886733"
    },
    "c_bigint":{
      "$numberLong":"4877994302999518682"
    },
    "c_float":{
      "$numberDouble":"1.5353209063652051E38"
    },
    "c_double":{
      "$numberDouble":"1.1952441956458565E308"
    },
    "c_bytes":{
      "$binary":{
        "base64":"cWx5Ymp0Yw==",
        "subType":"00"
      }
    },
    "c_date":{
      "$date":{
        "$numberLong":"1686614400000"
      }
    },
    "c_decimal":{
      "$numberDecimal":"656406177"
    },
    "c_timestamp":{
      "$date":{
        "$numberLong":"1684283772000"
      }
    }
  },
  "id":{
    "$numberInt":"2"
  }
}
```
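The sample is in MongoDB's canonical Extended JSON format, which wraps numbers in type markers such as $numberInt, $numberLong and $numberDouble so that BSON types survive the conversion to a string. The Python sketch below reproduces that wrapping for plain Python values. to_canonical and flat_sync_string are hypothetical; the int32/int64 choice is inferred from the value's range, and double formatting follows Python's repr rather than the Java-style E notation in the sample. ObjectId, dates, binary data and decimals need a BSON library and are left out.

```python
import json
import math

INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1
INT64_MIN, INT64_MAX = -(2**63), 2**63 - 1


def to_canonical(value):
    """Wrap plain Python values in canonical Extended JSON (v2) type markers."""
    if isinstance(value, bool):  # test bool first: bool is a subclass of int
        return value
    if isinstance(value, int):
        if INT32_MIN <= value <= INT32_MAX:
            return {"$numberInt": str(value)}
        if INT64_MIN <= value <= INT64_MAX:
            return {"$numberLong": str(value)}
        raise OverflowError(f"{value} does not fit in a BSON int64")
    if isinstance(value, float):
        if math.isnan(value):
            return {"$numberDouble": "NaN"}
        if math.isinf(value):
            return {"$numberDouble": "Infinity" if value > 0 else "-Infinity"}
        return {"$numberDouble": repr(value)}
    if isinstance(value, str):
        return value
    if isinstance(value, dict):
        return {key: to_canonical(item) for key, item in value.items()}
    if isinstance(value, list):
        return [to_canonical(item) for item in value]
    raise TypeError(f"unsupported type: {type(value).__name__}")


def flat_sync_string(document: dict) -> str:
    """Serialize a whole document as one compact canonical Extended JSON string."""
    return json.dumps(to_canonical(document), separators=(",", ":"))


print(flat_sync_string({"c_string": "bddkzxr", "c_int": -495763561}))
# {"c_string":"bddkzxr","c_int":{"$numberInt":"-495763561"}}
```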
