Twitter is a great way to get a general feel for public sentiment about a particular product or idea. Using HAWQ and the PXF framework, users can read raw JSON data and extract value directly out of a JSON-based Twitter stream. This is a dramatic change from the typical workflow of using MapReduce to transform the data into a row-based format. In this post, I am going to demonstrate how to set up a simple flow that pulls data from Twitter’s Streaming API and stores it in HDFS using Apache Flume. Then, we are going to execute some SQL queries using Pivotal Advanced Database Services – HAWQ – and the Pivotal Extension Framework (PXF).
HAWQ is Pivotal’s SQL-on-Hadoop offering, and through the power of its extension framework, HAWQ segment servers are able to transform raw data stored in HDFS or any other system into rows and columns on the fly. This data is transformed in parallel by HAWQ’s segments, similar to how MapReduce reads data in parallel. If the data can be accessed through a Java API, be split, and transformed into a row-based format, then PXF is a great way to put a SQL interface on your big data.
Let’s take a look at the overall architecture flow diagram above. We are going to use Flume to ingest data from Twitter using a custom-built Flume source available on GitHub, which will dump data into HDFS as files. These files are then parsed by a HAWQ extension built to read JSON data:
- Flume Agent Source connects to Twitter’s sample stream
- Flume Memory Channel pushes tweets from the source to the sink
- Flume Sink writes files into HDFS
- HAWQ Master server receives an external table definition using the PXF protocol
- HAWQ Master receives SQL analysis queries and distributes them to HAWQ segment servers
To begin with, we will need to start a Flume agent to write to HDFS. Flume contains three major components: a source, a channel, and a sink. A source is where an event comes from, a sink is where it goes, and the channel is how the event moves from source to sink. This Flume agent uses a custom source connected to Twitter’s Streaming API and writes a sample of new tweets to HDFS. In this example, we use the basic memory channel for event flow, though you may choose to use other channel implementations that are more reliable.
Before the agent can be started, a word about the source itself: the TwitterSampleStreamSource is a custom piece of code available on GitHub. It connects to Twitter’s Streaming API and writes out a live sample of new tweets.
A Flume agent requires a configuration file to define the flow topology. This sample Flume configuration file tells Flume to connect to Twitter and write to HDFS using an in-memory channel:
[ajshook@phd1 ~]$ cat ~/twitter-agent.properties
# Flume configuration
# Define the names of our sources, sinks, and channels
twitter.sources = TStream
twitter.sinks = HDFSSink
twitter.channels = MemoryChannel

# Configuration for our custom Flume Source -- put your access information here!
twitter.sources.TStream.type = com.gopivotal.flume.source.TwitterSampleStreamSource
twitter.sources.TStream.channels = MemoryChannel
twitter.sources.TStream.consumer.key=<your consumer key>
twitter.sources.TStream.consumer.secret=<your consumer secret>
twitter.sources.TStream.access.token=<your access token>
twitter.sources.TStream.access.token.secret=<your access token secret>

# Configure the HDFS sink. This will write files in 64 MB chunks,
# or a new file every five minutes
# More information on the HDFS sink can be found at
# http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
twitter.sinks.HDFSSink.type = hdfs
twitter.sinks.HDFSSink.channel = MemoryChannel

# This will likely need to point to your NameNode, not mine!
twitter.sinks.HDFSSink.hdfs.path = hdfs://phd2:8020/flume/twitter/%Y/%m/%d/%H
twitter.sinks.HDFSSink.hdfs.filePrefix = twitter
twitter.sinks.HDFSSink.hdfs.fileSuffix = .json
twitter.sinks.HDFSSink.hdfs.rollInterval = 300
twitter.sinks.HDFSSink.hdfs.rollSize = 67108864
twitter.sinks.HDFSSink.hdfs.rollCount = 0
twitter.sinks.HDFSSink.hdfs.fileType = DataStream
twitter.sinks.HDFSSink.hdfs.writeFormat = Text

# A memory channel will flow data from Twitter to HDFS using memory,
# versus using a more fault-tolerant channel like a FileChannel
twitter.channels.MemoryChannel.type = memory
twitter.channels.MemoryChannel.capacity = 1000
twitter.channels.MemoryChannel.transactionCapacity = 100
You can get a Twitter consumer key/secret and access token/secret by registering an application at http://dev.twitter.com. The rest of these configuration elements are mostly self-explanatory, but I suggest taking a look at the Flume User Guide for more details.
After our configuration is all said and done, we can now start our Flume agent:
[ajshook@phd1 ~]$ flume-ng agent -n twitter -f ~/twitter-agent.properties
... Some log messages
13/08/23 09:15:56 INFO source.TStreamSource: Starting Twitter sample stream...
13/08/23 09:15:56 INFO twitter4j.TwitterStreamImpl: Establishing connection.
13/08/23 09:15:58 INFO twitter4j.TwitterStreamImpl: Connection established.
13/08/23 09:15:58 INFO twitter4j.TwitterStreamImpl: Receiving status stream.
13/08/23 09:16:00 INFO hdfs.BucketWriter: Creating hdfs://phd2:8020/flume/twitter/2013/08/23/09/twitter.1377263758230.json.tmp
Flume will set up all the components as defined in the configuration file. If everything goes well, you should see a log file created and data will begin to flow. We can verify this from the command line by talking to HDFS (with some columns omitted):
[ajshook@phd1 ~]$ hdfs dfs -ls -R /flume
... 2013-08-20 14:45 /flume/twitter
... 2013-08-20 14:45 /flume/twitter/2013
... 2013-08-22 00:47 /flume/twitter/2013/08
... 2013-08-22 00:47 /flume/twitter/2013/08/23
... 2013-08-22 00:59 /flume/twitter/2013/08/23/09
... 2013-08-22 00:59 /flume/twitter/2013/08/23/09/twitter.1377263758230.json.tmp
Now we have data landing in HDFS! Once a file is closed and renamed to remove the .tmp extension, we can view it using hdfs dfs -cat. These files are stored in raw JSON format. Typically, we would need to use Java, Hive, or Pig to write a MapReduce job to transform these files into a row-based format before analysis. However, we can use PXF to read the JSON records immediately after the file is closed, giving us a shorter time-to-value from our data.
Next up, we are going to use the power of PXF to define an external table on these raw JSON files. Let’s take a look at the DDL for this table:
CREATE EXTERNAL TABLE ext_tweets_json (
    created_at TEXT,
    id BIGINT,
    text TEXT,
    source TEXT,
    "user.id" BIGINT,
    "user.screen_name" TEXT,
    "user.location" TEXT,
    "user.friends_count" INTEGER,
    "user.followers_count" INTEGER,
    "geo.coordinates[0]" FLOAT8,
    "geo.coordinates[1]" FLOAT8)
LOCATION ('pxf://phd2:50070/data/twitter-clean/*/*/*/*/*.json?FRAGMENTER=HdfsDataFragmenter&ACCESSOR=JsonAccessor&RESOLVER=JsonResolver&ANALYZER=HdfsAnalyzer')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');

ANALYZE ext_tweets_json;
The above statement creates an external table with column names identical to their JSON counterparts. There are a number of attributes Twitter gives with each status update, but I am only defining the features I am interested in. Since this is an external table and I am reading the raw data, I can very quickly drop this table and define a new one with any new fields I desire—no more data needs to be loaded.
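For example, if I later decided I also cared about the user's language, I could simply drop and recreate the external table with that extra column and no data reload. The sketch below illustrates the pattern with an abbreviated column list; "user.lang" is a real field in Twitter's JSON, but whether this particular resolver handles it depends on the extension's implementation:

DROP EXTERNAL TABLE ext_tweets_json;

CREATE EXTERNAL TABLE ext_tweets_json (
    id BIGINT,
    text TEXT,
    "user.id" BIGINT,
    "user.screen_name" TEXT,
    "user.lang" TEXT)  -- hypothetical extra column; column list abbreviated for the sketch
LOCATION ('pxf://phd2:50070/data/twitter-clean/*/*/*/*/*.json?FRAGMENTER=HdfsDataFragmenter&ACCESSOR=JsonAccessor&RESOLVER=JsonResolver&ANALYZER=HdfsAnalyzer')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');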
The JsonResolver, specified in the LOCATION clause, uses the table's column definition to parse each JSON record and populate rows with data. (In a future blog post, I’ll go into the implementation details of a custom PXF extension to perform this task.) A view is then created, shown later, to format the data with a SQL timestamp and rename the columns to something more user-friendly.
The other important piece is defined in the LOCATION clause. We are pointing to the NameNode—in this case phd2—and giving it a wildcarded path of files. All of the files matching this wildcarded path will be used as input for the SQL query—including all new files that come into the system.
We then use the ANALYZE statement on the table to gather various metrics about the underlying data. These metrics are used by the HAWQ Master to build an optimal plan for all submitted queries, such as whether to use a hash join between two large tables or broadcast the contents of a small table to all HAWQ segment servers.
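Since HAWQ is derived from PostgreSQL, the statistics gathered by ANALYZE should be visible in the usual catalog tables. Here is a small sketch, assuming the standard pg_class columns are present:

-- reltuples is the planner's estimated row count for the external table,
-- and relpages is its estimated size in pages.
SELECT relname, reltuples, relpages
FROM pg_class
WHERE relname = 'ext_tweets_json';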
This next SQL statement creates a view of this external data source. We convert the tweet timestamp, stored as TEXT, to a SQL TIMESTAMP, and rename some columns. This makes the data set more user-friendly to query without actually creating a new table out of the underlying JSON data.
CREATE VIEW tweets AS (
    SELECT to_timestamp(regexp_replace(created_at, E'\\+[0-9]+ ', ''),
               'Dy Mon DD HH24:MI:SS YYYY') AS createdat,
           id,
           text,
           source,
           "user.id" AS userid,
           "user.screen_name" AS screenname,
           "user.location" AS location,
           "user.friends_count" AS numfriends,
           "user.followers_count" AS numfollowers,
           "geo.coordinates[0]" AS lat,
           "geo.coordinates[1]" AS lon
    FROM ext_tweets_json
);
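To see what that conversion does, here is a quick standalone example using a single hard-coded timestamp in Twitter's created_at format (the value itself is made up purely for illustration):

-- Strip the "+0000 " offset so the string matches the format pattern,
-- then parse it into a SQL timestamp.
SELECT to_timestamp(
           regexp_replace('Fri Aug 23 09:15:58 +0000 2013', E'\\+[0-9]+ ', ''),
           'Dy Mon DD HH24:MI:SS YYYY') AS createdat;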
Now that we have defined our table and view, we can query the data set interactively. For this example, I am going to cheat a little bit and load a month of this sample stream into HDFS. That way we can get some cool results from the following queries, rather than using only five minutes’ worth of Twitter data.
To begin the analysis, let’s explore this data set by counting how many tweets we are dealing with and how many distinct users we have.
ajshook=# SELECT COUNT(*) AS numtweets, COUNT(DISTINCT userid) AS numusers FROM tweets;

 numtweets | numusers
-----------+----------
  16687314 | 10223914
(1 row)

Time: 13312.813 ms
When this query is executed, the HAWQ Master determines the optimal plan using a cost-based optimizer and dispatches it to the HAWQ segment servers. The segment servers each parse a subset of the blocks that make up the JSON data. As the data is read, PXF transforms the JSON into rows, and HAWQ continues to operate on those rows, enabling fast and intelligent queries.
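If you are curious about the plan the master builds, HAWQ inherits PostgreSQL's EXPLAIN command, so prefixing the query is enough to see how it will be dispatched (the exact plan output will vary with your cluster and statistics):

EXPLAIN SELECT COUNT(*) AS numtweets, COUNT(DISTINCT userid) AS numusers
FROM tweets;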
In this case, we have about 16.7 million tweets and over 10.2 million distinct users. Let’s now take a look at the content of our tweets. We should start by finding users that are particularly chatty, say those with 10 or more tweets, and then generate a new table consisting only of their tweets. The following two queries do just that.
ajshook=# CREATE TABLE chatty_users AS (
    SELECT userid FROM tweets
    GROUP BY userid
    HAVING COUNT(userid) >= 10
) DISTRIBUTED BY (userid);
SELECT 50421
Time: 8026.668 ms

ajshook=# CREATE TABLE chatty_user_tweets AS (
    SELECT t.* FROM chatty_users cu
    INNER JOIN tweets t ON t.userid = cu.userid
) DISTRIBUTED BY (userid);
SELECT 665981
Time: 11378.701 ms
Here, we have extracted around fifty thousand users with about 665 thousand tweets between them. This amounts to roughly 0.5% of users accounting for about 4% of all of our tweets. Next up, we should see just what these users like talking about. The following query uses some SQL functions to rip apart each tweet into words, removing any non-alphanumeric characters and filtering out common words. The top 1,000 most common English words were loaded into a table, words_1000, which the query joins against to filter them out before finding the top 15 most frequent words; a quick sketch of how that table might be built appears below, followed by the query itself.
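The words_1000 table itself is nothing special, just a single column of words. A minimal sketch of how it could be created and loaded, assuming the word list sits in a one-word-per-line file on the master (the path here is hypothetical):

CREATE TABLE words_1000 (word TEXT) DISTRIBUTED BY (word);

-- Load the common-word list; COPY is the plain PostgreSQL route,
-- though gpfdist-based loading would also work for larger files.
COPY words_1000 FROM '/home/ajshook/common_words.txt';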
ajshook=# SELECT t.word, COUNT(*) FROM (
    SELECT regexp_replace(
               unnest(string_to_array(regexp_replace(lower(text), E'\\s', ' ', 'g'), ' ')),
               '[^A-Za-z0-9]', '', 'g') AS word
    FROM chatty_user_tweets) t
LEFT OUTER JOIN words_1000 cw ON (t.word = cw.word)
WHERE t.word != '' AND cw.word IS NULL AND LENGTH(t.word) > 5
GROUP BY t.word
ORDER BY count DESC
LIMIT 15;

      word      | count
----------------+-------
 twitter        |  3848
 followers      |  3802
 justin         |  3530
 retweet        |  2973
 mtvhottest     |  2554
 nowplaying     |  2270
 following      |  2050
 thanks         |  1928
 followback     |  1858
 harrystyles    |  1854
 teamfollowback |  1773
 justinbieber   |  1665
 openfollow     |  1616
 amazon         |  1615
 bieber         |  1503
(15 rows)

Time: 13233.394 ms
There are three mentions of Justin Bieber in this list, which is pretty interesting to me. Let’s go back to our original table and find tweets that are related to Justin Bieber and see who is talking about him.
ajshook=# SELECT screenname, count(*) FROM tweets
WHERE text ~* '.*justin.*' AND text ~* '.*bieber.*'
GROUP BY screenname
ORDER BY count DESC
LIMIT 10;

   screenname    | count
-----------------+-------
 JustinsRetweets |    32
 JustBieBry      |    20
 votemtvjdb      |    16
 jbnextome       |    16
 DaneyFamous     |    15
 jbforbm         |    14
 StoryAboutNSN   |    13
 Richarskians    |    12
 FANSJustinMUSIC |    11
 armyofsb        |    11
(10 rows)

Time: 15568.651 ms
From this query, we can see that the top users discussing Justin Bieber in this sample data set also have Bieber-related names! Not surprising that they’d be talking about him.
Using Flume to feed live data into HDFS allows us to get low-latency results using HAWQ and PXF. We defined an external table specifying the JSON fields of interest, and PXF transforms these JSON records into rows of data for us. Creating a view on this data then gives us user-friendly column names and lets us perform further transformations, such as parsing a timestamp from a string. If we need more fields from the raw JSON, we can quickly modify our DDL without dropping and loading any more data. From here, we can continue to use ANSI SQL to interact with this large data set. We can create temporary tables to grab snapshots of the data and execute more intense analysis, and we can connect third-party tools to HAWQ using ODBC or JDBC to work with this live JSON data feed.
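As an example of the snapshot idea, here is a hedged sketch that pulls one day's worth of tweets into a temporary table and then runs a heavier aggregation against it; the date range is made up, and it assumes temporary tables created from a query behave as they do in PostgreSQL and Greenplum:

CREATE TEMPORARY TABLE tweets_snapshot AS (
    SELECT * FROM tweets
    WHERE createdat >= '2013-08-23' AND createdat < '2013-08-24'
) DISTRIBUTED BY (userid);

-- Further analysis runs against the snapshot without re-reading the
-- raw JSON files, for example counting tweets per hour:
SELECT date_trunc('hour', createdat) AS hour, COUNT(*) AS numtweets
FROM tweets_snapshot
GROUP BY 1
ORDER BY 1;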
The key takeaway is that all of this raw data is transformed into rows on the fly using PXF; no MapReduce jobs are needed to batch-transform the data. Using the power of PXF and HAWQ, users are able to put an ANSI SQL interface on virtually any data set or source. To determine whether you can use this approach with your unique data set, many of the same principles as processing data via MapReduce apply: the data needs to be splittable, and each record must be identifiable independently of any other record.