Async Motor Driver not working for pymongo based functions

Hi,
I am trying to create an async version of my MongoDB functions using the Motor driver for MongoDB.

Platform settings:

python (sys.version): 3.10.0 | packaged by conda-forge | (default, Nov 10 2021, 13:20:59) [MSC v.1916 64 bit (AMD64)]
MotorVersion: 3.0.0.0
TornadoVersion: 6.1
OperatingSystem: Windows 10, 64-bit . Intel Core-17
pymongo.version : 4.1.1
pymongo.has_c(): True

The following is my code for the normal synchronous version of the tasks.

## Import Libraries
import pymongo
from pymongo import MongoClient,ReturnDocument
from pymongo.errors import ConnectionFailure, AutoReconnect, BulkWriteError, OperationFailure
import pandas as pd
import time
import credentials

credentials_mongodb = credentials.mongodb
user_id = credentials_mongodb["username"]
password = credentials_mongodb["password"]
domain = credentials_mongodb["domain"]
database = credentials_mongodb["db"]

CLIENT_URI = "mongodb+srv://{}:{}@{}/admin?retryWrites=true&w=majority".format(user_id, password, domain)

def MongoDB_Connection(client_uri : str = CLIENT_URI) -> dict:
    '''
    Makes a connection to the MongoDB Cluster and returns success or exception handling (error) messages.
    ## Reference: https://www.programcreek.com/python/example/94224/pymongo.errors

    Parameters
    ----------
    client_url  : str, default = CLIENT_URI stored in MongoWorks.py
                  Client URL to connect to the MongoDB cluster.

    Returns
    -------
    dict        : {Success or Failure message}
    '''
    client = MongoClient(client_uri)
    try:
        client.admin.command('ismaster') # The ismaster command is cheap and does not require auth.
        # print("Connecting to MongoCluster at %s - Successful" % (domain))
        return client
    except (AutoReconnect, ConnectionFailure) as e:
        error_message = e + ": Server not available"
        print(error_message)
        return e

def MongoDB_Collection_Write(insert_method : str, collection_name : str, docs_to_insert, db: str = database) -> str:
    '''
    Writes documents to a MongoDB Collection.

    Parameters
    ----------
    insert_method   : str, {"insert_one", "insert_many"}
                      MongoDB write method selection key inserting one document or multiple documents.

    collection_name : str, name of the collection in MongoDB
    docs_to_insert  : a single dict or list of dicts,
                      Documents to insert into a MongoDB collection in list of dicts format
    db              : str, name of the database in MongoDB

    Returns
    -------
    messages        : dict, {doc_iD : Success/Failure message}
                      Success or Failure message with documents insertion ID

    '''
    client = MongoDB_Connection()
    db = client[db]
    collection = db[collection_name]
    if insert_method == "insert_one":
        try:
            result = collection.insert_one(docs_to_insert).inserted_id
            message = "{} - Document Inserted successfully into `{}` collection".format(result, collection_name)
            print(message)
            return {result : message}
        except Exception as e:
            message = "An Exception occured when inserting a document into `{}` Collection. ".format(collection_name)
            print(message, e)
            return e
    if insert_method == "insert_many":
        try:
            result = collection.insert_many(docs_to_insert,ordered=False)
            inserted_ids = result.inserted_ids
            message = {"DocsInsertionSuccess - {} documents inserted into {}".format(len(inserted_ids), collection_name)}
            print(message)
            return message
        except (Exception,BulkWriteError, OperationFailure) as e:
            if Exception:
                error_message = e
            else:
                error_message = e.details['writeErrors']
            print("Error Message:", error_message)
            return error_message

def MongoDB_Collection_Clear(collection:str, db:str = database) -> pd.DataFrame:
    '''
    Deltes all documents in a MongoDB collection.

    Parameters
    -----------
    collection  : str, name of the collection in MongoDB
    db          : str, name of the database in MongoDB


    Returns
    -------
    msg         : success or failure message
    '''
    # Connect to MongoDB
    client = MongoDB_Connection()
    db = client[db]
    cursor = db[collection]
    try:
        cursor.delete_many({})
        message = "All Documents deleted in Collection - {}".format(collection)
        print(message)
        return message
    except Exception as e:
        message = "An exception has occurred when trying to Clear Collection - `{}`.\t {}".format(collection, e)
        print(message)
        return e

def MongoDB_UpdateCollection_AccountInfo(collection_name:str = "AccountInfoDemo", new_data = None) -> str:
    '''
    Updates the `AccountInfoDemo` collection in MongoDB.

    Parameters
    ----------
    collection_name : str, default = "AccountInfo",
                      Name of the collection in MongoDB
    new_data        : dict, new data to update the collection

    Returns
    -------
    Object_ID : str,
                Returns a success message with the object ID
    '''
    client = MongoDB_Connection()
    db = client[database]
    ## MongoDB Creates Collections Automatically no need to check if it exists but for logging may be needed
    if collection_name not in db.list_collection_names():
        print("Creating `{}` collection for the first time".format(collection_name))
        collection = db[collection_name] ## Creates collection for the first time
        operation_result = MongoDB_Collection_Write(insert_method = "insert_one", collection_name = collection_name, docs_to_insert = account_info_sample)
    else:
        try:
            collection = db[collection_name]
            result = collection.update_one({},{'$set': new_data}, upsert=False).acknowledged
            message = "Update Acknowledged:{} - `Balance`, `Equity`, `Profit` successfully updated in `{}` collection".format(result, collection_name)
            print(message)
            return message
        except Exception as e:
            message = "An Exception occured when updating a document into `{}` Collection. ".format(collection_name)
            print(message, e)
            return e

def MongoDB_UpdateCollection_CPH(collection_name:str = "CPHDemo", doc : dict = None, rewrite :bool = False) -> dict:
    '''
    Updates the `CPH_Demo` collection in MongoDB.

    Parameters
    ----------
    collection_name : str, default = "CPH_Demo",
                      Name of the collection in MongoDB
    doc             : dict,
                      a document in dict format to be inserted into a MongoDB collection.
    rewrite        : bool,
                      If True recreate the collection and insert closed positions history from scratch,
                      If False insert the passed document into the collection
    Returns
    -------
    operation_result : dict,
                       Returns a dictionary with the object ID(s) and a success or failure message
    '''
    closed_positions_history = pd.read_csv("ClosedPositionsHistory.csv")
    client = MongoDB_Connection()
    db = client[database]
    collection = db[collection_name]
    cursor = collection.find()
    results = list(cursor)
    if ((len(results) == 0) or (rewrite == True)):  # Checking if the MongoDB Collection (cursor) is empty
        if rewrite:
            MongoDB_Collection_Clear(collection_name)
            message = f"MongoDB Collection {collection_name} - Cleared for ReWrite"
            print(message)
        else:
            message = f"MongoDB Collection - {collection_name} Empty"
            print(message)
        ## Insert entire history of closed positions into the empty collection
        operation_result = MongoDB_Collection_Write(insert_method = "insert_many", collection_name = collection_name, docs_to_insert = closed_positions_history.to_dict('records'))
        return operation_result
    elif doc != None:
        try:
            position_id = doc["position_id"]
            result = collection.find_one_and_update({"position_id": position_id},{'$set':doc}, return_document=ReturnDocument.AFTER, upsert=True)
            message = "Document Inserted successfully into `{}` collection. \nResult: \t{}".format(collection_name,result)
            print(message)
            return message
        except Exception as e:
            message = "An Exception occured when inserting a document into `{}` Collection. ".format(collection_name)
            print(message, '\n' ,e)
            return e


account_info_sample = {'balance': 27133.2, 'currency': 'USD', 'equity': 27133.2, 'leverage': 25, 'limit_orders': 200,
                        'login': 123456789, 'name': 'John Smith', 'profit': 0.0, 'server': 'Broker-Demo'}

account_info_new_data = { 'balance': 50000.0,  'equity': 49000.0, 'profit': 777.77}
cph_new_data = {'ClosePrice': 2029.90,'CloseTime': '2022-05-14 21:25:16',
                             'OpenPrice': 2024.86,'OpenTime': '2022-05-14 21:24:03',
                             'commission': 0.0, 'fee': 0.0, 'position_id': 221234567, 'profit': 3.0, 'sl': 0.0,
                             'swap': 0.0, 'symbol': 'ETHUSD', 'tp': 0.0, 'type': 'BUY', 'volume': 1.0}

def seq_mongo_tasks():
    MongoDB_UpdateCollection_AccountInfo(new_data = account_info_new_data)
    MongoDB_UpdateCollection_CPH(doc = cph_new_data)
    return None

## CODE EXPERIMENTS
## SYNCHRONOUS TASKS DEMO
st = time.time()
seq_mongo_tasks()
et = time.time()
# get the execution time
elapsed_time = et - st
print('Execution time:', elapsed_time, 'seconds')

The above code executes successfully and updates the mongoDB collections on Atlast cloud.

(tradebot) D:\AlgoBot>python MongoDemo.py
Update Acknowledged:True - `Balance`, `Equity`, `Profit` successfully updated in `AccountInfoDemo` collection
MongoDB Collection - CPHDemo Empty
{'DocsInsertionSuccess - 11 documents inserted into CPHDemo'}
Execution time: 1.6709 seconds

I also made an attempt in making the asynchronous version of the above code using the Motor Driver along withasync and await python operations. But it is not working as expected. Please find the code below for the async version:

## Import Libraries
import pymongo
from pymongo import MongoClient,ReturnDocument
from pymongo.errors import ConnectionFailure, AutoReconnect, BulkWriteError, OperationFailure
import pandas as pd
import time
import credentials
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from tornado.ioloop import IOLoop

credentials_mongodb = credentials.mongodb
user_id = credentials_mongodb["username"]
password = credentials_mongodb["password"]
domain = credentials_mongodb["domain"]
database = credentials_mongodb["db"]

CLIENT_URI = "mongodb+srv://{}:{}@{}/admin?retryWrites=true&w=majority".format(user_id, password, domain)

def MongoDB_Connection(client_uri : str = CLIENT_URI) -> dict:
    '''
    Makes a connection to the MongoDB Cluster and returns success or exception handling (error) messages.
    ## Reference: https://www.programcreek.com/python/example/94224/pymongo.errors

    Parameters
    ----------
    client_url  : str, default = CLIENT_URI stored in MongoWorks.py
                  Client URL to connect to the MongoDB cluster.

    Returns
    -------
    dict        : {Success or Failure message}
    '''
    client = AsyncIOMotorClient(client_uri)
    try:
        client.admin.command('ismaster') # The ismaster command is cheap and does not require auth.
        # print("Connecting to MongoCluster at %s - Successful" % (domain))
        return client
    except (AutoReconnect, ConnectionFailure) as e:
        error_message = e + ": Server not available"
        print(error_message)
        print(error_message)
        return e

async def MongoDB_Collection_Write(insert_method : str, collection_name : str, docs_to_insert, db: str = database) -> str:
    '''
    Writes documents to a MongoDB Collection asynchronously.
    Parameters
    ----------
    insert_method   : str, {"insert_one", "insert_many"}
                      MongoDB write method selection key inserting one document or multiple documents.

    collection_name : str, name of the collection in MongoDB
    docs_to_insert  : a single dict or list of dicts,
                      Documents to insert into a MongoDB collection in list of dicts format
    db              : str, name of the database in MongoDB

    Returns
    -------
    messages        : dict, {doc_iD : Success/Failure message}
                      Success or Failure message with documents insertion ID

    '''
    client = MongoDB_Connection()
    db = client[db]
    collection = db[collection_name]
    if insert_method == "insert_one":
        try:
            result = await collection.insert_one(docs_to_insert).inserted_id
            message = "{} - Document Inserted successfully into `{}` collection".format(result, collection_name)
            print(message)
            return {result : message}
        except Exception as e:
            message = "An Exception occured when inserting a document into `{}` Collection. ".format(collection_name)
            print(message, e)
            return e
    if insert_method == "insert_many":
        try:
            result = await collection.insert_many(docs_to_insert,ordered=False)
            inserted_ids = result.inserted_ids
            message = {"DocsInsertionSuccess - {} documents inserted into {}".format(len(inserted_ids), collection_name)}
            print(message)
            return message
        except (Exception,BulkWriteError, OperationFailure) as e:
            if Exception:
                error_message = e
            else:
                error_message = e.details['writeErrors']
            print("Error Message:", error_message)
            print(error_message)
            return error_message

def MongoDB_Collection_Clear(collection:str, db:str = database) -> pd.DataFrame:
    '''
    Deltes all documents in a MongoDB collection.

    Parameters
    -----------
    collection  : str, name of the collection in MongoDB
    db          : str, name of the database in MongoDB


    Returns
    -------
    msg         : success or failure message
    '''
    # Connect to MongoDB
    client = MongoDB_Connection()
    db = client[db]
    cursor = db[collection]
    try:
        cursor.delete_many({})
        message = "All Documents deleted in Collection - {}".format(collection)
        print(message)
        return message
    except Exception as e:
        message = "An exception has occurred when trying to Clear Collection - `{}`.\t {}".format(collection, e)
        print(message)
        return e

async def MongoDB_UpdateCollection_AccountInfo(collection_name:str = "AccountInfoDemo", new_data = None) -> str:
    '''
    Updates the `AccountInfoDemo` collection in MongoDB.

    Parameters
    ----------
    collection_name : str, default = "AccountInfo",
                      Name of the collection in MongoDB
    new_data        : dict, new data to update the collection

    Returns
    -------
    Object_ID : str,
                Returns a success message with the object ID
    '''
    client = MongoDB_Connection()
    db = client[database]
    ## MongoDB Creates Collections Automatically no need to check if it exists but for logging may be needed
    if collection_name not in db.list_collection_names():
        print("Creating `{}` collection for the first time".format(collection_name))
        collection = db[collection_name] ## Creates collection for the first time
        operation_result = MongoDB_Collection_Write(insert_method = "insert_one", collection_name = collection_name, docs_to_insert = account_info_sample)
    else:
        try:
            collection = db[collection_name]
            result = await collection.update_one({},{'$set': new_data}, upsert=False).acknowledged
            message = "Update Acknowledged:{} - `Balance`, `Equity`, `Profit` successfully updated in `{}` collection".format(result, collection_name)
            print(message)
            return message
        except Exception as e:
            message = "An Exception occured when updating a document into `{}` Collection. ".format(collection_name)
            print(message, e)
            return e

async def MongoDB_UpdateCollection_CPH(collection_name:str = "CPHDemo", doc : dict = None, rewrite :bool = False) -> dict:
    '''
    Updates the `CPH_Demo` collection in MongoDB.

    Parameters
    ----------
    collection_name : str, default = "CPH_Demo",
                      Name of the collection in MongoDB
    doc             : dict,
                      a document in dict format to be inserted into a MongoDB collection.
    rewrite        : bool,
                      If True recreate the collection and insert closed positions history from scratch,
                      If False insert the passed document into the collection
    Returns
    -------
    operation_result : dict,
                       Returns a dictionary with the object ID(s) and a success or failure message
    '''
    closed_positions_history = pd.read_csv("ClosedPositionsHistory.csv")
    client = MongoDB_Connection()
    db = client[database]
    collection = db[collection_name]
    cursor = collection.find()
    results = list(cursor)
    if ((len(results) == 0) or (rewrite == True)):  # Checking if the MongoDB Collection (cursor) is empty
        if rewrite:
            MongoDB_Collection_Clear(collection_name)
            message = f"MongoDB Collection {collection_name} - Cleared for ReWrite"
            print(message)
        else:
            message = f"MongoDB Collection - {collection_name} Empty"
            print(message)
        ## Insert entire history of closed positions into the empty collection
        operation_result = await MongoDB_Collection_Write(insert_method = "insert_many", collection_name = collection_name, docs_to_insert = closed_positions_history.to_dict('records'))
        return operation_result
    elif doc != None:
        try:
            position_id = doc["position_id"]
            result = await collection.find_one_and_update({"position_id": position_id},{'$set':doc}, return_document=ReturnDocument.AFTER, upsert=True)
            message = "Document Inserted successfully into `{}` collection. \nResult: \t{}".format(collection_name,result)
            print(message)
            return message
        except Exception as e:
            message = "An Exception occured when inserting a document into `{}` Collection. ".format(collection_name)
            print(message, '\n' ,e)
            return e


account_info_sample = {'balance': 27133.2, 'currency': 'USD', 'equity': 27133.2, 'leverage': 25, 'limit_orders': 200,
                        'login': 123456789, 'name': 'John Smith', 'profit': 0.0, 'server': 'Broker-Demo'}

account_info_new_data = { 'balance': 50000.0,  'equity': 49000.0, 'profit': 777.77}
cph_new_data = {'ClosePrice': 2029.90,'CloseTime': '2022-05-14 21:25:16',
                             'OpenPrice': 2024.86,'OpenTime': '2022-05-14 21:24:03',
                             'commission': 0.0, 'fee': 0.0, 'position_id': 221234567, 'profit': 3.0, 'sl': 0.0,
                             'swap': 0.0, 'symbol': 'ETHUSD', 'tp': 0.0, 'type': 'BUY', 'volume': 1.0}

def async_mongo_tasks():
    MongoDB_UpdateCollection_AccountInfo(new_data = account_info_new_data)
    MongoDB_UpdateCollection_CPH(doc = cph_new_data)
    pass

## CODE EXPERIMENTS
## ASYNCHRONOUS TASKS DEMO
st = time.time()
# result = IOLoop.current().run_sync(async_mongo_tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(async_mongo_tasks()))
et = time.time()
# get the execution time
elapsed_time = et - st
print('Execution time:', elapsed_time, 'seconds')

The above async code runs with some RunTimeWarnings but I dont see the updates in my MongoDB Cloud Atlas Collections.

D:\AlgoBot\MongoDemo.py:213: RuntimeWarning: coroutine 'MongoDB_UpdateCollection_AccountInfo' was never awaited
  MongoDB_UpdateCollection_AccountInfo(new_data = account_info_new_data)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
D:\AlgoBot\MongoDemo.py:214: RuntimeWarning: coroutine 'MongoDB_UpdateCollection_CPH' was never awaited
  MongoDB_UpdateCollection_CPH(doc = cph_new_data)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Execution time: 0.01097 seconds

The final application will have some several thousand rows of voluminous data to deal with for the MongoDB_UpdateCollection_CPH function and thus I would like to know if there will be any signficant execution time benefits when using the motor driver in my case. Any help to help me fix my async version of the code would also be greatly appreciated. :pray:

Attached is the sample dataset for MongoDB_UpdateCollection_CPH function:
ClosedPositionsHistory.csv (1.4 KB)

Thanks and Regards,
Dilip