Apache Spark is a commonly used framework for distributed computing. It provides the capability to analyze data sets on the order of terabytes or larger. In this post, we show how to use SparkR, an R package providing a frontend to use Spark, by analyzing trip data from Citi Bike, NYC's bike sharing program. This data is publicly available at https://www.citibikenyc.com/system-data. It describes trip information (starting and stopping locations, trip time, etc.) and some limited rider information (birth year, gender). These tables (as of September 2017) constitute 8 GB of disk space and span 45 million rows. The code used to generate these analyses and figures can be found at https://github.com/gregtam/citi-bike-analysis/.
This analysis is done using Amazon Elastic MapReduce with Apache Spark and Apache Hive. The Citi Bike data is stored on Amazon S3 and accessed via Spark.
Spark has a DataFrame API, which is modeled after R's dplyr library and Python's Pandas DataFrames. This API provides an intuitive, tabular structure for data that data scientists are familiar with. As such, much of the functionality that SQL provides is available with Spark DataFrames such as SELECT
, WHERE
, ORDER BY
, GROUP BY
, and JOIN
. These can all be executed from within SparkR.
Citi Bike Data
Before doing any analysis, we can pull in some sample records to better understand our data.
take(citi_bike_trips_sdf, 10)
The take function samples a subset of rows from the Spark DataFrame and brings it into R as an R data frame.
We can see the table’s columns and respective data types by typing the DataFrame name into R.
> citi_bike_trips_sdf SparkDataFrame[tripduration:string, starttime:timestamp, stoptime:timestamp, start_station_id:string, start_station_name:string, start_station_latitude:double, start_station_longitude:double, end_station_id:string, end_station_name:string, end_station_latitude:double, end_station_longitude:double, bikeid:string, usertype:string, birth_year:string, gender:string]
One task we would like to achieve is determining which stations are being used most frequently. We can achieve this by pulling in the start and end points, “unioning” them, then running a “group by” and a “count”.
We use the magrittr library here; it allows us to use the %>%
operator, which pipes the output of a function as the input in the following function. This creates more legible code since functions are shown in the order they are run instead of being nested.
start_station_sdf <- citi_bike_trips_sdf %>% select(column('starttime') %>% alias('use_time'), column('start_station_name') %>% alias('station_name'), column('start_station_latitude') %>% alias('station_latitude'), column('start_station_longitude') %>% alias('station_longitude')) end_station_sdf <- citi_bike_trips_sdf %>% select(column('stoptime') %>% alias('use_time'), column('end_station_name') %>% alias('station_name'), column('end_station_latitude') %>% alias('station_latitude'), column('end_station_longitude') %>% alias('station_longitude')) station_uses_sdf <- union(start_station_sdf, end_station_sdf) most_common_station_df <- station_uses_sdf %>% groupBy('station_name', 'station_longitude', 'station_latitude') %>% count() %>% collect()
The SparkR functions select
, alias
, groupBy
, and count
functions are used in the same way as their equivalent SQL statements. We chain multiple operations to transform the citi_bike_trips_sdf
DataFrame.
By design, Spark is a 'lazy evaluator', meaning that it does not execute any of the operations until one of a specific set of actions is run. Common examples of this are take
, which brings subset of the data locally, and collect
, which brings all of the data in locally. It does this to optimize the entire operation. In the example above, start_station_sdf
and end_station_sdf
are temporary Spark tables that are never saved. Splitting citi_bike_trips_sdf
into two Spark DataFrames allows us to write cleaner code.
Once collect
is run, the entire set of steps to create most_common_station_df
is run and the result is stored in an R data frame. In general, data scientists should be cautious when running collect
since the data may be too large to fit in memory. In this case, because we are running a group by, the query shrinks the data substantially and it can be brought into R.
(Side Note: The collect
statement may take too long if there are too many operations that need to be run. We can circumvent this by either caching our intermediary tables or saving them to Hive.)
From here, we can use the ggplot library to plot our data.
Figure 1: Most commonly used stations
As we would expect, the most frequently used stations are in Manhattan, as opposed to Brooklyn or Queens, since Manhattan is the business hub for the New York City area and contains a disproportionately large number of residents, tourists, and commuters who are working in the city. By the same reasoning, we observe that there are far more stations in Manhattan than the other larger boroughs.
The markedly small number of trips near the upper portion of Central Park and areas of Brooklyn is due to the fact that those stations were added more recently.
Paths
Next, we will look at paths that riders take. Since the only information we have is the start and end points and not the path taken, we will make an assumption that rides only follow straight lines.
Figure 2: 150 most common Citi Bike paths
From the above plot, we observe two distinct categories of Citi Bike uses:
-
Scenic/Leisure – Three of the most common Citi Bike areas are within Central Park, along the Hudson River, and over the Brooklyn Bridge.
-
Convenience – There are many paths that are taken between avenues, that is, between the west and east sides of Manhattan. A likely reason for this is that, apart from the L and 7 trains, there are no other subway trains going in the east/west direction. Citi Bike provides a convenient alternative.
This plot shows all rides in aggregate. We might gain more insight if we instead incorporated the time that these rides were taken. We will look at ridership by hour of day and whether the ride was taken on a weekend. Sample code to extract the day of week and hour is shown below.
weekday_bike_trips_sdf <- citi_bike_trips_sdf %>% withColumn('start_dayofweek', date_format(column('starttime'), 'E')) %>% withColumn('stop_dayofweek', date_format(column('stoptime'), 'E')) %>% withColumn('starttime_unix', unix_timestamp(column('starttime'))) %>% withColumn('stoptime_unix', unix_timestamp(column('stoptime'))) %>% withColumn('trip_length_sec', (column('stoptime_unix') - column('starttime_unix'))) %>% where(column('trip_length_sec') <= 60*60*24) %>% where(column('start_dayofweek') != 'Sat') %>% where(column('start_dayofweek') != 'Sun') %>% where(column('stop_dayofweek') != 'Sat') %>% where(column('stop_dayofweek') != 'Sun') start_hour_weekday_group_df <- weekday_bike_trips_sdf %>% select(hour(column('starttime')) %>% alias('start_hour')) %>% groupBy('start_hour') %>% count() %>% orderBy('start_hour') %>% collect()
Figure 3: Weekday ridership by hour of day
Figure 4: Weekend ridership by hour of day
On weekdays, there are two distinct peaks—morning rush hour (8-10 am) and evening rush hour (5-7 pm). On weekends, there is a smooth curve that reaches its peak shortly after noon.
If we alter our plots, showing one for morning rush hour and another for evening rush hour, we see two distinct patterns.
Figure 5: Most common morning commute paths
Morning commutes typically fall into the “convenience” category, where the paths cut across multiple avenues.
Figure 6: Most common evening commute paths
During the evening, riders are in less of a hurry, so we see a smaller proportion of east/west trips. Additionally, more trips taken on the scenic bicycle path that goes along the Hudson River on the west side of the island as well as over the Brooklyn Bridge.
Figure 7: Most common weekend paths
Weekend trips are markedly different from weekday trips. There are far more trips across the Brooklyn and Williamsburg bridges and in Central Park. We also see more circles on the plot, which indicate aimless trips that start and end in the same place.
Final Thoughts
In this post, we have shown that SparkR is fully capable solution for analyzing large data sets. Additionally, SparkR supports many of the commonly used machine learning algorithms in a familiar R syntax; these functions are wrappers for MLlib. We can use these to determine which station riders are likely to go to given their starting point and the date and time of day.
For those R zealots, check out sparklyr, which is a dplyr interface to Spark. SparkR still has a few more refinements before catching up with the other Spark languages. User-defined functions require a more rigid syntax when compared to PySpark. Nevertheless, SparkR is pretty close to a fully-fledged distributing computing solution.