As you may know, we run our OrderPipe ecommerce dashboard on Google’s App Engine. I’m a big fan of the platform, but it has some traps for new (and old) players. A recent issue required us to bulk delete entities on App Engine, and the best tool for the job seemed to be the ‘Mapper’ part of Map Reduce. It was a good experience learning how it works and then applying it, so I thought I’d document a full Map Reduce example, because the worked examples I found were all based on an older version of the library.
Background
You might be interested to read a bit about why I had to use Map Reduce: it was my own fault. A subtle bug in a recent release caused an additional Account to be created under certain conditions. It made it past our staging server because that condition happened infrequently enough not to cause an issue, but once it got to production, where we have many thousands of orders arriving daily, we started seeing a lot of zombie accounts being created. My first thought was that we were being attacked, but alas, it wasn’t an attack, just a bug! The net result: we were left with tens of thousands of unwanted entities.
The Problem
In App Engine, the datastore is highly scalable, but it’s not relational: you can’t just run a simple query to delete all rows in a table with a particular created timestamp. There are simple tools to blindly delete all entities of a type, but not entities that meet a particular condition, and more importantly, not related entities that also meet a particular condition. That’s where Map Reduce comes in: basically we shard the entities and create multiple parallel workers to break the collection into smaller parts and process through them quickly. This seems like an inefficient approach until you consider scaling beyond a single database server, or even a single database cluster.
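To make the limitation concrete, here’s a rough sketch of the naive approach (my own illustration, not code we ran): query for the matching keys and delete them all in one request. With tens of thousands of entities this runs into the request deadline and chews through datastore quota, which is exactly why a sharded, parallel approach is attractive. The created property and the cutoff date are assumptions for the example.

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.Query;
import com.google.appengine.api.datastore.Query.FilterOperator;
import com.google.appengine.api.datastore.Query.FilterPredicate;

public class NaiveAccountCleaner {

    // Deletes Account entities created after the given date, all in one request.
    // Fine for a handful of entities, but with tens of thousands this will
    // blow past the request deadline long before it finishes.
    public void deleteZombieAccounts(Date createdAfter) {
        DatastoreService datastore = DatastoreServiceFactory.getDatastoreService();

        Query query = new Query("Account")
                .setFilter(new FilterPredicate("created", FilterOperator.GREATER_THAN, createdAfter))
                .setKeysOnly();

        List<Key> keys = new ArrayList<Key>();
        for (Entity entity : datastore.prepare(query).asIterable()) {
            keys.add(entity.getKey());
        }
        datastore.delete(keys);
    }
}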
The Solution
I found that the best introduction to this whole area of App Engine and highly scalable datastore operations is Ikai’s post on the Mapper from a couple of years ago. It would have been perfect for this situation, except there has recently been a new version of the App Engine Map Reduce library for Java, and the examples in Ikai’s post are no longer representative, though still very informative.
The steps to get something similar working in the new library are:
1) Get library
The library is easy to set up: grab it from SVN, use ant to build it, and drop the library and its dependencies into your project. The process after this is not so well documented; the getting started guide alludes to the bundled example, and to setting up a servlet for job control (see the sketch just below for the sort of wiring involved). It’d be nice if the docs fleshed this out a bit more, but it’s enough to get you started.
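For reference, here’s a minimal sketch of the web.xml entries I mean, based on the servlet classes and URL patterns the library’s bundled example uses; treat the exact class names and paths as assumptions to check against the version you build.

<!-- Map Reduce worker/controller servlet -->
<servlet>
  <servlet-name>mapreduce</servlet-name>
  <servlet-class>com.google.appengine.tools.mapreduce.MapReduceServlet</servlet-class>
</servlet>
<servlet-mapping>
  <servlet-name>mapreduce</servlet-name>
  <url-pattern>/mapreduce/*</url-pattern>
</servlet-mapping>

<!-- Pipeline servlet, which also serves the status page used in step 5) -->
<servlet>
  <servlet-name>pipeline</servlet-name>
  <servlet-class>com.google.appengine.tools.pipeline.impl.servlets.PipelineServlet</servlet-class>
</servlet>
<servlet-mapping>
  <servlet-name>pipeline</servlet-name>
  <url-pattern>/_ah/pipeline/*</url-pattern>
</servlet-mapping>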
2) Create your Mapper Job
Using the sample app as a guide, along with the older examples Ikai gives, you can build your first simple Mapper. Here’s a cut-down version of the one I used to do our bulk conditional delete.
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.tools.mapreduce.DatastoreMutationPool;
import com.google.appengine.tools.mapreduce.Mapper;

public class CleanAccountsMapper extends Mapper<Entity, Void, Void> {

    private transient DatastoreMutationPool pool;

    @Override
    public void beginShard() {
        // Ikai gives examples of using the pool for
        // better parallel datastore operations
        this.pool = DatastoreMutationPool.forWorker(this);
        // You could optionally use the normal datastore like this:
        // this.datastore = DatastoreServiceFactory.getDatastoreService();
    }

    @Override
    public void map(Entity value) {
        // During my testing, some of our workers died with
        // NullPointerExceptions on this field, as if in some
        // circumstances it goes away. This ensures we always have it.
        if (this.pool == null) {
            this.pool = DatastoreMutationPool.forWorker(this);
        }

        // Slightly paranoid check we have an account
        if (value.getKind().equals("Account")) {
            // Logic goes here to determine if the account
            // should be deleted (placeholder condition below)
            boolean itShouldBeDeleted = false;
            if (itShouldBeDeleted) {
                pool.delete(value.getKey());
            }
            // You could create/update/count here too
        }
    }
}
3) Run your Mapper
Now that we have a Mapper, let’s see how we can initiate the job from a servlet. We use Spring MVC, so this example is from a controller action, but it can be kicked off from anywhere.
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import com.google.appengine.tools.mapreduce.MapReduceJob;
import com.google.appengine.tools.mapreduce.MapReduceSettings;
import com.google.appengine.tools.mapreduce.MapReduceSpecification;
import com.google.appengine.tools.mapreduce.Marshallers;
import com.google.appengine.tools.mapreduce.inputs.DatastoreInput;
import com.google.appengine.tools.mapreduce.outputs.NoOutput;
import com.google.appengine.tools.mapreduce.reducers.NoReducer;

@Controller
@RequestMapping(value = "/map")
public class MapReduceController {

    @RequestMapping(value = "cleanAccounts", method = RequestMethod.GET)
    @ResponseBody
    public String cleanAccounts(Model model) {
        // These settings are covered in step 4) below
        MapReduceSettings settings = new MapReduceSettings()
                .setWorkerQueueName("mrworker")
                .setControllerQueueName("mrcontroller");

        String jobId = MapReduceJob.start(
                MapReduceSpecification.of(
                        "Clean Accounts",
                        // shard count 50
                        new DatastoreInput("Account", 50),
                        new CleanAccountsMapper(),
                        Marshallers.getVoidMarshaller(),
                        Marshallers.getVoidMarshaller(),
                        NoReducer.<Void, Void, Void>create(),
                        NoOutput.<Void, Void>create(1)),
                settings);

        // jobId is used to monitor, see step 5) below
        return "Job ID: " + jobId;
    }
}
The steps here are actually pretty simple; the main controlling inputs are the two task queues and the shard count. I’ll cover the config in step 4) below.
4) Configure Your Job
In the old version of the API it looks like you configured your jobs in a mapreduce.xml file, and then initiated them from the admin console. The new version is not set up quite that way: jobs are initiated programmatically, and configured in the code itself and by way of task queue configuration and shard count.
The shard count controls how many shards will be processed, and how big they’ll be. If you have 100,000 Account entities and 50 shards, then each shard will be working through roughly 2,000 entities. I say roughly because the sharding process is not exact; if you want an appreciation for how clever the folks behind this tech are, check out this explanation of how they scatter entities to get even shards.
The worker queue and the controller queue are also inputs to the job. During my testing, the one that made the biggest impact on job execution was the worker queue. I experimented a little with different settings. If you have 50 workers/shards, there’ll be 50 tasks in the worker queue. Each time a worker task runs it processes a portion of the entities in its shard. If you configure the queue to run more tasks per minute, or more tasks concurrently, you can effectively control the rate at which entities are processed. I started mine with a very slow rate and concurrency so that I could monitor the costs. I didn’t want to kick off a job and have it blow our App budget in 10 minutes! Here are the settings in my queue.xml file:
<!-- This queue controls the rate at which a Map Reduce job will be worked. -->
<queue>
  <name>mrworker</name>
  <max-concurrent-requests>10</max-concurrent-requests>
  <rate>10/m</rate>
</queue>
If you had a big job to do and were happy to pay for it all to be done quickly, you could bump up the rate and concurrency. If you wanted to do a task much more slowly, you could simply lower the rate/concurrency. This way, if you had a very big, non-urgent job, you could spread it over days or weeks to maximize the free quota of reads/writes.
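As a rough illustration (the numbers here are assumptions for the example, not what we ran), a more aggressive configuration of the same worker queue might look like the snippet below. Don’t forget that the controller queue named in MapReduceSettings also needs a matching entry in queue.xml.

<!-- A more aggressive example: up to 10 tasks/second, 50 in flight at once. -->
<queue>
  <name>mrworker</name>
  <max-concurrent-requests>50</max-concurrent-requests>
  <rate>10/s</rate>
  <bucket-size>50</bucket-size>
</queue>

<!-- The controller queue referenced in MapReduceSettings. -->
<queue>
  <name>mrcontroller</name>
  <rate>10/s</rate>
</queue>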
5) Monitor Job Progress
Once you’ve started your job, you will get a Job ID. This is used to view the status of the job (the current API doesn’t handle listing all jobs yet, but it’s likely coming soon), so you have to manually go to the URL for your job, something like http://yourapp.com/_ah/pipeline/status.html?root=Job-ID-Here. At this page you’ll see a breakdown of the job. If you’re just doing a Mapper job, like in my example, the other steps won’t actually do anything, but you do get to see a cool graph of the shards being worked. I’m bummed that I forgot to take a screenshot of our big job in progress, so you’ll just have to wait and see how it looks when you run yours.
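If you’d rather poll the status in code than click through to the status page, the Pipeline library that Map Reduce is built on exposes job info by ID. Here’s a minimal sketch of what that can look like; the exact classes and methods are my assumptions about the Pipeline API, so verify them against the version you’ve bundled.

import com.google.appengine.tools.pipeline.JobInfo;
import com.google.appengine.tools.pipeline.NoSuchObjectException;
import com.google.appengine.tools.pipeline.PipelineService;
import com.google.appengine.tools.pipeline.PipelineServiceFactory;

public class JobStatusChecker {

    // Returns a human-readable state for the Map Reduce job with the given ID.
    public String describeJob(String jobId) throws NoSuchObjectException {
        PipelineService service = PipelineServiceFactory.newPipelineService();
        JobInfo info = service.getJobInfo(jobId);

        switch (info.getJobState()) {
            case RUNNING:
                return "Still running";
            case COMPLETED_SUCCESSFULLY:
                return "Done: " + info.getOutput();
            case STOPPED_BY_ERROR:
                return "Failed: " + info.getError();
            default:
                return "State: " + info.getJobState();
        }
    }
}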
Hopefully this blog post will help other developers in the same position as me, with a few simple steps and some background to get started with the new version of Java Map Reduce. If you have any feedback or questions, please let me know.