Skip to main content

Command Palette

Search for a command to run...

Change Streams in MongoDB

Updated
3 min read
Change Streams in MongoDB

MongoDB Change Streams enable applications to receive real-time notifications whenever data changes occur in the database.

Whenever a document in a MongoDB collection is:

  • Inserted

  • Updated

  • Deleted

MongoDB immediately generates an event that your application can listen to and process.  This allows applications to respond instantly without the need for polling or scheduled jobs.

Change Streams support aggregation pipelines, allowing you to filter events and listen only to what matters.

Examples:

  • Listen only to insert operations

  • React only when a specific field is updated

  • Track changes in selected collections

Change Streams Architecture:

Change Streams are built on top of MongoDB’s oplog (operations log).

  • The oplog records every write operation in the database

  • Change Streams read from the oplog and convert operations into structured events

  • Applications subscribe to these events and react in real time

Change Streams work only on

  • Replica Sets

  • Sharded Clusters

They are not supported on standalone MongoDB instances because the oplog is required.

Use Cases:

  • Get alerts instantly when data changes

  • Update dashboards automatically without refresh

  • Notify other services when something changes in the database

  • Refresh or clear the cache when data is updated

  • Create audit logs of all CRUD operations

  • Sync data from MongoDB to other databases/systems

  • Automate tasks based on database changes (e.g., order → send email)

Implementation:

const { MongoClient } = require("mongodb");

const connectionString = '';

async function watchChanges() {
  const client = new MongoClient(connectionString);
  await client.connect();

  const db = client.db("Books");
  const collection = db.collection("book");

 const changeStream = collection.watch();
 changeStream.on('change', (next) => {
     console.log('Change detected:', next);
  })
}

watchChanges();

Output:

Handle Failure:

If a failure occurs due to:

  • Network issues

  • Application crashes

  • Temporary MongoDB unavailability

The Change Stream can be restarted using the stored resume token, ensuring no data loss and no duplicate processing.

Each Change Stream event includes a resume token, which represents the exact position of that event in the oplog. By storing this token, applications can resume listening from the last processed event instead of starting over.

_id in the change stream response is the resume token of every operation.

let resumeToken = null;
async function watchChanges() {
    const client = new MongoClient(connectionString);
    await client.connect();
  
    const db = client.db("Books");
    const collection = db.collection("book");
    const pipeline = [];
  
    const changeStream = resumeToken
    ? collection.watch(pipeline, { resumeAfter: resumeToken })
    : collection.watch(pipeline);

   changeStream.on('change', (next) => {
       console.log('Change detected:', next);
       resumeToken = next['_id']
    })
}
  
watchChanges();

Advantages:

🚀 Real-time data processing

🔄 No polling or cron jobs

📈 Scales with MongoDB clusters

🧩 Works seamlessly with aggregation pipelines

🔐 Reliable and fault-tolerant

Limitations & Considerations:

  • Requires a replica set or a sharded cluster

  • Events are retained only as long as the oplog exists

  • Atlas imposes limits on the number of concurrent Change Streams