SimpleDB Tap for Cascading
Recently we’ve been running a number of large, multi-phase web mining applications in Amazon’s EC2 & Elastic MapReduce (EMR), and we needed a better way to maintain state than pushing sequence files back and forth between HDFS and S3.
One option was to set up an HBase cluster, but then we’d be paying 24×7 for servers that we’d only need for a few minutes each day. We could also set up MySQL with persistent storage on an Amazon EBS volume, but then we’d have to configure & launch MySQL on our cluster master for each mining “loop”, and for really big jobs it wouldn’t scale well to 100M+ items.
So we spent some time creating a Cascading tap & scheme that lets us use Amazon’s SimpleDB to maintain the state of web mining & crawling jobs. It’s still pretty rough, but usable. The code is publicly available at GitHub – check out http://github.com/bixolabs/cascading.simpledb.
There’s also a README to help you get started, which I’ve copied below since it contains useful information about the project.
As to the big question on performance – not sure yet how well it handles a SimpleDB with 100M+ entries, but we’re heading there fast on one project, so more details to follow. Enjoy…
cascading.simpledb is a Cascading Tap & Scheme for Amazon’s SimpleDB.
This means you can use SimpleDB as the source of tuples for a Cascading
flow, and as a sink for saving results. This is particularly useful when
you need a scalable, persistent store for Cascading jobs being run in
Amazon’s EC2/EMR cloud environment.
Information about SimpleDB is available from http://aws.amazon.com/simpledb/
and also http://docs.amazonwebservices.com/AmazonSimpleDB/latest/DeveloperGuide/
Note that you will need to be signed up to use both AWS and SimpleDB, and
have valid AWS access key and secret key values before using this code. See
In order to get acceptable performance, the cascading.simpledb scheme splits
each virtual “table” of data in SimpleDB across multiple shards. A shard
corresponds to what SimpleDB calls a domain. This allows most requests to
be run in parallel across multiple mappers, without having to worry about
duplicate records being returned for the same request.
Each record (Cascading tuple, or SimpleDB item) has an implicit field called
SimpleDBUtils-itemHash. This is a zero-padded hash of the record’s key or item
value. This is another SimpleDB concept – every record has a unique key, used
to read it directly.
Records (items) are split between shards using partitions of this hash value. This
implies that once a table has been created and populated with items, there is no
easy way to change the number of shards; you essentially have to build a new
table and copy all of the values.
The implicit itemHash field could also be used to parallelize search requests within
a single shard, by further partitioning. This performance boost is not yet
implemented by cascading.simpledb, however.
// Specify fields in SimpleDB item, and which field is to be used as the key. Fields itemFields = new Fields("name", "birthday", "occupation", "status"); SimpleDBScheme sourceScheme = new SimpleDBScheme(itemFields, new Fields("name"));
// Load people that haven't yet been contacted, up to 1000 String query = "`status` = \"NOT_CONTACTED\""; sourceScheme.setQuery(query); sourceScheme.setSelectLimit(1000);
int numShards = 20; String tableName = "people" Tap sourceTap = new SimpleDBTap(sourceScheme, accessKey, secretKey, tableName, numShards);
Pipe processingPipe = new Each("generate email pipe", new SendEmailOperation());
// Use the same scheme as the source - query & limit are ignored Tap sinkTap = new SimpleDBTap(sourceScheme, accessKey, secretKey, tableName, numShards);
Flow flow = new FlowConnector().connect(sourceTap, sinkTap, processingPipe); flow.complete();
All limitations of the underlying SimpleDB system obviously apply. That means
things like the maximum number of shards (100), the maximum size of any one
item (1MB), the maximum size of any field value (1024) and so on. See details
Given the 1K max field value length, SimpleDB is most useful for storing small
chunks of data, or references to bigger blobs that can be saved in S3.
In addition, all values are stored as strings. This means all fields must be
round-trippable as text.
does not guarantee immediate consistencycan guarantee consistency when reading back
the results of a write (or doing queries for the same), but this imposes a significant performance penalty, so the tap uses the default “eventually consistent” mode. This typically isn’t a problem due to the batch-oriented nature of most Cascading workflows, but could be an issue if you had multiple jobs writing to and reading from the same table.
Currently you need to correctly specify the number of shards for a table when
you define the tap. This is error prone, and only necessary when creating (or
re-creating) the table from scratch.
Tuple values that are null will not be updated in the table, which means you
can’t delete values, only add or update them.
Some operations are not multi-threaded, and thus take longer than they should.
For example, calculating the splits for a read will make a series of requests
to SimpleDB to get the item counts.
Numeric fields should automatically be stored as zero-padded strings to ensure
proper sort behavior, but currently this is only done for the implicit hash field.
You need Apache Ant 1.7 or higher, and a git client.
1. Download source from GitHub
% git clone git://github.com/bixolabs/cascading.simpledb.git % cd cascading.simpledb
2. Set appropriate credentials for testing
% edit src/test/resources/aws-keys.txt
Enter valid AWS access key and secret key values for the two corresponding properties.
3. Build the jar
% ant clean jar
or to build and install the jar in your local Maven repo:
% ant clean install
4. Create Eclipse project files
% ant eclipse
Then, from Eclipse follow the standard procedure to import an existing Java project into your Workspace.