Hi,
I have a Ruby script that reads IP addresses from a collection (50 million documents) and queries an API for data about each address. It then takes the API response, which is native JSON, and inserts it into another collection.
I use Ruby queues to thread these two operations. The producer thread takes each IP from the source collection and looks it up in the target collection; if it does not exist there, the producer adds it to the queue.
The Ruby code is as follows:
queue = Queue.new
producer = []
producer << Thread.new do
  unique_ipaddr.find.each do |ipaddr|
    if geoip_infoDB.count("ip" => ipaddr.to_h["_id"]) == 0
      queue.push(ipaddr.to_h["_id"])
    end # end if test
  end # end unique_ipaddr find
end
Each consumer thread waits until there is something in the queue, pulls an IP address from it, does the lookup against the API, and inserts the result into the destination collection. The API can handle 50 concurrent connections, but we limit it to 48 to be conservative. Like this:
consumer = []
48.times do
  consumer << Thread.new do
    while qipaddr = queue.pop
      insert_to_db(get_geoip_results(qipaddr), geoip_infoDB, geoip_errorsDB)
    end # end while
  end # end new thread
end # end 48 times
consumer.each { |t| t.join }
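To make the threading pattern easier to reproduce without MongoDB or the API, here is a minimal standalone sketch of the same producer/consumer layout. Everything in it is a stand-in: the arrays `source_ips` and `already_stored` play the role of the two collections, `fake_lookup` is a hypothetical stub for the API call, and three consumers are used instead of 48. It also closes the queue when the producer finishes so that the consumers can exit, which the script above does not do.

```ruby
# Standalone sketch: arrays stand in for the collections and
# fake_lookup stands in for the API call (all hypothetical names).
source_ips     = %w[10.0.0.1 10.0.0.2 10.0.0.3 10.0.0.4]
already_stored = ["10.0.0.2"] # IPs assumed to exist in the target already
results        = []
results_lock   = Mutex.new    # guards the shared results array

def fake_lookup(ip)
  { "ip" => ip, "country" => "ZZ" } # placeholder payload, not real data
end

queue = Queue.new

producer = Thread.new do
  source_ips.each do |ip|
    # Only queue IPs that are not in the target yet.
    queue.push(ip) unless already_stored.include?(ip)
  end
  queue.close # once drained, pop returns nil and the consumers stop
end

consumers = Array.new(3) do
  Thread.new do
    while (ip = queue.pop)
      doc = fake_lookup(ip)
      results_lock.synchronize { results << doc }
    end
  end
end

producer.join
consumers.each(&:join)
puts results.map { |d| d["ip"] }.sort.inspect
```

This only shows the queue mechanics; it does not involve a database cursor, so it cannot reproduce the error described below.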
The insert_to_db function takes the response from the API, makes sure it’s valid, and inserts the result:
def insert_to_db(response, geoip_infoDB, geoip_errorsDB)
  if response.code == 200 && !response.body.nil?
    geoip_infoDB.insert_one(response.parsed_response)
  else
    geoip_errorsDB.insert_one(response.parsed_response)
  end
end
Consistently, at 45,000 documents on the queue, we hit a "cursor id not found (43)" error:
/var/lib/gems/2.7.0/gems/mongo-2.14.0/lib/mongo/operation/result.rb:343:in raise_operation_failure: cursor id 2074666401145332617 not found (43) (on databaseserver) (Mongo::Error::OperationFailure)
What am I doing wrong?
Thank you for reading this far!