Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/

Atlas Stream Processing とMongoDB ベクトル検索によるリアルタイム メディア推奨

ユースケース: パーソナライズ生成系 AI

業種: メディア、電気通信

製品およびツール: MongoDB Atlas、 MongoDB ベクトル検索、 MongoDB Atlas Stream Processing

パートナー: 投票AI Azure OpenAI

新しいユーザーがニュースサイトにアクセスしても、価値を失って離れる前に、ユーザーが何を求めているかを理解する秒数があります。これにより、コールド スタートの問題が発生します。つまり、以前のデータを持たないユーザーにコンテンツをどのように推奨するか?

サイトにアクセスし、3 つの記事をクリックする匿名ユーザーを考えてみましょう。

  • 「NVIDI は新しいAIチャンクを作成します」

  • 「TSMC がアナライザの本番環境を拡大します」

  • 「遅延の中で遅延が発生します」

単純なキーワード検索では、Intel または Ar書込み保証 に関する他の記事のみが推奨されます。ただし、インテリジェント システムでは、3 つのすべてのクリックがメモリ チェーンに関連していることを認識します。これにより、ユーザーの閲覧履歴とキーワードを共有していない場合でも、「シノニムの将来」などの関連記事を推奨できます。

この記事のソリューションでは、クリックストリーム データを取り込んで処理するインテリジェントなメディア・パーソナライズ システムを構築し、ユーザーの興味に関する自然言語の概要を生成し、ユーザーが参加する可能性の高い関連記事を推奨します。

このソリューションは以下を組み合わせたものです。

  • クリックストリーム データのユーザー意向を要約するための、リアルタイムデータの取り込み、エンタープライズ、および LM 統合のための Atlas Stream Processing

  • セマンティック検索と推奨事項のための自動 Vyage AI埋め込みを備えた Atlas ベクトル検索

このアーキテクチャでは、 MongoDB Atlasを使用して、ユーザーの動作をリアルタイムで取り込み、処理し、応答するAI駆動型メディア推奨エンジンを構築する方法を示します。

Atlas Stream Processing とMongoDB ベクトル検索を使用したメディアパーソナライズパイプラインの参照アーキテクチャを示す画像
クリックして拡大します

図の 1。 Atlas Stream Processing とMongoDB ベクトル検索を備えたAI駆動型メディアパーソナライズ アーキテクチャ

このアーキテクチャは 3 つのフェーズで動作します。これらについては、次のセクションで詳しく説明します。

  1. Ingest と Encryption : ユーザー向けアプリケーションから未加工のクリックストリーム イベントをキャプチャし、Atlas Stream Processing を使用してリアルタイムで記事メタメタデータと結合します。

  2. セッション化とサマリー: 関連するクリックをセッションにグループ化し、LDM を使用してユーザーの興味に関する自然言語のサマリーを生成します。

  3. 検索とサーバー: 生成されたサマリーを使用してセマンティックベクトル検索を実行し、パーソナライズされた推奨事項を返します。

最初のフェーズでは、このソリューション アーキテクチャは Atlas Stream Processing を使用して、メディアプラットフォームから未加工のクリックストリーム データを取り込み、データベース内の記事のメタデータで強化します。

1

このソリューションでは、同じMongoDBクラスターの newsデータベースにある次の 2 つのコレクションを使用します。

2
  1. Atlasで、プロジェクトのGo Stream Processing{0 ページに します。

    1. まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。

    2. まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。

    3. サイドバーで、 Streaming Data見出しの下のStream Processingをクリックします。

      Atlas Stream Processingページが表示されます。

  2. [Create a workspace] をクリックします。

  3. Create a stream processing workspaceページで、ワークスペースを次のように設定します。

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Workspace Name: article-personalization

3

ストリーム プロセッサは、エンティティを実行するためにクリックストリーム データと記事メタデータの両方に接続する必要があります。どちらのデータソースも同じ Atlas クラスター内になければなりません。

  1. Stream Processing ワークスペースのペインで、Manage をクリックします。

  2. [ Connection Registryタブで、右上の [ + Add Connection ] をクリックします。

  3. Edit Connection ページで、次のように接続を構成します。

    • 接続タイプ:Atlas Database

    • 接続名:userevents

    • Atlas クラスター: クリックストリームと記事 データが存在するクラスターの名前(例: 、ClickstreamCluster

    • 実行方法:Read and write to any database

  4. 接続を作成するには、Save changes をクリックします。

4

ステージを使用して、user_eventsコレクションから未加工のクリックストリーム イベントを読み取り、記事メタデータでイベントを強化するステージを持つ、userIntentSummarizer という名前のストリーム プロセッサを作成します。

  1. Atlasプロジェクトの Stream Processing ページで、ストリーム処理ワークスペースのManage ペインにある [] をクリックします。

  2. JSON editorで、次の JSON定義をコピーしてJSONエディターのテキストボックスに貼り付け、これらのステージを持つ という名前のストリームuserIntentSummarizer プロセッサを定義します。

    • $source:user_events news接続されたクリックストリーム クラスター(userevents 接続)の データベース内の コレクションから未加工のクリックストリーム イベントを読み取ります。

    • $lookup: フィールドに基づいてarticles コレクションと未加工のクリックストリーム イベントを結合し、article_id descriptionkeywordstitle フィールドから関連する記事メタデータを取得します。

    • $addFields:description keywordstitlearticle_detailsフィールドから 、 、 フィールドをイベントストリームの最上位にプロジェクションし、下流のステージから簡単にアクセスできるようにします。

    • $project: 下流処理の関連フィールドをプロジェクションします。

    {
    "$source": {
    "connectionName": "userevents",
    "db": "news",
    "collection": "user_events"
    }
    },
    {
    "$lookup": {
    "from": {
    "connectionName": "userevents",
    "db": "news",
    "coll": "articles"
    },
    "localField": "article_id",
    "foreignField": "_id",
    "as": "article_details",
    "pipeline": [
    {
    "$project": {
    "_id": 0,
    "description": 1,
    "keywords": 1,
    "title": 1
    }
    }
    ]
    }
    },
    {
    "$addFields": {
    "description": { "$arrayElemAt": [ "$article_details.description", 0 ] },
    "keywords": { "$arrayElemAt": [ "$article_details.keywords", 0 ] },
    "title": { "$arrayElemAt": [ "$article_details.title", 0 ] }
    }
    },
    {
    "$project": {
    "userId": 1,
    "article_id": 1,
    "eventTime": 1,
    "event_type": 1,
    "device": 1,
    "session_id": 1,
    "description": 1,
    "keywords": 1,
    "title": 1
    }
    },
  3. 変更を保存するには、[Update stream processor] をクリックします。

このフェーズでは、ストリーム プロセッサパイプラインを拡張して、関連するクリックをセッションにグループ化し、LVM を使用して各セッションの自然言語のサマリーを生成し、ユーザーの興味を説明します。

1

LM プロバイダー(例、 Azure OpenAI)に外部 HTTPS 接続を追加して、ストリーム プロセッサがパイプラインから LM を直接呼び出してデータを増やすようにします。

  1. Stream Processing ワークスペースのペインで、Manage をクリックします。

  2. [ Connection Registryタブで、右上の [ + Add Connection ] をクリックします。

  3. 次のように接続を構成します。

    • 接続タイプ:HTTPS

    • 接続名:azureopenai

    • URL : Azure OpenAIインスタンスのエンドポイントURL

    • ヘッダー: 次のキーと値のペアを ヘッダーに追加します。

      • キー: Content-Type、値: application/json

      • キー: api-key、値: Azure OpenAI APIキー

2

$sessionWindow指定されたセッション ギャップに基づいて関連するイベントをセッションにグループ化するには、ストリーム プロセッサパイプラインに ステージを追加します。このソリューションでは、セッションは、非アクティブなギャップがsession_id 60秒以上ない同じ からのイベントのシーケンスとして定義されます。

このステージを エンタープライズ ステージの後に userIntentSummarizerパイプラインに追加します。

{
"$sessionWindow": {
"partitionBy": "$session_id",
"gap": { "unit": "second", "size": 60 },
"pipeline": [{
"$group": {
"_id": "$session_id", "titles": {
"$push": "$title"
}
}
}]
}
}
3

$httpsストリーム処理パイプラインから LM プロバイダーを直接呼び出すには、 ステージを追加します。このソリューションではAzure OpenAI を呼び出して、セッションの記事タイトルに基づいてユーザーの希望を説明する各セッションの自然言語のサマリーを生成します。

このステージを $sessionWindow ステージの後にパイプラインに追加する。

{
"$https": {
"connectionName": "azureopenai",
"method": "POST",
"as": "apiResults",
"config": { "parseJsonStrings": true },
"payload": [
{
"$project": {
"_id": 0,
"model": "gpt-4o-mini",
"response_format": { "type": "json_object" },
"messages": [
{
"role": "system",
"content": "You are an analytical assistant that specializes in behavioral summarization. You analyze short-term reading activity and infer user interests without making personal or sensitive assumptions the create a special field called summary. Summary must be a special field in the response. Respond only in JSON format.Return a JSON object with the following keys in this order: \n reasoning: (Internal scratchpad, briefly explain your analysis) \n user_interests: (The list of inferred interests) \n summary: (A concise summary based on the interests above)"
},
{
"role": "user",
"content": { "$toString": "$titles" }
}
]
}
}
]
}
}

注意

ストリーム プロセッサ自体は「インテリジェント」です。データがディスクに到達する前に、タイトルのリストをセマンティックなサマリー(例:これは、通常未加工のセッション データをデータベースに書込み、その後アプリケーションサーバーから外部APIを呼び出す従来のバッチする処理パイプラインとは基本的に異なります。

4

次のステージをパイプラインに追加して、LM 出力からサマリーを抽出し、それを新しいコレクションに書込みます。

  • $match: LM 呼び出しが失敗しエラーが返されたセッションをフィルタリングで除外し、データベースへの不完全なデータの書込みを回避します。

  • $addFields:summary LM 出力から フィールドを抽出し、それをドキュメントの最上位に追加します。

  • $project:ドキュメントから未加工の LM 出力を排除し、ノイズとストレージのコストを削減します。

  • $merge:user_intent news結果のドキュメントを、クリックストリーム クラスターの データベースにある という新しいコレクションに書き込みます(userevents 接続)。このコレクション内の各ドキュメントはユーザー セッションを表し、ユーザーの興味の概要が含まれています。

{
"$match": {
"titles": {
"$exists": true
},
"apiResults": {
"$exists": true
}
}
},
{
"$addFields": {
"summary": {
"$arrayElemAt": [
"$apiResults.choices.message.content.summary",
0
]
}
}
},
{
"$project": {
"apiResults": 0
}
},
{
"$merge": {
"into": {
"coll": "user_intent",
"connectionName": "userevents",
"db": "news"
}
}
}
5

クリックストリーム データの概要を開始する準備ができたら、ストリーム処理ワークスペースのストリーム プロセッサのリストにある userIntentSummarizer プロセッサの Start アイコンをクリックします。

このパイプラインは、ユーザーの興味をキャプチャするセッション サマリーを含むドキュメントを user_intentコレクションに書き込む必要があります。 (例: )。

{
"_id": "sess-6a0d8837-a5f9-4ef5-8b00-78e9bf7a825c",
"summary": "The user seems interested in geopolitical developments, especially in the Middle East, US political strategies involving Trump, and legal aspects of government operations.",
"titles": [
"Israel and Hamas agree to part of Trump's Gaza peace plan, will free hostages and prisoners",
"Top officials from US and Qatar join talks aimed at brokering peace in Gaza",
"How Trump secured a Gaza breakthrough",
"Ontario's anti-tariff ad is clever, effective and legally sound, experts say",
"Shutdown? Trump's been dismantling the government all year",
"AP News Summary at 7:58 p.m. EDT"
]
}

最後に、 MongoDB ベクトル検索 を使用して、前のフェーズで生成されたセッションサマリーを使用して、記事カタログに対してセマンティック検索を実行し、パーソナライズされたコンテンツの推奨を実現します。

1

セマンティック検索を実行する前に、記事データのベクトル埋め込みを生成する必要があります。そのためには、 という名前のMongoDBvector_index ベクトル検索インデックスを作成し、description articlesコレクションのautoEmbed フィールドを タイプとしてインデックス化します。これは、ドキュメントがコレクションに挿入または更新されるたびに、自動埋め込みを使用して フィールドのベクトル埋め込みを自動的に生成するようにMongoDB descriptionベクトル検索に指示します。

重要

自動埋め込みは、 MongoDB Community Edition v8.2 以降でのみプレビュー機能として利用できます。機能および関連するドキュメントは、プレビュー期間中にいつでも変更される可能性があります。「プレビュー機能」を参照してください。

Atlas はMongoDBのすべてのエディションで手動埋め込みをサポートしています。

このJSON定義を使用して、これらのフィールドにベクトルインデックスを作成します。

  • autoEmbed タイプとしての descriptionフィールドは、ドキュメントがコレクションに挿入または更新されるたびに、voyage-4-large 埋め込みモデルを使用して descriptionフィールドのベクトル埋め込みを自動的に生成するようにMongoDB ベクトル検索に指示します。

  • フィールドの string 値を使用して、セマンティック検索のデータを事前にフィルタリングするための filter タイプとしての titleフィールド。これにより、ユーザーがすでに読み取った記事を検索結果から除外できます。

{
"fields": [
{
"type": "autoEmbed",
"modalitytype": "text",
"path": "description",
"model": "voyage-4-large"
},
{
"type": "filter",
"path": "title"
}
]
}
2

ユーザーがサイトにアクセスすると、 は現在のセッションの概要を取得し、それをクエリとして使用して記事カタログに対するベクトル検索を実行します。インデックスで自動埋め込みを有効にしたため、 MongoDB ベクトル検索 はクエリ時にサマリーとしての埋め込みを自動的に生成し、それを有効なクエリベクトルとして使用します。

この例では、セッション summary をクエリベクトルとして使用し、titlesフィールドに基づいてユーザーがすでに読み取った記事を除外する単純化されたベクトル検索クエリーを示します。

[{
"$vectorSearch": {
"index": "vector_index", // Vector index with autoEmbed on article descriptions
"path": "description",
"query": {
"text": "<session-summary>" // Session summary from user_intent document
},
"numCandidates": 100,
"filter": {
"title": { "$nin": [<read-titles>] } // Exclude articles the array of titles in the user_intent document
}
}
}]

このアーキテクチャは、最新のデータ製品を構築する際のいくつかの重要な利点を示しています。

  • レイテンシを削減 : LVM 呼び出しをストリーム プロセッサ内に直接埋め込むことで、複数のネットワーク ドロップと中間永続性レイヤーが排除されます。システムは、未加工のクリックをほぼリアルタイムで実行可能な意向に変換します。

  • 開発者エクスペリエンスの向上: JSONベースのMQLでパイプラインを定義し、 MongoDBクエリをすでに知っているチームは、新しい DSL を学習したり、追加のインフラストラクチャをプロビジョニングことなく、高度なストリーミングとAIを使用したワークロードを構築できます。

  • セマンティック パーソナライズ: キーワード マッチングやオーバーライドバッチするジョブを超え、ユーザーの動作をリッスンし、判断して、即座に応答するシステムを構築します。

  • MongoDB 、ソリューション アーキテクチャ、V ind KrishMany

戻る

AI 主導のメディア パーソナライズ

項目一覧