
MongoDB to Google BigQuery ETL Engine now available to public

At OneFold, we have used Google BigQuery from day one. We chose it for the following reasons:

  1. Cost. OneFold is a startup and, as such, has a very limited budget. We simply can’t afford the cost and admin overhead of managing a 100-node Hadoop cluster. With BigQuery, we get access to thousands of machines while paying only for what we use.
  2. Speed. The speed of execution is phenomenal. Some of our clients have upwards of 20TB of data, and running complex queries over the entire dataset usually takes seconds. This is thanks to Google BigQuery’s unique architecture, which splits the workload across thousands of machines.
  3. Ease of use. BigQuery was super easy to get started with, and the learning curve is low. We were able to load a sizable data collection and start writing queries within minutes. More importantly, since BigQuery is a managed service, we don’t have to spend cycles maintaining hardware. The data is just there when we need it.

A few months ago, we open-sourced our MongoDB to Hive ETL Engine. Today, we are happy to announce that we have also open-sourced our MongoDB to BigQuery ETL Engine. The two engines have similar feature sets, as they solve the same challenges that arise when moving unstructured data like JSON or BSON into a structured data warehouse like Hive or Google BigQuery. These challenges can be summarized as:

  1. JSON has weak data types. An attribute such as “zipcode” can be an integer in one JSON object and a string in another. With data warehouses, the user needs to define the data type of each column ahead of time, and it is usually hard to change afterwards.
  2. JSON doesn’t have a schema. Because of this flexibility, new attributes are added over time, and data engineers usually have to play catch-up, adding new columns to accommodate these new attributes.
  3. Nested and array data types. JSON has nested and array structures that don’t translate well into a typical data warehouse table schema.

The ETL engine performs a scan over the data collection and creates a schema automatically. Since Google BigQuery supports certain nested and repeated (array) data types, the engine can either split that data into child tables or keep it inline.
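
As a rough illustration (not the engine’s actual output), here is what the two options might look like using the google-cloud-bigquery Python client. The field names, and the hash-based keys on the child table, are assumptions made up for this example:

# Sketch only: two possible BigQuery representations of a JSON array of
# nested objects. Field names are illustrative, not generated by the engine.
from google.cloud import bigquery

# Option 1: keep the array inline as a REPEATED RECORD column.
inline_schema = [
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField(
        "work_history", "RECORD", mode="REPEATED",
        fields=[
            bigquery.SchemaField("employer", "STRING"),
            bigquery.SchemaField("start_year", "INTEGER"),
            bigquery.SchemaField("end_year", "STRING"),
        ],
    ),
]

# Option 2: split the array into a child table keyed back to its parent row.
child_schema = [
    bigquery.SchemaField("parent_hash_code", "STRING"),
    bigquery.SchemaField("hash_code", "STRING"),
    bigquery.SchemaField("employer", "STRING"),
    bigquery.SchemaField("start_year", "INTEGER"),
    bigquery.SchemaField("end_year", "STRING"),
]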

You can download our MongoDB to Google BigQuery ETL engine here. Let us know what you think and how it can be improved.

Dead simple MongoDB to Hive Connector

At OneFold, we build software that allows our users to use SQL to query across multiple data sources. For example, one of our customers uses OneFold to run queries across his Mixpanel, MongoDB and Stripe data (via SQL JOINs). At the heart of the OneFold platform is an ETL engine that transforms and loads semi-structured data (e.g. JSON) from various data sources into traditional table-based data warehouses like Redshift, Google BigQuery and Hive.

One of the reasons we created OneFold was to simplify loading semi-structured data into traditional warehouses. With semi-structured data like JSON, we’ve identified three challenges:

  1. JSON has weak data types. An attribute such as “zipcode” can be an integer in one JSON object and a string in another. With data warehouses, the user needs to define the data type of each column ahead of time, and it is usually hard to change afterwards.

  2. JSON doesn’t have a schema. Because of this flexibility, new attributes are added over time, and data engineers usually have to play catch-up, adding new columns to accommodate these new attributes.

  3. Nested and array data types. JSON has nested and array structures that don’t translate well into a typical data warehouse table schema.

Recently, we open-sourced our MongoDB to Hive ETL engine. MongoDB has also open-sourced code (which can be found here) that lets users create a Hive table whose underlying data lives in MongoDB, but the user needs to specify the schema during table creation, which can be a big challenge.

Our ETL engine doesn’t require the user to know the structure of the MongoDB collection. In fact, its goal is to eliminate all of the challenges listed above. It first parses all incoming JSON objects and creates a schema. It then compares the new schema with the one in the data warehouse and applies the delta. Nested attributes are flattened, and array attributes are loaded into separate child tables with the proper parent-child keys.
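
To make that concrete, here is a minimal sketch (in Python, and not the engine’s actual code) of the idea: nested attributes become underscore-separated columns, and arrays become child rows keyed by a hash of the parent document. The hash_code and parent_hash_code names mirror the columns you will see in the output below; the hashing scheme itself is just an assumption for illustration.

# Sketch only: flatten nested attributes into column names and split arrays
# into child rows keyed by a hash of the parent document.
import hashlib
import json

def flatten(doc, prefix=""):
    """Turn {"address": {"city": "Chicago"}} into {"address_city": "Chicago"}."""
    row, children = {}, {}
    for key, value in doc.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            sub_row, sub_children = flatten(value, prefix=f"{name}_")
            row.update(sub_row)
            children.update(sub_children)
        elif isinstance(value, list):
            children[name] = value          # each array becomes a child table
        else:
            row[name] = value
    return row, children

doc = {"name": "John Doe",
       "address": {"city": "Chicago", "zipcode": 94012},
       "hobbies": ["reading", "cycling"]}

row, children = flatten(doc)
# Illustrative parent key; the actual engine may derive hash_code differently.
row["hash_code"] = hashlib.sha1(json.dumps(doc, sort_keys=True).encode()).hexdigest()
hobby_rows = [{"parent_hash_code": row["hash_code"], "value": v}
              for v in children["hobbies"]]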

You can download our MongoDB to Hive ETL engine here. Here’s what it does:

  1. Connects to your MongoDB instance and extracts the specified collection into a local file, which is then copied to HDFS.
  2. Runs a MapReduce job to generate the schema (a copy is saved back to MongoDB for reference).
  3. Runs a MapReduce job to transform the data, breaking arrays out into multiple files in the HDFS output folder.
  4. Creates Hive tables using the schema generated in step 2.
  5. Loads the Hive tables using the HDFS files generated in step 3.

Simple case

Say you have a MongoDB collection called test.users, and you have a record in it:


> db.users.find();
{ "_id" : ObjectId("55426ac7151a4b4d32000001"), "mobile" : { "carrier" : "Sprint", "device" : "Samsung" }, "name" : "John Doe", "age" : 24, "utm_campaign" : "Facebook_Offer", "app_version" : "2.4", "address" : { "city" : "Chicago", "zipcode" : 94012 } }

To load this into Hive, run:


./onefold.py --mongo mongodb://[mongodb_host]:[mongodb_port] \
             --source_db test \
             --source_collection users \
             --hiveserver_host [hive_server_host] \
             --hiveserver_port [hive_server_port]

Results:


-- Initializing Hive Util --
Creating file /tmp/onefold_mongo/users/data/1
Executing command: cat /tmp/onefold_mongo/users/data/1 | json/generate-schema-mapper.py | sort | json/generate-schema-reducer.py mongodb://xxx:xxx/test/users_schema > /dev/null
Executing command: cat /tmp/onefold_mongo/users/data/1 | json/transform-data-mapper.py mongodb://xxx:xxx/test/users_schema,/tmp/onefold_mongo/users/data_transform/output > /dev/null
...
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/root
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/root/part-00000 onefold_mongo/users/data_transform/output/root/
..
Executing HiveQL: show tables
Executing HiveQL: create table users (app_version string,utm_campaign string,id_oid string,age int,mobile_device string,name string,address_city string,hash_code string,mobile_carrier string,address_zipcode int) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
Executing HiveQL: load data inpath 'onefold_mongo/users/data_transform/output/root/*' into table users
-------------------
    RUN SUMMARY
-------------------
Extracted data with _id from 55426ac7151a4b4d32000001 to 55426ac7151a4b4d32000001
Extracted files are located at: /tmp/onefold_mongo/users/data/1
Hive Tables: users
Schema is stored in Mongo test.users_schema

In Hive, you can see that a new table users is created:


hive> add jar [install_path]/java/HiveSerdes/target/hive-serdes-1.0-SNAPSHOT.jar;

hive> desc users;
app_version             string                  from deserializer
utm_campaign            string                  from deserializer
id_oid                  string                  from deserializer
age                     int                     from deserializer
mobile_device           string                  from deserializer
name                    string                  from deserializer
address_city            string                  from deserializer
hash_code               string                  from deserializer
mobile_carrier          string                  from deserializer
address_zipcode         int                     from deserializer
Time taken: 0.073 seconds, Fetched: 10 row(s)

hive> select * from users;
2.4     Facebook_Offer  55426ac7151a4b4d32000001        24      Samsung John Doe        Chicago 863a4ddd10579c8fc7e12b5bd1e188ce083eec2d        Sprint  94012
Time taken: 0.07 seconds, Fetched: 1 row(s)

Now let’s add a record with new fields

In Mongo, a new record is added with some new fields:


> db.users.find();
...
{ "_id" : ObjectId("55426c42151a4b4d9e000001"), "hobbies" : [ "reading", "cycling" ], "age" : 34, "work_history" : [ { "to" : "present", "from" : 2013, "name" : "IBM" }, { "to" : 2013, "from" : 2003, "name" : "Bell" } ], "utm_campaign" : "Google", "name" : "Alexander Keith", "app_version" : "2.5", "mobile" : { "device" : "iPhone", "carrier" : "Rogers" }, "address" : { "state" : "Ontario", "zipcode" : "M1K3A5", "street" : "26 Marshall Lane", "city" : "Toronto" } }

New fields have been added to the nested address object. address.zipcode is now a string (it used to be an integer). A new hobbies field is introduced, which is a string array, and a new work_history field is introduced, which is an array of nested objects.
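
Behind the scenes, this means the engine has to reconcile conflicting types for the same attribute. A minimal sketch of that kind of merge rule (our illustration only; the engine’s actual logic may differ) is shown here, and its effect shows up as the alter table statement in the run output below:

# Sketch only: widen the column type when observed types conflict.
def merge_types(old, new):
    if old == new:
        return old
    if {old, new} == {"int", "float"}:
        return "float"     # assumed numeric widening, for illustration
    return "string"        # anything else falls back to string

print(merge_types("int", "string"))  # 'string' -- why address_zipcode becomes string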

Run the command with the parameters --write_disposition append and --query '{"_id":{"$gt":ObjectId("55426ac7151a4b4d32000001")}}' to tell the program to query MongoDB only for records with an _id greater than that of the old record, and to append to the existing Hive table during load:


./onefold.py --mongo mongodb://[mongodb_host]:[mongodb_port] \
             --source_db test \
             --source_collection users \
             --hiveserver_host [hive_server_host] \
             --hiveserver_port [hive_server_port] \
             --write_disposition append \
             --query '{"_id":{"$gt":ObjectId("55426f15151a4b4e46000001")}}'

Results:


-- Initializing Hive Util --
...
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/root
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/root/part-00000 onefold_mongo/users/data_transform/output/root/
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/work_history
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/work_history/part-00000 onefold_mongo/users/data_transform/output/work_history/
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/hobbies
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/hobbies/part-00000 onefold_mongo/users/data_transform/output/hobbies/
...
Executing HiveQL: alter table `users` change `address_zipcode` `address_zipcode` string
Executing HiveQL: alter table `users` add columns (`address_state` string)
Executing HiveQL: alter table `users` add columns (`address_street` string)
Executing HiveQL: create table `users_hobbies` (parent_hash_code string,hash_code string,`value` string) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
Executing HiveQL: create table `users_work_history` (parent_hash_code string,hash_code string,`from` int,`name` string,`to` string) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
...
-------------------
    RUN SUMMARY
-------------------
Extracted data with _id from 55426f52151a4b4e5a000001 to 55426f52151a4b4e5a000001
Extracted files are located at: /tmp/onefold_mongo/users/data/1
Hive Tables: users users_hobbies users_work_history
Schema is stored in Mongo test.users_schema

In Hive, two new tables are created: users_hobbies and users_work_history:


hive> show tables;
users
users_hobbies
users_work_history

hive> desc users_hobbies;
OK
parent_hash_code        string                  from deserializer
hash_code               string                  from deserializer
value                   string                  from deserializer
Time taken: 0.068 seconds, Fetched: 3 row(s)

hive> desc users_work_history;
OK
parent_hash_code        string                  from deserializer
hash_code               string                  from deserializer
from                    int                     from deserializer
name                    string                  from deserializer
to                      string                  from deserializer
Time taken: 0.067 seconds, Fetched: 5 row(s)

You can join the parent and child tables like this:


hive> select * from users 
  join users_hobbies on users.hash_code = users_hobbies.parent_hash_code 
  join users_work_history on users.hash_code = users_work_history.parent_hash_code;

Voila! As you can see, the ETL engine takes care of all the guesswork related to schema creation and maintenance. It also handles nested and array data types quite well. More information on its usage can be found in the GitHub repo.