Class: Mongo::Server::ConnectionPool
- Inherits:
-
Object
- Object
- Mongo::Server::ConnectionPool
- Extended by:
- Forwardable
- Includes:
- Loggable, Monitoring::Publishable
- Defined in:
- build/ruby-driver-master/lib/mongo/server/connection_pool.rb,
build/ruby-driver-master/lib/mongo/server/connection_pool/generation_manager.rb
Overview
Represents a connection pool for server connections.
Defined Under Namespace
Classes: GenerationManager
Constant Summary collapse
- DEFAULT_MAX_SIZE =
The default max size for the connection pool.
20.freeze
- DEFAULT_MIN_SIZE =
The default min size for the connection pool.
0.freeze
- DEFAULT_WAIT_TIMEOUT =
The default timeout, in seconds, to wait for a connection.
This timeout applies while in flow threads are waiting for background threads to establish connections (and hence they must connect, handshake and auth in the allotted time).
It is currently set to 10 seconds. The default connect timeout is 10 seconds by itself, but setting large timeouts can get applications in trouble if their requests get timed out by the reverse proxy, thus anything over 15 seconds is potentially dangerous.
10.freeze
Constants included from Loggable
Instance Attribute Summary collapse
-
#generation_manager ⇒ Integer
readonly
private
Generation Generation of connections currently being used by the queue.
-
#options ⇒ Hash
readonly
Options The pool options.
-
#populate_semaphore ⇒ Object
readonly
Condition variable broadcast when the size of the pool changes to wake up the populator.
- #populator ⇒ Object readonly private
Attributes included from Monitoring::Publishable
Class Method Summary collapse
-
.finalize(available_connections, pending_connections, populator) ⇒ Proc
Finalize the connection pool for garbage collection.
Instance Method Summary collapse
-
#available_count ⇒ Integer
Number of available connections in the pool.
-
#check_in(connection) ⇒ Object
Check a connection back into the pool.
-
#check_out(connection_global_id: nil) ⇒ Mongo::Server::Connection
Checks a connection out of the pool.
-
#clear(options = nil) ⇒ true
(also: #disconnect!)
Closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool.
-
#close(options = nil) ⇒ true
Marks the pool closed, closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool.
-
#close_idle_sockets ⇒ Object
Close sockets that have been open for longer than the max idle time, if the option is set.
-
#closed? ⇒ true | false
Whether the pool has been closed.
-
#initialize(server, options = {}) ⇒ ConnectionPool
constructor
Create the new connection pool.
-
#inspect ⇒ String
Get a pretty printed string inspection for the pool.
-
#max_idle_time ⇒ Float | nil
The maximum seconds a socket can remain idle since it has been checked in to the pool, if set.
-
#max_size ⇒ Integer
Get the maximum size of the connection pool.
-
#min_size ⇒ Integer
Get the minimum size of the connection pool.
-
#populate ⇒ true | false
private
Creates and adds a connection to the pool, if the pool's size is below min_size.
-
#size ⇒ Integer
Size of the connection pool.
-
#stop_populator ⇒ Object
private
Stop the background populator thread and clean up any connections created which have not been connected yet.
- #summary ⇒ Object
-
#wait_timeout ⇒ Float
The time to wait, in seconds, for a connection to become available.
-
#with_connection(connection_global_id: nil) ⇒ Object
Yield the block to a connection, while handling check in/check out logic.
Methods included from Monitoring::Publishable
#publish_cmap_event, #publish_event, #publish_sdam_event
Methods included from Loggable
#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger
Constructor Details
#initialize(server, options = {}) ⇒ ConnectionPool
Create the new connection pool.
Note: Additionally, options for connections created by this pool should
be included in the options passed here, and they will be forwarded to
any connections created by the pool.
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 83 def initialize(server, = {}) unless server.is_a?(Server) raise ArgumentError, 'First argument must be a Server instance' end = .dup if [:min_size] && [:min_pool_size] && [:min_size] != [:min_pool_size] raise ArgumentError, "Min size #{[:min_size]} is not identical to min pool size #{[:min_pool_size]}" end if [:max_size] && [:max_pool_size] && [:max_size] != [:max_pool_size] raise ArgumentError, "Max size #{[:max_size]} is not identical to max pool size #{[:max_pool_size]}" end if [:wait_timeout] && [:wait_queue_timeout] && [:wait_timeout] != [:wait_queue_timeout] raise ArgumentError, "Wait timeout #{[:wait_timeout]} is not identical to wait queue timeout #{[:wait_queue_timeout]}" end [:min_size] ||= [:min_pool_size] .delete(:min_pool_size) [:max_size] ||= [:max_pool_size] .delete(:max_pool_size) if [:min_size] && [:max_size] && ([:max_size] != 0 && [:min_size] > [:max_size]) then raise ArgumentError, "Cannot have min size #{[:min_size]} exceed max size #{[:max_size]}" end if [:wait_queue_timeout] [:wait_timeout] ||= [:wait_queue_timeout] end .delete(:wait_queue_timeout) @server = server @options = .freeze @generation_manager = GenerationManager.new(server: server) @closed = false # A connection owned by this pool should be either in the # available connections array (which is used as a stack) # or in the checked out connections set. @available_connections = available_connections = [] @checked_out_connections = Set.new @pending_connections = Set.new # Mutex used for synchronizing access to @available_connections and # @checked_out_connections. The pool object is thread-safe, thus # all methods that retrieve or modify instance variables generally # must do so under this lock. @lock = Mutex.new # Condition variable broadcast when a connection is added to # @available_connections, to wake up any threads waiting for an # available connection when pool is at max size @available_semaphore = Semaphore.new # Background thread reponsible for maintaining the size of # the pool to at least min_size @populator = Populator.new(self, ) @populate_semaphore = Semaphore.new ObjectSpace.define_finalizer(self, self.class.finalize(@available_connections, @pending_connections, @populator)) publish_cmap_event( Monitoring::Event::Cmap::PoolCreated.new(@server.address, , self) ) @populator.run! if min_size > 0 end |
Instance Attribute Details
#generation_manager ⇒ Integer (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns generation Generation of connections currently being used by the queue.
193 194 195 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 193 def generation_manager @generation_manager end |
#options ⇒ Hash (readonly)
Returns options The pool options.
150 151 152 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 150 def @options end |
#populate_semaphore ⇒ Object (readonly)
Condition variable broadcast when the size of the pool changes to wake up the populator
55 56 57 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 55 def populate_semaphore @populate_semaphore end |
#populator ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
262 263 264 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 262 def populator @populator end |
Class Method Details
.finalize(available_connections, pending_connections, populator) ⇒ Proc
Finalize the connection pool for garbage collection.
762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 762 def self.finalize(available_connections, pending_connections, populator) proc do available_connections.each do |connection| connection.disconnect!(reason: :pool_closed) end available_connections.clear pending_connections.each do |connection| connection.disconnect!(reason: :pool_closed) end pending_connections.clear # Finalizer does not close checked out connections. # Those would have to be garbage collected on their own # and that should close them. end end |
Instance Method Details
#available_count ⇒ Integer
Number of available connections in the pool.
230 231 232 233 234 235 236 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 230 def available_count raise_if_closed! @lock.synchronize do @available_connections.length end end |
#check_in(connection) ⇒ Object
Check a connection back into the pool.
The connection must have been previously created by this pool.
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 463 def check_in(connection) check_invariants @lock.synchronize do unless connection.connection_pool == self raise ArgumentError, "Trying to check in a connection which was not checked out by this pool: #{connection} checked out from pool #{connection.connection_pool} (for #{self})" end unless @checked_out_connections.include?(connection) raise ArgumentError, "Trying to check in a connection which is not currently checked out by this pool: #{connection} (for #{self})" end # Note: if an event handler raises, resource will not be signaled. # This means threads waiting for a connection to free up when # the pool is at max size may time out. # Threads that begin waiting after this method completes (with # the exception) should be fine. @checked_out_connections.delete(connection) publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckedIn.new(@server.address, connection.id, self) ) if connection.error? connection.disconnect!(reason: :error) return end if closed? connection.disconnect!(reason: :pool_closed) return end if connection.closed? # Connection was closed - for example, because it experienced # a network error. Nothing else needs to be done here. @populate_semaphore.signal elsif connection.generation != generation(service_id: connection.service_id) && !connection.pinned? # If connection is marked as pinned, it is used by a transaction # or a series of cursor operations in a load balanced setup. # In this case connection should not be disconnected until # unpinned. connection.disconnect!(reason: :stale) @populate_semaphore.signal else connection.record_checkin! @available_connections << connection # Wake up only one thread waiting for an available connection, # since only one connection was checked in. @available_semaphore.signal end end ensure check_invariants end |
#check_out(connection_global_id: nil) ⇒ Mongo::Server::Connection
Checks a connection out of the pool.
If there are active connections in the pool, the most recently used connection is returned. Otherwise if the connection pool size is less than the max size, creates a new connection and returns it. Otherwise waits up to the wait timeout and raises Timeout::Error if there are still no active connections and the pool is at max size.
The returned connection counts toward the pool's max size. When the caller is finished using the connection, the connection should be checked back in via the check_in method.
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 282 def check_out(connection_global_id: nil) check_invariants publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckOutStarted.new(@server.address) ) if closed? publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckOutFailed.new( @server.address, Monitoring::Event::Cmap::ConnectionCheckOutFailed::POOL_CLOSED ), ) raise Error::PoolClosedError.new(@server.address, self) end deadline = Utils.monotonic_time + wait_timeout pid = Process.pid connection = nil # It seems that synchronize sets up its own loop, thus a simple break # is insufficient to break the outer loop catch(:done) do loop do # Lock must be taken on each iteration, rather for the method # overall, otherwise other threads will not be able to check in # a connection while this thread is waiting for one. @lock.synchronize do until @available_connections.empty? connection = next_available_connection( connection_global_id: connection_global_id ) if connection.nil? if connection_global_id # A particular connection is requested, but it is not available. # If it is nether available not checked out, we should stop here. @checked_out_connections.detect do |conn| conn.connection_global_id == connection_global_id end.tap do |conn| if conn.nil? publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckOutFailed.new( @server.address, Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR ), ) raise Error::MissingConnection.new end end else break end end if connection.pid != pid log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{connection.pid}, new pid #{pid}") connection.disconnect!(reason: :stale) @populate_semaphore.signal next end if !connection.pinned? # If connection is marked as pinned, it is used by a transaction # or a series of cursor operations in a load balanced setup. # In this case connection should not be disconnected until # unpinned. if connection.generation != generation( service_id: connection.service_id ) # Stale connections should be disconnected in the clear # method, but if any don't, check again here connection.disconnect!(reason: :stale) @populate_semaphore.signal next end if max_idle_time && connection.last_checkin && Time.now - connection.last_checkin > max_idle_time then connection.disconnect!(reason: :idle) @populate_semaphore.signal next end end @pending_connections << connection throw(:done) end if @server.load_balancer? && connection_global_id # We need a particular connection, and if it is not available # we can wait for an in-progress operation to return # such a connection to the pool. else # If we are below pool capacity, create a new connection. # # Ruby does not allow a thread to lock a mutex which it already # holds. if max_size == 0 || unsynchronized_size < max_size connection = create_connection @pending_connections << connection throw(:done) end end end wait = deadline - Utils.monotonic_time if wait <= 0 publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckOutFailed.new( @server.address, Monitoring::Event::Cmap::ConnectionCheckOutFailed::TIMEOUT, ), ) msg = @lock.synchronize do connection_global_id_msg = if connection_global_id " for connection #{connection_global_id}" else '' end "Timed out attempting to check out a connection " + "from pool for #{@server.address}#{connection_global_id_msg} after #{wait_timeout} sec. " + "Connections in pool: #{@available_connections.length} available, " + "#{@checked_out_connections.length} checked out, " + "#{@pending_connections.length} pending " + "(max size: #{max_size})" end raise Error::ConnectionCheckOutTimeout.new(msg, address: @server.address) end @available_semaphore.wait(wait) end end begin connect_connection(connection) rescue Exception # Handshake or authentication failed @lock.synchronize do @pending_connections.delete(connection) end @populate_semaphore.signal publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckOutFailed.new( @server.address, Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR ), ) raise end @lock.synchronize do @checked_out_connections << connection @pending_connections.delete(connection) end publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckedOut.new(@server.address, connection.id, self), ) if Lint.enabled? unless connection.connected? raise Error::LintError, "Connection pool for #{address} checked out a disconnected connection #{connection.generation}:#{connection.id}" end end connection ensure check_invariants end |
#clear(options = nil) ⇒ true Also known as: disconnect!
Closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool. The pool remains operational and can create new connections when requested.
536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 536 def clear( = nil) raise_if_closed! check_invariants if && [:stop_populator] stop_populator end service_id = && [:service_id] @lock.synchronize do @generation_manager.bump(service_id: service_id) publish_cmap_event( Monitoring::Event::Cmap::PoolCleared.new( @server.address, service_id: service_id ) ) unless && [:lazy] if @server.load_balancer? && service_id loop do conn = @available_connections.detect do |conn| conn.service_id == service_id end if conn @available_connections.delete(conn) conn.disconnect!(reason: :stale) @populate_semaphore.signal else break end end else until @available_connections.empty? connection = @available_connections.pop connection.disconnect!(reason: :stale) @populate_semaphore.signal end end end end true ensure check_invariants end |
#close(options = nil) ⇒ true
Marks the pool closed, closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool. If force option is true, checked out connections are also closed. Attempts to use the pool after it is closed will raise Error::PoolClosedError.
602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 602 def close( = nil) return if closed? ||= {} stop_populator @lock.synchronize do until @available_connections.empty? connection = @available_connections.pop connection.disconnect!(reason: :pool_closed) end if [:force] until @checked_out_connections.empty? connection = @checked_out_connections.take(1).first connection.disconnect!(reason: :pool_closed) @checked_out_connections.delete(connection) end end # mark pool as closed before releasing lock so # no connections can be created, checked in, or checked out @closed = true end publish_cmap_event( Monitoring::Event::Cmap::PoolClosed.new(@server.address, self) ) true end |
#close_idle_sockets ⇒ Object
Close sockets that have been open for longer than the max idle time,
if the option is set.
678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 678 def close_idle_sockets return if closed? return unless max_idle_time @lock.synchronize do i = 0 while i < @available_connections.length connection = @available_connections[i] if last_checkin = connection.last_checkin if (Time.now - last_checkin) > max_idle_time connection.disconnect!(reason: :idle) @available_connections.delete_at(i) @populate_semaphore.signal next end end i += 1 end end end |
#closed? ⇒ true | false
Whether the pool has been closed.
243 244 245 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 243 def closed? !!@closed end |
#inspect ⇒ String
Get a pretty printed string inspection for the pool.
643 644 645 646 647 648 649 650 651 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 643 def inspect if closed? "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " + "wait_timeout=#{wait_timeout} closed>" else "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " + "wait_timeout=#{wait_timeout} current_size=#{size} available=#{available_count}>" end end |
#max_idle_time ⇒ Float | nil
The maximum seconds a socket can remain idle since it has been checked in to the pool, if set.
188 189 190 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 188 def max_idle_time @max_idle_time ||= [:max_idle_time] end |
#max_size ⇒ Integer
Get the maximum size of the connection pool.
160 161 162 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 160 def max_size @max_size ||= [:max_size] || [DEFAULT_MAX_SIZE, min_size].max end |
#min_size ⇒ Integer
Get the minimum size of the connection pool.
169 170 171 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 169 def min_size @min_size ||= [:min_size] || DEFAULT_MIN_SIZE end |
#populate ⇒ true | false
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Creates and adds a connection to the pool, if the pool's size is below min_size. Retries once if a socket-related error is encountered during this process and raises if a second error or a non socket-related error occurs.
Used by the pool populator background thread.
occured, or the non socket-related error
736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 736 def populate return false if closed? begin return create_and_add_connection rescue Error::SocketError, Error::SocketTimeoutError => e # an error was encountered while connecting the connection, # ignore this first error and try again. log_warn("Populator failed to connect a connection for #{address}: #{e.class}: #{e}. It will retry.") end return create_and_add_connection rescue Error::AuthError, Error # wake up one thread waiting for connections, since one could not # be created here, and can instead be created in flow @available_semaphore.signal raise end |
#size ⇒ Integer
Size of the connection pool.
Includes available and checked out connections.
208 209 210 211 212 213 214 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 208 def size raise_if_closed! @lock.synchronize do unsynchronized_size end end |
#stop_populator ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stop the background populator thread and clean up any connections created which have not been connected yet.
Used when closing the pool or when terminating the bg thread for testing purposes. In the latter case, this method must be called before the pool is used, to ensure no connections in pending_connections were created in-flow by the check_out method.
708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 708 def stop_populator @populator.stop! @lock.synchronize do # If stop_populator is called while populate is running, there may be # connections waiting to be connected, connections which have not yet # been moved to available_connections, or connections moved to available_connections # but not deleted from pending_connections. These should be cleaned up. until @pending_connections.empty? connection = @pending_connections.take(1).first connection.disconnect! @pending_connections.delete(connection) end end end |
#summary ⇒ Object
This method is experimental and subject to change.
251 252 253 254 255 256 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 251 def summary @lock.synchronize do "#<ConnectionPool size=#{unsynchronized_size} (#{min_size}-#{max_size}) " + "used=#{@checked_out_connections.length} avail=#{@available_connections.length} pending=#{@pending_connections.length}>" end end |
#wait_timeout ⇒ Float
The time to wait, in seconds, for a connection to become available.
178 179 180 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 178 def wait_timeout @wait_timeout ||= [:wait_timeout] || DEFAULT_WAIT_TIMEOUT end |
#with_connection(connection_global_id: nil) ⇒ Object
Yield the block to a connection, while handling check in/check out logic.
663 664 665 666 667 668 669 670 671 672 |
# File 'build/ruby-driver-master/lib/mongo/server/connection_pool.rb', line 663 def with_connection(connection_global_id: nil) raise_if_closed! connection = check_out(connection_global_id: connection_global_id) yield(connection) ensure if connection check_in(connection) end end |