Managing MongoDB Bulk Deletes and Inserts with Minimal Impact to Production Environments

In this blog post, we’ll look at how to manage MongoDB bulk deletes and inserts with little impact on production traffic.

If you are like me, there is no end to the demands placed on you as a DBA. One of the biggest comes when someone wants to load X% more data into the database, during peak traffic no less. I refer to this class of work as MongoDB bulk deletes and inserts. As a DBA, my first reaction is “no, do this during off-peak hours.” However, the business person in me asks: what if clients are loading a customer, product, or email list into the system so they can work on it during business hours? That puts it in another light, does it not?

This raises the question: how can we change data in the database as fast as possible while still giving the production system some breathing room? In this blog, I want to give you some scripts that you can load into your MongoDB shell to simplify the process.
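For reference, here is one way you might make these helpers available in the legacy mongo shell. The file name bulk_helpers.js below is just an example:

// Save the functions from this post into a file (the name here is just an
// example), then pull them into an interactive session on demand:
load("bulk_helpers.js");

// Or, to have them available in every session of the legacy mongo shell,
// append the same functions to ~/.mongorc.js, which is evaluated at startup.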

First, we will cover an iterative delete function that can be stopped and restarted at any time. Next, I will talk about smart updating with similarly planned overhead. Lastly, I want to talk about more advanced forms of health checking, for when you want to do something a bit smarter than where this basic series of scripts stops.

Bulk Deleting with a Plan

In the code below, you can see there are a couple of ways to manage these deletes. Specifically, you can see how to call the helper from anywhere (deleteFromCollection). I’ve also shown how to extend the shell so you can call db.collection.deleteBulk() directly. This avoids the need to provide the namespace, as the function can discover it from the collection it was called on.

The idea behind this function is pretty straightforward: you provide it with a find pattern for what you want to delete. This could be { } if you don’t want to restrict it, but in that case you should really use .drop() instead. After that, it expects a batch size, which is the number of document _ids to delete in a single batch. There is a trade-off between fewer, larger batches and more, smaller ones. Keep in mind that each deleted document produces its own oplog entry (so 1,000 per batch in my examples). You should consider this carefully and watch your oplog window as a result. You could improve the function to check the oplog size for you, but that requires more permissions (we’ll leave that discussion for another time). Finally, between batches, the function sleeps for pauseMS milliseconds.

If you find that the overhead is still too much for you, simply kill the shell running this and it will stop. You can then reduce the batch size, increase the pause, or both, so the system handles the change better. Sadly, this is not an exact science, as different people consider different levels of impact “acceptable” when pushing this many writes. We will talk about this more in a bit:

function parseNS(ns){
    // Expects a simple "db.collection" namespace. If the collection name itself
    // contains dots (e.g. "foodb.foocollection.month.day.year"), pass an array
    // instead: [ "dbname", "coll.name.with.dots" ].
    var database, collection;
    if (ns instanceof Array){
        database   = ns[0];
        collection = ns[1];
    }
    else{
        var tNS = ns.split(".");
        if (tNS.length > 2){
            print('ERROR: NS had more than one period in it, please pass it as [ "dbname", "coll.name.with.dots" ]!');
            return false;
        }
        database   = tNS[0];
        collection = tNS[1];
    }
    return {database: database, collection: collection};
}
DBCollection.prototype.deleteBulk = function( query, batchSize, pauseMS){
    // Parse the namespace from the collection this was called on
    var ns = this.getFullName();
    var srcNS = {
        database:   ns.split(".")[0],
        collection: ns.split(".").slice(1).join("."),
    };
    var db = this._db;
    var batchBucket = new Array();
    var totalToProcess = db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).find(query,{_id:1}).count();
    if (totalToProcess < batchSize){ batchSize = totalToProcess; }
    var currentCount = 0;
    print("Processed "+currentCount+"/"+totalToProcess+"...");
    db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).find(query).addOption(DBQuery.Option.noTimeout).forEach(function(doc){
        batchBucket.push(doc._id);
        if ( batchBucket.length >= batchSize){
            // Delete the batch by _id, report the result, then give the system a breather
            printjson(db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).remove({_id : { "$in" : batchBucket}}));
            currentCount += batchBucket.length;
            batchBucket = [];
            sleep(pauseMS);
            print("Processed "+currentCount+"/"+totalToProcess+"...");
        }
    })
    // Flush any final partial batch
    if (batchBucket.length > 0){
        printjson(db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).remove({_id : { "$in" : batchBucket}}));
        currentCount += batchBucket.length;
        print("Processed "+currentCount+"/"+totalToProcess+"...");
    }
    print("Completed");
}
function deleteFromCollection( sourceNS, query, batchSize, pauseMS){
    // Parse and check namespaces
    var srcNS = parseNS(sourceNS);
    if (srcNS == false) { return false; }
    var batchBucket = new Array();
    var totalToProcess = db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).find(query,{_id:1}).count();
    if (totalToProcess < batchSize){ batchSize = totalToProcess; }
    var currentCount = 0;
    print("Processed "+currentCount+"/"+totalToProcess+"...");
    db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).find(query).addOption(DBQuery.Option.noTimeout).forEach(function(doc){
        batchBucket.push(doc._id);
        if ( batchBucket.length >= batchSize){
            // Delete the batch by _id, then pause to let production traffic catch up
            db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).remove({_id : { "$in" : batchBucket}});
            currentCount += batchBucket.length;
            batchBucket = [];
            sleep(pauseMS);
            print("Processed "+currentCount+"/"+totalToProcess+"...");
        }
    })
    // Flush any final partial batch
    if (batchBucket.length > 0){
        db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).remove({_id : { "$in" : batchBucket}});
        currentCount += batchBucket.length;
        print("Processed "+currentCount+"/"+totalToProcess+"...");
    }
    print("Completed");
}
/** Example Usage:
    deleteFromCollection("foo.bar",{"type":"archive"},1000,20);
  or
    db.bar.deleteBulk({type:"archive"},1000,20);
**/

Inserting & Updating with a Plan

Not to be outdone by the deletes, MongoDB updates and inserts lend themselves to the same logic. For inserts, only small changes are needed: build up batches of documents and pass .insert(batchBucket) to the shell, using sleep between batches to leave breathing room for other reads and writes in the system. I find we don’t need this as much on modern MongoDB using WiredTiger, but your mileage may vary based on workload. You might also want to decide how the script should handle a document that already exists. In the case of data loading, you could wrap the insert with a check that ignores duplicate key errors and surfaces everything else. Please note that it is very easy to duplicate data if you do not have a unique index and MongoDB is auto-assigning its own _id field.
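As a rough sketch of that idea, here is what a batched insert helper could look like, following the same pattern as the delete functions. The name insertBulk, the { ordered: false } choice, and the error handling are my own additions for illustration, not part of the original helpers; adjust them to your shell version and requirements.

DBCollection.prototype.insertBulk = function( docs, batchSize, pauseMS){
    var coll = this;
    var batchBucket = [];
    var currentCount = 0;
    docs.forEach(function(doc){
        batchBucket.push(doc);
        if (batchBucket.length >= batchSize){
            try {
                // ordered:false lets the rest of a batch land even if one document
                // hits a duplicate key (E11000) error
                printjson(coll.insert(batchBucket, { ordered: false }));
            } catch (e) {
                // Depending on the shell version, duplicate keys may surface as an
                // exception rather than in the returned result; log it and move on
                printjson(e);
            }
            currentCount += batchBucket.length;
            batchBucket = [];
            sleep(pauseMS);
            print("Processed "+currentCount+"/"+docs.length+"...");
        }
    })
    // Flush any final partial batch
    if (batchBucket.length > 0){
        try { printjson(coll.insert(batchBucket, { ordered: false })); } catch (e) { printjson(e); }
        currentCount += batchBucket.length;
        print("Processed "+currentCount+"/"+docs.length+"...");
    }
    print("Completed");
}
/** Example Usage (assumes docs is an array of documents already loaded in the shell):
    db.bar.insertBulk(docs, 1000, 20);
**/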

Updates are a tad trickier, as they can be expensive if the query portion of the code is not indexed. I’ve provided an example below. You should consider the query time when planning batches and pauses: the more the update relies on a collection scan, the smaller the batch should be, since we want to avoid restarting and triggering a new scan as much as possible. A future improvement might be to read the matching documents from a secondary, and do the update itself on the primary by the _id field, to ensure a pin-pointed update query (see the sketch after the update function below).

DBCollection.prototype.updateBulk = function( query, changeObject, batchSize, pauseMS){
    // Parse the namespace from the collection this was called on
    var ns = this.getFullName();
    var srcNS = {
        database:   ns.split(".")[0],
        collection: ns.split(".").slice(1).join("."),
    };
    var db = this._db;
    var batchBucket = new Array();
    var totalToProcess = db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).find(query,{_id:1}).count();
    if (totalToProcess < batchSize){ batchSize = totalToProcess; }
    var currentCount = 0;
    print("Processed "+currentCount+"/"+totalToProcess+"...");
    db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).find(query).addOption(DBQuery.Option.noTimeout).forEach(function(doc){
        batchBucket.push(doc._id);
        if ( batchBucket.length >= batchSize){
            // Apply the change to each _id in the batch via an unordered bulk op
            var bulk = db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).initializeUnorderedBulkOp();
            batchBucket.forEach(function(id){
                bulk.find({_id: id}).update(changeObject);
            })
            printjson(bulk.execute());
            currentCount += batchBucket.length;
            batchBucket = [];
            sleep(pauseMS);
            print("Processed "+currentCount+"/"+totalToProcess+"...");
        }
    })
    // Flush any final partial batch
    if (batchBucket.length > 0){
        var bulk = db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).initializeUnorderedBulkOp();
        batchBucket.forEach(function(id){
            bulk.find({_id: id}).update(changeObject);
        })
        printjson(bulk.execute());
        currentCount += batchBucket.length;
        print("Processed "+currentCount+"/"+totalToProcess+"...");
    }
    print("Completed");
}
/** Example Usage:
    db.bar.updateBulk({type:"archive"},{$set:{archiveDate: ISODate()}},1000,20);
**/
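
As mentioned above, one future improvement is to push the potentially unindexed scan for matching _ids to a secondary and send only pin-pointed _id updates to the primary. Here is a rough sketch of that idea. The name updateBulkFromSecondary is hypothetical, and it assumes you are connected through mongos or a replica-set-aware connection so the read preference can actually route the find to a secondary; it also collects all matching _ids up front, so mind your memory on very large result sets.

DBCollection.prototype.updateBulkFromSecondary = function( query, changeObject, batchSize, pauseMS){
    var coll = this;
    var ids = [];
    var currentCount = 0;
    // Route the potentially expensive scan to a secondary, fetching only the _ids
    coll.find(query, {_id:1}).readPref("secondary").addOption(DBQuery.Option.noTimeout).forEach(function(doc){
        ids.push(doc._id);
    });
    print("Found "+ids.length+" documents to update");
    while (ids.length > 0){
        var batch = ids.splice(0, batchSize);
        // Writes always go to the primary; each update is pin-pointed by _id
        var bulk = coll.initializeUnorderedBulkOp();
        batch.forEach(function(id){
            bulk.find({_id: id}).update(changeObject);
        });
        printjson(bulk.execute());
        currentCount += batch.length;
        print("Processed "+currentCount+"...");
        sleep(pauseMS);
    }
    print("Completed");
}
/** Example Usage:
    db.bar.updateBulkFromSecondary({type:"archive"},{$set:{archiveDate: ISODate()}},1000,20);
**/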

In each iteration, the update prints the result of the bulk execution, including any failures. You can extend this example code to write the failures to a file or to automatically fix issues as appropriate. My goal here is to provide a starter function for you to build on. As with the earlier examples, this assumes the JS shell, but you can follow the same logic in the programming language of your choice if you would rather use Python, Golang or Java.

If you got nothing else from this blog on MongoDB bulk deletes and inserts, I hope you learned a good deal more about writing functions in the shell, and how to use a bit of programming to add pauses to the bulk operations you need to run. Taking this forward, you could get inventive: use a canary query that measures latency to trigger longer pauses, or even measure the oplog window to ensure you’re not adversely impacting HA and replication. There is no single right answer, but this is a great start toward more operationally safe ways to do the bigger actions DBAs are asked to perform from time to time.
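
To give you a flavor of what that could look like, here is a sketch of an adaptive pause you could call in place of sleep() inside the batch loops above. The function name, thresholds, and back-off factors are all made up for illustration; db.getReplicationInfo() needs to run against a replica set member and requires read access to the local database.

function adaptivePause(basePauseMS, canaryColl, canaryQuery, maxCanaryMS, minOplogHours){
    // Canary query: time a cheap, indexed query that production traffic also relies on
    var start = Date.now();
    canaryColl.findOne(canaryQuery);
    var canaryMS = Date.now() - start;

    // Oplog window: how many hours of writes the oplog currently covers
    var replInfo = db.getReplicationInfo();
    var oplogHours = replInfo.timeDiffHours;

    var pauseMS = basePauseMS;
    if (canaryMS > maxCanaryMS)    { pauseMS *= 4; }  // production reads are slowing down, back off
    if (oplogHours < minOplogHours){ pauseMS *= 4; }  // oplog window is shrinking, back off
    print("canary: "+canaryMS+"ms, oplog window: "+oplogHours+"h, pausing "+pauseMS+"ms");
    sleep(pauseMS);
}
/** Example Usage (in place of sleep(pauseMS) inside the batch loops):
    adaptivePause(20, db.users, {email: "canary@example.com"}, 50, 24);
**/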
