MongoDB aggregation: save results in a Pandas DataFrame

TL;DR

MongoDB is one of the leading NoSQL databases, and its aggregation framework enables powerful queries and data transformations. In this post we will see how to save the results of an aggregation pipeline into a Pandas DataFrame.

From MongoDB to Pandas

I already provided an introduction to MongoDB and Compass in a previous post of my MongoDB series, so you should know that Compass is the way to go for building aggregation pipelines.

Now, the main reason to get data out of MongoDB is to perform some task on it, and nothing beats Pandas for further transformation or analysis. So, how do you get the data you queried into a Pandas DataFrame?

For all the subsequent examples, I will assume you have already initialized your environment like this:

from pymongo import MongoClient
import pandas as pd

# Connect to the MongoDB instance and run the aggregation pipeline;
# `result` is a cursor over the documents produced by the pipeline
client = MongoClient("mongo_connection_string")
result = client.db.col.aggregate([YOUR_AGG_PIPELINE])

Where mongo_connection_string is the connection URI of your MongoDB instance, db is the name of your database, col is the name of your collection, and YOUR_AGG_PIPELINE is your aggregation pipeline.
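For instance, an aggregation pipeline is just a list of stage documents. Here is a hypothetical sketch (the fields status, category and amount are made up for illustration):

# Hypothetical pipeline: keep active documents, then sum amounts per category
pipeline = [
    {"$match": {"status": "active"}},
    {"$group": {"_id": "$category", "total": {"$sum": "$amount"}}},
]
result = client.db.col.aggregate(pipeline)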

The simple way

One very simple way to do it is to cast the cursor resulting from your aggregation query to a list, and feed it to a Pandas DataFrame constructor:

data_df = pd.DataFrame(list(result))

However, this creates an in-memory object whose size is directly proportional to the amount of data you retrieved. You can be a little more parsimonious by using an iterator rather than a list:

data_df = pd.DataFrame(iter(result))

Getting some balance

The second solution above still loads all the data into memory in order to create your DataFrame. A slightly better balance can be struck with a helper function:

def iterator2dataframes(iterator, chunk_size: int):
    """Turn an iterator into multiple small pandas.DataFrames.

    This is a balance between memory and efficiency.
    """
    records = []
    frames = []
    for i, record in enumerate(iterator):
        records.append(record)
        # Flush the accumulated records every `chunk_size` documents
        if i % chunk_size == chunk_size - 1:
            frames.append(pd.DataFrame(records))
            records = []
    if records:
        frames.append(pd.DataFrame(records))
    # pd.concat() raises on an empty list, so guard against an empty iterator
    return pd.concat(frames) if frames else pd.DataFrame()

And then running:

data_df = iterator2dataframes(result, 20000)

The PyMongoArrow way

If you look at the official MongoDB documentation, it suggests using their new support library, PyMongoArrow, to move data from MongoDB to Pandas, NumPy and Apache Arrow. Let’s see how it works!

First, you’ll need to install the library with:

python -m pip install pymongoarrow

Then you’ll need to import the package and call the patch_all() function. This function extends PyMongo with the PyMongoArrow methods.

from pymongoarrow.monkey import patch_all
from pymongoarrow.api import Schema

# Extend PyMongo's Collection with PyMongoArrow methods
# such as aggregate_pandas_all()
patch_all()

Then you’ll need to define the schema of your data before you call the aggregation pipeline.

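A schema maps each field of your pipeline’s output to a type. As a minimal sketch (the field names and types below are hypothetical, adapt them to your own documents):

from datetime import datetime

from pymongoarrow.api import Schema

# Hypothetical fields: replace them with the fields your pipeline actually outputs
schema = Schema({
    "_id": int,
    "amount": float,
    "created_at": datetime,
})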

And finally create your Pandas DataFrame as follows:

data_df = client.db.col.aggregate_pandas_all(
    pipeline=[YOUR_AGG_PIPELINE],
    schema=schema,
)

Note, however, that this recommended approach turned out to be much slower in my case. With my aggregation pipeline returning about 7 million records, the second method with the helper function created the DataFrame in about 3m20s, while PyMongoArrow took about 5m10s to do the same.
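If you want to time the two approaches on your own data, here is a minimal sketch reusing the setup, the helper function, and the schema from above:

import time

# Time the chunked helper function (each run needs a fresh cursor)
start = time.perf_counter()
result = client.db.col.aggregate([YOUR_AGG_PIPELINE])
df_chunked = iterator2dataframes(result, 20000)
print(f"helper function: {time.perf_counter() - start:.0f}s")

# Time PyMongoArrow's aggregate_pandas_all with the schema defined earlier
start = time.perf_counter()
df_arrow = client.db.col.aggregate_pandas_all(
    pipeline=[YOUR_AGG_PIPELINE],
    schema=schema,
)
print(f"PyMongoArrow: {time.perf_counter() - start:.0f}s")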

You can argue that PyMongoArrow performed a schema validation on all records (which the other methods did not do), but that was not really a necessity in my case.

I hope you find this useful! I plan to write more MongoDB-related posts over time; you can check them here!

  • PyMongoArrow link
  • MongoDB related posts link

Do you like our content? Check out more of our posts on our blog!