Alloc timestamp instead of using start position timestamp for segment index (#19820)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/19871/head
cai.zhang 2022-10-18 17:05:26 +08:00 committed by GitHub
parent 15f7592739
commit b3f6b67977
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 124 additions and 6 deletions

View File

@ -28,10 +28,12 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/util"
)
@ -265,6 +267,16 @@ func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
return nil
}
resp, err := fsw.ic.rootCoordClient.AllocTimestamp(fsw.ctx, &rootcoordpb.AllocTimestampRequest{
Count: 1,
})
if err != nil {
return err
}
if resp.Status.GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(resp.Status.GetReason())
}
for _, index := range fieldIndexes {
segIdx := &model.SegmentIndex{
SegmentID: t.segmentInfo.ID,
@ -272,7 +284,7 @@ func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
PartitionID: t.segmentInfo.PartitionID,
NumRows: t.segmentInfo.NumOfRows,
IndexID: index.IndexID,
CreateTime: t.segmentInfo.StartPosition.Timestamp,
CreateTime: resp.Timestamp,
}
//create index task for metaTable

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
)
func Test_flushSegmentWatcher(t *testing.T) {
@ -249,6 +250,7 @@ func Test_flushSegmentWatcher_internalProcess_success(t *testing.T) {
}, nil
},
},
rootCoordClient: NewRootCoordMock(),
metaTable: meta,
},
internalTasks: map[UniqueID]*internalTask{
@ -335,6 +337,7 @@ func Test_flushSegmentWatcher_internalProcess_error(t *testing.T) {
}, nil
},
},
rootCoordClient: NewRootCoordMock(),
metaTable: &metaTable{},
},
internalTasks: map[UniqueID]*internalTask{
@ -521,3 +524,89 @@ func Test_flushSegmentWatcher_removeFlushedSegment(t *testing.T) {
assert.Error(t, err)
})
}
func Test_flushSegmentWatcher_constructTask_error(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
meta := &metaTable{
segmentIndexLock: sync.RWMutex{},
indexLock: sync.RWMutex{},
catalog: &indexcoord.Catalog{Txn: &mockETCDKV{
multiSave: func(m map[string]string) error {
return nil
},
}},
collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 1,
TypeParams: nil,
IndexParams: nil,
},
},
},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{},
}
task := &internalTask{
state: indexTaskInit,
segmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
},
}
fsw := &flushedSegmentWatcher{
ctx: ctx,
cancel: cancel,
kvClient: nil,
wg: sync.WaitGroup{},
scheduleDuration: 100 * time.Millisecond,
internalTaskMutex: sync.RWMutex{},
internalNotify: make(chan struct{}, 1),
etcdRevision: 0,
watchChan: nil,
meta: meta,
builder: nil,
ic: &IndexCoord{
rootCoordClient: NewRootCoordMock(),
},
handoff: nil,
internalTasks: map[UniqueID]*internalTask{
segID: task,
},
}
t.Run("alloc timestamp error", func(t *testing.T) {
fsw.ic.rootCoordClient = &RootCoordMock{
CallAllocTimestamp: func(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
return nil, errors.New("error")
},
}
err := fsw.constructTask(task)
assert.Error(t, err)
})
t.Run("alloc timestamp not success", func(t *testing.T) {
fsw.ic.rootCoordClient = &RootCoordMock{
CallAllocTimestamp: func(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
return &rootcoordpb.AllocTimestampResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "fail reason",
},
}, nil
},
}
err := fsw.constructTask(task)
assert.Error(t, err)
})
}

View File

@ -301,6 +301,7 @@ type RootCoordMock struct {
CallGetComponentStates func(ctx context.Context) (*milvuspb.ComponentStates, error)
CallAllocID func(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error)
CallAllocTimestamp func(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error)
}
func (rcm *RootCoordMock) Init() error {
@ -315,6 +316,10 @@ func (rcm *RootCoordMock) GetComponentStates(ctx context.Context) (*milvuspb.Com
return rcm.CallGetComponentStates(ctx)
}
func (rcm *RootCoordMock) AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
return rcm.CallAllocTimestamp(ctx, req)
}
func (rcm *RootCoordMock) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
return rcm.CallAllocID(ctx, req)
}
@ -349,6 +354,15 @@ func NewRootCoordMock() *RootCoordMock {
Count: req.Count,
}, nil
},
CallAllocTimestamp: func(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
return &rootcoordpb.AllocTimestampResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Timestamp: 1,
Count: req.Count,
}, nil
},
}
}

View File

@ -170,7 +170,9 @@ func testIndexCoord(t *testing.T) {
assert.NoError(t, err)
mockKv := NewMockEtcdKVWithReal(ic.etcdKV)
ic.metaTable, err = NewMetaTable(mockKv)
ic.metaTable.catalog = &indexcoord.Catalog{
Txn: mockKv,
}
assert.NoError(t, err)
err = ic.Register()
@ -201,6 +203,7 @@ func testIndexCoord(t *testing.T) {
CollectionID: collID,
FieldID: fieldID,
IndexName: indexName,
Timestamp: createTs,
}
resp, err := ic.CreateIndex(ctx, req)
assert.NoError(t, err)

View File

@ -196,7 +196,7 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
PartitionID: segmentInfo.PartitionID,
NumRows: segmentInfo.NumOfRows,
IndexID: cit.indexID,
CreateTime: segmentInfo.StartPosition.Timestamp,
CreateTime: cit.req.GetTimestamp(),
}
have, buildID, err := cit.indexCoordClient.createIndexForSegment(segIdx)
if err != nil {