Para agentes de IA: um índice de documentação está disponível em https://www.mongodb.com/pt-br/docs/llms.txt — as versões de markdown de todas as páginas estão disponíveis anexando .md a qualquer caminho de URL.
Menu Docs

Recomendações de mídia em tempo real

Casos de uso: Personalização, IA generativa

Setores: mídia, telecomunicações

Produtos e ferramentas: Atlas MongoDB, Busca Vetorial MongoDB, Processamento de Fluxo do Atlas MongoDB

Parceiros: Voyage AI, Azure OpenAI

Quando um novo usuário acessa seu site de notícias, você tem segundos para entender o que ele está procurando antes que ele perca o interesse e vá embora. Isso introduz o problema do início a frio: como você recomenda conteúdo aos usuários sem nenhum dado anterior sobre eles?

Considere um usuário anônimo que chega ao seu site e clica em três artigos:

  • "VIDIA cria novo chip de IA"

  • "TSMC expande produção no Arizona"

  • "Ações da Intel cai em meio a atrasos"

Uma simples pesquisa por palavra-chave só recomendaria outros artigos sobre a Intel ou o Arizona. No entanto, um sistema inteligente reconhece que todos os três cliques estão relacionados a cadeias de fornecimento de semicondutores. Isso permite recomendar artigos relevantes como "O Futuro do Vale do Silício", mesmo quando eles não compartilham palavras-chave com o histórico de navegação do usuário.

A solução apresentada neste artigo constrói um sistema inteligente de personalização de mídia que ingere e processa dados de fluxo de cliques para gerar um resumo em linguagem natural dos interesses do usuário e recomendar artigos relevantes com os quais o usuário tenha maior probabilidade de interagir.

Esta solução combina:

  • Atlas Stream Processing para ingestão de dados em tempo real, enriquecimento e integração LLM para resumir a intenção do usuário a partir de dados de fluxo de cliques

  • Atlas Pesquisa Vetorial com embeddings automatizados da Voyage IA para Recuperação Semântica e Recomendações

Essa arquitetura mostra como criar um mecanismo de recomendação de mídia orientado por IA que ingere, processa e reage ao comportamento do usuário em tempo real usando o MongoDB Atlas.

Arquitetura de pipeline de personalização de mídia com Atlas Stream Processing e pesquisa vetorial.

Figura 1. Arquitetura de personalização de mídia baseada em IA com Atlas Stream Processing e MongoDB Vector Search

clique para ampliar

Esta arquitetura opera em três fases, que são descritas em detalhes nas próximas seções:

  1. Obter e Enriquecer: capture eventos brutos de fluxo de cliques do aplicativo voltado para o usuário e combine-os com metadados de artigos em tempo real usando o Atlas Stream Processing.

  2. Sessionizar e Resumir: agrupe cliques relacionados em sessões e use um LLM para gerar resumos em linguagem natural dos interesses dos usuários.

  3. Pesquisar e Servir: use resumos gerados para impulsionar a pesquisa vetorial semântica e retornar recomendações personalizadas.

Na primeira fase, essa arquitetura de solução usa o Atlas Stream Processing para ingerir dados brutos de clickstream da sua plataforma de mídia e enriquecê-los com metadados dos artigos no seu banco de dados.

1

Esta solução utiliza as seguintes duas coleções no banco de dados news do mesmo Cluster MongoDB :

A coleção articles contém metadados sobre os artigos em seu catálogo. Neste exemplo, a coleção está no banco de dados news do cluster ClickstreamCluster.

Cada documento na coleção representa um artigo e contém metadados relevantes. Por exemplo:

{
"_id": {
"$oid": "696493bfbc1084032ac0adfe"
},
"title": "Ukraine updates, Day 6: ‘We are sacrificing our lives for freedom,’ Zelenskyy gets standing ovation after speech to European parliament",
"link": "https://nationalpost.com/news/world/ukraine-updates-day-6-russia-kyiv",
"keywords": null,
"creator": [
"National Post Wire Services"
],
"video_url": null,
"description": "Russia escalated shelling overnight of key cities in Ukraine as its troops on the ground move slowly in a large convoy toward the capital, Kyiv",
"content": "8:20 a.m. EST — Ukraine's Zelenskyy tells EU: 'Prove that you are with us\" Read More",
"pubDate": "2022-03-01 13:45:04",
"expire_at": "Wed, 07 Sep 2022 13:45:04 GMT",
"image_url": null,
"source_id": "nationalpost",
"country": [
"canada"
],
"category": [
"top"
],
"language": "english"
}

A coleção user_events contém eventos de cliques brutos ingeridos do seu aplicativo voltado para o usuário. Neste exemplo, a coleção está no banco de dados news do cluster ClickstreamCluster.

Você pode configurar uma fonte de dados de fluxo de cliques usando o sistema de coleção de evento de sua preferência. Para recriar o método mostrado no diagrama de arquitetura de referência, implemente uma API de coleção de eventos em seu aplicativo voltado para o usuário para enviar eventos de clique para um tópico do Apache Kafka. Em seguida, use o MongoDB Kafka Sink Connector para ler a partir do tópico de clickstream e gravar na coleção user_events do cluster MongoDB .

Cada clique em artigo gera um documento de evento com campos como session_id, article_id e timestamp. Por exemplo:

{
"_id": {
"$oid": "696a1ecd66a51be18fffb8fa"
},
"user_id": "user-2",
"session_id": "sess-6a0d8837-a5f9-4ef5-8b00-78e9bf7a825c",
"timestamp": {
"$date": "2026-01-16T16:49:41.208Z"
},
"event_type": "read",
"article_id": {
"$oid": "696493e6bc1084032ac116ed"
},
"device": "desktop",
"metadata": {
"time_on_page": 54,
"referral": "https://guzman.com/main/search/listmain.jsp"
}
}
2
  1. No Atlas, acesse a página Stream Processing do seu projeto.

    1. Se ainda não estiver exibido, selecione a organização que contém seu projeto no menu Organizações na barra de navegação.

    2. Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.

    3. Na barra lateral, clique em Stream Processing sob o título Streaming Data.

      A página Processamento de fluxo é exibida.

  2. Clique em Create a workspace.

  3. Na página Create a stream processing workspace, configure seu espaço de trabalho da seguinte maneira:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Workspace Name: article-personalization

3

O processador de streaming precisa de conexões com os dados do fluxo de cliques e os metadados do artigo para realizar o enriquecimento. As duas fontes de dados devem estar no mesmo cluster Atlas.

  1. No painel da sua área de trabalho do processamento de fluxos, clique em Manage.

  2. Na aba Connection Registry , clique em + Add Connection no canto superior direito.

  3. Na página Edit Connection, configure sua conexão da seguinte forma:

    • Tipo de Conexão: Atlas Database

    • Nome da Conexão: userevents

    • Atlas Cluster: o nome do cluster onde estão os dados do seu fluxo de cliques e do artigo (por exemplo, ClickstreamCluster)

    • Executar como: Read and write to any database

  4. Clique em Save changes para criar a conexão.

4

Crie um processador de fluxo chamado userIntentSummarizer com estágios que leem eventos brutos de clickstream da coleção user_events e enriquecem os eventos com metadados de artigo.

  1. Na página Processamento de fluxos do seu projeto Atlas, clique em Manage no painel do seu espaço de trabalho do processamento de fluxos.

  2. No JSON editor, copie e cole a seguinte definição de JSON na caixa de texto do editor de JSON para definir um processador de fluxo denominado userIntentSummarizer com estes estágios:

    • $source: lê eventos brutos de sequência de cliques da coleção user_events no banco de dados news do seu cluster de sequência de cliques conectado (conexão userevents).

    • $lookup: une os eventos brutos do clickstream com a coleção articles com base no campo article_id, trazendo metadados relevantes dos artigos dos campos description, keywords e title.

    • $addFields: projeta os campos description, keywords e title do campo article_details no nível superior do fluxo de evento para torná-los facilmente acessíveis para estágios downstream.

    • $project: projeta campos relevantes para processamento subsequente.

    {
    "$source": {
    "connectionName": "userevents",
    "db": "news",
    "collection": "user_events"
    }
    },
    {
    "$lookup": {
    "from": {
    "connectionName": "userevents",
    "db": "news",
    "coll": "articles"
    },
    "localField": "article_id",
    "foreignField": "_id",
    "as": "article_details",
    "pipeline": [
    {
    "$project": {
    "_id": 0,
    "description": 1,
    "keywords": 1,
    "title": 1
    }
    }
    ]
    }
    },
    {
    "$addFields": {
    "description": { "$arrayElemAt": [ "$article_details.description", 0 ] },
    "keywords": { "$arrayElemAt": [ "$article_details.keywords", 0 ] },
    "title": { "$arrayElemAt": [ "$article_details.title", 0 ] }
    }
    },
    {
    "$project": {
    "userId": 1,
    "article_id": 1,
    "eventTime": 1,
    "event_type": 1,
    "device": 1,
    "session_id": 1,
    "description": 1,
    "keywords": 1,
    "title": 1
    }
    },
  3. Clique em Update stream processor para salvar suas alterações.

Nesta fase, estendemos o pipeline do processador de fluxo para agrupar cliques relacionados em sessões e usamos um LLM para gerar um resumo em linguagem natural de cada sessão que descreve os interesses do usuário.

1

Adicione uma conexão HTTPS externa ao seu provedor de LLM (por exemplo, Azure OpenAI) para permitir que o processador de fluxo chame o LLM diretamente do pipeline para enriquecer seus dados:

  1. No painel da sua área de trabalho do processamento de fluxos, clique em Manage.

  2. Na aba Connection Registry , clique em + Add Connection no canto superior direito.

  3. Configure a conexão da seguinte forma:

    • Tipo de Conexão: HTTPS

    • Nome da Conexão: azureopenai

    • URL: a URL do ponto de extremidade de sua instância do Azure OpenAI

    • Cabeçalhos: adicione estes pares chave-valor aos cabeçalhos:

      • Chave: Content-Type, Valor: application/json

      • Chave: api-key, Valor: Sua chave de API do Azure OpenAI

2

Adicione um estágio$sessionWindow ao pipeline do processador de fluxo para agrupar eventos relacionados em sessões com base em um intervalo de sessão especificado. Esta solução define uma sessão como uma sequência de eventos do mesmo session_id, sem intervalo de inatividade maior que 60 segundos.

Adicione este estágio ao pipeline userIntentSummarizer após as etapas de enriquecimento:

{
"$sessionWindow": {
"partitionBy": "$session_id",
"gap": { "unit": "second", "size": 60 },
"pipeline": [{
"$group": {
"_id": "$session_id", "titles": {
"$push": "$title"
}
}
}]
}
}
3

Adicione um estágio$https para chamar seu provedor LLM diretamente do pipeline de processamento de fluxo. Esta solução chama Azure OpenAI para gerar um resumo em linguagem natural de cada sessão que descreve os interesse do usuário com base nos títulos do artigo na sessão.

Adicione este estágio ao pipeline após o estágio $sessionWindow:

{
"$https": {
"connectionName": "azureopenai",
"method": "POST",
"as": "apiResults",
"config": { "parseJsonStrings": true },
"payload": [
{
"$project": {
"_id": 0,
"model": "gpt-4o-mini",
"response_format": { "type": "json_object" },
"messages": [
{
"role": "system",
"content": "You are an analytical assistant that specializes in behavioral summarization. You analyze short-term reading activity and infer user interests without making personal or sensitive assumptions the create a special field called summary. Summary must be a special field in the response. Respond only in JSON format.Return a JSON object with the following keys in this order: \n reasoning: (Internal scratchpad, briefly explain your analysis) \n user_interests: (The list of inferred interests) \n summary: (A concise summary based on the interests above)"
},
{
"role": "user",
"content": { "$toString": "$titles" }
}
]
}
}
]
}
}

Observação

O próprio processador de fluxo é "inteligente". Ele transforma uma lista de títulos em um resumo semântico (por exemplo, "O usuário está pesquisando a logística de fabricação de semicondutores") antes que os dados cheguem ao disco. Isso difere fundamentalmente dos pipelines tradicionais de processamento em lote, que normalmente escrevem dados brutos de sessão em um banco de dados e depois chamam uma API externa a partir de um servidor de aplicação.

4

Adicione os seguintes estágios ao pipeline para extrair o resumo da saída de LLM e gravá-lo em uma nova coleção:

  • $match: filtra as sessões em que a chamada do LLM falhou e retornou um erro para evitar a gravação de dados incompletos no banco de dados.

  • $addFields: extrai o campo summary da saída do LLM e o adiciona ao nível superior do documento.

  • $project: remove a saída bruta do LLM do documento para reduzir o ruído e os custos de armazenamento.

  • $merge: grava os documentos resultantes em uma nova coleção chamada user_intent no banco de dados news do seu cluster de clickstream (conexão userevents). Cada documento nesta coleção representa uma sessão de usuário e contém um resumo dos interesses do usuário.

{
"$match": {
"titles": {
"$exists": true
},
"apiResults": {
"$exists": true
}
}
},
{
"$addFields": {
"summary": {
"$arrayElemAt": [
"$apiResults.choices.message.content.summary",
0
]
}
}
},
{
"$project": {
"apiResults": 0
}
},
{
"$merge": {
"into": {
"coll": "user_intent",
"connectionName": "userevents",
"db": "news"
}
}
}
5

Quando estiver pronto para começar a resumir seus dados de fluxo de cliques, clique no ícone Start do processador userIntentSummarizer na lista de processadores de fluxo do seu espaço de trabalho de processamento de fluxo.

Esse pipeline deve gravar documentos na coleção user_intent que contenham resumos de sessões que capturem os interesse do usuário. Por exemplo:

{
"_id": "sess-6a0d8837-a5f9-4ef5-8b00-78e9bf7a825c",
"summary": "The user seems interested in geopolitical developments, especially in the Middle East, US political strategies involving Trump, and legal aspects of government operations.",
"titles": [
"Israel and Hamas agree to part of Trump's Gaza peace plan, will free hostages and prisoners",
"Top officials from US and Qatar join talks aimed at brokering peace in Gaza",
"How Trump secured a Gaza breakthrough",
"Ontario's anti-tariff ad is clever, effective and legally sound, experts say",
"Shutdown? Trump's been dismantling the government all year",
"AP News Summary at 7:58 p.m. EDT"
]
}

a definição JSON completa para o processador de stream ``UserIntentSummarizer`` que executa todas as operações descritas nas Fases 1 e 2.

A seguir está a definição JSON completa para o processador de fluxo userIntentSummarizer que executa todas as operações descritas nas Fases 1 e 2, incluindo a ingestão de dados de fluxo de cliques, enriquecimento-os com metadados de artigo, sessão do comportamento do usuário, chamada de um LLM para resumir intenção do usuário e escrever os resumos em uma nova coleção.

{
"name": "userIntentSummarizer",
"pipeline": [
{
"$source": {
"connectionName": "userevents",
"db": "news",
"collection": "user_events"
}
},
{
"$lookup": {
"from": {
"connectionName": "userevents",
"db": "news",
"coll": "articles"
},
"localField": "article_id",
"foreignField": "_id",
"as": "article_details",
"pipeline": [
{
"$project": {
"_id": 0,
"description": 1,
"keywords": 1,
"title": 1
}
}
]
}
},
{
"$addFields": {
"description": { "$arrayElemAt": [ "$article_details.description", 0 ] },
"keywords": { "$arrayElemAt": [ "$article_details.keywords", 0 ] },
"title": { "$arrayElemAt": [ "$article_details.title", 0 ] }
}
},
{
"$project": {
"userId": 1,
"article_id": 1,
"eventTime": 1,
"event_type": 1,
"device": 1,
"session_id": 1,
"description": 1,
"keywords": 1,
"title": 1
}
},
{
"$sessionWindow": {
"partitionBy": "$session_id",
"gap": { "unit": "second", "size": 60 },
"pipeline": [{
"$group": {
"_id": "$session_id", "titles": {
"$push": "$title"
}
}
}]
}
},
{
"$https": {
"connectionName": "azureopenai",
"method": "POST",
"as": "apiResults",
"config": { "parseJsonStrings": true },
"payload": [
{
"$project": {
"_id": 0,
"model": "gpt-4o-mini",
"response_format": { "type": "json_object" },
"messages": [
{
"role": "system",
"content": "You are an analytical assistant that specializes in behavioral summarization. You analyze short-term reading activity and infer user interests without making personal or sensitive assumptions then create a special field called summary. Summary must be a special field in the response. Respond only in JSON format.Return a JSON object with the following keys in this order: \n reasoning: (Internal scratchpad, briefly explain your analysis) \n user_interests: (The list of inferred interests) \n summary: (A concise summary based on the interests above)"
},
{
"role": "user",
"content": { "$toString": "$titles" }
}
]
}
}
]
}
},
{
"$match": {
"titles": {
"$exists": true
},
"apiResults": {
"$exists": true
}
}
},
{
"$addFields": {
"summary": {
"$arrayElemAt": [
"$apiResults.choices.message.content.summary",
0
]
}
}
},
{
"$project": {
"apiResults": 0
}
},
{
"$merge": {
"into": {
"coll": "user_intent",
"connectionName": "userevents",
"db": "news"
}
}
}
]
}

Por fim, usamos o MongoDB Vector Search para realizar pesquisa semântica no catálogo de artigos, usando os resumos das sessões gerados na fase anterior para embasar recomendações de conteúdo personalizadas.

1

Antes de realizar a pesquisa semântica, você precisa gerar embeddings vetoriais para os dados do seu artigo. Para fazer isso, crie um índice do MongoDB Vector Search chamado vector_index, que indexa o campo description da sua coleção articles como o tipo autoEmbed. Isso instrui o MongoDB Vector Search a usar Embedding Automatizado para gerar automaticamente embeddings vetoriais para o campo description sempre que documentos forem inseridos ou atualizados na coleção.

Importante

A incorporação automatizada está disponível como um recurso de visualização somente para o MongoDB Community Edition v8.2 e versões mais recentes. O recurso e a documentação correspondente podem mudar a qualquer momento durante o período de Pré-visualização. Para aprender mais, consulte Visualizar recursos.

O Atlas oferece suporte à incorporação manual em todas as edições do MongoDB.

Utilize esta definição JSON para criar um índice de vetor nestes campos:

  • O campo description como o tipo autoEmbed para instruir o MongoDB Vector Search a gerar automaticamente embeddings vetoriais para o campo description usando o modelo de embedding voyage-4-large sempre que documentos são inseridos ou atualizados na coleção.

  • O campo title como o tipo filter para pré-filtrar os dados da pesquisa semântica usando o valor da string no campo. Isso permite que você exclua artigos que o usuário já leu dos resultados da pesquisa.

{
"fields": [
{
"type": "autoEmbed",
"modalitytype": "text",
"path": "description",
"model": "voyage-4-large"
},
{
"type": "filter",
"path": "title"
}
]
}
2

Quando um usuário visitar seu site, faça o resumo da sessão atual e use-o como consulta para uma pesquisa vetorial em seu catálogo de artigos. Como você habilitou o embedding automatizado no índice, o MongoDB Vector Search gera automaticamente o embedding para o resumo no momento da query e o utiliza como o vetor de consulta eficaz.

Esse exemplo mostra uma consulta de pesquisa vetorial simplificada que usa a sessão summary como vetor de consulta e exclui artigos que o usuário já leu com base no campo titles:

[{
"$vectorSearch": {
"index": "vector_index", // Vector index with autoEmbed on article descriptions
"path": "description",
"query": {
"text": "<session-summary>" // Session summary from user_intent document
},
"numCandidates": 100,
"filter": {
"title": { "$nin": [<read-titles>] } // Exclude articles the array of titles in the user_intent document
}
}
}]

Essa arquitetura demonstra vários avanços importantes na criação de produtos de dados modernos:

  • Latência reduzida: o embedding de chamadas LLM diretamente no processador de stream elimina vários saltos de rede e camadas de persistência intermediárias. O sistema transforma cliques brutos em intenção acionável quase em tempo real.

  • Experiência aprimorada do desenvolvedor: defina pipelines com MQL baseado em JSON, permitindo que as equipes que já conhecem as consultas do MongoDB criem cargas de trabalho avançadas de streaming e impulsionadas por IA sem aprender novas DSLs ou provisionar infraestrutura adicional.

  • Personalização semântica: vá além da correspondência de palavras-chave e dos trabalhos em lote durante a noite para criar sistemas que ouçam, pensem e respondam instantaneamente ao comportamento do usuário.

  • Vinod Krishnan, arquiteto de soluções, MongoDB