falcon
(Falcon)
November 24, 2021, 11:01pm
1
We are using Spring Data JPA and MongoDB.
There is a document A with two line item objects. Each line item has a unique id. We consume messages from a Kafka topic at the line item level and update the attributes of each line item.
In this situation we see a race condition: the second line item's attributes are always updated, but the first line item's attributes are not.
Object structure:
Document: {
  id : 1234
  line items: [
    line_item1: {
      id: 'line_111'
      received: false
    }
    line_item2: {
      id: 'line_222',
      received: false
    }
  ]
}
line_item1 arrives first from the topic, does findById(1234), and should set received = true for line_item1.
line_item2 arrives next from the topic, does findById(1234), and should set received = true for line_item2.
But this is not what happens: line_item1 is never updated, so its received flag always stays false, while line_item2 ends up with received = true.
steevej
(Steeve Juneau)
November 25, 2021, 1:10pm
2
Most likely the issue is with your update code.
You probably overwrite line_item1 with its old value when you apply your update for line_item2.
You will need to share your code so that we can help.
But before posting more code or documents, please read "Formatting code and log snippets in posts".
falcon
(Falcon)
January 3, 2022, 7:54pm
4
This is the sample code, where we use Spring Data JPA and the Mongo client:
Document doc = repository.findById(transactionId).get()
dbCollection.updateOne(
    Filters.eq("line_item_id", lineItemId),
    Updates.set('line_items.$.received', true)
)
repository.save(doc)
falcon
(Falcon)
January 3, 2022, 7:58pm
5
For each document there are two line items, and we set the received flag on each line item as it is consumed from the Kafka topic. Because these messages arrive at exactly the same time, the second line item's flag is not set. We even tried adding synchronized to the method, which did not help.
steevej
(Steeve Juneau)
January 4, 2022, 3:52am
6
This does not query the documents in a way that lets the $ in line_items.$.received resolve to the right array element. There is no field named line_item_id.
And I think your sample document is wrongly redacted as I cannot insert it in my environment.
This field contains a space and is not between quotes:
falcon:
line items:
The things inside the line items array are not valid JSON. Inside an array you are supposed to have plain values, objects, or arrays:
"line items" : [ "line_111" , "line_222" , true , 1234 ]
"line items" : [ { "id" : "line_111" , "received" : true } ,
                 { "id" : "line_222" , "received" : false } ]
"line items" : [ [ "line_111" , true ] ,
                 [ "line_222" , false ] ]
What is the role of doc in the code you shared?
falcon
(Falcon)
January 4, 2022, 5:05pm
8
This is the actual object structure of the document. We look up the document by _id and update the received flag of the respective line item by finding the matching line_id.
{
  "_id": "60cb9eca8bbd7b3fe6b327e7-433840928973",
  "lineItems": [
    {
      "line_id": "line1-60cb9eca8bbd7b3fe6b327e7-433840928973",
      "received": false
    },
    {
      "line_id": "line2-60cb9eca8bbd7b3fe6b327e7-433840928973",
      "received": false
    }
  ]
}
What is the correct way to query the documents?
steevej
(Steeve Juneau)
January 4, 2022, 6:06pm
9
Thanks, it is much easier to work with real data.
Try this (mongosh version; you might need to convert it to Java):
// Starting with:
mongosh> c.find()
[
  {
    _id: '60cb9eca8bbd7b3fe6b327e7-433840928973',
    lineItems: [
      {
        line_id: 'line1-60cb9eca8bbd7b3fe6b327e7-433840928973',
        received: false
      },
      {
        line_id: 'line2-60cb9eca8bbd7b3fe6b327e7-433840928973',
        received: false
      }
    ]
  }
]
mongosh> query = {
  _id: '60cb9eca8bbd7b3fe6b327e7-433840928973',
  'lineItems.line_id': 'line1-60cb9eca8bbd7b3fe6b327e7-433840928973'
} ;
mongosh> update = { "$set" : { 'lineItems.$.received' : true } } ;
mongosh> c.updateOne( query , update ) ;
// Result:
mongosh> c.find()
[
  {
    _id: '60cb9eca8bbd7b3fe6b327e7-433840928973',
    lineItems: [
      {
        line_id: 'line1-60cb9eca8bbd7b3fe6b327e7-433840928973',
        received: true
      },
      {
        line_id: 'line2-60cb9eca8bbd7b3fe6b327e7-433840928973',
        received: false
      }
    ]
  }
]
falcon
(Falcon)
January 4, 2022, 7:17pm
10
When we receive line1 and line2 at the same time, would the changes you suggested resolve the race condition and set received = true for both line items, instead of always setting only line2 received = true and leaving line1 received = false?
steevej
(Steeve Juneau)
January 4, 2022, 7:37pm
12
There is one question that is not resolved:
How is
and
falcon:
repository.save(doc)
related to
falcon
(Falcon)
January 4, 2022, 8:06pm
13
When received = true is set on both line items, verifyFlag is set to true at the header level and the whole document is saved to the database. If either line item's received flag is false, then verifyFlag = false.
{
  "_id": "60cb9eca8bbd7b3fe6b327e7-433840928973",
  "verifyFlag": true,
  "lineItems": [
    {
      "line_id": "line1-60cb9eca8bbd7b3fe6b327e7-433840928973",
      "received": true
    },
    {
      "line_id": "line2-60cb9eca8bbd7b3fe6b327e7-433840928973",
      "received": true
    }
  ]
}
steevej
(Steeve Juneau)
January 4, 2022, 9:05pm
14
That will not work.
Your document doc contains the lineItems as they were before dbCollection.updateOne() ran, so your save() will overwrite whatever is done in your updateOne().
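The overwrite described here can be reproduced without MongoDB at all. What follows is a minimal plain-Java sketch, not the poster's code: LostUpdateDemo and all of its methods are hypothetical stand-ins, where findSnapshot() plays the role of findById(), saveWhole() the role of repository.save(doc), and setReceived() the role of a targeted updateOne(). It assumes that both handlers read the document before either one saves, which is one possible interleaving when two messages arrive together.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the collection; no MongoDB driver involved.
final class LostUpdateDemo {

    // The stored "document": line id -> received flag.
    private Map<String, Boolean> stored = new LinkedHashMap<>();

    LostUpdateDemo() {
        stored.put("line1", false);
        stored.put("line2", false);
    }

    // Like findById(): returns a detached copy of the document.
    Map<String, Boolean> findSnapshot() {
        return new LinkedHashMap<>(stored);
    }

    // Like repository.save(doc): replaces the whole stored document.
    void saveWhole(Map<String, Boolean> doc) {
        stored = new LinkedHashMap<>(doc);
    }

    // Like updateOne() with a positional $set: changes one element only.
    void setReceived(String lineId) {
        stored.put(lineId, true);
    }

    boolean isReceived(String lineId) {
        return stored.get(lineId);
    }

    // Both handlers read their copy before either one saves.
    static LostUpdateDemo readThenSaveInterleaved() {
        LostUpdateDemo db = new LostUpdateDemo();
        Map<String, Boolean> docA = db.findSnapshot(); // handler for line1
        Map<String, Boolean> docB = db.findSnapshot(); // handler for line2

        db.setReceived("line1");
        docA.put("line1", true);
        db.saveWhole(docA);       // stored: line1=true, line2=false

        db.setReceived("line2");
        docB.put("line2", true);  // docB still carries line1=false
        db.saveWhole(docB);       // writes line1 back to false
        return db;
    }

    // Same two messages, but only targeted updates and no whole-document save.
    static LostUpdateDemo targetedUpdatesOnly() {
        LostUpdateDemo db = new LostUpdateDemo();
        db.setReceived("line1");
        db.setReceived("line2");
        return db;
    }

    public static void main(String[] args) {
        LostUpdateDemo lost = readThenSaveInterleaved();
        // prints "line1=false line2=true"
        System.out.println("line1=" + lost.isReceived("line1") + " line2=" + lost.isReceived("line2"));
        LostUpdateDemo ok = targetedUpdatesOnly();
        // prints "line1=true line2=true"
        System.out.println("line1=" + ok.isReceived("line1") + " line2=" + ok.isReceived("line2"));
    }
}
```

The first run ends with line1 false and line2 true, matching the reported symptom. In the second run, where nothing writes a stale copy of the whole document, both flags stay set.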
falcon
(Falcon)
January 4, 2022, 9:14pm
15
So it's good to separate these two pieces of logic:
the first method only updates the received flag at the line item level;
the second, a separate service or cron job that runs daily, updates the header-level verifyFlag.
steevej
(Steeve Juneau)
January 4, 2022, 10:42pm
16
It depends on whether your use case can tolerate out-of-sync data. You would also need contingency plans for the occasions when your job fails to run correctly.
Personally, I would not rely on a field that is derived from other fields (unless performance issues mandate it). I would query
{ "lineItems": { "$not" : { "$elemMatch": { "received": false } } } }
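On the application side, the same idea means computing the flag from the line items whenever it is needed instead of storing it. The following is a rough sketch under assumptions: VerifyCheck, LineItem, and allReceived are hypothetical names, and the list is assumed to be already loaded from the document.

```java
import java.util.List;

// Hypothetical helper: derives the "verified" state instead of storing it.
final class VerifyCheck {

    // Minimal stand-in for one entry of the lineItems array.
    static final class LineItem {
        final String lineId;
        final boolean received;

        LineItem(String lineId, boolean received) {
            this.lineId = lineId;
            this.received = received;
        }
    }

    // True when no line item has received == false, which is the condition
    // the $not / $elemMatch query above selects on.
    // An empty list also passes, just as an empty array matches that query.
    static boolean allReceived(List<LineItem> lineItems) {
        return lineItems.stream().noneMatch(item -> !item.received);
    }

    public static void main(String[] args) {
        List<LineItem> partial = List.of(
                new LineItem("line1", true),
                new LineItem("line2", false));
        List<LineItem> complete = List.of(
                new LineItem("line1", true),
                new LineItem("line2", true));
        System.out.println(allReceived(partial));  // prints false
        System.out.println(allReceived(complete)); // prints true
    }
}
```

Because nothing is stored, there is no header-level field that can drift out of sync with the line items.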
steevej
(Steeve Juneau)
January 5, 2022, 2:41pm
18
Of course it is. Your field is named line_id and you use lineItemId in your code.
falcon
(Falcon)
January 5, 2022, 3:03pm
19
Sorry, it's a typo.
Bson lineIdFilter = Filters.eq("lineItems.line_id", lineItemId)
Bson idFilter = Filters.eq('_id', transactionId)
collection.updateOne(
    Filters.and(idFilter, lineIdFilter),
    Updates.set('lineItems.$.received', true)
)
steevej
(Steeve Juneau)
January 5, 2022, 3:07pm
20
A typo in your post and in your code, I guess. Otherwise it means you are not sharing your code by cut-n-pasting from your source. If you do not cut-n-paste from your code, then we are in the dark about what your code really does.
falcon
(Falcon)
January 5, 2022, 3:38pm
21
I tried to provide simplified code. Here is the complete code and object:
{
  "_id": "61ad15e590dc1a6469638995-433872067459",
  "verifyFlag": false,
  "lineItems": [
    {
      "transactionId": "61ad15e590dc1a6469638995-433872067459",
      "lineItemId": "seller--line-61ad15e590dc1a6469638995-433872067459",
      "received": false
    },
    {
      "transactionId": "61ad15e590dc1a6469638995-433872067459",
      "lineItemId": "target--line-61ad15e590dc1a6469638995-433872067459",
      "received": true
    }
  ]
}
Code snippet:
Document doc = repository.findById(transactionId).get()
Pattern regex = Pattern.compile('[-]')
Bson lineIdFilter = Filters.eq('lineItems.lineItemId', lineItemId)
if (regex.matcher(transactionId).find()) {
    Bson idFilter = Filters.eq('_id', transactionId)
    collection.updateOne(
        Filters.and(idFilter, lineIdFilter),
        Updates.set('lineItems.$.received', true)
    )
} else {
    Bson objectIdFilter = Filters.eq('_id', new ObjectId(transactionId))
    collection.updateOne(
        Filters.and(objectIdFilter, lineIdFilter),
        Updates.set('lineItems.$.received', true)
    )
}
List<TransferItem> lineItems = doc.lineItems
TransferItem transferItem = lineItems?.find { it -> it.lineItemId == lineItemId }
transferItem.received = true
doc.verifyFlag = lineItems?.every { transferItem -> transferItem.received }
repository.save(doc)
falcon
(Falcon)
January 5, 2022, 4:05pm
23
I moved the document read to after updateOne:
Pattern regex = Pattern.compile('[-]')
Bson lineIdFilter = Filters.eq('lineItems.lineItemId', lineItemId)
if (regex.matcher(transactionId).find()) {
    Bson idFilter = Filters.eq('_id', transactionId)
    collection.updateOne(
        Filters.and(idFilter, lineIdFilter),
        Updates.set('lineItems.$.received', true)
    )
} else {
    Bson objectIdFilter = Filters.eq('_id', new ObjectId(transactionId))
    collection.updateOne(
        Filters.and(objectIdFilter, lineIdFilter),
        Updates.set('lineItems.$.received', true)
    )
}
Document doc = repository.findById(transactionId).get()
List<TransferItem> lineItems = doc.lineItems
TransferItem transferItem = lineItems?.find { it -> it.lineItemId == lineItemId }
transferItem.received = true
doc.verifyFlag = lineItems?.every { transferItem -> transferItem.received }
repository.save(doc)