mirror of https://github.com/milvus-io/milvus.git
Add collection id and partition id to insert & search workflow
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/4973/head^2
parent
b853020ef9
commit
6c3f169ecc
|
@ -3,7 +3,6 @@ package proxynode
|
|||
import (
|
||||
"log"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
|
||||
|
@ -181,6 +180,7 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
|
|||
partitionID := insertRequest.PartitionID
|
||||
partitionName := insertRequest.PartitionName
|
||||
proxyID := insertRequest.Base.SourceID
|
||||
channelNames := channelNamesMap[collectionID]
|
||||
for index, key := range keys {
|
||||
ts := insertRequest.Timestamps[index]
|
||||
rowID := insertRequest.RowIDs[index]
|
||||
|
@ -191,6 +191,7 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
|
|||
result[key] = &msgPack
|
||||
}
|
||||
segmentID := getSegmentID(reqID, key)
|
||||
channelID := channelNames[int(key)%len(channelNames)]
|
||||
sliceRequest := internalpb2.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kInsert,
|
||||
|
@ -204,7 +205,8 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
|
|||
PartitionName: partitionName,
|
||||
SegmentID: segmentID,
|
||||
// todo rename to ChannelName
|
||||
ChannelID: strconv.FormatInt(int64(key), 10),
|
||||
// ChannelID: strconv.FormatInt(int64(key), 10),
|
||||
ChannelID: channelID,
|
||||
Timestamps: []uint64{ts},
|
||||
RowIDs: []int64{rowID},
|
||||
RowData: []*commonpb.Blob{row},
|
||||
|
|
|
@ -106,9 +106,17 @@ func (it *InsertTask) Execute() error {
|
|||
return err
|
||||
}
|
||||
it.CollectionID = collID
|
||||
partitionID, err := globalMetaCache.GetPartitionID(collectionName, it.PartitionName)
|
||||
if err != nil {
|
||||
return err
|
||||
var partitionID UniqueID
|
||||
if len(it.PartitionName) > 0 {
|
||||
partitionID, err = globalMetaCache.GetPartitionID(collectionName, it.PartitionName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
partitionID, err = globalMetaCache.GetPartitionID(collectionName, Params.DefaultPartitionTag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
it.PartitionID = partitionID
|
||||
var rowIDBegin UniqueID
|
||||
|
@ -463,6 +471,25 @@ func (st *SearchTask) PreExecute() error {
|
|||
st.Query = &commonpb.Blob{
|
||||
Value: queryBytes,
|
||||
}
|
||||
|
||||
st.ResultChannelID = Params.SearchResultChannelNames[0]
|
||||
st.DbID = 0 // todo
|
||||
collectionID, err := globalMetaCache.GetCollectionID(collectionName)
|
||||
if err != nil { // err is not nil if collection not exists
|
||||
return err
|
||||
}
|
||||
st.CollectionID = collectionID
|
||||
st.PartitionIDs = make([]UniqueID, 0)
|
||||
for _, partitionName := range st.query.PartitionNames {
|
||||
partitionID, err := globalMetaCache.GetPartitionID(collectionName, partitionName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
st.PartitionIDs = append(st.PartitionIDs, partitionID)
|
||||
}
|
||||
st.Dsl = st.query.Dsl
|
||||
st.PlaceholderGroup = st.query.PlaceholderGroup
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue