Definición
El $https La etapa especifica una conexión en el
Registro HTTPS de conexión al que se enviarán solicitudes. Cada vez que una etapa anterior pasa un documento $https a, la etapa envía una nueva solicitud.
Colocación
$httpsdebe venir después de la etapa$source, puede venir antes de las etapas $emit o$merge, o puede servir como etapa final en una canalización. Puede usar$httpsen las canalizaciones internas de Windows del procesador de flujo.
Sintaxis
Para enviar una solicitud HTTPS a una conexión determinada:
{ "$https": { "connectionName": "<registered-connection>", "path" : "<subpath>" | <expression>, "parameters" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "method" : "<GET | POST | PUT | PATCH | DELETE>", "headers" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "as" : "response", "onError" : "<DLQ | Ignore | Fail>", "payload" : [{ <inner-pipeline> }], "config" : { "connectionTimeoutSec" : <integer>, "requestTimeoutSec" : <integer>, "parseJsonStrings": <boolean> } } }
La etapa $https procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Requerido | Etiqueta que identifica la conexión en el Registro de conexión |
| cadena | expresión | Opcional | Ruta para agregar a la URL a la que se resuelve Por ejemplo, si especifica un Si define El endpoint de la API al que llamas debe ser idempotente. |
| Documento | Opcional | Documento que contiene pares clave-valor para pasar como parámetros a tu llamada al endpoint de API. Cada clave debe ser un string, y cada valor debe evaluarse como un valor numérico, de string o booleano. Este campo admite expresiones como valores. |
| string | Opcional | Método de solicitud HTTPS para su conexión. Debe tener uno de los siguientes valores:
Se establece por defecto en |
| Documento | Opcional | Documento que contiene pares clave-valor para pasar como encabezados al punto final de la API. Cada clave debe ser una cadena y cada valor debe evaluarse como una cadena. Este campo admite expresiones como valores. Si el punto final de la API requiere autenticación, como una clave de API o una autenticación de token de acceso de portador, debe agregar detalles de autenticación como encabezados cuando define la conexión para evitar proporcionarlos como texto sin formato como parte de este operador. Los nombres y valores de encabezado HTTP no válidos no se envían al punto final de la API. Se ignoran. Para obtener más información sobre encabezados HTTP no válidos, consulte RFC.9110 Si una expresión en un valor falla o se evalúa a un tipo distinto de una cadena, el mensaje se envía a la cola de mensajes no entregados y el operador no envía esta solicitud al punto final de la API. |
| string | Condicional | Nombre del campo para la respuesta de la API REST. El parámetro Si el punto final devuelve 0 bytes, el operador no establece el campo El operador admite respuestas con un Si el punto final de la API devuelve una respuesta sin un |
| string | Opcional | Comportamiento cuando el operador detecta un fallo relacionado con
El operador considera todos los códigos de estado HTTP
El operador considera cualquier otro código de estado HTTP como un error
Se establece por defecto en |
| arreglo | Opcional | Pipeline interno personalizado que permite personalizar el cuerpo de la solicitud enviado al endpoint de la API.
De forma predeterminada, el mensaje completo se envía al punto final de la API. Atlas Stream Processing envía una carga útil JSON en modo relajado al punto final de la API. Los cuerpos de solicitud HTTP no válidos no se envían al punto final de la API. En su lugar, se envían a la cola de mensajes fallidos. Para obtener más información sobre cuerpos de solicitud HTTP no válidos, consulte RFC.9110 |
| Documento | Opcional | Documento que contiene campos que anulan varios valores predeterminados. |
| entero | Opcional | El tiempo, en segundos, después del cual se agota el tiempo de espera de una conexión Se establece por defecto en |
| booleano | Opcional | Configuración que determina si Atlas itera recursivamente a través de la respuesta del servidor y serializa valores de cadena que contienen valores válidos. JSON (usando comillas escapadas) en bson válido para que pueda trabajar con los resultados en las etapas posteriores de la canalización. Se establece por defecto en |
| entero | Opcional | El tiempo, en segundos, después del cual una solicitud Se establece por defecto en |
Ejemplos
Una fuente de datos de streaming genera informes meteorológicos detallados de diversas ubicaciones, conforme al esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación consta de tres etapas:
La etapa establece una
$sourceconexión con el agente de Apache Kafka que recopila estos informes en un temamy_weatherdatallamado, exponiendo cada registro a medida que se ingiere a las etapas de agregación posteriores. Esta etapa también anula el nombre del campo de marca de tiempo que proyecta, estableciéndoloingestionTimeen.Para cada registro del agente Apache Kafka, la etapa envía una solicitud a una fuente meteorológica HTTPS definida en
$httpslahttps_weatherconexión. La solicitud utiliza elposition.coordinatesdel registro en la solicitud HTTPS para recopilar un pronóstico de temperatura máxima de siete días en grados Celsius para esa ubicación, que se agrega al documento de canalización en unairTemperatureForecastcampo.La etapa escribe la salida en una colección de Atlas
$mergellamadastreamen lasample_weatherstreambase de datos. Si no existe dicha base de datos o colección, Atlas las crea.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$https': { connectionName: 'https_weather', path: 'forecast', parameters: { latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] }, longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] } }, as: 'airTemperatureForecast' }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1], atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 30.27, -97.74], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
Nota
El ejemplo anterior es representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.