mirror of https://github.com/milvus-io/milvus.git
Fix insert pks overwrite (#12365)
Signed-off-by: godchen <qingxiang.chen@zilliz.com>pull/12414/head
parent
b1da696767
commit
c4f0837d84
|
@ -14,6 +14,7 @@ package querynode
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
|
@ -110,7 +111,12 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
iData.insertIDs[task.SegmentID] = append(iData.insertIDs[task.SegmentID], task.RowIDs...)
|
||||
iData.insertTimestamps[task.SegmentID] = append(iData.insertTimestamps[task.SegmentID], task.Timestamps...)
|
||||
iData.insertRecords[task.SegmentID] = append(iData.insertRecords[task.SegmentID], task.RowData...)
|
||||
iData.insertPKs[task.SegmentID] = getPrimaryKeys(task, iNode.streamingReplica)
|
||||
pks, err := getPrimaryKeys(task, iNode.streamingReplica)
|
||||
if err != nil {
|
||||
log.Warn(err.Error())
|
||||
continue
|
||||
}
|
||||
iData.insertPKs[task.SegmentID] = append(iData.insertPKs[task.SegmentID], pks...)
|
||||
}
|
||||
|
||||
// 2. do preInsert
|
||||
|
@ -307,17 +313,17 @@ func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *
|
|||
|
||||
// TODO: remove this function to proper file
|
||||
// TODO: why not return error?
|
||||
func getPrimaryKeys(msg *msgstream.InsertMsg, streamingReplica ReplicaInterface) []int64 {
|
||||
func getPrimaryKeys(msg *msgstream.InsertMsg, streamingReplica ReplicaInterface) ([]int64, error) {
|
||||
if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
|
||||
log.Warn("misaligned messages detected")
|
||||
return nil
|
||||
return nil, errors.New("misaligned messages detected")
|
||||
}
|
||||
collectionID := msg.GetCollectionID()
|
||||
|
||||
collection, err := streamingReplica.getCollectionByID(collectionID)
|
||||
if err != nil {
|
||||
log.Warn(err.Error())
|
||||
return nil
|
||||
return nil, err
|
||||
}
|
||||
offset := 0
|
||||
for _, field := range collection.schema.Fields {
|
||||
|
@ -357,7 +363,7 @@ func getPrimaryKeys(msg *msgstream.InsertMsg, streamingReplica ReplicaInterface)
|
|||
dim, err := strconv.Atoi(t.Value)
|
||||
if err != nil {
|
||||
log.Error("strconv wrong on get dim", zap.Error(err))
|
||||
return nil
|
||||
return nil, err
|
||||
}
|
||||
offset += dim / 8
|
||||
break
|
||||
|
@ -376,10 +382,11 @@ func getPrimaryKeys(msg *msgstream.InsertMsg, streamingReplica ReplicaInterface)
|
|||
err := binary.Read(reader, common.Endian, &pks[i])
|
||||
if err != nil {
|
||||
log.Warn("binary read blob value failed", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return pks
|
||||
return pks, nil
|
||||
}
|
||||
func newInsertNode(streamingReplica ReplicaInterface) *insertNode {
|
||||
maxQueueLength := Params.FlowGraphMaxQueueLength
|
||||
|
|
|
@ -310,7 +310,10 @@ func (loader *segmentLoader) loadGrowingSegments(segment *Segment,
|
|||
RowData: records,
|
||||
},
|
||||
}
|
||||
pks := getPrimaryKeys(tmpInsertMsg, loader.streamingReplica)
|
||||
pks, err := getPrimaryKeys(tmpInsertMsg, loader.streamingReplica)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
segment.updateBloomFilter(pks)
|
||||
|
||||
// 3. do insert
|
||||
|
|
Loading…
Reference in New Issue