Clase: Mongo::Collection::View::ChangeStream
- Hereda:
-
Agregación
- Objeto
- Agregación
- Mongo::colección::View::ChangeStream
- Incluye:
- Aggregación::Comportamiento, Reintentable
- Definido en:
- lib/mongo/colección/view/change_stream.rb,
lib/mongo/colección/view/change_stream/retryable.rb
Overview
Proporciona un comportamiento en torno a una etapa de la pipeline $changeStream en el framework de agregación. Especificar esta etapa permite a los usuarios solicitar que se envíen notificaciones sobre todos los cambios en una colección o base de datos en particular.
Definido bajo Namespace
Modules: Reintentar
Resumen de constantes colapsar
- FULL_DOCUMENT_DEFAULT =
Devuelve el valor por defecto de la opción fullDocument.
'predeterminado'- DATABASE =
Se utiliza para indicar que el flujo de cambios debería escuchar los cambios en toda la base de datos en lugar de solo en la colección.
:database- clúster =
Devuelve Indica que el flujo de cambios debe escuchar los cambios en todo el clúster y no solo en la colección.
clúster
Constantes incluidas desde Loggable
Constantes incluidas de Explicable
Explainable::ALL_PLANS_EXECUTION, Explainable::EXECUTION_STATS, Explainable::QUERY_PLANNER
Resumen de atributos de la instancia colapsar
-
#cursor ⇒ Cursor
Solo lectura
privado
El cursor subyacente para esta operación.
-
#options ⇒ BSON::Document
Solo lectura
Las opciones de flujo de cambios.
Atributos incluidos de Agregación::Comportamiento
Resumen del método de instancia colapsar
-
#close(opts = {}) ⇒ nil
Cierre el flujo de cambios.
-
#¿cerrado? ⇒ verdadero, falso
¿Está cerrado el flujo de cambios?
-
#cursor_type ⇒ Objeto
“los flujos de cambios son una abstracción en torno a cursores tailable-awaitData...”.
-
#each {|Each| ... } ⇒ Enumerator
Itera a través de los documentos devueltos por el flujo de cambios.
-
#initialize(view, pipeline, changes_for, options = {}) ⇒ ChangeStream
constructor
Inicializa el flujo de cambios para la vista de colección, el pipeline y las opciones proporcionadas.
-
#inspect ⇒ string
Obtén una string formateada para usar en la inspección.
-
#max_await_time_ms ⇒ Entero | nil
Devuelve el valor de la opción max_await_time_ms que fue pasada a este flujo de cambios.
-
#refresh_timeout! ⇒ Object
privado
Actualiza el tiempo de espera de CSOT para la siguiente iteración.
-
#resume_token ⇒ BSON::Document | nil
Devuelve el token de reanudación que el flujo utilizará para reanudarse automáticamente, si existe uno.
-
#timeout_mode ⇒ Objeto
"cambiar flujos...usar implícitamente el modo ITERACIÓN".
- #to_enum ⇒ objeto
-
#try_next ⇒ BSON::Document | nil
Devuelve un documento del flujo de cambios, si hay alguno disponible.
Métodos incluidos de Retryable
#read_worker, #select_server, #with_overload_retry, #write_worker
Métodos incluidos de Aggregación::Comportamiento
#allow_disk_use, #explain, #timeout_ms, #guardar?
Métodos incluidos desde Registrable
#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger
Métodos incluidos en Explicable
Métodos incluidos desde Iterable
Métodos incluidos de Mongo::CursorHost
Detalles del Constructor
#initialize(view, pipeline, changes_for, options = {}) ⇒ ChangeStream
Inicializa el flujo de cambios para la vista de colección, el pipeline y las opciones proporcionadas.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 133 def inicializar(vista, pipeline, cambios para, = {}) # flujo de cambios cursors can only be :iterable, so we don't allow # Se debe especificar el modo de tiempo de espera. performar_setup(vista, , forbid: %i[ timeout_mode ]) hacer @changes_for = cambios para @change_stream_filters = pipeline && pipeline.dup @start_after = @options[INICIO_DESPUÉS] end # El token de reanudación rastreado por el flujo de cambios, solo se utiliza # cuando no hay cursor, o no hay token de reanudación de cursor @resume_token = @start_after || @options[:resume_after] create_cursor! # Enviamos diferentes parámetros cuando reanudamos un flujo de cambios # en comparación con cuando enviamos la primera query @resumiendo = true end |
Detalles de atributo de instancias
#cursor ⇒ Cursor (solo lectura)
Este método es parte de una API privada. Se debe evitar el uso de este método si es posible, ya que podría eliminarse o modificarse en el futuro.
Retorna el cursor subyacente para esta operación.
57 58 59 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 57 def cursor @cursor end |
#opciones ⇒ BSON::Documento (solo lectura)
Devuelve las opciones del flujo de cambios.
53 54 55 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 53 def @options end |
Detalles del método de instancia
#close(opts = {}) ⇒ nil(nulo)
Este método intenta cerrar el cursor utilizado por el flujo de cambios, que a su vez cierra el cursor del flujo de cambios del lado del servidor. Este método ignora cualquier error que ocurra al cerrar el cursor del lado del servidor.
Cierre el flujo de cambios.
254 255 256 257 258 259 260 261 262 263 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 254 def Cerrar(opciones = {}) return si ¿Cerrado? begin @cursor.Cerrar(opciones) rescate Error::OperationFailure::Familia, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection # ignore end @cursor = nulo end |
#cerrado? ⇒ verdadero, falso
¿Está cerrado el flujo de cambios?
273 274 275 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 273 def ¿Cerrado? @cursor.nil? end |
#tipo_cursor ⇒ Objeto
"los flujos de cambios son una abstracción en torno a cursores tailables-awaitData..."
307 308 309 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 307 def cursor_type tailable_await end |
#each {|Each| ... } ⇒ Enumerator
Itera a través de los documentos devueltos por el flujo de cambios.
Este método vuelve a intentar una vez por error en errores reanudables (dos errores consecutivos dan como resultado que se produzca el segundo error; un error del que se recupera devuelve el recuento de errores a cero).
169 170 171 172 173 174 175 176 177 178 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 169 def cada propagar Detener iteración.Nuevo si ¿Cerrado? bucle hacer Documento = try_next rendimiento Documento si Documento end rescate Detener iteración sí mismo end |
#inspect ⇒ String
Obtén una string formateada para usar en la inspección.
285 286 287 288 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 285 def inspeccionar "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " + "opciones=#{@opciones} resume_token=#{resume_token}>" end |
#max_await_time_ms ⇒ Integer | nil
Devuelve el valor de la opción max_await_time_ms que fue pasada a este flujo de cambios.
322 323 324 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 322 def max_await_time_ms [; tiempo_máximo_espera_ms] end |
#refresh_timeout! ⇒ Object
Este método es parte de una API privada. Se debe evitar el uso de este método si es posible, ya que podría eliminarse o modificarse en el futuro.
Actualiza el tiempo de espera de CSOT para la siguiente iteración. ¡Delegados al refresh_timeout del cursor subyacente! método para que cada llamada a try_next comience con un plazo de tiempo de espera nuevo, como lo requiere la especificación CSOT para cursores tailable awaitData.
65 66 67 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 65 def refresh_timeout! @cursor&.refresh_timeout! end |
#resume_token ⇒ BSON::Document | nil
Devuelve el token de reanudación que el flujo utilizará para reanudarse automáticamente, si existe uno.
299 300 301 302 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 299 def resume_token cursor_resume_token = @cursor.resume_token si @cursor cursor_resume_token || @resume_token end |
#timeout_mode ⇒ objeto
"flujos de cambio...utilizan implícitamente el modo ITERACIÓN"
314 315 316 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 314 def timeout_mode :iteration end |
#to_enum ⇒ Objeto
227 228 229 230 231 232 233 234 235 236 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 227 def to_enum enum = super enum.enviar(set_variable_de_instancia, '@obj', sí mismo) clase << enum def try_next @obj.try_next end end enum end |
#try_next ⇒ BSON::Document | nil
Devuelve un documento del flujo de cambios, si hay alguno disponible.
Reintenta una vez en caso de error reanudable.
Lanza StopIteration si se cierra el flujo de cambios.
Este método esperará hasta max_await_time_ms milisegundos para cambios desde el servidor, y si no se reciben cambios, devolverá nil.
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 192 def try_next recreate_cursor! si @timed_out propagar Detener iteración.Nuevo si ¿Cerrado? begin doc = @cursor.try_next rescate mongo::Error => e # «Si una llamada siguiente falla con un error de timeout, los controladores NO DEBEN # invalidar el flujo de cambios. La subsiguiente llamada siguiente DEBE # realizar un intento de reanudación para establecer un nuevo flujo de cambios en el # servidor..." # # Sin embargo, SocketTimeoutErrors son TimeoutErrors, pero también son # flujo de cambios-resumable. Para conservar el comportamiento existente (especificado), # Solo contamos los tiempos de espera cuando el error tampoco es # flujo de cambios reanudable. @timed_out = e.is_a?(mongo::Error::TimeoutError) && !e.¿streaming de cambios reanudable? propagar a menos que @timed_out || e.¿streaming de cambios reanudable? @resume_token = @cursor.resume_token propagar e si @timed_out recreate_cursor!(@cursor.context) reintentar end # Necesitamos verificar que cada documento tenga un _id, así que # tener un token de currículum con el que trabajar propagar Error::MissingResumeToken si doc && doc['_id'].nil? doc end |