Dead simple MongoDB to Hive Connector

At OneFold, we build software that lets our users use SQL to query across multiple data sources. For example, one of our customers uses OneFold to run queries across his Mixpanel, MongoDB and Stripe data (via SQL JOIN). At the heart of the OneFold platform is an ETL engine that transforms and loads semi-structured data (e.g. JSON) from various data sources into traditional table-based data warehouses like Redshift, Google BigQuery and Hive.

One of the reasons we created OneFold was to simplify the loading of semi-structured data into traditional warehouses. With semi-structured data like JSON, we’ve identified three challenges:

  1. JSON has weak data types. An attribute “zipcode” can hold an integer in one JSON object and a string in another. In a data warehouse, the user needs to define a column’s data type ahead of time, and it’s usually hard to change afterwards.

  2. JSON doesn’t have a schema. Because of this flexibility, new attributes are added over time, and data engineers usually need to play catch-up, adding new columns to accommodate them.

  3. Nested and array data types. JSON supports nested and array structures that don’t translate well into a typical data warehouse table schema.
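To make the first challenge concrete, here is a minimal sketch (illustrative only, not the connector’s actual code) of how conflicting JSON types can be reconciled by widening to a common type:

```python
# Hypothetical sketch: reconciling conflicting JSON types by widening.
# Not part of the connector's actual implementation.

def json_type(value):
    """Classify a JSON value into a simple warehouse type."""
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "float"
    return "string"

def widen(a, b):
    """Pick a type wide enough to hold values of both types."""
    if a == b:
        return a
    if {a, b} == {"int", "float"}:
        return "float"
    return "string"  # anything else falls back to string

# "zipcode" is an int in one record and a string in another:
print(widen(json_type(94012), json_type("M1K3A5")))  # -> string
```

Falling back to string is the safe choice: every value has a string representation, so no data is lost when two records disagree.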

Recently, we open-sourced our MongoDB to Hive ETL engine. MongoDB has open-sourced a set of code (which can be found here) that allows users to create a Hive table whose underlying data lives in MongoDB, but the user needs to specify the schema during table creation, which can be a big challenge.

Our ETL engine doesn’t require the user to know the structure of the MongoDB collection. In fact, its goal is to eliminate all of the challenges listed above. It first parses all the incoming JSON objects and generates a schema. Then it compares the new schema with the one in the data warehouse and applies the delta. Nested attributes are flattened, and array attributes are loaded into separate child tables with the proper parent-child keys.
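As a rough illustration of the flattening step, here is a sketch (function and field names are hypothetical, not the engine’s API) that turns a nested record into warehouse-style column names:

```python
# Hypothetical sketch of flattening nested objects into column names.
# Lists are left intact so they can later be split into child tables.

def flatten(obj, prefix=""):
    """Flatten nested dicts: {"mobile": {"carrier": ...}} -> {"mobile_carrier": ...}."""
    row = {}
    for key, value in obj.items():
        name = key if not prefix else prefix + "_" + key
        if isinstance(value, dict):
            row.update(flatten(value, name))
        else:
            row[name] = value
    return row

record = {"name": "John Doe", "age": 24,
          "mobile": {"carrier": "Sprint", "device": "Samsung"},
          "address": {"city": "Chicago", "zipcode": 94012}}

print(sorted(flatten(record)))
# -> ['address_city', 'address_zipcode', 'age', 'mobile_carrier',
#     'mobile_device', 'name']
```

This matches the column names you’ll see in the Hive table below (mobile_carrier, address_zipcode, and so on).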

You can download our MongoDB to Hive ETL engine here. Here’s what it does:

  1. Connects to your MongoDB and extracts the specified collection into a local file, which is then copied to HDFS.
  2. A MapReduce job generates the schema (a copy is saved back to MongoDB for reference).
  3. A MapReduce job transforms the data, splitting arrays into separate files in the HDFS output folder.
  4. Creates Hive tables using the schema generated in step 2.
  5. Loads the Hive tables using the HDFS files generated in step 3.
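Step 1 boils down to writing the collection out as newline-delimited JSON, one document per line, so the downstream jobs can stream it. A self-contained sketch (a plain list stands in for a pymongo cursor, and the file path is arbitrary):

```python
# Hypothetical sketch of step 1: dump a collection to newline-delimited
# JSON, ready for `hadoop fs -copyFromLocal`. A list of dicts stands in
# for a real pymongo cursor so this stays self-contained.
import json
import os
import tempfile

def extract_to_file(cursor, path):
    """Write one JSON document per line."""
    with open(path, "w") as f:
        for doc in cursor:
            f.write(json.dumps(doc) + "\n")

fake_cursor = [{"_id": "55426ac7151a4b4d32000001", "name": "John Doe"},
               {"_id": "55426c42151a4b4d9e000001", "name": "Alexander Keith"}]
path = os.path.join(tempfile.mkdtemp(), "1")
extract_to_file(fake_cursor, path)
print(sum(1 for _ in open(path)))  # -> 2
```

The line-per-document layout is what makes the `cat ... | sort | ...` pipeline in the log output below possible.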

Simple case

Say you have a MongoDB collection called test.users, and you have a record in it:

> db.users.find();
{ "_id" : ObjectId("55426ac7151a4b4d32000001"), "mobile" : { "carrier" : "Sprint", "device" : "Samsung" }, "name" : "John Doe", "age" : 24, "utm_campaign" : "Facebook_Offer", "app_version" : "2.4", "address" : { "city" : "Chicago", "zipcode" : 94012 } }

To load this into Hive,

./ --mongo mongodb://[mongodb_host]:[mongodb_port] \
             --source_db test \
             --source_collection users \
             --hiveserver_host [hive_server_host] \
             --hiveserver_port [hive_server_port]


-- Initializing Hive Util --
Creating file /tmp/onefold_mongo/users/data/1
Executing command: cat /tmp/onefold_mongo/users/data/1 | json/ | sort | json/ mongodb://xxx:xxx/test/users_schema > /dev/null
Executing command: cat /tmp/onefold_mongo/users/data/1 | json/ mongodb://xxx:xxx/test/users_schema,/tmp/onefold_mongo/users/data_transform/output > /dev/null
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/root
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/root/part-00000 onefold_mongo/users/data_transform/output/root/
Executing HiveQL: show tables
Executing HiveQL: create table users (app_version string,utm_campaign string,id_oid string,age int,mobile_device string,name string,address_city string,hash_code string,mobile_carrier string,address_zipcode int) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
Executing HiveQL: load data inpath 'onefold_mongo/users/data_transform/output/root/*' into table users
Extracted data with _id from 55426ac7151a4b4d32000001 to 55426ac7151a4b4d32000001
Extracted files are located at: /tmp/onefold_mongo/users/data/1
Hive Tables: users
Schema is stored in Mongo test.users_schema

In Hive, you can see that a new table users is created:

hive> add jar [install_path]/java/HiveSerdes/target/hive-serdes-1.0-SNAPSHOT.jar;

hive> desc users;
app_version             string                  from deserializer
utm_campaign            string                  from deserializer
id_oid                  string                  from deserializer
age                     int                     from deserializer
mobile_device           string                  from deserializer
name                    string                  from deserializer
address_city            string                  from deserializer
hash_code               string                  from deserializer
mobile_carrier          string                  from deserializer
address_zipcode         int                     from deserializer
Time taken: 0.073 seconds, Fetched: 10 row(s)

hive> select * from users;
2.4     Facebook_Offer  55426ac7151a4b4d32000001        24      Samsung John Doe        Chicago 863a4ddd10579c8fc7e12b5bd1e188ce083eec2d        Sprint  94012
Time taken: 0.07 seconds, Fetched: 1 row(s)

Now let’s add a record with new fields

In Mongo, a new record is added with some new fields:

> db.users.find();
{ "_id" : ObjectId("55426c42151a4b4d9e000001"), "hobbies" : [ "reading", "cycling" ], "age" : 34, "work_history" : [ { "to" : "present", "from" : 2013, "name" : "IBM" }, { "to" : 2013, "from" : 2003, "name" : "Bell" } ], "utm_campaign" : "Google", "name" : "Alexander Keith", "app_version" : "2.5", "mobile" : { "device" : "iPhone", "carrier" : "Rogers" }, "address" : { "state" : "Ontario", "zipcode" : "M1K3A5", "street" : "26 Marshall Lane", "city" : "Toronto" } }

New fields are added to the nested address object. address.zipcode is now a string (it used to be an integer). A new hobbies field is introduced, which is a string array. A new work_history field is introduced, which is an array of nested objects.
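To see what happens to hobbies and work_history conceptually, here is a sketch of splitting arrays into child rows keyed by the parent’s hash code. The hashing scheme shown is illustrative; the engine’s actual hash_code may be computed differently.

```python
# Hypothetical sketch: array fields become child-table rows, each keyed
# by the parent record's hash code so parent and child can be joined.
import hashlib
import json

def split_arrays(record):
    """Return (parent_row, {child_table_name: [child_rows]})."""
    parent = {k: v for k, v in record.items() if not isinstance(v, list)}
    parent_hash = hashlib.sha1(
        json.dumps(record, sort_keys=True).encode()).hexdigest()
    parent["hash_code"] = parent_hash
    children = {}
    for key, value in record.items():
        if isinstance(value, list):
            rows = []
            for elem in value:
                # Scalar elements become a single "value" column;
                # dict elements keep their own columns.
                row = elem if isinstance(elem, dict) else {"value": elem}
                rows.append(dict(row, parent_hash_code=parent_hash))
            children[key] = rows
    return parent, children

parent, children = split_arrays(
    {"name": "Alexander Keith", "hobbies": ["reading", "cycling"]})
print(children["hobbies"][0]["value"])  # -> reading
```

This is why the users_hobbies table below has just parent_hash_code, hash_code and value columns, while users_work_history carries the from/name/to columns of its nested objects.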

Run the command with the parameters --write_disposition append and --query '{"_id":{"$gt":ObjectId("55426ac7151a4b4d32000001")}}' to tell the program to fetch from MongoDB only records with an ID greater than the previous one, and to append to the existing Hive table during load:

./ --mongo mongodb://[mongodb_host]:[mongodb_port] \
             --source_db test \
             --source_collection users \
             --hiveserver_host [hive_server_host] \
             --hiveserver_port [hive_server_port] \
             --write_disposition append \
             --query '{"_id":{"$gt":ObjectId("55426f15151a4b4e46000001")}}'


-- Initializing Hive Util --
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/root
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/root/part-00000 onefold_mongo/users/data_transform/output/root/
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/work_history
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/work_history/part-00000 onefold_mongo/users/data_transform/output/work_history/
Executing command: hadoop fs -mkdir -p onefold_mongo/users/data_transform/output/hobbies
Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/users/data_transform/output/hobbies/part-00000 onefold_mongo/users/data_transform/output/hobbies/
Executing HiveQL: alter table `users` change `address_zipcode` `address_zipcode` string
Executing HiveQL: alter table `users` add columns (`address_state` string)
Executing HiveQL: alter table `users` add columns (`address_street` string)
Executing HiveQL: create table `users_hobbies` (parent_hash_code string,hash_code string,`value` string) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
Executing HiveQL: create table `users_work_history` (parent_hash_code string,hash_code string,`from` int,`name` string,`to` string) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
Extracted data with _id from 55426f52151a4b4e5a000001 to 55426f52151a4b4e5a000001
Extracted files are located at: /tmp/onefold_mongo/users/data/1
Hive Tables: users users_hobbies users_work_history
Schema is stored in Mongo test.users_schema

In Hive, two new tables are created: users_hobbies and users_work_history:

hive> show tables;

hive> desc users_hobbies;
parent_hash_code        string                  from deserializer
hash_code               string                  from deserializer
value                   string                  from deserializer
Time taken: 0.068 seconds, Fetched: 3 row(s)

hive> desc users_work_history;
parent_hash_code        string                  from deserializer
hash_code               string                  from deserializer
from                    int                     from deserializer
name                    string                  from deserializer
to                      string                  from deserializer
Time taken: 0.067 seconds, Fetched: 5 row(s)

You can join the parent and child tables like this:

hive> select * from users 
  join users_hobbies on users.hash_code = users_hobbies.parent_hash_code 
  join users_work_history on users.hash_code = users_work_history.parent_hash_code;

Voila! As you can see, the ETL engine takes care of all the guesswork related to schema creation and maintenance. It also handles nested and array data types quite well. More information on its usage can be found in the GitHub repo.

  • Camilo Casadiego

    Hi, I’m trying to use the ETL in a Hortonworks environment from an SSH console as user hive, but I’m getting this error…

    Closing file descriptor root
    Adding fragment value root to mongodb.
    Executing command: hadoop fs -rm -r -f /incomingMongoData/minibars/data_transform/output
    15/11/27 22:43:32 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 360 minutes, Emptier interval = 0 minutes.
    Moved: 'hdfs://' to trash at: hdfs://
    Executing command: hadoop fs -mkdir -p /incomingMongoData/minibars/data_transform/output/root
    Executing command: hadoop fs -copyFromLocal /tmp/onefold_mongo/minibars/data_transform/output/root/part-00000 /incomingMongoData/minibars/data_transform/output/root/
    Traceback (most recent call last):
    File "./", line 513, in
    File "./", line 509, in main
    File "./", line 422, in run
    File "./", line 377, in load_dw
    if self.dw.table_exists(self.dw_database_name, self.dw_table_name):
    File "./", line 289, in table_exists
    r = self.execute_sql(database_name, "show tables", True)
    File "./", line 88, in execute_sql
    conn = pyhs2.connect(, port=self.port, authMechanism="NOSASL", database='default')
    File "/usr/lib/python2.6/site-packages/pyhs2/", line 7, in connect
    return Connection(*args, **kwargs)
    File "/usr/lib/python2.6/site-packages/pyhs2/", line 47, in __init__
    res = self.client.OpenSession(TOpenSessionReq(username=user, password=password, configuration=configuration))
    File "/usr/lib/python2.6/site-packages/pyhs2/TCLIService/", line 154, in OpenSession
    return self.recv_OpenSession()
    File "/usr/lib/python2.6/site-packages/pyhs2/TCLIService/", line 165, in recv_OpenSession
    (fname, mtype, rseqid) = self._iprot.readMessageBegin()
    File "/usr/lib64/python2.6/site-packages/thrift/protocol/", line 140, in readMessageBegin
    name = self.trans.readAll(sz)
    File "/usr/lib64/python2.6/site-packages/thrift/transport/", line 58, in readAll
    chunk = - have)
    File "/usr/lib64/python2.6/site-packages/thrift/transport/", line 159, in read
    self.__rbuf = StringIO(, self.__rbuf_size)))
    File "/usr/lib64/python2.6/site-packages/thrift/transport/", line 120, in read
    message='TSocket read 0 bytes')
    thrift.transport.TTransport.TTransportException: TSocket read 0 bytes

    • jorge_chang

      Hi.. So sorry for the late reply. Please make sure you have Hive Server running. The program uses the Python "pyhs2" client library to connect to Hive. In Python, please try the following to ensure HiveServer is running correctly:

      import pyhs2
      conn = pyhs2.connect(host=[HiveServer host], port=[HiveServer port], authMechanism="NOSASL", database='default')

      • Camilo Casadiego

        Hi, actually I had to change line 89 to add the username and password for my Hive instance, like this: "conn = pyhs2.connect(, port=self.port, authMechanism='PLAIN', user='hive', password='', database='default')". This was run against a Hortonworks sandbox.