Delete querynode’s redundant load IO

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/4973/head^2
xige-16 2021-04-07 18:29:19 +08:00 committed by yefu.chen
parent 1446cd5453
commit 332b14c915
11 changed files with 450 additions and 303 deletions
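The changes below move per-segment index metadata out of the scattered indexParam/indexName/indexID fields on Segment and into a per-field indexInfos map, and fold the separate getIndexPaths/loadIndexImmediate/loadIndexDelayed round trips into a single setIndexInfo + loadIndex pair, so index file paths are fetched and cached once per vector field. A condensed sketch of the resulting flow (not part of the commit; loadVecFieldIndexes is a hypothetical wrapper, the other names come from the diff):

// Hypothetical helper inside package querynode, illustrating the new flow only.
func loadVecFieldIndexes(loader *indexLoader, collectionID UniqueID, segment *Segment, vecFieldIDs []int64) {
	for _, fieldID := range vecFieldIDs {
		// setIndexInfo calls master DescribeSegment and indexservice GetIndexFilePaths
		// once, then caches indexID/buildID/indexPaths on the segment.
		if err := loader.setIndexInfo(collectionID, segment, fieldID); err != nil {
			log.Warn(err.Error())
			continue
		}
		// loadIndex reads the cached paths via segment.getIndexPaths(fieldID),
		// so the index IO is not repeated at every call site.
		if err := loader.loadIndex(segment, fieldID); err != nil {
			log.Warn(err.Error())
		}
	}
}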

View File

@ -6,12 +6,12 @@ import (
"time"
"go.uber.org/zap"
"google.golang.org/grpc"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
@ -25,8 +25,7 @@ type IDAllocator struct {
Allocator
masterAddress string
masterConn *grpc.ClientConn
masterClient masterpb.MasterServiceClient
masterClient types.MasterService
countPerRPC uint32
@ -58,30 +57,22 @@ func NewIDAllocator(ctx context.Context, masterAddr string) (*IDAllocator, error
}
func (ia *IDAllocator) Start() error {
connectMasterFn := func() error {
return ia.connectMaster()
}
err := retry.Retry(1000, time.Millisecond*200, connectMasterFn)
var err error
ia.masterClient, err = msc.NewClient(ia.masterAddress, 20*time.Second)
if err != nil {
panic("connect to master failed")
panic(err)
}
if err = ia.masterClient.Init(); err != nil {
panic(err)
}
if err = ia.masterClient.Start(); err != nil {
panic(err)
}
return ia.Allocator.Start()
}
func (ia *IDAllocator) connectMaster() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, ia.masterAddress, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Error("Connect to master failed", zap.Any("Role", ia.Role), zap.Error(err))
return err
}
log.Debug("Connected to master", zap.Any("Role", ia.Role), zap.Any("masterAddress", ia.masterAddress))
ia.masterConn = conn
ia.masterClient = masterpb.NewMasterServiceClient(conn)
return nil
}
func (ia *IDAllocator) gatherReqIDCount() uint32 {
need := uint32(0)
for _, req := range ia.ToDoReqs {

View File

@ -5,14 +5,14 @@ import (
"fmt"
"time"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
"github.com/zilliztech/milvus-distributed/internal/types"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"google.golang.org/grpc"
)
type Timestamp = typeutil.Timestamp
@ -25,8 +25,7 @@ type TimestampAllocator struct {
Allocator
masterAddress string
masterConn *grpc.ClientConn
masterClient masterpb.MasterServiceClient
masterClient types.MasterService
countPerRPC uint32
lastTsBegin Timestamp
@ -57,29 +56,20 @@ func NewTimestampAllocator(ctx context.Context, masterAddr string) (*TimestampAl
}
func (ta *TimestampAllocator) Start() error {
connectMasterFn := func() error {
return ta.connectMaster()
}
err := retry.Retry(1000, time.Millisecond*200, connectMasterFn)
var err error
ta.masterClient, err = msc.NewClient(ta.masterAddress, 20*time.Second)
if err != nil {
panic("Timestamp local allocator connect to master failed")
panic(err)
}
ta.Allocator.Start()
return nil
}
func (ta *TimestampAllocator) connectMaster() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, ta.masterAddress, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Error("TimestampAllocator Connect to master failed", zap.Error(err))
return err
if err = ta.masterClient.Init(); err != nil {
panic(err)
}
log.Debug("TimestampAllocator connected to master", zap.Any("masterAddress", ta.masterAddress))
ta.masterConn = conn
ta.masterClient = masterpb.NewMasterServiceClient(conn)
return nil
if err = ta.masterClient.Start(); err != nil {
panic(err)
}
return ta.Allocator.Start()
}
func (ta *TimestampAllocator) checkSyncFunc(timeout bool) bool {

View File

@ -59,6 +59,7 @@ type ReplicaInterface interface {
// segment
addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error
setSegment(segment *Segment) error
removeSegment(segmentID UniqueID) error
getSegmentByID(segmentID UniqueID) (*Segment, error)
hasSegment(segmentID UniqueID) bool
@ -378,15 +379,15 @@ func (colReplica *collectionReplica) getEnabledPartitionIDsPrivate() []UniqueID
func (colReplica *collectionReplica) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
return colReplica.addSegmentPrivate(segmentID, partitionID, collectionID, segType)
}
func (colReplica *collectionReplica) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
collection, err := colReplica.getCollectionByIDPrivate(collectionID)
if err != nil {
return err
}
var newSegment = newSegment(collection, segmentID, partitionID, collectionID, segType)
return colReplica.addSegmentPrivate(segmentID, partitionID, newSegment)
}
func (colReplica *collectionReplica) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, segment *Segment) error {
partition, err := colReplica.getPartitionByIDPrivate(partitionID)
if err != nil {
return err
@ -396,12 +397,21 @@ func (colReplica *collectionReplica) addSegmentPrivate(segmentID UniqueID, parti
return nil
}
partition.addSegmentID(segmentID)
var newSegment = newSegment(collection, segmentID, partitionID, collectionID, segType)
colReplica.segments[segmentID] = newSegment
colReplica.segments[segmentID] = segment
return nil
}
func (colReplica *collectionReplica) setSegment(segment *Segment) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
_, err := colReplica.getCollectionByIDPrivate(segment.collectionID)
if err != nil {
return err
}
return colReplica.addSegmentPrivate(segment.segmentID, segment.partitionID, segment)
}
func (colReplica *collectionReplica) removeSegment(segmentID UniqueID) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()

View File

@ -0,0 +1,65 @@
package querynode
type indexInfo struct {
indexName string
indexID UniqueID
buildID UniqueID
indexPaths []string
indexParams map[string]string
readyLoad bool
}
func newIndexInfo() *indexInfo {
return &indexInfo{
indexPaths: make([]string, 0),
indexParams: make(map[string]string),
}
}
func (info *indexInfo) setIndexName(name string) {
info.indexName = name
}
func (info *indexInfo) setIndexID(id UniqueID) {
info.indexID = id
}
func (info *indexInfo) setBuildID(id UniqueID) {
info.buildID = id
}
func (info *indexInfo) setIndexPaths(paths []string) {
info.indexPaths = paths
}
func (info *indexInfo) setIndexParams(params map[string]string) {
info.indexParams = params
}
func (info *indexInfo) setReadyLoad(load bool) {
info.readyLoad = load
}
func (info *indexInfo) getIndexName() string {
return info.indexName
}
func (info *indexInfo) getIndexID() UniqueID {
return info.indexID
}
func (info *indexInfo) getBuildID() UniqueID {
return info.buildID
}
func (info *indexInfo) getIndexPaths() []string {
return info.indexPaths
}
func (info *indexInfo) getIndexParams() map[string]string {
return info.indexParams
}
func (info *indexInfo) getReadyLoad() bool {
return info.readyLoad
}
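In the index loader changes further down, this struct is usually built with a literal and attached through Segment.setIndexInfo rather than through the individual setters. A minimal usage sketch assuming those methods; response and pathResponse stand in for the DescribeSegment and GetIndexFilePaths results shown later in this diff:

// Illustrative only: attach index metadata for one vector field to a segment.
info := &indexInfo{
	indexID:    response.IndexID,
	buildID:    response.BuildID,
	indexPaths: pathResponse.FilePaths[0].IndexFilePaths,
	readyLoad:  true,
}
segment.setEnableIndex(response.EnableIndex)
if err := segment.setIndexInfo(fieldID, info); err != nil {
	log.Warn(err.Error())
}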

View File

@ -2,6 +2,7 @@ package querynode
import (
"context"
"errors"
"fmt"
"path"
"sort"
@ -10,10 +11,6 @@ import (
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/types"
"errors"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/kv"
@ -25,8 +22,11 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/storage"
"github.com/zilliztech/milvus-distributed/internal/types"
)
type indexParam = map[string]string
type indexLoader struct {
replica ReplicaInterface
@ -39,12 +39,6 @@ type indexLoader struct {
kv kv.Base // minio kv
}
type loadIndex struct {
segmentID UniqueID
fieldID int64
indexPaths []string
}
func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
collectionIDs, _, segmentIDs := loader.replica.getSegmentsBySegmentType(segmentTypeSealed)
if len(collectionIDs) <= 0 {
@ -54,19 +48,28 @@ func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
log.Debug("do load index for sealed segments:", zap.String("segmentIDs", fmt.Sprintln(segmentIDs)))
for i := range collectionIDs {
// we don't need index id yet
_, buildID, err := loader.getIndexInfo(collectionIDs[i], segmentIDs[i])
segment, err := loader.replica.getSegmentByID(segmentIDs[i])
if err != nil {
log.Warn(err.Error())
continue
}
indexPaths, err := loader.getIndexPaths(buildID)
vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(collectionIDs[i])
if err != nil {
log.Warn(err.Error())
continue
}
err = loader.loadIndexDelayed(collectionIDs[i], segmentIDs[i], indexPaths)
if err != nil {
log.Warn(err.Error())
for _, fieldID := range vecFieldIDs {
err = loader.setIndexInfo(collectionIDs[i], segment, fieldID)
if err != nil {
log.Warn(err.Error())
continue
}
err = loader.loadIndex(segment, fieldID)
if err != nil {
log.Warn(err.Error())
continue
}
}
}
// sendQueryNodeStats
@ -80,15 +83,15 @@ func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
wg.Done()
}
func (loader *indexLoader) execute(l *loadIndex) error {
func (loader *indexLoader) loadIndex(segment *Segment, fieldID int64) error {
// 1. use msg's index paths to get index bytes
var err error
var indexBuffer [][]byte
var indexParams indexParam
var indexName string
var indexID UniqueID
fn := func() error {
indexBuffer, indexParams, indexName, indexID, err = loader.loadIndex(l.indexPaths)
indexPaths := segment.getIndexPaths(fieldID)
indexBuffer, indexParams, indexName, err = loader.getIndexBinlog(indexPaths)
if err != nil {
return err
}
@ -98,26 +101,31 @@ func (loader *indexLoader) execute(l *loadIndex) error {
if err != nil {
return err
}
ok, err := loader.checkIndexReady(indexParams, l)
err = segment.setIndexName(fieldID, indexName)
if err != nil {
return err
}
if ok {
err = segment.setIndexParam(fieldID, indexParams)
if err != nil {
return err
}
ok := segment.checkIndexReady(fieldID)
if !ok {
// no error
return errors.New("")
return errors.New("index info is not set correctly")
}
// 2. use index bytes and index path to update segment
err = loader.updateSegmentIndex(indexParams, indexBuffer, l)
err = segment.updateSegmentIndex(indexBuffer, fieldID)
if err != nil {
return err
}
// 3. drop vector field data if index loaded successfully
err = loader.dropVectorFieldData(l.segmentID, l.fieldID)
err = segment.dropFieldData(fieldID)
if err != nil {
return err
}
// 4. update segment index stats
err = loader.updateSegmentIndexStats(indexParams, indexName, indexID, l)
err = loader.updateSegmentIndexStats(segment)
if err != nil {
return err
}
@ -168,80 +176,71 @@ func (loader *indexLoader) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, e
return collectionID, fieldID, nil
}
func (loader *indexLoader) updateSegmentIndexStats(indexParams indexParam, indexName string, indexID UniqueID, l *loadIndex) error {
targetSegment, err := loader.replica.getSegmentByID(l.segmentID)
if err != nil {
return err
}
fieldStatsKey := loader.fieldsStatsIDs2Key(targetSegment.collectionID, l.fieldID)
_, ok := loader.fieldIndexes[fieldStatsKey]
newIndexParams := make([]*commonpb.KeyValuePair, 0)
for k, v := range indexParams {
newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{
Key: k,
Value: v,
})
}
// sort index params by key
sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key })
if !ok {
loader.fieldIndexes[fieldStatsKey] = make([]*internalpb.IndexStats, 0)
loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
&internalpb.IndexStats{
IndexParams: newIndexParams,
NumRelatedSegments: 1,
func (loader *indexLoader) updateSegmentIndexStats(segment *Segment) error {
for fieldID := range segment.indexInfos {
fieldStatsKey := loader.fieldsStatsIDs2Key(segment.collectionID, fieldID)
_, ok := loader.fieldIndexes[fieldStatsKey]
newIndexParams := make([]*commonpb.KeyValuePair, 0)
indexParams := segment.getIndexParams(fieldID)
for k, v := range indexParams {
newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{
Key: k,
Value: v,
})
} else {
isNewIndex := true
for _, index := range loader.fieldIndexes[fieldStatsKey] {
if loader.indexParamsEqual(newIndexParams, index.IndexParams) {
index.NumRelatedSegments++
isNewIndex = false
}
}
if isNewIndex {
// sort index params by key
sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key })
if !ok {
loader.fieldIndexes[fieldStatsKey] = make([]*internalpb.IndexStats, 0)
loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
&internalpb.IndexStats{
IndexParams: newIndexParams,
NumRelatedSegments: 1,
})
} else {
isNewIndex := true
for _, index := range loader.fieldIndexes[fieldStatsKey] {
if loader.indexParamsEqual(newIndexParams, index.IndexParams) {
index.NumRelatedSegments++
isNewIndex = false
}
}
if isNewIndex {
loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
&internalpb.IndexStats{
IndexParams: newIndexParams,
NumRelatedSegments: 1,
})
}
}
}
err = targetSegment.setIndexParam(l.fieldID, newIndexParams)
if err != nil {
return err
}
targetSegment.setIndexName(indexName)
targetSegment.setIndexID(indexID)
return nil
}
func (loader *indexLoader) loadIndex(indexPath []string) ([][]byte, indexParam, string, UniqueID, error) {
func (loader *indexLoader) getIndexBinlog(indexPath []string) ([][]byte, indexParam, string, error) {
index := make([][]byte, 0)
var indexParams indexParam
var indexName string
var indexID UniqueID
for _, p := range indexPath {
log.Debug("", zap.String("load path", fmt.Sprintln(indexPath)))
indexPiece, err := loader.kv.Load(p)
if err != nil {
return nil, nil, "", -1, err
return nil, nil, "", err
}
// get index params when detecting indexParamPrefix
if path.Base(p) == storage.IndexParamsFile {
indexCodec := storage.NewIndexCodec()
_, indexParams, indexName, indexID, err = indexCodec.Deserialize([]*storage.Blob{
_, indexParams, indexName, _, err = indexCodec.Deserialize([]*storage.Blob{
{
Key: storage.IndexParamsFile,
Value: []byte(indexPiece),
},
})
if err != nil {
return nil, nil, "", -1, err
return nil, nil, "", err
}
} else {
index = append(index, []byte(indexPiece))
@ -249,45 +248,9 @@ func (loader *indexLoader) loadIndex(indexPath []string) ([][]byte, indexParam,
}
if len(indexParams) <= 0 {
return nil, nil, "", -1, errors.New("cannot find index param")
return nil, nil, "", errors.New("cannot find index param")
}
return index, indexParams, indexName, indexID, nil
}
func (loader *indexLoader) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, l *loadIndex) error {
segment, err := loader.replica.getSegmentByID(l.segmentID)
if err != nil {
return err
}
loadIndexInfo, err := newLoadIndexInfo()
defer deleteLoadIndexInfo(loadIndexInfo)
if err != nil {
return err
}
err = loadIndexInfo.appendFieldInfo(l.fieldID)
if err != nil {
return err
}
for k, v := range indexParams {
err = loadIndexInfo.appendIndexParam(k, v)
if err != nil {
return err
}
}
err = loadIndexInfo.appendIndex(bytesIndex, l.indexPaths)
if err != nil {
return err
}
return segment.updateSegmentIndex(loadIndexInfo)
}
func (loader *indexLoader) dropVectorFieldData(segmentID UniqueID, vecFieldID int64) error {
segment, err := loader.replica.getSegmentByID(segmentID)
if err != nil {
return err
}
return segment.dropFieldData(vecFieldID)
return index, indexParams, indexName, nil
}
func (loader *indexLoader) sendQueryNodeStats() error {
@ -310,39 +273,56 @@ func (loader *indexLoader) sendQueryNodeStats() error {
return nil
}
func (loader *indexLoader) checkIndexReady(indexParams indexParam, l *loadIndex) (bool, error) {
segment, err := loader.replica.getSegmentByID(l.segmentID)
if err != nil {
return false, err
}
if !segment.matchIndexParam(l.fieldID, indexParams) {
return false, nil
}
return true, nil
}
func (loader *indexLoader) getIndexInfo(collectionID UniqueID, segmentID UniqueID) (UniqueID, UniqueID, error) {
func (loader *indexLoader) setIndexInfo(collectionID UniqueID, segment *Segment, fieldID UniqueID) error {
ctx := context.TODO()
req := &milvuspb.DescribeSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeSegment,
},
CollectionID: collectionID,
SegmentID: segmentID,
SegmentID: segment.segmentID,
}
response, err := loader.masterService.DescribeSegment(ctx, req)
if err != nil {
return 0, 0, err
return err
}
if response.Status.ErrorCode != commonpb.ErrorCode_Success {
return -1, -1, errors.New(response.Status.Reason)
return errors.New(response.Status.Reason)
}
loader.replica.setSegmentEnableIndex(segmentID, response.EnableIndex)
if !response.EnableIndex {
return -1, -1, errors.New("There are no indexes on this segment")
return errors.New("there are no indexes on this segment")
}
return response.IndexID, response.BuildID, nil
if loader.indexService == nil {
return errors.New("null index service client")
}
indexFilePathRequest := &indexpb.GetIndexFilePathsRequest{
IndexBuildIDs: []UniqueID{response.BuildID},
}
pathResponse, err := loader.indexService.GetIndexFilePaths(ctx, indexFilePathRequest)
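if err != nil {
return err
}
if pathResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
return errors.New(pathResponse.Status.Reason)
}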
if len(pathResponse.FilePaths) <= 0 {
return errors.New("illegal index file paths")
}
info := &indexInfo{
indexID: response.IndexID,
buildID: response.BuildID,
indexPaths: pathResponse.FilePaths[0].IndexFilePaths,
readyLoad: true,
}
segment.setEnableIndex(response.EnableIndex)
err = segment.setIndexInfo(fieldID, info)
if err != nil {
return err
}
return nil
}
func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error) {
@ -366,49 +346,6 @@ func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error
return pathResponse.FilePaths[0].IndexFilePaths, nil
}
func (loader *indexLoader) loadIndexImmediate(segment *Segment, indexPaths []string) error {
// get vector field ids from schema to load index
vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(segment.collectionID)
if err != nil {
return err
}
for _, id := range vecFieldIDs {
l := &loadIndex{
segmentID: segment.ID(),
fieldID: id,
indexPaths: indexPaths,
}
err = loader.execute(l)
if err != nil {
return err
}
}
return nil
}
func (loader *indexLoader) loadIndexDelayed(collectionID, segmentID UniqueID, indexPaths []string) error {
// get vector field ids from schema to load index
vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(collectionID)
if err != nil {
return err
}
for _, id := range vecFieldIDs {
l := &loadIndex{
segmentID: segmentID,
fieldID: id,
indexPaths: indexPaths,
}
err = loader.execute(l)
if err != nil {
return err
}
}
return nil
}
func newIndexLoader(ctx context.Context, masterService types.MasterService, indexService types.IndexService, replica ReplicaInterface) *indexLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,

View File

@ -32,7 +32,7 @@ func (s *loadService) start() {
return
case <-time.After(loadingCheckInterval * time.Second):
wg.Add(2)
go s.segLoader.indexLoader.doLoadIndex(wg)
//go s.segLoader.indexLoader.doLoadIndex(wg)
go s.loadSegmentActively(wg)
wg.Wait()
}
@ -51,13 +51,25 @@ func (s *loadService) loadSegmentActively(wg *sync.WaitGroup) {
}
log.Debug("do load segment for growing segments:", zap.String("segmentIDs", fmt.Sprintln(segmentIDs)))
for i := range collectionIDs {
collection, err := s.segLoader.replica.getCollectionByID(collectionIDs[i])
if err != nil {
log.Warn(err.Error())
}
fieldIDs, err := s.segLoader.replica.getFieldIDsByCollectionID(collectionIDs[i])
if err != nil {
log.Error(err.Error())
continue
}
err = s.loadSegmentInternal(collectionIDs[i], partitionIDs[i], segmentIDs[i], fieldIDs)
segment := newSegment(collection, segmentIDs[i], partitionIDs[i], collectionIDs[i], segmentTypeSealed)
segment.setLoadBinLogEnable(true)
err = s.loadSegmentInternal(collectionIDs[i], segment, fieldIDs)
if err == nil {
// replace segment
err = s.segLoader.replica.replaceGrowingSegmentBySealedSegment(segment)
}
if err != nil {
deleteSegment(segment)
log.Error(err.Error())
}
}
@ -83,28 +95,52 @@ func (s *loadService) loadSegmentPassively(collectionID UniqueID, partitionID Un
}
}
for _, segmentID := range segmentIDs {
err := s.segLoader.replica.addSegment(segmentID, partitionID, collectionID, segmentTypeGrowing)
collection, err := s.segLoader.replica.getCollectionByID(collectionID)
if err != nil {
log.Warn(err.Error())
continue
return err
}
err = s.segLoader.replica.setSegmentEnableLoadBinLog(segmentID, true)
_, err = s.segLoader.replica.getPartitionByID(partitionID)
if err != nil {
log.Warn(err.Error())
continue
return err
}
segment := newSegment(collection, segmentID, partitionID, collectionID, segmentTypeSealed)
segment.setLoadBinLogEnable(true)
err = s.loadSegmentInternal(collectionID, segment, fieldIDs)
if err == nil {
err = s.segLoader.replica.setSegment(segment)
}
err = s.loadSegmentInternal(collectionID, partitionID, segmentID, fieldIDs)
if err != nil {
log.Warn(err.Error())
continue
err = s.addSegmentToLoadBuffer(segment)
if err != nil {
log.Warn(err.Error())
}
}
}
return nil
}
func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, fieldIDs []int64) error {
func (s *loadService) addSegmentToLoadBuffer(segment *Segment) error {
segmentID := segment.segmentID
partitionID := segment.partitionID
collectionID := segment.collectionID
deleteSegment(segment)
err := s.segLoader.replica.addSegment(segmentID, partitionID, collectionID, segmentTypeGrowing)
if err != nil {
return err
}
err = s.segLoader.replica.setSegmentEnableLoadBinLog(segmentID, true)
if err != nil {
s.segLoader.replica.removeSegment(segmentID)
}
return err
}
func (s *loadService) loadSegmentInternal(collectionID UniqueID, segment *Segment, fieldIDs []int64) error {
// create segment
statesResp, err := s.segLoader.GetSegmentStates(segmentID)
statesResp, err := s.segLoader.GetSegmentStates(segment.segmentID)
if err != nil {
return err
}
@ -112,56 +148,46 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
return errors.New("segment not flush done")
}
collection, err := s.segLoader.replica.getCollectionByID(collectionID)
insertBinlogPaths, srcFieldIDs, err := s.segLoader.getInsertBinlogPaths(segment.segmentID)
if err != nil {
return err
}
_, err = s.segLoader.replica.getPartitionByID(partitionID)
if err != nil {
return err
}
segment := newSegment(collection, segmentID, partitionID, collectionID, segmentTypeSealed)
// we don't need index id yet
_, buildID, errIndex := s.segLoader.indexLoader.getIndexInfo(collectionID, segmentID)
if errIndex == nil {
// we don't need load to vector fields
vectorFields, err := s.segLoader.replica.getVecFieldIDsByCollectionID(collectionID)
if err != nil {
return err
}
fieldIDs = s.segLoader.filterOutVectorFields(fieldIDs, vectorFields)
}
paths, srcFieldIDs, err := s.segLoader.getInsertBinlogPaths(segmentID)
vectorFieldIDs, err := s.segLoader.replica.getVecFieldIDsByCollectionID(collectionID)
if err != nil {
return err
}
loadIndexFieldIDs := make([]int64, 0)
for _, vecFieldID := range vectorFieldIDs {
err = s.segLoader.indexLoader.setIndexInfo(collectionID, segment, vecFieldID)
if err != nil {
log.Warn(err.Error())
continue
}
loadIndexFieldIDs = append(loadIndexFieldIDs, vecFieldID)
}
// we don't need load to vector fields
fieldIDs = s.segLoader.filterOutVectorFields(fieldIDs, loadIndexFieldIDs)
//log.Debug("srcFieldIDs in internal:", srcFieldIDs)
//log.Debug("dstFieldIDs in internal:", fieldIDs)
targetFields, err := s.segLoader.checkTargetFields(paths, srcFieldIDs, fieldIDs)
targetFields, err := s.segLoader.checkTargetFields(insertBinlogPaths, srcFieldIDs, fieldIDs)
if err != nil {
return err
}
log.Debug("loading insert...")
err = s.segLoader.loadSegmentFieldsData(segment, targetFields)
if err != nil {
return err
}
// replace segment
err = s.segLoader.replica.replaceGrowingSegmentBySealedSegment(segment)
if err != nil {
return err
}
if errIndex == nil {
for _, id := range loadIndexFieldIDs {
log.Debug("loading index...")
indexPaths, err := s.segLoader.indexLoader.getIndexPaths(buildID)
if err != nil {
return err
}
err = s.segLoader.indexLoader.loadIndexImmediate(segment, indexPaths)
err = s.segLoader.indexLoader.loadIndex(segment, id)
if err != nil {
return err
}
}
return nil
}
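Both call sites now follow the same pattern: build the sealed segment up front, load into it, and only swap or register it in the replica when loading succeeds, releasing the segment on any failure. Condensed from loadSegmentActively above, for illustration only:

// Caller pattern (condensed; error handling trimmed).
segment := newSegment(collection, segmentID, partitionID, collectionID, segmentTypeSealed)
segment.setLoadBinLogEnable(true)
err := s.loadSegmentInternal(collectionID, segment, fieldIDs)
if err == nil {
	// replace the growing segment only after a successful load
	err = s.segLoader.replica.replaceGrowingSegmentBySealedSegment(segment)
}
if err != nil {
	deleteSegment(segment) // free the segcore segment on any failure
	log.Error(err.Error())
}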

View File

@ -1153,7 +1153,14 @@ func TestSegmentLoad_Search_Vector(t *testing.T) {
indexPaths, err := generateIndex(segmentID)
assert.NoError(t, err)
err = node.loadService.segLoader.indexLoader.loadIndexImmediate(segment, indexPaths)
indexInfo := &indexInfo{
indexPaths: indexPaths,
readyLoad: true,
}
err = segment.setIndexInfo(100, indexInfo)
assert.NoError(t, err)
err = node.loadService.segLoader.indexLoader.loadIndex(segment, 100)
assert.NoError(t, err)
// do search

View File

@ -602,14 +602,24 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
if err != nil {
continue
}
var indexName string
var indexID int64
// TODO:: segment has multi vec column
if len(segment.indexInfos) > 0 {
for fieldID := range segment.indexInfos {
indexName = segment.getIndexName(fieldID)
indexID = segment.getIndexID(fieldID)
break
}
}
info := &queryPb.SegmentInfo{
SegmentID: segment.ID(),
CollectionID: segment.collectionID,
PartitionID: segment.partitionID,
MemSize: segment.getMemSize(),
NumRows: segment.getRowCount(),
IndexName: segment.getIndexName(),
IndexID: segment.getIndexID(),
IndexName: indexName,
IndexID: indexID,
}
infos = append(infos, info)
}

View File

@ -35,8 +35,6 @@ const (
segmentTypeIndexing
)
type indexParam = map[string]string
type Segment struct {
segmentPtr C.CSegmentInterface
@ -57,9 +55,7 @@ type Segment struct {
segmentType segmentType
paramMutex sync.RWMutex // guards index
indexParam map[int64]indexParam
indexName string
indexID UniqueID
indexInfos map[int64]*indexInfo
}
//-------------------------------------------------------------------------------------- common interfaces
@ -91,30 +87,6 @@ func (s *Segment) getRecentlyModified() bool {
return s.recentlyModified
}
func (s *Segment) setIndexName(name string) {
s.rmMutex.Lock()
defer s.rmMutex.Unlock()
s.indexName = name
}
func (s *Segment) getIndexName() string {
s.rmMutex.Lock()
defer s.rmMutex.Unlock()
return s.indexName
}
func (s *Segment) setIndexID(id UniqueID) {
s.rmMutex.Lock()
defer s.rmMutex.Unlock()
s.indexID = id
}
func (s *Segment) getIndexID() UniqueID {
s.rmMutex.Lock()
defer s.rmMutex.Unlock()
return s.indexID
}
func (s *Segment) setType(segType segmentType) {
s.typeMu.Lock()
defer s.typeMu.Unlock()
@ -132,7 +104,7 @@ func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, c
CSegmentInterface
NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type);
*/
initIndexParam := make(map[int64]indexParam)
indexInfos := make(map[int64]*indexInfo)
var segmentPtr C.CSegmentInterface
switch segType {
case segmentTypeInvalid:
@ -155,7 +127,7 @@ func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, c
segmentID: segmentID,
partitionID: partitionID,
collectionID: collectionID,
indexParam: initIndexParam,
indexInfos: indexInfos,
enableLoadBinLog: false,
}
@ -270,28 +242,116 @@ func (s *Segment) fillTargetEntry(plan *Plan,
return nil
}
func (s *Segment) setIndexParam(fieldID int64, indexParamKv []*commonpb.KeyValuePair) error {
//-------------------------------------------------------------------------------------- index info interface
func (s *Segment) setIndexName(fieldID int64, name string) error {
s.paramMutex.Lock()
defer s.paramMutex.Unlock()
indexParamMap := make(indexParam)
if indexParamKv == nil {
if _, ok := s.indexInfos[fieldID]; !ok {
return errors.New("index info hasn't been init")
}
s.indexInfos[fieldID].setIndexName(name)
return nil
}
func (s *Segment) setIndexParam(fieldID int64, indexParams map[string]string) error {
s.paramMutex.Lock()
defer s.paramMutex.Unlock()
if indexParams == nil {
return errors.New("empty loadIndexMsg's indexParam")
}
for _, param := range indexParamKv {
indexParamMap[param.Key] = param.Value
if _, ok := s.indexInfos[fieldID]; !ok {
return errors.New("index info hasn't been init")
}
s.indexParam[fieldID] = indexParamMap
s.indexInfos[fieldID].setIndexParams(indexParams)
return nil
}
func (s *Segment) setIndexPaths(fieldID int64, indexPaths []string) error {
s.paramMutex.Lock()
defer s.paramMutex.Unlock()
if _, ok := s.indexInfos[fieldID]; !ok {
return errors.New("index info hasn't been init")
}
s.indexInfos[fieldID].setIndexPaths(indexPaths)
return nil
}
func (s *Segment) setIndexID(fieldID int64, id UniqueID) error {
s.paramMutex.Lock()
defer s.paramMutex.Unlock()
if _, ok := s.indexInfos[fieldID]; !ok {
return errors.New("index info hasn't been init")
}
s.indexInfos[fieldID].setIndexID(id)
return nil
}
func (s *Segment) setBuildID(fieldID int64, id UniqueID) error {
s.paramMutex.Lock()
defer s.paramMutex.Unlock()
if _, ok := s.indexInfos[fieldID]; !ok {
return errors.New("index info hasn't been init")
}
s.indexInfos[fieldID].setBuildID(id)
return nil
}
func (s *Segment) getIndexName(fieldID int64) string {
s.paramMutex.Lock()
defer s.paramMutex.Unlock()
if _, ok := s.indexInfos[fieldID]; !ok {
return ""
}
return s.indexInfos[fieldID].getIndexName()
}
func (s *Segment) getIndexID(fieldID int64) UniqueID {
s.paramMutex.Lock()
defer s.paramMutex.Unlock()
if _, ok := s.indexInfos[fieldID]; !ok {
return -1
}
return s.indexInfos[fieldID].getIndexID()
}
func (s *Segment) getBuildID(fieldID int64) UniqueID {
s.paramMutex.Lock()
defer s.paramMutex.Unlock()
if _, ok := s.indexInfos[fieldID]; !ok {
return -1
}
return s.indexInfos[fieldID].getBuildID()
}
func (s *Segment) getIndexPaths(fieldID int64) []string {
s.paramMutex.Lock()
defer s.paramMutex.Unlock()
if _, ok := s.indexInfos[fieldID]; !ok {
return nil
}
return s.indexInfos[fieldID].getIndexPaths()
}
func (s *Segment) getIndexParams(fieldID int64) map[string]string {
s.paramMutex.Lock()
defer s.paramMutex.Unlock()
if _, ok := s.indexInfos[fieldID]; !ok {
return nil
}
return s.indexInfos[fieldID].getIndexParams()
}
func (s *Segment) matchIndexParam(fieldID int64, indexParams indexParam) bool {
s.paramMutex.RLock()
defer s.paramMutex.RUnlock()
fieldIndexParam := s.indexParam[fieldID]
if _, ok := s.indexInfos[fieldID]; !ok {
return false
}
fieldIndexParam := s.indexInfos[fieldID].getIndexParams()
if fieldIndexParam == nil {
return false
}
paramSize := len(s.indexParam)
paramSize := len(s.indexInfos)
matchCount := 0
for k, v := range indexParams {
value, ok := fieldIndexParam[k]
@ -306,6 +366,25 @@ func (s *Segment) matchIndexParam(fieldID int64, indexParams indexParam) bool {
return paramSize == matchCount
}
func (s *Segment) setIndexInfo(fieldID int64, info *indexInfo) error {
s.paramMutex.Lock()
defer s.paramMutex.Unlock()
if s.indexInfos == nil {
return errors.New("indexInfos hasn't been init")
}
s.indexInfos[fieldID] = info
return nil
}
func (s *Segment) checkIndexReady(fieldID int64) bool {
s.paramMutex.RLock()
defer s.paramMutex.RUnlock()
if _, ok := s.indexInfos[fieldID]; !ok {
return false
}
return s.indexInfos[fieldID].getReadyLoad()
}
//-------------------------------------------------------------------------------------- interfaces for growing segment
func (s *Segment) segmentPreInsert(numOfRecords int) (int64, error) {
/*
@ -549,7 +628,29 @@ func (s *Segment) dropFieldData(fieldID int64) error {
return nil
}
func (s *Segment) updateSegmentIndex(loadIndexInfo *LoadIndexInfo) error {
func (s *Segment) updateSegmentIndex(bytesIndex [][]byte, fieldID UniqueID) error {
loadIndexInfo, err := newLoadIndexInfo()
defer deleteLoadIndexInfo(loadIndexInfo)
if err != nil {
return err
}
err = loadIndexInfo.appendFieldInfo(fieldID)
if err != nil {
return err
}
indexParams := s.getIndexParams(fieldID)
for k, v := range indexParams {
err = loadIndexInfo.appendIndexParam(k, v)
if err != nil {
return err
}
}
indexPaths := s.getIndexPaths(fieldID)
err = loadIndexInfo.appendIndex(bytesIndex, indexPaths)
if err != nil {
return err
}
if s.segmentPtr == nil {
return errors.New("null seg core pointer")
}

View File

@ -3,10 +3,14 @@ package querynode
import (
"context"
"errors"
"fmt"
"strconv"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/kv"
minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
@ -121,6 +125,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, targetField
paths := p.Values
blobs := make([]*storage.Blob, 0)
log.Debug("loadSegmentFieldsData", zap.Int64("segmentID", segment.segmentID), zap.String("path", fmt.Sprintln(paths)))
for _, path := range paths {
binLog, err := loader.kv.Load(path)
if err != nil {

View File

@ -634,7 +634,8 @@ func (qs *QueryService) GetPartitionStates(ctx context.Context, req *querypb.Get
func (qs *QueryService) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
segmentInfos := make([]*querypb.SegmentInfo, 0)
for _, node := range qs.queryNodes {
totalMemSize := int64(0)
for nodeID, node := range qs.queryNodes {
segmentInfo, err := node.client.GetSegmentInfo(ctx, req)
if err != nil {
return &querypb.GetSegmentInfoResponse{
@ -645,6 +646,10 @@ func (qs *QueryService) GetSegmentInfo(ctx context.Context, req *querypb.GetSegm
}, err
}
segmentInfos = append(segmentInfos, segmentInfo.Infos...)
for _, info := range segmentInfo.Infos {
totalMemSize = totalMemSize + info.MemSize
}
log.Debug("getSegmentInfo", zap.Int64("nodeID", nodeID), zap.Int64("memory size", totalMemSize))
}
return &querypb.GetSegmentInfoResponse{
Status: &commonpb.Status{