Dead simple MongoDB to Hive Connector

At OneFold, we build software that allows our users to use SQL to query across multiple data sources. For example, one of our customers uses OneFold to run queries across their Mixpanel, MongoDB and Stripe data (via SQL JOIN). At the heart of the OneFold platform is an ETL engine that transforms and loads semi-structured data (e.g. JSON) from various data sources into traditional table-based data warehouses like Redshift, Google BigQuery and Hive.

One of the reasons we created OneFold was to simplify the loading of semi-structured data into traditional warehouses. With semi-structured data like JSON, there are three challenges that we’ve identified:

  1. JSON is weakly typed. An attribute “zipcode” can be an integer in one JSON object and a string in another. With data warehouses, the user needs to define the data type of each column ahead of time, and it is usually hard to change afterwards.

  2. JSON doesn’t have a schema. Because of this flexibility, new attributes are added over time, and data engineers usually need to play catch-up, adding new columns to accommodate these new attributes.

  3. Nested and array data types. JSON has nested and array structures that don’t translate well into a typical data warehouse table schema.

Recently, we open-sourced our MongoDB to Hive ETL engine. MongoDB has also open-sourced a set of code (which can be found here) that allows users to create a Hive table whose underlying data lives in MongoDB. However, the user needs to specify the schema during table creation, which can be a big challenge.

Our ETL engine doesn’t require the user to know the structure of the MongoDB collection. In fact, the goal of our ETL engine is to eliminate all of the challenges listed above. It first parses through all the incoming JSON objects and creates a schema. Then it compares the new schema with the one in the data warehouse and applies the delta. Nested attributes are flattened, and array attributes are loaded into a different child table with the proper parent-child key.
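
To make the schema-generation step concrete, here is a minimal sketch of inferring column types from a batch of already-flattened records and widening a type when two records disagree. The function names and the widening rules (int plus double becomes double, any other conflict becomes string) are assumptions for illustration, not necessarily what the OneFold engine does.

```python
def infer_type(value):
    """Map a decoded JSON value to a Hive-style type name (assumed mapping)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "double"
    return "string"


def merge_types(a, b):
    """Pick a type wide enough to hold values of both a and b (assumed rules)."""
    if a == b:
        return a
    if {a, b} == {"int", "double"}:
        return "double"
    return "string"


def infer_schema(records):
    """Build {column: type} from an iterable of flat dicts."""
    schema = {}
    for record in records:
        for column, value in record.items():
            if value is None:
                continue  # a null tells us nothing about the type
            seen = infer_type(value)
            schema[column] = merge_types(schema[column], seen) if column in schema else seen
    return schema


records = [
    {"age": 24, "address_zipcode": 94012},
    {"age": 34, "address_zipcode": "M1K3A5"},
]
print(infer_schema(records))
```

Here address_zipcode ends up as string because it was an integer in one record and a string in the other, while age stays int.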

You can download our MongoDB to Hive ETL engine here. Here’s what it does:

  1. Connects to your MongoDB and extracts the specified collection into a local file, which is then copied to HDFS.
  2. A MapReduce job generates the schema (a copy is saved back to MongoDB for reference).
  3. A MapReduce job transforms the data, splitting arrays into separate files in the HDFS output folder.
  4. Creates Hive tables using the schema generated in step 2.
  5. Loads the Hive tables from the HDFS files generated in step 3.
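
Step 3 can be sketched roughly as follows: each top-level array is pulled out of the document into its own set of child rows, and every child row carries a parent_hash_code pointing back at the parent row. The hashing scheme (SHA-1 over canonical JSON) and the helper names are assumptions made for this sketch; the post does not describe how the engine actually computes hash_code, and nested arrays are not handled here.

```python
import hashlib
import json


def hash_code(obj):
    """Hypothetical row key: SHA-1 of the object's canonical JSON form."""
    payload = json.dumps(obj, sort_keys=True).encode("utf-8")
    return hashlib.sha1(payload).hexdigest()


def split_arrays(doc):
    """Return {"root": [parent_row], "<array field>": [child_rows], ...}."""
    parent_key = hash_code(doc)
    root = {k: v for k, v in doc.items() if not isinstance(v, list)}
    root["hash_code"] = parent_key
    output = {"root": [root]}
    for field, items in doc.items():
        if not isinstance(items, list):
            continue
        rows = []
        for index, item in enumerate(items):
            # Scalars go into a single "value" column; objects keep their keys.
            row = dict(item) if isinstance(item, dict) else {"value": item}
            # Include the position so identical items still get distinct keys.
            row["hash_code"] = hash_code([parent_key, field, index, item])
            row["parent_hash_code"] = parent_key
            rows.append(row)
        output[field] = rows
    return output


doc = {
    "name": "Alexander Keith",
    "hobbies": ["reading", "cycling"],
    "work_history": [{"name": "IBM", "from": 2013, "to": "present"}],
}
for table, rows in split_arrays(doc).items():
    print(table, rows)
```

Each key of the returned dictionary corresponds to one output folder (root, hobbies, work_history), which is what later becomes one Hive table per array field.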

Simple case

Say you have a MongoDB collection called test.users, and you have a record in it:


> db.users.find();
{ "_id" : ObjectId("55426ac7151a4b4d32000001"), "mobile" : { "carrier" : "Sprint", "device" : "Samsung" }, "name" : "John Doe", "age" : 24, "utm_campaign" : "Facebook_Offer", "app_version" : "2.4", "address" : { "city" : "Chicago", "zipcode" : 94012 } }

To load this into Hive, run:


./onefold.py --mongo mongodb://[mongodb_host]:[mongodb_port] \
             --source_db test \
             --source_collection users \
             --hiveserver_host [hive_server_host] \
             --hiveserver_port [hive_server_port]

Results:


-- Initializing Hive Util --
Creating file /tmp/onefold_mongo/users/data/1
Executing command: cat /tmp/onefold_mongo/users/data/1 | json/generate-schema-mapper.py | sort | json/generate-schema-reducer.py mongodb://xxx:xxx/test/users_schema > /dev/null
Executing command: cat /tmp/onefold_mongo/users/data/1 | json/transform-data-mapper.py mongodb://xxx:xxx/test/users_schema,/tmp/onefold_mongo/users/data_transform/output > /dev/null
...
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/root
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/root/part-00000 onefold_mongo/users/data_transform/output/root/
..
Executing HiveQL: show tables
Executing HiveQL: create table users (app_version string,utm_campaign string,id_oid string,age int,mobile_device string,name string,address_city string,hash_code string,mobile_carrier string,address_zipcode int) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
Executing HiveQL: load data inpath 'onefold_mongo/users/data_transform/output/root/*' into table users
-------------------
    RUN SUMMARY
-------------------
Extracted data with _id from 55426ac7151a4b4d32000001 to 55426ac7151a4b4d32000001
Extracted files are located at: /tmp/onefold_mongo/users/data/1
Hive Tables: users
Schema is stored in Mongo test.users_schema

In Hive, you can see that a new table users is created:


hive> add jar [install_path]/java/HiveSerdes/target/hive-serdes-1.0-SNAPSHOT.jar;

hive> desc users;
app_version             string                  from deserializer
utm_campaign            string                  from deserializer
id_oid                  string                  from deserializer
age                     int                     from deserializer
mobile_device           string                  from deserializer
name                    string                  from deserializer
address_city            string                  from deserializer
hash_code               string                  from deserializer
mobile_carrier          string                  from deserializer
address_zipcode         int                     from deserializer
Time taken: 0.073 seconds, Fetched: 10 row(s)

hive> select * from users;
2.4     Facebook_Offer  55426ac7151a4b4d32000001        24      Samsung John Doe        Chicago 863a4ddd10579c8fc7e12b5bd1e188ce083eec2d        Sprint  94012
Time taken: 0.07 seconds, Fetched: 1 row(s)
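
The column names above (address_city, mobile_carrier, id_oid) come from flattening nested keys into a single level. A minimal sketch of that idea is shown below; the naming rule (join with an underscore, drop leading "_" and "$" characters) is inferred from the column names in this example and is an assumption, as is representing _id in the exported JSON as {"$oid": ...}.

```python
import re


def clean(key):
    """Assumed naming rule: "_id" -> "id", "$oid" -> "oid"."""
    return re.sub(r"^[_$]+", "", key)


def flatten(doc, prefix=""):
    """Collapse nested objects into one level, joining key names with "_"."""
    flat = {}
    for key, value in doc.items():
        name = prefix + clean(key)
        if isinstance(value, dict):
            flat.update(flatten(value, name + "_"))
        else:
            flat[name] = value
    return flat


user = {
    "_id": {"$oid": "55426ac7151a4b4d32000001"},
    "mobile": {"carrier": "Sprint", "device": "Samsung"},
    "name": "John Doe",
    "age": 24,
    "address": {"city": "Chicago", "zipcode": 94012},
}
print(sorted(flatten(user)))
```

The hash_code column does not appear in this sketch because it is added by the engine rather than derived from a key in the document.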

Now let’s add a record with new fields

In Mongo, one new record is added with some new fields:


> db.users.find();
...
{ "_id" : ObjectId("55426c42151a4b4d9e000001"), "hobbies" : [ "reading", "cycling" ], "age" : 34, "work_history" : [ { "to" : "present", "from" : 2013, "name" : "IBM" }, { "to" : 2013, "from" : 2003, "name" : "Bell" } ], "utm_campaign" : "Google", "name" : "Alexander Keith", "app_version" : "2.5", "mobile" : { "device" : "iPhone", "carrier" : "Rogers" }, "address" : { "state" : "Ontario", "zipcode" : "M1K3A5", "street" : "26 Marshall Lane", "city" : "Toronto" } }

New fields are added to the address nested object. address.zipcode is now a string (it used to be an integer). A new hobbies field is introduced that is a string array. A new work_history field is introduced that is an array of nested objects.

Run the command again with two extra parameters. --query '{"_id":{"$gt":ObjectId("55426ac7151a4b4d32000001")}}' tells the program to extract from MongoDB only records with an ID larger than the old one, and --write_disposition append tells it to append to the existing Hive tables during load:


./onefold.py --mongo mongodb://[mongodb_host]:[mongodb_port] \
             --source_db test \
             --source_collection users \
             --hiveserver_host [hive_server_host] \
             --hiveserver_port [hive_server_port] \
             --write_disposition append \
             --query '{"_id":{"$gt":ObjectId("55426ac7151a4b4d32000001")}}'
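
Filtering on _id with $gt works as an incremental-load cursor because the first four bytes of a MongoDB ObjectId encode its creation time in seconds since the Unix epoch, so later records generally compare as larger. The short sketch below decodes that timestamp for the two records used in this post; objectid_time is a helper name made up for illustration. Ordering within the same second, or across clients with skewed clocks, is not guaranteed.

```python
from datetime import datetime, timezone


def objectid_time(oid_hex):
    """Return the creation time stored in the first 4 bytes of an ObjectId."""
    seconds = int(oid_hex[:8], 16)
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


first = objectid_time("55426ac7151a4b4d32000001")   # the original record
second = objectid_time("55426c42151a4b4d9e000001")  # the newly added record
print(first.isoformat(), second.isoformat())
print(first < second)
```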

Results:


-- Initializing Hive Util --
...
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/root
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/root/part-00000 onefold_mongo/users/data_transform/output/root/
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/work_history
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/work_history/part-00000 onefold_mongo/users/data_transform/output/work_history/
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/hobbies
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/hobbies/part-00000 onefold_mongo/users/data_transform/output/hobbies/
...
Executing HiveQL: alter table `users` change `address_zipcode` `address_zipcode` string
Executing HiveQL: alter table `users` add columns (`address_state` string)
Executing HiveQL: alter table `users` add columns (`address_street` string)
Executing HiveQL: create table `users_hobbies` (parent_hash_code string,hash_code string,`value` string) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
Executing HiveQL: create table `users_work_history` (parent_hash_code string,hash_code string,`from` int,`name` string,`to` string) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
...
-------------------
    RUN SUMMARY
-------------------
Extracted data with _id from 55426f52151a4b4e5a000001 to 55426f52151a4b4e5a000001
Extracted files are located at: /tmp/onefold_mongo/users/data/1
Hive Tables: users users_hobbies users_work_history
Schema is stored in Mongo test.users_schema
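
The alter table statements in the log are the result of comparing the newly generated schema with the one already in Hive. A minimal sketch of that comparison is shown below; schema_delta is a hypothetical helper, and the statements it emits simply mirror the form seen in the log rather than reproducing the engine's actual code.

```python
def schema_delta(table, old, new):
    """Return HiveQL statements that bring `old` ({column: type}) up to `new`."""
    statements = []
    for column, new_type in new.items():
        if column not in old:
            # Column seen for the first time: add it.
            statements.append(
                "alter table `%s` add columns (`%s` %s)" % (table, column, new_type)
            )
        elif old[column] != new_type:
            # Column exists but its type was widened: change it in place.
            statements.append(
                "alter table `%s` change `%s` `%s` %s" % (table, column, column, new_type)
            )
    return statements


old = {"address_city": "string", "address_zipcode": "int"}
new = {
    "address_city": "string",
    "address_zipcode": "string",
    "address_state": "string",
    "address_street": "string",
}
for statement in schema_delta("users", old, new):
    print(statement)
```

Columns that are unchanged produce no statement, so running the load again with no schema changes leaves the table definition alone.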

In Hive, two new tables are created: users_hobbies and users_work_history:


hive> show tables;
users
users_hobbies
users_work_history

hive> desc users_hobbies;
OK
parent_hash_code        string                  from deserializer
hash_code               string                  from deserializer
value                   string                  from deserializer
Time taken: 0.068 seconds, Fetched: 3 row(s)

hive> desc users_work_history;
OK
parent_hash_code        string                  from deserializer
hash_code               string                  from deserializer
from                    int                     from deserializer
name                    string                  from deserializer
to                      string                  from deserializer
Time taken: 0.067 seconds, Fetched: 5 row(s)

You can join the parent and child tables like this:


hive> select * from users 
  join users_hobbies on users.hash_code = users_hobbies.parent_hash_code 
  join users_work_history on users.hash_code = users_work_history.parent_hash_code;

Voila! As you can see, the ETL engine takes care of the guesswork related to schema creation and maintenance. It also handles nested and array data types by flattening nested attributes and loading arrays into child tables. More information on its usage can be found in the GitHub repo.