Indexe qualquer coisa, pesquise tudo: Vector Search escalável com IA replicada, MongoDB e Hookdeck
Avalie esse Tutorial
Neste tutorial, criaremos um aplicação MongoDB que permite aos usuários indexar e pesquisar qualquer coisa na Internet com um URL acessível publicamente. Isso mesmo! Solicite ao aplicativo para indexar um3 arquivo MP ou WAV, um arquivo HTML ou de texto, ou um4 arquivo Mover ou MP, e ele usará o poder da IA de replicação para criar uma representação textual desse arquivo, e o app armazena os resultados no MongoDB Atlas. Desde que um LLM possa analisar o recurso e criar uma representação textual, ele pode ser indexado. Em seguida, todos esses arquivos indexados, independentemente do tipo de arquivo de origem, podem ser pesquisados com texto usando MongoDB Atlas.
Usaremos o Hookdeck, um gateway de evento que fornece infraestrutura e ferramentas para criar e gerenciar aplicativos orientados a eventos, lidando perfeitamente com webhooks e solicitações de API. Neste tutorial, o Hookdeck garante uma comunicação confiável e escalável entre o aplicação MongoDB e a Replicate, uma plataforma de API LLM, atuando como uma fila sem servidor para chamadas de API que limitam a taxa e garante a entrega de webhook.
Começaremos configurando os serviços necessários e colocando o aplicação Pipeline em funcionamento. Em seguida, acompanharemos a J Em última análise , o conteúdo é disponibilizado para pesquisa dentro de um índice de pesquisa vetorial .
A escalabilidade geralmente é supervalorizada, mas continua sendo um aspecto importante da criação de aplicativos robustos. Um dos benefícios de usar provedores sem servidor e hospedados na nuvem é a capacidade de transferir o trabalho para serviços especializados. Também é necessário em qualquer arquitetura escalável uma forma de garantir que os serviços não sejam sobrecarregados e que seu aplicação seja tolerante a falhas. Neste tutorial, utilizamos vários desses serviços para lidar com diferentes aspectos do nosso aplicação.
Primeiro, vamos dar uma olhada nos serviços:
- Replicar: fornece modelos de aprendizado de máquina de código aberto acessíveis por meio de uma API
- MongoDB Atlas: Um conjunto integrado de serviços de dados centralizado em um banco de banco de dados na nuvem projetado para acelerar e simplificar a forma como você constrói com dados
- Hookdeck: um gateway de evento que oferece às equipes de engenharia infraestrutura e ferramentas para criar e gerenciar aplicativos orientados a eventos
Em seguida, vejamos como os serviços são utilizados.

- MongoDB Atlas: O MongoDB Atlas oferece recursos de armazenamento de banco de dados de dados e pesquisa vetorial, garantindo que nossos dados sejam armazenados de forma eficiente e consultados rapidamente.
- Índice Todas as coisas: Este é o aplicação Frasco .
- Hookdeck: o Hookdeck atua como uma fila sem servidor para a) garantir que as solicitações da API de replicação não excedam os limites de taxa e possam ser repetidas e b) ingestão, entrega e nova tentativa de webhooks da Replicate para garantir a ingestão confiável de eventos. Observação: também usaremos a CLI do Hookdeck para receber webhooks em nosso ambiente de desenvolvimento local.
- Replicar: a replicação lida com a inferência de IA, produz texto e incorporações e nos permite descarregar as tarefas computacionalmente intensivas da execução de modelos de machine learning. Usamos diferentes LLMs para analisar diferentes tipos de conteúdo.
Ao utilizar esses serviços baseados na nuvem, podemos nos concentrar na criação da funcionalidade principal de nosso aplicação , garantindo que ele permaneça escalável e eficiente. Os webhooks, em particular, permitem a escalabilidade ao ativar fluxos de trabalho de IA assíncronos, transferindo esses cenários de alto uso de computação para serviços de terceiros e apenas recebendo chamadas de resposta por meio de um webhook quando o trabalho é concluído.
Antes de começar, verifique se tem o seguinte:
- Uma conta Hookdeck gratuita
- Uma conta gratuita do MongoDB Atlas
- Escrever para gerenciamento de pacote
Vamos começar executando o aplicação e vê-lo em ação.
Comece obtendo a base de código do aplicação .
1 git clone https://github.com/hookdeck/index-all-the-things.git
Ative um ambiente virtual com o POetry:
1 poetry shell
E instale as dependências do aplicativo:
1 poetry install
O aplicação precisa de credenciais para os serviços com os quais interage.
Copie o arquivo de exemplo
.env-example
para um novo .env
arquivo:1 cp .env-example .env
Atualize os valores dentro de
.env
da seguinte forma:SECRET_KEY
: uma chave secreta que será usada para assinar com segurança o cookie de sessão.SECRET_KEY
Consulte os Docs do Frasco para obter mais informações.MONGODB_CONNECTION_URI
: preencha com uma string de conexão do MongoDB Atlas com um read e write para qualquer role de banco de dados de dados. Consulte os documentos Obter cadeia de conexão.HOOKDECK_PROJECT_API_KEY
: Obtenha uma chave de API na seção Projeto -> Configurações -> Segredos do painel do Hookdeck.HOOKDECK_WEBHOOK_SECRET
: Obtenha um segredo de assinatura na seção Projeto -> Configurações -> Segredos do painel do Hookdeck.REPLICATE_WEBHOOKS_SECRET
: vá para a seção Webhooks do painel de replicação e clique no botãoMostrar chave de assinatura.HOOKDECK_REPLICATE_API_QUEUE_API_KEY
,HOOKDECK_REPLICATE_API_QUEUE_URL
,AUDIO_WEBHOOK_URL
eEMBEDDINGS_WEBHOOK_URL
serão automaticamente preenchidos na próxima etapa.
As conexões Hookdeck são usadas para rotear solicitações HTTP de entrada recebidas por uma origem Hookdeck para um destino Hookdeck.
O
create-hookdeck-connections.py
script cria automaticamente as seguintes conexões do Hookdeck que:- Direcione as solicitações feitas para URLs do Hookdeck para o aplicação executado localmente por meio da CLI do Hookdeck. Aqui, o Hookdeck é usado como uma fila de entrada.
- Direcione as solicitações feitas para uma URL do Hookdeck para a API de replicação. O Hookdeck é utilizado como uma fila de saída, nesta situação.
O script também atualiza o
.env
arquivo com as URLs de origem que lidam com os webhooks. Vamos analisar os detalhes do script.Primeiro, verifique se você tem as importações necessárias e defina os cabeçalhos de autenticação e tipo de conteúdo para a solicitação da API do Hookdeck:
1 import httpx 2 import re 3 import hashlib 4 import os 5 6 from config import Config 7 8 headers = { 9 "Authorization": f"Bearer {Config.HOOKDECK_PROJECT_API_KEY}", 10 "Content-Type": "application/json", 11 }
Em seguida, defina uma função para criar uma conexão com a API do Hookdeck:
1 def create_connection(payload): 2 response = httpx.request( 3 "PUT", 4 "https://api.hookdeck.com/latest/connections", 5 headers=headers, 6 json=payload, 7 ) 8 data = response.json() 9 10 if response.status_code != 200: 11 raise Exception(f"Failed to create connection: {data}") 12 13 return data
Essa função faz uma
PUT
solicitação para a API do Hookdeck com a carga útil da conexão upsert e lida com a resposta. Se o status da resposta não for 200
(OK), será gerada uma exceção. A função retorna a resposta JSON analisada.A primeira conexão a ser criada é para a fila de saída da API de replicação:
1 replicate_api_queue_api_key = hashlib.sha256(os.urandom(32)).hexdigest() 2 replicate_api_queue = { 3 "name": "replicate-api-queue", 4 "source": { 5 "name": "replicate-api-queue", 6 "verification": { 7 "type": "API_KEY", 8 "configs": { 9 "header_key": Config.HOOKDECK_QUEUE_API_KEY_HEADER_NAME, 10 "api_key": replicate_api_queue_api_key, 11 }, 12 }, 13 }, 14 "rules": [ 15 { 16 "type": "retry", 17 "strategy": "exponential", 18 "count": 5, 19 "interval": 30000, 20 "response_status_codes": ["429", "500"], 21 } 22 ], 23 "destination": { 24 "name": "replicate-api", 25 "url": "https://api.replicate.com/v1/", 26 "auth_method": { 27 "type": "BEARER_TOKEN", 28 "config": { 29 "token": Config.REPLICATE_API_TOKEN, 30 }, 31 }, 32 }, 33 } 34 35 replicate_api_connection = create_connection(replicate_api_queue)
A conexão tem um
name
, um source
e um destination
. O source
também tem um name
e um verification
. O verification
instrui o Hookdeck como autenticar solicitações. Como a conexão está funcionando como uma fila de API, estamos usando o API_KEY
tipo com o header_key
definido como o valor definido em Config.HOOKDECK_QUEUE_API_KEY_HEADER_NAME
e o api_key
valor definido como o hash gerado armazenado em replicate_api_queue_api_key
.O
rules
define a estratégia de nova tentativa de solicitação a ser usada ao interagir com a API de replicação. Nesse caso, estamos declarando que devemos tentar novamente até cinco vezes, usando um intervalo de 30000
milissegundos, mas aplicar uma estratégia de nova exponential
tentativa de back-off. Além disso, estamos usando a response_status_codes
opção para informar ao Hookdeck que só tente novamente 429
com as 500
respostas HTTP e.Consulte os Documentos de Repetições do Hookdeck para obter mais informações sobre as novas tentativas e os Documentos de Regras do Hookdeck para obter detalhes sobre outros tipos de regras disponíveis.O
url
no destino é a URL base para a API de replicação. O Hookdeck usa o encaminhamento de caminho por padrão, portanto, qualquer caminho anexado ao URL de origem do Hookdeck também será anexado ao URL de destino. Por exemplo, uma solicitação para uma origem do Hookdeck com URL https://hkdk.events/{id}/predictions
resultará em uma solicitação para um destino conectado de https://api.replicate.com/v1/predictions
onde o destino tem um URL base https://api.replicate.com/v1/
de. O Hookdeck age como um proxy nesse cenário.O
auth_method
no destino é do tipo BEARER_TOKEN
com um config.token
definido para o valor da REPLICATE_API_TOKEN
variável de ambiente. Isso permite que o Hookdeck faça chamadas de API autenticadas para replicar.Agora, crie uma conexão para os webhooks de replicação de áudio lidarem com os retornos de chamada de análise de áudio:
1 replicate_audio = { 2 "name": "replicate-audio", 3 "source": { 4 "name": "replicate-audio", 5 "verification": { 6 "type": "REPLICATE", 7 "configs": { 8 "webhook_secret_key": Config.REPLICATE_WEBHOOKS_SECRET, 9 }, 10 }, 11 }, 12 "rules": [ 13 { 14 "type": "retry", 15 "count": 5, 16 "interval": 30000, 17 "strategy": "exponential", 18 "response_status_codes": ["!404"], 19 } 20 ], 21 "destination": { 22 "name": "cli-replicate-audio", 23 "cli_path": "/webhooks/audio", 24 }, 25 } 26 27 replicate_audio_connection = create_connection(replicate_audio)
A conexão de chamada de resposta de resposta do webhook Replicar áudio usa um
verification
do tipo REPLICATE
com um configs.webhook_secret_key
valor definido a partir do REPLICATE_WEBHOOKS_SECRET
valor que armazenamos no .env
arquivo. Isso habilita e instrui o Hookdeck a verificar se o webhook veio do Replicate.Os
rules
para essa conexão de entrada são semelhantes à conexão de saída e definem uma estratégia de nova tentativa de entrega a ser seguida se alguma solicitação ao endpoint do webhook do nosso aplicativo falhar. A única diferença é que o response_status_codes
informa ao Hookdeck para não tentar novamente se receber uma 200
404
resposta ou.O
destination
tem um name
e um cli_path
informando ao Hookdeck que o destino é a CLI do Hookdeck e o caminho para o qual a solicitação deve ser encaminhada é /webhooks/audio
.Em seguida, crie uma conexão para chamadas de resposta do webhook do Replicate Embeddings:
1 replicate_embedding = { 2 "name": "replicate-embedding", 3 "source": { 4 "name": "replicate-embedding", 5 "verification": { 6 "type": "REPLICATE", 7 "configs": { 8 "webhook_secret_key": Config.REPLICATE_WEBHOOKS_SECRET, 9 }, 10 }, 11 }, 12 "rules": [ 13 { 14 "type": "retry", 15 "count": 5, 16 "interval": 30000, 17 "strategy": "exponential", 18 "response_status_codes": ["!200", "!404"], 19 } 20 ], 21 "destination": { 22 "name": "cli-replicate-embedding", 23 "cli_path": "/webhooks/embedding", 24 }, 25 } 26 27 replicate_embedding_connection = create_connection(replicate_embedding)
Por fim, atualize o
.env
arquivo com alguns dos valores gerados:1 # Update .env 2 with open(".env", "r") as file: 3 env_content = file.read() 4 5 replicate_api_connection_url = replicate_api_connection["source"]["url"] 6 audio_webhook_url = replicate_audio_connection["source"]["url"] 7 embedding_webhook_url = replicate_embedding_connection["source"]["url"] 8 9 # Replace the .env URLs in the .env content 10 env_content = re.sub( 11 r"HOOKDECK_REPLICATE_API_QUEUE_API_KEY=.*", 12 f"HOOKDECK_REPLICATE_API_QUEUE_API_KEY={replicate_api_queue_api_key}", 13 env_content, 14 ) 15 env_content = re.sub( 16 r"HOOKDECK_REPLICATE_API_QUEUE_URL=.*", 17 f"HOOKDECK_REPLICATE_API_QUEUE_URL={replicate_api_connection_url}", 18 env_content, 19 ) 20 env_content = re.sub( 21 r"AUDIO_WEBHOOK_URL=.*", f"AUDIO_WEBHOOK_URL={audio_webhook_url}", env_content 22 ) 23 env_content = re.sub( 24 r"EMBEDDINGS_WEBHOOK_URL=.*", 25 f"EMBEDDINGS_WEBHOOK_URL={embedding_webhook_url}", 26 env_content, 27 ) 28 29 with open(".env", "w") as file: 30 file.write(env_content) 31 32 print("Connections created successfully!")
Esse código lê o
.env
conteúdo atual do, substitui as linhas por espaços reservados da variável ambiental existente usando expressões regulares e grava o conteúdo atualizado de volta no .env
arquivo. Isso garante que as variáveis de ambiente, como as URLs do webhook, estejam atualizadas.Execute o roteiro:
1 poetry run python create-hookdeck-connections.py
Verifique seu
.env
arquivo para garantir que todos os valores sejam preenchidos.Além disso, navegue até a seçãoConexões do painel do Hookdeck e verifique a representação visual de suas conexões.

Para pesquisar um banco de banco de dados MongoDB de forma eficiente, você precisa de índices. Para a pesquisa vetorial do MongoDB , você deve criar um índice do Atlas Vector Search. O
create-indexes.py
script automatiza a criação e atualização dos índices de pesquisa no MongoDB utilizando a pymongo
biblioteca do.Primeiro, certifique-se de ter as importações necessárias e inicialize a conexão do banco de dados de dados:
1 from allthethings.mongo import Database 2 from pymongo.operations import SearchIndexModel 3 4 database = Database() 5 collection = database.get_collection()
Database
é definido em allthethings/mongo.py
e fornece acesso utilitário à assets
coleção no iaat
banco de banco de dados, com esses valores de string definidos config.py
em.Em seguida, certifique-se de que exista a coleção necessária dentro do banco de dados de dados para que os índices possam ser criados:
1 if collection.name not in collection.database.list_collection_names(): 2 print("Creating empty collection so indexes can be created.") 3 collection.database.create_collection(collection.name)
Com a coleção criada, defina uma função para criar ou atualizar índices de pesquisa:
1 def create_or_update_search_index(index_name, index_definition, index_type): 2 indexes = list(collection.list_search_indexes(index_name)) 3 4 if len(indexes) == 0: 5 print(f'Creating search index: "{index_name}"') 6 index_model = SearchIndexModel( 7 definition=index_definition, 8 name=index_name, 9 type=index_type, 10 ) 11 collection.create_search_index(model=index_model) 12 13 else: 14 print(f'Search index "{index_name}" already exists. Updating.') 15 collection.update_search_index(name=index_name, definition=index_definition)
Esta função verifica se um índice com o
index_name
fornecido já existe. Ele cria um novo índice de pesquisa usando a definição e o tipo fornecidos se não existir. Se existir, ele atualizará o índice existente com a nova definição.Agora, crie um índice de pesquisa vetorial para incorporações:
1 vector_result = create_or_update_search_index( 2 "vector_index", 3 { 4 "fields": [ 5 { 6 "type": "vector", 7 "path": "embedding", 8 "numDimensions": 768, 9 "similarity": "euclidean", 10 } 11 ] 12 }, 13 "vectorSearch", 14 )
Isso cria ou atualiza um índice de pesquisa vetorial chamado "vector_index" para o
embedding
campo.Por fim, crie um índice de pesquisa para o
url
campo , pois ele é usado para determinar se uma URL já foi indexada:1 create_or_update_search_index( 2 "url_index", 3 { 4 "mappings": { 5 "fields": { 6 "url": { 7 "type": "string", 8 }, 9 }, 10 } 11 }, 12 "search", 13 ) 14 15 print("Indexes created successfully!")
Execute o roteiro:
1 poetry run python create-indexes.py
Vá para a seção Atlas Search no painel do MongoDB Atlas e verifique se os índices de pesquisa foram criados.

Em uma janela de terminal, execute o aplicação Firefly :
1 poetry run python -m flask --app app --debug run
Em uma segunda janela do terminal, crie um localtunnel usando a CLI do Hookdeck:
1 hookdeck listen 5000 '*'
Este comando escuta todas as fontes do Hookdeck conectadas a um destino CLI, roteando webhooks para o aplicação executado localmente na porta 5000.
Ao executar o comando, você verá uma saída semelhante a esta:
1 Listening for events on Sources that have Connections with CLI Destinations 2 3 Dashboard 4 👉 Inspect and replay events: https://dashboard.hookdeck.com?team_id=tm_{id} 5 6 Sources 7 🔌 replicate-embedding URL: https://hkdk.events/{id} 8 🔌 replicate-audio URL: https://hkdk.events/{id} 9 10 Connections 11 replicate-embedding -> replicate-embedding forwarding to /webhooks/embedding 12 replicate-audio -> replicate-audio forwarding to /webhooks/audio 13 14 Ready! (^C to quit)
Abra
localhost:5000
o no seu navegador da web para garantir que o aplicativo Spark esteja em execução.
Com o aplicativo em execução, é hora de enviar um ativo para indexação.
Clique em PDF3(mp) sob o cabeçalho Exemplos para preencher a barra de pesquisa no aplicativo com um URL e clique em Enviar.

O envio do formulário envia a URL para um
/process
endpoint como uma POST
solicitação . Vamos ver o que esse código faz.Primeiro, defina a
/process
rota em app.py
:1 2 def process(): 3 url = request.form["url"] 4 5 parsed_url = urlparse(url) 6 if not all([parsed_url.scheme, parsed_url.netloc]): 7 flash("Invalid URL") 8 return redirect(url_for("index"))
Esta rota lida com a
POST
solicitação para o /process
endpoint e recupera a URL a partir dos dados do formulário enviados pelo usuário. Ele valida a URL e redireciona para a página índice com uma mensagem de erro, caso não seja.Em seguida, verifique se a URL já existe no banco de banco de dados:
1 database = Database() 2 collection = database.get_collection() 3 4 exists = collection.find_one({"url": url}) 5 6 if exists is not None: 7 flash("URL has already been indexed") 8 return redirect(url_for("index"))
Se a URL já estiver indexada, mostre uma mensagem para o usuário e redirecione-o para a página de índice.
Verifique se o recurso existe:
1 req = urllib.request.Request(url, method="HEAD") 2 fetch = urllib.request.urlopen(req) 3 4 if fetch.status != 200: 5 flash("URL is not reachable") 6 return redirect(url_for("index"))
Este código envia uma
HEAD
solicitação para a URL para evitar o download do arquivo inteiro. Se o URL não estiver acessível (o código de status não 200 for), mostre uma mensagem ao usuário e redirecione-o para a página índice.Recupere o tipo de conteúdo e o comprimento dos cabeçalhos de resposta:
1 content_length = fetch.headers["Content-Length"] 2 content_type = fetch.headers["Content-Type"]
Este código extrai o comprimento do conteúdo e o tipo de conteúdo dos cabeçalhos de resposta.
Recupere o processador de ativo apropriado com base no tipo de conteúdo:
1 processor = get_asset_processor(content_type) 2 3 if processor is None: 4 flash('Unsupported content type "' + content_type + '"') 5 return redirect(url_for("index"))
Se nenhum processador for encontrado para o tipo de conteúdo, mostre uma mensagem ao usuário e redirecione-o para a página de índice.
A
get_asset_processor
função, definida em allthethings/processors.py
, retorna um processador usado para analisar o conteúdo de um ativo com base content_type
em.1 def get_asset_processor( 2 content_type, 3 ): 4 if "audio/" in content_type: 5 return AudioProcessor() 6 elif "video/" in content_type: 7 return None 8 elif "image/" in content_type: 9 return None 10 else: 11 return None
Neste caso, o arquivo é um MP3 e o
content_type
é audio/mpeg
, então retorne uma AudioProcessor
instância.Insira a URL, juntamente com seu tipo de conteúdo e comprimento, no banco de banco de dados com um status de
SUBMITTED
:1 asset = collection.insert_one( 2 { 3 "url": url, 4 "content_type": content_type, 5 "content_length": content_length, 6 "status": "SUBMITTED", 7 } 8 )
Processe a URL utilizando o processador de ativos, um
AudioProcessor
e obtenha os resultados da predição:1 try: 2 response = processor.process(asset.inserted_id, url) 3 except Exception as e: 4 app.logger.error("Error processing asset: %s", e) 5 collection.update_one( 6 filter={"url": url}, 7 update={ 8 "$set": { 9 "status": "PROCESSING_ERROR", 10 "error": str(e), 11 } 12 }, 13 ) 14 flash("Error processing asset") 15 return redirect(url_for("index"))
Vejamos o
AudioProcessor
de allthethings/processors.py
em mais detalhes para entender o que isso faz:1 import httpx 2 from config import Config 3 4 ... 5 6 class AudioProcessor: 7 def process(self, id, url): 8 input = { 9 "audio": url, 10 "model": "large-v3", 11 "language": "auto", 12 "translate": False, 13 "temperature": 0, 14 "transcription": "plain text", 15 "suppress_tokens": "-1", 16 "logprob_threshold": -1, 17 "no_speech_threshold": 0.6, 18 "condition_on_previous_text": True, 19 "compression_ratio_threshold": 2.4, 20 "temperature_increment_on_fallback": 0.2, 21 } 22 23 payload = { 24 "version": "cdd97b257f93cb89dede1c7584e3f3dfc969571b357dbcee08e793740bedd854", 25 "input": input, 26 "webhook": f"{Config.AUDIO_WEBHOOK_URL}/{id}", 27 "webhook_events_filter": ["completed"], 28 } 29 30 response = httpx.request( 31 "POST", 32 f"{Config.HOOKDECK_REPLICATE_API_QUEUE_URL}/predictions", 33 headers=Config.HOOKDECK_QUEUE_AUTH_HEADERS, 34 json=payload, 35 ) 36 37 return response.json()
process
O método processa a URL de áudio criando uma solicitação de predição passando o payload
como o corpo JSON.payload
inclui webhooks
, que consiste no Config.AUDIO_WEBHOOK_URL
com um caminho anexado (/{id}
) que indica para qual ativo é a chamada de resposta de resposta. O uso do webhook_events_filter=["completed"]
filtro informa ao Replicate para enviar um webhook somente quando a predição for concluída.O
payload.version
instrui o Replicate a usar o modelo OpenAI Wisper de áudio para texto. O input
inclui detalhes como o idioma deve ser detectado automaticamente e a transcrição deve estar em plain text
.Como estamos usando o Hookdeck como uma fila de API de saída, a solicitação usa o
Config.HOOKDECK_REPLICATE_API_QUEUE_URL
com o sufixo de caminho da /predications
API. Os cabeçalhos de autenticação apropriados também são usados a partir de Config.HOOKDECK_QUEUE_AUTH_HEADERS
.De volta
app.py
a, atualize o banco de dados de dados com o status de processamento e os detalhes de predição pendentes:1 collection.update_one( 2 filter={"url": url}, 3 update={ 4 "$set": { 5 "status": "PROCESSING", 6 "processor_response": response, 7 } 8 }, 9 )
O
processor_response
valor é armazenado para fins de depuração, pois contém um ID de solicitação do Hookdeck que pode ser útil.Passe uma mensagem de sucesso para o usuário e redirecione-o para a página índice:
1 flash( 2 message="Processing: " + url + " with content type: " + content_type, 3 category="success", 4 ) 5 6 return redirect(url_for("index"))
Neste ponto, o aplicação Pipeline já baixou todo o trabalho para replicar e, do ponto de vista da viagem de dados, estamos aguardando o webhook concluído da predicação.
Depois que a replicação concluir a predicação, ela fará uma
/webhooks/audio/<id>
chamada de chamada de /webhooks/audio/<id>
resposta do webhook para o Hookdeck. O Hookdeck ingere instantaneamente o webhook, verifica se o evento veio do Replicate e envia os dados para uma fila para processamento e entrega. Com base na configuração atual da conexão do Hookdeck, o evento do webhook é entregue na CLI e, em seguida, no endpoint do aplicação Firefly. Vejamos o código que lida com a solicitação.Aqui está a
/webhooks/audio/<id>
definição de rota do app.py
no :1 2 def webhook_audio(id): 3 if not verify_webhook(request): 4 app.logger.error("Webhook signature verification failed") 5 return jsonify({"error": "Webhook signature verification failed"}), 401 6 7 payload = request.json 8 app.logger.info("Audio payload received for id %s", id) 9 app.logger.debug(payload)
Esta rota lida com
POST
solicitações para o /webhooks/audio/<id>
endpoint. O id
parâmetro de caminho representa o ativo no banco de banco de dados MongoDB para o qual a chamada de resposta de resposta de áudio é. A carga útil JSON do chamada de resposta de resposta do Replicate.Antes de lidar com o webhook, verificamos se o webhook veio do Hookdeck por meio de uma
verify_webhook
função. Se a verificação falhar, uma 401
resposta será retornada. Este é o código para verificar o webhook:1 def verify_webhook(request): 2 if Config.HOOKDECK_WEBHOOK_SECRET is None: 3 app.logger.error("No HOOKDECK_WEBHOOK_SECRET found.") 4 return False 5 6 hmac_header = request.headers.get("x-hookdeck-signature") 7 8 hash = base64.b64encode( 9 hmac.new( 10 Config.HOOKDECK_WEBHOOK_SECRET.encode(), request.data, hashlib.sha256 11 ).digest() 12 ).decode() 13 14 verified = hash == hmac_header 15 app.logger.debug("Webhook signature verification: %s", verified) 16 return verified
Isto lê o segredo do webhook do Hookdeck armazenado na
HOOKDECK_WEBHOOK_SECRET
variável de ambiente, gera um hash usando o segredo dos dados do webhook de entrada e o compara com o hash que foi enviado no x-hookdeck-signature
cabeçalho. Se corresponderem, o webhook será verificado.Em seguida, o status de processamento é determinado com base na presença de um erro na carga útil:
1 database = Database() 2 collection = database.get_collection() 3 4 status = ( 5 "PROCESSING_ERROR" if "error" in payload and payload["error"] else "PROCESSED" 6 )
Se houver um erro, o status será definido
PROCESSING_ERROR
como. Caso contrário, é definido como PROCESSED
.O banco de dados de dados é atualizado com os resultados da transcrição e o status de processamento:
1 result = collection.find_one_and_update( 2 filter={"_id": ObjectId(id)}, 3 update={ 4 "$set": { 5 "status": status, 6 "text": payload["output"]["transcription"], 7 "replicate_response": payload, 8 } 9 }, 10 return_document=True, 11 )
Isso localiza o documento no banco de banco de dados com o correspondente
id
e o atualiza com o novo status, a transcrição text
e toda a carga útil da resposta Replicar.Em seguida, verificamos se o documento foi encontrado:
1 if result is None: 2 app.logger.error( 3 "No document found for id %s to add audio transcript", payload["id"] 4 ) 5 return jsonify({"error": "No document found to add audio transcript"}), 404
Se nenhum documento for encontrado para o
id
fornecido, um erro será registrado e uma resposta JSON com uma mensagem de erro será retornada. A 404
resposta informará ao Hookdeck que, embora a solicitação não tenha sido bem-sucedida, ela não deve ser repetida.Com o áudio convertido em texto e armazenado, a viagem de dados passa para gerar incorporações via Replicar:
1 app.logger.info("Transcription updated") 2 app.logger.debug(result) 3 4 request_embeddings(id) 5 6 return "OK"
Em seguida, a
request_embeddings
função é chamada para gerar incorporações para o áudio processado. O endpoint retorna uma OK
resposta para informar ao Hookdeck que o webhook foi processado com sucesso.A
request_embeddings
função aciona a geração de incorporações para a representação textual de um ativo indexado:1 def request_embeddings(id): 2 app.logger.info("Requesting embeddings for %s", id) 3 4 database = Database() 5 collection = database.get_collection() 6 7 asset = collection.find_one({"_id": id}) 8 9 if asset is None: 10 raise RuntimeError("Asset not found") 11 12 if asset["status"] != "PROCESSED": 13 raise RuntimeError("Asset has not been processed")
Se esse ativo com o passado
id
não for encontrado ou o status do ativo não for PROCESSED
, o que indica que uma representação textual foi criada, um RuntimeError
será gerado.Em seguida, as incorporações são geradas para o ativo processado usando o
AsyncEmbeddingsGenerator
:1 generator = AsyncEmbeddingsGenerator() 2 3 try: 4 response = generator.generate(id, asset["text"]) 5 except Exception as e: 6 app.logger.error("Error generating embeddings for %s: %s", id, e) 7 raise
Isso inicializa o
AsyncEmbeddingsGenerator
e chama a generate
função na instância, passando o ID do ativo que está sendo indexado e a representação textual.A
AsyncEmbeddingsGenerator
definição no allthethings/generators.py
segue um padrão semelhante ao processador usado anteriormente:1 import httpx 2 from config import Config 3 4 5 class AsyncEmbeddingsGenerator: 6 def generate(self, id, text): 7 payload = { 8 "version": "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305", 9 "input": {"text": text}, 10 "webhook": f"{Config.EMBEDDINGS_WEBHOOK_URL}/{id}", 11 "webhook_events_filter": ["completed"], 12 } 13 14 response = httpx.request( 15 "POST", 16 f"{Config.HOOKDECK_REPLICATE_API_QUEUE_URL}/predictions", 17 headers=Config.HOOKDECK_QUEUE_AUTH_HEADERS, 18 json=payload, 19 ) 20 21 return response.json()
O
generate
método recebe o ativo id
e o text
para o qual as incorporações devem ser geradas.Uma solicitação
payload
é criada contendo um version
que identifica que o modeloreplication/all-mpnet-base-v é usado para gerar embeddings e que o2 text
para a incorporação é passado dentro de um input
parâmetro.A
webhook
propriedade é definida Config.EMBEDDINGS_WEBHOOK_URL
como com um caminho anexado (/{id}
) que indica para qual ativo é a chamada de resposta de resposta. Assim como antes, o uso do webhook_events_filter=["completed"]
filtro informa ao Replicate para enviar um webhook somente quando a predição for concluída.Como essa é uma chamada assíncrona, o Hookdeck é novamente usado para colocar
HOOKDECK_REPLICATE_API_QUEUE_URL
na /predications
fila a solicitação da API de replicação por meio de uma chamada para o endpoint com o caminho.O método retorna a resposta do Hookdeck.
De volta ao
app.py
, atualize o banco de dados de dados com o status e o ID da solicitação de incorporação:1 collection.update_one( 2 filter={"_id": ObjectId(id)}, 3 update={ 4 "$set": { 5 "status": "GENERATING_EMBEDDINGS", 6 "generator_response": response, 7 } 8 }, 9 )
Atualize o documento no banco de banco de dados com o novo status
GENERATING_EMBEDDINGS
e a resposta da fila do Hookdeck.A solicitação para gerar de forma assíncrona as incorporações foi acionada e o trabalho transferido para Replicar. Quando o resultado for lido, um webhook será acionado com o resultado.
Depois que o Replicate gerar a incorporação, um retorno de chamada de resposta de webhook é feito para a
/webhooks/embedding/<id>
rota em nosso aplicação Frask. Essa rota recebe a carga útil do webhook, verifica se ela veio do Hookdeck, atualiza o banco de dados de dados com os resultados da incorporação e define o status apropriado.Esta é a definição da rota:
1 2 def webhook_audio(id): 3 if not verify_webhook(request): 4 app.logger.error("Webhook signature verification failed") 5 return jsonify({"error": "Webhook signature verification failed"}), 401 6 7 payload = request.json 8 app.logger.info("Audio payload received for id %s", id) 9 app.logger.debug(payload)
Essa rota lida com
POST
solicitações para o /webhooks/embedding/<id>
endpoint e recebe o id
parâmetro de caminho. Ele verifica se a solicitação veio do Hookdeck e, em caso afirmativo, recupera a carga útil JSON da solicitação. Caso contrário, retorna uma 401
resposta.Em seguida, ele verifica se há erros:
1 status = ( 2 "EMBEDDINGS_ERROR" if "error" in payload and payload["error"] else "SEARCHABLE" 3 )
Se houver um erro, o status será definido
EMBEDDINGS_ERROR
como. Caso contrário, é definido como SEARCHABLE
.Em seguida, o vetor de incorporação é extraído da carga útil e o banco de dados de dados é atualizado com os detalhes da incorporação e o novo status:
1 embedding = payload["output"][0]["embedding"] 2 3 database = Database() 4 collection = database.get_collection() 5 6 result = collection.update_one( 7 filter={"_id": ObjectId(id)}, 8 update={ 9 "$set": { 10 "status": status, 11 "embedding": embedding, 12 "replicate_embeddings_response": payload, 13 } 14 }, 15 )
Isso encontra o documento no banco de banco de dados com o correspondente
id
e o atualiza com o novo status, incorporação e toda a carga útil.Verifique se o documento foi encontrado e atualizado:
1 if result.matched_count == 0: 2 app.logger.error( 3 "No document found for id %s to update embedding", payload["id"] 4 ) 5 return jsonify({"error": "No document found to update embedding"}), 404 6 7 return "OK"
Se nenhum documento for encontrado para o
id
fornecido, um erro será registrado e uma resposta JSON com uma mensagem de erro será retornada com um 404
status. Se a atualização for um sucesso, retorne um OK
para informar ao Hookdeck que o webhook foi processado.Com o vetor incorporação armazenado na
embedding
propriedade , agora ele pode ser pesquisado com o MongoDB devido ao índice de pesquisa vetorial definido anteriormente.A pesquisa é orientada para o usuário. O usuário insere um termo de pesquisa e envia um formulário. Essa query de pesquisa é tratada e processada, e o resultado é retornado e exibido. Idealmente, esta é uma experiência em tempo real, então as operações são realizadas de forma síncrona.
Vamos analisar cada uma dessas etapas.
O usuário navega até o
/search
endpoint em seu navegador da web, insere um termo de pesquisa e envia o formulário, fazendo uma GET
solicitação para o /search
endpoint:1 2 def search(): 3 query = request.args.get("query") 4 if query is None: 5 return render_template("search.html", results=[]) 6 7 app.logger.info("Query submitted") 8 app.logger.debug(query) 9 10 results = query_vector_search(query) 11 12 results = format_results(results) 13 14 app.logger.debug("Formatted search results", results) 15 16 return render_template("search.html", results=results, query=query)
A
search
função no aplicação Spark lida com GET
solicitações para o /search
endpoint. Ele recupera a pesquisa query
do request.args.get
enviado pelo usuário. Se não houver nenhuma query, o search
modelo será renderizado. Caso contrário, uma pesquisa vetorial é executada utilizando a query_vector_search
função. O resultado é então formatado passando os resultados para a format_results
função. Os resultados formatados são então renderizados usando o search.html
modelo.A
query_vector_search
função gera incorporações para a query, executa uma pesquisa vetorial usando a query fornecida pelo usuário e recupera documentos correspondentes da collection do MongoDB .1 def query_vector_search(q): 2 generator = SyncEmbeddingsGenerator() 3 4 try: 5 generator_response = generator.generate(q) 6 app.logger.debug(generator_response) 7 except Exception as e: 8 app.logger.error("Error generating embeddings: %s", e) 9 return None 10 11 if generator_response["status"] != "completed": 12 app.logger.debug("Embeddings generation timed out") 13 return None 14 15 query_embedding = generator_response["output"][0]["embedding"]
A função pega a query
q
e usa o SyncEmbeddingsGenerator
para gerar a incorporação para a query de pesquisa chamando sua generate
função e passando a query. Se a criação da incorporação falhar por vários motivos, None
será retornado.O
SyncEmbeddingsGenerated
é usado para gerar de forma síncrona incorporações para a query de pesquisa. Esta operação é síncrona porque a solicitação é orientada ao usuário e requer uma resposta direta. SyncEmbeddingsGenerated
é definido em allthethings/generators.py
:1 class SyncEmbeddingsGenerator: 2 def generate(self, text): 3 payload = { 4 "version": "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305", 5 "input": {"text": text}, 6 } 7 8 response = httpx.request( 9 "POST", 10 "https://api.replicate.com/v1/predictions", 11 headers={**Config.REPLICATE_API_AUTH_HEADERS, "Prefer": "wait"}, 12 json=payload, 13 timeout=60, 14 ) 15 16 return response.json()
A
generate
função recebe o text
para gerar uma incorporação. Uma solicitação síncrona é feita diretamente para a API HTTP de replicação, passando o mesmo modelode replication/all-mpnet-base-v2 version
usado na solicitação de incorporação assíncrona. O "Prefer": "Wait"
cabeçalho e os timeout
valores são definidos para habilitar solicitações HTTP síncronas de longa duração. Além disso, o token da API de replicação está incluído nos cabeçalhos por meio Config.REPLICATE_API_AUTH_HEADERS
do .O JSON de resposta é retornado para a função de chamada.
De volta ao
query_vector_search
, o resultado da incorporação é usado para construir a query de pesquisa vetorial .1 ... 2 3 query_embedding = generate_response[0]["embedding"] 4 5 vs_query = { 6 "index": "vector_index", 7 "path": "embedding", 8 "queryVector": query_embedding, 9 "numCandidates": 100, 10 "limit": 10, 11 } 12 13 new_search_query = {"$vectorSearch": vs_query} 14 15 app.logger.info("Vector search query created") 16 app.logger.debug(new_search_query)
vs_query
representa a pesquisa vetorial a ser executada. Identifica o index
a ser consultado como vector_index
; o path
para a propriedade, embedding
, a query está ativada; e o resultado da query de texto no formato de incorporação ("queryVector": query_embedding
). Consulte os documentos do MongoDB Vector Search para obter mais informações, incluindo a finalidade das numCandidates
propriedades limit
e.Em seguida, a função define a projeção para especificar quais campos incluir nos resultados da pesquisa.
1 project = { 2 "$project": { 3 "score": {"$meta": "vectorSearchScore"}, 4 "_id": 0, 5 "url": 1, 6 "content_type": 1, 7 "content_length": 1, 8 "text": 1, 9 } 10 }
A projeção inclui a pontuação do vetor de pesquisa, URL, tipo de conteúdo, comprimento do conteúdo e texto. Para obter mais informações sobre a pontuação, consulte a Documentação de pontuação do Atlas Vector Search.
A função então executa a query de agregação usando a query de pesquisa vetorial construída e projeção:
1 database = Database() 2 collection = database.get_collection() 3 4 app.logger.info("Vector search query without post filter") 5 res = list(collection.aggregate([new_search_query, project])) 6 7 app.logger.info("Vector search query run") 8 app.logger.debug(res) 9 return res
No geral, a
query_vector_search
função executa uma pesquisa vetorial usando a query fornecida pelo usuário, gera incorporações para a query e recupera documentos correspondentes do banco de banco de dados MongoDB .Em seguida, dentro
search_post
de em app.py
, os resultados são formatados para renderização:1 results = format_results(results)
E dentro
format_results
app.py
de , também definido em :1 def format_results(results): 2 formatted_results = [] 3 for _idx, index in enumerate(results): 4 parse_result = urlparse(index["url"]) 5 parsed_url = { 6 "netloc": parse_result.netloc, 7 "path": parse_result.path, 8 "params": parse_result.params, 9 "query": parse_result.query, 10 "fragment": parse_result.fragment, 11 "hostname": parse_result.hostname, 12 "last_part": parse_result.path.rstrip("/").split("/")[-1], 13 } 14 index["parsed_url"] = parsed_url 15 formatted_results.append(index) 16 17 return formatted_results
A
format_results
função itera sobre o resultado da pesquisa vetorial e retorna uma array com cada elemento contendo o resultado junto com uma parsed_url
propriedade com informações sobre o ativo indexado.Finalmente, de volta à
POST /search
rota, os resultados são exibidos:1 2 def search_post(): 3 ... 4 5 results = format_results(results) 6 7 return render_template("search.html", results=results, query=query)
Isso renderiza o
search.html
modelo, passando os resultados formatados e a query original para o modelo para exibição.
Neste tutorial, percorremos os componentes usados em um aplicação MongoDB que pode indexar e executar uma pesquisa de texto em qualquer ativo com uma URL pública . Os dados são armazenados e a pesquisa vetorial ocorre via MongoDB. A geração de inferência e incorporação de IA é realizada pelo Replicar. O Hookdeck é usado como uma fila sem servidor entre o aplicação Spark e o Replicate para gerenciar a limitação da taxa de solicitações de API para o Replicate e para verificar, enfileirar e garantir a entrega de chamadas de webhook assíncronas do Replicate de volta para o aplicação Firefly.
Se ainda não o tiver feito isso, você pode obter o código Index All The Things no Github. Há uma série de questões de recursos para adicionar suporte para tipos de conteúdo adicionais, então fique à vontade para se envolver.
Por fim, se você tiver alguma dúvida ou ideia, compartilhe-a por meio de um problema no repositório do Github ou enviando-me uma mensagem no X ou Bloskey.
Principais comentários nos fóruns
Fil_LeggetterFil Leggetter4 trimestres atrás
Olá
Sou o autor do artigo e estou muito interessado em ouvir comentários e ideias.