Clase: Mongo::Collection::View::ChangeStream
- Hereda:
-
Agregación
- Objeto
- Agregación
- Mongo::Colección::Vista::Flujo de cambios
- Incluye:
- Agregación::Comportamiento, Reintentable
- Definido en:
lib/mongo/colección/vista/cambio_flujo.rb, lib/mongo/colección/vista/cambio_flujo/retryable.rb
Overview
Sólo disponible en versiones de servidor 3.6 y superiores.
Los flujos de cambios no funcionan correctamente con JRuby debido al problema documentado aquí: github.com/jruby/jruby/issues/.4212 En concreto, JRuby evalúa con avidez #next en un enumerador en un hilo verde en segundo plano; por lo tanto, al llamar a #next en el flujo de cambios, se ejecutará getMores en un bucle en segundo plano.
Proporciona el comportamiento en torno a la etapa de canalización '$changeStream' en el marco de agregación. Al especificar esta etapa, los usuarios pueden solicitar el envío de notificaciones para todos los cambios en una colección o base de datos específica.
Definido en el espacio de nombres
Modules: Reutilizable
Colapso delresumen constante
- DOCUMENTO_COMPLETO_PREDETERMINADO =
Devuelve el valor predeterminado de la opción fullDocument.
' pordefecto '.freeze
- DATABASE =
Devoluciones Se utiliza para indicar que el flujo de cambios debe escuchar los cambios en toda la base de datos en lugar de solo en la colección.
:database- CLUSTER =
Devuelve Indica que el flujo de cambios debe escuchar los cambios en todo el clúster y no solo en la colección.
:grupo
Constantes incluidas desde Loggable
Constantes incluidas de Explainable
Explicable::TODOS_LOS_PLANES_EJECUCIÓN, Explicable::ESTADÍSTICAS_DE_EJECUCIÓN, Explicable::PLANIFICADOR_DE_CONSULTAS
Colapso delresumen de atributos de instancia
-
#cursor ⇒ Cursor
solo lectura
privada
El cursor subyacente para esta operación.
-
#options ⇒ BSON::Document
solo lectura
Las opciones del flujo de cambios.
Atributos incluidos de Aggregation::Behavior
Colapso del resumen del método de instancia
-
#cerrar(opciones = {}) ⇒ nulo
Cerrar el flujo de cambios.
-
#cerrado? ⇒ verdadero, falso
¿Está cerrado el flujo de cambio?
-
#cursor_type ⇒ Objeto
“Los flujos de cambio son una abstracción en torno a los cursores tailable-awaitData…”.
-
#cada {|Cada|... } ⇒ Enumerador
Iterar a través de los documentos devueltos por el flujo de cambios.
-
#inicializar(vista, canalización, cambios_para, opciones = {}) ⇒ ChangeStream
constructor
Inicializar el flujo de cambios para la vista de colección, la canalización y las opciones proporcionadas.
-
#inspeccionar ⇒ Cadena
Obtenga una cadena formateada para usar en la inspección.
-
#tiempo_máximo_de_espera_ms ⇒ Entero | nulo
Devuelve el valor de la opción max_await_time_ms que se pasó a este flujo de cambios.
-
#resume_token ⇒ BSON::Document | nil
Devuelve el token de reanudación que la transmisión utilizará para reanudarse automáticamente, si existe uno.
-
#timeout_mode ⇒ Objeto
“las change streams… usan implícitamente el modo ITERATION”.
- #to_enum ⇒ Objeto
-
#try_next ⇒ BSON::Document | nil
Devuelve un documento del flujo de cambios, si hay alguno disponible.
Métodos incluidos en Retryable
#trabajador_de_lectura, #servidor_de_selección, #trabajador_de_escritura
Métodos incluidos de Aggregation::Behavior
#permitir_uso_de_disco, #explicar, #tiempo_de_espera_ms, #¿escribir?
Métodos incluidos en Loggable
#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger
Métodos incluidos en Explainable
Métodos incluidos de Iterable
Métodos incluidos desde Mongo::CursorHost
Detalles del constructor
#inicializar(vista, canalización, cambios_para, opciones = {}) ⇒ ChangeStream
Inicializar el flujo de cambios para la vista de colección, la canalización y las opciones proporcionadas.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 133 def inicializar(vista, pipeline, cambios_para, = {}) # Los cursores de cambio de flujo solo pueden ser iterables, por lo que no lo permitimos # Se debe especificar el timeout_mode. realizar_configuración(vista, , forbid: %i[ timeout_mode ]) hacer @cambios_para = cambios_para @cambiar_filtros_de_flujo = pipeline && pipeline.dup @start_after = @opciones[:inicio_después] end # El token de reanudación rastreado por el flujo de cambios, utilizado únicamente # cuando no hay cursor o no hay token de reanudación del cursor @resume_token = @start_after || @opciones[:resume_after] create_cursor! # Enviamos diferentes parámetros cuando reanudamos un flujo de cambio # en comparación con cuando enviamos la primera consulta @reanudando = true end |
Detalles de los atributos de instancia
#cursor ⇒ Cursor (solo lectura)
Este método forma parte de una API privada. Debe evitarlo si es posible, ya que podría eliminarse o modificarse en el futuro.
Devuelve el cursor subyacente para esta operación.
67 68 69 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 67 def cursor @cursor end |
#opciones ⇒ BSON::Documento (solo lectura)
Devuelve las opciones del flujo de cambios.
63 64 65 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 63 def @opciones end |
Detalles del método de instancia
#cerrar(opciones = {}) ⇒ nulo
Este método intenta cerrar el cursor utilizado por el flujo de cambios, lo que a su vez cierra el cursor del flujo de cambios del servidor. Este método ignora cualquier error que se produzca al cerrar el cursor del servidor.
Cerrar el flujo de cambios.
254 255 256 257 258 259 260 261 262 263 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 254 def cerrar(opta = {}) a no ser que ¿cerrado? begin @cursor.cerrar(opta) rescate Error::Operación fallida::Familia, Error::Error de socket, Error::Error de tiempo de espera del socket, Error::Conexión faltante # ignore end @cursor = nulo end end |
#cerrado? ⇒ verdadero, falso
¿Está cerrado el flujo de cambio?
273 274 275 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 273 def ¿cerrado? @cursor.nil? end |
#cursor_type ⇒ Objeto
“Los flujos de cambio son una abstracción en torno a los cursores tailable-awaitData…”
307 308 309 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 307 def tipo_de_cursor :tailable_await end |
#cada {|Cada|... } ⇒ Enumerador
Iterar a través de los documentos devueltos por el flujo de cambios.
Este método vuelve a intentarlo una vez por cada error en los errores reanudables (dos errores consecutivos dan como resultado que se genere el segundo error, cuyo error se recupera y restablece el recuento de errores a cero).
169 170 171 172 173 174 175 176 177 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 169 def cada propagar Detener la iteración.Nuevo Si ¿cerrado? bucle hacer Documento = try_next rendimiento Documento Si Documento end rescate Detener la iteración return yo end |
#inspeccionar ⇒ Cadena
Obtenga una cadena formateada para usar en la inspección.
285 286 287 288 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 285 def inspeccionar "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filtros=#{@change_stream_filters} " + "opciones=#{@options} token_de_currículum=#{token_de_currículum}>" end |
#tiempo_máximo_de_espera_ms ⇒ Entero | nulo
Devuelve el valor de la opción max_await_time_ms que se pasó a este flujo de cambios.
322 323 324 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 322 def tiempo máximo de espera ms [:tiempo máximo de espera ms] end |
#resume_token ⇒ BSON::Document | nil
Devuelve el token de reanudación que la transmisión utilizará para reanudarse automáticamente, si existe uno.
299 300 301 302 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 299 def resume_token cursor_resume_token = @cursor.resume_token Si @cursor cursor_resume_token || @resume_token end |
#timeout_mode ⇒ Objeto
“cambiar flujos…utilizar implícitamente el modo ITERACIÓN”
314 315 316 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 314 def timeout_mode :iteración end |
#to_enum ⇒ Objeto
227 228 229 230 231 232 233 234 235 236 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 227 def to_enum enum = Super enum.Enviar(:conjunto_de_variables_de_instancia, '@obj', yo) clase << enum def try_next @obj.try_next end end enum end |
#try_next ⇒ BSON::Document | nil
Devuelve un documento del flujo de cambios, si hay alguno disponible.
Vuelve a intentarlo una vez en caso de error reanudable.
Genera StopIteration si se cierra el flujo de cambios.
Este método esperará hasta max_await_time_ms milisegundos por cambios del servidor y, si no se reciben cambios, devolverá nil.
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 191 def try_next ¡recrear_cursor! Si @tiempo_de_salida propagar Detener la iteración.Nuevo Si ¿cerrado? begin doc = @cursor.try_next rescate Mongo::Error => e # "Si una próxima llamada falla con un error de tiempo de espera, los conductores NO DEBEN # invalidar el flujo de cambios. La subsiguiente llamada siguiente DEBE # realizar un intento de reanudación para establecer un nuevo flujo de cambio en el # servidor..." # # Sin embargo, los SocketTimeoutErrors son TimeoutErrors, pero también son # flujo de cambios reanudable. Para preservar el comportamiento existente (especificado), # Solo contamos los tiempos de espera cuando el error no es también #cambio-de-flujo-reanudable. @tiempo_de_salida = e.is_a?(Mongo::Error::Error de tiempo de espera) && !e.¿cambio_de_flujo_reanudable? propagar a no ser que @tiempo_de_salida || e.¿cambio_de_flujo_reanudable? @resume_token = @cursor.resume_token propagar e Si @tiempo_de_salida ¡recrear_cursor!(@cursor.context) reintentar end # Necesitamos verificar que cada documento tenga un _id, por lo que # tener un token de currículum para trabajar Si doc && doc['_id'].nil? propagar Error::MissingResumeToken end doc end |