Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /
/ / /

$https Stage de agregación (Stream Processing)

$https

La $https La etapa especifica una conexión en el Registro de conexiones para enviar HTTPS solicitudes. Cada vez que una etapa anterior pasa un documento a $https, la etapa envía una nueva solicitud.

$https debe venir después de la etapa $source, puede venir antes de la etapa $emit o $merge, o puede servir como la etapa final en una pipeline. Puedes utilizar $https en los pipelines internos de Stream Processor Windows.

Para enviar una solicitud HTTPS a una conexión dada:

{
"$https": {
"connectionName": "<registered-connection>",
"path" : "<subpath>" | <expression>,
"parameters" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"method" : "<GET | POST | PUT | PATCH | DELETE>",
"headers" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"as" : "response",
"onError" : "<DLQ | Ignore | Fail>",
"payload" : [{
<inner-pipeline>
}],
"config" : {
"connectionTimeoutSec" : <integer>,
"requestTimeoutSec" : <integer>,
"parseJsonStrings": <boolean>
}
}
}

La etapa $https procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Etiqueta que identifica la conexión en el Registro de conexiones a la que se enviará una solicitud de HTTPS.

path

string | expresión

Opcional

Ruta para agregar a la URL a la que se resuelve connectionName.

Por ejemplo, si especifica un connectionName que se resuelve en https://sample.com, puede especificar un path de "endpoint" para que su procesador de flujo envíe HTTPS solicitudes a https://sample.com/endpoint.

Si defines path como una expresión, dicha expresión debe evaluarse como una string.

El endpoint de la API al que llamas debe ser idempotente.

parameters

Documento

Opcional

Documento que contiene pares clave-valor para pasar como parámetros a tu llamada al endpoint de API. Cada clave debe ser un string, y cada valor debe evaluarse como un valor numérico, de string o booleano. Este campo admite expresiones como valores.

method

string

Opcional

Método de solicitud HTTPS para su conexión. Debe tener uno de los siguientes valores:

  • "GET"

  • "POST"

  • "PUT"

  • "PATCH"

  • "DELETE"

Se establece por defecto en "GET".

headers

Documento

Opcional

Documento que contiene pares clave-valor para pasar como encabezados al endpoint de la API. Cada clave debe ser una cadena y cada valor debe evaluar a una cadena. Este campo admite expresiones como valores.

Si el punto final de la API requiere autenticación, como una clave de API o una autenticación de token de acceso de portador, debe agregar detalles de autenticación como encabezados cuando define la conexión para evitar proporcionarlos como texto sin formato como parte de este operador.

Los nombres y valores de encabezados HTTP no válidos no se envían al punto de conexión de la API. En cambio, estos son ignorados.

Para obtener más información sobre HTTP headers no válidos, consulta RFC 9110.

Si una expresión en un valor falla o se evalúa a un tipo diferente de una string, el mensaje se envía a la fila de letra muerta y el operador no envía esta solicitud al endpoint de la API.

as

string

Condicional

Nombre del campo para la respuesta de la API REST.

El parámetro as es required cuando se usa como la etapa de origen, y optional cuando se usa como la última etapa o etapa de sumidero.

Si el endpoint devuelve 0 bytes, el operador no configura el campo as.

El operador admite respuestas con un Content-Type de application/json o text/plain. Si el endpoint de la API devuelve una respuesta con un Content-Type diferente, el operador gestiona el documento según el comportamiento de onError que definas.

Si el endpoint de la API devuelve una respuesta sin un Content-Type definido, el operador asume que la respuesta es application/json.

onError

string

Opcional

Comportamiento cuando el operador detecta un fallo relacionado con HTTPS. Debe ser uno de los siguientes valores:

  • "dlq" : Pase el documento afectado a la fila de letra muerta.

  • "ignore" : Ignora el error y pasa el documento afectado a la siguiente etapa del pipeline.

  • "fail" :Terminar el procesador de flujo en caso de error.

El operador considera todos los 2XX códigos de estado HTTP como éxitos. Si el operador recibe alguno de los siguientes códigos de estado HTTP como respuesta, el operador sigue el comportamiento en función del valor que usted proporciona en este campo:

  • 400

  • 404

  • 410

  • 413

  • 414

  • 431

El operador considera cualquier otro código de estado HTTP como un error "fail". Por ejemplo, si el punto final de la API devuelve un código de estado HTTP 500, el procesador entra en estado de error y se detiene.

onError no se activa en errores que surjan de una configuración incorrecta del propio operador $https, como expresiones no válidas.

Se establece por defecto en "dlq".

payload

arreglo

Opcional

Pipeline interno personalizado que permite personalizar el cuerpo de la solicitud enviado al endpoint de la API. payload soporta las siguientes expresiones:

  • $project

  • $addFields

  • $replaceRoot

  • $set

De forma predeterminada, el mensaje completo se envía al punto final de la API. Atlas Stream Processing envía una carga útil JSON en modo relajado al punto final de la API.

Las solicitudes HTTP no válidas no son enviadas al endpoint de la API. En cambio, estos se envían a la fila de letra muerta.

Para obtener más información sobre cuerpos de solicitud HTTP no válidos, consulte RFC.9110

config

Documento

Opcional

Documento que contiene campos que sobrescriben diversos valores por defecto.

config.connectionTimeoutSec

entero

Opcional

El tiempo, en segundos, tras el cual una conexión HTTPS exitosa supera el límite si no recibe respuesta.

Se establece por defecto en 30.

config.parseJsonStrings

booleano

Opcional

Configuración que determina si Atlas itera recursivamente a través de la respuesta del servidor y serializa los valores de string que contienen válidos JSON (usando comillas escapadas) en BSON válido para que puedas trabajar con los resultados en las etapas siguientes del pipeline.

Se establece por defecto en false.

config.requestTimeoutSec

entero

Opcional

El tiempo, en segundos, tras el cual una solicitud HTTPS se agota si no puede conectarse.

Se establece por defecto en 60.

Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:

  1. La etapa $source establece una conexión con el Apache Kafka broker que recopila estos informes en un tema denominado my_weatherdata, exponiendo cada registro a medida que se procesa a las siguientes etapas de agregación. Este etapa también reemplaza el nombre del campo de marca de tiempo que proyecta, estableciéndolo en ingestionTime.

  2. Para cada registro del broker de Apache Kafka, la etapa $https envía una solicitud a una fuente meteorológica HTTPS definida en la conexión https_weather. La solicitud utiliza position.coordinates del registro en la solicitud HTTPS para recopilar un pronóstico de temperatura máxima de siete días en grados Celsius para esa ubicación, que se añade al documento del pipeline en un campo airTemperatureForecast.

  3. La etapa escribe la salida en una colección de Atlas $merge llamada stream en la sample_weatherstream base de datos. Si no existe dicha base de datos o colección, Atlas las crea.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$https': {
connectionName: 'https_weather',
path: 'forecast',
parameters: {
latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] },
longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] }
},
as: 'airTemperatureForecast'
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1],
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 30.27, -97.74], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Nota

Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.

Volver

$validate

En esta página