SimpleDB Tap for Cascading

March 16, 2010

Recently we’ve been running a number of large, multi-phase web mining applications in Amazon’s EC2 & Elastic MapReduce (EMR), and we needed a better way to maintain state than pushing sequence files back and forth between HDFS and S3.

One option was to set up an HBase cluster, but then we’d be paying 24×7 for servers that we’d only need for a few minutes each day. We could also set up MySQL with persistent storage on an Amazon EBS volume, but then we’d have to configure & launch MySQL on our cluster master for each mining “loop”, and for really big jobs it wouldn’t scale well to 100M+ items.

So we spent some time creating a Cascading tap & scheme that lets us use Amazon’s SimpleDB to maintain the state of web mining & crawling jobs. It’s still pretty rough, but usable. The code is publicly available at GitHub – check out http://github.com/ScaleUnlimited/cascading.simpledb.

There’s also a README to help you get started, which I’ve copied below since it contains useful information about the project.

As to the big question of performance: we're not sure yet how well this handles a SimpleDB table with 100M+ entries, but we're heading there fast on one project, so more details to follow. Enjoy…

README

Introduction

cascading.simpledb is a Cascading Tap & Scheme for Amazon’s SimpleDB.

This means you can use SimpleDB as the source of tuples for a Cascading
flow, and as a sink for saving results. This is particularly useful when
you need a scalable, persistent store for Cascading jobs being run in
Amazon’s EC2/EMR cloud environment.

Information about SimpleDB is available from http://aws.amazon.com/simpledb/
and also http://docs.amazonwebservices.com/AmazonSimpleDB/latest/DeveloperGuide/

Note that you will need to be signed up to use both AWS and SimpleDB, and
have valid AWS access key and secret key values before using this code. See
http://docs.amazonwebservices.com/AmazonSimpleDB/2009-04-15/GettingStartedGuide/GettingSetUp.html

Design

In order to get acceptable performance, the cascading.simpledb scheme splits
each virtual “table” of data in SimpleDB across multiple shards. A shard
corresponds to what SimpleDB calls a domain. This allows most requests to
be run in parallel across multiple mappers, without having to worry about
duplicate records being returned for the same request.

Each record (Cascading tuple, or SimpleDB item) has an implicit field called
SimpleDBUtils-itemHash. This is a zero-padded hash of the record's key (its item
name). The key is another SimpleDB concept: every record has a unique key that
is used to read it directly.

Records (items) are split between shards using partitions of this hash value. This
implies that once a table has been created and populated with items, there is no
easy way to change the number of shards; you essentially have to build a new
table and copy all of the values.
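The hash-to-shard mapping described above could be sketched roughly as follows. This is an illustration only, not the actual cascading.simpledb code; the class and method names are hypothetical, and the real encoding of SimpleDBUtils-itemHash may differ.

```java
// Sketch: zero-padded item hash and shard assignment (hypothetical helpers,
// not the actual cascading.simpledb implementation).
public class ShardingSketch {

    // Zero-pad the unsigned 32-bit hash so that lexicographic order
    // (the only ordering SimpleDB offers) matches numeric order.
    static String itemHash(String itemKey) {
        long hash = itemKey.hashCode() & 0xFFFFFFFFL; // treat as unsigned
        return String.format("%010d", hash);          // max unsigned int is 10 decimal digits
    }

    // Partition the hash space so each shard (SimpleDB domain) owns one slice.
    // Changing numShards re-maps items, which is why a table can't easily be
    // re-sharded in place.
    static int shardIndex(String itemKey, int numShards) {
        long hash = itemKey.hashCode() & 0xFFFFFFFFL;
        return (int) (hash % numShards);
    }

    public static void main(String[] args) {
        System.out.println(itemHash("ken@example.com"));
        System.out.println(shardIndex("ken@example.com", 20));
    }
}
```

Because the shard index is a pure function of the key, each mapper can scan its own domain in parallel with no risk of the same item showing up twice.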

The implicit itemHash field could also be used to parallelize search requests within
a single shard, by further partitioning. This performance boost is not yet
implemented by cascading.simpledb, however.

Example

  // Specify fields in SimpleDB item, and which field is to be used as the key.
  Fields itemFields = new Fields("name", "birthday", "occupation", "status");
  SimpleDBScheme sourceScheme = new SimpleDBScheme(itemFields, new Fields("name"));
  // Load people that haven't yet been contacted, up to 1000
  String query = "`status` = \"NOT_CONTACTED\"";
  sourceScheme.setQuery(query);
  sourceScheme.setSelectLimit(1000);
  int numShards = 20;
  String tableName = "people";
  Tap sourceTap = new SimpleDBTap(sourceScheme, accessKey, secretKey, tableName, numShards);
  Pipe processingPipe = new Each("generate email pipe", new SendEmailOperation());
  // Use the same scheme as the source - query & limit are ignored
  Tap sinkTap = new SimpleDBTap(sourceScheme, accessKey, secretKey, tableName, numShards);
  Flow flow = new FlowConnector().connect(sourceTap, sinkTap, processingPipe);
  flow.complete();

Limitations

All limitations of the underlying SimpleDB system obviously apply. That means
things like the maximum number of shards (100, the SimpleDB domain limit), the
maximum size of any one item (1MB), the maximum size of any field value (1024
bytes) and so on. See details
at http://docs.amazonwebservices.com/AmazonSimpleDB/latest/DeveloperGuide/SDBLimits.html

Given the 1K max field value length, SimpleDB is most useful for storing small
chunks of data, or references to bigger blobs that can be saved in S3.

In addition, all values are stored as strings. This means all fields must be
round-trippable as text.
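One practical consequence of the strings-only storage is that numeric fields need zero-padding to compare correctly in queries, since SimpleDB compares values lexicographically. A minimal sketch (the `encode` helper is hypothetical, and handles only non-negative ints):

```java
public class PaddingSketch {

    // Encode a non-negative int as a fixed-width, zero-padded string so that
    // the lexicographic comparison SimpleDB performs matches numeric order.
    static String encode(int value) {
        return String.format("%010d", value);
    }

    public static void main(String[] args) {
        // Plain strings sort "wrong": "2" > "10" lexicographically.
        System.out.println("2".compareTo("10") > 0);                        // prints "true"
        // The padded forms compare in numeric order.
        System.out.println(encode(2).compareTo(encode(10)) < 0);            // prints "true"
    }
}
```

As noted under Known Issues below, cascading.simpledb currently applies this padding only to the implicit hash field, so other numeric fields must be padded by the application.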

Finally, SimpleDB can guarantee consistency when reading back the results of a
write (or when running queries against them), but this imposes a significant
performance penalty, so the tap uses the default "eventually consistent" mode.
This typically isn't a problem, given the batch-oriented nature of most Cascading
workflows, but could be an issue if multiple jobs were writing to and reading
from the same table.

Known Issues

Currently you need to correctly specify the number of shards for a table when
you define the tap. This is error-prone, since the value only actually matters
when creating (or re-creating) the table from scratch.

Tuple values that are null will not be updated in the table, which means you
can’t delete values, only add or update them.

Some operations are not multi-threaded, and thus take longer than they should.
For example, calculating the splits for a read will make a series of requests
to SimpleDB to get the item counts.

Numeric fields should automatically be stored as zero-padded strings to ensure
proper sort behavior, but currently this is only done for the implicit hash field.

Building

You need Apache Ant 1.7 or higher, and a git client.

1. Download source from GitHub

% git clone git://github.com/ScaleUnlimited/cascading.simpledb.git
% cd cascading.simpledb

2. Set appropriate credentials for testing

% edit src/test/resources/aws-keys.txt

Enter valid AWS access key and secret key values for the two corresponding properties.

3. Build the jar

% ant clean jar

or to build and install the jar in your local Maven repo:

% ant clean install

4. Create Eclipse project files

% ant eclipse

Then, from Eclipse follow the standard procedure to import an existing Java project into your Workspace.

3 Responses to “SimpleDB Tap for Cascading”

  1. Very nice post, thanks for taking the time to write it up. SimpleDB does support read-after-write consistency; see http://developer.amazonwebservices.com/connect/entry.jspa?externalID=3572 for more info.

  2. Hi Jeff,

    You’re right, I should have clarified my comment on that consistency limitation. It was based on trying to get acceptable speed during reads, which meant not using the read-after-write consistency option. I’ll edit my post when I’m back from vacation…

    Thanks,

    — Ken

  3. […] (crawl state) while still using EMR for bursty processing. I’d previously blogged about our SimpleDB tap & scheme for Cascading, and our use of it for PTD has helped shake out some […]