Feast 提供了一个高级别的 FeatureStore API ,允许您定义特征和特征组( 功能视图 )、在线和离线存储以及将数据动态从离线存储移动到在线存储(物化)的能力。 MongoDB集成允许您将MongoDB用作 Feast 的在线和离线存储,因此您只需定义一次特征,并在模型培训和在线推理中一致地提供服务这些特征,而无需维护单独的存储系统。
MongoDB 灵活的文档模型和MQL使其能够处理离线存储所需的复杂查询模式。对于在线存储, MongoDB针对网络规模的访问权限模式进行了优化 — 快速读/写、水平扩展以及可最大限度减少连接和往返的灵活模式。
在此集成概述中,您可以找到:
介绍将MongoDB作为 Feast 的在线和离线存储。
Feast 概念如何映射到MongoDB。
MongoDB离线和在线存储设计的详细说明。
在 Feast 中设置MongoDB存储的配置示例。
关键概念
线上和线下商店
常见工作流模式
典型的端到端工作流程如下所示:
定义点MongoDB支持的集合的实体、功能视图和数据源。
通过
offline_write_batch将功能数据导入到离线存储中,该存储接受 PyArrow 表作为输入,并遵循离线存储模式将数据插入到feature_historyMongoDB集合中。使用
get_historical_features生成培训数据,这会对MongoDB中存储的历史功能行运行高效的时间点联接。使用
pull_latest_from_table_or_query和online_write_batch将离线存储中的最新功能值物化到在线存储中。通过 Feast 的在线API在线提供功能,该 API 会从由序列化实体键作为键的单个MongoDB集合中读取数据。
Feast 概念如何映射到MongoDB
MongoDB集成遵循 Feast 的标准概念模型,但将这些抽象映射到MongoDB模式,该模式专为以实体为中心的在线文档和仅附加历史事件而设计。
概念图
盛宴概念 | 在盛宴中的角色 | MongoDB表示 |
|---|---|---|
实体 | 特征描述的域对象(例如驾驶员、用户)。 | 编码为序列化实体键;在线存储中存储为 |
连接键 | 用于标识数据框中实体行的列。 | 馈入 |
序列化 EntityKey | 连接键名称和值的确定性二进制编码。 | 在线: |
功能 | 某一点的命名、类型化测量值。 |
|
FeatureView | 将功能绑定到实体、数据源和TTL;组织单位。 | 离线:每个历史文档上的 |
DataSource | 指向历史特征所在位置的元数据指针。 |
|
OfflineStore | 历史特征和 PIT 连接的读/写入接口。 |
|
OnlineStore | 每个实体最新功能值的低延迟存储。 | 以 |
TTL | FeatureView 级新鲜度窗口。 | 在计算历史特征时的离线查询和Python后过滤中强制执行;也可与索引中的 |
FeatureService | 模型功能参考的命名列表。 | 没有直接的MongoDB表示; Feast 用于决定从在线存储读取哪些 |
注册表 | 实体、功能视图和服务的元数据存储。 | 未更改; MongoDB集成不会取代 Feast 注册表。 |
RetrievalJob | 延迟执行包装器返回功能表。 | 对于MongoDB离线存储,封装MQL聚合并公开由游标到 Arrow 转换支持的 Arrow 导出。 |
物化 | 有计划地将最新的离线功能传播到在线存储中。 | 通过 |
MongoDB离线存储
数据模型
MongoDB离线存储使用单个共享集合(默认为 feature_history)来存储所有功能视图的仅追加历史功能行。
每个文档表示一个 FeatureView 在特定事件时间戳对一个实体的一次观察:
{ "entity_id": "Binary(...)", "feature_view": "driver_stats", "event_timestamp": "ISODate(2024-01-15T12:00:00Z)", "created_at": "ISODate(2024-01-15T12:01:00Z)", "features": { "conv_rate": 0.72, "acc_rate": 0.91, "avg_daily_trips": 14 } }
主要属性:
Append-only:历史数据被视为不可变;更正将写入为具有较新的
created_at时间戳的新行,而不是就地更新。时间序列友好:
event_timestamp表示观察到功能值的时间;当多个观测股票相同的事件时间戳时,created_at用作决胜局。按 FeatureView 进行功能分组:
feature_view标识该行属于哪个 FeatureView,因此单个集合可以托管多个 FV。
单个复合索引支持所有主要查询模式:
(entity_id ASC, feature_view ASC, event_timestamp DESC, created_at DESC)
该索引支持对实体和功能视图进行高效范围扫描,同时确保在聚合期间首先看到每个 (entity_id, feature_view) 的最新观察。
查询模式 | 索引行为 |
|---|---|
| 对 |
| 排序是一项无需操作 —索引顺序与排序顺序匹配。 |
| 游标首先按照 |
|
|
如果没有此索引,所有四种查询模式都会降级为 COLLSCAN。该索引在首次使用时通过 _ensure_indexes 延迟创建,并在进程级 _indexes_ensured设立按连接字符串缓存,因此每个进程生命周期仅创建一次。
核心离线操作
MongoDB离线存储实现了标准的 Feast 离线存储接口:
offline_write_batch— 将功能数据的pyarrow.Table写入根本的MongoDB集合,使用配置的MongoDBSource元元数据来确定connection_string、database和collection。get_historical_features- 给定实体和事件时间戳的entity_df以及一设立FeatureView,返回一个扩展表,其中每一行都包含时间点正确的功能值:对于每个(entity_id, event_timestamp)对,最新的功能值,其event_timestamp <= entity_event_timestamp并在TTL内进行选择。pull_latest_from_table_or_query— 为每个实体返回一行,其中包含时间窗口内的最新功能值,Feast 的物化引擎使用该行为在线存储播种。pull_all_from_table_or_query— 在指定日期范围内从数据源中检索所有行以进行导出或检查,并由相同的feature_history模式和索引提供支持。persist(通过RetrievalJob.persist)- 通过SavedDatasetStorage(与feature_history不同)将历史功能查询的结果写入单独的集合或外部接收器。
调用路径:
FeatureStore.write_to_offline_store(feature_view_name, df) → provider.ingest_df_to_offline_store(feature_view, arrow_table) → OfflineStore.offline_write_batch(config, feature_view, table, progress)
仅追加语义:在, insert_many(ordered=False)10000-文档批处理中使用 插入文档。写入时无需更新或插入(upsert)或去重— 允许并保留同一(entity_id, feature_view, event_timestamp) 元组的多个文档。
冲突解决延迟到读取时间:
pull_latest_from_table_or_query选择获胜event_timestamp群组中created_at最高的文档。get_historical_features(评分路径)使用$sort … created_at DESC,因此$group $first也会在时间戳平局时选择最高的created_at。
因此,使用较晚的 created_at 写入的更正获胜,无需任何删除或更新操作。
pull_latest_from_table_or_query 在 [start_date, end_date]窗口中为每个包含最新功能值的实体返回一行。未提供 entity_df。
管道阶段:
$match { feature_view, event_timestamp: {$gte, $lte} } → $sort { entity_id, event_timestamp DESC, created_at DESC } → $group $first by entity_id → $project { entity_id, event_timestamp, features.* }
复合索引有效地为 $match + $sort 提供服务; $group $first 为每个实体挑选一个文档,而不具体化其余文档。
聚合实现
推荐的离线实施是基于聚合的MongoDB离线存储,名为 MongoDBOfflineStore。
主要特征:
使用由所有 FeatureView 共享的单个
feature_history集合,通过feature_view进行区分。依赖于复合索引
(entity_id, feature_view, event_timestamp, created_at)进行所有查询,避免全集合扫描。使用服务器端
$group $first来“评分”工作负载(每个实体一行),并使用pd.merge_asof使用重复的实体 ID 来“培训”工作负载,从而平衡正确性和性能。通过分块控制内存使用量,因此可以在不耗尽RAM 的情况下处理较大的
entity_df值。
基准测试显示,与其他MongoDB离线方法相比,此实施提供了吞吐量和内存效率的最佳组合。
get_historical_features 是核心 Feast API。它接受一个 entity_df(N 行实体键列 + event_timestamps)和 K 个 FeatureView 对象,并返回一个具有相同 N 行加 M功能列的 DataFrame,在每行的 event_timestamp(点实时正确性)。
符号:
N → 实体数量
M → 特征数量
P → 观察次数
F →功能视图数量
K → 单次
get_historical_features调用中请求的功能视图数
评分路径
当 entity_df 没有重复的实体 ID 时,评分路径被激活,这是一种常见的推理场景,其中每行都在不同的时间点请求不同实体的特征。
检测:
scoring_path = ( entity_df[all_entity_id_cols].drop_duplicates().shape[0] == len(entity_df) )
评分时,会添加服务器端 $group $first 阶段:
$match → $sort → $group $first → $project
$group 按 (entity_id, feature_view) 进行分组,并选择具有最高 (event_timestamp, created_at) 的文档— 即,按索引顺序位于前面的 $sort 之后的第一个文档。 MongoDB从不为每个功能视图物化每个实体的其他 P-1 文档;选取一个文档后,游标只需前进到下一个群组键。每个实体的费用为 O(日志 P)(索引查找),而不是 O(P)。
$match 使用 event_timestamp: {$lte: max_ts},其中 max_ts 是当前数据数据块中的最大实体请求时间戳。这是一个保守的近似值(“超调”):服务器将来可能会稍微为某些实体返回文档。下面的Python后过滤器通过消除无效行来纠正此问题:
# Merge on entity_id (left = entity_df rows, right = server results) merged = result[["_fv_entity_id", event_timestamp_col]].merge( fv_join, on="_fv_entity_id", how="left" ) # Null out rows where the server doc is in the future or outside TTL future_mask = merged["_fv_ts"] > merged[event_timestamp_col] if fv.ttl: ttl_mask = merged["_fv_ts"] < ( merged[event_timestamp_col] - fv.ttl ) bad_mask = future_mask | ttl_mask else: bad_mask = future_mask for feat in features: vals = merged[feat].copy() vals[bad_mask | merged["_fv_ts"].isna()] = None result[col] = vals.values
这是一次 pd.merge 调用,然后是矢量化布尔索引— 在 Pandas C代码中的工作时间为 O(N),与 P 和 M 无关。
训练路径
当 entity_df 具有重复的实体 ID 时(每个实体有许多时间戳快照的培训数据集),则省略 $group 阶段。该聚合返回每个实体的时间戳窗口中的所有文档,并且Python使用 pd.merge_asof 在每行的 event_timestamp 处或之前查找最新文档:
$match → (no $group)
result = pd.merge_asof( result.sort_values(event_timestamp_col), fv_df_subset.sort_values("_fv_ts"), left_on=event_timestamp_col, right_on="_fv_ts", by="_fv_entity_id", direction="backward", )
两个级别的分块控制内存使用情况:
等级 | 恒定 | 用途 |
|---|---|---|
外 | 50,000 行 | 限制传递给 |
内部 | 10,000 实体 ID | 限制每次聚合调用的 |
对于大于 CHUNK_SIZE 的 entity_df,外循环会运行多个 _run_single 调用并将结果连接起来:
if len(working_df) <= CHUNK_SIZE: result_df = _run_single(working_df, coll) else: chunks = [ _run_single(chunk, coll) for chunk in _chunk_dataframe(working_df, CHUNK_SIZE) ] result_df = pd.concat(chunks, ignore_index=True)
因此,无论总 N 是多少,Python 端内存峰值都是 O(CHUNK_SIZE x M x K)。
MongoDB features 子文档使用 pd.apply 而不是 pd.json_normalize 扩展到各个列。这会保留 json_normalize 会展平或丢失的复杂类型(Map 和 Struct 的字典,数组的列表)。还应用了反向字段映射,以便投影的列名称与 FeatureView 定义匹配:
if "features" in fv_df.columns: for feat in features: src_col = reverse_fm.get(feat, feat) fv_df[feat] = fv_df["features"].apply( lambda d, _s=src_col: ( d.get(_s) if isinstance(d, dict) else None ) ) fv_df = fv_df.drop(columns=["features"])
线下商店功能
功能 | 支持? | 注意 |
|---|---|---|
| 是 | 使用索引聚合和 Pandas merge-asof 通过 |
| 是 | 使用 |
| 是 | 完整的历史扫描,时间筛选器超过 |
| 是 | 通过配置的 |
| 是 | 使用 |
直接导出到数据湖或数据仓库等其他便利性取决于特定的 RetrievalJob实施,并且预计将遵循 Feast 的线下商店标准模式。
MongoDB在线商店
数据模型
MongoDB在线存储对所有 FeatureView 使用单个集合,并以序列化实体键为键。
_id:serialized_entity_key(entity_key),由 Feast 的稳定编码函数生成,该函数对实体名称和值进行排序并将其编码为字节。features:嵌套子文档,其中每个 FeatureView 都维护自己的功能命名空间。event_timestamps:每个 FeatureView 的时间戳,表示写入该 FeatureView 的最新值的时间。created_timestamp或updated_at:可用于TTL索引和诊断的簿记字段。
示例(简化):
{ "_id": "b\"<serialized_entity_key>\"", "features": { "driver_stats": { "rating": 4.91, "trips_last_7d": 132 }, "pricing": { "surge_multiplier": 1.2 } }, "event_timestamps": { "driver_stats": "ISODate(2026-01-01T12:00:00Z)", "pricing": "ISODate(2026-01-21T12:00:00Z)" }, "created_timestamp": "ISODate(2026-01-21T12:00:00Z)" }
设计理由:
单个集合将每个实体的状态保存在一个文档中,这符合 Feast 对基于键的查找的期望,并避免了每个 FeatureView 集合之间的状态碎片化。
使用序列化实体键作为
_id可以重复使用 Feast 的确定性编码,避免跨集合使用重复的主节点 (primary node in the replica set)键,并保持检索每个实体的单个键查找。
与离线存储(使用带有 feature_view 鉴别器字段的单个 feature_history集合)一样,在线存储也对所有 FeatureView 使用单个集合。
在线商店本质上是面向实体键的,而不是面向特征视图的。尽管高级 FeatureStore API会使用单个 FeatureView 调用 online_read 和 online_write_batch,但 Feast 中的根本的存储模型是围绕每个实体键的单个逻辑行设计的。随着时间的推移,该行可能会累积来自多个 FeatureView 的功能。
使用一个集合可以让我们为每个实体维护一个统一的文档,并仅原子性地更新相关子文档(例如 features.<feature_view_name>),而无需跨集合复制实体键。
单一集合设计从一开始就是 Feast 的标准(最初是为 Redis 设计的),并且发挥了 MongoDB 的优势。优点包括:
减少写入放大
简化索引管理(只有一个主节点 (primary node in the replica set)
_id索引)当多个 FeatureView股票相同实体时,没有跨集合协调
与 Feast 基于键的获取模型实现一致的检索语义
按FeatureView集合设计会分散实体状态,如果组合了要素,则需要额外的协调或多集合查询,并且会增加操作开销,而 Feast 的访问权限模式没有性能优势。
将实体键序列化为 _id:Feast 提供 serialize_entity_key,这是一种稳定的编码函数,可在连接之前对实体名称和值进行显式排序,以确保可预测的字节序列(使用生成字节的 struct.pack 进行类型化)。这意味着我们可以直接将其用作 _id。
注意
虽然 serialize_entity_key 提供了稳定的 _id,但其输出分布式不均匀,因此不是分片的理想选择。如果您的部署需要对在线存储集合分片,请考虑使用哈希分片键或附加字段。
核心在线操作
MongoDB在线存储实现了 Feast 的标准在线存储API:
online_write_batch— 在物化过程中,Feast 将每个实体的最新功能值写入MongoDB文档。每次批处理更新或插入(upsert)仅更新相关的嵌套features.<feature_view>子文档及其在event_timestamps中的相应条目,从而保持实体文档的原子性和一致。online_read和get_online_features- 在线服务使用与离线相同的序列化逻辑将实体键解析为_id值,然后执行键查找。每次查找利用嵌套的features结构,在单次往返中返回实体的所有请求功能。TTL和新鲜度 — 功能TTL在 FeatureView 上配置,主要用于离线 PIT 连接;在线TTL可以通过
updated_at上的索引或类似时间戳来实现,这与 Feast 的概念一致,即离线存储仅用于追加,而在线存储则保留最新状态。
配置
离线存储配置
离线存储是使用 MongoDBOfflineStoreConfig 配置的:
class MongoDBOfflineStoreConfig(FeastConfigBaseModel): type: str = "...MongoDBOfflineStore" connection_string: str = "mongodb://localhost:27017" database: str = "feast" collection: str = "feature_history"
示例feature_store.yaml:
offline_store: type: feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb.MongoDBOfflineStore connection_string: "mongodb+srv://user:pass@cluster.mongodb.net" database: feast collection: feature_history
MongoDBSource 是相应的DataSource 。其name feature_view字段成为存储在每个文档中的 鉴别器。有关完整的配置选项,请参阅 Feast 文档中的MongoDB数据源参考。
source = MongoDBSource( name="driver_stats", timestamp_field="event_timestamp", created_timestamp_column="created_at", )
后续步骤
按照 Feast 快速入门设立本地功能存储,然后使用本页上的配置示例将MongoDB作为在线和离线存储进行交换。
查看 Feast 文档中的MongoDB Online Store 参考资料,了解配置选项、异步支持和完整功能矩阵。
查看MongoDB数据源参考,了解
MongoDBSource选项和模式详细信息。在 Feast 概念指南中了解核心 Feast 概念,例如实体、功能视图和物化。