Faster Tests with Cascading 2.0 Local Mode

October 22, 2012

For one of our clients, we’d developed a series of complex Cascading 1.2 workflows that run multiple times every week on Amazon’s Elastic MapReduce. These 15 or so higher-level workflows get planned by Cascading into 80+ Hadoop jobs, which Cascading takes care of running for us. That part has been working well, and the end result (a set of Solr indexes) powers the Adbeat web site.

But we’ve had to struggle with how long our tests take. Because of the way we combine & aggregate results, and the interconnected nature of our various workflows, we have to pump a fair amount of data through the complete process to validate that we get the expected end results.

And that in turn meant our pre-deploy tests were taking close to 90 minutes to complete. So we started cutting corners: sometimes skipping tests, or beginning a deploy while tests were still running…and we all know that’s the path to darkness.

With Cascading 2.0 comes a new workflow planner, something (confusingly) called “local mode”. This isn’t the same as Hadoop local mode, where the Cascading Hadoop planner has generated some number of Hadoop jobs that run locally (how we were running our tests). Instead there’s a new planner that generates a locally executed “plan” for the workflow, without using anything from Hadoop. Based on some quick tests, it was clear that running tests in Cascading local mode would be much faster, by potentially an order of magnitude.
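The switch itself is just a different FlowConnector; here’s a minimal sketch (the helper class is ours for illustration, and the Taps and Schemes still have to match the chosen platform):

    import java.util.Properties;

    import cascading.flow.FlowConnector;
    import cascading.flow.hadoop.HadoopFlowConnector;
    import cascading.flow.local.LocalFlowConnector;

    public class Planners {
        // The same Pipe assembly can be handed to either connector; only the
        // Taps and Schemes need to be platform-specific.
        public static FlowConnector makeFlowConnector(boolean runLocally, Properties props) {
            return runLocally ? new LocalFlowConnector(props) : new HadoopFlowConnector(props);
        }
    }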

But in order to do that, we’d have to first port our code from Cascading 1.2 to 2.0, and then work around various issues & limitations with Cascading local mode. What follows is a detailed description of the changes we had to make, for anyone else contemplating the same process.

Porting to Cascading 2.0

This was pretty easy for our actual workflows. The one issue we ran into was a change in what our custom Buffers saw in the grouping TupleEntry. With Cascading 1.2, if we were grouping two pipes using an OuterJoin, BufferCall.getGroup() returned just the non-null grouping key value(s), which we then accessed by index. Cascading 2.0’s BufferCall.getGroup() always returns the key value(s) from both pipes, which can be accessed by field name. For example, suppose the grouping key is a single field, named ‘left-key-field’ on the left pipe and ‘right-key-field’ on the right, and for the first key the left side is null (i.e., no matching Tuple) while the right side is ‘right-key-value’. Cascading 1.2’s BufferCall.getGroup() returns fields: [0], tuple: ['right-key-value'], whereas Cascading 2.0’s returns fields: ['left-key-field', 'right-key-field'], tuple: [null, 'right-key-value']. The new behavior is better, but it caused some of our code to fail, because it called getString(0) on the group TupleEntry and assumed it would always get the non-null grouping key value.
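Here’s a hedged sketch of the kind of Buffer change involved (class and field names are illustrative, not from our actual workflows):

    import cascading.flow.FlowProcess;
    import cascading.operation.BaseOperation;
    import cascading.operation.Buffer;
    import cascading.operation.BufferCall;
    import cascading.tuple.Fields;
    import cascading.tuple.Tuple;
    import cascading.tuple.TupleEntry;

    public class GroupKeyBuffer extends BaseOperation<Void> implements Buffer<Void> {

        public GroupKeyBuffer() {
            super(new Fields("key"));
        }

        @Override
        public void operate(FlowProcess flowProcess, BufferCall<Void> bufferCall) {
            TupleEntry group = bufferCall.getGroup();

            // Cascading 1.2: only the non-null key value was present, at index 0.
            // String key = group.getString(0);  // breaks under 2.0 for OuterJoins

            // Cascading 2.0: both pipes' key fields are present; pick the non-null one.
            String key = group.getString("left-key-field");
            if (key == null) {
                key = group.getString("right-key-field");
            }

            bufferCall.getOutputCollector().add(new Tuple(key));
        }
    }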

The bigger challenge was porting to 2.0 all of the other reusable components we’d created and now relied on. For example, we have a Solr scheme that we use to build the indexes. And there’s a bunch of utility code in the open source cascading.utils project, much of which depended on Cascading internals. So all of that had to be ported & tested first.

After about a week of work, we were able to run with Cascading 2.0, though still in Hadoop local mode. That was the good news. The bad news was that tests now took even longer, by about 10 minutes. This was apparently due to the simultaneous change from Hadoop 0.20.2 to Hadoop 1.0.3, where the newer version seems to take longer to set up & tear down each job (at least in local mode). We were able to get back most of this time by using FlowProps.setJobPollingInterval(200).
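If you’re hitting the same slowdown, the change looks roughly like this (a sketch, assuming the static setter variant on FlowProps; there’s also a fluent style):

    import java.util.HashMap;
    import java.util.Map;

    import cascading.flow.FlowProps;

    // Poll job status every 200ms instead of the much longer default, so each
    // of the 80+ short-lived local jobs gets noticed as done much sooner.
    Map<Object, Object> properties = new HashMap<Object, Object>();
    FlowProps.setJobPollingInterval(properties, 200);
    // ...then pass the properties map to the FlowConnector constructor.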

Porting to Cascading Local Mode

Now the real fun started. We needed to create a generic platform that let us target either Hadoop (when running in a real cluster) or Cascading Local (during testing/debugging). We wound up adding some support in cascading.utils for this, along with a generic “Path” that we could use for things like enumerating all files or directories in a directory. The BasePlatform class defines a small set of abstract methods, which was the minimum set we needed – your mileage may vary.

    // Platform introspection
    public abstract boolean isLocal();
    public abstract boolean isTextSchemeCompressable();

    // Job configuration
    public abstract void setNumReduceTasks(int numReduceTasks) throws Exception;
    public abstract void setFlowPriority(FlowPriority priority) throws Exception;

    // Flow planning & runtime context
    public abstract FlowConnector makeFlowConnector() throws Exception;
    public abstract FlowProcess makeFlowProcess() throws Exception;

    // Platform-independent paths and Taps
    public abstract BasePath makePath(String path) throws Exception;
    public abstract BasePath makePath(BasePath parent, String subdir) throws Exception;
    public abstract Tap makeTap(Scheme scheme, BasePath path, SinkMode mode) throws Exception;

    // Platform-appropriate Schemes
    public abstract Scheme makeBinaryScheme(Fields fields) throws Exception;
    public abstract Scheme makeTextScheme(boolean isEnableCompression) throws Exception;
    public abstract Scheme makeTextScheme() throws Exception;

You’ll note the calls to makeTextScheme() and makeBinaryScheme(), which help us abstract out the calls to create TextLine and SequenceFile schemes. Since there is no SequenceFile scheme in Cascading local mode, we defined a KryoScheme that is returned by the Cascading local version of makeBinaryScheme().
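As a hedged sketch (the actual cascading.utils code differs in the details), the two implementations of makeBinaryScheme() look something like this:

    import cascading.scheme.Scheme;
    import cascading.tuple.Fields;

    // In HadoopPlatform: binary data is stored as Hadoop SequenceFiles.
    public Scheme makeBinaryScheme(Fields fields) {
        return new cascading.scheme.hadoop.SequenceFile(fields);
    }

    // In LocalPlatform: Kryo-serialized records stand in for SequenceFiles.
    public Scheme makeBinaryScheme(Fields fields) {
        return new KryoScheme(fields);
    }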

One of the challenges was that Cascading’s local version of TextLine doesn’t support reading or writing compressed text files, whereas the Hadoop version does (because it’s leveraging Hadoop support for this). So we had to create uncompressed versions of our test data, and avoid assuming that the output of some flows was compressed. We’ve got a pending patch for Cascading that adds back in compression support, which would simplify some of the above support.
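Here’s a sketch of what that asymmetry looks like in the two makeTextScheme(boolean) implementations (again illustrative, not the exact cascading.utils code):

    import cascading.scheme.Scheme;
    import cascading.scheme.hadoop.TextLine;
    import cascading.tuple.Fields;

    // In the Hadoop platform: TextLine can compress its output.
    public Scheme makeTextScheme(boolean isEnableCompression) {
        TextLine.Compress compress =
            isEnableCompression ? TextLine.Compress.ENABLE : TextLine.Compress.DISABLE;
        return new TextLine(new Fields("offset", "line"), Fields.ALL, compress);
    }

    // In the local platform: cascading.scheme.local.TextLine has no compression
    // support, so the flag can't be honored (hence our pending patch).
    public Scheme makeTextScheme(boolean isEnableCompression) {
        return new cascading.scheme.local.TextLine();
    }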

We also had to create a Cascading local version of SolrScheme, since Schemes and Taps are often tied to the target platform. And we made a DirectoryTap for local mode, which acts like the Lfs Tap in Hadoop mode – i.e., it supports reading from a directory of files, which the local FileTap doesn’t.
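Usage looks like this (a sketch; the path is made up, and DirectoryTap comes from cascading.utils rather than Cascading itself, so the constructor details are assumptions):

    import cascading.tap.SinkMode;
    import cascading.tap.Tap;

    // Reads every file in the directory, like Lfs/Hfs would on Hadoop; a plain
    // cascading.tap.local.FileTap would read just a single file.
    Tap source = new DirectoryTap(new cascading.scheme.local.TextLine(),
                                  "build/test/input-dir", SinkMode.KEEP);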

Then the tedious work started: we had to modify all of our code to remove any and all references to Hadoop-specific support, and switch over to using this BasePlatform class everywhere. One of the challenges here was that we often cast the generated FlowProcess into HadoopFlowProcess to get at Hadoop-specific functionality…and obviously all of those casts would fail at runtime when running in Cascading local mode.
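Where we couldn’t route everything through BasePlatform, the fallback pattern was to guard the cast (a sketch; the method name is hypothetical):

    import org.apache.hadoop.mapred.JobConf;

    import cascading.flow.FlowProcess;
    import cascading.flow.hadoop.HadoopFlowProcess;

    private void applyHadoopTuning(FlowProcess flowProcess) {
        // Only touch Hadoop-specific APIs when we actually have a Hadoop process;
        // in Cascading local mode this block is skipped instead of throwing a
        // ClassCastException.
        if (flowProcess instanceof HadoopFlowProcess) {
            JobConf conf = ((HadoopFlowProcess) flowProcess).getJobConf();
            // Hadoop-only configuration goes here.
        }
    }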

We also wound up having to wrap BasePlatform with another class containing generic platform support that was very specific to our use case, e.g. setting Elastic MapReduce-specific values when running in Hadoop mode.
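Something like this, though the class name and tuning values here are hypothetical:

    // Layers our app-specific defaults on top of the generic BasePlatform.
    public class WorkflowPlatform {
        private final BasePlatform platform;

        public WorkflowPlatform(BasePlatform platform) throws Exception {
            this.platform = platform;

            if (!platform.isLocal()) {
                // Cluster-only (e.g. Elastic MapReduce) tuning; meaningless locally.
                platform.setNumReduceTasks(20);  // hypothetical value
            }
        }

        public BasePlatform getPlatform() {
            return platform;
        }
    }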

And then we got to debug it all. There were only two really tricky issues we ran into…

ThreadLocal – In Cascading local mode, a Function’s prepare() method can be called by one thread, and its operate() method by a different thread. This caused problems with some of our text parsing code, which set up a Lucene reusable token stream in prepare() and then used it in operate(). The issue is that Lucene uses a ThreadLocal to keep track of these reusable token streams, so the stream we carefully prepared wasn’t visible to the thread running operate().
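Here’s a minimal illustration of the hazard, with Lucene stripped out for brevity (the class is hypothetical, and the lazy-init fallback in operate() is one defensive fix, not necessarily the exact change we made):

    import cascading.flow.FlowProcess;
    import cascading.operation.BaseOperation;
    import cascading.operation.Function;
    import cascading.operation.FunctionCall;
    import cascading.operation.OperationCall;
    import cascading.tuple.Fields;
    import cascading.tuple.Tuple;

    public class ParseFunction extends BaseOperation<Void> implements Function<Void> {

        private static final ThreadLocal<StringBuilder> REUSABLE = new ThreadLocal<StringBuilder>();

        public ParseFunction() {
            super(new Fields("parsed"));
        }

        @Override
        public void prepare(FlowProcess flowProcess, OperationCall<Void> operationCall) {
            REUSABLE.set(new StringBuilder());  // set on the preparing thread...
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall<Void> functionCall) {
            StringBuilder buffer = REUSABLE.get();  // ...but this thread may see null
            if (buffer == null) {
                buffer = new StringBuilder();       // lazy init on the operating thread
                REUSABLE.set(buffer);
            }

            buffer.setLength(0);
            buffer.append(functionCall.getArguments().getString(0));
            functionCall.getOutputCollector().add(new Tuple(buffer.toString()));
        }
    }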

Tuple cloning – In many of our custom Function and Buffer classes, we reuse a Tuple when emitting results, as a way of avoiding lots of memory allocations and GC hell. This works fine in Hadoop, because calling outputCollector.add() effectively does a deep clone of the Tuple (via serialization) before returning. But in Cascading local mode only a shallow copy is made, and the Tuple is kept around in memory if it’s used in a subsequent grouping operation. We were stuffing Tuples inside the Tuple, so these mutable objects were not getting cloned. Our quick & dirty solution was to add a static clone() method to our internal Cascading platform class, which we had to use everywhere we called outputCollector.add(). This walks the Tuple and creates a deep copy of any mutable objects (though only in Cascading local mode). The problem with this solution is that any new code also needs to remember to use the pattern outputCollector.add(Platform.clone(theTuple)), otherwise the bug can pop up again.
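A hedged sketch of that clone() helper (the real code handles more mutable types; here we only deep-copy nested Tuples, and the flag is an assumption standing in for the BasePlatform check):

    import cascading.tuple.Tuple;

    // Assumption: the real class gets this from BasePlatform.isLocal().
    private static boolean PLATFORM_IS_LOCAL = true;

    public static Tuple clone(Tuple tuple) {
        if (!PLATFORM_IS_LOCAL) {
            return tuple;  // Hadoop serializes on add(), so no deep copy is needed
        }

        Tuple result = new Tuple();
        for (int i = 0; i < tuple.size(); i++) {
            Object value = tuple.getObject(i);
            if (value instanceof Tuple) {
                result.add(clone((Tuple) value));  // deep-copy nested Tuples
            } else {
                result.add(value);
            }
        }

        return result;
    }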

And the Final Result

In the end, it was worth it – though much harder than we initially expected.

We were able to reduce our test time from 90 minutes to 10 minutes. And at that point it was obvious where we were processing more data than needed, so with a few small changes we further reduced the time to 4 minutes. Ultimately we got more than a 20x improvement in test time, and we’re back to religiously running tests before committing any code or deploying jobs.

We still of course have to test against the real Hadoop platform, but that happens as part of our release cycle, where we generate test indexes using samples of real data that are then validated by QA.
