PyMongoArrow: Bridging the Gap Between MongoDB and Your Data Analysis App
MongoDB has always been a great database for data science and data analysis, and that's because you can:
- Import data without a fixed schema.
- Clean it up within the database.
But MongoDB is a general-purpose database, not a data analysis tool, so a common pattern when analysing data stored in MongoDB is to extract the results of a query into a Numpy array or Pandas dataframe, and then run complex and potentially long-running analyses using the toolkit those frameworks provide. Until recently, converting large amounts of BSON data, as provided by MongoDB, into these data structures has been slower than we'd like.
Fortunately, MongoDB recently released PyMongoArrow, a Python library for efficiently converting the result of a MongoDB query into the Apache Arrow data model. If you're not aware of Arrow, you may now be thinking, "Mark, how does converting to Apache Arrow help me with my Numpy or Pandas analysis?" The answer is: Conversion between Arrow, Numpy, and Pandas is super efficient, so it provides a useful intermediate format for your tabular data. This way, we get to focus on building a powerful tool for mapping between MongoDB and Arrow, and leverage the existing PyArrow library for integration with Numpy and Pandas.
You'll need a recent version of Python (I'm using 3.8) with pip available. You can use conda if you like, but PyMongoArrow is released on PyPI, so you'll still need to use pip to install it into your conda Python environment.
This tutorial was written for PyMongoArrow v0.1.1.
In this tutorial, I'm going to be using a sample database you can install when creating a cluster hosted on MongoDB Atlas. The database I'll be using is the "sample_weatherdata" database. You'll access this with a `mongodb+srv` URI, so you'll need to install PyMongo with the "srv" extra, like this:

    python -m pip install jupyter pymongoarrow 'pymongo[srv]' pandas
Useful Tip: If you just run `pip`, you may end up using a copy of `pip` that was installed for a different version of `python` than the one you're using. For some reason, the `PATH` getting messed up this way happens more often than you'd think. A solution to this is to run pip via Python, with the command `python -m pip`. That way, it'll always run the version of `pip` that's associated with the version of `python` in your `PATH`. This is now the officially recommended way to run `pip`!

You'll also need a MongoDB cluster set up with the sample datasets imported. Follow these instructions to import them into your MongoDB cluster and then set an environment variable, `MDB_URI`, pointing to your database. It should look like the line below, but with the URI you copy out of the Atlas web interface. (Click the "Connect" button for your cluster.) The quotes stop the shell from treating the `&` in the URI as a command separator.

    export MDB_URI='mongodb+srv://USERNAME:PASSWORD@CLUSTERID.azure.mongodb.net/sample_weatherdata?retryWrites=true&w=majority'
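The database name at the end of the URI matters, because the connection code later in this tutorial calls `get_default_database()`, which relies on it. As a rough sanity check, the sketch below pulls a URI apart with the standard library. It is not how PyMongo parses URIs; `describe_uri` and `EXAMPLE_URI` are hypothetical names, and the URI is the placeholder from above rather than a real cluster.

```python
from urllib.parse import urlsplit

# Placeholder URI in the same shape as the one above; not a real cluster.
EXAMPLE_URI = (
    "mongodb+srv://USERNAME:PASSWORD@CLUSTERID.azure.mongodb.net/"
    "sample_weatherdata?retryWrites=true&w=majority"
)


def describe_uri(uri):
    """Return the parts of a MongoDB URI that matter for this tutorial."""
    parts = urlsplit(uri)
    return {
        "scheme": parts.scheme,
        # urlsplit lower-cases the host name.
        "host": parts.hostname,
        # The path component names the default database.
        "database": parts.path.lstrip("/"),
    }


info = describe_uri(EXAMPLE_URI)
print(info)
```

If the "database" entry comes back empty, the URI has no default database, and you would need to add one before following along.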
A sample document from the "data" collection looks like this:
    {'_id': ObjectId('5553a998e4b02cf7151190bf'),
     'st': 'x+49700-055900',
     'ts': datetime.datetime(1984, 3, 5, 15, 0),
     'position': {'type': 'Point', 'coordinates': [-55.9, 49.7]},
     'elevation': 9999,
     'callLetters': 'SCGB',
     'qualityControlProcess': 'V020',
     'dataSource': '4',
     'type': 'FM-13',
     'airTemperature': {'value': -5.1, 'quality': '1'},
     'dewPoint': {'value': 999.9, 'quality': '9'},
     'pressure': {'value': 1020.8, 'quality': '1'},
     'wind': {'direction': {'angle': 100, 'quality': '1'},
              'type': 'N',
              'speed': {'rate': 3.1, 'quality': '1'}},
     'visibility': {'distance': {'value': 20000, 'quality': '1'},
                    'variability': {'value': 'N', 'quality': '9'}},
     'skyCondition': {'ceilingHeight': {'value': 22000,
                                        'quality': '1',
                                        'determination': 'C'},
                      'cavok': 'N'},
     'sections': ['AG1', 'AY1', 'GF1', 'MD1', 'MW1'],
     'precipitationEstimatedObservation': {'discrepancy': '2',
                                           'estimatedWaterDepth': 0},
     'pastWeatherObservationManual': [{'atmosphericCondition': {'value': '0',
                                                                'quality': '1'},
                                       'period': {'value': 3, 'quality': '1'}}],
     'skyConditionObservation': {'totalCoverage': {'value': '01',
                                                   'opaque': '99',
                                                   'quality': '1'},
                                 'lowestCloudCoverage': {'value': '01', 'quality': '1'},
                                 'lowCloudGenus': {'value': '01', 'quality': '1'},
                                 'lowestCloudBaseHeight': {'value': 800, 'quality': '1'},
                                 'midCloudGenus': {'value': '00', 'quality': '1'},
                                 'highCloudGenus': {'value': '00', 'quality': '1'}},
     'atmosphericPressureChange': {'tendency': {'code': '8', 'quality': '1'},
                                   'quantity3Hours': {'value': 0.5, 'quality': '1'},
                                   'quantity24Hours': {'value': 99.9, 'quality': '9'}},
     'presentWeatherObservationManual': [{'condition': '02', 'quality': '1'}]}
To keep things simpler in this tutorial, I'll ignore all the fields except for "ts," "wind," and the "_id" field.
I set the `MDB_URI` environment variable, installed the dependencies above, and then fired up a new Python 3 Jupyter Notebook. I've put the notebook on GitHub, if you want to follow along, or run it yourself.

I added the following code to a cell at the top of the file to import the necessary modules, and to connect to my database:

    import os
    import pyarrow
    import pymongo
    import bson
    import pymongoarrow.monkey
    from pymongoarrow.api import Schema

    MDB_URI = os.environ['MDB_URI']


    # Add extra find_* methods to pymongo collection objects:
    pymongoarrow.monkey.patch_all()

    client = pymongo.MongoClient(MDB_URI)
    database = client.get_default_database()
    collection = database.get_collection("data")
If the data you wish to convert to Arrow, Pandas, or Numpy data tables is already flat (i.e., the fields are all at the top level of your documents), you can use the methods `find_arrow_all`, `find_pandas_all`, and `find_numpy_all` to query your collection and return the appropriate data structure.

    collection.find_pandas_all(
        {},
        schema=Schema({
            'ts': pyarrow.timestamp('ms'),
        })
    )
| | ts |
|---|---|
| 0 | 1984-03-05 15:00:00 |
| 1 | 1984-03-05 18:00:00 |
| 2 | 1984-03-05 18:00:00 |
| 3 | 1984-03-05 18:00:00 |
| 4 | 1984-03-05 18:00:00 |
| ... | ... |
| 9995 | 1984-03-13 06:00:00 |
| 9996 | 1984-03-13 06:00:00 |
| 9997 | 1984-03-13 06:00:00 |
| 9998 | 1984-03-12 09:00:00 |
| 9999 | 1984-03-12 12:00:00 |

10000 rows × 1 columns
The first argument to `find_pandas_all` is the `filter` argument. I'm interested in all the documents in the collection, so I've left it empty. The documents in the data collection are quite nested, so the only real value I can access with a find query is the timestamp of when the data was recorded, the "ts" field. Don't worry: I'll show you how to access the rest of the data in a moment!

Because Arrow tables (and the other data types) are strongly typed, you'll also need to provide a Schema to map from MongoDB's permissive dynamic schema into the types you want to handle in your in-memory data structure.
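To make the idea of a schema concrete, here is a minimal pure-Python sketch of mapping loosely typed documents onto typed columns. It only illustrates the concept and is not how PyMongoArrow is implemented. The `to_columns` function, the sample documents, and the plain-dict schema are all hypothetical.

```python
from datetime import datetime

# Hypothetical, simplified documents: the second stores the wind direction
# as a float, the wind speed as an int, and carries an extra "note" field.
documents = [
    {"ts": datetime(1984, 3, 5, 15, 0), "windDirection": 100, "windSpeed": 3.1},
    {"ts": datetime(1984, 3, 5, 18, 0), "windDirection": 50.0, "windSpeed": 9,
     "note": "x"},
]

# Hypothetical schema: field name -> type used to coerce each value.
schema = {"windDirection": int, "windSpeed": float}


def to_columns(docs, schema):
    """Build one typed column (a list) per schema field; other fields are dropped."""
    columns = {name: [] for name in schema}
    for doc in docs:
        for name, convert in schema.items():
            value = doc.get(name)
            # Keep missing values as None rather than guessing a default.
            columns[name].append(None if value is None else convert(value))
    return columns


columns = to_columns(documents, schema)
print(columns)
```

Every column ends up with a single type, and fields the schema does not name ("ts" and "note" here) never reach the result.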
The `Schema` is a mapping of the field name to the appropriate type to be used by Arrow, Pandas, or Numpy. At the current time, these types are 64-bit ints, 64-bit floating point numbers, and datetimes. The easiest way to specify these is with the native python types `int` and `float`, and with `pyarrow.timestamp` for datetimes, as in the example above. Any fields in the document that aren't listed in the schema will be ignored.

PyMongoArrow currently hijacks the `projection` parameter to the `find_*_all` methods, so unfortunately, you can't write a projection to flatten the structure at the moment.

MongoDB documents are very flexible, and can support nested arrays and documents. Although Apache Arrow also supports nested lists, structs, and dictionaries, Numpy arrays and Pandas dataframes, in contrast, are tabular or columnar data structures. There are plans to support mapping to the nested Arrow data types in future, but at the moment, only scalar values are supported with all three libraries. So in all these cases, it will be necessary to flatten the data you are exporting from your documents.
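If "flattening" sounds abstract, the sketch below shows the idea in plain Python on a cut-down copy of the sample document shown earlier. It runs entirely in memory and is not part of PyMongoArrow or MongoDB. The helpers `get_path` and `flatten` are hypothetical names chosen for illustration.

```python
# A cut-down copy of the sample document shown earlier.
document = {
    "st": "x+49700-055900",
    "wind": {
        "direction": {"angle": 100, "quality": "1"},
        "type": "N",
        "speed": {"rate": 3.1, "quality": "1"},
    },
}


def get_path(doc, dotted_path):
    """Follow a dotted path such as 'wind.speed.rate'; return None if absent."""
    current = doc
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def flatten(doc, mapping):
    """Build a flat dict: each output name takes the value at its dotted path."""
    return {name: get_path(doc, path) for name, path in mapping.items()}


flat = flatten(document, {
    "windDirection": "wind.direction.angle",
    "windSpeed": "wind.speed.rate",
})
print(flat)
```

The result has only top-level scalar fields, which is the shape a tabular structure needs. Doing this in the database rather than in Python avoids shipping the unused nested fields to the client.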
To project your documents into a flat structure, you'll need to use the more powerful `aggregate_*_all` methods that PyMongoArrow adds to your PyMongo Collection objects.

In an aggregation pipeline, you can add a `$project` stage to your query to project the nested fields you want in your table to top level fields in the aggregation result.

In order to test my `$project` stage, I first ran it with the standard PyMongo aggregate function. I converted it to a `list` so that Jupyter would display the results.

    list(collection.aggregate([
        {'$match': {'_id': bson.ObjectId("5553a998e4b02cf7151190bf")}},
        {'$project': {
            'windDirection': '$wind.direction.angle',
            'windSpeed': '$wind.speed.rate',
        }}
    ]))

    [{'_id': ObjectId('5553a998e4b02cf7151190bf'),
      'windDirection': 100,
      'windSpeed': 3.1}]
Because I've matched a single document by "_id," only one document is returned, but you can see that the `$project` stage has mapped `$wind.direction.angle` to the top-level "windDirection" field in the result, and the same with `$wind.speed.rate` and "windSpeed" in the result.

I can take this `$project` stage and use it to flatten all the results from an aggregation query, and then provide a schema to identify "windDirection" as an integer value, and "windSpeed" as a floating point number, like this:

    collection.aggregate_pandas_all([
            {'$project': {
                'windDirection': '$wind.direction.angle',
                'windSpeed': '$wind.speed.rate',
            }}
        ],
        schema=Schema({'windDirection': int, 'windSpeed': float})
    )
| | windDirection | windSpeed |
|---|---|---|
| 0 | 100 | 3.1 |
| 1 | 50 | 9.0 |
| 2 | 30 | 7.7 |
| 3 | 270 | 19.0 |
| 4 | 50 | 8.2 |
| ... | ... | ... |
| 9995 | 10 | 7.0 |
| 9996 | 60 | 5.7 |
| 9997 | 330 | 3.0 |
| 9998 | 140 | 7.7 |
| 9999 | 80 | 8.2 |

10000 rows × 2 columns
There are only 10000 documents in this collection, but some basic benchmarks I wrote show this to be around 20% faster than working directly with `DataFrame.from_records` and `PyMongo`. With larger datasets, I'd expect the difference in performance to be more significant. It's early days for the PyMongoArrow library, and so there are some limitations at the moment, such as the ones I've mentioned above, but the future looks bright for this library in providing fast mappings between your rich, flexible MongoDB collections and any in-memory analysis requirements you might have with Arrow, Pandas, or Numpy.

If you're planning to do lots of analysis of data that's stored in MongoDB, then make sure you're up on the latest features of MongoDB's powerful aggregation framework. You can do many things within the database so you may not need to export your data at all. You can connect to secondary servers in your cluster to reduce load on the primary for analytics queries, or even have dedicated analytics nodes for running these kinds of queries.
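As one illustration of doing the work inside the database, the sketch below builds a pipeline that averages wind speed per station, so only a handful of summary documents ever leave MongoDB. The pipeline and the `top_windy_stations` helper are hypothetical. In particular, treating a quality value of '1' as "keep this reading" is an assumption made for the example, not something the dataset documentation is quoted as saying here.

```python
# Hypothetical pipeline: average wind speed per station, computed by MongoDB.
# Treating quality '1' as "keep this reading" is an assumption for illustration.
pipeline = [
    {"$match": {"wind.speed.quality": "1"}},
    {"$group": {
        "_id": "$callLetters",
        "averageWindSpeed": {"$avg": "$wind.speed.rate"},
        "readings": {"$sum": 1},
    }},
    {"$sort": {"averageWindSpeed": -1}},
    {"$limit": 5},
]


def top_windy_stations(collection):
    """Run the pipeline on a PyMongo collection and return the result documents."""
    return list(collection.aggregate(pipeline))
```

With the `collection` object from the connection code earlier, `top_windy_stations(collection)` would return at most five small documents instead of the whole collection.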
Check out MongoDB 5.0's new window functions, and if you're working with time series data, you'll definitely want to know about MongoDB 5.0's new time-series collections.