Change Streams in MongoDB

MongoDB Change Streams enable applications to receive real-time notifications whenever data changes occur in the database.
Whenever a document in a MongoDB collection is:
Inserted
Updated
Deleted
MongoDB immediately generates an event that your application can listen to and process. This allows applications to respond instantly without the need for polling or scheduled jobs.
Change Streams support aggregation pipelines, allowing you to filter events and listen only to what matters.
Examples:
Listen only to insert operations
React only when a specific field is updated
Track changes in selected collections
Change Streams Architecture:
Change Streams are built on top of MongoDB’s oplog (operations log).
The oplog records every write operation in the database
Change Streams read from the oplog and convert operations into structured events
Applications subscribe to these events and react in real time
Change Streams work only on
✅ Replica Sets
✅ Sharded Clusters
They are not supported on standalone MongoDB instances because the oplog is required.
Use Cases:
Get alerts instantly when data changes
Update dashboards automatically without refresh
Notify other services when something changes in the database
Refresh or clear the cache when data is updated
Create audit logs of all CRUD operations
Sync data from MongoDB to other databases/systems
Automate tasks based on database changes (e.g., order → send email)
Implementation:
const { MongoClient } = require("mongodb");
const connectionString = '';
async function watchChanges() {
const client = new MongoClient(connectionString);
await client.connect();
const db = client.db("Books");
const collection = db.collection("book");
const changeStream = collection.watch();
changeStream.on('change', (next) => {
console.log('Change detected:', next);
})
}
watchChanges();
Output:
Handle Failure:
If a failure occurs due to:
Network issues
Application crashes
Temporary MongoDB unavailability
The Change Stream can be restarted using the stored resume token, ensuring no data loss and no duplicate processing.
Each Change Stream event includes a resume token, which represents the exact position of that event in the oplog. By storing this token, applications can resume listening from the last processed event instead of starting over.
_id in the change stream response is the resume token of every operation.
let resumeToken = null;
async function watchChanges() {
const client = new MongoClient(connectionString);
await client.connect();
const db = client.db("Books");
const collection = db.collection("book");
const pipeline = [];
const changeStream = resumeToken
? collection.watch(pipeline, { resumeAfter: resumeToken })
: collection.watch(pipeline);
changeStream.on('change', (next) => {
console.log('Change detected:', next);
resumeToken = next['_id']
})
}
watchChanges();
Advantages:
🚀 Real-time data processing
🔄 No polling or cron jobs
📈 Scales with MongoDB clusters
🧩 Works seamlessly with aggregation pipelines
🔐 Reliable and fault-tolerant
Limitations & Considerations:
Requires a replica set or a sharded cluster
Events are retained only as long as the oplog exists
Atlas imposes limits on the number of concurrent Change Streams



