Docs Menu
Docs Home
/ /
/ / /

$https Stage de agregación (Stream Processing)

$https

El $https La etapa especifica una conexión en el Registro HTTPS de conexión al que se enviarán solicitudes. Cada vez que una etapa anterior pasa un documento $https a, la etapa envía una nueva solicitud.

$httpsdebe venir después de la etapa$source, puede venir antes de las etapas $emit o$merge, o puede servir como etapa final en una canalización. Puede usar$httpsen las canalizaciones internas de Windows del procesador de flujo.

Para enviar una solicitud HTTPS a una conexión determinada:

{
"$https": {
"connectionName": "<registered-connection>",
"path" : "<subpath>" | <expression>,
"parameters" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"method" : "<GET | POST | PUT | PATCH | DELETE>",
"headers" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"as" : "response",
"onError" : "<DLQ | Ignore | Fail>",
"payload" : [{
<inner-pipeline>
}],
"config" : {
"connectionTimeoutSec" : <integer>,
"requestTimeoutSec" : <integer>,
"parseJsonStrings": <boolean>
}
}
}

La etapa $https procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Etiqueta que identifica la conexión en el Registro de conexión HTTPS a la que se enviará una solicitud.

path

cadena | expresión

Opcional

Ruta para agregar a la URL a la que se resuelve connectionName.

Por ejemplo, si especifica un connectionName que se resuelve en https://sample.com, puede especificar un path de "endpoint" para que su procesador de flujo envíe HTTPS solicitudes a https://sample.com/endpoint.

Si define path como una expresión, esa expresión debe evaluarse como una cadena.

El endpoint de la API al que llamas debe ser idempotente.

parameters

Documento

Opcional

Documento que contiene pares clave-valor para pasar como parámetros a tu llamada al endpoint de API. Cada clave debe ser un string, y cada valor debe evaluarse como un valor numérico, de string o booleano. Este campo admite expresiones como valores.

method

string

Opcional

Método de solicitud HTTPS para su conexión. Debe tener uno de los siguientes valores:

  • "GET"

  • "POST"

  • "PUT"

  • "PATCH"

  • "DELETE"

Se establece por defecto en "GET".

headers

Documento

Opcional

Documento que contiene pares clave-valor para pasar como encabezados al punto final de la API. Cada clave debe ser una cadena y cada valor debe evaluarse como una cadena. Este campo admite expresiones como valores.

Si el punto final de la API requiere autenticación, como una clave de API o una autenticación de token de acceso de portador, debe agregar detalles de autenticación como encabezados cuando define la conexión para evitar proporcionarlos como texto sin formato como parte de este operador.

Los nombres y valores de encabezado HTTP no válidos no se envían al punto final de la API. Se ignoran.

Para obtener más información sobre encabezados HTTP no válidos, consulte RFC.9110

Si una expresión en un valor falla o se evalúa a un tipo distinto de una cadena, el mensaje se envía a la cola de mensajes no entregados y el operador no envía esta solicitud al punto final de la API.

as

string

Condicional

Nombre del campo para la respuesta de la API REST.

El parámetro as es required cuando se utiliza como etapa de origen y optional cuando se utiliza como etapa final o de sumidero.

Si el punto final devuelve 0 bytes, el operador no establece el campo as.

El operador admite respuestas con un Content-Type de application/json o text/plain. Si el punto final de la API devuelve una respuesta con un Content-Type diferente, el operador gestiona el documento según el comportamiento onError que defina.

Si el punto final de la API devuelve una respuesta sin un Content-Type definido, el operador asume que la respuesta es application/json.

onError

string

Opcional

Comportamiento cuando el operador detecta un fallo relacionado con HTTPS. Debe ser uno de los siguientes valores:

  • "dlq" :Pase el documento afectado a la cola de mensajes no entregados.

  • "ignore" : Ignora el error y pasa el documento afectado a la siguiente etapa del pipeline.

  • "fail" :Terminar el procesador de flujo en caso de error.

El operador considera todos los códigos de estado HTTP 2XX como correctos. Si recibe alguno de los siguientes códigos de estado HTTP como respuesta, seguirá el comportamiento según el valor que proporcione en este campo:

  • 400

  • 404

  • 410

  • 413

  • 414

  • 431

El operador considera cualquier otro código de estado HTTP como un error "fail". Por ejemplo, si el punto final de la API devuelve un código de estado HTTP 500, el procesador entra en estado de error y se detiene.

onError no se activa ante errores que surgen de una configuración incorrecta del propio operador $https, como expresiones no válidas.

Se establece por defecto en "dlq".

payload

arreglo

Opcional

Pipeline interno personalizado que permite personalizar el cuerpo de la solicitud enviado al endpoint de la API. payload soporta las siguientes expresiones:

  • $project

  • $addFields

  • $replaceRoot

  • $set

De forma predeterminada, el mensaje completo se envía al punto final de la API. Atlas Stream Processing envía una carga útil JSON en modo relajado al punto final de la API.

Los cuerpos de solicitud HTTP no válidos no se envían al punto final de la API. En su lugar, se envían a la cola de mensajes fallidos.

Para obtener más información sobre cuerpos de solicitud HTTP no válidos, consulte RFC.9110

config

Documento

Opcional

Documento que contiene campos que anulan varios valores predeterminados.

config.connectionTimeoutSec

entero

Opcional

El tiempo, en segundos, después del cual se agota el tiempo de espera de una conexión HTTPS exitosa si no recibe respuesta.

Se establece por defecto en 30.

config.parseJsonStrings

booleano

Opcional

Configuración que determina si Atlas itera recursivamente a través de la respuesta del servidor y serializa valores de cadena que contienen valores válidos. JSON (usando comillas escapadas) en bson válido para que pueda trabajar con los resultados en las etapas posteriores de la canalización.

Se establece por defecto en false.

config.requestTimeoutSec

entero

Opcional

El tiempo, en segundos, después del cual una solicitud HTTPS expira si no puede conectarse.

Se establece por defecto en 60.

Una fuente de datos de streaming genera informes meteorológicos detallados de diversas ubicaciones, conforme al esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación consta de tres etapas:

  1. La etapa establece una $source conexión con el agente de Apache Kafka que recopila estos informes en un tema my_weatherdata llamado, exponiendo cada registro a medida que se ingiere a las etapas de agregación posteriores. Esta etapa también anula el nombre del campo de marca de tiempo que proyecta, estableciéndolo ingestionTime en.

  2. Para cada registro del agente Apache Kafka, la etapa envía una solicitud a una fuente meteorológica HTTPS definida en $https la https_weather conexión. La solicitud utiliza el position.coordinates del registro en la solicitud HTTPS para recopilar un pronóstico de temperatura máxima de siete días en grados Celsius para esa ubicación, que se agrega al documento de canalización en un airTemperatureForecast campo.

  3. La etapa escribe la salida en una colección de Atlas $merge llamada stream en la sample_weatherstream base de datos. Si no existe dicha base de datos o colección, Atlas las crea.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$https': {
connectionName: 'https_weather',
path: 'forecast',
parameters: {
latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] },
longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] }
},
as: 'airTemperatureForecast'
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1],
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 30.27, -97.74], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Nota

El ejemplo anterior es representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.

Volver

$validar

En esta página