Using MongoDB 3.6 Change Streams

In this blog post, we will explore MongoDB 3.6 change streams, a powerful new feature added in the latest version of MongoDB.

Streaming data workflows are becoming increasingly popular in open-source software because they allow efficient, asynchronous, near-real-time updates, which benefit users who are increasingly accustomed to real-time experiences in technology.

Before the rise of streaming workflows and open-source streaming frameworks such as Akka, Spark and Storm, it was much more common to process data in what is called a “batch-processing” workflow. Here, potentially massive amounts of data are queried and processed in a large batch, often once or a few times daily. This processing style has several drawbacks: it operates on data AFTER it was written or changed, querying large amounts of data at once is inefficient, and results arrive with considerable latency.

Stream workflows benefit from the high efficiency of processing at change time (usually asynchronously), while also providing more up-to-date results as a free side effect. These benefits make this approach popular in “real-time” user-facing systems such as social media, gaming and trading, and even in backend ETL systems.

Background

Before MongoDB 3.6, the most common way to implement a “change stream”-like data system in MongoDB was by using tailable cursors to “watch” the MongoDB replication oplog.

The use of the oplog requires that you enable replication, even if you have only a single node. The tailable cursor method is possible because the replication oplog is a queryable capped collection in MongoDB, available to any MongoDB shell or driver.

The drawbacks to using the oplog as a change stream source are:

  1. Server-wide Changelog. As the oplog is intended for replication purposes, it represents ALL changes to data in a server. Chances are the stream of changes you’re interested in is limited to one or a handful of collections, not every change in the entire server/replica-set! This usually means an application that “tails” the oplog must read and throw away entries it doesn’t care about as it processes the stream. This wastes processing time and network bandwidth, as many oplog entries are transferred and examined only to be discarded.
  2. Tailable Cursors. While creating a tailable cursor is possible in virtually any MongoDB driver, it’s often not the friendliest thing to code, and drivers generally do not offer a single “helper function” to do so. Usually, the application developer needs to write loops and handle the various conditions that can occur while tailing the cursor. Remember, more code generally equals more opportunities for bugs.
  3. Internal Oplog Format. Designed for “internal” server use, the oplog format is optimized for efficiency and uses obscure field names to describe changes. Also, theoretically the oplog format could change in the future. These two problems can lead to increased code maintenance.
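To make drawbacks 1 and 3 concrete, here is a minimal Node.js sketch that works on simplified, in-memory oplog entries rather than a live tailable cursor. It assumes only the `op`, `ns`, `o` and `o2` fields of an entry; `decodeOplogEntries`, `OP_NAMES` and the sample entries are hypothetical and not part of any driver API.

```javascript
// Operation codes used in oplog entries, mapped to readable names.
const OP_NAMES = { i: "insert", u: "update", d: "delete", c: "command", n: "noop" };

// Simplified, in-memory oplog tailing step (hypothetical helper).
// Drawback 1: the oplog is server-wide, so entries for other namespaces
// are read and then thrown away.
// Drawback 3: terse internal fields ("op", "o", "o2") must be decoded by hand.
function decodeOplogEntries(entries, wantedNs) {
  const decoded = [];
  for (const entry of entries) {
    if (entry.ns !== wantedNs) continue; // read, then discarded
    decoded.push({
      operationType: OP_NAMES[entry.op] || "unknown",
      ns: entry.ns,
      // Updates keep the target _id in "o2" and the update spec in "o";
      // inserts and deletes keep the document (or just its _id) in "o".
      documentKey: entry.op === "u" ? entry.o2 : { _id: entry.o._id },
      payload: entry.o,
    });
  }
  return decoded;
}

// Example entries, trimmed to the fields used above (real entries also
// carry fields such as "ts", "h" and "v").
const sampleOplog = [
  { op: "i", ns: "test.test", o: { _id: 1, x: 1 } },
  { op: "i", ns: "other.coll", o: { _id: 2, y: 2 } },
  { op: "u", ns: "test.test", o2: { _id: 1 }, o: { $set: { x: 2 } } },
  { op: "d", ns: "test.test", o: { _id: 1 } },
];

const decodedEvents = decodeOplogEntries(sampleOplog, "test.test");
console.log(decodedEvents.length + " of " + sampleOplog.length + " entries were relevant");
```

Every line of this decoding and filtering is application code you would have to maintain yourself, which is exactly what change streams move server-side.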

MongoDB 3.6 Change Streams

MongoDB 3.6 added “Change Streams”, handled via the new collection-level method named “db.collection.watch()”. This function opens a Change Stream Cursor on a given collection, returning changes to the collection in a predictable format called Change Events.

Here is an example of a change event returned by this new feature, produced by an insert into “test.test” from another session:

rs0:PRIMARY> db.test.watch().pretty()
{
	"_id" : {
		"_data" : BinData(0,"glqW/CsAAAABRmRfaWQAZFqW/CukzAygJGLkVwBaEASV+CeIpHBBKKVaH0KcDV5OBA==")
	},
	"operationType" : "insert",
	"fullDocument" : {
		"_id" : ObjectId("5a96fc2ba4cc0ca02462e457"),
		"x" : 1
	},
	"ns" : {
		"db" : "test",
		"coll" : "test"
	},
	"documentKey" : {
		"_id" : ObjectId("5a96fc2ba4cc0ca02462e457")
	}
}
...

The “db.collection.watch()” function takes an aggregation pipeline as its first optional argument and a document of “options” as its second optional argument. Calling the function with no arguments performs no aggregation and uses the default options.

“db.collection.watch()” supports the following aggregation stages in the optional pipeline:

  1. $match
  2. $project
  3. $addFields
  4. $replaceRoot
  5. $redact
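The server enforces this list itself, but a client-side check can catch a bad pipeline before it is sent. Below is a minimal sketch of such a check; `unsupportedStages` is a hypothetical helper, and the stage list is the MongoDB 3.6 one above (later releases extend it).

```javascript
// Stages supported in a db.collection.watch() pipeline in MongoDB 3.6.
const ALLOWED_STAGES = new Set(["$match", "$project", "$addFields", "$replaceRoot", "$redact"]);

// Hypothetical helper: returns the names of any stages in `pipeline` that
// are not on the supported list.
function unsupportedStages(pipeline) {
  const bad = [];
  for (const stage of pipeline) {
    const keys = Object.keys(stage);
    // Each aggregation stage document must contain exactly one operator key.
    if (keys.length !== 1 || !ALLOWED_STAGES.has(keys[0])) {
      bad.push(keys.join(",") || "(empty stage)");
    }
  }
  return bad;
}

const examplePipeline = [
  { $match: { operationType: "insert" } },
  { $project: { documentKey: 1 } },
  { $group: { _id: "$ns" } }, // not allowed in a change stream pipeline
];
console.log(unsupportedStages(examplePipeline));
```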

Similar to the pre-3.6 method described earlier, the change stream feature requires replication to be enabled, and the operation returns an error if it is not. If you run a standalone server, you can still enable replication with a single-member replica set.

The benefits of the new feature are numerous:

  1. Collection-Level. Streaming of changes can now occur on a per-collection basis (not a server-wide basis!). Further filtering is possible by passing a $match stage to the db.collection.watch() function.
  2. Efficient Processing. Collection-level and $match filtering mean only relevant changes are returned to the application instead of every change occurring in the server, reducing processing and network usage.
  3. Change Event Format. Changes are presented as Change Events, not internal replication oplog entries.
  4. Simplified Approach. Application developers have less code to maintain, because much of the logic required to implement “tailable cursors” has moved server-side.
  5. Majority Read Concern. The change streams feature uses Majority Read Concern, meaning it only returns changes that have been written to a majority of replica set members, so they cannot be undone by a replica set rollback. This is fantastic for data integrity!

Resuming Change Streams

By default, a change stream stops on error, and a read that sees no new changes within the default wait time of 1000 milliseconds returns without results; this wait can be overridden using the ‘maxAwaitTimeMS’ option to your db.collection.watch() operation.

This behavior means change streams sometimes need to be resumed from the last successful change. To do so, pass the ‘_id’ of the last event read (its resume token) as the ‘resumeAfter’ option to your db.collection.watch() operation.
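The resume pattern can be sketched without a live server. In the Node.js sketch below, `openStream` is a hypothetical in-memory stand-in for db.collection.watch([], { resumeAfter }) that deliberately fails partway through; `ALL_EVENTS`, `openStream` and `consumeWithResume` are all illustrative names. Against a real deployment, the `openStream` call would be replaced by watch() with the ‘resumeAfter’ option described above.

```javascript
// In-memory stand-in for a collection's change events (hypothetical data).
// Each event's _id plays the role of the resume token.
const ALL_EVENTS = [1, 2, 3, 4, 5].map((n) => ({
  _id: { _data: "token-" + n },
  operationType: "insert",
  documentKey: { _id: n },
}));

// Hypothetical stand-in for db.collection.watch([], { resumeAfter }).
// Yields events after the given token and, to simulate a dropped
// connection, throws after `failAfter` events. In this sketch an unknown
// token falls back to the first event.
function* openStream({ resumeAfter, failAfter = Infinity } = {}) {
  let start = 0;
  if (resumeAfter) {
    start = ALL_EVENTS.findIndex((e) => e._id._data === resumeAfter._data) + 1;
  }
  let emitted = 0;
  for (const event of ALL_EVENTS.slice(start)) {
    if (emitted === failAfter) throw new Error("simulated network error");
    emitted++;
    yield event;
  }
}

// Reads the stream, reopening it with resumeAfter: lastToken after each failure.
function consumeWithResume(handle, { failAfter = 2, maxAttempts = 5 } = {}) {
  let lastToken = null;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const options = { failAfter };
      if (lastToken) options.resumeAfter = lastToken;
      for (const event of openStream(options)) {
        handle(event);
        // Record the token only after the event is handled, so a failure
        // never skips an event that was not processed.
        lastToken = event._id;
      }
      return lastToken; // the stream ended without an error
    } catch (err) {
      // Fall through to the next attempt, resuming after lastToken.
    }
  }
  throw new Error("gave up after " + maxAttempts + " attempts");
}

const seen = [];
consumeWithResume((event) => seen.push(event.documentKey._id));
console.log(seen);
```

Persisting `lastToken` somewhere durable (rather than in a local variable) would let the same pattern survive an application restart, as long as the token is still within the oplog window.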

Production Considerations

As change streams use MongoDB’s replication technologies behind the scenes, there are some things to consider in production:

  1. Majority Read Concern. Change streams require a majority of data-bearing members to be available, meaning streams may pause if you lose a majority of members or if a majority can only be formed by counting an arbiter.
  2. Oplog Size. The oplog must be large enough for stream events to still exist when the application processes them.
  3. Drop / Rename Collection. Change streams on dropped or renamed collections are invalidated, which breaks the stream.
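For the oplog size consideration, a back-of-the-envelope check helps: the oplog holds roughly its size divided by the rate at which oplog entries are written, and that window must exceed the longest time your consumer might fall behind. The sketch below assumes a constant write rate, and the figures are made up; in the mongo shell, rs.printReplicationInfo() reports the actual oplog size and time window. `oplogWindowHours` and `consumerIsSafe` are hypothetical helpers.

```javascript
// Hours of changes the oplog can hold, assuming a constant write rate.
function oplogWindowHours(oplogSizeMB, oplogWriteMBPerHour) {
  return oplogSizeMB / oplogWriteMBPerHour;
}

// True if events will still be in the oplog after the consumer's worst-case lag.
function consumerIsSafe(oplogSizeMB, oplogWriteMBPerHour, maxConsumerLagHours) {
  return oplogWindowHours(oplogSizeMB, oplogWriteMBPerHour) > maxConsumerLagHours;
}

// Assumed figures: a 10240 MB oplog receiving 512 MB of entries per hour.
console.log(oplogWindowHours(10240, 512)); // 20
console.log(consumerIsSafe(10240, 512, 24)); // false: a 24-hour outage outlives the window
```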

In a sharded cluster, there are additional items to consider:

  1. Fan-Out Stream. To maintain total ordering, the change stream is executed on all shards in a cluster and is as fast as the slowest shard.
  2. Multi-Updates. Under sharding, failed multi-document updates can sometimes create change events for orphaned documents. Try to avoid multi updates if this matters to your application. MongoDB 4.0 added ACID-compliant multi-document transactions that can help avoid this problem, although transactions on sharded clusters require MongoDB 4.2 or later.
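Why a sharded stream is “as fast as the slowest shard” can be shown with a simplified merge model. This is an illustration under stated assumptions, not how mongos is actually implemented: each hypothetical shard hands over events sorted by a timestamp `ts`, plus a hypothetical `watermark` saying it has reported everything up to that time. To keep total ordering, the merger may only emit an event once no shard could still produce an earlier one.

```javascript
// Simplified model of a totally ordered merge across shards.
// shards: [{ name, events: [{ ts }] sorted by ts, watermark }]
function mergeShardStreams(shards) {
  const cursors = shards.map((shard) => ({ ...shard, pos: 0 }));
  const merged = [];
  for (;;) {
    let best = null; // cursor holding the earliest buffered event
    let safeUpTo = Infinity; // latest timestamp every shard has vouched for
    for (const cursor of cursors) {
      const next = cursor.events[cursor.pos];
      // A shard with a buffered event cannot produce anything earlier than it;
      // a shard with nothing buffered only vouches up to its watermark.
      safeUpTo = Math.min(safeUpTo, next ? next.ts : cursor.watermark);
      if (next && (best === null || next.ts < best.events[best.pos].ts)) best = cursor;
    }
    // Stop when nothing is buffered, or when the slowest shard might still
    // report an event earlier than the candidate.
    if (best === null || best.events[best.pos].ts > safeUpTo) break;
    merged.push({ shard: best.name, ...best.events[best.pos] });
    best.pos++;
  }
  return merged;
}

// Shard "B" has only reported up to ts 3, so shard "A"'s events at
// ts 4 and 7 are held back until "B" catches up.
const result = mergeShardStreams([
  { name: "A", events: [{ ts: 1 }, { ts: 4 }, { ts: 7 }], watermark: 7 },
  { name: "B", events: [{ ts: 2 }], watermark: 3 },
]);
console.log(result.map((e) => e.shard + e.ts).join(", "));
```

Raising shard B's watermark to 10 lets all four events through, which is the point: the fast shard's events wait on the slow one.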

Use Case: Email Sender

Let’s pretend we have an email system that uses MongoDB as the source of truth. A single email is a single MongoDB document in the collection: “email.emails”.

In our example, we must write an application that sends our emails (over SMTP) when the “sendNow” boolean field is set to “true” in an email’s document.

When an email is ready to be sent, the application issues this update, matching a single ‘_id’:

> db.emails.update(
    { "_id": ObjectId("5a97fdd4a4cc0ca02462e45c") },
    { $set: { sendNow: true } }
  )
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

Using change streams, we can “watch” the emails collection for update operations that match our criteria!

Below, I created a change stream with an aggregation pipeline. The pipeline matches change events whose operationType is “update” and whose update set the field “sendNow” to “true”. Change Events store updated fields under the “updateDescription.updatedFields” sub-document, so the full name for the “sendNow” field becomes: “updateDescription.updatedFields.sendNow”.

As our sender application only needs the document key to query an email for sending, I added a $project aggregation step to strip the result down to only the Change Event “documentKey”, plus the “_id” field (the resume token), which $project keeps by default.

The result is this operation:

> db.emails.watch([
    { $match: {
      "operationType": "update",
      "updateDescription.updatedFields.sendNow": true
    } },
    { $project: {
      documentKey: 1
    } }
  ]).pretty()
...
{
	"_id" : {
		"_data" : BinData(0,"glqYAA4AAAABRmRfaWQAZFqX/dSkzAygJGLkXABaEARxbXGgdm5K9ZnzwSfCfmNbBA==")
	},
	"documentKey" : {
		"_id" : ObjectId("5a97fdd4a4cc0ca02462e45c")
	}
}
...

Now, when emails get marked as “sendNow” we have a stream of document keys that are ready to be sent immediately!

This makes for a very intuitive and responsive workflow! In this case, our email sender now knows it can send the email with ‘_id’ of ObjectId("5a97fdd4a4cc0ca02462e45c")!
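To unit-test the sender without a server, the pipeline can be mirrored in plain JavaScript. In the sketch below, `matchesSendNow` approximates the $match stage (ignoring MongoDB’s array-matching semantics), `projectDocumentKey` approximates the $project stage, and `processEmailEvents`, `sendEmail` and the sample events are hypothetical.

```javascript
// Mirrors the $match stage: update events that set sendNow to true.
function matchesSendNow(event) {
  return (
    event.operationType === "update" &&
    event.updateDescription !== undefined &&
    event.updateDescription.updatedFields.sendNow === true
  );
}

// Mirrors the $project stage: documentKey plus the default _id (resume token).
function projectDocumentKey(event) {
  return { _id: event._id, documentKey: event.documentKey };
}

// Hypothetical sender loop; sendEmail stands in for the SMTP logic.
function processEmailEvents(events, sendEmail) {
  const sent = [];
  for (const event of events.filter(matchesSendNow).map(projectDocumentKey)) {
    sendEmail(event.documentKey._id);
    sent.push(event.documentKey._id);
  }
  return sent;
}

const sampleEvents = [
  { _id: { _data: "a" }, operationType: "insert", documentKey: { _id: "e1" },
    fullDocument: { _id: "e1", sendNow: true } },
  { _id: { _data: "b" }, operationType: "update", documentKey: { _id: "e1" },
    updateDescription: { updatedFields: { subject: "Hi" }, removedFields: [] } },
  { _id: { _data: "c" }, operationType: "update", documentKey: { _id: "e2" },
    updateDescription: { updatedFields: { sendNow: true }, removedFields: [] } },
];
console.log(processEmailEvents(sampleEvents, (id) => console.log("sending", id)));
```

One design consequence shows up here: an email inserted with “sendNow” already set to “true” produces an “insert” event, which this pipeline ignores, so such emails would need a separate $match branch or a different workflow.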

Use Case: Backend Synchronization

Large infrastructures often have several data-related components that require synchronization. Some examples are caching tiers (Redis, Memcached, etc.), search engines (Apache Solr, Elasticsearch, etc.) and backend analytics systems.

Change streams make it easy for systems other than MongoDB to “hook into” a real-time stream of events, making synchronization of several backends easy. Used correctly, this feature can also reduce or remove dependencies on message queues.

Some ideas this brings to mind:

  • Caching tiers pre-emptively cache data based on change events
  • Search engines index important data based on change events
  • Replicating changes to incompatible backend data stores (business analytics, cold storage, etc.)
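As a sketch of the caching idea, the Node.js code below applies change events to an in-memory Map that stands in for a caching tier such as Redis or Memcached. `applyChangeEvent`, `setPath` and `deletePath` are hypothetical names; the code assumes plain-JSON documents and handles the dotted paths that “updatedFields” and “removedFields” use for nested fields.

```javascript
// Sets a (possibly dotted) path such as "address.city" inside doc.
function setPath(doc, path, value) {
  const parts = path.split(".");
  let target = doc;
  for (const part of parts.slice(0, -1)) {
    if (typeof target[part] !== "object" || target[part] === null) target[part] = {};
    target = target[part];
  }
  target[parts[parts.length - 1]] = value;
}

// Removes a (possibly dotted) path; does nothing if the parent is missing.
function deletePath(doc, path) {
  const parts = path.split(".");
  let target = doc;
  for (const part of parts.slice(0, -1)) {
    target = target[part];
    if (typeof target !== "object" || target === null) return;
  }
  delete target[parts[parts.length - 1]];
}

// Applies one change event to the cache, keyed by documentKey._id.
function applyChangeEvent(cache, event) {
  const key = String(event.documentKey && event.documentKey._id);
  switch (event.operationType) {
    case "insert":
    case "replace":
      // Copy the document so later updates do not mutate the event object.
      cache.set(key, JSON.parse(JSON.stringify(event.fullDocument)));
      break;
    case "update": {
      const cached = cache.get(key);
      if (!cached) break; // not cached; a later read can fetch it from MongoDB
      const { updatedFields, removedFields } = event.updateDescription;
      for (const [path, value] of Object.entries(updatedFields)) setPath(cached, path, value);
      for (const path of removedFields) deletePath(cached, path);
      break;
    }
    case "delete":
      cache.delete(key);
      break;
    default:
      // e.g. "invalidate": this sketch simply drops everything it cached.
      cache.clear();
  }
}

const cache = new Map();
applyChangeEvent(cache, { operationType: "insert", documentKey: { _id: "u1" },
  fullDocument: { _id: "u1", name: "Ann", address: { city: "Oslo" } } });
applyChangeEvent(cache, { operationType: "update", documentKey: { _id: "u1" },
  updateDescription: { updatedFields: { "address.city": "Bergen" }, removedFields: ["name"] } });
console.log(cache.get("u1"));
```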

Conclusion

I hope this article gives you some ideas on how to use MongoDB 3.6 change streams, a powerful new feature!
