Recomendaciones de contenido multimedia en tiempo real con Atlas Stream Processing y MongoDB Vector Search.
caso de uso: Personalización, IA Gen
Industrias: Medios de comunicación, Telecomunicaciones
Productos y Herramientas: MongoDB Atlas, Búsqueda vectorialde MongoDB,Procesamientode flujos de MongoDB Atlas
Socios: Voyage AIAzure OpenAI
Resumen de la solución
Cuando un nuevo usuario llega a tu sitio de noticias, tienes solo unos segundos para entender qué busca antes de que pierda el interés y se vaya. Esto plantea el problema del arranque en frío: ¿cómo recomendar contenido a usuarios sin información previa sobre ellos?
Imaginemos un usuario anónimo que llega a tu sitio web y hace clic en tres artículos:
"NVIDIA crea un nuevo chip de IA"
"TSMC amplía su producción en Arizona"
"Las acciones de Intel caen en medio de los retrasos"
Una simple búsqueda por palabras clave solo recomendaría artículos sobre Intel o Arizona. Sin embargo, un sistema inteligente reconoce que los tres clics están relacionados con las cadenas de suministro de semiconductores. Esto le permite recomendar artículos relevantes como "El futuro de Silicon Valley", incluso cuando no comparten palabras clave con el historial de navegación del usuario.
La solución que se presenta en este artículo crea un sistema inteligente de personalización de medios que recopila y procesa datos de clics para generar un resumen en lenguaje natural de los intereses del usuario y recomendar artículos relevantes con los que es más probable que el usuario interactúe.
Esta solución combina:
Atlas Stream Processing para la ingesta de datos en tiempo real, enriquecimiento e integración LLM para resumir la intención del usuario a partir de datos de flujo de clics.
Atlas Vector Search con incrustaciones automatizadas de Voyage AI para recuperación semántica y recomendaciones.
Arquitectura de referencia
Esta arquitectura muestra cómo construir un motor de recomendación de contenido multimedia basado en inteligencia artificial que ingiere, procesa y reacciona al comportamiento del usuario en tiempo real utilizando MongoDB Atlas.

Figura 1. Arquitectura de personalización de medios impulsada por IA con Atlas Stream Processing y MongoDB Vector Search.
Esta arquitectura funciona en tres fases, que se describen en detalle en las siguientes secciones:
Ingesta y enriquecimiento: Captura los eventos de clics sin procesar de la aplicación de cara al usuario y combínalos con los metadatos del artículo en tiempo real mediante el procesamiento de flujos de Atlas.
Organiza y resume las sesiones: agrupa los clics relacionados en sesiones y utiliza un modelo de lenguaje natural (LLM) para generar resúmenes en lenguaje natural de los intereses del usuario.
Búsqueda y servicio: Utilice los resúmenes generados para impulsar la búsqueda vectorial semántica y devolver recomendaciones personalizadas.
Fase 1: Ingesta y enriquecimiento de datos de flujo de clics
En la primera fase, esta arquitectura de solución utiliza Atlas Stream Processing para ingerir datos brutos de flujo de clics desde su plataforma de medios y enriquecerlos con metadatos de los artículos de su base de datos.
Configura tus fuentes de datos.
Esta solución utiliza las dos colecciones siguientes en la news
base de datos del mismo clúster de MongoDB:
La colección articles contiene metadatos sobre los artículos de su catálogo. En este ejemplo, la colección se encuentra en la base de datos news del clúster ClickstreamCluster.
Cada documento de la colección representa un artículo y contiene metadatos relevantes. Por ejemplo:
{ "_id": { "$oid": "696493bfbc1084032ac0adfe" }, "title": "Ukraine updates, Day 6: ‘We are sacrificing our lives for freedom,’ Zelenskyy gets standing ovation after speech to European parliament", "link": "https://nationalpost.com/news/world/ukraine-updates-day-6-russia-kyiv", "keywords": null, "creator": [ "National Post Wire Services" ], "video_url": null, "description": "Russia escalated shelling overnight of key cities in Ukraine as its troops on the ground move slowly in a large convoy toward the capital, Kyiv", "content": "8:20 a.m. EST — Ukraine's Zelenskyy tells EU: 'Prove that you are with us\" Read More", "pubDate": "2022-03-01 13:45:04", "expire_at": "Wed, 07 Sep 2022 13:45:04 GMT", "image_url": null, "source_id": "nationalpost", "country": [ "canada" ], "category": [ "top" ], "language": "english" }
La colección user_events contiene eventos de flujo de clics sin procesar ingeridos desde su aplicación orientada al usuario. En este ejemplo, la colección se encuentra en la base de datos news del clúster ClickstreamCluster.
Puedes configurar una fuente de datos de flujo de clics utilizando tu sistema de recopilación de eventos preferido. Para recrear el método que se muestra en el diagrama de arquitectura de referencia, implementa una API de recopilación de eventos en tu aplicación de cara al usuario para enviar eventos de clic a un tema de Apache Kafka. Luego, usa el conector Kafka Sink de MongoDB para leer desde el tema de flujo de clics y escribir en la colección de tu clúster user_events de MongoDB.
Cada clic en un artículo genera un documento de evento con campos como session_id, article_id y timestamp. Por ejemplo:
{ "_id": { "$oid": "696a1ecd66a51be18fffb8fa" }, "user_id": "user-2", "session_id": "sess-6a0d8837-a5f9-4ef5-8b00-78e9bf7a825c", "timestamp": { "$date": "2026-01-16T16:49:41.208Z" }, "event_type": "read", "article_id": { "$oid": "696493e6bc1084032ac116ed" }, "device": "desktop", "metadata": { "time_on_page": 54, "referral": "https://guzman.com/main/search/listmain.jsp" } }
Cree un espacio de trabajo de Stream Processing.
En Atlas, ve a Stream Processing página para tu proyecto.
Si aún no aparece, se debe seleccionar la organización que contiene el proyecto en el menú Organizations de la barra de navegación.
Si aún no se muestra, seleccione su proyecto en el menú Projects de la barra de navegación.
En la barra lateral, haz clic en Stream Processing en la sección Streaming Data.
Se muestra la página Stream Processing.
Haga clic en Create a workspace.
En la página Create a stream processing workspace, configura tu espacio de trabajo de la siguiente manera:
Tier:
SP30Provider:
AWSRegion:
us-east-1Workspace Name:
article-personalization
Agregue una conexión para los datos de su flujo de clics y de sus artículos.
El procesador de flujo necesita conexiones tanto a los datos de flujo de clics como a los metadatos de los artículos para realizar el enriquecimiento. Ambas fuentes de datos deben estar en el mismo clúster de Atlas.
En el panel de tu espacio de trabajo de procesamiento de flujos, haz clic en Manage.
En la pestaña Connection Registry, haga clic en + Add Connection en la parte superior derecha.
En la página Edit Connection, configure su conexión de la siguiente manera:
Tipo de conexión:
Atlas DatabaseNombre de la conexión:
usereventsClúster Atlas: El nombre del clúster donde se encuentran los datos de tu flujo de clics y artículos (por ejemplo,).
ClickstreamClusterEjecutar como:
Read and write to any database
Haz clic en Save changes para crear la conexión.
Crear un procesador de flujo persistente.
Cree un procesador de flujo llamado userIntentSummarizer con etapas que lean eventos de flujo de clics sin procesar de la colección user_events y enriquezcan los eventos con metadatos del artículo.
En la página Procesamiento de flujo de su proyecto Atlas,Manage haga clic en en el panel de su espacio de trabajo de procesamiento de flujo.
En el JSON editor, copie y pegue lo siguiente Introduzca la definiciónJSON en el cuadro de texto del editor JSON para definir un procesador de flujo llamado
userIntentSummarizercon las siguientes etapas:$source: Lee eventos de flujo de clics sin procesar de lauser_eventscolección en lanewsbase de datos de su clúster de flujo de clics conectadouserevents(conexión).$lookup: Combina los eventos de flujo de clics sin procesar con laarticlescolección basada en elarticle_idcampo, trayendo metadatos relevantes del artículo de los campos,descriptionkeywordstitley.$addFields: Proyecta losdescriptionkeywordscampos, ytitledel campoarticle_detailsen el nivel superior del flujo de eventos para que sean fácilmente accesibles para las etapas posteriores.$project: Proyectos de campos relevantes para el procesamiento posterior.
{ "$source": { "connectionName": "userevents", "db": "news", "collection": "user_events" } }, { "$lookup": { "from": { "connectionName": "userevents", "db": "news", "coll": "articles" }, "localField": "article_id", "foreignField": "_id", "as": "article_details", "pipeline": [ { "$project": { "_id": 0, "description": 1, "keywords": 1, "title": 1 } } ] } }, { "$addFields": { "description": { "$arrayElemAt": [ "$article_details.description", 0 ] }, "keywords": { "$arrayElemAt": [ "$article_details.keywords", 0 ] }, "title": { "$arrayElemAt": [ "$article_details.title", 0 ] } } }, { "$project": { "userId": 1, "article_id": 1, "eventTime": 1, "event_type": 1, "device": 1, "session_id": 1, "description": 1, "keywords": 1, "title": 1 } }, Haga clic en Update stream processor para guardar los cambios.
Fase 2: Sesión y resumen del comportamiento del usuario
En esta fase, ampliamos el proceso de procesamiento de flujos para agrupar los clics relacionados en sesiones y utilizamos un modelo de lenguaje natural (LLM) para generar un resumen en lenguaje natural de cada sesión que describa los intereses del usuario.
Conecte su proveedor de LLM a su espacio de trabajo de procesamiento de flujos de datos.
Agregue una conexión HTTPS externa a su proveedor LLM (por ejemplo, Azure OpenAI) para permitir que el procesador de flujo llame al LLM directamente desde la canalización para enriquecer sus datos:
En el panel de tu espacio de trabajo de procesamiento de flujos, haz clic en Manage.
En la pestaña Connection Registry, haga clic en + Add Connection en la parte superior derecha.
Configure la conexión de la siguiente manera:
Tipo de conexión:
HTTPSNombre de la conexión:
azureopenaiURL: La URL del punto final para su instancia de Azure OpenAI
Encabezados: Añada estos pares clave-valor a los encabezados:
Clave:
Content-Type, Valor:application/jsonClave:
api-key, Valor: Su clave de API de Azure OpenAI
Separe los datos de flujo de clics utilizando la $sessionWindow etapa.
Agregue una $sessionWindow etapa a su canalización de procesamiento de flujo para agrupar eventos relacionados en sesiones según un intervalo de sesión especificado. Esta solución define una sesión como una secuencia de eventos del mismo session_id sin un intervalo de inactividad superior a 60 segundos.
Agregue esta etapa a su canalización userIntentSummarizer después de las etapas de enriquecimiento:
{ "$sessionWindow": { "partitionBy": "$session_id", "gap": { "unit": "second", "size": 60 }, "pipeline": [{ "$group": { "_id": "$session_id", "titles": { "$push": "$title" } } }] } }
Resuma las sesiones de usuario utilizando la $https etapa.
Agregue una etapa para llamar a su proveedor LLM directamente desde su canalización de procesamiento de flujo. Esta solución llama a Azure OpenAI para generar un resumen en lenguaje natural de cada sesión que describe los intereses del usuario en función de los títulos de los artículos de la $https sesión.
Agregue esta etapa a su canalización después de la etapa $sessionWindow:
{ "$https": { "connectionName": "azureopenai", "method": "POST", "as": "apiResults", "config": { "parseJsonStrings": true }, "payload": [ { "$project": { "_id": 0, "model": "gpt-4o-mini", "response_format": { "type": "json_object" }, "messages": [ { "role": "system", "content": "You are an analytical assistant that specializes in behavioral summarization. You analyze short-term reading activity and infer user interests without making personal or sensitive assumptions the create a special field called summary. Summary must be a special field in the response. Respond only in JSON format.Return a JSON object with the following keys in this order: \n reasoning: (Internal scratchpad, briefly explain your analysis) \n user_interests: (The list of inferred interests) \n summary: (A concise summary based on the interests above)" }, { "role": "user", "content": { "$toString": "$titles" } } ] } } ] } }
Nota
El procesador de flujo en sí es "inteligente". Transforma una lista de títulos en un resumen semántico (por ejemplo, "El usuario está investigando la logística de fabricación de semiconductores") antes de que los datos lleguen al disco. Esto difiere fundamentalmente de los procesos por lotes tradicionales, que normalmente escriben los datos de sesión sin procesar en una base de datos y luego llaman a una API externa desde un servidor de aplicaciones.
Escribe resúmenes de las sesiones para una nueva colección.
Agregue las siguientes etapas al proceso para extraer el resumen de la salida de LLM y escribirlo en una nueva colección:
$match: Filtra las sesiones en las que la llamada LLM falló y devolvió un error para evitar escribir datos incompletos en la base de datos.$addFields: Extrae elsummarycampo de la salida LLM y lo agrega al nivel superior del documento.$projectElimina la salida LLM sin procesar del documento para reducir el ruido y los costos de almacenamiento.$merge: Escribe los documentos resultantes en una nueva colección llamadauser_intenten lanewsbase de datos de tu clúster de flujo de clicsuserevents(conexión). Cada documento en esta colección representa una sesión de usuario y contiene un resumen de los intereses del usuario.
{ "$match": { "titles": { "$exists": true }, "apiResults": { "$exists": true } } }, { "$addFields": { "summary": { "$arrayElemAt": [ "$apiResults.choices.message.content.summary", 0 ] } } }, { "$project": { "apiResults": 0 } }, { "$merge": { "into": { "coll": "user_intent", "connectionName": "userevents", "db": "news" } } }
Inicie el procesador de flujo.
Cuando esté listo para comenzar a resumir sus datos de flujo de clics, haga clic en el icono Start para el procesador userIntentSummarizer en la lista de procesadores de flujo para su espacio de trabajo de procesamiento de flujo.
Esta canalización debe escribir documentos en la colección user_intent que contengan resúmenes de sesión que capturen los intereses del usuario. Por ejemplo:
{ "_id": "sess-6a0d8837-a5f9-4ef5-8b00-78e9bf7a825c", "summary": "The user seems interested in geopolitical developments, especially in the Middle East, US political strategies involving Trump, and legal aspects of government operations.", "titles": [ "Israel and Hamas agree to part of Trump's Gaza peace plan, will free hostages and prisoners", "Top officials from US and Qatar join talks aimed at brokering peace in Gaza", "How Trump secured a Gaza breakthrough", "Ontario's anti-tariff ad is clever, effective and legally sound, experts say", "Shutdown? Trump's been dismantling the government all year", "AP News Summary at 7:58 p.m. EDT" ] }
A continuación se muestra la definición JSON completa para el userIntentSummarizer procesador de flujo que realiza todas las operaciones descritas en las fases 1 2y, incluyendo la ingesta de datos de flujo de clics, su enriquecimiento con metadatos de artículos, la creación de sesiones del comportamiento del usuario, la llamada a un LLM para resumir la intención del usuario y la escritura de los resúmenes en una nueva colección.
{ "name": "userIntentSummarizer", "pipeline": [ { "$source": { "connectionName": "userevents", "db": "news", "collection": "user_events" } }, { "$lookup": { "from": { "connectionName": "userevents", "db": "news", "coll": "articles" }, "localField": "article_id", "foreignField": "_id", "as": "article_details", "pipeline": [ { "$project": { "_id": 0, "description": 1, "keywords": 1, "title": 1 } } ] } }, { "$addFields": { "description": { "$arrayElemAt": [ "$article_details.description", 0 ] }, "keywords": { "$arrayElemAt": [ "$article_details.keywords", 0 ] }, "title": { "$arrayElemAt": [ "$article_details.title", 0 ] } } }, { "$project": { "userId": 1, "article_id": 1, "eventTime": 1, "event_type": 1, "device": 1, "session_id": 1, "description": 1, "keywords": 1, "title": 1 } }, { "$sessionWindow": { "partitionBy": "$session_id", "gap": { "unit": "second", "size": 60 }, "pipeline": [{ "$group": { "_id": "$session_id", "titles": { "$push": "$title" } } }] } }, { "$https": { "connectionName": "azureopenai", "method": "POST", "as": "apiResults", "config": { "parseJsonStrings": true }, "payload": [ { "$project": { "_id": 0, "model": "gpt-4o-mini", "response_format": { "type": "json_object" }, "messages": [ { "role": "system", "content": "You are an analytical assistant that specializes in behavioral summarization. You analyze short-term reading activity and infer user interests without making personal or sensitive assumptions then create a special field called summary. Summary must be a special field in the response. Respond only in JSON format.Return a JSON object with the following keys in this order: \n reasoning: (Internal scratchpad, briefly explain your analysis) \n user_interests: (The list of inferred interests) \n summary: (A concise summary based on the interests above)" }, { "role": "user", "content": { "$toString": "$titles" } } ] } } ] } }, { "$match": { "titles": { "$exists": true }, "apiResults": { "$exists": true } } }, { "$addFields": { "summary": { "$arrayElemAt": [ "$apiResults.choices.message.content.summary", 0 ] } } }, { "$project": { "apiResults": 0 } }, { "$merge": { "into": { "coll": "user_intent", "connectionName": "userevents", "db": "news" } } } ] }
Fase 3: Búsqueda semántica y recomendaciones personalizadas
Finalmente, utilizamos MongoDB Vector Search para realizar una búsqueda semántica en el catálogo de artículos, utilizando los resúmenes de sesión generados en la fase anterior para impulsar recomendaciones de contenido personalizadas.
Preparar los datos de los artículos para la recuperación semántica.
Antes de poder realizar una búsqueda semántica, debe generar incrustaciones vectoriales para los datos de sus artículos. Para ello, cree un índice de MongoDB Vector Search llamado vector_index que indexe el description campo de su articles colección como autoEmbed tipo. Esto indica a MongoDB Vector Search que utilice la incrustación automatizada para generar automáticamente incrustaciones vectoriales para el description campo cada vez que se inserten o actualicen documentos en la colección.
Importante
La incrustación automatizada está disponible como función de vista previa solo para MongoDB Community Edition v8.2 y versiones posteriores. Esta función y la documentación correspondiente pueden cambiar en cualquier momento durante el periodo de vista previa. Para obtener más información, consulte Funciones de vista previa.
Atlas admite la incrustación manual en todas las ediciones de MongoDB.
Utilice esta definición JSON para crear un índice vectorial en estos campos:
El campo
descriptioncomo tipoautoEmbedpara indicar a MongoDB Vector Search que genere automáticamente incrustaciones vectoriales para el campodescriptionutilizando el modelo de incrustaciónvoyage-4-largecada vez que se inserten o actualicen documentos en la colección.El campo
titlese utiliza como tipofilterpara prefiltrar los datos de la búsqueda semántica mediante el valor de cadena del campo. Esto permite excluir de los resultados de búsqueda los artículos que el usuario ya ha leído.
{ "fields": [ { "type": "autoEmbed", "modalitytype": "text", "path": "description", "model": "voyage-4-large" }, { "type": "filter", "path": "title" } ] }
Realiza búsquedas semánticas para ofrecer recomendaciones personalizadas.
Cuando un usuario visita tu sitio, toma el resumen de su sesión actual y úsalo como consulta para una búsqueda vectorial en tu catálogo de artículos. Dado que habilitaste la incrustación automática en el índice, MongoDB Vector Search genera automáticamente la incrustación para el resumen al momento de la consulta y la usa como vector de consulta efectivo.
Este ejemplo muestra una consulta de búsqueda vectorial simplificada que utiliza la sesión summary como vector de consulta y excluye los artículos que el usuario ya ha leído basándose en el campo titles:
[{ "$vectorSearch": { "index": "vector_index", // Vector index with autoEmbed on article descriptions "path": "description", "query": { "text": "<session-summary>" // Session summary from user_intent document }, "numCandidates": 100, "filter": { "title": { "$nin": [<read-titles>] } // Exclude articles the array of titles in the user_intent document } } }]
Lecciones clave
Esta arquitectura demuestra varios avances importantes en la creación de productos de datos modernos:
Latencia reducida: La integración de las llamadas LLM directamente en el procesador de flujo elimina múltiples saltos de red y capas de persistencia intermedias. El sistema transforma los clics en acciones concretas prácticamente en tiempo real.
Experiencia de desarrollador mejorada: defina flujos de trabajo con MQL basado en JSON, lo que permite a los equipos que ya conocen las consultas de MongoDB crear cargas de trabajo avanzadas de transmisión y basadas en IA sin necesidad de aprender nuevos DSL ni aprovisionar infraestructura adicional.
Personalización semántica: Vaya más allá de la coincidencia de palabras clave y los procesos por lotes nocturnos para crear sistemas que escuchen, piensen y respondan instantáneamente al comportamiento del usuario.
Autores
Vinod Krishnan, arquitecto de soluciones de MongoDB
Obtén más información
Para comprender cómo Atlas búsqueda vectorial impulsa la búsqueda semántica y permite el análisis en tiempo real, visita la página Atlas búsqueda vectorial.
Para aprender cómo MongoDB está transformando las operaciones de medios, lea el Artículo sobre personalización de medios impulsada por IA: MongoDB y búsqueda vectorial.
Para descubrir cómo MongoDB admite flujos de trabajo de medios modernos, visita la página MongoDB para medios y entretenimiento.
Para obtener más información sobre el procesamiento de flujos de Atlas, consulte la documentación de Atlas Stream Processing.