mirror of https://github.com/milvus-io/milvus.git
Fix load failure and remove partition release related code (#16038)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/16050/head
parent fab0263616
commit caf9cbfcd4
@@ -29,7 +29,6 @@ package querynode
 */
 import "C"
 import (
-	"errors"
 	"fmt"
 	"github.com/milvus-io/milvus/internal/metrics"
 	"math"
@@ -79,9 +78,10 @@ func (c *Collection) addPartitionID(partitionID UniqueID) {
 	c.releaseMu.Lock()
 	defer c.releaseMu.Unlock()

-	log.Debug("queryNode collection add a partition", zap.Int64("collection", c.id), zap.Int64("partitionID", partitionID))
 	c.partitionIDs = append(c.partitionIDs, partitionID)
-	log.Debug("queryNode collection info after add a partition", zap.Int64("collectionID", c.id), zap.Int64s("partitions", c.partitionIDs), zap.Any("releasePartitions", c.releasedPartitions))
+	log.Info("queryNode collection info after add a partition",
+		zap.Int64("partitionID", partitionID), zap.Int64("collectionID", c.id),
+		zap.Int64s("partitions", c.partitionIDs))
 }

 // removePartitionID removes the partition id from partition id list of collection
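Above, two Debug lines give way to a single Info call that no longer reports a releasedPartitions field, matching the removal of the release ledger below. As a minimal, self-contained sketch of the zap call shape used here (the zap.NewExample logger is purely illustrative; Milvus wires up its own log package):

```go
package main

import "go.uber.org/zap"

func main() {
	// Throwaway JSON logger for demonstration only; querynode code goes
	// through Milvus' preconfigured global logger instead.
	log := zap.NewExample()
	defer log.Sync()

	partitions := []int64{100, 101}
	// Same field shapes as the log.Info call in addPartitionID above.
	log.Info("queryNode collection info after add a partition",
		zap.Int64("partitionID", 101),
		zap.Int64("collectionID", 1),
		zap.Int64s("partitions", partitions))
}
```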
@@ -277,48 +277,6 @@ func (c *Collection) getReleaseTime() Timestamp {
 	return c.releaseTime
 }

-// addReleasedPartition records the partition to indicate that this partition has been released
-func (c *Collection) addReleasedPartition(partitionID UniqueID) {
-	c.releaseMu.Lock()
-	defer c.releaseMu.Unlock()
-
-	log.Debug("queryNode collection release a partition", zap.Int64("collectionID", c.id), zap.Int64("partition", partitionID))
-	c.releasedPartitions[partitionID] = struct{}{}
-	partitions := make([]UniqueID, 0)
-	for _, id := range c.partitionIDs {
-		if id != partitionID {
-			partitions = append(partitions, id)
-		}
-	}
-	c.partitionIDs = partitions
-	log.Debug("queryNode collection info after release a partition", zap.Int64("collectionID", c.id), zap.Int64s("partitions", c.partitionIDs), zap.Any("releasePartitions", c.releasedPartitions))
-}
-
-// deleteReleasedPartition remove the released partition record from collection
-func (c *Collection) deleteReleasedPartition(partitionID UniqueID) {
-	c.releaseMu.Lock()
-	defer c.releaseMu.Unlock()
-
-	log.Debug("queryNode collection reload a released partition", zap.Int64("collectionID", c.id), zap.Int64("partition", partitionID))
-	delete(c.releasedPartitions, partitionID)
-	log.Debug("queryNode collection info after reload a released partition", zap.Int64("collectionID", c.id), zap.Int64s("partitions", c.partitionIDs), zap.Any("releasePartitions", c.releasedPartitions))
-}
-
-// checkReleasedPartitions returns error if any partition has been released
-func (c *Collection) checkReleasedPartitions(partitionIDs []UniqueID) error {
-	c.releaseMu.RLock()
-	defer c.releaseMu.RUnlock()
-	for _, id := range partitionIDs {
-		if _, ok := c.releasedPartitions[id]; ok {
-			return errors.New("partition has been released" +
-				", collectionID = " + fmt.Sprintln(c.ID()) +
-				", partitionID = " + fmt.Sprintln(id))
-		}
-	}
-
-	return nil
-}
-
 // setLoadType set the loading type of collection, which is loadTypeCollection or loadTypePartition
 func (c *Collection) setLoadType(l loadType) {
 	c.loadType = l
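The three functions removed above made up a small release ledger: a mutex-guarded set of released partition IDs that later loads had to consult and clear. Reduced to a self-contained sketch (names simplified; UniqueID is an int64 in Milvus), the deleted pattern was roughly:

```go
package main

import (
	"fmt"
	"sync"
)

// releaseLedger mirrors the bookkeeping collection.go kept before this
// commit: a set of released partition IDs guarded by a mutex.
type releaseLedger struct {
	mu       sync.RWMutex
	released map[int64]struct{}
}

func newReleaseLedger() *releaseLedger {
	return &releaseLedger{released: make(map[int64]struct{})}
}

// markReleased records a partition as released (was addReleasedPartition).
func (l *releaseLedger) markReleased(id int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.released[id] = struct{}{}
}

// clear forgets a release record (was deleteReleasedPartition).
func (l *releaseLedger) clear(id int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.released, id)
}

// check fails if any requested partition was released (was checkReleasedPartitions).
func (l *releaseLedger) check(ids []int64) error {
	l.mu.RLock()
	defer l.mu.RUnlock()
	for _, id := range ids {
		if _, ok := l.released[id]; ok {
			return fmt.Errorf("partition has been released, partitionID = %d", id)
		}
	}
	return nil
}

func main() {
	l := newReleaseLedger()
	l.markReleased(42)
	fmt.Println(l.check([]int64{42})) // error: partition 42 was released
	l.clear(42)
	fmt.Println(l.check([]int64{42})) // <nil>
}
```

Dropping the ledger leaves the replica's partition list as the single source of truth, so a stale release record can no longer veto a subsequent load, which appears to be the load failure named in the commit title.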
@@ -121,21 +121,6 @@ func TestCollection_releaseTime(t *testing.T) {
 	assert.Equal(t, t0, t1)
 }

-func TestCollection_releasePartition(t *testing.T) {
-	collectionID := UniqueID(0)
-	collectionMeta := genTestCollectionMeta(collectionID, false)
-
-	collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
-	collection.addReleasedPartition(defaultPartitionID)
-	assert.Equal(t, 1, len(collection.releasedPartitions))
-	err := collection.checkReleasedPartitions([]UniqueID{defaultPartitionID})
-	assert.Error(t, err)
-	err = collection.checkReleasedPartitions([]UniqueID{UniqueID(1000)})
-	assert.NoError(t, err)
-	collection.deleteReleasedPartition(defaultPartitionID)
-	assert.Equal(t, 0, len(collection.releasedPartitions))
-}
-
 func TestCollection_loadType(t *testing.T) {
 	collectionID := UniqueID(0)
 	collectionMeta := genTestCollectionMeta(collectionID, false)
@@ -129,19 +129,6 @@ func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg
 		}
 	}

-	// check if partition has been released
-	if col.getLoadType() == loadTypeCollection {
-		col, err := fdmNode.replica.getCollectionByID(msg.CollectionID)
-		if err != nil {
-			log.Warn(err.Error())
-			return nil
-		}
-		if err = col.checkReleasedPartitions([]UniqueID{msg.PartitionID}); err != nil {
-			log.Warn(err.Error())
-			return nil
-		}
-	}
-
 	if len(msg.PrimaryKeys) != len(msg.Timestamps) {
 		log.Warn("Error, misaligned messages detected")
 		return nil
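The deleted block consulted that ledger from the flowgraph filter; what remains is the node's usual convention that an invalid message is dropped (a warning plus return nil), never fatal. A stripped-down sketch of that convention, with deleteMsg as a hypothetical stand-in for msgstream.DeleteMsg:

```go
package main

import "fmt"

// deleteMsg models only the fields the filter consults; the real type is
// msgstream.DeleteMsg.
type deleteMsg struct {
	PrimaryKeys []int64
	Timestamps  []uint64
}

// filterInvalidDeleteMessage returns the message to pass it downstream, or
// nil to drop it, logging instead of failing the whole flowgraph.
func filterInvalidDeleteMessage(msg *deleteMsg) *deleteMsg {
	if len(msg.PrimaryKeys) != len(msg.Timestamps) {
		fmt.Println("warn: misaligned delete message dropped")
		return nil
	}
	return msg
}

func main() {
	ok := &deleteMsg{PrimaryKeys: []int64{1}, Timestamps: []uint64{100}}
	bad := &deleteMsg{PrimaryKeys: []int64{1, 2}, Timestamps: []uint64{100}}
	fmt.Println(filterInvalidDeleteMessage(ok) != nil)  // true
	fmt.Println(filterInvalidDeleteMessage(bad) != nil) // false
}
```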
@@ -193,14 +180,6 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
 		}
 	}

-	// check if partition has been released
-	if col.getLoadType() == loadTypeCollection {
-		if err = col.checkReleasedPartitions([]UniqueID{msg.PartitionID}); err != nil {
-			log.Warn(err.Error())
-			return nil
-		}
-	}
-
 	// Check if the segment is in excluded segments,
 	// messages after seekPosition may contain the redundant data from flushed slice of segment,
 	// so we need to compare the endTimestamp of received messages and position's timestamp.
@@ -95,18 +95,6 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
 		assert.Nil(t, res)
 	})

-	t.Run("test released partition", func(t *testing.T) {
-		msg, err := genSimpleInsertMsg()
-		assert.NoError(t, err)
-		fg, err := getFilterDMNode(ctx)
-		assert.NoError(t, err)
-		col, err := fg.replica.getCollectionByID(defaultCollectionID)
-		assert.NoError(t, err)
-		col.addReleasedPartition(defaultPartitionID)
-		res := fg.filterInvalidInsertMessage(msg)
-		assert.Nil(t, res)
-	})
-
 	t.Run("test no exclude segment", func(t *testing.T) {
 		msg, err := genSimpleInsertMsg()
 		assert.NoError(t, err)
@@ -207,18 +195,6 @@ func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) {
 		assert.Nil(t, res)
 	})

-	t.Run("test delete released partition", func(t *testing.T) {
-		msg, err := genSimpleDeleteMsg()
-		assert.NoError(t, err)
-		fg, err := getFilterDMNode(ctx)
-		assert.NoError(t, err)
-		col, err := fg.replica.getCollectionByID(defaultCollectionID)
-		assert.NoError(t, err)
-		col.addReleasedPartition(defaultPartitionID)
-		res := fg.filterInvalidDeleteMessage(msg)
-		assert.Nil(t, res)
-	})
-
 	t.Run("test delete misaligned messages", func(t *testing.T) {
 		msg, err := genSimpleDeleteMsg()
 		assert.NoError(t, err)
@@ -151,9 +151,6 @@ func (h *historical) search(searchReqs []*searchRequest, collID UniqueID, partID
 	}

 	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {
-		if err = col.checkReleasedPartitions(partIDs); err != nil {
-			return searchResults, searchSegmentIDs, searchPartIDs, err
-		}
 		return searchResults, searchSegmentIDs, searchPartIDs, nil
 	}
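With the check gone, "collection is loaded but none of the requested partitions are" becomes an empty result rather than an error. A stand-alone sketch of that control flow (the types are simplified stand-ins for the querynode ones):

```go
package main

import "fmt"

type loadType int

const (
	loadTypeCollection loadType = iota
	loadTypePartition
)

// searchPartitions sketches the early return above: when the caller asked
// for partitions under a collection-level load and none are currently
// loaded, return empty results rather than a failure.
func searchPartitions(loadedPartIDs []int64, lt loadType) ([]string, error) {
	if len(loadedPartIDs) == 0 && lt == loadTypeCollection {
		return nil, nil // nothing loaded yet: empty result, not an error
	}
	var results []string
	for _, id := range loadedPartIDs {
		results = append(results, fmt.Sprintf("segments of partition %d", id))
	}
	return results, nil
}

func main() {
	res, err := searchPartitions(nil, loadTypeCollection)
	fmt.Println(len(res), err) // 0 <nil>
}
```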
@@ -104,25 +104,4 @@ func TestHistorical_Search(t *testing.T) {
 		assert.Equal(t, 0, len(ids))
 		assert.NoError(t, err)
 	})
-
-	t.Run("test load collection partition released in collection", func(t *testing.T) {
-		tSafe := newTSafeReplica()
-		his, err := genSimpleHistorical(ctx, tSafe)
-		assert.NoError(t, err)
-
-		plan, searchReqs, err := genSimpleSearchPlanAndRequests(IndexFaissIDMap)
-		assert.NoError(t, err)
-
-		col, err := his.replica.getCollectionByID(defaultCollectionID)
-		assert.NoError(t, err)
-		col.addReleasedPartition(defaultPartitionID)
-
-		err = his.replica.removePartition(defaultPartitionID)
-		assert.NoError(t, err)
-
-		res, ids, _, err := his.search(searchReqs, defaultCollectionID, []UniqueID{defaultPartitionID}, plan, Timestamp(0))
-		assert.Equal(t, 0, len(res))
-		assert.Equal(t, 0, len(ids))
-		assert.Error(t, err)
-	})
 }
@@ -505,7 +505,6 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
 	var cTimestampsPtr = (*C.uint64_t)(&(*timestamps)[0])
 	var cSizeofPerRow = C.int(sizeofPerRow)
 	var cRawDataVoidPtr = unsafe.Pointer(&rawData[0])
-	log.Debug("QueryNode::Segment::InsertBegin", zap.Any("cNumOfRows", cNumOfRows))
 	status := C.Insert(s.segmentPtr,
 		cOffset,
 		cNumOfRows,
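For context, segmentInsert hands raw Go memory across cgo: it takes the address of a slice's first element, converts it to the matching C pointer type, and calls into segcore. A self-contained toy of the same hand-off (SumTimestamps is an invented stand-in for C.Insert; the slice must stay alive for the duration of the call):

```go
package main

/*
#include <stdint.h>

// Toy stand-in for segcore's Insert: sums the first n timestamps so the
// pointer hand-off has something observable to do.
static uint64_t SumTimestamps(const uint64_t* ts, int n) {
	uint64_t s = 0;
	for (int i = 0; i < n; i++) {
		s += ts[i];
	}
	return s;
}
*/
import "C"

import "fmt"

func main() {
	timestamps := []uint64{100, 200, 300}
	// Pointer to the first element, the same shape as cTimestampsPtr above;
	// Go's uint64 and C's uint64_t share an underlying type, so the pointer
	// conversion needs no unsafe.Pointer round-trip.
	cTimestampsPtr := (*C.uint64_t)(&timestamps[0])
	sum := C.SumTimestamps(cTimestampsPtr, C.int(len(timestamps)))
	fmt.Println(uint64(sum)) // 600
}
```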
@@ -154,9 +154,6 @@ func (s *streaming) search(searchReqs []*searchRequest, collID UniqueID, partIDs
 	}

 	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {
-		if err = col.checkReleasedPartitions(partIDs); err != nil {
-			return searchResults, searchSegmentIDs, searchPartIDs, err
-		}
 		return searchResults, searchSegmentIDs, searchPartIDs, nil
 	}
@@ -226,19 +226,13 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 	collectionID := w.req.CollectionID
 	partitionIDs := w.req.GetPartitionIDs()

-	var lType loadType
-
-	switch w.req.GetLoadMeta().GetLoadType() {
-	case queryPb.LoadType_LoadCollection:
-		lType = loadTypeCollection
-	case queryPb.LoadType_LoadPartition:
-		lType = loadTypePartition
-	default:
+	lType := w.req.GetLoadMeta().GetLoadType()
+	if lType == queryPb.LoadType_UnKnownType {
 		// if no partitionID is specified, load type is load collection
 		if len(partitionIDs) != 0 {
-			lType = loadTypePartition
+			lType = queryPb.LoadType_LoadPartition
 		} else {
-			lType = loadTypeCollection
+			lType = queryPb.LoadType_LoadCollection
 		}
 	}
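The switch above collapses into reading the load type straight from LoadMeta, inferring it from the partition list only when the coordinator sent UnKnownType. A self-contained sketch of the new decision (the enum is a stand-in for querypb.LoadType; the numeric values are illustrative, not the generated ones):

```go
package main

import "fmt"

// LoadType mimics the querypb.LoadType enum referenced above.
type LoadType int32

const (
	LoadTypeUnknown        LoadType = 0 // illustrative stand-in values
	LoadTypeLoadPartition  LoadType = 1
	LoadTypeLoadCollection LoadType = 2
)

// resolveLoadType mirrors the new logic: trust LoadMeta when it is set, and
// only fall back to inference when the type is unknown.
func resolveLoadType(fromMeta LoadType, partitionIDs []int64) LoadType {
	if fromMeta != LoadTypeUnknown {
		return fromMeta
	}
	// No explicit type: a non-empty partition list means a partition load,
	// otherwise the whole collection is being loaded.
	if len(partitionIDs) != 0 {
		return LoadTypeLoadPartition
	}
	return LoadTypeLoadCollection
}

func main() {
	fmt.Println(resolveLoadType(LoadTypeUnknown, []int64{7})) // 1 (partition)
	fmt.Println(resolveLoadType(LoadTypeUnknown, nil))        // 2 (collection)
}
```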
@@ -261,6 +255,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 	log.Debug("Starting WatchDmChannels ...",
 		zap.String("collectionName", w.req.Schema.Name),
 		zap.Int64("collectionID", collectionID),
+		zap.Any("load type", lType),
 		zap.Strings("vChannels", vChannels),
 		zap.Strings("pChannels", pChannels),
 	)
@@ -299,6 +294,17 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 		Schema: w.req.GetSchema(),
+		LoadMeta: w.req.GetLoadMeta(),
 	}
+
+	// update partition info from unFlushedSegments and loadMeta
+	for _, info := range req.Infos {
+		w.node.streaming.replica.addPartition(collectionID, info.PartitionID)
+		w.node.historical.replica.addPartition(collectionID, info.PartitionID)
+	}
+	for _, partitionID := range req.GetLoadMeta().GetPartitionIDs() {
+		w.node.historical.replica.addPartition(collectionID, partitionID)
+		w.node.streaming.replica.addPartition(collectionID, partitionID)
+	}

 	log.Debug("loading growing segments in WatchDmChannels...",
 		zap.Int64("collectionID", collectionID),
 		zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs),
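This block is the core of the fix: every partition named either by an unflushed segment or by LoadMeta gets registered in both the streaming and historical replicas before growing segments load. The two loops may name the same partition twice, which is safe as long as addPartition is idempotent, a property the sketch below assumes (the replica type is a stand-in):

```go
package main

import "fmt"

// replica is a stand-in for the querynode segment replica; the real
// addPartition is assumed idempotent, which makes the double loop above safe.
type replica struct {
	partitions map[int64]struct{}
}

func (r *replica) addPartition(collectionID, partitionID int64) {
	if r.partitions == nil {
		r.partitions = make(map[int64]struct{})
	}
	r.partitions[partitionID] = struct{}{} // repeated adds are no-ops
}

func main() {
	const collectionID = 1
	r := &replica{}

	// Partitions referenced by unflushed (growing) segments...
	segmentPartitions := []int64{10, 11}
	// ...and partitions declared in the request's LoadMeta; they may overlap.
	loadMetaPartitions := []int64{11, 12}

	for _, p := range segmentPartitions {
		r.addPartition(collectionID, p)
	}
	for _, p := range loadMetaPartitions {
		r.addPartition(collectionID, p)
	}
	fmt.Println(len(r.partitions)) // 3: {10, 11, 12}
}
```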
@@ -441,12 +447,6 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 	hCol.addVChannels(vChannels)
 	hCol.addPChannels(pChannels)
 	hCol.setLoadType(lType)
-	for _, partitionID := range w.req.GetLoadMeta().GetPartitionIDs() {
-		sCol.deleteReleasedPartition(partitionID)
-		hCol.deleteReleasedPartition(partitionID)
-		w.node.streaming.replica.addPartition(collectionID, partitionID)
-		w.node.historical.replica.addPartition(collectionID, partitionID)
-	}
 	log.Debug("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))

 	// create tSafe
@@ -665,21 +665,6 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
 		return err
 	}

-	for _, info := range l.req.Infos {
-		collectionID := info.CollectionID
-		partitionID := info.PartitionID
-		sCol, err := l.node.streaming.replica.getCollectionByID(collectionID)
-		if err != nil {
-			return err
-		}
-		sCol.deleteReleasedPartition(partitionID)
-		hCol, err := l.node.historical.replica.getCollectionByID(collectionID)
-		if err != nil {
-			return err
-		}
-		hCol.deleteReleasedPartition(partitionID)
-	}
-
 	log.Debug("LoadSegments done", zap.String("SegmentLoadInfos", fmt.Sprintln(l.req.Infos)))
 	return nil
 }
@@ -825,11 +810,11 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
 	time.Sleep(gracefulReleaseTime * time.Second)

 	// get collection from streaming and historical
-	hCol, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID)
+	_, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID)
 	if err != nil {
 		return fmt.Errorf("release partitions failed, collectionID = %d, err = %s", r.req.CollectionID, err)
 	}
-	sCol, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID)
+	_, err = r.node.streaming.replica.getCollectionByID(r.req.CollectionID)
 	if err != nil {
 		return fmt.Errorf("release partitions failed, collectionID = %d, err = %s", r.req.CollectionID, err)
 	}
@@ -853,9 +838,6 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
 				log.Warn(err.Error())
 			}
 		}
-
-		hCol.addReleasedPartition(id)
-		sCol.addReleasedPartition(id)
 	}

 	log.Debug("Release partition task done",
@@ -258,6 +258,11 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
 			req:  genWatchDMChannelsRequest(),
 			node: node,
 		}
+		task.req.LoadMeta = &querypb.LoadMetaInfo{
+			LoadType:     querypb.LoadType_LoadPartition,
+			CollectionID: defaultCollectionID,
+			PartitionIDs: []UniqueID{defaultPartitionID},
+		}
 		task.req.Infos = []*datapb.VchannelInfo{
 			{
 				CollectionID: defaultCollectionID,
@@ -377,6 +382,39 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
 		err = task.Execute(ctx)
 		assert.Error(t, err)
 	})
+
+	t.Run("test load growing segment", func(t *testing.T) {
+		node, err := genSimpleQueryNode(ctx)
+		assert.NoError(t, err)
+
+		task := watchDmChannelsTask{
+			req:  genWatchDMChannelsRequest(),
+			node: node,
+		}
+
+		fieldBinlog, err := saveSimpleBinLog(ctx)
+		assert.NoError(t, err)
+
+		task.req.Infos = []*datapb.VchannelInfo{
+			{
+				CollectionID: defaultCollectionID,
+				ChannelName:  defaultDMLChannel,
+				UnflushedSegments: []*datapb.SegmentInfo{
+					{
+						CollectionID: defaultCollectionID,
+						PartitionID:  defaultPartitionID + 1, // load a new partition
+						DmlPosition: &internalpb.MsgPosition{
+							ChannelName: defaultDMLChannel,
+							Timestamp:   typeutil.MaxTimestamp,
+						},
+						Binlogs: fieldBinlog,
+					},
+				},
+			},
+		}
+		err = task.Execute(ctx)
+		assert.NoError(t, err)
+	})
 }

 func TestTask_watchDeltaChannelsTask(t *testing.T) {
@@ -762,7 +800,7 @@ func TestTask_releasePartitionTask(t *testing.T) {
 		assert.Error(t, err)
 	})

-	t.Run("test execute, remove deltaVChannel", func(t *testing.T) {
+	t.Run("test execute remove deltaVChannel", func(t *testing.T) {
 		node, err := genSimpleQueryNode(ctx)
 		assert.NoError(t, err)
@@ -17,6 +17,7 @@
 package querynode

 import (
+	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 )
@@ -49,9 +50,9 @@ type TimeRange struct {
 }

 // loadType is load collection or load partition
-type loadType = int32
+type loadType = querypb.LoadType

 const (
-	loadTypeCollection loadType = 0
-	loadTypePartition  loadType = 1
+	loadTypeCollection = querypb.LoadType_LoadCollection
+	loadTypePartition  = querypb.LoadType_LoadPartition
 )
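The alias form above (type loadType = querypb.LoadType) makes the package-private name and the proto enum the very same type, so values flow between querynode and querypb code without conversion. A minimal demonstration of Go's alias-vs-definition distinction this relies on (Enum is a stand-in for the generated enum type):

```go
package main

import "fmt"

// Enum is a stand-in for the generated querypb.LoadType type.
type Enum int32

const EnumLoadCollection Enum = 2

// An alias declaration (`=`) names the *same* type...
type loadType = Enum

// ...whereas a definition (no `=`) creates a distinct type that needs an
// explicit conversion.
type distinct Enum

func main() {
	var l loadType = EnumLoadCollection
	var e Enum = l // assignable with no conversion: identical types
	fmt.Println(e == EnumLoadCollection) // true

	var d distinct = distinct(EnumLoadCollection) // conversion required
	_ = d
}
```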