Creating the MongoDB client object once in AWS Lambda with Python?

I’m building a serverless application using Python and MongoDB. The documentation says the database connection should be created outside the handler function. I’m using the Mangum Python package as an adapter to handle API Gateway requests.

from fastapi import FastAPI, Body, status, Depends
from mangum import Mangum
from motor.motor_asyncio import AsyncIOMotorClient
from fastapi.responses import JSONResponse

from app.utility.config import MONGODB_URL, MAX_CONNECTIONS_COUNT, MIN_CONNECTIONS_COUNT, MAX_DB_THREADS_WAIT_COUNT, MAX_DB_THREAD_QUEUE_TIMEOUT_COUNT

application = FastAPI()

client = AsyncIOMotorClient(
    str(MONGODB_URL),
    maxPoolSize=MAX_CONNECTIONS_COUNT,
    minPoolSize=MIN_CONNECTIONS_COUNT,
    waitQueueMultiple=MAX_DB_THREADS_WAIT_COUNT,
    waitQueueTimeoutMS=MAX_DB_THREAD_QUEUE_TIMEOUT_COUNT,
)

async def get_database() -> AsyncIOMotorClient:
    return client

@application.post("/createStudent")
async def create_student(student = Body(...), db: AsyncIOMotorClient = Depends(get_database)):
    new_student = await db["college"]["students"].insert_one(student)
    created_student = await db["college"]["students"].find_one({"_id": new_student.inserted_id})
    return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_student)

@application.post("/createTeacher")
async def create_teacher(teacher = Body(...), db: AsyncIOMotorClient = Depends(get_database)):
    new_teacher = await db["college"]["students"].insert_one(teacher)
    created_teacher = await db["college"]["students"].find_one({"_id": new_teacher.inserted_id})
    return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_teacher)

handler = Mangum(application)

A new connection is created for every API request. How can I cache the db client so that new requests reuse the existing connection? Because every request opens a new one, Lambda compute time increases dramatically once the database hits its maximum connection limit.
In the documentation I only found a Node.js example, and I could not get it working in Python.

Hi @Rabindra_Acharya and welcome to the MongoDB Community :muscle: !

I might be wrong here, but it looks like you are trying to build something stateful in a stateless serverless environment.

The MongoDB driver (Motor here) usually keeps a pool of connections open, and these connections are reused. But if this pool isn’t persisted between calls, and a new pool is created each time because the serverless environment you are using is stateless, then this just doesn’t work (and won’t).
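Whether the pool survives depends on whether the process itself survives between calls. On AWS Lambda, module-level state is generally kept for as long as an execution environment stays warm, so one way to check is a small counter like the sketch below. The `handler` function and the keys it returns are made up for illustration and are not part of the code above.

```python
# Module-level state: initialised once per execution environment (cold start),
# then reused by every later invocation served by the same environment.
_invocations = 0


def handler(event, context):
    """Hypothetical Lambda handler that reports whether this call was a cold start."""
    global _invocations
    _invocations += 1
    return {
        "cold_start": _invocations == 1,  # True only for the first call in this process
        "invocations": _invocations,
    }
```

If `cold_start` comes back `False` on repeated requests, objects created outside the handler (including a database client) are being reused.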

MongoDB Realm keeps a connection pool open and the same connections are reused between calls.

Here is an example of a REST API doing a findOne operation:

exports = function({ query, headers, body}, response) {
    const coll = context.services.get("mongodb-atlas").db("covid19").collection("global");
    coll.findOne().then(res => {
      response.setBody(JSON.stringify(res));
      response.setHeader("content-type", "application/json");
    });
};

As you can see, the connection to the Atlas service is retrieved from the context and from there, we can use any MongoDB query we want without recreating a new connection pool.

Cheers,
Maxime.

It is difficult to switch to another platform at this stage of my project. According to this document, https://docs.atlas.mongodb.com/best-practices-connecting-from-aws-lambda/, the MongoDB client can be cached. I am confused about how to implement that in Python.

Well I didn’t know MongoDB made this doc so :+1: for finding it :smiley: ! And I’m glad they are explaining more or less the same “caching the MongoDB client / connection” concept.

Let me know if you figure out how to do it in Python, but it should be more or less a translation of what they are doing in JS.

I only played once with AWS Lambdas, so you are probably ahead of me already.

Cheers,
Maxime.
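A rough Python translation of the caching pattern in that document could look like the sketch below: keep the client in a module-level variable and create it only when it is missing. `get_client`, `default_factory`, the `MONGODB_URL` environment variable and the pool size of 10 are assumptions for illustration. The factory is injectable so the caching logic can be tried without Motor installed.

```python
from typing import Any, Callable, Optional

# Cached client; survives for as long as the Lambda execution environment stays warm.
_client: Optional[Any] = None


def default_factory() -> Any:
    """Build the real Motor client (imports are lazy so this module loads without Motor)."""
    import os
    from motor.motor_asyncio import AsyncIOMotorClient

    # MONGODB_URL is an assumed environment variable name; maxPoolSize=10 is arbitrary.
    return AsyncIOMotorClient(os.environ["MONGODB_URL"], maxPoolSize=10)


def get_client(factory: Callable[[], Any] = default_factory) -> Any:
    """Return the cached client, creating it only on the first call in this process."""
    global _client
    if _client is None:
        _client = factory()
    return _client
```

A route would then call `get_client()` instead of constructing a client itself. A client created at module import time, as in the first post, is effectively the same pattern.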

Hi PyMongo/Motor maintainer here. I suspect this issue may be caused by a feature added in PyMongo 3.11 (specifically PYTHON-2123). Could you try again using PyMongo 3.10.1 and Motor 2.1.0 and report back if these issues still occur?
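To confirm which versions actually ended up in the deployment package, something like this sketch could be logged at startup. It only uses the standard library (Python 3.8+); `installed_versions` is a hypothetical helper name.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str] = ("pymongo", "motor")) -> Dict[str, Optional[str]]:
    """Map each package name to its installed version, or None if it is not installed."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    print(installed_versions())
```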


Hi All,

I am having the same issue as Rabindra as of his last post (i.e. searching for a working Python implementation of the proposed solution).

The first block below is the code I am using (minus MongoDB Atlas credentials). I initially tried this using PyMongo[srv] 3.12.3 and Motor 2.5.1, and then using PyMongo[srv] 3.10.1 and Motor 2.1.0, as Shane suggested.

Using Postman, in either case, I can get a “hello world” response from the GET route, but the POST route performs neither the MongoDB insertion nor the find; instead I get the JSON message {"message": "Endpoint request timed out"}. The CloudWatch log for this (2nd & 3rd blocks) can be seen below my Python code. (Notably, this same POST route (and the GET) works as expected when hosting this code locally with uvicorn.)

from __future__ import annotations
from imp import reload
import os
from typing import Optional
from datetime import datetime
from fastapi import FastAPI, Body, HTTPException, status
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from bson import ObjectId
from pydantic import BaseModel, Field, EmailStr
import motor.motor_asyncio
from mangum import Mangum

app = FastAPI()

# client = motor.motor_asyncio.AsyncIOMotorClient(os.environ["MONGODB_URL"])
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb+srv://USERNAME:PASSWORD@URL.mongodb.net/myFirstDatabase?retryWrites=true&w=majority')

db = client['test']

# both classes below based on https://www.mongodb.com/developer/quickstart/python-quickstart-fastapi/
class PyObjectId(ObjectId): # so FastAPI can encode ObjectID as JSON
    @classmethod
    def __get_validators__(cls):
        yield cls.validate
    @classmethod
    def validate(cls, v):
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid objectid")
        return ObjectId(v)
    @classmethod
    def __modify_schema__(cls, field_schema):
        field_schema.update(type="string")

class FlexyDataInsertion(BaseModel):
    id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
    dataSeries_id: str = Field(...)
    timestamp: datetime = Field(...)
    value: int = Field(...)

    class Config:
        allow_population_by_field_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
        schema_extra = {
            "example": {
                "dataSeries_id": "blah",
                "timestamp": "2022-03-24T08:48:57Z",
                "value": 26,
            }
        }

@app.post('/dataPoints', response_description="Add data point(s)", response_model=None, tags=["insertion"])
async def add_data_point(datapoint: FlexyDataInsertion = Body(...)) -> None:
    datapoint = jsonable_encoder(datapoint)
    new_data = await db['test/dataSeries_SENSOR_Humidity/dataPoints'].insert_one(datapoint)
    created_data = await db['test/dataSeries_SENSOR_Humidity/dataPoints'].find_one({"_id": new_data.inserted_id})
    return JSONResponse(content=created_data)

@app.get('/dataPoints', response_model=None, tags=["extraction"])
async def retrieve_data_point():
    return JSONResponse("hello world")

handler = Mangum(app=app)

# uncomment for running on localhost server, not in production
# import uvicorn
# if __name__ == "__main__":
#     uvicorn.run("app:app", host="127.0.0.1", port=8000, log_level="info", reload=True) 

.

Error message from CloudWatch (part of MongoDB URL redacted using “XXXXXXXXXXX”):

[ERROR]	2022-04-09T02:49:34.716Z	fb7adf6d-f52b-42d5-bd34-72f4fc3bc7b4	An error occurred running the application.
Traceback (most recent call last):
  File "/var/task/mangum/protocols/http.py", line 66, in run
    await app(self.scope, self.receive, self.send)
  File "/var/task/fastapi/applications.py", line 261, in __call__
    await super().__call__(scope, receive, send)
  File "/var/task/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/var/task/starlette/middleware/errors.py", line 181, in __call__
    raise exc
  File "/var/task/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/var/task/starlette/exceptions.py", line 82, in __call__
    raise exc
  File "/var/task/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/var/task/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/var/task/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/var/task/starlette/routing.py", line 656, in __call__
    await route.handle(scope, receive, send)
  File "/var/task/starlette/routing.py", line 259, in handle
    await self.app(scope, receive, send)
  File "/var/task/starlette/routing.py", line 61, in app
    response = await func(request)
  File "/var/task/fastapi/routing.py", line 227, in app
    raw_response = await run_endpoint_function(
  File "/var/task/fastapi/routing.py", line 160, in run_endpoint_function
    return await dependant.call(**values)
  File "/var/task/app.py", line 97, in add_data_point
    new_data = await db['test/dataSeries_SENSOR_Humidity/dataPoints'].insert_one(datapoint)
  File "/var/lang/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/var/task/pymongo/collection.py", line 695, in insert_one
    self._insert(document,
  File "/var/task/pymongo/collection.py", line 610, in _insert
    return self._insert_one(
  File "/var/task/pymongo/collection.py", line 599, in _insert_one
    self.__database.client._retryable_write(
  File "/var/task/pymongo/mongo_client.py", line 1490, in _retryable_write
    with self._tmp_session(session) as s:
  File "/var/lang/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/var/task/pymongo/mongo_client.py", line 1823, in _tmp_session
    s = self._ensure_session(session)
  File "/var/task/pymongo/mongo_client.py", line 1810, in _ensure_session
    return self.__start_session(True, causal_consistency=False)
  File "/var/task/pymongo/mongo_client.py", line 1763, in __start_session
    server_session = self._get_server_session()
  File "/var/task/pymongo/mongo_client.py", line 1796, in _get_server_session
    return self._topology.get_server_session()
  File "/var/task/pymongo/topology.py", line 487, in get_server_session
    self._select_servers_loop(
  File "/var/task/pymongo/topology.py", line 208, in _select_servers_loop
    raise ServerSelectionTimeoutError(
pymongo.errors.ServerSelectionTimeoutError: connection closed,connection closed,connection closed

.

With the latest Motor and PyMongo versions, the error log contains the following two additional lines (session_timeout & _check_session_support) near the end:

    session_timeout = self._check_session_support()
  File "/var/task/pymongo/topology.py", line 504, in _check_session_support
    self._select_servers_loop(
  File "/var/task/pymongo/topology.py", line 218, in _select_servers_loop
    raise ServerSelectionTimeoutError(
pymongo.errors.ServerSelectionTimeoutError: connection closed,connection closed,connection closed, Timeout: 30s, Topology Description: <TopologyDescription id: 6250e075e1229bd047cf86b7, topology_type: ReplicaSetNoPrimary, servers: [<ServerDescription ('XXXXXXXXXXX.mongodb.net', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('connection closed')>, <ServerDescription ('XXXXXXXXXXX.mongodb.net', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('connection closed')>, <ServerDescription ('XXXXXXXXXXX.mongodb.net', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('connection closed')>]>

Yep, with the latest PyMongo and Motor I am also getting the same issue.

I am still stuck on caching the DB client in Python. Did you use a DB cache?

Found the solution for me. Each time I destroyed and then spun up my Lambda again using Terraform, its IP would change. Adding that IP to my MongoDB Atlas allow list solved the problem. I did not have to use older PyMongo and Motor versions.
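If the allow list is the suspect, it can help to log the public IP the function is calling out from and compare it with the Atlas allow list. This is a minimal sketch, assuming the function has outbound internet access and using AWS's `checkip.amazonaws.com` endpoint; `outbound_ip` and its `fetch` parameter are hypothetical names.

```python
import urllib.request
from typing import Callable, Optional

CHECK_IP_URL = "https://checkip.amazonaws.com"


def outbound_ip(fetch: Optional[Callable[[str], str]] = None, timeout: float = 3.0) -> str:
    """Return the public IP this process appears to call out from."""
    if fetch is None:
        # Default fetcher: plain HTTPS GET; the service replies with the IP and a newline.
        def fetch(url: str) -> str:
            with urllib.request.urlopen(url, timeout=timeout) as resp:
                return resp.read().decode("ascii")

    return fetch(CHECK_IP_URL).strip()
```

Printing `outbound_ip()` once at startup would put the address in the CloudWatch log. As described above, the address may differ after a redeploy, so it is worth re-checking whenever the function is recreated.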


Could you help me with a code snippet? Just show how you modified the code you posted earlier in this thread.


The only change I made in my code was to my MongoDB URL, changing the URL, username and password. The error message from CloudWatch had part of the MongoDB URL redacted using “XXXXXXXXXXX”.

It would be easier to troubleshoot my project if you posted your MongoDB URL with the username, password, and cluster name hidden.
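For sharing a connection string safely, a small helper along these lines could mask the credentials before posting or logging it. This is only a sketch: `redact_mongodb_url` is a hypothetical name, and it hides the username and password but not the cluster host.

```python
def redact_mongodb_url(url: str) -> str:
    """Replace the user:password part of a MongoDB URL with placeholders."""
    scheme, sep, rest = url.partition("://")
    if not sep:
        return url  # not a URL with a scheme; leave untouched

    # Only look for credentials in the authority part, before the first "/".
    authority, slash, tail = rest.partition("/")
    _creds, at, hosts = authority.rpartition("@")
    if not at:
        return url  # no credentials present

    return f"{scheme}://USERNAME:PASSWORD@{hosts}{slash}{tail}"
```

The cluster host can be replaced by hand if it also needs to stay private.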