Definición
La $https La etapa especifica una conexión en el
Registro de conexiones para enviar HTTPS solicitudes. Cada vez que una etapa anterior pasa un documento a $https, la etapa envía una nueva solicitud.
Ubicación
$https debe venir después de la etapa $source, puede venir antes de la etapa $emit o $merge, o puede servir como la etapa final en una pipeline. Puedes utilizar $https en los pipelines internos de Stream Processor Windows.
Sintaxis
Para enviar una solicitud HTTPS a una conexión dada:
{ "$https": { "connectionName": "<registered-connection>", "path" : "<subpath>" | <expression>, "parameters" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "method" : "<GET | POST | PUT | PATCH | DELETE>", "headers" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "as" : "response", "onError" : "<DLQ | Ignore | Fail>", "payload" : [{ <inner-pipeline> }], "config" : { "connectionTimeoutSec" : <integer>, "requestTimeoutSec" : <integer>, "parseJsonStrings": <boolean> } } }
La etapa $https procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Requerido | Etiqueta que identifica la conexión en el Registro de conexiones a la que se enviará una solicitud de |
| string | expresión | Opcional | Ruta para agregar a la URL a la que se resuelve Por ejemplo, si especifica un Si defines El endpoint de la API al que llamas debe ser idempotente. |
| Documento | Opcional | Documento que contiene pares clave-valor para pasar como parámetros a tu llamada al endpoint de API. Cada clave debe ser un string, y cada valor debe evaluarse como un valor numérico, de string o booleano. Este campo admite expresiones como valores. |
| string | Opcional | Método de solicitud HTTPS para su conexión. Debe tener uno de los siguientes valores:
Se establece por defecto en |
| Documento | Opcional | Documento que contiene pares clave-valor para pasar como encabezados al endpoint de la API. Cada clave debe ser una cadena y cada valor debe evaluar a una cadena. Este campo admite expresiones como valores. Si el punto final de la API requiere autenticación, como una clave de API o una autenticación de token de acceso de portador, debe agregar detalles de autenticación como encabezados cuando define la conexión para evitar proporcionarlos como texto sin formato como parte de este operador. Los nombres y valores de encabezados HTTP no válidos no se envían al punto de conexión de la API. En cambio, estos son ignorados. Para obtener más información sobre HTTP headers no válidos, consulta RFC 9110. Si una expresión en un valor falla o se evalúa a un tipo diferente de una string, el mensaje se envía a la fila de letra muerta y el operador no envía esta solicitud al endpoint de la API. |
| string | Condicional | Nombre del campo para la respuesta de la API REST. El parámetro Si el endpoint devuelve 0 bytes, el operador no configura el campo El operador admite respuestas con un Si el endpoint de la API devuelve una respuesta sin un |
| string | Opcional | Comportamiento cuando el operador detecta un fallo relacionado con
El operador considera todos los
El operador considera cualquier otro código de estado HTTP como un error
Se establece por defecto en |
| arreglo | Opcional | Pipeline interno personalizado que permite personalizar el cuerpo de la solicitud enviado al endpoint de la API.
De forma predeterminada, el mensaje completo se envía al punto final de la API. Atlas Stream Processing envía una carga útil JSON en modo relajado al punto final de la API. Las solicitudes HTTP no válidas no son enviadas al endpoint de la API. En cambio, estos se envían a la fila de letra muerta. Para obtener más información sobre cuerpos de solicitud HTTP no válidos, consulte RFC.9110 |
| Documento | Opcional | Documento que contiene campos que sobrescriben diversos valores por defecto. |
| entero | Opcional | El tiempo, en segundos, tras el cual una conexión Se establece por defecto en |
| booleano | Opcional | Configuración que determina si Atlas itera recursivamente a través de la respuesta del servidor y serializa los valores de string que contienen válidos JSON (usando comillas escapadas) en BSON válido para que puedas trabajar con los resultados en las etapas siguientes del pipeline. Se establece por defecto en |
| entero | Opcional | El tiempo, en segundos, tras el cual una solicitud Se establece por defecto en |
Ejemplos
Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:
La etapa
$sourceestablece una conexión con el Apache Kafka broker que recopila estos informes en un tema denominadomy_weatherdata, exponiendo cada registro a medida que se procesa a las siguientes etapas de agregación. Este etapa también reemplaza el nombre del campo de marca de tiempo que proyecta, estableciéndolo eningestionTime.Para cada registro del broker de Apache Kafka, la etapa
$httpsenvía una solicitud a una fuente meteorológica HTTPS definida en la conexiónhttps_weather. La solicitud utilizaposition.coordinatesdel registro en la solicitud HTTPS para recopilar un pronóstico de temperatura máxima de siete días en grados Celsius para esa ubicación, que se añade al documento del pipeline en un campoairTemperatureForecast.La etapa escribe la salida en una colección de Atlas
$mergellamadastreamen lasample_weatherstreambase de datos. Si no existe dicha base de datos o colección, Atlas las crea.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$https': { connectionName: 'https_weather', path: 'forecast', parameters: { latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] }, longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] } }, as: 'airTemperatureForecast' }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1], atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 30.27, -97.74], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
Nota
Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.