mirror of https://github.com/milvus-io/milvus.git
Remove global sealed segments in historical (#15177)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/15164/head
parent
3254d36609
commit
0c98f21d4d
|
@ -24,10 +24,7 @@ import (
|
|||
|
||||
"go.uber.org/zap"
|
||||
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/segcorepb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
)
|
||||
|
@ -38,85 +35,26 @@ type historical struct {
|
|||
|
||||
replica ReplicaInterface
|
||||
tSafeReplica TSafeReplicaInterface
|
||||
|
||||
mu sync.Mutex // guards globalSealedSegments
|
||||
globalSealedSegments map[UniqueID]*querypb.SegmentInfo
|
||||
|
||||
etcdKV *etcdkv.EtcdKV
|
||||
}
|
||||
|
||||
// newHistorical returns a new historical
|
||||
func newHistorical(ctx context.Context,
|
||||
replica ReplicaInterface,
|
||||
etcdKV *etcdkv.EtcdKV,
|
||||
tSafeReplica TSafeReplicaInterface) *historical {
|
||||
|
||||
return &historical{
|
||||
ctx: ctx,
|
||||
replica: replica,
|
||||
globalSealedSegments: make(map[UniqueID]*querypb.SegmentInfo),
|
||||
etcdKV: etcdKV,
|
||||
tSafeReplica: tSafeReplica,
|
||||
ctx: ctx,
|
||||
replica: replica,
|
||||
tSafeReplica: tSafeReplica,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *historical) start() {
|
||||
}
|
||||
|
||||
// close would release all resources in historical
|
||||
func (h *historical) close() {
|
||||
// free collectionReplica
|
||||
h.replica.freeAll()
|
||||
}
|
||||
|
||||
func (h *historical) getGlobalSegmentIDsByCollectionID(collectionID UniqueID) []UniqueID {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
resIDs := make([]UniqueID, 0)
|
||||
for _, v := range h.globalSealedSegments {
|
||||
if v.CollectionID == collectionID {
|
||||
resIDs = append(resIDs, v.SegmentID)
|
||||
}
|
||||
}
|
||||
return resIDs
|
||||
}
|
||||
|
||||
func (h *historical) getGlobalSegmentIDsByPartitionIds(partitionIDs []UniqueID) []UniqueID {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
resIDs := make([]UniqueID, 0)
|
||||
for _, v := range h.globalSealedSegments {
|
||||
for _, partitionID := range partitionIDs {
|
||||
if v.PartitionID == partitionID {
|
||||
resIDs = append(resIDs, v.SegmentID)
|
||||
}
|
||||
}
|
||||
}
|
||||
return resIDs
|
||||
}
|
||||
|
||||
func (h *historical) removeGlobalSegmentIDsByCollectionID(collectionID UniqueID) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
for _, v := range h.globalSealedSegments {
|
||||
if v.CollectionID == collectionID {
|
||||
delete(h.globalSealedSegments, v.SegmentID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *historical) removeGlobalSegmentIDsByPartitionIds(partitionIDs []UniqueID) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
for _, v := range h.globalSealedSegments {
|
||||
for _, partitionID := range partitionIDs {
|
||||
if v.PartitionID == partitionID {
|
||||
delete(h.globalSealedSegments, v.SegmentID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// // retrieve will retrieve from the segments in historical
|
||||
func (h *historical) retrieve(collID UniqueID, partIDs []UniqueID, vcm storage.ChunkManager,
|
||||
plan *RetrievePlan) ([]*segcorepb.RetrieveResults, []UniqueID, []UniqueID, error) {
|
||||
|
|
|
@ -887,15 +887,11 @@ func genSimpleSegmentLoader(ctx context.Context, historicalReplica ReplicaInterf
|
|||
}
|
||||
|
||||
func genSimpleHistorical(ctx context.Context, tSafeReplica TSafeReplicaInterface) (*historical, error) {
|
||||
kv, err := genEtcdKV()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
replica, err := genSimpleReplica()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
h := newHistorical(ctx, replica, kv, tSafeReplica)
|
||||
h := newHistorical(ctx, replica, tSafeReplica)
|
||||
r, err := genSimpleReplica()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -1028,9 +1028,9 @@ func (q *queryCollection) search(msg queryMsg) error {
|
|||
// get global sealed segments
|
||||
var globalSealedSegments []UniqueID
|
||||
if len(searchMsg.PartitionIDs) > 0 {
|
||||
globalSealedSegments = q.historical.getGlobalSegmentIDsByPartitionIds(searchMsg.PartitionIDs)
|
||||
globalSealedSegments = q.globalSegmentManager.getGlobalSegmentIDsByPartitionIds(searchMsg.PartitionIDs)
|
||||
} else {
|
||||
globalSealedSegments = q.historical.getGlobalSegmentIDsByCollectionID(collection.id)
|
||||
globalSealedSegments = q.globalSegmentManager.getGlobalSegmentIDs()
|
||||
}
|
||||
|
||||
searchResults := make([]*SearchResult, 0)
|
||||
|
@ -1243,9 +1243,9 @@ func (q *queryCollection) retrieve(msg queryMsg) error {
|
|||
|
||||
var globalSealedSegments []UniqueID
|
||||
if len(retrieveMsg.PartitionIDs) > 0 {
|
||||
globalSealedSegments = q.historical.getGlobalSegmentIDsByPartitionIds(retrieveMsg.PartitionIDs)
|
||||
globalSealedSegments = q.globalSegmentManager.getGlobalSegmentIDsByPartitionIds(retrieveMsg.PartitionIDs)
|
||||
} else {
|
||||
globalSealedSegments = q.historical.getGlobalSegmentIDsByCollectionID(collectionID)
|
||||
globalSealedSegments = q.globalSegmentManager.getGlobalSegmentIDs()
|
||||
}
|
||||
|
||||
var mergeList []*segcorepb.RetrieveResults
|
||||
|
|
|
@ -141,7 +141,7 @@ func TestQueryCollection_withoutVChannel(t *testing.T) {
|
|||
historicalReplica := newCollectionReplica(etcdKV)
|
||||
tsReplica := newTSafeReplica()
|
||||
streamingReplica := newCollectionReplica(etcdKV)
|
||||
historical := newHistorical(context.Background(), historicalReplica, etcdKV, tsReplica)
|
||||
historical := newHistorical(context.Background(), historicalReplica, tsReplica)
|
||||
|
||||
//add a segment to historical data
|
||||
err = historical.replica.addCollection(0, schema)
|
||||
|
|
|
@ -203,7 +203,6 @@ func (node *QueryNode) Init() error {
|
|||
|
||||
node.historical = newHistorical(node.queryNodeLoopCtx,
|
||||
historicalReplica,
|
||||
node.etcdKV,
|
||||
node.tSafeReplica,
|
||||
)
|
||||
node.streaming = newStreaming(node.queryNodeLoopCtx,
|
||||
|
@ -269,7 +268,6 @@ func (node *QueryNode) Start() error {
|
|||
go node.scheduler.Start()
|
||||
|
||||
// start services
|
||||
go node.historical.start()
|
||||
go node.watchChangeInfo()
|
||||
go node.statsService.start()
|
||||
|
||||
|
|
|
@ -202,7 +202,7 @@ func newQueryNodeMock() *QueryNode {
|
|||
tsReplica := newTSafeReplica()
|
||||
streamingReplica := newCollectionReplica(etcdKV)
|
||||
historicalReplica := newCollectionReplica(etcdKV)
|
||||
svr.historical = newHistorical(svr.queryNodeLoopCtx, historicalReplica, etcdKV, tsReplica)
|
||||
svr.historical = newHistorical(svr.queryNodeLoopCtx, historicalReplica, tsReplica)
|
||||
svr.streaming = newStreaming(ctx, streamingReplica, msFactory, etcdKV, tsReplica)
|
||||
svr.dataSyncService = newDataSyncService(ctx, svr.streaming.replica, svr.historical.replica, tsReplica, msFactory)
|
||||
svr.statsService = newStatsService(ctx, svr.historical.replica, nil, msFactory)
|
||||
|
|
|
@ -791,7 +791,6 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("release collection failed, collectionID = %d, err = %s", r.req.CollectionID, err)
|
||||
}
|
||||
r.node.historical.removeGlobalSegmentIDsByCollectionID(r.req.CollectionID)
|
||||
|
||||
debug.FreeOSMemory()
|
||||
|
||||
|
|
Loading…
Reference in New Issue