Save indexName and indexID in IndexCodec

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-02-02 19:56:11 +08:00 committed by yefu.chen
parent 899d31e72d
commit 45b99c0cf3
8 changed files with 37 additions and 22 deletions

View File

@ -98,7 +98,7 @@ func TestGrpcService(t *testing.T) {
var binlogLock sync.Mutex
binlogPathArray := make([]string, 0, 16)
core.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) {
core.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error) {
binlogLock.Lock()
defer binlogLock.Unlock()
binlogPathArray = append(binlogPathArray, binlog...)

View File

@ -247,7 +247,7 @@ func (it *IndexBuildTask) Execute() error {
}
var indexCodec storage.IndexCodec
serializedIndexBlobs, err := indexCodec.Serialize(getStorageBlobs(indexBlobs), indexParams)
serializedIndexBlobs, err := indexCodec.Serialize(getStorageBlobs(indexBlobs), indexParams, it.cmd.Req.IndexName, it.cmd.Req.IndexID)
if err != nil {
return err
}

View File

@ -152,7 +152,7 @@ type Core struct {
GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
//TODO, call index builder's client to build index, return build id
BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error)
BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error)
//TODO, proxy service interface, notify proxy service to drop collection
InvalidateCollectionMetaCache func(ts typeutil.Timestamp, dbName string, collectionName string) error
@ -671,13 +671,11 @@ func (c *Core) SetDataService(s DataServiceInterface) error {
}
func (c *Core) SetIndexService(s IndexServiceInterface) error {
c.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) {
c.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error) {
rsp, err := s.BuildIndex(&indexpb.BuildIndexRequest{
DataPaths: binlog,
TypeParams: typeParams,
IndexParams: indexParams,
IndexID: indexID,
IndexName: indexName,
})
if err != nil {
return 0, err

View File

@ -628,7 +628,7 @@ func (t *CreateIndexTask) BuildIndex() error {
})
}
}
bldID, err = t.core.BuildIndexReq(binlogs, t.fieldSchema.TypeParams, t.indexParams, idxID, t.indexName)
bldID, err = t.core.BuildIndexReq(binlogs, t.fieldSchema.TypeParams, t.indexParams)
if err != nil {
return err
}

View File

@ -231,7 +231,7 @@ func (s *loadService) loadIndex(indexPath []string) ([][]byte, indexParam, error
// get index params when detecting indexParamPrefix
if path.Base(p) == storage.IndexParamsFile {
indexCodec := storage.NewIndexCodec()
_, indexParams, err = indexCodec.Deserialize([]*storage.Blob{
_, indexParams, _, _, err = indexCodec.Deserialize([]*storage.Blob{
{
Key: storage.IndexParamsFile,
Value: []byte(indexPiece),

View File

@ -853,7 +853,7 @@ func generateIndex(segmentID UniqueID) ([]string, error) {
// serialize index params
var indexCodec storage.IndexCodec
serializedIndexBlobs, err := indexCodec.Serialize(binarySet, indexParams)
serializedIndexBlobs, err := indexCodec.Serialize(binarySet, indexParams, "index_test_name", 1234)
if err != nil {
return nil, err
}

View File

@ -635,8 +635,16 @@ func NewIndexCodec() *IndexCodec {
return &IndexCodec{}
}
func (indexCodec *IndexCodec) Serialize(blobs []*Blob, params map[string]string) ([]*Blob, error) {
paramsBytes, err := json.Marshal(params)
func (indexCodec *IndexCodec) Serialize(blobs []*Blob, params map[string]string, indexName string, indexID UniqueID) ([]*Blob, error) {
paramsBytes, err := json.Marshal(struct {
Params map[string]string
IndexName string
IndexID UniqueID
}{
Params: params,
IndexName: indexName,
IndexID: indexID,
})
if err != nil {
return nil, err
}
@ -644,20 +652,27 @@ func (indexCodec *IndexCodec) Serialize(blobs []*Blob, params map[string]string)
return blobs, nil
}
func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, map[string]string, error) {
var params map[string]string
func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, map[string]string, string, UniqueID, error) {
var file *Blob
for i := 0; i < len(blobs); i++ {
if blobs[i].Key != IndexParamsFile {
continue
}
if err := json.Unmarshal(blobs[i].Value, &params); err != nil {
return nil, nil, err
}
file = blobs[i]
blobs = append(blobs[:i], blobs[i+1:]...)
break
}
if params == nil {
return nil, nil, errors.New("can not find params blob")
if file == nil {
return nil, nil, "", -1, errors.New("can not find params blob")
}
return blobs, params, nil
info := struct {
Params map[string]string
IndexName string
IndexID UniqueID
}{}
if err := json.Unmarshal(file.Value, &info); err != nil {
return nil, nil, "", -1, errors.New("json unmarshal error: " + err.Error())
}
return blobs, info.Params, info.IndexName, info.IndexID, nil
}

View File

@ -310,15 +310,17 @@ func TestIndexCodec(t *testing.T) {
indexParams := map[string]string{
"k1": "v1", "k2": "v2",
}
blobsInput, err := indexCodec.Serialize(blobs, indexParams)
blobsInput, err := indexCodec.Serialize(blobs, indexParams, "index_test_name", 1234)
assert.Nil(t, err)
assert.EqualValues(t, 4, len(blobsInput))
assert.EqualValues(t, IndexParamsFile, blobsInput[3])
blobsOutput, indexParamsOutput, err := indexCodec.Deserialize(blobsInput)
assert.EqualValues(t, IndexParamsFile, blobsInput[3].Key)
blobsOutput, indexParamsOutput, indexName, indexID, err := indexCodec.Deserialize(blobsInput)
assert.Nil(t, err)
assert.EqualValues(t, 3, len(blobsOutput))
for i := 0; i < 3; i++ {
assert.EqualValues(t, blobs[i], blobsOutput[i])
}
assert.EqualValues(t, indexParams, indexParamsOutput)
assert.EqualValues(t, "index_test_name", indexName)
assert.EqualValues(t, 1234, indexID)
}