Clase: Mongo::Collection::View::ChangeStream

Hereda:
Agregación
  • Objeto
Mostrar todo
Incluye:
Agregación::Comportamiento, Reintentable
Definido en:

lib/mongo/colección/vista/cambio_flujo.rb, lib/mongo/colección/vista/cambio_flujo/retryable.rb

Overview

Nota:

Sólo disponible en versiones de servidor 3.6 y superiores.

Nota:

Los flujos de cambios no funcionan correctamente con JRuby debido al problema documentado aquí: github.com/jruby/jruby/issues/.4212 En concreto, JRuby evalúa con avidez #next en un enumerador en un hilo verde en segundo plano; por lo tanto, al llamar a #next en el flujo de cambios, se ejecutará getMores en un bucle en segundo plano.

Proporciona el comportamiento en torno a la etapa de canalización '$changeStream' en el marco de agregación. Al especificar esta etapa, los usuarios pueden solicitar el envío de notificaciones para todos los cambios en una colección o base de datos específica.

Desde:

  • 2.5.0

Definido en el espacio de nombres

Modules: Reutilizable

Colapso delresumen constante

DOCUMENTO_COMPLETO_PREDETERMINADO =

Devuelve el valor predeterminado de la opción fullDocument.

Devuelve:

  • (Cadena) -

    El valor predeterminado de la opción fullDocument.

Desde:

  • 2.5.0

' pordefecto '.freeze
DATABASE =

Devoluciones Se utiliza para indicar que el flujo de cambios debe escuchar los cambios en toda la base de datos en lugar de solo en la colección.

Devuelve:

  • ( Símbolo) -

    Se utiliza para indicar que el flujo de cambios debe escuchar los cambios en toda la base de datos en lugar de solo en la colección.

Desde:

  • 2.6.0

:database
CLUSTER =

Devuelve Indica que el flujo de cambios debe escuchar los cambios en todo el clúster y no solo en la colección.

Devuelve:

  • ( Símbolo) -

    Se utiliza para indicar que el flujo de cambios debe escuchar los cambios en todo el clúster en lugar de solo en la colección.

Desde:

  • 2.6.0

:grupo

Constantes incluidas desde Loggable

Registrable::PREFIX

Constantes incluidas de Explainable

Explicable::TODOS_LOS_PLANES_EJECUCIÓN, Explicable::ESTADÍSTICAS_DE_EJECUCIÓN, Explicable::PLANIFICADOR_DE_CONSULTAS

Colapso delresumen de atributos de instancia

Atributos incluidos de Aggregation::Behavior

#view

Colapso del resumen del método de instancia

Métodos incluidos en Retryable

#trabajador_de_lectura, #servidor_de_selección, #trabajador_de_escritura

Métodos incluidos de Aggregation::Behavior

#permitir_uso_de_disco, #explicar, #tiempo_de_espera_ms, #¿escribir?

Métodos incluidos en Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Métodos incluidos en Explainable

#explicar

Métodos incluidos de Iterable

#cerrar_consulta

Métodos incluidos desde Mongo::CursorHost

#validate_timeout_mode!

Detalles del constructor

#inicializar(vista, canalización, cambios_para, opciones = {}) ⇒ ChangeStream

Inicializar el flujo de cambios para la vista de colección, la canalización y las opciones proporcionadas.

Ejemplos:

Crear la nueva vista de flujo de cambios.

ChangeStream.new(view, pipeline, options)

Parámetros:

  • vista (Colección:: Ver)

    La vista de la colección.

  • pipeline (Matriz<Hash>)

    La tubería de operadores para filtrar las notificaciones de cambio.

  • opciones (Hash) (predeterminado: {})

    Las opciones del flujo de cambios.

Opciones Hash(opciones):

  • :documento_completo (Cadena)

    Valores permitidos: nulo, 'predeterminado', 'updateLookup', 'whenAvailable', 'obligatorio'.

    El valor predeterminado es no enviar ningún valor (es decir, nulo), lo que equivale a "predeterminado". Por defecto, la notificación de cambios para actualizaciones parciales incluirá un delta que describe los cambios en el documento.

    Cuando se configura en 'updateLookup', la notificación de cambio para actualizaciones parciales incluirá tanto un delta que describe los cambios en el documento como una copia del documento completo que se modificó algún tiempo después de que ocurrió el cambio.

    Cuando se establece en 'whenAvailable', configura el flujo de cambios para devolver la imagen posterior del documento modificado para eventos de reemplazo y actualización si la imagen posterior para este evento está disponible.

    Cuando se establece en 'obligatorio', el mismo comportamiento que 'whenAvailable' excepto que se genera un error si la imagen posterior no está disponible.

  • :documento completo antes del cambio (Cadena)

    Valores permitidos: nulo, 'whenAvailable', 'required', 'off'.

    El valor predeterminado es no enviar un valor (es decir, nulo), lo que equivale a "desactivado".

    Cuando se establece en 'whenAvailable', configura el flujo de cambios para devolver la imagen previa del documento modificado para eventos de cambio de reemplazo, actualización y eliminación si está disponible.

    Cuando se establece en 'obligatorio', el mismo comportamiento que 'whenAvailable' excepto que se genera un error si la imagen previa no está disponible.

  • :resume_after (BSON::Documento, Hash)

    Especifica el punto de partida lógico para el nuevo flujo de cambios.

  • :tiempo máximo de espera ms (Entero)

    La cantidad máxima de tiempo que el servidor debe esperar nuevos documentos para satisfacer una consulta de flujo de cambios.

  • :tamaño_del_lote (Entero)

    El número de documentos a devolver por agrupar.

  • :colación (BSON::Documento, Hash)

    La intercalación a utilizar.

  • :start_at_operation_time (BSON::Marca de tiempo)

    Solo se devuelven los cambios ocurridos en la fecha y hora especificadas o después. Cualquier comando ejecutado en el servidor devolverá una hora de clúster que se puede usar aquí. Solo se reconoce en las versiones de servidor 4.0y posteriores.

  • :inicio_después (Bson::Documento, Hash)

    Similar a :resume_after, esta opción toma un token de reanudación e inicia un nuevo flujo de cambios que devuelve la primera notificación después del token. Esto permitirá a los usuarios ver colecciones eliminadas y recreadas, o colecciones renombradas recientemente, sin perder ninguna notificación.

  • :comment (Objeto)

    Un comentario proporcionado por el usuario para adjuntar a este comando.

  • :mostrar eventos expandidos (Booleano)

    Permite que el servidor envíe la lista ampliada de eventos del flujo de cambios. La lista de eventos adicionales incluidos con este conjunto de indicadores es: createIndexes, dropIndexes, modify, create, shardCollection, reshardCollection y refineCollectionShardKey.

    El servidor informará un error si se especifican 'startAfter' y 'resumeAfter'.

Desde:

  • 2.5.0



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 133

def inicializar(vista, pipeline, cambios_para, opciones = {})
  # Los cursores de cambio de flujo solo pueden ser iterables, por lo que no lo permitimos
  # Se debe especificar el timeout_mode.
  realizar_configuración(vista, opciones, forbid: %i[ timeout_mode ]) hacer
    @cambios_para = cambios_para
    @cambiar_filtros_de_flujo = pipeline && pipeline.dup
    @start_after = @opciones[:inicio_después]
  end

  # El token de reanudación rastreado por el flujo de cambios, utilizado únicamente
  # cuando no hay cursor o no hay token de reanudación del cursor
  @resume_token = @start_after || @opciones[:resume_after]

  create_cursor!

  # Enviamos diferentes parámetros cuando reanudamos un flujo de cambio
  # en comparación con cuando enviamos la primera consulta
  @reanudando = true
end

Detalles de los atributos de instancia

#cursorCursor (solo lectura)

Este método forma parte de una API privada. Debe evitarlo si es posible, ya que podría eliminarse o modificarse en el futuro.

Devuelve el cursor subyacente para esta operación.

Devuelve:

  • (Cursor)

    El cursor subyacente para esta operación

Desde:

  • 2.5.0



67
68
69
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 67

def cursor
  @cursor
end

#opcionesBSON::Documento (solo lectura)

Devuelve las opciones del flujo de cambios.

Devuelve:

  • (BSON::Documento)

    Las opciones del flujo de cambios.

Desde:

  • 2.5.0



63
64
65
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 63

def opciones
  @opciones
end

Detalles del método de instancia

#cerrar(opciones = {}) ⇒ nulo

Nota:

Este método intenta cerrar el cursor utilizado por el flujo de cambios, lo que a su vez cierra el cursor del flujo de cambios del servidor. Este método ignora cualquier error que se produzca al cerrar el cursor del servidor.

Cerrar el flujo de cambios.

Ejemplos:

Cerrar el flujo de cambios.

stream.close

Devuelve:

  • (nil)

    Siempre nulo.

Desde:

  • 2.5.0



254
255
256
257
258
259
260
261
262
263
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 254

def cerrar(opta = {})
  a no ser que ¿cerrado?
    begin
      @cursor.cerrar(opta)
    rescate Error::Operación fallida::Familia, Error::Error de socket, Error::Error de tiempo de espera del socket, Error::Conexión faltante
      # ignore
    end
    @cursor = nulo
  end
end

#cerrado?verdadero, falso

¿Está cerrado el flujo de cambio?

Ejemplos:

Determinar si el flujo de cambio está cerrado.

stream.closed?

Devuelve:

  • (verdadero,falso)

    Si el flujo de cambio está cerrado.

Desde:

  • 2.5.0



273
274
275
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 273

def ¿cerrado?
  @cursor.nil?
end

#cursor_typeObjeto

“Los flujos de cambio son una abstracción en torno a los cursores tailable-awaitData…”

Devuelve:

  • :tailable_await

Desde:

  • 2.5.0



307
308
309
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 307

def tipo_de_cursor
  :tailable_await
end

#cada {|Cada|... } ⇒ Enumerador

Iterar a través de los documentos devueltos por el flujo de cambios.

Este método vuelve a intentarlo una vez por cada error en los errores reanudables (dos errores consecutivos dan como resultado que se genere el segundo error, cuyo error se recupera y restablece el recuento de errores a cero).

Ejemplos:

Iterar a través del flujo de documentos.

stream.each do |document|
  p document
end

Parámetros de rendimiento:

  • Cada (BSON::Documento)

    documento de flujo de cambios

Devuelve:

  • (Enumerator)

    El enumerador.

Desde:

  • 2.5.0



169
170
171
172
173
174
175
176
177
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 169

def cada
  propagar Detener la iteración.Nuevo Si ¿cerrado?
  bucle hacer
    Documento = try_next
    rendimiento Documento Si Documento
  end
rescate Detener la iteración
  return yo
end

#inspeccionarCadena

Obtenga una cadena formateada para usar en la inspección.

Ejemplos:

Inspeccionar el objeto de flujo de cambios.

stream.inspect

Devuelve:

  • (Cadena) -

    La inspección del flujo de cambios.

Desde:

  • 2.5.0



285
286
287
288
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 285

def inspeccionar
  "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filtros=#{@change_stream_filters} " +
    "opciones=#{@options} token_de_currículum=#{token_de_currículum}>"
end

#tiempo_máximo_de_espera_msEntero | nulo

Devuelve el valor de la opción max_await_time_ms que se pasó a este flujo de cambios.

Devuelve:

  • (Integer | nil)

    el valor max_await_time_ms

Desde:

  • 2.5.0



322
323
324
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 322

def tiempo máximo de espera ms
  opciones[:tiempo máximo de espera ms]
end

#resume_tokenBSON::Document | nil

Devuelve el token de reanudación que la transmisión utilizará para reanudarse automáticamente, si existe uno.

Ejemplos:

Obtenga el token de reanudación del flujo de cambio.

stream.resume_token

Devuelve:

  • (BSON::Document | nil)

    El token de reanudación del flujo de cambio.

Desde:

  • 2.10.0



299
300
301
302
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 299

def resume_token
  cursor_resume_token = @cursor.resume_token Si @cursor
  cursor_resume_token || @resume_token
end

#timeout_modeObjeto

“cambiar flujos…utilizar implícitamente el modo ITERACIÓN”

Devuelve:

  • :iteración

Desde:

  • 2.5.0



314
315
316
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 314

def timeout_mode
  :iteración
end

#to_enumObjeto

Desde:

  • 2.5.0



227
228
229
230
231
232
233
234
235
236
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 227

def to_enum
  enum = Super
  enum.Enviar(:conjunto_de_variables_de_instancia, '@obj', yo)
  clase << enum
    def try_next
      @obj.try_next
    end
  end
  enum
end

#try_nextBSON::Document | nil

Devuelve un documento del flujo de cambios, si hay alguno disponible.

Vuelve a intentarlo una vez en caso de error reanudable.

Genera StopIteration si se cierra el flujo de cambios.

Este método esperará hasta max_await_time_ms milisegundos por cambios del servidor y, si no se reciben cambios, devolverá nil.

Devuelve:

  • (BSON::Document | nil)

    Un documento de flujo de cambios.

Aumentos:

  • (Detener la iteración)

Desde:

  • 2.6.0



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 191

def try_next
  ¡recrear_cursor! Si @tiempo_de_salida

  propagar Detener la iteración.Nuevo Si ¿cerrado?

  begin
    doc = @cursor.try_next
  rescate Mongo::Error => e
    # "Si una próxima llamada falla con un error de tiempo de espera, los conductores NO DEBEN
    # invalidar el flujo de cambios. La subsiguiente llamada siguiente DEBE
    # realizar un intento de reanudación para establecer un nuevo flujo de cambio en el
    # servidor..."
    #
    # Sin embargo, los SocketTimeoutErrors son TimeoutErrors, pero también son
    # flujo de cambios reanudable. Para preservar el comportamiento existente (especificado),
    # Solo contamos los tiempos de espera cuando el error no es también
    #cambio-de-flujo-reanudable.
    @tiempo_de_salida = e.is_a?(Mongo::Error::Error de tiempo de espera) && !e.¿cambio_de_flujo_reanudable?

    propagar a no ser que @tiempo_de_salida || e.¿cambio_de_flujo_reanudable?

    @resume_token = @cursor.resume_token
    propagar e Si @tiempo_de_salida

    ¡recrear_cursor!(@cursor.context)
    reintentar
  end

  # Necesitamos verificar que cada documento tenga un _id, por lo que
  # tener un token de currículum para trabajar
  Si doc && doc['_id'].nil?
    propagar Error::MissingResumeToken
  end
  doc
end