MongoDB.local SF, Jan 15: See the speaker lineup & ship your AI vision faster. Use WEB50 to save 50%
Find out more >
Docs 菜单
Docs 主页
/ /

PyMongoArrow 入门

本教程旨在介绍 PyMongoArrow 的使用。本教程假定读者熟悉基本的 PyMongoMongoDB 概念。

提示

兼容性

有关与PyMongo、PyArrow和操作系统的兼容性的信息,请参阅兼容性页面。

您可以通过三种方式安装PyMongoArrow

  • Conda

  • 来自源

我们建议使用 pip 在所有平台上安装 PyMongoArrow。PyMongoArrow 可在 PyPI 上使用。

$ python -m pip install pymongoarrow

要获取特定版本的 pymongo:

$ python -m pip install pymongoarrow==1.8.0

使用 pip 升级:

$ python -m pip install --upgrade pymongoarrow

重要

如果安装因错误而失败,例如 ValueError: Could not find "libbson-1.0" library ,则表示pip无法为您的平台找到合适的轮子。 我们建议您首先确保已安装pip >= 20.3。 要升级pip ,请运行以下 shell 命令:

$ python -m pip install --upgrade pip

然后,您可以尝试重新安装pymongoarrow

我们目前为 x86_64 架构上的 macOS、Windows 和 Linux 分发轮子。

要测试安装,请在Python Shell中运行以下代码。如果 PyMongoArrow 已正确安装,则不会看到任何错误或异常。

>>> import pymongoarrow as pma

通过运行以下 shell 命令, conda用户可以使用 PyMongoArrow:

$ conda install --channel conda-forge pymongoarrow

要测试安装,请在Python Shell中运行以下代码。如果 PyMongoArrow 已正确安装,则不会看到任何错误或异常。

>>> import pymongoarrow as pma

如果无法使用上述选项在系统上安装 pymongoarrow,您可以从源代码安装。要学习;了解操作方法,请参阅贡献指南。

在Linux上从源代码安装需要以下依赖项:

  • GCC 12或更高版本

  • CMake

  • pkg-config

要将 PyMongoArrow 与需要可选依赖项的PyMongo功能一起使用,您必须在安装PyMongo时将依赖项设立为选项。

注意

要学习;了解有关 PyMongo 可选依赖项的更多信息,请参阅依赖项PyMongo文档中的依赖项。

例如,要将 PyMongoArrow 与客户端字段级加密结合使用,除了安装 PyMongoArrow 之外,您还必须使用encryption选项安装 PyMongo:

$ python -m pip install 'pymongo[encryption]' pymongoarrow

使用 PyMongoArrow API 并将查询结果集返回为pandas.DataFrame实例的应用程序(例如~pymongoarrow.api.find_pandas_all() ,还必须安装pandas

$ python -m pip install pandas

要测试安装,请在Python Shell中运行以下代码。如果 PyMongoArrow 已正确安装,则不会看到任何错误或异常。

>>> import pymongoarrow as pma

本教程假设MongoDB实例正在默认托管和端口上运行。下载并安装MongoDB后,您可以启动它,如以下代码示例所示:

$ mongod

运行以下代码,将示例数据插入集群:

from datetime import datetime
from pymongo import MongoClient
client = MongoClient()
client.db.data.insert_many([
{'_id': 1, 'amount': 21, 'last_updated': datetime(2020, 12, 10, 1, 3, 1), 'account': {'name': 'Customer1', 'account_number': 1}, 'txns': ['A']},
{'_id': 2, 'amount': 16, 'last_updated': datetime(2020, 7, 23, 6, 7, 11), 'account': {'name': 'Customer2', 'account_number': 2}, 'txns': ['A', 'B']},
{'_id': 3, 'amount': 3, 'last_updated': datetime(2021, 3, 10, 18, 43, 9), 'account': {'name': 'Customer3', 'account_number': 3}, 'txns': ['A', 'B', 'C']},
{'_id': 4, 'amount': 0, 'last_updated': datetime(2021, 2, 25, 3, 50, 31), 'account': {'name': 'Customer4', 'account_number': 4}, 'txns': ['A', 'B', 'C', 'D']}])

PyMongoArrow 依赖数据模式将查询结果集编组为表格形式。 如果您不提供此模式,PyMongoArrow 会从数据中推断出一个模式。 您可以通过创建Schema对象并将字段名称映射到类型说明符来定义模式,如以下示例所示:

from pymongoarrow.api import Schema
schema = Schema({'_id': int, 'amount': float, 'last_updated': datetime})

MongoDB 使用嵌入式文档来表示嵌套数据。 PyMongoArrow 为这些文档提供一流的支持:

schema = Schema({'_id': int, 'amount': float, 'account': { 'name': str, 'account_number': int}})

PyMongoArrow 还支持列表和嵌套列表:

from pyarrow import list_, string
schema = Schema({'txns': list_(string())})
polars_df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)

提示

PyMongoArrow 为每种支持的BSON类型包含多个允许的类型标识符。有关这些数据类型及其关联的类型标识符的完整列表,请参阅 数据类型。

以下代码示例展示了如何将amount字段具有非零值的所有记录作为pandas.DataFrame对象加载:

df = client.db.data.find_pandas_all({'amount': {'$gt': 0}}, schema=schema)

您还可以加载与pyarrow.Table实例相同的结果集:

arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}}, schema=schema)

或者作为polars.DataFrame实例:

df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)

或者作为 NumPy arrays对象:

ndarrays = client.db.data.find_numpy_all({'amount': {'$gt': 0}}, schema=schema)

使用 NumPy 时,返回值是字典,其中键是字段名称,值是相应的numpy.ndarray实例。

注意

在前面的所有示例中,您都可以省略模式,如下例所示:

arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}})

如果省略模式,PyMongoArrow 会尝试根据第一批处理中包含的数据自动应用模式。

运行聚合操作与运行查找操作类似,但需要执行一系列操作。

以下是输出一个新数据帧的aggregate_pandas_all()方法的简单示例,其中所有_id值都分组在一起,并对它们的amount值求和:

df = client.db.data.aggregate_pandas_all([{'$group': {'_id': None, 'total_amount': { '$sum': '$amount' }}}])

您还可以对嵌入式文档运行聚合操作。 以下示例展开嵌套txn字段中的值,计算每个值的数量,然后以 NumPy ndarray对象列表的形式返回结果,并按降序排序:

pipeline = [{'$unwind': '$txns'}, {'$group': {'_id': '$txns', 'count': {'$sum': 1}}}, {'$sort': {"count": -1}}]
ndarrays = client.db.data.aggregate_numpy_all(pipeline)

提示

有关聚合管道的更多信息,请参阅MongoDB Server 文档。

您可以使用write()方法将以下类型的对象写入 MongoDB:

  • 箭头 Table

  • Pandas DataFrame

  • NumPy ndarray

  • 极地 DataFrame

from pymongoarrow.api import write
from pymongo import MongoClient
coll = MongoClient().db.my_collection
write(coll, df)
write(coll, arrow_table)
write(coll, ndarrays)

注意

NumPy 数组指定为dict[str, ndarray]

write() 方法可以选择接受布尔值 exclude_none 参数。如果将此参数设立为 True,则驱动程序不会将空值写入数据库。如果将此参数设立为 False 或留空,则驱动程序将为每个空字段写入 None

以下示例中的代码将箭头 Table 写入MongoDB两次。'b'字段中的一个值设立为 None

第一次调用 write() 方法时省略了 exclude_none 参数,因此默认值为 FalseTable 中的所有值(包括 None)都将写入数据库。第二次调用 write() 方法会将 exclude_none 设为 True,因此会忽略 'b'字段中的空值。

data_a = [1, 2, 3]
data_b = [1, None, 3]
data = Table.from_pydict(
{
"a": data_a,
"b": data_b,
},
)
coll.drop()
write(coll, data)
col_data = list(coll.find({}))
coll.drop()
write(coll, data, exclude_none=True)
col_data_exclude_none = list(coll.find({}))
print(col_data)
print(col_data_exclude_none)
{'_id': ObjectId('...'), 'a': 1, 'b': 1}
{'_id': ObjectId('...'), 'a': 2, 'b': None}
{'_id': ObjectId('...'), 'a': 3, 'b': 3}
{'_id': ObjectId('...'), 'a': 1, 'b': 1}
{'_id': ObjectId('...'), 'a': 2}
{'_id': ObjectId('...'), 'a': 3, 'b': 3}

加载结果集后,您可以将其写入包支持的任何格式。

例如,要将变量arrow_table引用的表写入名为example.parquet的 Parquet 文件,请运行以下代码:

import pyarrow.parquet as pq
pq.write_table(arrow_table, 'example.parquet')

Pandas 还支持将DataFrame实例写入各种格式,包括 CSV 和 HDF。 要将变量df引用的数据框写入名为out.csv的 CSV 文件,请运行以下代码:

df.to_csv('out.csv', index=False)

Polars API 是前面两个示例的混合:

import polars as pl
df = pl.DataFrame({"foo": [1, 2, 3, 4, 5]})
df.write_parquet('example.parquet')

注意

Parquet 读写操作支持嵌套数据,但 Arrow 或 Pandas 不能很好地支持 CSV 读写操作。

pymongoarrow.monkey模块提供了一个接口,用于就地修补 PyMongo,并将 PyMongoArrow 功能直接添加到Collection实例:

from pymongoarrow.monkey import patch_all
patch_all()

运行monkey.patch_all()方法后, Collection类的新实例将包含 PyMongoArrow API,例如pymongoarrow.api.find_pandas_all()方法。

注意

您还可以通过从pymongoarrow.api模块导入任何 PyMongoArrow API 来使用它们。 如果这样做,则在调用 API 方法时,必须将要对其运行操作的Collection实例作为第一个参数传递。

当Linux环境中未安装 GCC 12或更高版本时,PyMongoArrow 会引发此错误。 如果遇到此错误,请确保您安装了 GCC 12或更高版本。

当应用程序尝试使用 PyMongoArrow API 在 Python 环境中未安装polars且该 API 将查询结果集返回为polars.DataFrame实例时,PyMongoArrow 会引发此错误。 由于polars不是 PyMongoArrow 的直接依赖项,因此在您安装pymongoarrow时不会自动安装。 您必须使用以下 shell 命令单独安装polars

$ python -m pip install polars

后退

Overview

在此页面上