Clase: Mongo::Collection::View::ChangeStream

Hereda:
Agregación
  • Objeto
Mostrar todo
Incluye:
Agregación::Comportamiento, Reintentable
Definido en:
lib/mongo/colección/view/change_stream.rb,
lib/mongo/colección/view/change_stream/retryable.rb

Overview

Nota:

Disponible solo en versiones de servidor 3.6 y superiores.

Nota:

ChangeStreams no funcionan correctamente con JRuby debido al problema documentado aquí: github.com/jruby/jruby/issues/4212. Concretamente, JRuby evalúa de manera entusiasta #siguiente en un Enumerator en un subproceso green en segundo plano, por lo tanto, llamar a #siguiente en el flujo de cambios provocará que getMore se llame en un bucle en segundo plano.

Proporciona comportamiento en torno a una etapa de pipeline ‘$changeStream` en el framework de agregación. Especificar esta etapa permite a los usuarios solicitar que se envíen notificaciones por todos los cambios en una colección o base de datos en particular.

Desde:

  • 2.5.0

Definido bajo Namespace

Modules: Reintentar

Colapso delresumen constante

FULL_DOCUMENT_DEFAULT =

Devuelve el valor por defecto de la opción fullDocument.

Devuelve:

  • (string)

    Valor por defecto de la opción fullDocument.

Desde:

  • 2.5.0

'por defecto'.freeze
DATABASE =

Devoluciones Se utiliza para indicar que el flujo de cambios debe escuchar los cambios en toda la base de datos en lugar de solo en la colección.

Devuelve:

  • (Símbolo)

    Se utiliza para indicar que el flujo de cambios debe escuchar los cambios en toda la base de datos en lugar de solo en la colección.

Desde:

  • 2.6.0

:database
CLUSTER =

Devuelve Indica que el flujo de cambios debe escuchar los cambios en todo el clúster y no solo en la colección.

Devuelve:

  • (Símbolo)

    Se usa para indicar que el flujo de cambios debe monitorear cambios en todo el clúster y no solo en la colección.

Desde:

  • 2.6.0

:grupo

Constantes incluidas desde Loggable

Registrable::PREFIX

Constantes incluidas de Explicable

Explainable::ALL_PLANS_EXECUTION, Explainable::EXECUTION_STATS, Explainable::QUERY_PLANNER

Resumen de atributos de la instancia colapsar

Atributos incluidos de Agregación::Comportamiento

#view

Resumen del método de instancia colapsar

Métodos incluidos en Retryable

#read_worker, #select_server, #write_worker

Métodos incluidos de Aggregation::Behavior

#allow_disk_use, #explain, #timeout_ms, #guardar?

Métodos incluidos desde Registrable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Métodos incluidos en Explicable

#explain

Métodos incluidos desde Iterable

#close_query

Métodos incluidos desde Mongo::CursorHost

#validate_timeout_mode!

Detalles del constructor

#inicializar(vista, canalización, cambios_para, opciones = {}) ⇒ ChangeStream

Inicializa el flujo de cambios para la vista de colección, el pipeline y las opciones proporcionadas.

Ejemplos:

Crea la nueva vista de flujo de cambios.

ChangeStream.new(view, pipeline, options)

Parámetros:

  • vista (Collection::View)

    La vista de la colección.

  • pipeline (arreglo<Hash>)

    La pipeline de operadores para filtrar las notificaciones de cambio.

  • opciones (Hash) (por defecto: {})

    Las opciones de flujo de cambios.

Opciones Hash (options):

  • :documento_completo (Cadena)

    Valores permitidos: nulo, 'predeterminado', 'updateLookup', 'whenAvailable', 'obligatorio'.

    El valor predeterminado es no enviar ningún valor (es decir, nulo), lo que equivale a "predeterminado". Por defecto, la notificación de cambios para actualizaciones parciales incluirá un delta que describe los cambios en el documento.

    Cuando se configura en 'updateLookup', la notificación de cambio para actualizaciones parciales incluirá tanto un delta que describe los cambios en el documento como una copia del documento completo que se modificó algún tiempo después de que ocurrió el cambio.

    Cuando se establece en 'whenAvailable', configura el flujo de cambios para devolver la imagen posterior del documento modificado para eventos de reemplazo y actualización si la imagen posterior para este evento está disponible.

    Cuando se establece en "requerido", se comporta de la misma manera que "cuandoDisponible" excepto que se genera un error si la imagen de publicación no está disponible.

  • documento_completo_antes_de_modificar (Cadena)

    Valores permitidos: nil, ‘whenAvailable’ , ‘required’ , ‘off’.

    El valor por defecto es no enviar ningún valor (es decir, nil), que equivale a ‘apagado’ o ‘desactivado’.

    Cuando se configura en 'whenAvailable', establece el flujo de cambios para devolver la preimagen del documento modificado en caso de eventos de reemplazo, actualizar o borrar, si está disponible.

    Cuando se establece en 'obligatorio', el mismo comportamiento que 'whenAvailable' excepto que se genera un error si la imagen previa no está disponible.

  • :resume_after (BSON::Document, Hash)

    Especifica el punto de partida lógico para el nuevo flujo de cambios.

  • ; tiempo_máximo_espera_ms (Entero)

    El tiempo máximo que el servidor espera documentos nuevos para satisfacer una query de flujo de cambios.

  • :batch_size (Entero)

    El número de documentos a devolver por agrupar.

  • :colación (BSON::Document, Hash)

    La intercalación para usar.

  • :start_at_operation_time (BSON::Timestamp)

    Solo devuelve los cambios que ocurrieron en o después de la marca de tiempo especificada. Cualquier comando ejecutado en el servidor devolverá un tiempo de clúster que puede usarse aquí. Solo es reconocido por versiones del servidor 4.0+}.

  • INICIO_DESPUÉS (Bson::Document, Hash)

    Similar a :resume_after, esta opción toma un token de reanudación y comienza un nuevo flujo de cambios devolviendo la primera notificación después del token. Esto permitirá a los usuarios ver colecciones que han sido descartadas y recreadas o colecciones recién renombradas sin perder ninguna notificación.

  • :comment (Objeto)

    Un comentario proporcionado por el usuario para adjuntar a este comando.

  • :mostrar eventos expandidos (booleano)

    Permite al servidor enviar la lista 'expandida' de eventos de flujo de cambios. La lista de eventos adicionales incluidos con este conjunto de flags son: createIndexes, dropIndexes, modify, create, shardCollection, reshardCollection, refineCollectionShardKey.

    El servidor notificará un error si se especifican simultáneamente 'startAfter' y 'resumeAfter'.

Desde:

  • 2.5.0



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 133

def inicializar(vista, pipeline, cambios_para, opciones = {})
  # flujo de cambios cursors can only be :iterable, so we don't allow
  # Se debe especificar el timeout_mode.
  performar_setup(vista, opciones, forbid: %i[ timeout_mode ]) hacer
    @changes_for = cambios_para
    @change_stream_filters = pipeline && pipeline.dup
    @start_after = @options[INICIO_DESPUÉS]
  end

  # El token de reanudación rastreado por el flujo de cambios, utilizado únicamente
  # cuando no hay cursor, o no hay token de reanudación de cursor
  @resume_token = @start_after || @options[:resume_after]

  create_cursor!

  # Enviamos diferentes parámetros cuando reanudamos un flujo de cambios
  # en comparación con cuando enviamos la primera query
  @reanudando = true
end

Detalles de los atributos de instancia

#cursorCursor (solo lectura)

Este método forma parte de una API privada. Debe evitarlo si es posible, ya que podría eliminarse o modificarse en el futuro.

Retorna el cursor subyacente para esta operación.

Devuelve:

  • (Cursor)

    el cursor subyacente para esta operación

Desde:

  • 2.5.0



67
68
69
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 67

def cursor
  @cursor
end

#opcionesBSON::Documento (solo lectura)

Devuelve las opciones del flujo de cambios.

Devuelve:

  • (BSON::Document)

    Las opciones de flujo de cambios.

Desde:

  • 2.5.0



63
64
65
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 63

def opciones
  @options
end

Detalles del método de instancia

#close(opts = {}) ⇒ nil(nulo)

Nota:

Este método intenta cerrar el cursor utilizado por el flujo de cambios, lo que a su vez cierra el cursor del flujo de cambios del servidor. Este método ignora cualquier error que se produzca al cerrar el cursor del servidor.

Cerrar el flujo de cambios.

Ejemplos:

Cerrar el flujo de cambios.

stream.close

Devuelve:

  • (nil)

    Siempre nulo.

Desde:

  • 2.5.0



254
255
256
257
258
259
260
261
262
263
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 254

def Cerrar(opta = {})
  a menos que ¿Cerrado?
    begin
      @cursor.Cerrar(opta)
    rescate Error::OperationFailure::Familia, Error::Error de socket, Error::SocketTimeoutError, Error::MissingConnection
      # ignore
    end
    @cursor = nulo
  end
end

#cerrado?verdadero, falso

¿Está cerrado el flujo de cambio?

Ejemplos:

Determina si el flujo de cambios está cerrado.

stream.closed?

Devuelve:

  • (verdadero,falso)

    Si el flujo de cambios se cierra.

Desde:

  • 2.5.0



273
274
275
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 273

def ¿Cerrado?
  @cursor.nil?
end

#tipo_cursorObjeto

“Los flujos de cambio son una abstracción en torno a los cursores tailable-awaitData…”

Devuelve:

  • tailable_await

Desde:

  • 2.5.0



307
308
309
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 307

def cursor_type
  tailable_await
end

#each {|Each| ... } ⇒ Enumerator

Itera a través de los documentos devueltos por el flujo de cambios.

Este método vuelve a intentar una vez por error en errores reanudables (dos errores consecutivos dan como resultado que se produzca el segundo error; un error del que se recupera devuelve el recuento de errores a cero).

Ejemplos:

Itera a través del flujo de documentos.

stream.each do |document|
  p document
end

Parámetros de rendimiento:

  • Cada uno (BSON::Document)

    documento de flujo de cambios.

Devuelve:

  • (Enumerator)

    El enumerador.

Desde:

  • 2.5.0



169
170
171
172
173
174
175
176
177
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 169

def cada
  propagar Detener la iteración.Nuevo si ¿Cerrado?
  bucle hacer
    Documento = try_next
    rendimiento Documento si Documento
  end
rescate Detener la iteración
  return sí mismo
end

#inspectString

Obtenga una cadena formateada para usar en la inspección.

Ejemplos:

Inspecciona el objeto de flujo de cambios.

stream.inspect

Devuelve:

  • (string)

    La inspección del flujo de cambios.

Desde:

  • 2.5.0



285
286
287
288
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 285

def inspeccionar
  "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " +
    "opciones=#{@options} token_de_currículum=#{token_de_currículum}>"
end

#tiempo_máximo_de_espera_msEntero | nulo

Devuelve el valor de la opción max_await_time_ms que se pasó a este flujo de cambios.

Devuelve:

  • (Integer | nil)

    el valor max_await_time_ms

Desde:

  • 2.5.0



322
323
324
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 322

def max_await_time_ms
  opciones[; tiempo_máximo_espera_ms]
end

#resume_tokenBSON::Document | nil

Devuelve el token de reanudación que el flujo utilizará para reanudarse automáticamente, si existe uno.

Ejemplos:

Obtener el token de reanudación del flujo de cambios.

stream.resume_token

Devuelve:

  • (BSON::Document | nil)

    El token de reanudación del flujo de cambios.

Desde:

  • 2.10.0



299
300
301
302
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 299

def resume_token
  cursor_resume_token = @cursor.resume_token si @cursor
  cursor_resume_token || @resume_token
end

#timeout_modeobjeto

“los flujos de cambio…usan implícitamente el modo de ITERACIÓN”

Devuelve:

  • :iteration

Desde:

  • 2.5.0



314
315
316
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 314

def timeout_mode
  :iteration
end

#to_enumObjeto

Desde:

  • 2.5.0



227
228
229
230
231
232
233
234
235
236
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 227

def to_enum
  enum = super
  enum.enviar(:conjunto_de_variables_de_instancia, '@obj', sí mismo)
  clase << enum
    def try_next
      @obj.try_next
    end
  end
  enum
end

#try_nextBSON::Document | nil

Devuelve un documento del flujo de cambios, si hay alguno disponible.

Reintenta una vez en caso de error reanudable.

Lanza StopIteration si se cierra el flujo de cambios.

Este método esperará hasta max_await_time_ms milisegundos para cambios desde el servidor, y si no se reciben cambios, devolverá nil.

Devuelve:

  • (BSON::Document | nil)

    Un documento de flujo de cambios.

Aumenta:

  • (Detener la iteración)

Desde:

  • 2.6.0



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Archivo 'lib/mongo/collection/view/change_stream.rb', línea 191

def try_next
  recreate_cursor! si @tiempo_de_salida

  propagar Detener la iteración.Nuevo si ¿Cerrado?

  begin
    doc = @cursor.try_next
  rescate Mongo::Error => e
    # «Si una llamada siguiente falla con un error de timeout, los controladores NO DEBEN
    # invalidar el flujo de cambios. La subsiguiente llamada siguiente DEBE
    # realizar un intento de reanudación para establecer un nuevo flujo de cambios en el
    # servidor..."
    #
    # Sin embargo, los SocketTimeoutErrors son TimeoutErrors, pero también son
    # flujo de cambios-resumable. Para conservar el comportamiento existente (especificado),
    # Solo contamos los tiempos de espera cuando el error no es también
    # flujo de cambios reanudable.
    @tiempo_de_salida = e.is_a?(Mongo::Error::TimeoutError) && !e.¿streaming de cambios reanudable?

    propagar a menos que @tiempo_de_salida || e.¿streaming de cambios reanudable?

    @resume_token = @cursor.resume_token
    propagar e si @tiempo_de_salida

    recreate_cursor!(@cursor.context)
    reintentar
  end

  # Necesitamos verificar que cada documento tenga un _id, por lo que
  # tener un token de currículum para trabajar
  si doc && doc['_id'].nil?
    propagar Error::MissingResumeToken
  end
  doc
end