Hi,
We have a use case to perform an upsert of 20 million records from PySpark into a MongoDB collection.
However, it takes more than an hour (1 hr 5 mins, sometimes even more) just for 100K records.
Approaches that I tried:
- While writing to Mongo, I tried multiple batch sizes, but with no luck:

df.write.format('com.mongodb.spark.sql.DefaultSource').mode('append') \
    .option('uri', connection_uri) \
    .option('database', database) \
    .option('collection', collection) \
    .option('maxBatchSize', 1000) \
    .option('replaceDocument', False) \
    .option('shardkey', shard_key) \
    .option('multi', True) \
    .save()
- Splitting the DataFrame into chunks and calling the above df.write operation for each chunk:

def upsert_operation_into_mongo(df3, connection_uri, mode, database, collection, shard_key):
    df3.write.format('com.mongodb.spark.sql.DefaultSource').mode('append') \
        .option('uri', connection_uri) \
        .option('database', database) \
        .option('collection', collection) \
        .option('replaceDocument', False) \
        .option('shardkey', shard_key) \
        .option('multi', True) \
        .save()
    return 'Successfully Written Data to Mongo DB Collection'
each_len = 3000
copy_df = df
i = 0
while i < df.count():
    j = copy_df.count()
    if each_len < j:
        i += each_len
        temp_df = copy_df.limit(each_len)
        copy_df = copy_df.subtract(temp_df)
        msg = upsert_operation_into_mongo(temp_df, connection_uri, mode, database, collection, shard_key)
        print(msg)
    else:
        i += j
        temp_df = copy_df.limit(j)
        copy_df = copy_df.subtract(temp_df)
        msg = upsert_operation_into_mongo(temp_df, connection_uri, mode, database, collection, shard_key)
        print(msg)
- I also tried the above approaches after creating an index on the key used for the upsert (say, the 'id' column). Performance improved, but not by much for 100K records.
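For reference, creating that index looks roughly like this (a pymongo sketch; the connection variables and the 'id' field name are the same placeholders used above):

from pymongo import MongoClient, ASCENDING

# Sketch: index the upsert key ('id') so the update filter can use an
# index lookup instead of a collection scan.
client = MongoClient(connection_uri)
client[database][collection].create_index([('id', ASCENDING)])
client.close()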
I understand the MongoDB Spark Connector internally uses BulkWrite for these updates; my rough mental model of what each batch amounts to is sketched below.
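(A pymongo sketch of an unordered bulk upsert per partition, not the connector's actual code; the 'id' match field and the foreachPartition wiring are my assumptions.)

from pymongo import MongoClient, UpdateOne

def upsert_partition(rows):
    # Rough equivalent of what I believe the connector issues per batch:
    # one UpdateOne(upsert=True) per row, sent as an unordered bulk write.
    client = MongoClient(connection_uri)
    coll = client[database][collection]
    ops = [UpdateOne({'id': row['id']}, {'$set': row.asDict()}, upsert=True)
           for row in rows]
    if ops:
        coll.bulk_write(ops, ordered=False)
    client.close()

# df.rdd.foreachPartition(upsert_partition)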
Is there an efficient way to increase the speed of the upsert operation?
On the other hand, an overwrite of the same number of records takes barely 2 minutes (roughly the call shown below).
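For comparison, this is a sketch of the fast overwrite path, using the same connection options; overwrite mode replaces the collection contents rather than matching individual documents:

# Fast path for comparison: plain overwrite, no per-document upsert matching.
df.write.format('com.mongodb.spark.sql.DefaultSource').mode('overwrite') \
    .option('uri', connection_uri) \
    .option('database', database) \
    .option('collection', collection) \
    .save()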
Thanks,
Sarvesh