here is a more detailed version of the code
function (mongoc_collection_t *pCollectionRemote, mongoc_collection_t *pCollectionLocal)
{
bson_error_t errorLocal;
mongoc_bulk_operation_t *bulk = NULL;
mongoc_cursor_t *cursor = NULL;
mongoc_cursor_t *indexCursor = NULL;
//add unique indexes
int numIndexKeys = 0;
const char **indexKeyList = NULL;
indexCursor = mongoc_collection_find_indexes_with_opts(pCollectionRemote, NULL);
const bson_t *indexDoc;
mongoc_cursor_set_batch_size (indexCursor, (uint32_t)1);
while(mongoc_cursor_next(indexCursor, &indexDoc))
{
//if the indexName is for _id_ - its the automatically created one - no need to copy it over
bson_iter_t iter;
const char *indexName;
bool bAddNewIndex = false;
bool bIsUniqueIndex = false;
if (bson_iter_init (&iter, indexDoc))
{
while (bson_iter_next (&iter))
{
const char *key = bson_iter_key(&iter);
if (!bson_strcasecmp(key, "unique") && BSON_ITER_HOLDS_BOOL(&iter))
{
bIsUniqueIndex = true;
}
else if (!bson_strcasecmp(key, "key") && BSON_ITER_HOLDS_DOCUMENT(&iter))
{
numIndexKeys = 0;
bson_iter_t keyIterFindKeys;
if (bson_iter_recurse(&iter, &keyIterFindKeys))
{
while (bson_iter_next(&keyIterFindKeys))
{
numIndexKeys++;
}
}
indexKeyList = calloc(numIndexKeys, sizeof(char *));
bson_iter_t keyIter;
if (bson_iter_recurse(&iter, &keyIter))
{
int loopCount = 0;
while (bson_iter_next(&keyIter))
{
indexKeyList[loopCount] = bson_iter_key(&keyIter);;
loopCount++;
}
}
}
else if (!bson_strcasecmp(key, "name") && BSON_ITER_HOLDS_UTF8(&iter))
{
indexName = bson_iter_utf8(&iter, NULL);
if (bson_strcasecmp(indexName, "_id_") != 0)
{
bAddNewIndex = true;
}
}
}
}
if (bAddNewIndex)
{
bson_t index_keys = BSON_INITIALIZER;
bson_t *index_opts = NULL;
for (int i=0;i<numIndexKeys;i++)
{
BSON_APPEND_INT32 (&index_keys, indexKeyList[i], 1);
}
if (bIsUniqueIndex)
{
index_opts = BCON_NEW("unique",
BCON_BOOL (true),
"name",
BCON_UTF8(indexName));
}
else
{
index_opts = BCON_NEW("name",
BCON_UTF8(indexName));
}
mongoc_index_model_t *im = mongoc_index_model_new (&index_keys, index_opts );
if (mongoc_collection_create_indexes_with_opts (pCollectionLocal, &im,1, NULL , NULL , &errorLocal))
{
CNCORE_LOG(PACKAGE_NAME,LOG_DEBUG, NULL, "Created new index");
}
else
{
CNCORE_LOG(PACKAGE_NAME,LOG_ERR, NULL, "Failed to create new index due to %s", errorLocal.message);
mongoc_index_model_destroy(im);
bson_destroy (&index_keys);
bson_destroy (index_opts);
if (indexKeyList)
{
free(indexKeyList);
indexKeyList = NULL;
}
goto cleanup;
}
mongoc_index_model_destroy(im);
bson_destroy (&index_keys); //chris comment
bson_destroy (index_opts);
}
//free(indexName);
if (indexKeyList)
{
free(indexKeyList);
indexKeyList = NULL;
}
}
if (indexCursor)
{
mongoc_cursor_destroy (indexCursor);
indexCursor = NULL;
}
const bson_t *doc;
bson_t filter = BSON_INITIALIZER;
cursor = mongoc_collection_find_with_opts (pCollectionRemote, &filter, NULL , NULL);
mongoc_cursor_set_batch_size (cursor, custom_batch_size);
bson_destroy(&filter);
bool ret;
bson_error_t retError;
if (custom_batch_size == 1)
{
while (mongoc_cursor_next (cursor, &doc))
{
ret = mongoc_collection_insert_one(pCollectionLocal,doc,NULL,NULL,&retError);
if (!ret)
{
CNCORE_LOG(PACKAGE_NAME,LOG_ERR, NULL, "Individual write error occured");
goto cleanup;
}
}
}
//perform bulk writes
else
{
bulk = mongoc_collection_create_bulk_operation_with_opts (pCollectionLocal, NULL);
uint32_t numDocumentsProcessed = 0;
while (mongoc_cursor_next (cursor, &doc))
{
//int64_t currentBulkLatencyWrite = 0;
//struct timeval stop, start;
numDocumentsProcessed++;
mongoc_bulk_operation_insert(bulk,doc);
//bulk write in batches of 1000
if (numDocumentsProcessed == custom_batch_size)
{
numDocumentsProcessed = 0;
ret = mongoc_bulk_operation_execute (bulk, &reply, &retError);
bson_destroy (&reply);
if (bulk)
{
mongoc_bulk_operation_destroy (bulk);
bulk = NULL;
bulk = mongoc_collection_create_bulk_operation_with_opts (pCollectionLocal,NULL);
}
if (!ret) {
CNCORE_LOG(PACKAGE_NAME,LOG_ERR, NULL, "Bulk Write Error Occured: %s", retError.message);
goto cleanup;
}
}
}
if (numDocumentsProcessed > 0)
{
ret = mongoc_bulk_operation_execute (bulk, NULL, &retError);
if (!ret) {
CNCORE_LOG(PACKAGE_NAME,LOG_ERR, NULL, "Bulk Write Error Occured: %s", retError.message);
goto cleanup;
}
}
}
cleanup:
mongoc_bulk_operation_destroy (bulk);
bulk = NULL;
mongoc_cursor_destroy(cursor);
cursor = NULL;
mongoc_cursor_destroy (indexCursor);
return status;
}
//function returns and collection_destroy() is called on collectionLocal and collectionRemote
//this funciton is called for every collection
//collections are of various document and collection sizes.