mirror of https://github.com/milvus-io/milvus.git
[2.2] Fix unsub channel always removes queryshard (#22961)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/22975/head
parent c1cb8def84
commit c13684c5e5
@@ -644,18 +644,24 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *querypb.ReleaseS
    log.Info("start to release segments", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs))

    var delta int64
    for _, id := range in.SegmentIDs {
        switch in.GetScope() {
        case querypb.DataScope_Streaming:
            node.metaReplica.removeSegment(id, segmentTypeGrowing)
        case querypb.DataScope_Historical:
            node.metaReplica.removeSegment(id, segmentTypeSealed)
            delta += node.metaReplica.removeSegment(id, segmentTypeSealed)
        case querypb.DataScope_All:
            node.metaReplica.removeSegment(id, segmentTypeSealed)
            node.metaReplica.removeSegment(id, segmentTypeGrowing)
            delta += node.metaReplica.removeSegment(id, segmentTypeGrowing)
        }
    }

    // reduce queryshard in use
    if delta > 0 {
        node.queryShardService.removeQueryShard(in.GetShard(), delta)
    }
    // note that argument is dmlchannel name
    node.dataSyncService.removeEmptyFlowGraphByChannel(in.GetCollectionID(), in.GetShard())
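The core of this hunk is that segment removal now yields a count: ReleaseSegments accumulates the number of segments actually dropped into delta and only then shrinks the query shard's in-use count. The following standalone sketch is not Milvus code; the map-based removeSegment and all names in it are illustrative assumptions showing the counting pattern.

package main

import "fmt"

// removeSegment mimics the new contract in miniature: it reports how many
// segments were actually deleted (0 or 1) instead of returning nothing.
func removeSegment(segments map[int64]struct{}, id int64) int64 {
    if _, ok := segments[id]; !ok {
        return 0 // nothing to remove, contributes nothing to delta
    }
    delete(segments, id)
    return 1
}

func main() {
    sealed := map[int64]struct{}{1: {}, 2: {}}

    // Accumulate removals the way ReleaseSegments now accumulates `delta`.
    var delta int64
    for _, id := range []int64{1, 2, 3} { // segment 3 does not exist
        delta += removeSegment(sealed, id)
    }

    // Only when something was actually removed is the query shard's
    // in-use count reduced (the `if delta > 0` branch above).
    if delta > 0 {
        fmt.Println("removeQueryShard would be called with delta =", delta) // 2
    }
}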
@@ -742,7 +742,7 @@ func TestImpl_Search(t *testing.T) {
    req, err := genSearchRequest(defaultNQ, IndexFaissIDMap, schema)
    require.NoError(t, err)

    node.queryShardService.addQueryShard(defaultCollectionID, defaultDMLChannel, defaultReplicaID)
    node.queryShardService.addQueryShard(defaultCollectionID, defaultDMLChannel, defaultReplicaID, 1)
    node.ShardClusterService.addShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel, defaultVersion)
    // shard cluster not synced
    _, err = node.Search(ctx, &queryPb.SearchRequest{

@@ -785,7 +785,7 @@ func TestImpl_searchWithDmlChannel(t *testing.T) {
    req, err := genSearchRequest(defaultNQ, IndexFaissIDMap, schema)
    require.NoError(t, err)

    node.queryShardService.addQueryShard(defaultCollectionID, defaultDMLChannel, defaultReplicaID)
    node.queryShardService.addQueryShard(defaultCollectionID, defaultDMLChannel, defaultReplicaID, 1)
    node.ShardClusterService.addShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel, defaultVersion)
    sc, ok := node.ShardClusterService.getShardCluster(defaultDMLChannel)
    assert.True(t, ok)

@@ -827,7 +827,7 @@ func TestImpl_GetCollectionStatistics(t *testing.T) {
    req, err := genGetCollectionStatisticRequest()
    require.NoError(t, err)

    node.queryShardService.addQueryShard(defaultCollectionID, defaultDMLChannel, defaultReplicaID)
    node.queryShardService.addQueryShard(defaultCollectionID, defaultDMLChannel, defaultReplicaID, 1)

    _, err = node.GetStatistics(ctx, &queryPb.GetStatisticsRequest{
        Req: req,

@@ -848,7 +848,7 @@ func TestImpl_GetPartitionStatistics(t *testing.T) {
    req, err := genGetPartitionStatisticRequest()
    require.NoError(t, err)

    node.queryShardService.addQueryShard(defaultCollectionID, defaultDMLChannel, defaultReplicaID)
    node.queryShardService.addQueryShard(defaultCollectionID, defaultDMLChannel, defaultReplicaID, 1)

    _, err = node.GetStatistics(ctx, &queryPb.GetStatisticsRequest{
        Req: req,

@@ -870,7 +870,7 @@ func TestImpl_Query(t *testing.T) {
    req, err := genRetrieveRequest(schema)
    require.NoError(t, err)

    node.queryShardService.addQueryShard(defaultCollectionID, defaultDMLChannel, defaultReplicaID)
    node.queryShardService.addQueryShard(defaultCollectionID, defaultDMLChannel, defaultReplicaID, 1)
    node.ShardClusterService.addShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel, defaultVersion)
    // shard cluster not synced
    _, err = node.Query(ctx, &queryPb.QueryRequest{

@@ -914,7 +914,7 @@ func TestImpl_queryWithDmlChannel(t *testing.T) {
    req, err := genRetrieveRequest(schema)
    require.NoError(t, err)

    node.queryShardService.addQueryShard(defaultCollectionID, defaultDMLChannel, defaultReplicaID)
    node.queryShardService.addQueryShard(defaultCollectionID, defaultDMLChannel, defaultReplicaID, 1)
    node.ShardClusterService.addShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel, defaultVersion)
    sc, ok := node.ShardClusterService.getShardCluster(defaultDMLChannel)
    assert.True(t, ok)
@@ -86,7 +86,7 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
    }

    // TODO delta channel need to released 1. if other watchDeltaChannel fail 2. when segment release
    err := l.watchDeltaChannel(vchanName)
    err := l.watchDeltaChannel(vchanName, int64(len(loadDoneSegmentIDs)))
    if err != nil {
        // roll back
        for _, segment := range l.req.Infos {

@@ -133,7 +133,7 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
}

// internal helper function to subscribe delta channel
func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error {
func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string, delta int64) error {
    collectionID := l.req.CollectionID
    log := log.With(
        zap.Int64("collectionID", collectionID),

@@ -229,7 +229,7 @@ func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error {
            log.Error("failed to convert delta channel to dml", zap.String("channel", channel), zap.Error(err))
            panic(err)
        }
        err = l.node.queryShardService.addQueryShard(collectionID, dmlChannel, l.req.GetReplicaID())
        err = l.node.queryShardService.addQueryShard(collectionID, dmlChannel, l.req.GetReplicaID(), delta)
        if err != nil {
            log.Error("failed to add shard Service to query shard", zap.String("channel", channel), zap.Error(err))
            panic(err)
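These hunks thread the count in the other direction: when a load task finishes, the number of successfully loaded segments is passed through watchDeltaChannel into addQueryShard, so the shard starts out holding that many references. A minimal sketch of that plumbing, under the assumption that the shard only tracks a plain counter; loadDoneSegmentIDs and the helper names here are placeholders, not the task's real fields.

package main

import "fmt"

// counters stands in for queryShardService: channel -> in-use reference count.
var counters = map[string]int64{}

// addQueryShard sketches the new signature: the delta is credited to the
// channel whether or not the shard already exists.
func addQueryShard(dmlChannel string, delta int64) {
    counters[dmlChannel] += delta
}

// watchDeltaChannel sketches the new extra parameter: the caller forwards how
// many segments finished loading for this channel.
func watchDeltaChannel(dmlChannel string, delta int64) error {
    addQueryShard(dmlChannel, delta)
    return nil
}

func main() {
    loadDoneSegmentIDs := []int64{100, 101, 102}
    if err := watchDeltaChannel("by-dev-dml-0", int64(len(loadDoneSegmentIDs))); err != nil {
        panic(err)
    }
    fmt.Println(counters["by-dev-dml-0"]) // 3 references held by the loaded segments
}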
@@ -106,7 +106,7 @@ type ReplicaInterface interface {
    // setSegment adds a segment to collectionReplica
    setSegment(segment *Segment) error
    // removeSegment removes a segment from collectionReplica
    removeSegment(segmentID UniqueID, segType segmentType)
    removeSegment(segmentID UniqueID, segType segmentType) int64
    // getSegmentByID returns the segment which id is segmentID
    getSegmentByID(segmentID UniqueID, segType segmentType) (*Segment, error)
    // hasSegment returns true if collectionReplica has the segment, false otherwise

@@ -658,7 +658,7 @@ func (replica *metaReplica) setSegment(segment *Segment) error {
}

// removeSegment removes a segment from collectionReplica
func (replica *metaReplica) removeSegment(segmentID UniqueID, segType segmentType) {
func (replica *metaReplica) removeSegment(segmentID UniqueID, segType segmentType) int64 {
    replica.mu.Lock()
    defer replica.mu.Unlock()

@@ -680,13 +680,14 @@ func (replica *metaReplica) removeSegment(segmentID UniqueID, segType segmentTyp
    default:
        panic(fmt.Sprintf("unsupported segment type %s", segType.String()))
    }
    replica.removeSegmentPrivate(segmentID, segType)
    return replica.removeSegmentPrivate(segmentID, segType)
}

// removeSegmentPrivate is private function in collectionReplica, to remove a segment from collectionReplica
func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType segmentType) {
func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType segmentType) int64 {
    var rowCount int64
    var segment *Segment
    var delta int64

    switch segType {
    case segmentTypeGrowing:

@@ -708,6 +709,7 @@ func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType seg
            rowCount = segment.getRowCount()
            delete(replica.sealedSegments, segmentID)
            deleteSegment(segment)
            delta++
        }
    default:
        panic(fmt.Sprintf("unsupported segment type %s", segType.String()))

@@ -747,7 +749,9 @@ func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType seg
            ).Sub(float64(rowCount))
        }
    }

    replica.sendNoSegmentSignal()
    return delta
}

func (replica *metaReplica) sendNoSegmentSignal() {
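The interface and implementation changes above make removeSegment report how many segments it actually deleted, with removeSegmentPrivate bumping delta only when a segment was found in the corresponding map. A compact, self-contained sketch of that contract, using simplified stand-in types rather than the real metaReplica.

package main

import "fmt"

type segmentType int

const (
    segmentTypeGrowing segmentType = iota
    segmentTypeSealed
)

// replica is a simplified stand-in for metaReplica, keeping only the maps
// that matter for the return-value change.
type replica struct {
    growingSegments map[int64]struct{}
    sealedSegments  map[int64]struct{}
}

// removeSegment now reports how many segments were actually dropped, so the
// caller can turn removals into a query-shard reference delta.
func (r *replica) removeSegment(segmentID int64, segType segmentType) int64 {
    var delta int64
    switch segType {
    case segmentTypeGrowing:
        if _, ok := r.growingSegments[segmentID]; ok {
            delete(r.growingSegments, segmentID)
            delta++
        }
    case segmentTypeSealed:
        if _, ok := r.sealedSegments[segmentID]; ok {
            delete(r.sealedSegments, segmentID)
            delta++
        }
    }
    return delta
}

func main() {
    r := &replica{
        growingSegments: map[int64]struct{}{},
        sealedSegments:  map[int64]struct{}{42: {}},
    }
    fmt.Println(r.removeSegment(42, segmentTypeSealed)) // 1: segment existed
    fmt.Println(r.removeSegment(42, segmentTypeSealed)) // 0: already gone
}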
@@ -20,6 +20,7 @@ import (
    "context"
    "fmt"

    "go.uber.org/atomic"
    "go.uber.org/zap"

    "github.com/milvus-io/milvus/internal/log"

@@ -43,6 +44,8 @@ type queryShard struct {
    vectorChunkManager *storage.VectorChunkManager
    localCacheEnabled  bool
    localCacheSize     int64

    inUse atomic.Int64
}

func newQueryShard(
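The new inUse field is an atomic counter, so references can be added and released from different goroutines without extra locking on the shard itself. A small sketch of that behavior; note it uses the standard library's sync/atomic.Int64 (Go 1.19+) as an assumed equivalent of the go.uber.org/atomic type imported in the patch.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// shard models only the new field: a counter of how many users the shard has.
type shard struct {
    inUse atomic.Int64
}

func main() {
    var qs shard
    var wg sync.WaitGroup

    // Many goroutines can add and drop references concurrently; the atomic
    // counter keeps the bookkeeping consistent.
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            qs.inUse.Add(1)
            qs.inUse.Add(-1)
        }()
    }
    wg.Wait()

    fmt.Println(qs.inUse.Load()) // 0: all references released
}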
@@ -73,10 +73,18 @@ func newQueryShardService(ctx context.Context, metaReplica ReplicaInterface, tSa
    return qss, nil
}

func (q *queryShardService) addQueryShard(collectionID UniqueID, channel Channel, replicaID int64) error {
func (q *queryShardService) addQueryShard(collectionID UniqueID, channel Channel, replicaID int64, delta int64) error {
    log := log.With(
        zap.Int64("collection", collectionID),
        zap.Int64("replica", replicaID),
        zap.String("channel", channel),
        zap.Int64("delta", delta),
    )
    q.queryShardsMu.Lock()
    defer q.queryShardsMu.Unlock()
    if _, ok := q.queryShards[channel]; ok {
    if qs, ok := q.queryShards[channel]; ok {
        qs.inUse.Add(delta)
        log.Info("Successfully add query shard delta")
        return nil
    }
    qs, err := newQueryShard(

@@ -94,19 +102,31 @@ func (q *queryShardService) addQueryShard(collectionID UniqueID, channel Channel
    if err != nil {
        return err
    }
    qs.inUse.Add(delta)
    q.queryShards[channel] = qs
    log.Info("Successfully add query shard", zap.Int64("collection", collectionID), zap.Int64("replica", replicaID), zap.String("channel", channel))
    log.Info("Successfully add new query shard")
    return nil
}

func (q *queryShardService) removeQueryShard(channel Channel) error {
func (q *queryShardService) removeQueryShard(channel Channel, delta int64) error {
    log := log.With(
        zap.String("channel", channel),
        zap.Int64("delta", delta),
    )
    q.queryShardsMu.Lock()
    defer q.queryShardsMu.Unlock()
    if _, ok := q.queryShards[channel]; !ok {
    qs, ok := q.queryShards[channel]
    if !ok {
        return errors.New(fmt.Sprintln("query shard(channel) ", channel, " does not exist"))
    }
    delete(q.queryShards, channel)
    log.Info("Successfully remove query shard", zap.String("channel", channel))
    inUse := qs.inUse.Add(-delta)
    if inUse == 0 {
        delete(q.queryShards, channel)
        qs.Close()
        log.Info("Successfully remove query shard")
        return nil
    }
    log.Info("Successfully remove query shard inUse")
    return nil
}
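Taken together, addQueryShard and removeQueryShard now behave like a reference-counted registry: adding an existing channel just bumps inUse, and removing only deletes and closes the shard once the count reaches zero. The sketch below reproduces that logic with simplified types; it uses sync/atomic instead of go.uber.org/atomic, and Close is a stub, so treat it as an illustration rather than the real service.

package main

import (
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
)

// queryShard is a stand-in for the real struct: only the new inUse counter
// and a Close hook are modelled.
type queryShard struct {
    channel string
    inUse   atomic.Int64
}

func (qs *queryShard) Close() { fmt.Println("closed", qs.channel) }

type queryShardService struct {
    mu     sync.Mutex
    shards map[string]*queryShard
}

// addQueryShard adds delta references; an existing shard for the channel is
// reused instead of being treated as a no-op.
func (s *queryShardService) addQueryShard(channel string, delta int64) {
    s.mu.Lock()
    defer s.mu.Unlock()
    qs, ok := s.shards[channel]
    if !ok {
        qs = &queryShard{channel: channel}
        s.shards[channel] = qs
    }
    qs.inUse.Add(delta)
}

// removeQueryShard drops delta references and only tears the shard down when
// nothing uses it any more -- the core of the "unsub always removes queryshard" fix.
func (s *queryShardService) removeQueryShard(channel string, delta int64) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    qs, ok := s.shards[channel]
    if !ok {
        return errors.New("query shard (channel) " + channel + " does not exist")
    }
    if qs.inUse.Add(-delta) == 0 {
        delete(s.shards, channel)
        qs.Close()
    }
    return nil
}

func main() {
    s := &queryShardService{shards: map[string]*queryShard{}}
    s.addQueryShard("vchan1", 2)        // e.g. two segments loaded
    _ = s.removeQueryShard("vchan1", 1) // shard survives, still in use
    _ = s.removeQueryShard("vchan1", 1) // count hits zero, shard is closed
}

With this shape, the watch/unsub path and the segment-load path can both hold references to the same shard without one of them tearing it down prematurely.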
@@ -30,20 +30,20 @@ func TestQueryShardService(t *testing.T) {

    qss, err := newQueryShardService(context.Background(), qn.metaReplica, qn.tSafeReplica, qn.ShardClusterService, qn.factory, qn.scheduler)
    assert.NoError(t, err)
    err = qss.addQueryShard(0, "vchan1", 0)
    err = qss.addQueryShard(0, "vchan1", 0, 1)
    assert.NoError(t, err)
    found1 := qss.hasQueryShard("vchan1")
    assert.Equal(t, true, found1)
    _, err = qss.getQueryShard("vchan1")
    assert.NoError(t, err)
    err = qss.removeQueryShard("vchan1")
    err = qss.removeQueryShard("vchan1", 1)
    assert.NoError(t, err)

    found2 := qss.hasQueryShard("vchan2")
    assert.Equal(t, false, found2)
    _, err = qss.getQueryShard("vchan2")
    assert.Error(t, err)
    err = qss.removeQueryShard("vchan2")
    err = qss.removeQueryShard("vchan2", 1)
    assert.Error(t, err)
}

@@ -57,7 +57,7 @@ func TestQueryShardService_InvalidChunkManager(t *testing.T) {
    lcm := qss.localChunkManager
    qss.localChunkManager = nil

    err = qss.addQueryShard(0, "vchan", 0)
    err = qss.addQueryShard(0, "vchan", 0, 1)
    assert.Error(t, err)

    qss.localChunkManager = lcm

@@ -65,7 +65,7 @@ func TestQueryShardService_InvalidChunkManager(t *testing.T) {
    rcm := qss.remoteChunkManager
    qss.remoteChunkManager = nil

    err = qss.addQueryShard(0, "vchan", 0)
    err = qss.addQueryShard(0, "vchan", 0, 1)
    assert.Error(t, err)

    qss.remoteChunkManager = rcm
@@ -284,7 +284,7 @@ func (t *unsubDmChannelTask) releaseChannelResources(collection *Collection) err
    collection.removeVChannel(t.channel)
    // release flowgraph resources
    t.node.dataSyncService.removeFlowGraphsByDMLChannels([]string{t.channel})
    t.node.queryShardService.releaseQueryShard(t.channel)
    t.node.queryShardService.removeQueryShard(t.channel, 1)
    t.node.ShardClusterService.releaseShardCluster(t.channel)

    t.node.tSafeReplica.removeTSafe(t.channel)
@@ -144,7 +144,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) {
    // add tsafe watch in query shard if exists
    for _, dmlChannel := range vChannels {
        // Here this error could be ignored
        w.node.queryShardService.addQueryShard(collectionID, dmlChannel, w.req.GetReplicaID())
        w.node.queryShardService.addQueryShard(collectionID, dmlChannel, w.req.GetReplicaID(), 1)
    }

    // start flow graphs
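These last two hunks are where the original bug lived: unsubscribing a DM channel used to call releaseQueryShard, which dropped the query shard outright even if loaded segments still needed it; now each watchDmChannels adds exactly one reference and each unsubDmChannel removes exactly one, leaving segment-held references intact. A tiny sketch of that symmetry, reusing the counter idea; the function names are illustrative, not the real task types.

package main

import "fmt"

// refs maps a DM channel to its query-shard reference count.
var refs = map[string]int64{}

// watchDmChannel mirrors watchDmChannelsTask: one reference per watched channel.
func watchDmChannel(channel string) { refs[channel]++ }

// loadSegments mirrors loadSegmentsTask: one reference per loaded segment.
func loadSegments(channel string, count int64) { refs[channel] += count }

// unsubDmChannel mirrors the fixed unsubDmChannelTask: it gives back only the
// watcher's single reference instead of deleting the shard unconditionally.
func unsubDmChannel(channel string) {
    refs[channel]--
    if refs[channel] == 0 {
        delete(refs, channel) // shard is torn down only when truly unused
    }
}

func main() {
    watchDmChannel("by-dev-dml-0")
    loadSegments("by-dev-dml-0", 2)

    unsubDmChannel("by-dev-dml-0")
    // The two segment references keep the shard alive after the unsubscribe.
    fmt.Println(refs["by-dev-dml-0"]) // 2
}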