Casos de uso: Personalização, IA generativa
Setores: mídia, telecomunicações
Produtos e ferramentas: Atlas MongoDB, Busca Vetorial MongoDB, Processamento de Fluxo do Atlas MongoDB
Parceiros: Voyage AI, Azure OpenAI
Visão Geral da Solução
Quando um novo usuário acessa seu site de notícias, você tem segundos para entender o que ele está procurando antes que ele perca o interesse e vá embora. Isso introduz o problema do início a frio: como você recomenda conteúdo aos usuários sem nenhum dado anterior sobre eles?
Considere um usuário anônimo que chega ao seu site e clica em três artigos:
"VIDIA cria novo chip de IA"
"TSMC expande produção no Arizona"
"Ações da Intel cai em meio a atrasos"
Uma simples pesquisa por palavra-chave só recomendaria outros artigos sobre a Intel ou o Arizona. No entanto, um sistema inteligente reconhece que todos os três cliques estão relacionados a cadeias de fornecimento de semicondutores. Isso permite recomendar artigos relevantes como "O Futuro do Vale do Silício", mesmo quando eles não compartilham palavras-chave com o histórico de navegação do usuário.
A solução apresentada neste artigo constrói um sistema inteligente de personalização de mídia que ingere e processa dados de fluxo de cliques para gerar um resumo em linguagem natural dos interesses do usuário e recomendar artigos relevantes com os quais o usuário tenha maior probabilidade de interagir.
Esta solução combina:
Atlas Stream Processing para ingestão de dados em tempo real, enriquecimento e integração LLM para resumir a intenção do usuário a partir de dados de fluxo de cliques
Atlas Pesquisa Vetorial com embeddings automatizados da Voyage IA para Recuperação Semântica e Recomendações
Arquitetura de referência
Essa arquitetura mostra como criar um mecanismo de recomendação de mídia orientado por IA que ingere, processa e reage ao comportamento do usuário em tempo real usando o MongoDB Atlas.

Figura 1. Arquitetura de personalização de mídia baseada em IA com Atlas Stream Processing e MongoDB Vector Search
Esta arquitetura opera em três fases, que são descritas em detalhes nas próximas seções:
Obter e Enriquecer: capture eventos brutos de fluxo de cliques do aplicativo voltado para o usuário e combine-os com metadados de artigos em tempo real usando o Atlas Stream Processing.
Sessionizar e Resumir: agrupe cliques relacionados em sessões e use um LLM para gerar resumos em linguagem natural dos interesses dos usuários.
Pesquisar e Servir: use resumos gerados para impulsionar a pesquisa vetorial semântica e retornar recomendações personalizadas.
Fase 1: Ingerir e Enriquecer Dados de Clickstream
Na primeira fase, essa arquitetura de solução usa o Atlas Stream Processing para ingerir dados brutos de clickstream da sua plataforma de mídia e enriquecê-los com metadados dos artigos no seu banco de dados.
Configure suas fontes de dados.
Esta solução utiliza as seguintes duas coleções no banco de dados news do mesmo Cluster MongoDB :
A coleção articles contém metadados sobre os artigos em seu catálogo. Neste exemplo, a coleção está no banco de dados news do cluster ClickstreamCluster.
Cada documento na coleção representa um artigo e contém metadados relevantes. Por exemplo:
{ "_id": { "$oid": "696493bfbc1084032ac0adfe" }, "title": "Ukraine updates, Day 6: ‘We are sacrificing our lives for freedom,’ Zelenskyy gets standing ovation after speech to European parliament", "link": "https://nationalpost.com/news/world/ukraine-updates-day-6-russia-kyiv", "keywords": null, "creator": [ "National Post Wire Services" ], "video_url": null, "description": "Russia escalated shelling overnight of key cities in Ukraine as its troops on the ground move slowly in a large convoy toward the capital, Kyiv", "content": "8:20 a.m. EST — Ukraine's Zelenskyy tells EU: 'Prove that you are with us\" Read More", "pubDate": "2022-03-01 13:45:04", "expire_at": "Wed, 07 Sep 2022 13:45:04 GMT", "image_url": null, "source_id": "nationalpost", "country": [ "canada" ], "category": [ "top" ], "language": "english" }
A coleção user_events contém eventos de cliques brutos ingeridos do seu aplicativo voltado para o usuário. Neste exemplo, a coleção está no banco de dados news do cluster ClickstreamCluster.
Você pode configurar uma fonte de dados de fluxo de cliques usando o sistema de coleção de evento de sua preferência. Para recriar o método mostrado no diagrama de arquitetura de referência, implemente uma API de coleção de eventos em seu aplicativo voltado para o usuário para enviar eventos de clique para um tópico do Apache Kafka. Em seguida, use o MongoDB Kafka Sink Connector para ler a partir do tópico de clickstream e gravar na coleção user_events do cluster MongoDB .
Cada clique em artigo gera um documento de evento com campos como session_id, article_id e timestamp. Por exemplo:
{ "_id": { "$oid": "696a1ecd66a51be18fffb8fa" }, "user_id": "user-2", "session_id": "sess-6a0d8837-a5f9-4ef5-8b00-78e9bf7a825c", "timestamp": { "$date": "2026-01-16T16:49:41.208Z" }, "event_type": "read", "article_id": { "$oid": "696493e6bc1084032ac116ed" }, "device": "desktop", "metadata": { "time_on_page": 54, "referral": "https://guzman.com/main/search/listmain.jsp" } }
Crie um espaço de trabalho do Stream Processing.
No Atlas, acesse a página Stream Processing do seu projeto.
Se ainda não estiver exibido, selecione a organização que contém seu projeto no menu Organizações na barra de navegação.
Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.
Na barra lateral, clique em Stream Processing sob o título Streaming Data.
A página Processamento de fluxo é exibida.
Clique em Create a workspace.
Na página Create a stream processing workspace, configure seu espaço de trabalho da seguinte maneira:
Tier:
SP30Provider:
AWSRegion:
us-east-1Workspace Name:
article-personalization
Adicione uma conexão para os dados de fluxo de cliques e artigos.
O processador de streaming precisa de conexões com os dados do fluxo de cliques e os metadados do artigo para realizar o enriquecimento. As duas fontes de dados devem estar no mesmo cluster Atlas.
No painel da sua área de trabalho do processamento de fluxos, clique em Manage.
Na aba Connection Registry , clique em + Add Connection no canto superior direito.
Na página Edit Connection, configure sua conexão da seguinte forma:
Tipo de Conexão:
Atlas DatabaseNome da Conexão:
usereventsAtlas Cluster: o nome do cluster onde estão os dados do seu fluxo de cliques e do artigo (por exemplo,
ClickstreamCluster)Executar como:
Read and write to any database
Clique em Save changes para criar a conexão.
Crie um processador de fluxo persistente.
Crie um processador de fluxo chamado userIntentSummarizer com estágios que leem eventos brutos de clickstream da coleção user_events e enriquecem os eventos com metadados de artigo.
Na página Processamento de fluxos do seu projeto Atlas, clique em Manage no painel do seu espaço de trabalho do processamento de fluxos.
No JSON editor, copie e cole a seguinte definição de JSON na caixa de texto do editor de JSON para definir um processador de fluxo denominado
userIntentSummarizercom estes estágios:$source: lê eventos brutos de sequência de cliques da coleçãouser_eventsno banco de dadosnewsdo seu cluster de sequência de cliques conectado (conexãouserevents).$lookup: une os eventos brutos do clickstream com a coleçãoarticlescom base no campoarticle_id, trazendo metadados relevantes dos artigos dos camposdescription,keywordsetitle.$addFields: projeta os camposdescription,keywordsetitledo campoarticle_detailsno nível superior do fluxo de evento para torná-los facilmente acessíveis para estágios downstream.$project: projeta campos relevantes para processamento subsequente.
{ "$source": { "connectionName": "userevents", "db": "news", "collection": "user_events" } }, { "$lookup": { "from": { "connectionName": "userevents", "db": "news", "coll": "articles" }, "localField": "article_id", "foreignField": "_id", "as": "article_details", "pipeline": [ { "$project": { "_id": 0, "description": 1, "keywords": 1, "title": 1 } } ] } }, { "$addFields": { "description": { "$arrayElemAt": [ "$article_details.description", 0 ] }, "keywords": { "$arrayElemAt": [ "$article_details.keywords", 0 ] }, "title": { "$arrayElemAt": [ "$article_details.title", 0 ] } } }, { "$project": { "userId": 1, "article_id": 1, "eventTime": 1, "event_type": 1, "device": 1, "session_id": 1, "description": 1, "keywords": 1, "title": 1 } }, Clique em Update stream processor para salvar suas alterações.
Fase 2: Sessionizar e Resumir o Comportamento do Usuário
Nesta fase, estendemos o pipeline do processador de fluxo para agrupar cliques relacionados em sessões e usamos um LLM para gerar um resumo em linguagem natural de cada sessão que descreve os interesses do usuário.
Conecte seu provedor de LLM ao seu espaço de trabalho do processamento de fluxos.
Adicione uma conexão HTTPS externa ao seu provedor de LLM (por exemplo, Azure OpenAI) para permitir que o processador de fluxo chame o LLM diretamente do pipeline para enriquecer seus dados:
No painel da sua área de trabalho do processamento de fluxos, clique em Manage.
Na aba Connection Registry , clique em + Add Connection no canto superior direito.
Configure a conexão da seguinte forma:
Tipo de Conexão:
HTTPSNome da Conexão:
azureopenaiURL: a URL do ponto de extremidade de sua instância do Azure OpenAI
Cabeçalhos: adicione estes pares chave-valor aos cabeçalhos:
Chave:
Content-Type, Valor:application/jsonChave:
api-key, Valor: Sua chave de API do Azure OpenAI
Sessionize os dados do fluxo de cliques usando o $sessionWindow estágio .
Adicione um estágio$sessionWindow ao pipeline do processador de fluxo para agrupar eventos relacionados em sessões com base em um intervalo de sessão especificado. Esta solução define uma sessão como uma sequência de eventos do mesmo session_id, sem intervalo de inatividade maior que 60 segundos.
Adicione este estágio ao pipeline userIntentSummarizer após as etapas de enriquecimento:
{ "$sessionWindow": { "partitionBy": "$session_id", "gap": { "unit": "second", "size": 60 }, "pipeline": [{ "$group": { "_id": "$session_id", "titles": { "$push": "$title" } } }] } }
Resuma as sessões de usuários usando o $https estágio.
Adicione um estágio$https para chamar seu provedor LLM diretamente do pipeline de processamento de fluxo. Esta solução chama Azure OpenAI para gerar um resumo em linguagem natural de cada sessão que descreve os interesse do usuário com base nos títulos do artigo na sessão.
Adicione este estágio ao pipeline após o estágio $sessionWindow:
{ "$https": { "connectionName": "azureopenai", "method": "POST", "as": "apiResults", "config": { "parseJsonStrings": true }, "payload": [ { "$project": { "_id": 0, "model": "gpt-4o-mini", "response_format": { "type": "json_object" }, "messages": [ { "role": "system", "content": "You are an analytical assistant that specializes in behavioral summarization. You analyze short-term reading activity and infer user interests without making personal or sensitive assumptions the create a special field called summary. Summary must be a special field in the response. Respond only in JSON format.Return a JSON object with the following keys in this order: \n reasoning: (Internal scratchpad, briefly explain your analysis) \n user_interests: (The list of inferred interests) \n summary: (A concise summary based on the interests above)" }, { "role": "user", "content": { "$toString": "$titles" } } ] } } ] } }
Observação
O próprio processador de fluxo é "inteligente". Ele transforma uma lista de títulos em um resumo semântico (por exemplo, "O usuário está pesquisando a logística de fabricação de semicondutores") antes que os dados cheguem ao disco. Isso difere fundamentalmente dos pipelines tradicionais de processamento em lote, que normalmente escrevem dados brutos de sessão em um banco de dados e depois chamam uma API externa a partir de um servidor de aplicação.
Gravar resumos de sessões para uma nova coleção.
Adicione os seguintes estágios ao pipeline para extrair o resumo da saída de LLM e gravá-lo em uma nova coleção:
$match: filtra as sessões em que a chamada do LLM falhou e retornou um erro para evitar a gravação de dados incompletos no banco de dados.$addFields: extrai o camposummaryda saída do LLM e o adiciona ao nível superior do documento.$project: remove a saída bruta do LLM do documento para reduzir o ruído e os custos de armazenamento.$merge: grava os documentos resultantes em uma nova coleção chamadauser_intentno banco de dadosnewsdo seu cluster de clickstream (conexãouserevents). Cada documento nesta coleção representa uma sessão de usuário e contém um resumo dos interesses do usuário.
{ "$match": { "titles": { "$exists": true }, "apiResults": { "$exists": true } } }, { "$addFields": { "summary": { "$arrayElemAt": [ "$apiResults.choices.message.content.summary", 0 ] } } }, { "$project": { "apiResults": 0 } }, { "$merge": { "into": { "coll": "user_intent", "connectionName": "userevents", "db": "news" } } }
Inicie o processador de fluxo.
Quando estiver pronto para começar a resumir seus dados de fluxo de cliques, clique no ícone Start do processador userIntentSummarizer na lista de processadores de fluxo do seu espaço de trabalho de processamento de fluxo.
Esse pipeline deve gravar documentos na coleção user_intent que contenham resumos de sessões que capturem os interesse do usuário. Por exemplo:
{ "_id": "sess-6a0d8837-a5f9-4ef5-8b00-78e9bf7a825c", "summary": "The user seems interested in geopolitical developments, especially in the Middle East, US political strategies involving Trump, and legal aspects of government operations.", "titles": [ "Israel and Hamas agree to part of Trump's Gaza peace plan, will free hostages and prisoners", "Top officials from US and Qatar join talks aimed at brokering peace in Gaza", "How Trump secured a Gaza breakthrough", "Ontario's anti-tariff ad is clever, effective and legally sound, experts say", "Shutdown? Trump's been dismantling the government all year", "AP News Summary at 7:58 p.m. EDT" ] }
A seguir está a definição JSON completa para o processador de fluxo userIntentSummarizer que executa todas as operações descritas nas Fases 1 e 2, incluindo a ingestão de dados de fluxo de cliques, enriquecimento-os com metadados de artigo, sessão do comportamento do usuário, chamada de um LLM para resumir intenção do usuário e escrever os resumos em uma nova coleção.
{ "name": "userIntentSummarizer", "pipeline": [ { "$source": { "connectionName": "userevents", "db": "news", "collection": "user_events" } }, { "$lookup": { "from": { "connectionName": "userevents", "db": "news", "coll": "articles" }, "localField": "article_id", "foreignField": "_id", "as": "article_details", "pipeline": [ { "$project": { "_id": 0, "description": 1, "keywords": 1, "title": 1 } } ] } }, { "$addFields": { "description": { "$arrayElemAt": [ "$article_details.description", 0 ] }, "keywords": { "$arrayElemAt": [ "$article_details.keywords", 0 ] }, "title": { "$arrayElemAt": [ "$article_details.title", 0 ] } } }, { "$project": { "userId": 1, "article_id": 1, "eventTime": 1, "event_type": 1, "device": 1, "session_id": 1, "description": 1, "keywords": 1, "title": 1 } }, { "$sessionWindow": { "partitionBy": "$session_id", "gap": { "unit": "second", "size": 60 }, "pipeline": [{ "$group": { "_id": "$session_id", "titles": { "$push": "$title" } } }] } }, { "$https": { "connectionName": "azureopenai", "method": "POST", "as": "apiResults", "config": { "parseJsonStrings": true }, "payload": [ { "$project": { "_id": 0, "model": "gpt-4o-mini", "response_format": { "type": "json_object" }, "messages": [ { "role": "system", "content": "You are an analytical assistant that specializes in behavioral summarization. You analyze short-term reading activity and infer user interests without making personal or sensitive assumptions then create a special field called summary. Summary must be a special field in the response. Respond only in JSON format.Return a JSON object with the following keys in this order: \n reasoning: (Internal scratchpad, briefly explain your analysis) \n user_interests: (The list of inferred interests) \n summary: (A concise summary based on the interests above)" }, { "role": "user", "content": { "$toString": "$titles" } } ] } } ] } }, { "$match": { "titles": { "$exists": true }, "apiResults": { "$exists": true } } }, { "$addFields": { "summary": { "$arrayElemAt": [ "$apiResults.choices.message.content.summary", 0 ] } } }, { "$project": { "apiResults": 0 } }, { "$merge": { "into": { "coll": "user_intent", "connectionName": "userevents", "db": "news" } } } ] }
Fase 3: Pesquisa Semântica e Fornecimento de Recomendações Personalizadas
Por fim, usamos o MongoDB Vector Search para realizar pesquisa semântica no catálogo de artigos, usando os resumos das sessões gerados na fase anterior para embasar recomendações de conteúdo personalizadas.
Prepare os dados do artigo para recuperação semântica.
Antes de realizar a pesquisa semântica, você precisa gerar embeddings vetoriais para os dados do seu artigo. Para fazer isso, crie um índice do MongoDB Vector Search chamado vector_index, que indexa o campo description da sua coleção articles como o tipo autoEmbed. Isso instrui o MongoDB Vector Search a usar Embedding Automatizado para gerar automaticamente embeddings vetoriais para o campo description sempre que documentos forem inseridos ou atualizados na coleção.
Importante
A incorporação automatizada está disponível como um recurso de visualização somente para o MongoDB Community Edition v8.2 e versões mais recentes. O recurso e a documentação correspondente podem mudar a qualquer momento durante o período de Pré-visualização. Para aprender mais, consulte Visualizar recursos.
O Atlas oferece suporte à incorporação manual em todas as edições do MongoDB.
Utilize esta definição JSON para criar um índice de vetor nestes campos:
O campo
descriptioncomo o tipoautoEmbedpara instruir o MongoDB Vector Search a gerar automaticamente embeddings vetoriais para o campodescriptionusando o modelo de embeddingvoyage-4-largesempre que documentos são inseridos ou atualizados na coleção.O campo
titlecomo o tipofilterpara pré-filtrar os dados da pesquisa semântica usando o valor da string no campo. Isso permite que você exclua artigos que o usuário já leu dos resultados da pesquisa.
{ "fields": [ { "type": "autoEmbed", "modalitytype": "text", "path": "description", "model": "voyage-4-large" }, { "type": "filter", "path": "title" } ] }
Execute consultas de pesquisa semântica para servir recomendações personalizadas.
Quando um usuário visitar seu site, faça o resumo da sessão atual e use-o como consulta para uma pesquisa vetorial em seu catálogo de artigos. Como você habilitou o embedding automatizado no índice, o MongoDB Vector Search gera automaticamente o embedding para o resumo no momento da query e o utiliza como o vetor de consulta eficaz.
Esse exemplo mostra uma consulta de pesquisa vetorial simplificada que usa a sessão summary como vetor de consulta e exclui artigos que o usuário já leu com base no campo titles:
[{ "$vectorSearch": { "index": "vector_index", // Vector index with autoEmbed on article descriptions "path": "description", "query": { "text": "<session-summary>" // Session summary from user_intent document }, "numCandidates": 100, "filter": { "title": { "$nin": [<read-titles>] } // Exclude articles the array of titles in the user_intent document } } }]
Principais Aprendizados
Essa arquitetura demonstra vários avanços importantes na criação de produtos de dados modernos:
Latência reduzida: o embedding de chamadas LLM diretamente no processador de stream elimina vários saltos de rede e camadas de persistência intermediárias. O sistema transforma cliques brutos em intenção acionável quase em tempo real.
Experiência aprimorada do desenvolvedor: defina pipelines com MQL baseado em JSON, permitindo que as equipes que já conhecem as consultas do MongoDB criem cargas de trabalho avançadas de streaming e impulsionadas por IA sem aprender novas DSLs ou provisionar infraestrutura adicional.
Personalização semântica: vá além da correspondência de palavras-chave e dos trabalhos em lote durante a noite para criar sistemas que ouçam, pensem e respondam instantaneamente ao comportamento do usuário.
Autores
- Vinod Krishnan, arquiteto de soluções, MongoDB
Saiba mais
Para entender como o Atlas Vector Search aprimora a pesquisa semântica e habilita a análise em tempo real, visite a página do Atlas Vector Search.
Para saber como o MongoDB está transformando as operações de mídia, leia o artigo Personalização de mídia alimentada por IA: MongoDB e Vector Search.
Para descobrir como o MongoDB oferece suporte a fluxos de trabalho de mídia modernos, visite a página MongoDB para mídia e entretenimento.
Para saber mais sobre o Processamento de Stream do Atlas, visite a documentação do Processamento de Stream do Atlas.