Fix flush segment

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
pull/4973/head^2
neza2017 2021-03-13 17:05:36 +08:00 committed by yefu.chen
parent 47d9471498
commit 6eb45c92eb
5 changed files with 40 additions and 36 deletions

View File

@ -112,7 +112,7 @@ func TestGrpcService(t *testing.T) {
core.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
return []string{"file1", "file2", "file3"}, nil
}
core.GetNumRowsReq = func(segID typeutil.UniqueID) (int64, error) {
core.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
return cms.Params.MinSegmentSizeToEnableIndex, nil
}

View File

@ -95,7 +95,7 @@ type Core struct {
//get binlog file path from data service,
GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
GetNumRowsReq func(segID typeutil.UniqueID) (int64, error)
GetNumRowsReq func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error)
//call index builder's client to build index, return build id
BuildIndexReq func(ctx context.Context, binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error)
@ -343,12 +343,14 @@ func (c *Core) startSegmentFlushCompletedLoop() {
fieldSch, err := GetFieldSchemaByID(coll, f.FiledID)
if err == nil {
t := &CreateIndexTask{
core: c,
segmentID: seg,
indexName: idxInfo.IndexName,
indexID: idxInfo.IndexID,
fieldSchema: fieldSch,
indexParams: idxInfo.IndexParams,
ctx: c.ctx,
core: c,
segmentID: seg,
indexName: idxInfo.IndexName,
indexID: idxInfo.IndexID,
fieldSchema: fieldSch,
indexParams: idxInfo.IndexParams,
isFromFlushedChan: true,
}
c.indexTaskQueue <- t
}
@ -659,7 +661,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
return nil, fmt.Errorf("binlog file not exist, segment id = %d, field id = %d", segID, fieldID)
}
c.GetNumRowsReq = func(segID typeutil.UniqueID) (int64, error) {
c.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
ts, err := c.tsoAllocator.Alloc(1)
if err != nil {
return 0, err
@ -683,7 +685,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
log.Debug("get segment info empty")
return 0, nil
}
if segInfo.Infos[0].FlushedTime == 0 {
if !isFromFlushedChan && segInfo.Infos[0].State != commonpb.SegmentState_Flushed {
log.Debug("segment id not flushed", zap.Int64("segment id", segID))
return 0, nil
}

View File

@ -93,11 +93,21 @@ func (d *dataMock) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInf
{
FlushedTime: 100,
NumRows: Params.MinSegmentSizeToEnableIndex,
State: commonpb.SegmentState_Flushed,
},
},
}, nil
}
// GetSegmentInfoChannel is the dataMock implementation used in tests.
// It ignores ctx and always returns a StringResponse whose Status carries
// ErrorCode_Success and whose Value is "segment-info-channel-<d.randVal>",
// together with a nil error.
func (d *dataMock) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Value: fmt.Sprintf("segment-info-channel-%d", d.randVal),
}, nil
}
type queryMock struct {
types.QueryService
collID []typeutil.UniqueID
@ -114,15 +124,6 @@ func (q *queryMock) ReleaseCollection(ctx context.Context, req *querypb.ReleaseC
}, nil
}
// GetSegmentInfoChannel is the dataMock implementation used in tests.
// It ignores ctx and always returns a StringResponse whose Status carries
// ErrorCode_Success and whose Value is "segment-info-channel-<d.randVal>",
// together with a nil error.
func (d *dataMock) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Value: fmt.Sprintf("segment-info-channel-%d", d.randVal),
}, nil
}
type indexMock struct {
types.IndexService
fileArray []string

View File

@ -686,13 +686,14 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
}
for _, seg := range segIDs {
task := CreateIndexTask{
ctx: ctx,
core: t.core,
segmentID: seg,
indexName: idxInfo.IndexName,
indexID: idxInfo.IndexID,
fieldSchema: &field,
indexParams: t.Req.ExtraParams,
ctx: t.core.ctx,
core: t.core,
segmentID: seg,
indexName: idxInfo.IndexName,
indexID: idxInfo.IndexID,
fieldSchema: &field,
indexParams: t.Req.ExtraParams,
isFromFlushedChan: false,
}
t.core.indexTaskQueue <- &task
fmt.Println("create index task enqueue, segID = ", seg)
@ -780,20 +781,21 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error {
}
// CreateIndexTask bundles what is needed to build an index on a single
// segment: the owning Core, the segment ID, the index name and ID, the
// schema of the indexed field, and the index parameters.
// isFromFlushedChan is passed through to Core.GetNumRowsReq by BuildIndex.
// NOTE(review): this listing interleaves the pre-change and post-change
// field lines of the commit, so every field except isFromFlushedChan
// appears twice — confirm against the actual file.
type CreateIndexTask struct {
ctx context.Context
core *Core
segmentID typeutil.UniqueID
indexName string
indexID typeutil.UniqueID
fieldSchema *schemapb.FieldSchema
indexParams []*commonpb.KeyValuePair
ctx context.Context
core *Core
segmentID typeutil.UniqueID
indexName string
indexID typeutil.UniqueID
fieldSchema *schemapb.FieldSchema
indexParams []*commonpb.KeyValuePair
isFromFlushedChan bool
}
func (t *CreateIndexTask) BuildIndex() error {
if t.core.MetaTable.IsSegmentIndexed(t.segmentID, t.fieldSchema, t.indexParams) {
return nil
}
rows, err := t.core.GetNumRowsReq(t.segmentID)
rows, err := t.core.GetNumRowsReq(t.segmentID, t.isFromFlushedChan)
if err != nil {
return err
}

View File

@ -287,8 +287,7 @@ class TestSearchBase:
assert res[0]._distances[0] < epsilon
assert check_id_result(res[0], ids[0])
@pytest.mark.tags("0331")
# @pytest.mark.skip("r0.3-test")
#@pytest.mark.tags("0331")
def test_search_after_index_different_metric_type(self, connect, collection, get_simple_index):
'''
target: test search with different metric_type