Clase: Mongo::Collection::View::ChangeStream
- Hereda:
-
Agregación
- Objeto
- Agregación
- Mongo::Colección::Vista::Flujo de cambios
- Incluye:
- Agregación::Comportamiento, Reintentable
- Definido en:
- lib/mongo/colección/view/change_stream.rb,
lib/mongo/colección/view/change_stream/retryable.rb
Overview
Disponible solo en versiones de servidor 3.6 y superiores.
ChangeStreams no funcionan correctamente con JRuby debido al problema documentado aquí: github.com/jruby/jruby/issues/4212. Concretamente, JRuby evalúa de manera entusiasta #siguiente en un Enumerator en un subproceso green en segundo plano, por lo tanto, llamar a #siguiente en el flujo de cambios provocará que getMore se llame en un bucle en segundo plano.
Proporciona comportamiento en torno a una etapa de pipeline ‘$changeStream` en el framework de agregación. Especificar esta etapa permite a los usuarios solicitar que se envíen notificaciones por todos los cambios en una colección o base de datos en particular.
Definido bajo Namespace
Modules: Reintentar
Colapso delresumen constante
- FULL_DOCUMENT_DEFAULT =
Devuelve el valor por defecto de la opción fullDocument.
'por defecto'.freeze
- DATABASE =
Devoluciones Se utiliza para indicar que el flujo de cambios debe escuchar los cambios en toda la base de datos en lugar de solo en la colección.
:database- CLUSTER =
Devuelve Indica que el flujo de cambios debe escuchar los cambios en todo el clúster y no solo en la colección.
:grupo
Constantes incluidas desde Loggable
Constantes incluidas de Explicable
Explainable::ALL_PLANS_EXECUTION, Explainable::EXECUTION_STATS, Explainable::QUERY_PLANNER
Resumen de atributos de la instancia colapsar
-
#cursor ⇒ Cursor
Solo lectura
privado
El cursor subyacente para esta operación.
-
#options ⇒ BSON::Document
Solo lectura
Las opciones de flujo de cambios.
Atributos incluidos de Agregación::Comportamiento
Resumen del método de instancia colapsar
-
#close(opts = {}) ⇒ nil
Cerrar el flujo de cambios.
-
#¿cerrado? ⇒ verdadero, falso
¿Está cerrado el flujo de cambios?
-
#cursor_type ⇒ Objeto
"los flujos de cambios son una abstracción en torno a los cursores tailable-awaitData…".
-
#each {|Each| ... } ⇒ Enumerator
Itera a través de los documentos devueltos por el flujo de cambios.
-
#initialize(view, pipeline, changes_for, options = {}) ⇒ ChangeStream
constructor
Inicializa el flujo de cambios para la vista de colección, el pipeline y las opciones proporcionadas.
-
#inspeccionar ⇒ Cadena
Obtenga una cadena formateada para usar en la inspección.
-
#max_await_time_ms ⇒ Entero | nil
Devuelve el valor de la opción max_await_time_ms que se pasó a este flujo de cambios.
-
#resume_token ⇒ BSON::Document | nil
Devuelve el token de reanudación que el flujo utilizará para reanudarse automáticamente, si existe uno.
-
#timeout_mode ⇒ Objeto
“las change streams… usan implícitamente el modo ITERATION”.
- #to_enum ⇒ objeto
-
#try_next ⇒ BSON::Document | nil
Devuelve un documento del flujo de cambios, si hay alguno disponible.
Métodos incluidos en Retryable
#read_worker, #select_server, #write_worker
Métodos incluidos de Aggregation::Behavior
#allow_disk_use, #explain, #timeout_ms, #guardar?
Métodos incluidos desde Registrable
#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger
Métodos incluidos en Explicable
Métodos incluidos desde Iterable
Métodos incluidos desde Mongo::CursorHost
Detalles del constructor
#inicializar(vista, canalización, cambios_para, opciones = {}) ⇒ ChangeStream
Inicializa el flujo de cambios para la vista de colección, el pipeline y las opciones proporcionadas.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 133 def inicializar(vista, pipeline, cambios_para, = {}) # flujo de cambios cursors can only be :iterable, so we don't allow # Se debe especificar el timeout_mode. performar_setup(vista, , forbid: %i[ timeout_mode ]) hacer @changes_for = cambios_para @change_stream_filters = pipeline && pipeline.dup @start_after = @options[INICIO_DESPUÉS] end # El token de reanudación rastreado por el flujo de cambios, utilizado únicamente # cuando no hay cursor, o no hay token de reanudación de cursor @resume_token = @start_after || @options[:resume_after] create_cursor! # Enviamos diferentes parámetros cuando reanudamos un flujo de cambios # en comparación con cuando enviamos la primera query @reanudando = true end |
Detalles de los atributos de instancia
#cursor ⇒ Cursor (solo lectura)
Este método forma parte de una API privada. Debe evitarlo si es posible, ya que podría eliminarse o modificarse en el futuro.
Retorna el cursor subyacente para esta operación.
67 68 69 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 67 def cursor @cursor end |
#opciones ⇒ BSON::Documento (solo lectura)
Devuelve las opciones del flujo de cambios.
63 64 65 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 63 def @options end |
Detalles del método de instancia
#close(opts = {}) ⇒ nil(nulo)
Este método intenta cerrar el cursor utilizado por el flujo de cambios, lo que a su vez cierra el cursor del flujo de cambios del servidor. Este método ignora cualquier error que se produzca al cerrar el cursor del servidor.
Cerrar el flujo de cambios.
254 255 256 257 258 259 260 261 262 263 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 254 def Cerrar(opta = {}) a menos que ¿Cerrado? begin @cursor.Cerrar(opta) rescate Error::OperationFailure::Familia, Error::Error de socket, Error::SocketTimeoutError, Error::MissingConnection # ignore end @cursor = nulo end end |
#cerrado? ⇒ verdadero, falso
¿Está cerrado el flujo de cambio?
273 274 275 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 273 def ¿Cerrado? @cursor.nil? end |
#tipo_cursor ⇒ Objeto
“Los flujos de cambio son una abstracción en torno a los cursores tailable-awaitData…”
307 308 309 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 307 def cursor_type tailable_await end |
#each {|Each| ... } ⇒ Enumerator
Itera a través de los documentos devueltos por el flujo de cambios.
Este método vuelve a intentar una vez por error en errores reanudables (dos errores consecutivos dan como resultado que se produzca el segundo error; un error del que se recupera devuelve el recuento de errores a cero).
169 170 171 172 173 174 175 176 177 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 169 def cada propagar Detener la iteración.Nuevo si ¿Cerrado? bucle hacer Documento = try_next rendimiento Documento si Documento end rescate Detener la iteración return sí mismo end |
#inspect ⇒ String
Obtenga una cadena formateada para usar en la inspección.
285 286 287 288 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 285 def inspeccionar "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " + "opciones=#{@options} token_de_currículum=#{token_de_currículum}>" end |
#tiempo_máximo_de_espera_ms ⇒ Entero | nulo
Devuelve el valor de la opción max_await_time_ms que se pasó a este flujo de cambios.
322 323 324 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 322 def max_await_time_ms [; tiempo_máximo_espera_ms] end |
#resume_token ⇒ BSON::Document | nil
Devuelve el token de reanudación que el flujo utilizará para reanudarse automáticamente, si existe uno.
299 300 301 302 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 299 def resume_token cursor_resume_token = @cursor.resume_token si @cursor cursor_resume_token || @resume_token end |
#timeout_mode ⇒ objeto
“los flujos de cambio…usan implícitamente el modo de ITERACIÓN”
314 315 316 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 314 def timeout_mode :iteration end |
#to_enum ⇒ Objeto
227 228 229 230 231 232 233 234 235 236 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 227 def to_enum enum = super enum.enviar(:conjunto_de_variables_de_instancia, '@obj', sí mismo) clase << enum def try_next @obj.try_next end end enum end |
#try_next ⇒ BSON::Document | nil
Devuelve un documento del flujo de cambios, si hay alguno disponible.
Reintenta una vez en caso de error reanudable.
Lanza StopIteration si se cierra el flujo de cambios.
Este método esperará hasta max_await_time_ms milisegundos para cambios desde el servidor, y si no se reciben cambios, devolverá nil.
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 191 def try_next recreate_cursor! si @tiempo_de_salida propagar Detener la iteración.Nuevo si ¿Cerrado? begin doc = @cursor.try_next rescate Mongo::Error => e # «Si una llamada siguiente falla con un error de timeout, los controladores NO DEBEN # invalidar el flujo de cambios. La subsiguiente llamada siguiente DEBE # realizar un intento de reanudación para establecer un nuevo flujo de cambios en el # servidor..." # # Sin embargo, los SocketTimeoutErrors son TimeoutErrors, pero también son # flujo de cambios-resumable. Para conservar el comportamiento existente (especificado), # Solo contamos los tiempos de espera cuando el error no es también # flujo de cambios reanudable. @tiempo_de_salida = e.is_a?(Mongo::Error::TimeoutError) && !e.¿streaming de cambios reanudable? propagar a menos que @tiempo_de_salida || e.¿streaming de cambios reanudable? @resume_token = @cursor.resume_token propagar e si @tiempo_de_salida recreate_cursor!(@cursor.context) reintentar end # Necesitamos verificar que cada documento tenga un _id, por lo que # tener un token de currículum para trabajar si doc && doc['_id'].nil? propagar Error::MissingResumeToken end doc end |