A Voyage AI se une ao MongoDB para impulsionar aplicativos de AI mais precisos e confiáveis no Atlas.

Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Desenvolvedor do MongoDB
Centro de desenvolvedores do MongoDB
chevron-right
Produtos
chevron-right
Atlas
chevron-right

Indexe qualquer coisa, pesquise tudo: Vector Search escalável com IA replicada, MongoDB e Hookdeck

Phil Leggetter20 min read • Published Dec 16, 2024 • Updated Dec 16, 2024
Atlas
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Neste tutorial, criaremos um aplicação MongoDB que permite aos usuários indexar e pesquisar qualquer coisa na Internet com um URL acessível publicamente. Isso mesmo! Solicite ao aplicativo para indexar um3 arquivo MP ou WAV, um arquivo HTML ou de texto, ou um4 arquivo Mover ou MP, e ele usará o poder da IA de replicação para criar uma representação textual desse arquivo, e o app armazena os resultados no MongoDB Atlas. Desde que um LLM possa analisar o recurso e criar uma representação textual, ele pode ser indexado. Em seguida, todos esses arquivos indexados, independentemente do tipo de arquivo de origem, podem ser pesquisados com texto usando MongoDB Atlas.
Usaremos o Hookdeck, um gateway de evento que fornece infraestrutura e ferramentas para criar e gerenciar aplicativos orientados a eventos, lidando perfeitamente com webhooks e solicitações de API. Neste tutorial, o Hookdeck garante uma comunicação confiável e escalável entre o aplicação MongoDB e a Replicate, uma plataforma de API LLM, atuando como uma fila sem servidor para chamadas de API que limitam a taxa e garante a entrega de webhook.
Começaremos configurando os serviços necessários e colocando o aplicação Pipeline em funcionamento. Em seguida, acompanharemos a J Em última análise , o conteúdo é disponibilizado para pesquisa dentro de um índice de pesquisa vetorial .

Visão geral da arquitetura

A escalabilidade geralmente é supervalorizada, mas continua sendo um aspecto importante da criação de aplicativos robustos. Um dos benefícios de usar provedores sem servidor e hospedados na nuvem é a capacidade de transferir o trabalho para serviços especializados. Também é necessário em qualquer arquitetura escalável uma forma de garantir que os serviços não sejam sobrecarregados e que seu aplicação seja tolerante a falhas. Neste tutorial, utilizamos vários desses serviços para lidar com diferentes aspectos do nosso aplicação.
Primeiro, vamos dar uma olhada nos serviços:
  • Replicar: fornece modelos de aprendizado de máquina de código aberto acessíveis por meio de uma API
  • MongoDB Atlas: Um conjunto integrado de serviços de dados centralizado em um banco de banco de dados na nuvem projetado para acelerar e simplificar a forma como você constrói com dados
  • Hookdeck: um gateway de evento que oferece às equipes de engenharia infraestrutura e ferramentas para criar e gerenciar aplicativos orientados a eventos
Em seguida, vejamos como os serviços são utilizados.
Arquitetura de todas as coisas
  • MongoDB Atlas: O MongoDB Atlas oferece recursos de armazenamento de banco de dados de dados e pesquisa vetorial, garantindo que nossos dados sejam armazenados de forma eficiente e consultados rapidamente.
  • Índice Todas as coisas: Este é o aplicação Frasco .
  • Hookdeck: o Hookdeck atua como uma fila sem servidor para a) garantir que as solicitações da API de replicação não excedam os limites de taxa e possam ser repetidas e b) ingestão, entrega e nova tentativa de webhooks da Replicate para garantir a ingestão confiável de eventos. Observação: também usaremos a CLI do Hookdeck para receber webhooks em nosso ambiente de desenvolvimento local.
  • Replicar: a replicação lida com a inferência de IA, produz texto e incorporações e nos permite descarregar as tarefas computacionalmente intensivas da execução de modelos de machine learning. Usamos diferentes LLMs para analisar diferentes tipos de conteúdo.
Ao utilizar esses serviços baseados na nuvem, podemos nos concentrar na criação da funcionalidade principal de nosso aplicação , garantindo que ele permaneça escalável e eficiente. Os webhooks, em particular, permitem a escalabilidade ao ativar fluxos de trabalho de IA assíncronos, transferindo esses cenários de alto uso de computação para serviços de terceiros e apenas recebendo chamadas de resposta por meio de um webhook quando o trabalho é concluído.

Pré-requisitos

Antes de começar, verifique se tem o seguinte:

Coloque o aplicativo em funcionamento

Vamos começar executando o aplicação e vê-lo em ação.

Obtenha o código

Comece obtendo a base de código do aplicação .
1git clone https://github.com/hookdeck/index-all-the-things.git
Ative um ambiente virtual com o POetry:
1poetry shell
E instale as dependências do aplicativo:
1poetry install

Configurar o aplicativo

O aplicação precisa de credenciais para os serviços com os quais interage.
Copie o arquivo de exemplo .env-example para um novo .env arquivo:
1cp .env-example .env
Atualize os valores dentro de .env da seguinte forma:
  • SECRET_KEY: uma chave secreta que será usada para assinar com segurança o cookie de sessão.SECRET_KEY Consulte os Docs do Frasco para obter mais informações.
  • MONGODB_CONNECTION_URI: preencha com uma string de conexão do MongoDB Atlas com um read e write para qualquer role de banco de dados de dados. Consulte os documentos Obter cadeia de conexão.
  • HOOKDECK_PROJECT_API_KEY: Obtenha uma chave de API na seção Projeto -> Configurações -> Segredos do painel do Hookdeck.
  • HOOKDECK_WEBHOOK_SECRET: Obtenha um segredo de assinatura na seção Projeto -> Configurações -> Segredos do painel do Hookdeck.
  • REPLICATE_API_TOKEN: crie um token de API no painel Replicar.
  • REPLICATE_WEBHOOKS_SECRET: vá para a seção Webhooks do painel de replicação e clique no botãoMostrar chave de assinatura.
  • HOOKDECK_REPLICATE_API_QUEUE_API_KEY, HOOKDECK_REPLICATE_API_QUEUE_URL, AUDIO_WEBHOOK_URL e EMBEDDINGS_WEBHOOK_URL serão automaticamente preenchidos na próxima etapa.

Criar conexões Hookdeck

As conexões Hookdeck são usadas para rotear solicitações HTTP de entrada recebidas por uma origem Hookdeck para um destino Hookdeck.
O create-hookdeck-connections.py script cria automaticamente as seguintes conexões do Hookdeck que:
  1. Direcione as solicitações feitas para URLs do Hookdeck para o aplicação executado localmente por meio da CLI do Hookdeck. Aqui, o Hookdeck é usado como uma fila de entrada.
  2. Direcione as solicitações feitas para uma URL do Hookdeck para a API de replicação. O Hookdeck é utilizado como uma fila de saída, nesta situação.
O script também atualiza o .env arquivo com as URLs de origem que lidam com os webhooks. Vamos analisar os detalhes do script.
Primeiro, verifique se você tem as importações necessárias e defina os cabeçalhos de autenticação e tipo de conteúdo para a solicitação da API do Hookdeck:
1import httpx
2import re
3import hashlib
4import os
5
6from config import Config
7
8headers = {
9 "Authorization": f"Bearer {Config.HOOKDECK_PROJECT_API_KEY}",
10 "Content-Type": "application/json",
11}
Em seguida, defina uma função para criar uma conexão com a API do Hookdeck:
1def create_connection(payload):
2 response = httpx.request(
3 "PUT",
4 "https://api.hookdeck.com/latest/connections",
5 headers=headers,
6 json=payload,
7 )
8 data = response.json()
9
10 if response.status_code != 200:
11 raise Exception(f"Failed to create connection: {data}")
12
13 return data
Essa função faz uma PUT solicitação para a API do Hookdeck com a carga útil da conexão upsert e lida com a resposta. Se o status da resposta não for 200 (OK), será gerada uma exceção. A função retorna a resposta JSON analisada.
A primeira conexão a ser criada é para a fila de saída da API de replicação:
1replicate_api_queue_api_key = hashlib.sha256(os.urandom(32)).hexdigest()
2replicate_api_queue = {
3 "name": "replicate-api-queue",
4 "source": {
5 "name": "replicate-api-queue",
6 "verification": {
7 "type": "API_KEY",
8 "configs": {
9 "header_key": Config.HOOKDECK_QUEUE_API_KEY_HEADER_NAME,
10 "api_key": replicate_api_queue_api_key,
11 },
12 },
13 },
14 "rules": [
15 {
16 "type": "retry",
17 "strategy": "exponential",
18 "count": 5,
19 "interval": 30000,
20 "response_status_codes": ["429", "500"],
21 }
22 ],
23 "destination": {
24 "name": "replicate-api",
25 "url": "https://api.replicate.com/v1/",
26 "auth_method": {
27 "type": "BEARER_TOKEN",
28 "config": {
29 "token": Config.REPLICATE_API_TOKEN,
30 },
31 },
32 },
33}
34
35replicate_api_connection = create_connection(replicate_api_queue)
A conexão tem um name, um source e um destination. O source também tem um name e um verification. O verification instrui o Hookdeck como autenticar solicitações. Como a conexão está funcionando como uma fila de API, estamos usando o API_KEY tipo com o header_key definido como o valor definido em Config.HOOKDECK_QUEUE_API_KEY_HEADER_NAME e o api_key valor definido como o hash gerado armazenado em replicate_api_queue_api_key.
O rules define a estratégia de nova tentativa de solicitação a ser usada ao interagir com a API de replicação. Nesse caso, estamos declarando que devemos tentar novamente até cinco vezes, usando um intervalo de 30000 milissegundos, mas aplicar uma estratégia de nova exponential tentativa de back-off. Além disso, estamos usando a response_status_codes opção para informar ao Hookdeck que só tente novamente 429 com as 500 respostas HTTP e.Consulte os Documentos de Repetições do Hookdeck para obter mais informações sobre as novas tentativas e os Documentos de Regras do Hookdeck para obter detalhes sobre outros tipos de regras disponíveis.
O url no destino é a URL base para a API de replicação. O Hookdeck usa o encaminhamento de caminho por padrão, portanto, qualquer caminho anexado ao URL de origem do Hookdeck também será anexado ao URL de destino. Por exemplo, uma solicitação para uma origem do Hookdeck com URL https://hkdk.events/{id}/predictions resultará em uma solicitação para um destino conectado de https://api.replicate.com/v1/predictions onde o destino tem um URL base https://api.replicate.com/v1/ de. O Hookdeck age como um proxy nesse cenário.
O auth_method no destino é do tipo BEARER_TOKEN com um config.token definido para o valor da REPLICATE_API_TOKEN variável de ambiente. Isso permite que o Hookdeck faça chamadas de API autenticadas para replicar.
Agora, crie uma conexão para os webhooks de replicação de áudio lidarem com os retornos de chamada de análise de áudio:
1replicate_audio = {
2 "name": "replicate-audio",
3 "source": {
4 "name": "replicate-audio",
5 "verification": {
6 "type": "REPLICATE",
7 "configs": {
8 "webhook_secret_key": Config.REPLICATE_WEBHOOKS_SECRET,
9 },
10 },
11 },
12 "rules": [
13 {
14 "type": "retry",
15 "count": 5,
16 "interval": 30000,
17 "strategy": "exponential",
18 "response_status_codes": ["!404"],
19 }
20 ],
21 "destination": {
22 "name": "cli-replicate-audio",
23 "cli_path": "/webhooks/audio",
24 },
25}
26
27replicate_audio_connection = create_connection(replicate_audio)
A conexão de chamada de resposta de resposta do webhook Replicar áudio usa um verification do tipo REPLICATE com um configs.webhook_secret_key valor definido a partir do REPLICATE_WEBHOOKS_SECRET valor que armazenamos no .env arquivo. Isso habilita e instrui o Hookdeck a verificar se o webhook veio do Replicate.
Os rules para essa conexão de entrada são semelhantes à conexão de saída e definem uma estratégia de nova tentativa de entrega a ser seguida se alguma solicitação ao endpoint do webhook do nosso aplicativo falhar. A única diferença é que o response_status_codes informa ao Hookdeck para não tentar novamente se receber uma 200 404 resposta ou.
O destination tem um name e um cli_path informando ao Hookdeck que o destino é a CLI do Hookdeck e o caminho para o qual a solicitação deve ser encaminhada é /webhooks/audio.
Em seguida, crie uma conexão para chamadas de resposta do webhook do Replicate Embeddings:
1replicate_embedding = {
2 "name": "replicate-embedding",
3 "source": {
4 "name": "replicate-embedding",
5 "verification": {
6 "type": "REPLICATE",
7 "configs": {
8 "webhook_secret_key": Config.REPLICATE_WEBHOOKS_SECRET,
9 },
10 },
11 },
12 "rules": [
13 {
14 "type": "retry",
15 "count": 5,
16 "interval": 30000,
17 "strategy": "exponential",
18 "response_status_codes": ["!200", "!404"],
19 }
20 ],
21 "destination": {
22 "name": "cli-replicate-embedding",
23 "cli_path": "/webhooks/embedding",
24 },
25}
26
27replicate_embedding_connection = create_connection(replicate_embedding)
Por fim, atualize o .env arquivo com alguns dos valores gerados:
1# Update .env
2with open(".env", "r") as file:
3 env_content = file.read()
4
5replicate_api_connection_url = replicate_api_connection["source"]["url"]
6audio_webhook_url = replicate_audio_connection["source"]["url"]
7embedding_webhook_url = replicate_embedding_connection["source"]["url"]
8
9# Replace the .env URLs in the .env content
10env_content = re.sub(
11 r"HOOKDECK_REPLICATE_API_QUEUE_API_KEY=.*",
12 f"HOOKDECK_REPLICATE_API_QUEUE_API_KEY={replicate_api_queue_api_key}",
13 env_content,
14)
15env_content = re.sub(
16 r"HOOKDECK_REPLICATE_API_QUEUE_URL=.*",
17 f"HOOKDECK_REPLICATE_API_QUEUE_URL={replicate_api_connection_url}",
18 env_content,
19)
20env_content = re.sub(
21 r"AUDIO_WEBHOOK_URL=.*", f"AUDIO_WEBHOOK_URL={audio_webhook_url}", env_content
22)
23env_content = re.sub(
24 r"EMBEDDINGS_WEBHOOK_URL=.*",
25 f"EMBEDDINGS_WEBHOOK_URL={embedding_webhook_url}",
26 env_content,
27)
28
29with open(".env", "w") as file:
30 file.write(env_content)
31
32print("Connections created successfully!")
Esse código lê o .env conteúdo atual do, substitui as linhas por espaços reservados da variável ambiental existente usando expressões regulares e grava o conteúdo atualizado de volta no .env arquivo. Isso garante que as variáveis de ambiente, como as URLs do webhook, estejam atualizadas.
Execute o roteiro:
1poetry run python create-hookdeck-connections.py
Verifique seu .env arquivo para garantir que todos os valores sejam preenchidos.
Além disso, navegue até a seçãoConexões do painel do Hookdeck e verifique a representação visual de suas conexões.
Conexão do Hookdeck no dashboard do Hookdeck

Criar índices do MongoDB Atlas

Para pesquisar um banco de banco de dados MongoDB de forma eficiente, você precisa de índices. Para a pesquisa vetorial do MongoDB , você deve criar um índice do Atlas Vector Search. O create-indexes.py script automatiza a criação e atualização dos índices de pesquisa no MongoDB utilizando a pymongo biblioteca do.
Primeiro, certifique-se de ter as importações necessárias e inicialize a conexão do banco de dados de dados:
1from allthethings.mongo import Database
2from pymongo.operations import SearchIndexModel
3
4database = Database()
5collection = database.get_collection()
Database é definido em allthethings/mongo.py e fornece acesso utilitário à assets coleção no iaat banco de banco de dados, com esses valores de string definidos config.py em.
Em seguida, certifique-se de que exista a coleção necessária dentro do banco de dados de dados para que os índices possam ser criados:
1if collection.name not in collection.database.list_collection_names():
2 print("Creating empty collection so indexes can be created.")
3 collection.database.create_collection(collection.name)
Com a coleção criada, defina uma função para criar ou atualizar índices de pesquisa:
1def create_or_update_search_index(index_name, index_definition, index_type):
2 indexes = list(collection.list_search_indexes(index_name))
3
4 if len(indexes) == 0:
5 print(f'Creating search index: "{index_name}"')
6 index_model = SearchIndexModel(
7 definition=index_definition,
8 name=index_name,
9 type=index_type,
10 )
11 collection.create_search_index(model=index_model)
12
13 else:
14 print(f'Search index "{index_name}" already exists. Updating.')
15 collection.update_search_index(name=index_name, definition=index_definition)
Esta função verifica se um índice com o index_name fornecido já existe. Ele cria um novo índice de pesquisa usando a definição e o tipo fornecidos se não existir. Se existir, ele atualizará o índice existente com a nova definição.
Agora, crie um índice de pesquisa vetorial para incorporações:
1vector_result = create_or_update_search_index(
2 "vector_index",
3 {
4 "fields": [
5 {
6 "type": "vector",
7 "path": "embedding",
8 "numDimensions": 768,
9 "similarity": "euclidean",
10 }
11 ]
12 },
13 "vectorSearch",
14)
Isso cria ou atualiza um índice de pesquisa vetorial chamado "vector_index" para o embedding campo.
Por fim, crie um índice de pesquisa para o url campo , pois ele é usado para determinar se uma URL já foi indexada:
1create_or_update_search_index(
2 "url_index",
3 {
4 "mappings": {
5 "fields": {
6 "url": {
7 "type": "string",
8 },
9 },
10 }
11 },
12 "search",
13)
14
15print("Indexes created successfully!")
Execute o roteiro:
1poetry run python create-indexes.py
Vá para a seção Atlas Search no painel do MongoDB Atlas e verifique se os índices de pesquisa foram criados.
Índices do Atlas Search do painel do MongoDB Atlas

Verifique se o aplicativo está funcionando

Em uma janela de terminal, execute o aplicação Firefly :
1poetry run python -m flask --app app --debug run
Em uma segunda janela do terminal, crie um localtunnel usando a CLI do Hookdeck:
1hookdeck listen 5000 '*'
Este comando escuta todas as fontes do Hookdeck conectadas a um destino CLI, roteando webhooks para o aplicação executado localmente na porta 5000.
Ao executar o comando, você verá uma saída semelhante a esta:
1Listening for events on Sources that have Connections with CLI Destinations
2
3Dashboard
4👉 Inspect and replay events: https://dashboard.hookdeck.com?team_id=tm_{id}
5
6Sources
7🔌 replicate-embedding URL: https://hkdk.events/{id}
8🔌 replicate-audio URL: https://hkdk.events/{id}
9
10Connections
11replicate-embedding -> replicate-embedding forwarding to /webhooks/embedding
12replicate-audio -> replicate-audio forwarding to /webhooks/audio
13
14> Ready! (^C to quit)
Abra localhost:5000 o no seu navegador da web para garantir que o aplicativo Spark esteja em execução.
Indexe a aplicação Todas as Coisas

Enviar conteúdo para análise e indexação

Com o aplicativo em execução, é hora de enviar um ativo para indexação.
Clique em PDF3(mp) sob o cabeçalho Exemplos para preencher a barra de pesquisa no aplicativo com um URL e clique em Enviar.
URL enviada para indexação
O envio do formulário envia a URL para um /process endpoint como uma POST solicitação . Vamos ver o que esse código faz.
Primeiro, defina a /process rota em app.py:
1@app.route("/process", methods=["POST"])
2def process():
3 url = request.form["url"]
4
5 parsed_url = urlparse(url)
6 if not all([parsed_url.scheme, parsed_url.netloc]):
7 flash("Invalid URL")
8 return redirect(url_for("index"))
Esta rota lida com a POST solicitação para o /process endpoint e recupera a URL a partir dos dados do formulário enviados pelo usuário. Ele valida a URL e redireciona para a página índice com uma mensagem de erro, caso não seja.
Em seguida, verifique se a URL já existe no banco de banco de dados:
1 database = Database()
2 collection = database.get_collection()
3
4 exists = collection.find_one({"url": url})
5
6 if exists is not None:
7 flash("URL has already been indexed")
8 return redirect(url_for("index"))
Se a URL já estiver indexada, mostre uma mensagem para o usuário e redirecione-o para a página de índice.
Verifique se o recurso existe:
1 req = urllib.request.Request(url, method="HEAD")
2 fetch = urllib.request.urlopen(req)
3
4 if fetch.status != 200:
5 flash("URL is not reachable")
6 return redirect(url_for("index"))
Este código envia uma HEAD solicitação para a URL para evitar o download do arquivo inteiro. Se o URL não estiver acessível (o código de status não 200 for), mostre uma mensagem ao usuário e redirecione-o para a página índice.
Recupere o tipo de conteúdo e o comprimento dos cabeçalhos de resposta:
1 content_length = fetch.headers["Content-Length"]
2 content_type = fetch.headers["Content-Type"]
Este código extrai o comprimento do conteúdo e o tipo de conteúdo dos cabeçalhos de resposta.
Recupere o processador de ativo apropriado com base no tipo de conteúdo:
1 processor = get_asset_processor(content_type)
2
3 if processor is None:
4 flash('Unsupported content type "' + content_type + '"')
5 return redirect(url_for("index"))
Se nenhum processador for encontrado para o tipo de conteúdo, mostre uma mensagem ao usuário e redirecione-o para a página de índice.
A get_asset_processor função, definida em allthethings/processors.py, retorna um processador usado para analisar o conteúdo de um ativo com base content_type em.
1def get_asset_processor(
2 content_type,
3):
4 if "audio/" in content_type:
5 return AudioProcessor()
6 elif "video/" in content_type:
7 return None
8 elif "image/" in content_type:
9 return None
10 else:
11 return None
Neste caso, o arquivo é um MP3 e o content_type é audio/mpeg, então retorne uma AudioProcessor instância.
Insira a URL, juntamente com seu tipo de conteúdo e comprimento, no banco de banco de dados com um status de SUBMITTED:
1 asset = collection.insert_one(
2 {
3 "url": url,
4 "content_type": content_type,
5 "content_length": content_length,
6 "status": "SUBMITTED",
7 }
8 )
Processe a URL utilizando o processador de ativos, um AudioProcessor e obtenha os resultados da predição:
1 try:
2 response = processor.process(asset.inserted_id, url)
3 except Exception as e:
4 app.logger.error("Error processing asset: %s", e)
5 collection.update_one(
6 filter={"url": url},
7 update={
8 "$set": {
9 "status": "PROCESSING_ERROR",
10 "error": str(e),
11 }
12 },
13 )
14 flash("Error processing asset")
15 return redirect(url_for("index"))
Vejamos o AudioProcessor de allthethings/processors.py em mais detalhes para entender o que isso faz:
1import httpx
2from config import Config
3
4...
5
6class AudioProcessor:
7 def process(self, id, url):
8 input = {
9 "audio": url,
10 "model": "large-v3",
11 "language": "auto",
12 "translate": False,
13 "temperature": 0,
14 "transcription": "plain text",
15 "suppress_tokens": "-1",
16 "logprob_threshold": -1,
17 "no_speech_threshold": 0.6,
18 "condition_on_previous_text": True,
19 "compression_ratio_threshold": 2.4,
20 "temperature_increment_on_fallback": 0.2,
21 }
22
23 payload = {
24 "version": "cdd97b257f93cb89dede1c7584e3f3dfc969571b357dbcee08e793740bedd854",
25 "input": input,
26 "webhook": f"{Config.AUDIO_WEBHOOK_URL}/{id}",
27 "webhook_events_filter": ["completed"],
28 }
29
30 response = httpx.request(
31 "POST",
32 f"{Config.HOOKDECK_REPLICATE_API_QUEUE_URL}/predictions",
33 headers=Config.HOOKDECK_QUEUE_AUTH_HEADERS,
34 json=payload,
35 )
36
37 return response.json()
process O método processa a URL de áudio criando uma solicitação de predição passando o payload como o corpo JSON.
payload inclui webhooks, que consiste no Config.AUDIO_WEBHOOK_URL com um caminho anexado (/{id}) que indica para qual ativo é a chamada de resposta de resposta. O uso do webhook_events_filter=["completed"] filtro informa ao Replicate para enviar um webhook somente quando a predição for concluída.
O payload.version instrui o Replicate a usar o modelo OpenAI Wisper de áudio para texto. O input inclui detalhes como o idioma deve ser detectado automaticamente e a transcrição deve estar em plain text.
Como estamos usando o Hookdeck como uma fila de API de saída, a solicitação usa o Config.HOOKDECK_REPLICATE_API_QUEUE_URL com o sufixo de caminho da /predications API. Os cabeçalhos de autenticação apropriados também são usados a partir de Config.HOOKDECK_QUEUE_AUTH_HEADERS.
De volta app.py a, atualize o banco de dados de dados com o status de processamento e os detalhes de predição pendentes:
1 collection.update_one(
2 filter={"url": url},
3 update={
4 "$set": {
5 "status": "PROCESSING",
6 "processor_response": response,
7 }
8 },
9 )
O processor_response valor é armazenado para fins de depuração, pois contém um ID de solicitação do Hookdeck que pode ser útil.
Passe uma mensagem de sucesso para o usuário e redirecione-o para a página índice:
1 flash(
2 message="Processing: " + url + " with content type: " + content_type,
3 category="success",
4 )
5
6 return redirect(url_for("index"))
Neste ponto, o aplicação Pipeline já baixou todo o trabalho para replicar e, do ponto de vista da viagem de dados, estamos aguardando o webhook concluído da predicação.

Lidar com webhook de conclusão da predição de áudio para texto

Depois que a replicação concluir a predicação, ela fará uma /webhooks/audio/<id> chamada de chamada de /webhooks/audio/<id> resposta do webhook para o Hookdeck. O Hookdeck ingere instantaneamente o webhook, verifica se o evento veio do Replicate e envia os dados para uma fila para processamento e entrega. Com base na configuração atual da conexão do Hookdeck, o evento do webhook é entregue na CLI e, em seguida, no endpoint do aplicação Firefly. Vejamos o código que lida com a solicitação.
Aqui está a /webhooks/audio/<id> definição de rota do app.py no :
1@app.route("/webhooks/audio/<id>", methods=["POST"])
2def webhook_audio(id):
3 if not verify_webhook(request):
4 app.logger.error("Webhook signature verification failed")
5 return jsonify({"error": "Webhook signature verification failed"}), 401
6
7 payload = request.json
8 app.logger.info("Audio payload received for id %s", id)
9 app.logger.debug(payload)
Esta rota lida com POST solicitações para o /webhooks/audio/<id> endpoint. O id parâmetro de caminho representa o ativo no banco de banco de dados MongoDB para o qual a chamada de resposta de resposta de áudio é. A carga útil JSON do chamada de resposta de resposta do Replicate.
Antes de lidar com o webhook, verificamos se o webhook veio do Hookdeck por meio de uma verify_webhook função. Se a verificação falhar, uma 401 resposta será retornada. Este é o código para verificar o webhook:
1def verify_webhook(request):
2 if Config.HOOKDECK_WEBHOOK_SECRET is None:
3 app.logger.error("No HOOKDECK_WEBHOOK_SECRET found.")
4 return False
5
6 hmac_header = request.headers.get("x-hookdeck-signature")
7
8 hash = base64.b64encode(
9 hmac.new(
10 Config.HOOKDECK_WEBHOOK_SECRET.encode(), request.data, hashlib.sha256
11 ).digest()
12 ).decode()
13
14 verified = hash == hmac_header
15 app.logger.debug("Webhook signature verification: %s", verified)
16 return verified
Isto lê o segredo do webhook do Hookdeck armazenado na HOOKDECK_WEBHOOK_SECRET variável de ambiente, gera um hash usando o segredo dos dados do webhook de entrada e o compara com o hash que foi enviado no x-hookdeck-signature cabeçalho. Se corresponderem, o webhook será verificado.
Em seguida, o status de processamento é determinado com base na presença de um erro na carga útil:
1 database = Database()
2 collection = database.get_collection()
3
4 status = (
5 "PROCESSING_ERROR" if "error" in payload and payload["error"] else "PROCESSED"
6 )
Se houver um erro, o status será definido PROCESSING_ERROR como. Caso contrário, é definido como PROCESSED.
O banco de dados de dados é atualizado com os resultados da transcrição e o status de processamento:
1 result = collection.find_one_and_update(
2 filter={"_id": ObjectId(id)},
3 update={
4 "$set": {
5 "status": status,
6 "text": payload["output"]["transcription"],
7 "replicate_response": payload,
8 }
9 },
10 return_document=True,
11 )
Isso localiza o documento no banco de banco de dados com o correspondente id e o atualiza com o novo status, a transcrição text e toda a carga útil da resposta Replicar.
Em seguida, verificamos se o documento foi encontrado:
1 if result is None:
2 app.logger.error(
3 "No document found for id %s to add audio transcript", payload["id"]
4 )
5 return jsonify({"error": "No document found to add audio transcript"}), 404
Se nenhum documento for encontrado para o id fornecido, um erro será registrado e uma resposta JSON com uma mensagem de erro será retornada. A 404 resposta informará ao Hookdeck que, embora a solicitação não tenha sido bem-sucedida, ela não deve ser repetida.
Com o áudio convertido em texto e armazenado, a viagem de dados passa para gerar incorporações via Replicar:
1 app.logger.info("Transcription updated")
2 app.logger.debug(result)
3
4 request_embeddings(id)
5
6 return "OK"
Em seguida, a request_embeddings função é chamada para gerar incorporações para o áudio processado. O endpoint retorna uma OK resposta para informar ao Hookdeck que o webhook foi processado com sucesso.

Gerar incorporação

A request_embeddings função aciona a geração de incorporações para a representação textual de um ativo indexado:
1def request_embeddings(id):
2 app.logger.info("Requesting embeddings for %s", id)
3
4 database = Database()
5 collection = database.get_collection()
6
7 asset = collection.find_one({"_id": id})
8
9 if asset is None:
10 raise RuntimeError("Asset not found")
11
12 if asset["status"] != "PROCESSED":
13 raise RuntimeError("Asset has not been processed")
Se esse ativo com o passado id não for encontrado ou o status do ativo não for PROCESSED, o que indica que uma representação textual foi criada, um RuntimeError será gerado.

acione a geração de incorporação com chamada de chamada de resposta webhook

Em seguida, as incorporações são geradas para o ativo processado usando o AsyncEmbeddingsGenerator:
1 generator = AsyncEmbeddingsGenerator()
2
3 try:
4 response = generator.generate(id, asset["text"])
5 except Exception as e:
6 app.logger.error("Error generating embeddings for %s: %s", id, e)
7 raise
Isso inicializa o AsyncEmbeddingsGenerator e chama a generate função na instância, passando o ID do ativo que está sendo indexado e a representação textual.
A AsyncEmbeddingsGenerator definição no allthethings/generators.py segue um padrão semelhante ao processador usado anteriormente:
1import httpx
2from config import Config
3
4
5class AsyncEmbeddingsGenerator:
6 def generate(self, id, text):
7 payload = {
8 "version": "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305",
9 "input": {"text": text},
10 "webhook": f"{Config.EMBEDDINGS_WEBHOOK_URL}/{id}",
11 "webhook_events_filter": ["completed"],
12 }
13
14 response = httpx.request(
15 "POST",
16 f"{Config.HOOKDECK_REPLICATE_API_QUEUE_URL}/predictions",
17 headers=Config.HOOKDECK_QUEUE_AUTH_HEADERS,
18 json=payload,
19 )
20
21 return response.json()
O generate método recebe o ativo id e o text para o qual as incorporações devem ser geradas.
Uma solicitação payload é criada contendo um version que identifica que o modeloreplication/all-mpnet-base-v é usado para gerar embeddings e que o2 text para a incorporação é passado dentro de um input parâmetro.
A webhook propriedade é definida Config.EMBEDDINGS_WEBHOOK_URL como com um caminho anexado (/{id}) que indica para qual ativo é a chamada de resposta de resposta. Assim como antes, o uso do webhook_events_filter=["completed"] filtro informa ao Replicate para enviar um webhook somente quando a predição for concluída.
Como essa é uma chamada assíncrona, o Hookdeck é novamente usado para colocar HOOKDECK_REPLICATE_API_QUEUE_URL na /predications fila a solicitação da API de replicação por meio de uma chamada para o endpoint com o caminho.
O método retorna a resposta do Hookdeck.
De volta ao app.py, atualize o banco de dados de dados com o status e o ID da solicitação de incorporação:
1 collection.update_one(
2 filter={"_id": ObjectId(id)},
3 update={
4 "$set": {
5 "status": "GENERATING_EMBEDDINGS",
6 "generator_response": response,
7 }
8 },
9 )
Atualize o documento no banco de banco de dados com o novo status GENERATING_EMBEDDINGS e a resposta da fila do Hookdeck.
A solicitação para gerar de forma assíncrona as incorporações foi acionada e o trabalho transferido para Replicar. Quando o resultado for lido, um webhook será acionado com o resultado.

Gerenciar chamada de resposta de resposta do webhook de geração incorporada

Depois que o Replicate gerar a incorporação, um retorno de chamada de resposta de webhook é feito para a /webhooks/embedding/<id> rota em nosso aplicação Frask. Essa rota recebe a carga útil do webhook, verifica se ela veio do Hookdeck, atualiza o banco de dados de dados com os resultados da incorporação e define o status apropriado.
Esta é a definição da rota:
1@app.route("/webhooks/audio/<id>", methods=["POST"])
2def webhook_audio(id):
3 if not verify_webhook(request):
4 app.logger.error("Webhook signature verification failed")
5 return jsonify({"error": "Webhook signature verification failed"}), 401
6
7 payload = request.json
8 app.logger.info("Audio payload received for id %s", id)
9 app.logger.debug(payload)
Essa rota lida com POST solicitações para o /webhooks/embedding/<id> endpoint e recebe o id parâmetro de caminho. Ele verifica se a solicitação veio do Hookdeck e, em caso afirmativo, recupera a carga útil JSON da solicitação. Caso contrário, retorna uma 401 resposta.
Em seguida, ele verifica se há erros:
1 status = (
2 "EMBEDDINGS_ERROR" if "error" in payload and payload["error"] else "SEARCHABLE"
3 )
Se houver um erro, o status será definido EMBEDDINGS_ERROR como. Caso contrário, é definido como SEARCHABLE.
Em seguida, o vetor de incorporação é extraído da carga útil e o banco de dados de dados é atualizado com os detalhes da incorporação e o novo status:
1 embedding = payload["output"][0]["embedding"]
2
3 database = Database()
4 collection = database.get_collection()
5
6 result = collection.update_one(
7 filter={"_id": ObjectId(id)},
8 update={
9 "$set": {
10 "status": status,
11 "embedding": embedding,
12 "replicate_embeddings_response": payload,
13 }
14 },
15 )
Isso encontra o documento no banco de banco de dados com o correspondente id e o atualiza com o novo status, incorporação e toda a carga útil.
Verifique se o documento foi encontrado e atualizado:
1 if result.matched_count == 0:
2 app.logger.error(
3 "No document found for id %s to update embedding", payload["id"]
4 )
5 return jsonify({"error": "No document found to update embedding"}), 404
6
7 return "OK"
Se nenhum documento for encontrado para o id fornecido, um erro será registrado e uma resposta JSON com uma mensagem de erro será retornada com um 404 status. Se a atualização for um sucesso, retorne um OK para informar ao Hookdeck que o webhook foi processado.
Com o vetor incorporação armazenado na embedding propriedade , agora ele pode ser pesquisado com o MongoDB devido ao índice de pesquisa vetorial definido anteriormente.
A pesquisa é orientada para o usuário. O usuário insere um termo de pesquisa e envia um formulário. Essa query de pesquisa é tratada e processada, e o resultado é retornado e exibido. Idealmente, esta é uma experiência em tempo real, então as operações são realizadas de forma síncrona.
Vamos analisar cada uma dessas etapas.

Lidar com o envio da pesquisa

O usuário navega até o /search endpoint em seu navegador da web, insere um termo de pesquisa e envia o formulário, fazendo uma GET solicitação para o /search endpoint:
1@app.route("/search", methods=["GET"])
2def search():
3 query = request.args.get("query")
4 if query is None:
5 return render_template("search.html", results=[])
6
7 app.logger.info("Query submitted")
8 app.logger.debug(query)
9
10 results = query_vector_search(query)
11
12 results = format_results(results)
13
14 app.logger.debug("Formatted search results", results)
15
16 return render_template("search.html", results=results, query=query)
A search função no aplicação Spark lida com GET solicitações para o /search endpoint. Ele recupera a pesquisa query do request.args.get enviado pelo usuário. Se não houver nenhuma query, o search modelo será renderizado. Caso contrário, uma pesquisa vetorial é executada utilizando a query_vector_search função. O resultado é então formatado passando os resultados para a format_results função. Os resultados formatados são então renderizados usando o search.html modelo.

Gerando incorporações de query de pesquisa

A query_vector_search função gera incorporações para a query, executa uma pesquisa vetorial usando a query fornecida pelo usuário e recupera documentos correspondentes da collection do MongoDB .
1def query_vector_search(q):
2 generator = SyncEmbeddingsGenerator()
3
4 try:
5 generator_response = generator.generate(q)
6 app.logger.debug(generator_response)
7 except Exception as e:
8 app.logger.error("Error generating embeddings: %s", e)
9 return None
10
11 if generator_response["status"] != "completed":
12 app.logger.debug("Embeddings generation timed out")
13 return None
14
15 query_embedding = generator_response["output"][0]["embedding"]
A função pega a queryq e usa o SyncEmbeddingsGenerator para gerar a incorporação para a query de pesquisa chamando sua generate função e passando a query. Se a criação da incorporação falhar por vários motivos, None será retornado.
O SyncEmbeddingsGenerated é usado para gerar de forma síncrona incorporações para a query de pesquisa. Esta operação é síncrona porque a solicitação é orientada ao usuário e requer uma resposta direta. SyncEmbeddingsGenerated é definido em allthethings/generators.py:
1class SyncEmbeddingsGenerator:
2 def generate(self, text):
3 payload = {
4 "version": "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305",
5 "input": {"text": text},
6 }
7
8 response = httpx.request(
9 "POST",
10 "https://api.replicate.com/v1/predictions",
11 headers={**Config.REPLICATE_API_AUTH_HEADERS, "Prefer": "wait"},
12 json=payload,
13 timeout=60,
14 )
15
16 return response.json()
A generate função recebe o text para gerar uma incorporação. Uma solicitação síncrona é feita diretamente para a API HTTP de replicação, passando o mesmo modelode replication/all-mpnet-base-v2 version usado na solicitação de incorporação assíncrona. O "Prefer": "Wait" cabeçalho e os timeout valores são definidos para habilitar solicitações HTTP síncronas de longa duração. Além disso, o token da API de replicação está incluído nos cabeçalhos por meio Config.REPLICATE_API_AUTH_HEADERS do .
O JSON de resposta é retornado para a função de chamada.

Criar query de Vector Search

De volta ao query_vector_search, o resultado da incorporação é usado para construir a query de pesquisa vetorial .
1 ...
2
3 query_embedding = generate_response[0]["embedding"]
4
5 vs_query = {
6 "index": "vector_index",
7 "path": "embedding",
8 "queryVector": query_embedding,
9 "numCandidates": 100,
10 "limit": 10,
11 }
12
13 new_search_query = {"$vectorSearch": vs_query}
14
15 app.logger.info("Vector search query created")
16 app.logger.debug(new_search_query)
vs_query representa a pesquisa vetorial a ser executada. Identifica o index a ser consultado como vector_index; o path para a propriedade, embedding, a query está ativada; e o resultado da query de texto no formato de incorporação ("queryVector": query_embedding). Consulte os documentos do MongoDB Vector Search para obter mais informações, incluindo a finalidade das numCandidates propriedades limit e.

Recuperar resultados do Vector Search

Em seguida, a função define a projeção para especificar quais campos incluir nos resultados da pesquisa.
1 project = {
2 "$project": {
3 "score": {"$meta": "vectorSearchScore"},
4 "_id": 0,
5 "url": 1,
6 "content_type": 1,
7 "content_length": 1,
8 "text": 1,
9 }
10 }
A projeção inclui a pontuação do vetor de pesquisa, URL, tipo de conteúdo, comprimento do conteúdo e texto. Para obter mais informações sobre a pontuação, consulte a Documentação de pontuação do Atlas Vector Search.
A função então executa a query de agregação usando a query de pesquisa vetorial construída e projeção:
1 database = Database()
2 collection = database.get_collection()
3
4 app.logger.info("Vector search query without post filter")
5 res = list(collection.aggregate([new_search_query, project]))
6
7 app.logger.info("Vector search query run")
8 app.logger.debug(res)
9 return res
No geral, a query_vector_search função executa uma pesquisa vetorial usando a query fornecida pelo usuário, gera incorporações para a query e recupera documentos correspondentes do banco de banco de dados MongoDB .

Formatar e exibir os resultados do Vector Search

Em seguida, dentro search_post de em app.py, os resultados são formatados para renderização:
1 results = format_results(results)
E dentro format_results app.pyde , também definido em :
1def format_results(results):
2 formatted_results = []
3 for _idx, index in enumerate(results):
4 parse_result = urlparse(index["url"])
5 parsed_url = {
6 "netloc": parse_result.netloc,
7 "path": parse_result.path,
8 "params": parse_result.params,
9 "query": parse_result.query,
10 "fragment": parse_result.fragment,
11 "hostname": parse_result.hostname,
12 "last_part": parse_result.path.rstrip("/").split("/")[-1],
13 }
14 index["parsed_url"] = parsed_url
15 formatted_results.append(index)
16
17 return formatted_results
A format_results função itera sobre o resultado da pesquisa vetorial e retorna uma array com cada elemento contendo o resultado junto com uma parsed_url propriedade com informações sobre o ativo indexado.
Finalmente, de volta à POST /search rota, os resultados são exibidos:
1@app.route("/search", methods=["POST"])
2def search_post():
3 ...
4
5 results = format_results(results)
6
7 return render_template("search.html", results=results, query=query)
Isso renderiza o search.html modelo, passando os resultados formatados e a query original para o modelo para exibição.
Resultados da pesquisa

Conclusão

Neste tutorial, percorremos os componentes usados em um aplicação MongoDB que pode indexar e executar uma pesquisa de texto em qualquer ativo com uma URL pública . Os dados são armazenados e a pesquisa vetorial ocorre via MongoDB. A geração de inferência e incorporação de IA é realizada pelo Replicar. O Hookdeck é usado como uma fila sem servidor entre o aplicação Spark e o Replicate para gerenciar a limitação da taxa de solicitações de API para o Replicate e para verificar, enfileirar e garantir a entrega de chamadas de webhook assíncronas do Replicate de volta para o aplicação Firefly.
Se ainda não o tiver feito isso, você pode obter o código Index All The Things no Github. Há uma série de questões de recursos para adicionar suporte para tipos de conteúdo adicionais, então fique à vontade para se envolver.
Por fim, se você tiver alguma dúvida ou ideia, compartilhe-a por meio de um problema no repositório do Github ou enviando-me uma mensagem no X ou Bloskey.
Principais comentários nos fóruns
Avatar do Comentarista do Fórum
Fil_LeggetterFil Leggetter4 trimestres atrás

Olá :onda:

Sou o autor do artigo e estou muito interessado em ouvir comentários e ideias.

Veja mais nos fóruns

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Como usar os vetores quantizados do Cohere para criar aplicativos de AI econômicos com o MongoDB


Oct 03, 2024 | 23 min read
Tutorial

Utilizar Globbing e Proveniência de Coleção no Data Federation


Jun 28, 2023 | 5 min read
Tutorial

Criando um localizador de restaurantes usando Atlas, Neurelo e AWS Lambda


Apr 02, 2024 | 8 min read
Tutorial

Inicie um fluxo de trabalho RAG totalmente gerenciado com o MongoDB Atlas e o Amazon Bedrock


May 08, 2024 | 6 min read
Sumário