Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Menu Docs
Página inicial do Docs
/

Recomendações de mídia em tempo real com Atlas Stream Processing e Vector Search do MongoDB

Casos de uso: Personalização, IA generativa

Setores: mídia, telecomunicações

Produtos e ferramentas: MongoDB Atlas, MongoDB Vector Search, MongoDB Atlas Stream Processing

Parceiros: Voyage AI, Azure OpenAI

Quando um novo usuário chega em seu site de notícias, você tem segundos para entender o que ele está procurando antes que ele possa perder o interesse e sair. Isso introduz o problema da inicialização a frio: como você recomenda conteúdo aos usuários sem dados anteriores sobre eles?

Considere um usuário anônimo que acessa seu site e clica em três artigos:

  • "VIDIA cria novo processador de IA"

  • "TSMC expande produção no Arizona"

  • "Ações da Intel cai em meio a atrasos"

Uma simples pesquisa por palavra-chave só recomenda outros artigos sobre Intel ou Arizona. No entanto, um sistema inteligente reconhece que todos os três cliques estão relacionados a cadeias de suprimentos de semicondutores. Isso permite que ele recomande artigos relevantes como "O futuro do vale do silício", mesmo quando eles não compartilham palavras-chave com o histórico de navegação do usuário.

A solução deste artigo cria um sistema inteligente de personalização de mídia que ingere e processa dados de cliques para gerar um resumo em linguagem natural dos interesses do usuário e recomenda artigos relevantes com os quais o usuário tem maior probabilidade de se envolver.

Esta solução combina:

  • Atlas Stream Processing para ingestão de dados em tempo real, enriquecimento e integração LLM para resumir a intenção do usuário a partir de dados de fluxo de cliques

  • Atlas Vector Search com incorporações automatizadas de IA Voyage para recuperação semântica e recomendações

Essa arquitetura mostra como criar um mecanismo de recomendações de mídia orientado por IA que ingere, processa e reage ao comportamento do usuário em tempo real usando o MongoDB Atlas.

Uma imagem mostrando a arquitetura de referência de um pipeline de personalização de mídia com Atlas Stream Processing e MongoDB Vector Search
clique para ampliar

figura 1. Arquitetura de personalização de mídia orientada por IA com Atlas Stream Processing e MongoDB Vector Search

Essa arquitetura opera em três fases, que são descritas em detalhes nas próximas seções:

  1. Ingestão e enriquecimento: capture eventos de fluxo de cliques brutos do aplicação voltado para o usuário e una-os com metadados de artigo em tempo real usando o Atlas Stream Processing.

  2. Sessão e resumo: Agrupe cliques relacionados em sessões e use um LLM para gerar resumos em linguagem natural dos interesses do usuário.

  3. Pesquisar e servir: Use resumos gerados para direcionar a pesquisa de vetor semântica e retornar recomendações personalizadas.

Na primeira fase, essa arquitetura de solução usa o Atlas Stream Processing para consumir dados brutos de cliques de sua plataforma de mídia e enriquecê-los com metadados dos artigos em seu banco de dados.

1

Esta solução utiliza as seguintes duas collections no banco de dados news do mesmo cluster MongoDB :

2
  1. No Atlas, Go para a página Stream Processing do seu projeto.

    1. Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.

    2. Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.

    3. Na barra lateral, clique em Stream Processing sob o título Streaming Data.

      A página Processamento de fluxo é exibida.

  2. Clique em Create a workspace.

  3. Na página Create a stream processing workspace, configure seu espaço de trabalho da seguinte maneira:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Workspace Name: article-personalization

3

O processador de stream precisa de conexões com os dados de stream de cliques e com os metadados de artigo para realizar o enriquecimento. Ambas as fontes de dados devem estar no mesmo Atlas cluster.

  1. No painel da sua área de trabalho do processamento de fluxos, clique em Manage.

  2. Na aba Connection Registry , clique em + Add Connection no canto superior direito.

  3. Na página Edit Connection, configure sua conexão da seguinte maneira:

    • Tipo de conexão: Atlas Database

    • Nome da conexão: userevents

    • Atlas Cluster: o nome do cluster onde seu fluxo de cliques e dados de artigo residem (porClickstreamCluster exemplo,)

    • Executar como: Read and write to any database

  4. Clique em Save changes para criar a conexão.

4

Crie um processador de fluxo denominado userIntentSummarizer com estágios que leem eventos de cliques brutos da coleção user_events e enriqueçam os eventos com metadados de artigo.

  1. Na página Processamento de Stream do seu projeto Atlas , clique Manage em no painel do seu espaço de trabalho de processamento de stream.

  2. JSON editorNo, copie e cole a seguinte definição deJSON do na caixa de texto do editor de JSON para definir um processador de fluxo denominado userIntentSummarizer com estes estágios:

    • $source: lê eventos de fluxo de cliques brutos da user_events coleção no news banco de dados do cluster de fluxo de cliques conectadouserevents (conexão).

    • $lookup: Une os eventos de fluxo de cliques brutos com a articles coleção com base no article_id campo , levando metadados de artigos relevantes dos campos,description keywords title e.

    • $addFields: projeta os description campos, keywords e title do campo article_details no nível superior do fluxo de evento para torná-los facilmente acessíveis para os estágios downstream.

    • $project: Projeta campos relevantes para processamento downstream.

    {
    "$source": {
    "connectionName": "userevents",
    "db": "news",
    "collection": "user_events"
    }
    },
    {
    "$lookup": {
    "from": {
    "connectionName": "userevents",
    "db": "news",
    "coll": "articles"
    },
    "localField": "article_id",
    "foreignField": "_id",
    "as": "article_details",
    "pipeline": [
    {
    "$project": {
    "_id": 0,
    "description": 1,
    "keywords": 1,
    "title": 1
    }
    }
    ]
    }
    },
    {
    "$addFields": {
    "description": { "$arrayElemAt": [ "$article_details.description", 0 ] },
    "keywords": { "$arrayElemAt": [ "$article_details.keywords", 0 ] },
    "title": { "$arrayElemAt": [ "$article_details.title", 0 ] }
    }
    },
    {
    "$project": {
    "userId": 1,
    "article_id": 1,
    "eventTime": 1,
    "event_type": 1,
    "device": 1,
    "session_id": 1,
    "description": 1,
    "keywords": 1,
    "title": 1
    }
    },
  3. Clique em Update stream processor para salvar suas alterações.

Nesta fase, estendemos o pipeline do processador de fluxo para agrupar cliques relacionados em sessões e usamos um LLM para gerar um resumo em linguagem natural de cada sessão que descreve os interesses do usuário.

1

Adicione uma conexão HTTPS externa ao seu provedor de LLM (por exemplo, Azure OpenAI) para permitir que o processador de fluxo chame o LLM diretamente do pipeline para enriquecer seus dados:

  1. No painel da sua área de trabalho do processamento de fluxos, clique em Manage.

  2. Na aba Connection Registry , clique em + Add Connection no canto superior direito.

  3. Configure a conexão da seguinte maneira:

    • Tipo de conexão: HTTPS

    • Nome da conexão: azureopenai

    • URL: a URL do ponto de conexão para sua instância do Azure OpenAI

    • Cabeçalhos: adicione estes pares de valores-chave aos cabeçalhos:

      • Chave: Content-Type, Valor: application/json

      • Chave: api-key, Valor: Sua chave de API do Azure OpenAI

2

Adicione um estágio ao pipeline do processador de fluxo para agrupar eventos relacionados em sessões com base em uma lacuna de sessão especificada. Esta solução define uma sessão como uma sequência de eventos do $sessionWindow mesmo session_id sem intervalo de inatividade superior a 60 segundos.

Adicione este estágio ao seu pipeline userIntentSummarizer após os estágios de enriquecimento:

{
"$sessionWindow": {
"partitionBy": "$session_id",
"gap": { "unit": "second", "size": 60 },
"pipeline": [{
"$group": {
"_id": "$session_id", "titles": {
"$push": "$title"
}
}
}]
}
}
3

Adicione um estágio para chamar seu provedor de LLM diretamente de seu pipeline de processamento de fluxo. Essa solução chama o Azure OpenAI para gerar um resumo em linguagem natural de cada sessão que descreve os interesses do usuário com base nos títulos dos artigos da $https sessão.

Adicione este estágio ao seu pipeline após o estágio $sessionWindow:

{
"$https": {
"connectionName": "azureopenai",
"method": "POST",
"as": "apiResults",
"config": { "parseJsonStrings": true },
"payload": [
{
"$project": {
"_id": 0,
"model": "gpt-4o-mini",
"response_format": { "type": "json_object" },
"messages": [
{
"role": "system",
"content": "You are an analytical assistant that specializes in behavioral summarization. You analyze short-term reading activity and infer user interests without making personal or sensitive assumptions the create a special field called summary. Summary must be a special field in the response. Respond only in JSON format.Return a JSON object with the following keys in this order: \n reasoning: (Internal scratchpad, briefly explain your analysis) \n user_interests: (The list of inferred interests) \n summary: (A concise summary based on the interests above)"
},
{
"role": "user",
"content": { "$toString": "$titles" }
}
]
}
}
]
}
}

Observação

O processador de fluxo em si é "inteligente". Ele transforma uma lista de títulos em um resumo semântica (por exemplo, "O usuário está pesquisando lógica de produção de semicondutores") antes que os dados cheguem ao disco. Isso difere fundamentalmente dos pipelines tradicionais de processamento em lote , que normalmente gravam dados brutos de sessão em um banco de dados e, em seguida, chamam uma API externa de um servidor de aplicação .

4

Adicione os seguintes estágios ao pipeline para extrair o resumo da saída do LLM e escrevê-lo em uma nova collection:

  • $match: Filtra todas as sessões em que a chamada LLM falhou e retornou um erro para evitar a gravação de dados incompletos no banco de dados.

  • $addFields: Extrai o summary campo da saída do LLM e o adiciona ao nível superior do documento.

  • $project: remove a saída LLM bruta do documento para reduzir o ruído e os custos de armazenamento.

  • $merge: Grava os documentos resultantes em uma nova coleção chamada user_intent no news banco de dados do seu cluster de cliquesuserevents (conexão). Cada documento nesta coleção representa uma sessão de usuário e contém um resumo dos interesses do usuário.

{
"$match": {
"titles": {
"$exists": true
},
"apiResults": {
"$exists": true
}
}
},
{
"$addFields": {
"summary": {
"$arrayElemAt": [
"$apiResults.choices.message.content.summary",
0
]
}
}
},
{
"$project": {
"apiResults": 0
}
},
{
"$merge": {
"into": {
"coll": "user_intent",
"connectionName": "userevents",
"db": "news"
}
}
}
5

Quando estiver pronto para começar a resumir seus dados de fluxo de cliques, clique no ícone Start do processador userIntentSummarizer na lista de processadores de fluxo para seu espaço de trabalho de processamento de fluxo.

Esse pipeline deve escrever documentos na coleção user_intent que contenham resumos de sessão que capturem os interesses do usuário. Por exemplo:

{
"_id": "sess-6a0d8837-a5f9-4ef5-8b00-78e9bf7a825c",
"summary": "The user seems interested in geopolitical developments, especially in the Middle East, US political strategies involving Trump, and legal aspects of government operations.",
"titles": [
"Israel and Hamas agree to part of Trump's Gaza peace plan, will free hostages and prisoners",
"Top officials from US and Qatar join talks aimed at brokering peace in Gaza",
"How Trump secured a Gaza breakthrough",
"Ontario's anti-tariff ad is clever, effective and legally sound, experts say",
"Shutdown? Trump's been dismantling the government all year",
"AP News Summary at 7:58 p.m. EDT"
]
}

Por fim, usamos o MongoDB Vector Search para realizar a pesquisa semântica no catálogo de artigos usando os resumos de sessão gerados na fase anterior para gerar recomendações de conteúdo personalizadas.

1

Antes de realizar a pesquisa semântica, você precisa gerar incorporações vetoriais para os dados do artigo. Para fazer isso, crie um índice do MongoDB Vector Search chamado vector_index que indexe o description campo da sua articles coleção como o autoEmbed tipo . Isso instrui o MongoDB Vector Search a usar a incorporação automatizada para gerar automaticamente incorporações vetoriais para o description campo sempre que os documentos forem inseridos ou atualizados na coleção.

Importante

A incorporação automatizada está disponível como um recurso de visualização somente para o MongoDB Community Edition v8.2 e versões mais recentes. O recurso e a documentação correspondente podem mudar a qualquer momento durante o período de Pré-visualização. Para aprender mais, consulte Visualizar recursos.

O Atlas suporta incorporação manual em todas as edições do MongoDB.

Utilize esta definição JSON para criar um índice de vetor nestes campos:

  • O campo description como o tipo autoEmbed para instruir o MongoDB Vector Search a gerar automaticamente incorporações vetoriais para o campo description usando o modelo de incorporação voyage-4-large sempre que documentos forem inseridos ou atualizados na coleção.

  • O campo title como o tipo filter para pré-filtrar os dados para a pesquisa semântica usando o valor da string no campo. Isso permite excluir artigos que o usuário já leu dos resultados da pesquisa.

{
"fields": [
{
"type": "autoEmbed",
"modalitytype": "text",
"path": "description",
"model": "voyage-4-large"
},
{
"type": "filter",
"path": "title"
}
]
}
2

Quando um usuário visitar seu site, pegue o resumo da sessão atual e use-o como query para uma pesquisa vetorial em seu catálogo de artigos. Desde que você ativou a incorporação automatizada no índice, a Vector Search do MongoDB gera automaticamente a incorporação para o resumo no momento da query e a usa como o vetor de query efetivo.

Este exemplo mostra uma query de pesquisa vetorial simplificada que usa a sessão summary como o vetor de query e exclui artigos que o usuário já leu com base no campo titles :

[{
"$vectorSearch": {
"index": "vector_index", // Vector index with autoEmbed on article descriptions
"path": "description",
"query": {
"text": "<session-summary>" // Session summary from user_intent document
},
"numCandidates": 100,
"filter": {
"title": { "$nin": [<read-titles>] } // Exclude articles the array of titles in the user_intent document
}
}
}]

Essa arquitetura demonstra vários progressos importantes na criação de produtos de dados modernos:

  • Latência reduzida: a incorporação de chamadas LLM diretamente no processador de fluxo elimina vários saltos de rede e camadas de persistência intermediária. O sistema transforma cliques brutos em intenção aplicável quase em tempo real.

  • Experiência aprimorada do desenvolvedor: defina pipelines com MQL baseado em JSON , permitindo que equipes que já conhecem queries do MongoDB criem streaming avançado e cargas de trabalho alimentadas por IA sem aprender novos DSLs ou provisionar infraestrutura adicional.

  • Personalização semântica: vá além da correspondência de palavras-chave e dos trabalhos em lote da noite para o dia para criar sistemas que escutam, refletem e respondem instantaneamente ao comportamento do usuário.

  • Vinod Krsnaan, arquitetura de soluções, MongoDB

Voltar

Personalização de mídia impulsionada por AI

Nesta página