Monthly Archives: July 2015

Dead simple MongoDB to Hive Connector

At OneFold, we build software that lets our users query across multiple data sources with SQL. For example, one of our customers uses OneFold to run queries across his Mixpanel, MongoDB and Stripe data (via SQL JOIN). At the heart of the OneFold platform is an ETL engine that transforms and loads semi-structured data (e.g. JSON) from various data sources into traditional table-based data warehouses like Redshift, Google BigQuery and Hive.

One of the reasons we created OneFold was to simplify loading semi-structured data into traditional warehouses. With semi-structured data like JSON, we’ve identified three challenges:

  1. JSON is weakly typed. An attribute “zipcode” can hold an integer in one JSON object and a string in another. In a data warehouse, the user needs to define the data type of each column ahead of time, and it is usually hard to change afterwards.

  2. JSON doesn’t have a schema. Because of this flexibility, new attributes are added over time, and data engineers usually need to play catch-up, adding new columns to accommodate them.

  3. Nested and array data types. JSON has nested and array structures that don’t translate well into a typical data warehouse table schema.

Recently, we open-sourced our MongoDB to Hive ETL engine. MongoDB has also open-sourced a set of code (which can be found here) that allows users to create a Hive table whose underlying data lives in MongoDB. But the user needs to specify the schema during table creation, which can be a big challenge.

Our ETL engine doesn’t require the user to know the structure of the MongoDB collection. In fact, the goal of our ETL engine is to eliminate all of the challenges listed above. It first parses through all the incoming JSON objects and creates a schema. Then it compares the new schema with the one in the data warehouse and applies the delta. Nested attributes are flattened, and array attributes are loaded into a different child table with the proper parent-child key.
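To make the schema-generation idea concrete, here is a minimal Python sketch. It is not the engine's actual code: the function names (`flatten`, `hive_type`, `infer_schema`) are made up for illustration, and the rule that conflicting types widen to string is an assumption based on the zipcode example in this post.

```python
import json

def flatten(obj, prefix=""):
    """Flatten nested objects into one level, joining keys with '_'.
    Arrays are skipped here; they would be loaded into child tables."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}_{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        elif isinstance(value, list):
            continue
        else:
            flat[name] = value
    return flat

def hive_type(value):
    """Map a JSON scalar to a Hive column type (simplified)."""
    # bool is checked first because bool is a subclass of int in Python
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "double"
    return "string"

def infer_schema(records):
    """Build a {column: type} mapping across all records.
    Assumption: any type conflict widens the column to string."""
    schema = {}
    for record in records:
        for name, value in flatten(record).items():
            new_type = hive_type(value)
            old_type = schema.get(name, new_type)
            schema[name] = new_type if new_type == old_type else "string"
    return schema

records = [
    json.loads('{"name": "John Doe", "age": 24, "address": {"city": "Chicago", "zipcode": 94012}}'),
    json.loads('{"name": "Alexander Keith", "age": 34, "address": {"city": "Toronto", "zipcode": "M1K3A5"}}'),
]
schema = infer_schema(records)
print(schema)
```

Here `address_zipcode` ends up as a string column because the two records disagree on its type, while `age` stays an int.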

You can download our MongoDB to Hive ETL engine here. Here’s what it does:

  1. Connects to your MongoDB and extracts the specified collection into a local file, which is then copied to HDFS.
  2. MapReduce generates the schema (a copy is saved back to MongoDB for reference).
  3. MapReduce transforms the data, breaking arrays out into multiple files in the HDFS output folder.
  4. Creates Hive tables using the schema generated in step 2.
  5. Loads the Hive tables using the HDFS files generated in step 3.
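
Step 4 can be pictured as turning a column-to-type mapping into a HiveQL statement. The sketch below is only an illustration: `create_table_sql` is a hypothetical helper, not a function from the engine, and the SerDe class name is the one that appears in the engine's log output later in this post.

```python
def create_table_sql(table, schema):
    """Build a HiveQL create table statement from a {column: type} mapping.
    Column names are backtick-quoted so reserved words such as `from` work."""
    columns = ",".join(f"`{name}` {col_type}" for name, col_type in schema.items())
    return (
        f"create table `{table}` ({columns}) "
        "ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'"
    )

print(create_table_sql("users", {"name": "string", "age": "int"}))
```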

Simple case

Say you have a MongoDB collection called test.users, and you have a record in it:

> db.users.find();
{ "_id" : ObjectId("55426ac7151a4b4d32000001"), "mobile" : { "carrier" : "Sprint", "device" : "Samsung" }, "name" : "John Doe", "age" : 24, "utm_campaign" : "Facebook_Offer", "app_version" : "2.4", "address" : { "city" : "Chicago", "zipcode" : 94012 } }

To load this into Hive,

./ --mongo mongodb://[mongodb_host]:[mongodb_port] \
             --source_db test \
             --source_collection users \
             --hiveserver_host [hive_server_host] \
             --hiveserver_port [hive_server_port]


-- Initializing Hive Util --
Creating file /tmp/onefold_mongo/users/data/1
Executing command: cat /tmp/onefold_mongo/users/data/1 | json/ | sort | json/ mongodb://xxx:xxx/test/users_schema > /dev/null
Executing command: cat /tmp/onefold_mongo/users/data/1 | json/ mongodb://xxx:xxx/test/users_schema,/tmp/onefold_mongo/users/data_transform/output > /dev/null
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/root
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/root/part-00000 onefold_mongo/users/data_transform/output/root/
Executing HiveQL: show tables
Executing HiveQL: create table users (app_version string,utm_campaign string,id_oid string,age int,mobile_device string,name string,address_city string,hash_code string,mobile_carrier string,address_zipcode int) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
Executing HiveQL: load data inpath 'onefold_mongo/users/data_transform/output/root/*' into table users
Extracted data with _id from 55426ac7151a4b4d32000001 to 55426ac7151a4b4d32000001
Extracted files are located at: /tmp/onefold_mongo/users/data/1
Hive Tables: users
Schema is stored in Mongo test.users_schema

In Hive, you can see that a new table users is created:

hive> add jar [install_path]/java/HiveSerdes/target/hive-serdes-1.0-SNAPSHOT.jar;

hive> desc users;
app_version             string                  from deserializer
utm_campaign            string                  from deserializer
id_oid                  string                  from deserializer
age                     int                     from deserializer
mobile_device           string                  from deserializer
name                    string                  from deserializer
address_city            string                  from deserializer
hash_code               string                  from deserializer
mobile_carrier          string                  from deserializer
address_zipcode         int                     from deserializer
Time taken: 0.073 seconds, Fetched: 10 row(s)

hive> select * from users;
2.4     Facebook_Offer  55426ac7151a4b4d32000001        24      Samsung John Doe        Chicago 863a4ddd10579c8fc7e12b5bd1e188ce083eec2d        Sprint  94012
Time taken: 0.07 seconds, Fetched: 1 row(s)

Now let’s add a record with new fields

In Mongo, one new record is added with some new fields:

> db.users.find();
{ "_id" : ObjectId("55426c42151a4b4d9e000001"), "hobbies" : [ "reading", "cycling" ], "age" : 34, "work_history" : [ { "to" : "present", "from" : 2013, "name" : "IBM" }, { "to" : 2013, "from" : 2003, "name" : "Bell" } ], "utm_campaign" : "Google", "name" : "Alexander Keith", "app_version" : "2.5", "mobile" : { "device" : "iPhone", "carrier" : "Rogers" }, "address" : { "state" : "Ontario", "zipcode" : "M1K3A5", "street" : "26 Marshall Lane", "city" : "Toronto" } }

New fields are added to the address nested object. address.zipcode is now a string (it used to be an integer). A new hobbies field is introduced, which is a string array. A new work_history field is introduced, which is an array of nested objects.

Run the command with the parameters --write_disposition append and --query '{"_id":{"$gt":ObjectId("55426ac7151a4b4d32000001")}}' to tell the program to query MongoDB only for records with an ID larger than the old one, and to append to the existing Hive table during load:

./ --mongo mongodb://[mongodb_host]:[mongodb_port] \
             --source_db test \
             --source_collection users \
             --hiveserver_host [hive_server_host] \
             --hiveserver_port [hive_server_port] \
             --write_disposition append \
             --query '{"_id":{"$gt":ObjectId("55426f15151a4b4e46000001")}}'


-- Initializing Hive Util --
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/root
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/root/part-00000 onefold_mongo/users/data_transform/output/root/
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/work_history
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/work_history/part-00000 onefold_mongo/users/data_transform/output/work_history/
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/hobbies
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/hobbies/part-00000 onefold_mongo/users/data_transform/output/hobbies/
Executing HiveQL: alter table `users` change `address_zipcode` `address_zipcode` string
Executing HiveQL: alter table `users` add columns (`address_state` string)
Executing HiveQL: alter table `users` add columns (`address_street` string)
Executing HiveQL: create table `users_hobbies` (parent_hash_code string,hash_code string,`value` string) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
Executing HiveQL: create table `users_work_history` (parent_hash_code string,hash_code string,`from` int,`name` string,`to` string) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
Extracted data with _id from 55426f52151a4b4e5a000001 to 55426f52151a4b4e5a000001
Extracted files are located at: /tmp/onefold_mongo/users/data/1
Hive Tables: users users_hobbies users_work_history
Schema is stored in Mongo test.users_schema
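
The alter table statements in the log above are the "delta" between the stored schema and the newly generated one. A rough Python sketch of that comparison follows; `schema_delta_sql` is a hypothetical name, and the real engine may well compute the delta differently.

```python
def schema_delta_sql(table, old_schema, new_schema):
    """Return the HiveQL statements needed to move a table from
    old_schema to new_schema (both are {column: type} mappings)."""
    statements = []
    for name, new_type in new_schema.items():
        if name not in old_schema:
            # Column did not exist before: add it.
            statements.append(f"alter table `{table}` add columns (`{name}` {new_type})")
        elif old_schema[name] != new_type:
            # Column exists with a different type: change it in place.
            statements.append(f"alter table `{table}` change `{name}` `{name}` {new_type}")
    return statements

old = {"address_city": "string", "address_zipcode": "int"}
new = {
    "address_city": "string",
    "address_zipcode": "string",
    "address_state": "string",
    "address_street": "string",
}
for statement in schema_delta_sql("users", old, new):
    print(statement)
```

This sketch never drops columns, which matches the append-only behaviour seen in the log but is otherwise an assumption.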

In Hive, two new tables are created: users_hobbies and users_work_history:

hive> show tables;

hive> desc users_hobbies;
parent_hash_code        string                  from deserializer
hash_code               string                  from deserializer
value                   string                  from deserializer
Time taken: 0.068 seconds, Fetched: 3 row(s)

hive> desc users_work_history;
parent_hash_code        string                  from deserializer
hash_code               string                  from deserializer
from                    int                     from deserializer
name                    string                  from deserializer
to                      string                  from deserializer
Time taken: 0.067 seconds, Fetched: 5 row(s)
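
The parent_hash_code and hash_code columns are what tie child rows back to their parent. The Python sketch below shows one way such a split could work. It is an assumption-heavy illustration: the post does not say how the hash is computed, so a SHA-1 of the sorted JSON is used here purely as a stand-in, nested objects in the parent are left unflattened for brevity, and `hash_code` and `split_arrays` are hypothetical names.

```python
import hashlib
import json

def hash_code(obj):
    """Assumed hashing scheme: SHA-1 of the JSON text with sorted keys."""
    payload = json.dumps(obj, sort_keys=True).encode("utf-8")
    return hashlib.sha1(payload).hexdigest()

def split_arrays(table, record):
    """Return {table_name: [rows]}, moving array attributes to child tables."""
    parent = {k: v for k, v in record.items() if not isinstance(v, list)}
    parent["hash_code"] = hash_code(record)
    tables = {table: [parent]}
    for key, values in record.items():
        if not isinstance(values, list):
            continue
        rows = []
        for item in values:
            # Scalars go into a `value` column; nested objects keep their keys.
            row = dict(item) if isinstance(item, dict) else {"value": item}
            row["hash_code"] = hash_code(item)
            row["parent_hash_code"] = parent["hash_code"]
            rows.append(row)
        tables[f"{table}_{key}"] = rows
    return tables

record = {
    "name": "Alexander Keith",
    "hobbies": ["reading", "cycling"],
    "work_history": [{"to": "present", "from": 2013, "name": "IBM"}],
}
tables = split_arrays("users", record)
print(sorted(tables))
```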

You can join the parent and child tables like this:

hive> select * from users 
  join users_hobbies on users.hash_code = users_hobbies.parent_hash_code 
  join users_work_history on users.hash_code = users_work_history.parent_hash_code;

Voila! As you can see, the ETL engine takes care of all the guesswork related to schema creation and maintenance. It also handles nested and array data types quite well. More information on its usage can be found in the GitHub repo.

Email Open Tracking using AWS CloudFront

At Plum District, we send out a lot of marketing emails, on some days more than 5 million. It’s a big part of our business, and we spend a lot of our time tracking open and send rates and running analytics on this data. Luckily, SendGrid is able to funnel all that data via their Event Notification API, where all the events (processed, opened, clicked, etc.) are sent to us. We recently acquired another company, but unfortunately they send their emails via Amazon SES, which doesn’t have any tracking.

In this article, I’ll discuss an innovative way to track email open rates using Amazon CloudFront. The basic mechanism is really just pixel tracking. CloudFront provides detailed access logs that are dumped directly into S3. Hence, we can host the pixel in CloudFront, put the pixel in the email (plus any optional HTTP params that you want to track), count how many times that pixel is loaded, and from that derive the open count plus a bunch of other information, including demographics, most active timeframe, etc.

Here are the steps:

  1. Create an S3 bucket to store the pixel if you don’t already have one. In this case, I’ve put the pixel under s3n://plum-mms/images/1.gif. Feel free to borrow the pixel here. It’s just a 1×1 transparent GIF. Also, make sure that the bucket has the appropriate permissions, i.e. Everyone → Open / Download.

  2. Create another S3 bucket for logs. In our case, we’ve created a bucket named plum-mms-logs.

  3. Configure a CloudFront distribution using the S3 bucket you created in step 1. Make sure you have the following configured when creating it:

    Logging
    On. This tells CloudFront to enable logging.
    Cookie Logging
    Off. We don’t really need that information.
    Log Bucket
    This tells CloudFront where to dump the access log. In my case, I selected the entry that corresponds to the bucket I created in step 2.

  4. Capture the domain name associated with your new CloudFront distribution. You can test in a browser that the pixel URL on that domain returns the GIF file.

  5. Insert the pixel in your email template. We want to capture who has opened an email, so we’ve included the subscriber ID, as well as the email category as GET parameters. Here’s a bit of Ruby code to generate the pixel:

    pixel_tracking_url = nil
    # Only build the tracking URL when both values are present.
    if subscriber && category
      pixel_tracking_url = "{}&category=#{category}"
    end

  6. And in your email template (.erb file), you can add the code anywhere in the email:

    <% if @pixel_tracking_url %>
      <img src="<%= @pixel_tracking_url%>" width="1" height="1" alt=""/>
    <% end %>

Now we are ready to roll! You can send the email to a few test email accounts, and see if you are getting the logs. It usually takes a few hours for CloudFront to push the log files out to your logging bucket.
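
Once the logs arrive, counting opens comes down to reading the access log lines and pulling out the query parameters. Here is a minimal Python sketch, with several assumptions: the log files use CloudFront's tab-separated format with a `#Fields:` header line, the pixel is served at `/images/1.gif`, and the subscriber ID is passed in a parameter called `subscriber_id` (a hypothetical name, adjust it to whatever your pixel URL uses). The function name `count_opens` and the sample data are made up for illustration.

```python
from collections import Counter
from urllib.parse import parse_qs

def count_opens(lines, pixel_path="/images/1.gif", id_param="subscriber_id"):
    """Count pixel loads per subscriber from CloudFront access log lines."""
    fields = []
    opens = Counter()
    for line in lines:
        line = line.rstrip("\n")
        if line.startswith("#Fields:"):
            # The header names the columns; use it instead of fixed positions.
            fields = line[len("#Fields:"):].split()
            continue
        if not line or line.startswith("#") or not fields:
            continue
        row = dict(zip(fields, line.split("\t")))
        if row.get("cs-uri-stem") != pixel_path:
            continue
        params = parse_qs(row.get("cs-uri-query", ""))
        for subscriber in params.get(id_param, []):
            opens[subscriber] += 1
    return opens

HEADER = ("#Fields: date time x-edge-location sc-bytes c-ip cs-method "
          "cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query")

def sample_row(stem, query):
    """Build one made-up log row for the demo below."""
    return "\t".join(["2015-07-01", "12:00:00", "SFO5", "43", "192.0.2.10", "GET",
                      "example.cloudfront.net", stem, "200", "-", "Mozilla/5.0", query])

sample = [
    "#Version: 1.0",
    HEADER,
    sample_row("/images/1.gif", "subscriber_id=42&category=daily"),
    sample_row("/images/1.gif", "subscriber_id=42&category=weekly"),
    sample_row("/images/1.gif", "subscriber_id=7&category=daily"),
    sample_row("/favicon.ico", "-"),
]
opens = count_opens(sample)
print(opens.most_common())
```

CloudFront delivers its log files gzip-compressed, so in practice you would feed the function lines from something like `gzip.open(path, "rt")` after downloading the files from the logging bucket. Keep in mind that a pixel load counts an image fetch, not a person: the same subscriber opening the email twice shows up twice.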