
Making Hadoop MapReduce Work with a Redis Cluster

Redis is a very cool open-source key-value store that can add instant value to your Hadoop installation. Since values can be strings, hashes, lists, sets, and sorted sets, Redis can be used as a front end to serve data out of Hadoop, caching your ‘hot’ pieces of data in-memory for fast access when they are needed again. Using a Java client called Jedis, you can ingest and retrieve data with Redis. Combining this simple client with the power of MapReduce will let you write and read data to and from Redis in parallel.

In the code below, we use MapReduce to push key/value pairs to, and pull them from, any number of standalone Redis instances. We will be writing to, and reading from, a Redis hash, which maps string fields to string values, much like a Java HashMap. Each hash is uniquely identified by a hash key, similar to the name of a table. Each input and output format has two core configuration parameters: a CSV list of hostnames running a Redis instance, and the hash key. Similar to Hadoop’s default HashPartitioner, (key.hashCode() % number of Redis instances) is used to determine which Redis instance the key is written to. This hash-based distribution will spread the data evenly across instances, so long as your key-space isn’t skewed – but solving that problem is a topic for another post.
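If you haven’t used Jedis before, here is a minimal sketch of the hash operations the formats below rely on. The host, port, and key names are just placeholders for illustration; it assumes the Jedis jar is on the classpath and a Redis instance is running locally.

import java.util.Map;

import redis.clients.jedis.Jedis;

public class JedisHashExample {
    public static void main(String[] args) {
        // Connect to a local Redis instance (host and port are assumptions for this sketch)
        Jedis jedis = new Jedis("localhost", 6379);
        jedis.connect();

        // hset writes a single field/value pair into the hash identified by "user:profiles"
        jedis.hset("user:profiles", "user1001", "Alice");
        jedis.hset("user:profiles", "user1002", "Bob");

        // hgetAll pulls the entire hash back as a Map, which is exactly how the
        // record reader later in this post retrieves every key/value pair in one call
        Map<String, String> all = jedis.hgetAll("user:profiles");
        System.out.println(all); // {user1001=Alice, user1002=Bob}

        jedis.disconnect();
    }
}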

With that said, let’s take a look at all the code. Pay attention to the comments, as they’ll tell you what is going on. First up is an implementation of OutputFormat. This class defines the key/value data types and behavior for writing to Redis instances via Jedis.

 // This output format class is templated to accept a key and value of type Text
public static class RedisHashOutputFormat extends OutputFormat<Text, Text> {

// These static conf variables and methods are used to modify the job configuration.  This is a common pattern for MapReduce-related classes to avoid the magic string problem
public static final String REDIS_HOSTS_CONF = "mapred.redishashoutputformat.hosts";
public static final String REDIS_HASH_KEY_CONF = "mapred.redishashoutputformat.key";

public static void setRedisHosts(Job job, String hosts) {
job.getConfiguration().set(REDIS_HOSTS_CONF, hosts);
}

public static void setRedisHashKey(Job job, String hashKey) {
job.getConfiguration().set(REDIS_HASH_KEY_CONF, hashKey);
}

// This method returns an instance of a RecordWriter for the task.  Note how we are pulling the variables set by the static methods during configuration
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
String hashKey = job.getConfiguration().get(REDIS_HASH_KEY_CONF);
String csvHosts = job.getConfiguration().get(REDIS_HOSTS_CONF);
return new RedisHashRecordWriter(hashKey, csvHosts);
}

// This method is used on the front-end prior to job submission to ensure everything is configured correctly
public void checkOutputSpecs(JobContext job) throws IOException {
String hosts = job.getConfiguration().get(REDIS_HOSTS_CONF);
if (hosts == null || hosts.isEmpty()) {
throw new IOException(REDIS_HOSTS_CONF + " is not set in configuration.");
}

String hashKey = job.getConfiguration().get(REDIS_HASH_KEY_CONF);
if (hashKey == null || hashKey.isEmpty()) {
throw new IOException(REDIS_HASH_KEY_CONF + " is not set in configuration.");
}
}

// The output committer is used on the back-end to, well, commit output.  Discussion of this class is out of scope, but more info can be found in the Hadoop OutputCommitter documentation
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
// Use the committer from NullOutputFormat, since the record writer pushes data straight to Redis and there is nothing for the framework to commit
return (new NullOutputFormat<Text, Text>()).getOutputCommitter(context);
}

public static class RedisHashRecordWriter extends RecordWriter<Text, Text> {
// implementation of this static nested class is shown shortly
}
} // end RedisHashOutputFormat

The role of OutputFormat is to properly configure the job, ensuring that the RecordWriter implementation has everything it needs to work correctly. Once configured, the RecordWriter is what actually writes key/value pairs wherever you want them to go. A common practice is to make your RecordWriter (or reader) a static nested class, but that isn’t required. Let’s take a look at an implementation of RecordWriter:

// This class is templated to write only Text keys and Text values
public static class RedisHashRecordWriter extends RecordWriter<Text, Text> {

// This map is used to map an integer to a Jedis instance
private HashMap<Integer, Jedis> jedisMap = new HashMap<Integer, Jedis>();

// This is the name of the Redis hash
private String hashKey = null;

public RedisHashRecordWriter(String hashKey, String hosts) {
this.hashKey = hashKey;

// Create a connection to Redis for each host
// Map an integer 0-(numRedisInstances - 1) to the instance
int i=0;
for (String host : hosts.split(",")) {
Jedis jedis = new Jedis(host);
jedis.connect();
jedisMap.put(i++, jedis);
}
}

// The write method is what will actually write the key/value pairs out to Redis
public void write(Text key, Text value) throws IOException, InterruptedException {
// Get the Jedis instance that this key/value pair will be written to.
Jedis j = jedisMap.get(Math.abs(key.hashCode()) % jedisMap.size());

// Write the key/value pair
j.hset(hashKey, key.toString(), value.toString());
}

public void close(TaskAttemptContext context)
throws IOException, InterruptedException {
// For each jedis instance, disconnect it
for (Jedis jedis : jedisMap.values()) {
jedis.disconnect();
}
}
} // end RedisHashRecordWriter

This code demonstrates how simple it is to hook into external hosts for output. Such lightweight interfaces allow for endless possibilities, so long as the custom output formats can handle the parallel load of many map or reduce tasks.
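To show how this output format plugs into a job, here is a hedged sketch of a map-only driver and mapper. The class names, input path, host list, and hash key are all placeholders rather than part of the original code, and the mapper simply assumes tab-separated field/value lines.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class RedisOutputDriver {

    // Assumes tab-separated "field<TAB>value" lines; each becomes one hash entry
    public static class LineToHashMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split("\t", 2);
            if (parts.length == 2) {
                context.write(new Text(parts[0]), new Text(parts[1]));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "redis-hash-output");
        job.setJarByClass(RedisOutputDriver.class);

        job.setMapperClass(LineToHashMapper.class);
        job.setNumReduceTasks(0); // map-only job; pairs go straight to Redis

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/input/data"));

        // Point the output format at the Redis instances and the hash to write into
        job.setOutputFormatClass(RedisHashOutputFormat.class);
        RedisHashOutputFormat.setRedisHosts(job, "host1,host2,host3");
        RedisHashOutputFormat.setRedisHashKey(job, "my_hash_key");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}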

Next up, let’s take a look at the InputFormat code to pull data out of our Redis instances. This is a bit more complex, as we’ll use a custom InputSplit implementation as well.

We create an InputSplit for each Redis host, and a map task is created from each InputSplit. A single map task pulls all the data from its assigned Redis instance.

// This input format will read all the data from a given set of Redis hosts
public static class RedisHashInputFormat extends InputFormat<Text, Text> {

// Again, the CSV list of hosts and a hash key variables and methods for configuration
public static final String REDIS_HOSTS_CONF = "mapred.redishashinputformat.hosts";
public static final String REDIS_HASH_KEY_CONF = "mapred.redishashinputformat.key";

public static void setRedisHosts(Job job, String hosts) {
job.getConfiguration().set(REDIS_HOSTS_CONF, hosts);
}

public static void setRedisHashKey(Job job, String hashKey) {
job.getConfiguration().set(REDIS_HASH_KEY_CONF, hashKey);
}

// This method will return a list of InputSplit objects.  The framework uses this to create an equivalent number of map tasks
public List<InputSplit> getSplits(JobContext job) throws IOException {

// Get our configuration values and ensure they are set
String hosts = job.getConfiguration().get(REDIS_HOSTS_CONF);
if (hosts == null || hosts.isEmpty()) {
throw new IOException(REDIS_HOSTS_CONF + " is not set in configuration.");
}

String hashKey = job.getConfiguration().get(REDIS_HASH_KEY_CONF);
if (hashKey == null || hashKey.isEmpty()) {
throw new IOException(REDIS_HASH_KEY_CONF + " is not set in configuration.");
}

// Create an input split for each Redis instance
// More on this custom split later, just know that one is created per host
List<InputSplit> splits = new ArrayList<InputSplit>();
for (String host : hosts.split(",")) {
splits.add(new RedisHashInputSplit(host, hashKey));
}

return splits;
}

// This method creates an instance of our RedisHashRecordReader
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new RedisHashRecordReader();
}

public static class RedisHashRecordReader extends RecordReader<Text, Text> {
// implementation of this static nested class is shown shortly
}

public static class RedisHashInputSplit extends InputSplit implements Writable {
// implementation of this static nested class is shown shortly
}

} // end RedisHashInputFormat

Only two methods are required to adhere to the InputFormat abstract class: getSplits and createRecordReader. The example above demonstrates how simple it is to hook into external sources for input. The remaining static methods and variables are used to configure the job for the needs of the InputFormat and RecordReader implementations.
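As a hedged sketch of the read side, a driver might configure the input format like this. The host list, hash key, output path, and mapper are placeholders for illustration; the mapper simply passes each field/value pair through to be written out by the default TextOutputFormat.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RedisInputDriver {

    // Each map call receives one field/value pair pulled from the Redis hash
    public static class HashEntryMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text field, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(field, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "redis-hash-input");
        job.setJarByClass(RedisInputDriver.class);

        // Point the input format at the same hosts and hash key used on the write side
        job.setInputFormatClass(RedisHashInputFormat.class);
        RedisHashInputFormat.setRedisHosts(job, "host1,host2,host3");
        RedisHashInputFormat.setRedisHashKey(job, "my_hash_key");

        job.setMapperClass(HashEntryMapper.class);
        job.setNumReduceTasks(0); // map-only job

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path("/output/redis-dump"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}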

// This custom RecordReader will pull in all key/value pairs from a Redis instance for a given hash
public static class RedisHashRecordReader extends RecordReader<Text, Text> {

// A number of member variables to iterate and store key/value pairs from Redis
private Iterator<Entry<String, String>> keyValueMapIter = null;
private Text key = new Text(), value = new Text();
private float processedKVs = 0, totalKVs = 0;
private Entry<String, String> currentEntry = null;

// Logger used to report how many pairs were read (assumes commons-logging, as used by Hadoop)
private static final Log LOG = LogFactory.getLog(RedisHashRecordReader.class);

// Initialize is called by the framework and given an InputSplit to process
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {

// Get the host location from the InputSplit
String host = split.getLocations()[0];
String hashKey = ((RedisHashInputSplit) split).getHashKey();

// Create a new connection to Redis
Jedis jedis = new Jedis(host);
jedis.connect();
jedis.getClient().setTimeoutInfinite();

// Get all the key/value pairs from the Redis instance and store them in memory
totalKVs = jedis.hlen(hashKey);
keyValueMapIter = jedis.hgetAll(hashKey).entrySet().iterator();
LOG.info("Got " + totalKVs + " from " + hashKey); jedis.disconnect();
}

// This method is called by Mapper’s run method to ensure all key/value pairs are read
public boolean nextKeyValue() throws IOException, InterruptedException {
if (keyValueMapIter.hasNext()) {
// Get the current entry and set the Text objects to the entry
currentEntry = keyValueMapIter.next();
key.set(currentEntry.getKey());
value.set(currentEntry.getValue());

// Keep a count of processed pairs for progress reporting
processedKVs++;
return true;
} else {
return false;
}
}

// The next two methods are to return the current key/value pairs.  Best practice is to re-use objects rather than create new ones, i.e. don’t use “new”
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}

public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}

// This method is used to report the progress metric back to the framework.  It is not required to have a true implementation, but it is recommended.
public float getProgress() throws IOException, InterruptedException {
return processedKVs / totalKVs;
}

public void close() throws IOException {
/* nothing to do */
}

} // end RedisHashRecordReader

Now that we’ve implemented a RecordReader, we need to determine what data it reads. This is defined by an InputSplit implementation, which is handed to the reader’s initialize method. The number of input splits determines the number of map tasks created by the framework.

This is a pretty simple task with Redis. We will create one map task for each Redis instance, each of which hosts a shard of our total data set. Each mapper will then connect to a single Redis instance and pull all of the data for the hash. This will all happen in parallel, similar to how MapReduce reads a file in parallel by reading its blocks. This split also means we won’t overload a single Redis instance with too many connections. This is where InputSplit comes in.[1]

public static class RedisHashInputSplit extends InputSplit implements Writable {

// Two member variables, the hostname and the hash key (table name)
private String location = null;
private String hashKey = null;

public RedisHashInputSplit() {
// Default constructor required for reflection
}

public RedisHashInputSplit(String redisHost, String hash) {
this.location = redisHost;
this.hashKey = hash;
}

public String getHashKey() {
return this.hashKey;
}

// The following two methods are used to serialize the input information for an individual task
public void readFields(DataInput in) throws IOException {
this.location = in.readUTF();
this.hashKey = in.readUTF();
}

public void write(DataOutput out) throws IOException {
out.writeUTF(location);
out.writeUTF(hashKey);
}

// This gets the size of the split so the framework can sort them by size.  This isn’t that important here, but we could query a Redis instance and get the bytes if we desired
public long getLength() throws IOException, InterruptedException {
return 0;
}

// This method returns hints to the framework of where to launch a task for data locality
public String[] getLocations() throws IOException, InterruptedException {
return new String[] { location };
}

} // end RedisHashInputSplit

This demonstrates how to customize input and output using the MapReduce framework for Redis. Though it’s often overlooked, customizing I/O is a useful way to make MapReduce more flexible. If I’ve piqued your interest and you want to know more about customizing MapReduce I/O, check out chapter seven of MapReduce Design Patterns (O’Reilly 2012), “Input and Output Patterns.”

When implementing custom formats of your own for other external sources, be mindful of how well those sources can scale, and of what would happen if a task fails and is retried. In some cases, such as this one, that doesn’t really matter. Data that was already written to Redis will just be overwritten with a new copy, and data pulled from Redis will simply be pulled again on the next attempt. Other cases, for example writing to a Redis list rather than a hash, would require a little more effort: task retries would add duplicate entries to the list. It would take additional engineering to roll back committed entries, but it would be worth the effort to ensure more fault-tolerant external outputs.
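As a rough illustration of the difference (the connection, key, and value names here are made up): hset overwrites a field, so a retried task converges to the same final state, while rpush appends a new element on every attempt.

// Hypothetical snippet to show the contrast; key and value names are made up
Jedis jedis = new Jedis("localhost");

// Idempotent: re-running this write on a task retry leaves the hash in the same state
jedis.hset("user:profiles", "user1001", "Alice");

// Not idempotent: every task retry appends another copy to the list
jedis.rpush("recent_logins", "user1001");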

Now get out there and write some custom formats of your own!


[1] Note that custom InputSplits must implement Writable in order for the framework to serialize them.