
JSON on Hadoop Example for Extending HAWQ Data Formats Using Pivotal eXtension Framework (PXF)

Image by Patrick Dinnen via Flickr.

In my last post, I demonstrated a simple data workflow using Flume to pull raw JSON tweets from Twitter and store them on your HDFS cluster. These tweets were then analyzed using HAWQ and the Pivotal eXtension Framework (PXF). PXF is a powerful framework for putting an ANSI-compliant SQL interface on virtually any data set or source. In our 1.0.1 release of Pivotal HD, there was no JSON connector out of the box. Rather than using MapReduce to pre-process the tweets into a row format PXF can already read, I opted to show the flexibility of the framework by implementing a JSON connector for PXF. In this post, I am going to go over the classes I had to implement in order to give HAWQ the ability to read raw JSON data.

Let’s start with a brief overview of the API and then dive into the code. PXF has a robust and extensible API to read data stored in various formats and sources. There are four core components you can extend for your data format or source.

Fragmenter
The Fragmenter is responsible for passing data source metadata back to HAWQ. Each data fragment describes a piece of the requested data, including the data source name and the hostname where it is located. For example, if the source is a file in HDFS, the Fragmenter returns a list of data fragments containing HDFS file blocks, and each fragment includes the location of its block. If the source data is an HBase table, the Fragmenter returns information about table regions, including their locations.

Accessor
The Accessor retrieves specific fragments and passes records back to the Resolver. For example, the Accessor creates a FileInputFormat and a RecordReader for the specific fragments (HDFS blocks) it was requested to process and sends this to the Resolver. In the case of HBase and Hive files, the Accessor returns single rows from an HBase or Hive table.

Resolver
The Resolver deserializes records passed from the Accessor (Java OneRow objects) and serializes them to a list of field objects (OneField). PXF automatically converts this list of OneField objects into HAWQ-readable GPDBWritable format.

Analyzer
The Analyzer provides PXF statistical data for the HAWQ query optimizer. Statistics include the size of a fragment of data, the number of fragments, and the number of rows in the data. These statistics should be quick estimates, as scanning an entire data set for exact statistics can take some time. Analyzers are optional, and a number of them are built into the framework for common cases (e.g., files stored in HDFS).

In addition to these four classes, PXF has extensible functionality for Query Filter Push-Downs. With a little extra coding, the operands in a query’s WHERE clause can be pushed down to an external system.

One out-of-the-box implementation of a Query Filter Push-Down is for HBase. The filter prunes column families and scans only the data that satisfies the query's WHERE clause operands. Only the data matching the constraints of your WHERE clause is sent across the network from an HBase RegionServer to PXF, saving a lot of network traffic and reducing your overall query execution time. These filters are optional but recommended where appropriate.
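
To make the push-down idea concrete, here is a rough, purely hypothetical sketch (not PXF's actual HBase connector code) of the kind of Scan such a filter might build for a clause like WHERE rating > 5; the column family and qualifier names are made up for illustration:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative only: shows how a WHERE clause could become an HBase Scan
public class PushDownSketch {

    public static Scan buildScan() {
        Scan scan = new Scan();

        // Column-family pruning: request only the family the query touches
        scan.addFamily(Bytes.toBytes("details"));

        // Evaluate the comparison on the RegionServer so non-matching rows
        // never cross the network (simple byte-wise comparison, for illustration)
        SingleColumnValueFilter filter = new SingleColumnValueFilter(
                Bytes.toBytes("details"),
                Bytes.toBytes("rating"),
                CompareOp.GREATER,
                Bytes.toBytes("5"));
        filter.setFilterIfMissing(true);
        scan.setFilter(filter);

        return scan;
    }
}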

The Fragmenter, Accessor, Resolver, and Analyzer classes are used in an external table data definition language (DDL) as seen here. Any FilterParser implementation you have is not explicitly referenced in this way, but is instead used in your Fragmenter or Accessor code.

CREATE EXTERNAL TABLE ext_json_mytestfile (
    created_at TEXT,
    id_str TEXT,
    text TEXT,
    source TEXT,
    "user.id" INTEGER,
    "user.location" TEXT,
    "coordinates.coordinates[0]" DOUBLE PRECISION,
    "coordinates.coordinates[1]" DOUBLE PRECISION)
LOCATION ('pxf://phd2:50070/data/twitter/json/*/*/*/*/*.json?FRAGMENTER=HdfsDataFragmenter&ACCESSOR=JsonAccessor&RESOLVER=JsonResolver&ANALYZER=HdfsAnalyzer')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');

One new feature in PXF 2.1.0 is profiles, which let you define a set of parameters ahead of time and reference them with a single PROFILE parameter. Using PROFILE, the external table definition above becomes the one below. PXF treats the two definitions as the same, but one is clearly less verbose than the other!

CREATE EXTERNAL TABLE ext_json_mytestfile (
    created_at TEXT,
    id_str TEXT,
    text TEXT,
    source TEXT,
    "user.id" INTEGER,
    "user.location" TEXT,
    "coordinates.coordinates[0]" DOUBLE PRECISION,
    "coordinates.coordinates[1]" DOUBLE PRECISION)
LOCATION ('pxf://phd2:50070/data/twitter/*/*/*/*/*.json?PROFILE=JSON')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');

Now that we are familiar with what we need to build and how to use it, let's look at the implementation details for our complete JSON extension. Since the JSON files are expected to be in HDFS, we can leverage the built-in HdfsDataFragmenter and HdfsAnalyzer. These classes are very generic and will fragment and analyze any files stored in HDFS, regardless of the actual data format underneath.

Now that we’ve dealt with fragmenting and analyzing the JSON files, let’s take a look at the JsonAccessor class. Much like Hadoop MapReduce, PXF comes with a number of helpful classes to extend that will do a lot of heavy lifting for file-based formats. The HdfsSplittableDataAccessor has only one abstract method — getReader. The returned object is expected to be an instance of RecordReader from the Hadoop MapReduce API, making implementation of this Accessor very simple. And now, the code:

package com.pivotal.pxf.accessors;

// Import statements...

public class JsonAccessor extends HdfsSplittableDataAccessor {

    public static final String IDENTIFIER_PARAM = "X-GP-IDENTIFIER";
    public static final String ONERECORDPERLINE_PARAM = "X-GP-ONERECORDPERLINE";

    private String identifier = "";
    private boolean oneRecordPerLine = true;

    public JsonAccessor(InputData inputData) throws Exception {
        // Hand the base class a JsonInputFormat to read records with
        super(inputData, new JsonInputFormat());

        // Optional parameters from the external table definition
        if (inputData.getParametersMap().containsKey(IDENTIFIER_PARAM)) {
            identifier = inputData.getProperty(IDENTIFIER_PARAM);
        }

        if (inputData.getParametersMap().containsKey(ONERECORDPERLINE_PARAM)) {
            oneRecordPerLine = Boolean.parseBoolean(inputData.getProperty(ONERECORDPERLINE_PARAM));
        }
    }

    @Override
    protected Object getReader(JobConf conf, InputSplit split) throws IOException {
        conf.set(JsonInputFormat.RECORD_IDENTIFIER, identifier);

        if (oneRecordPerLine) {
            // One complete JSON record per line of the file
            return new JsonInputFormat.SimpleJsonRecordReader(conf, (FileSplit) split);
        } else {
            // JSON records that may span multiple lines ("pretty print")
            return new JsonInputFormat.JsonRecordReader(conf, (FileSplit) split);
        }
    }
}

The constructor passes the InputData class along with an implementation of FileInputFormat (JsonInputFormat) via the super method, which the base class uses to read records. We also grab a couple of configuration parameters from our external table definition to determine which JSON RecordReader we want to create. In getReader, we check our configuration to see whether we return the SimpleJsonRecordReader or the more complex JsonRecordReader. The simple version reads a single line from the file and passes it on to the JsonResolver, while the more complex record reader respects any whitespace in the full JSON record, sometimes called “pretty print”. The former is the more performant way to read the data, as there is less overhead.
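
To give a sense of what sits behind JsonInputFormat, here is a simplified, hypothetical sketch of a line-per-record reader built on Hadoop's old mapred API, which is what HdfsSplittableDataAccessor expects from getReader. The class and field names are illustrative; the real JsonInputFormat, including the multi-line JsonRecordReader and the decodeLineToJsonNode helper used by the Resolver below, is part of the full source mentioned at the end of this post.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

// Illustrative sketch only, not the actual JsonInputFormat source.
// Assumes one complete JSON record per line of the file.
public class SimpleJsonRecordReaderSketch implements RecordReader<Text, NullWritable> {

    private final LineRecordReader lineReader;
    private final LongWritable offset = new LongWritable();
    private final Text line = new Text();

    public SimpleJsonRecordReaderSketch(JobConf conf, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(conf, split);
    }

    @Override
    public boolean next(Text key, NullWritable value) throws IOException {
        // Read the next line and hand it back as the key,
        // which is what JsonResolver later parses
        if (!lineReader.next(offset, line)) {
            return false;
        }
        key.set(line);
        return true;
    }

    @Override
    public Text createKey() {
        return new Text();
    }

    @Override
    public NullWritable createValue() {
        return NullWritable.get();
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }
}

The multi-line JsonRecordReader carries more overhead because it cannot simply delegate to a line reader; it has to find record boundaries itself, which is presumably where the RECORD_IDENTIFIER setting comes into play.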

At this point, we can go from bytes to a JSON record. The next piece of the puzzle is to implement the JsonResolver. This class extends Plugin and implements IReadResolver, and it has a single method, getFields. This method takes in a OneRow object which contains a key and a value. In this case, the key is a Text object containing the full JSON record. The method uses a standard JSON parser to populate a row of data based on the HAWQ column descriptors. Give the code a read, paying special attention to the comments.

package com.pivotal.pxf.resolvers;

// Import statements

public class JsonResolver extends Plugin implements IReadResolver {

    private ArrayList<OneField> list = new ArrayList<OneField>();

    public JsonResolver(InputData inputData) throws Exception {
        super(inputData);
    }

    @Override
    public List<OneField> getFields(OneRow row) throws Exception {
        list.clear();

        // The key is a Text object holding the full JSON record
        JsonNode root = JsonInputFormat.decodeLineToJsonNode(row.getKey().toString());

        // If we weren't given a null object
        if (root != null) {
            // Iterate through the column definition and fetch our JSON data
            for (int i = 0; i < inputData.columns(); ++i) {

                // Get the current column description
                ColumnDescriptor cd = inputData.getColumn(i);
                int columnType = cd.columnTypeCode();

                // Get the JSON projections from the column name
                // For example, "user.name" is ["user", "name"]
                String[] projs = cd.columnName().split("\\.");

                // Move down the JSON path to the node just above the final name
                JsonNode node = getPriorJsonNodeFromColumnName(root, projs);

                // If this column is an array index, e.g. "tweet.hashtags[0]"
                if (isArrayIndex(projs)) {

                    // Get the node name and index
                    String nodeName = getArrayName(projs);
                    int arrayIndex = getArrayIndex(projs);

                    // Move to the array node
                    node = node.get(nodeName);

                    // If this node is null or missing, add a null value here
                    if (node == null || node.isMissingNode()) {
                        addNullField(columnType);
                    } else if (node.isArray()) {
                        // If the JSON node is an array, add the indexed element to our list
                        addFieldFromJsonArray(columnType, node, arrayIndex);
                    } else {
                        throw new InvalidParameterException(nodeName + " is not an array node");
                    }
                } else {
                    // This column is not an array type
                    // Move to the final node
                    node = node.get(projs[projs.length - 1]);

                    // If this node is null or missing, add a null value here
                    if (node == null || node.isMissingNode()) {
                        addNullField(columnType);
                    } else {
                        // Else, add the value to the record
                        addFieldFromJsonNode(columnType, node);
                    }
                }
            }
        }

        return list;
    }

    private JsonNode getPriorJsonNodeFromColumnName(JsonNode root, String[] projs) { /* omitted */ }

    private boolean isArrayIndex(String[] projs) { /* omitted */ }

    private String getArrayName(String[] projs) { /* omitted */ }

    private int getArrayIndex(String[] projs) { /* omitted */ }

    private void addFieldFromJsonArray(int type, JsonNode node, int index) { /* omitted */ }

    private void addFieldFromJsonNode(int type, JsonNode val) throws IOException { /* omitted */ }

    private void addNullField(int type) { /* omitted */ }
}

In essence, the getFields method does a number of things. It iterates through the column definition of the external table and extracts JSON data from the corresponding nodes. It understands node projections to handle nesting, such as “user.name”, as well as indexing into JSON arrays of data, such as “geo.coordinates[0]” and “geo.coordinates[1]” for latitude and longitude. Once the type is determined from the column name, the JSON node is walked, the value is extracted, and it is added to a list of fields. This list of fields is then returned to the framework for further processing.

As you likely noticed, the private methods of the class were omitted, as they are merely helper functions to handle all the JSON parsing and type conversion. The full source code is posted on GitHub, along with documentation of the JSON, Cassandra, and Accumulo prototype extensions I’ve been working on.
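
To give a flavor of those helpers, here is one possible (and purely hypothetical) implementation of the array-handling trio, assuming column names follow the "name[index]" convention shown above; the versions on GitHub may look different:

// Hypothetical helper implementations, assuming the last projection of an
// array column looks like "coordinates[0]"

private boolean isArrayIndex(String[] projs) {
    // True if the final projection ends with an [index] suffix
    return projs[projs.length - 1].matches(".+\\[\\d+\\]");
}

private String getArrayName(String[] projs) {
    // "coordinates[0]" -> "coordinates"
    String last = projs[projs.length - 1];
    return last.substring(0, last.indexOf('['));
}

private int getArrayIndex(String[] projs) {
    // "coordinates[0]" -> 0
    String last = projs[projs.length - 1];
    return Integer.parseInt(last.substring(last.indexOf('[') + 1, last.indexOf(']')));
}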

Let’s see this JSON connector in action by looking at a small data set. Say I have four tweets stored in HDFS as such:

{"created_at":"Mon Sep 30 04:04:53 +0000 2013","id_str":"384529256681725952","text":"sigh, who knows.","source":"web","user":{"id":31424214,"location":"COLUMBUS"},"coordinates":{"type":"Point","coordinates":[-6.100,50.103]}}
{"created_at":"Mon Sep 30 04:04:54 +0000 2013","id_str":"384529260872228864","text":"I did that 12 years ago..nnT.T","source":"web","user":{"id":67600981,"location":"KryberWorld"},"coordinates":{"type":"Point","coordinates":[-8.100,52.104]}}
{"created_at":"Mon Sep 30 04:04:54 +0000 2013","id_str":"384529260892786688","text":"Welp guess I'll have anxiety for another week","source":"web","user":{"id":122795713,"location":"California"},"coordinates":null}
{"created_at":"Mon Sep 30 04:04:55 +0000 2013","id_str":"384529265099689984","text":"I'm craving breadsticks","source":"web","user":{"id":633364307,"location":""},"coordinates":null}

Using the external table definition earlier in this post, executing a simple SELECT statement against this table returns these results:

SELECT * FROM ext_json_mytestfile;

           created_at           |       id_str       |                     text                      | source |  user.id  | user.location | coordinates.coordinates[0] | coordinates.coordinates[1]
--------------------------------+--------------------+-----------------------------------------------+--------+-----------+---------------+----------------------------+---------------------------
 Mon Sep 30 04:04:53 +0000 2013 | 384529256681725952 | sigh, who knows.                              | web    |  31424214 | COLUMBUS      |                       -6.1 |                     50.103
 Mon Sep 30 04:04:54 +0000 2013 | 384529260872228864 | I did that 12 years ago..nnT.T              | web    |  67600981 | KryberWorld   |                       -8.1 |                     52.104
 Mon Sep 30 04:04:54 +0000 2013 | 384529260892786688 | Welp guess I'll have anxiety for another week | web    | 122795713 | California    |                            |
 Mon Sep 30 04:04:55 +0000 2013 | 384529265099689984 | I'm craving breadsticks                       | web    | 633364307 |               |                            |

In just a few hours, I was able to create a reusable PXF connector to read raw JSON files, enabling ANSI-compliant SQL queries against this raw data. Just think about what you could do with HAWQ and PXF against your own data sets! PXF can help you bridge the gap between your various applications and a big data platform. By pushing data to Pivotal HD and building a PXF connector for your data, you can run legacy SQL reports as well as use the full Pivotal HD platform to get more value out of your data and drive your business forward.