Clase: Mongo::Collection::View::ChangeStream

Hereda:
Agregación
  • Objeto
Mostrar todo
Incluye:
Aggregación::Comportamiento, Reintentable
Definido en:
lib/mongo/colección/view/change_stream.rb,
lib/mongo/colección/view/change_stream/retryable.rb

Overview

Proporciona un comportamiento en torno a una etapa de la pipeline $changeStream en el framework de agregación. Especificar esta etapa permite a los usuarios solicitar que se envíen notificaciones sobre todos los cambios en una colección o base de datos en particular.

Desde:

  • 2.5.0

Definido bajo Namespace

Modules: Reintentar

Resumen de constantes colapsar

FULL_DOCUMENT_DEFAULT =

Devuelve el valor por defecto de la opción fullDocument.

Devuelve:

  • (string)

    Valor por defecto de la opción fullDocument.

Desde:

  • 2.5.0

'predeterminado'
DATABASE =

Se utiliza para indicar que el flujo de cambios debería escuchar los cambios en toda la base de datos en lugar de solo en la colección.

Devuelve:

  • (Símbolo)

    Se utiliza para indicar que el flujo de cambios debería escuchar los cambios en toda la base de datos, en lugar de solo en la colección.

Desde:

  • 2.6.0

:database
clúster =

Devuelve Indica que el flujo de cambios debe escuchar los cambios en todo el clúster y no solo en la colección.

Devuelve:

  • (Símbolo)

    Se usa para indicar que el flujo de cambios debe monitorear cambios en todo el clúster y no solo en la colección.

Desde:

  • 2.6.0

clúster

Constantes incluidas desde Loggable

Loggable::PREFIX

Constantes incluidas de Explicable

Explainable::ALL_PLANS_EXECUTION, Explainable::EXECUTION_STATS, Explainable::QUERY_PLANNER

Resumen de atributos de la instancia colapsar

Atributos incluidos de Agregación::Comportamiento

#view

Resumen del método de instancia colapsar

Métodos incluidos de Retryable

#read_worker, #select_server, #with_overload_retry, #write_worker

Métodos incluidos de Aggregación::Comportamiento

#allow_disk_use, #explain, #timeout_ms, #guardar?

Métodos incluidos desde Registrable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Métodos incluidos en Explicable

#explain

Métodos incluidos desde Iterable

#close_query

Métodos incluidos de Mongo::CursorHost

#validate_timeout_mode!

Detalles del Constructor

#initialize(view, pipeline, changes_for, options = {}) ⇒ ChangeStream

Inicializa el flujo de cambios para la vista de colección, el pipeline y las opciones proporcionadas.

Ejemplos:

Crea la nueva vista de flujo de cambios.

ChangeStream.new(view, pipeline, options)

Parámetros:

  • vista (Collection::View)

    La vista de colección.

  • pipeline (arreglo<Hash>)

    La pipeline de operadores para filtrar las notificaciones de cambio.

  • opciones (encriptada) (por defecto: {})

    Las opciones de flujo de cambios.

Opciones Hash (options):

  • documento_completo (string)

    Valores permitidos: nil, 'por defecto', 'updateLookup', 'cuando esté disponible', 'requerido'.

    El valor por defecto es no enviar un valor (es decir, nil), el cual es equivalente a 'por defecto'. Por defecto, la notificación de cambio para actualizaciones parciales incluirá un delta que describa los cambios en el documento.

    Cuando se establece en 'updateLookup', la notificación de cambio para actualizaciones parciales incluirá tanto un delta que describe los cambios en el documento como una copia de todo el documento que se modificó en algún momento después de que se haya producido el cambio.

    Cuando se configura en "cuando esté disponible", configura el flujo de cambios para devolver la post-imagen del documento modificado para eventos de cambio de reemplazo y actualización si la post-imagen para este evento está disponible.

    Cuando se establece en 'obligatorio', el comportamiento es el mismo que el de 'whenAvailable', excepto que se produce un error si la imagen de publicación no está disponible.

  • documento_completo_antes_de_modificar (string)

    Valores permitidos: nil, 'whenAvailable', 'required', 'off'.

    El valor por defecto es no enviar un valor (es decir, nil), que es equivalente a 'apagado'.

    Cuando se establece en 'whenAvailable', configura el flujo de cambios para devolver la preimagen del documento modificado para los eventos de cambio de reemplazo, actualización y eliminación si está disponible.

    Cuando se establece en 'requerido', el mismo comportamiento que "whenAvailable" excepto que se genera un error si la preimagen no está disponible.

  • :resume_after (BSON::Document, Hash)

    Especifica el punto de partida lógico para el nuevo flujo de cambios.

  • ; tiempo_máximo_espera_ms (Integer)

    El tiempo máximo que el servidor espera documentos nuevos para satisfacer una query de flujo de cambios.

  • :batch_size (Integer)

    El número de documentos a devolver por agrupar.

  • intercalación (BSON::Document, Hash)

    La intercalación para usar.

  • :start_at_operation_time (BSON::Timestamp)

    Devuelve solo los cambios que ocurrieron en o después de la fecha y hora especificada. Cualquier comando que se ejecute contra el servidor devolverá un tiempo de clúster que se puede utilizar aquí.

  • INICIO_DESPUÉS (Bson::Document, Hash)

    Similar a :resume_after, esta opción toma un token de reanudación y comienza un nuevo flujo de cambios devolviendo la primera notificación después del token. Esto permitirá a los usuarios ver colecciones que han sido descartadas y recreadas o colecciones recién renombradas sin perder ninguna notificación.

  • :comment (objeto)

    Un comentario proporcionado por el usuario para adjuntar a este comando.

  • Show expanded events (booleano)

    Permite que el servidor envíe la lista 'ampliada' de eventos de flujo de cambios. La lista de eventos adicionales incluidos con este conjunto de flags son: createIndexes, dropIndexes, modify, create, shardCollection, reshardCollection, refineCollectionShardKey.

    El servidor reportará un error si se especifican tanto startAfter como resumeAfter.

Desde:

  • 2.5.0



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 133

def inicializar(vista, pipeline, cambios para, opciones = {})
  # flujo de cambios cursors can only be :iterable, so we don't allow
  # Se debe especificar el modo de tiempo de espera.
  performar_setup(vista, opciones, forbid: %i[ timeout_mode ]) hacer
    @changes_for = cambios para
    @change_stream_filters = pipeline && pipeline.dup
    @start_after = @options[INICIO_DESPUÉS]
  end

  # El token de reanudación rastreado por el flujo de cambios, solo se utiliza
  # cuando no hay cursor, o no hay token de reanudación de cursor
  @resume_token = @start_after || @options[:resume_after]

  create_cursor!

  # Enviamos diferentes parámetros cuando reanudamos un flujo de cambios
  # en comparación con cuando enviamos la primera query
  @resumiendo = true
end

Detalles de atributo de instancias

#cursorCursor (solo lectura)

Este método es parte de una API privada. Se debe evitar el uso de este método si es posible, ya que podría eliminarse o modificarse en el futuro.

Retorna el cursor subyacente para esta operación.

Devuelve:

  • (Cursor)

    el cursor subyacente para esta operación

Desde:

  • 2.5.0



57
58
59
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 57

def cursor
  @cursor
end

#opcionesBSON::Documento (solo lectura)

Devuelve las opciones del flujo de cambios.

Devuelve:

  • (BSON::Document)

    Las opciones de flujo de cambios.

Desde:

  • 2.5.0



53
54
55
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 53

def opciones
  @options
end

Detalles del método de instancia

#close(opts = {}) ⇒ nil(nulo)

Nota:

Este método intenta cerrar el cursor utilizado por el flujo de cambios, que a su vez cierra el cursor del flujo de cambios del lado del servidor. Este método ignora cualquier error que ocurra al cerrar el cursor del lado del servidor.

Cierre el flujo de cambios.

Ejemplos:

Cierre el flujo de cambios.

stream.close

Devuelve:

  • (nil)

    Siempre nulo.

Desde:

  • 2.5.0



254
255
256
257
258
259
260
261
262
263
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 254

def Cerrar(opciones = {})
  return si ¿Cerrado?

  begin
    @cursor.Cerrar(opciones)
  rescate Error::OperationFailure::Familia, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection
    # ignore
  end
  @cursor = nulo
end

#cerrado?verdadero, falso

¿Está cerrado el flujo de cambios?

Ejemplos:

Determina si el flujo de cambios está cerrado.

stream.closed?

Devuelve:

  • (true, false)

    Si el flujo de cambios se cierra.

Desde:

  • 2.5.0



273
274
275
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 273

def ¿Cerrado?
  @cursor.nil?
end

#tipo_cursorObjeto

"los flujos de cambios son una abstracción en torno a cursores tailables-awaitData..."

Devuelve:

  • tailable_await

Desde:

  • 2.5.0



307
308
309
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 307

def cursor_type
  tailable_await
end

#each {|Each| ... } ⇒ Enumerator

Itera a través de los documentos devueltos por el flujo de cambios.

Este método vuelve a intentar una vez por error en errores reanudables (dos errores consecutivos dan como resultado que se produzca el segundo error; un error del que se recupera devuelve el recuento de errores a cero).

Ejemplos:

Itera a través del flujo de documentos.

stream.each do |document|
  p document
end

Parámetros de rendimiento:

  • Cada uno (BSON::Document)

    documento de flujo de cambios.

Devuelve:

  • (Enumerator)

    El enumerador.

Desde:

  • 2.5.0



169
170
171
172
173
174
175
176
177
178
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 169

def cada
  propagar Detener iteración.Nuevo si ¿Cerrado?

  bucle hacer
    Documento = try_next
    rendimiento Documento si Documento
  end
rescate Detener iteración
  sí mismo
end

#inspectString

Obtén una string formateada para usar en la inspección.

Ejemplos:

Inspecciona el objeto de flujo de cambios.

stream.inspect

Devuelve:

  • (string)

    La inspección del flujo de cambios.

Desde:

  • 2.5.0



285
286
287
288
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 285

def inspeccionar
  "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " +
    "opciones=#{@opciones} resume_token=#{resume_token}>"
end

#max_await_time_msInteger | nil

Devuelve el valor de la opción max_await_time_ms que fue pasada a este flujo de cambios.

Devuelve:

  • (Integer | nil)

    el valor max_await_time_ms

Desde:

  • 2.5.0



322
323
324
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 322

def max_await_time_ms
  opciones[; tiempo_máximo_espera_ms]
end

#refresh_timeout!Object

Este método es parte de una API privada. Se debe evitar el uso de este método si es posible, ya que podría eliminarse o modificarse en el futuro.

Actualiza el tiempo de espera de CSOT para la siguiente iteración. ¡Delegados al refresh_timeout del cursor subyacente! método para que cada llamada a try_next comience con un plazo de tiempo de espera nuevo, como lo requiere la especificación CSOT para cursores tailable awaitData.

Desde:

  • 2.5.0



65
66
67
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 65

def refresh_timeout!
  @cursor&.refresh_timeout!
end

#resume_tokenBSON::Document | nil

Devuelve el token de reanudación que el flujo utilizará para reanudarse automáticamente, si existe uno.

Ejemplos:

Obtener el token de reanudación del flujo de cambios.

stream.resume_token

Devuelve:

  • (BSON::Document | nil)

    El token de reanudación del flujo de cambios.

Desde:

  • 2.10.0



299
300
301
302
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 299

def resume_token
  cursor_resume_token = @cursor.resume_token si @cursor
  cursor_resume_token || @resume_token
end

#timeout_modeobjeto

"flujos de cambio...utilizan implícitamente el modo ITERACIÓN"

Devuelve:

  • :iteration

Desde:

  • 2.5.0



314
315
316
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 314

def timeout_mode
  :iteration
end

#to_enumObjeto

Desde:

  • 2.5.0



227
228
229
230
231
232
233
234
235
236
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 227

def to_enum
  enum = super
  enum.enviar(set_variable_de_instancia, '@obj', sí mismo)
  clase << enum
    def try_next
      @obj.try_next
    end
  end
  enum
end

#try_nextBSON::Document | nil

Devuelve un documento del flujo de cambios, si hay alguno disponible.

Reintenta una vez en caso de error reanudable.

Lanza StopIteration si se cierra el flujo de cambios.

Este método esperará hasta max_await_time_ms milisegundos para cambios desde el servidor, y si no se reciben cambios, devolverá nil.

Devuelve:

  • (BSON::Document | nil)

    Un documento de flujo de cambios.

Aumenta:

  • (StopIteration)

Desde:

  • 2.6.0



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 192

def try_next
  recreate_cursor! si @timed_out

  propagar Detener iteración.Nuevo si ¿Cerrado?

  begin
    doc = @cursor.try_next
  rescate mongo::Error => e
    # «Si una llamada siguiente falla con un error de timeout, los controladores NO DEBEN
    # invalidar el flujo de cambios. La subsiguiente llamada siguiente DEBE
    # realizar un intento de reanudación para establecer un nuevo flujo de cambios en el
    # servidor..."
    #
    # Sin embargo, SocketTimeoutErrors son TimeoutErrors, pero también son
    # flujo de cambios-resumable. Para conservar el comportamiento existente (especificado),
    # Solo contamos los tiempos de espera cuando el error tampoco es
    # flujo de cambios reanudable.
    @timed_out = e.is_a?(mongo::Error::TimeoutError) && !e.¿streaming de cambios reanudable?

    propagar a menos que @timed_out || e.¿streaming de cambios reanudable?

    @resume_token = @cursor.resume_token
    propagar e si @timed_out

    recreate_cursor!(@cursor.context)
    reintentar
  end

  # Necesitamos verificar que cada documento tenga un _id, así que
  # tener un token de currículum con el que trabajar
  propagar Error::MissingResumeToken si doc && doc['_id'].nil?

  doc
end