From e7ebfcb05a860547e6267048bce4bd0a882705ff Mon Sep 17 00:00:00 2001
From: sunby
Date: Wed, 23 Dec 2020 15:13:45 +0800
Subject: [PATCH] Save index meta to meta table

Signed-off-by: sunby
---
internal/master/flush_scheduler.go | 18 +-
internal/master/index_builder_scheduler.go | 69 ++++-
internal/master/meta_table.go | 16 ++
internal/master/persistence_scheduler_test.go | 20 ++
internal/storage/data_codec.go | 104 ++++---
internal/storage/data_codec_test.go | 253 +++++++++++-------
6 files changed, 320 insertions(+), 160 deletions(-)

diff --git a/internal/master/flush_scheduler.go b/internal/master/flush_scheduler.go index f4aba60d85..7648330b5d 100644
--- a/internal/master/flush_scheduler.go
+++ b/internal/master/flush_scheduler.go
@@ -5,6 +5,8 @@ import (
"log"
"time"
+ "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
+
"github.com/zilliztech/milvus-distributed/internal/errors"
)
@@ -77,7 +79,21 @@ func (scheduler *FlushScheduler) describe() error {
return err
}
}
- //TODO: Save data to meta table
+ // Save data to meta table
+ segMeta, err := scheduler.metaTable.GetSegmentByID(singleSegmentID)
+ if err != nil {
+ return err
+ }
+ segMeta.BinlogFilePaths = make([]*etcdpb.FieldBinlogFiles, 0)
+ for k, v := range mapData {
+ segMeta.BinlogFilePaths = append(segMeta.BinlogFilePaths, &etcdpb.FieldBinlogFiles{
+ FieldID: k,
+ BinlogFiles: v,
+ })
+ }
+ if err = scheduler.metaTable.UpdateSegment(segMeta); err != nil {
+ return err
+ }
log.Printf("flush segment %d finished", singleSegmentID)
break
}
diff --git a/internal/master/index_builder_scheduler.go b/internal/master/index_builder_scheduler.go index 1600d73992..ca7dc536a0 100644
--- a/internal/master/index_builder_scheduler.go
+++ b/internal/master/index_builder_scheduler.go
@@ -5,6 +5,10 @@ import (
"log"
"time"
+ "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+
+ "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
+
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb"
)
@@ -15,8 +19,9 @@ type IndexBuildInfo struct {
binlogFilePath []string
}
type IndexBuildChannelInfo struct {
- id UniqueID
- info *IndexBuildInfo
+ id UniqueID
+ info *IndexBuildInfo
+ indexParams []*commonpb.KeyValuePair
}
type IndexBuildScheduler struct {
@@ -47,14 +52,43 @@ func NewIndexBuildScheduler(ctx context.Context, client BuildIndexClient, metaTa
func (scheduler *IndexBuildScheduler) schedule(info interface{}) error {
indexBuildInfo := info.(*IndexBuildInfo)
- indexID, err := scheduler.client.BuildIndexWithoutID(indexBuildInfo.binlogFilePath, nil, nil)
+ segMeta, err := scheduler.metaTable.GetSegmentByID(indexBuildInfo.segmentID)
+ if err != nil {
+ return err
+ }
+
+ // parse index params
+ indexParams, err := scheduler.metaTable.GetFieldIndexParams(segMeta.CollectionID, indexBuildInfo.fieldID)
+ if err != nil {
+ return err
+ }
+ indexParamsMap := make(map[string]string)
+ for _, kv := range indexParams {
+ indexParamsMap[kv.Key] = kv.Value
+ }
+
+ indexID, err := scheduler.client.BuildIndexWithoutID(indexBuildInfo.binlogFilePath, nil, indexParamsMap)
log.Printf("build index for segment %d field %d", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
if err != nil {
return err
}
+
+ err = scheduler.metaTable.AddFieldIndexMeta(&etcdpb.FieldIndexMeta{
+ SegmentID: indexBuildInfo.segmentID,
+ FieldID: indexBuildInfo.fieldID,
+ IndexID: indexID,
+ IndexParams: indexParams,
+ Status: indexbuilderpb.IndexStatus_NONE,
+ })
+ if err != nil {
+ log.Printf("WARNING: " + err.Error()) + //return err + } + scheduler.indexDescribe <- &IndexBuildChannelInfo{ - id: indexID, - info: indexBuildInfo, + id: indexID, + info: indexBuildInfo, + indexParams: indexParams, } return nil } @@ -100,7 +134,18 @@ func (scheduler *IndexBuildScheduler) describe() error { fieldName: fieldName, indexFilePaths: filePaths, } - //TODO: Save data to meta table + // Save data to meta table + err = scheduler.metaTable.UpdateFieldIndexMeta(&etcdpb.FieldIndexMeta{ + SegmentID: indexBuildInfo.segmentID, + FieldID: indexBuildInfo.fieldID, + IndexID: indexID, + IndexParams: channelInfo.indexParams, + Status: indexbuilderpb.IndexStatus_FINISHED, + IndexFilePaths: filePaths, + }) + if err != nil { + return err + } err = scheduler.indexLoadSch.Enqueue(info) log.Printf("build index for segment %d field %d enqueue load index", indexBuildInfo.segmentID, indexBuildInfo.fieldID) @@ -108,6 +153,18 @@ func (scheduler *IndexBuildScheduler) describe() error { return err } log.Printf("build index for segment %d field %d finished", indexBuildInfo.segmentID, indexBuildInfo.fieldID) + } else { + // save status to meta table + err = scheduler.metaTable.UpdateFieldIndexMeta(&etcdpb.FieldIndexMeta{ + SegmentID: indexBuildInfo.segmentID, + FieldID: indexBuildInfo.fieldID, + IndexID: indexID, + IndexParams: channelInfo.indexParams, + Status: description.Status, + }) + if err != nil { + return err + } } time.Sleep(1 * time.Second) } diff --git a/internal/master/meta_table.go b/internal/master/meta_table.go index 7bb91c805b..601f412a4c 100644 --- a/internal/master/meta_table.go +++ b/internal/master/meta_table.go @@ -619,3 +619,19 @@ func (mt *metaTable) removeSegmentIndexMeta(segID UniqueID) error { return nil } + +func (mt *metaTable) GetFieldIndexParams(collID UniqueID, fieldID UniqueID) ([]*commonpb.KeyValuePair, error) { + mt.ddLock.RLock() + defer mt.ddLock.RUnlock() + + if _, ok := mt.collID2Meta[collID]; !ok { + return nil, fmt.Errorf("can not find collection with id %d", collID) + } + + for _, fieldSchema := range mt.collID2Meta[collID].Schema.Fields { + if fieldSchema.FieldID == fieldID { + return fieldSchema.IndexParams, nil + } + } + return nil, fmt.Errorf("can not find field %d in collection %d", fieldID, collID) +} diff --git a/internal/master/persistence_scheduler_test.go b/internal/master/persistence_scheduler_test.go index 3c13bd8d65..025d9b313b 100644 --- a/internal/master/persistence_scheduler_test.go +++ b/internal/master/persistence_scheduler_test.go @@ -5,6 +5,9 @@ import ( "testing" "time" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "github.com/stretchr/testify/assert" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb" @@ -34,6 +37,23 @@ func TestPersistenceScheduler(t *testing.T) { assert.Nil(t, err) defer meta.client.Close() + err = meta.AddCollection(&etcdpb.CollectionMeta{ + ID: 1, + Schema: &schemapb.CollectionSchema{ + Name: "testcoll", + Fields: []*schemapb.FieldSchema{ + {FieldID: 1}, + {FieldID: 100}, + }, + }, + }) + assert.Nil(t, err) + err = meta.AddSegment(&etcdpb.SegmentMeta{ + SegmentID: 1, + CollectionID: 1, + }) + assert.Nil(t, err) + //Init scheduler indexLoadSch := NewIndexLoadScheduler(ctx, loadIndexClient, meta) indexBuildSch := NewIndexBuildScheduler(ctx, buildIndexClient, meta, indexLoadSch) diff --git a/internal/storage/data_codec.go 
b/internal/storage/data_codec.go index c5913f833e..b7b84ad5ce 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -13,11 +13,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) -const ( - Ts = "ts" - DDL = "ddl" -) - type ( UniqueID = typeutil.UniqueID FieldID = typeutil.UniqueID @@ -59,6 +54,13 @@ func (b Blob) GetValue() []byte { return b.Value } +type Base struct { + Version int + CommitID int + TenantID UniqueID + Schema *etcdpb.CollectionMeta +} + type FieldData interface{} type BoolFieldData struct { @@ -120,14 +122,10 @@ type InsertData struct { // Blob key example: // ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx} type InsertCodec struct { - Schema *etcdpb.CollectionMeta + Base readerCloseFunc []func() error } -func NewInsertCodec(schema *etcdpb.CollectionMeta) *InsertCodec { - return &InsertCodec{Schema: schema} -} - func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) { var blobs []*Blob var writer *InsertBinlogWriter @@ -427,59 +425,20 @@ func (insertCodec *InsertCodec) Close() error { } // Blob key example: -// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx} -// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx} +// ${tenant}/data_definition_log/${collection_id}/${field_type}/${log_idx} type DataDefinitionCodec struct { - collectionID int64 + Base readerCloseFunc []func() error } -func NewDataDefinitionCodec(collectionID int64) *DataDefinitionCodec { - return &DataDefinitionCodec{collectionID: collectionID} -} - func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) { - writer, err := NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.collectionID) + writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.Schema.ID) if err != nil { return nil, err } var blobs []*Blob - eventWriter, err := writer.NextCreateCollectionEventWriter() - if err != nil { - return nil, err - } - var int64Ts []int64 - for _, singleTs := range ts { - int64Ts = append(int64Ts, int64(singleTs)) - } - err = eventWriter.AddInt64ToPayload(int64Ts) - if err != nil { - return nil, err - } - eventWriter.SetStartTimestamp(ts[0]) - eventWriter.SetEndTimestamp(ts[len(ts)-1]) - writer.SetStartTimeStamp(ts[0]) - writer.SetEndTimeStamp(ts[len(ts)-1]) - err = writer.Close() - if err != nil { - return nil, err - } - buffer, err := writer.GetBuffer() - if err != nil { - return nil, err - } - blobs = append(blobs, &Blob{ - Key: Ts, - Value: buffer, - }) - - writer, err = NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.collectionID) - if err != nil { - return nil, err - } - for pos, req := range ddRequests { switch eventTypes[pos] { case CreateCollectionEventType: @@ -534,12 +493,46 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } + buffer, err := writer.GetBuffer() + if err != nil { + return nil, err + } + blobs = append(blobs, &Blob{ + Key: "", + Value: buffer, + }) + + writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.Schema.ID) + if err != nil { + return nil, err + } + + eventWriter, err := writer.NextCreateCollectionEventWriter() + if err != nil { + return nil, err + } + var int64Ts []int64 + for _, singleTs := range ts { + int64Ts = append(int64Ts, int64(singleTs)) + } + err = 
eventWriter.AddInt64ToPayload(int64Ts) + if err != nil { + return nil, err + } + eventWriter.SetStartTimestamp(ts[0]) + eventWriter.SetEndTimestamp(ts[len(ts)-1]) + writer.SetStartTimeStamp(ts[0]) + writer.SetEndTimeStamp(ts[len(ts)-1]) + err = writer.Close() + if err != nil { + return nil, err + } buffer, err = writer.GetBuffer() if err != nil { return nil, err } blobs = append(blobs, &Blob{ - Key: DDL, + Key: "", Value: buffer, }) @@ -627,10 +620,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Close() error { //func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, error) {} type IndexCodec struct { -} - -func NewIndexCodec() *IndexCodec { - return &IndexCodec{} + Base } func (indexCodec *IndexCodec) Serialize(blobs []*Blob) ([]*Blob, error) { diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 9a4cd72d2f..8a112ec7b8 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -10,104 +10,112 @@ import ( ) func TestInsertCodec(t *testing.T) { - Schema := &etcdpb.CollectionMeta{ - ID: 1, - CreateTime: 1, - SegmentIDs: []int64{0, 1}, - PartitionTags: []string{"partition_0", "partition_1"}, - Schema: &schemapb.CollectionSchema{ - Name: "schema", - Description: "schema", - AutoID: true, - Fields: []*schemapb.FieldSchema{ - { - FieldID: 0, - Name: "row_id", - IsPrimaryKey: false, - Description: "row_id", - DataType: schemapb.DataType_INT64, - }, - { - FieldID: 1, - Name: "Ts", - IsPrimaryKey: false, - Description: "Ts", - DataType: schemapb.DataType_INT64, - }, - { - FieldID: 100, - Name: "field_bool", - IsPrimaryKey: false, - Description: "description_2", - DataType: schemapb.DataType_BOOL, - }, - { - FieldID: 101, - Name: "field_int8", - IsPrimaryKey: false, - Description: "description_3", - DataType: schemapb.DataType_INT8, - }, - { - FieldID: 102, - Name: "field_int16", - IsPrimaryKey: false, - Description: "description_4", - DataType: schemapb.DataType_INT16, - }, - { - FieldID: 103, - Name: "field_int32", - IsPrimaryKey: false, - Description: "description_5", - DataType: schemapb.DataType_INT32, - }, - { - FieldID: 104, - Name: "field_int64", - IsPrimaryKey: false, - Description: "description_6", - DataType: schemapb.DataType_INT64, - }, - { - FieldID: 105, - Name: "field_float", - IsPrimaryKey: false, - Description: "description_7", - DataType: schemapb.DataType_FLOAT, - }, - { - FieldID: 106, - Name: "field_double", - IsPrimaryKey: false, - Description: "description_8", - DataType: schemapb.DataType_DOUBLE, - }, - { - FieldID: 107, - Name: "field_string", - IsPrimaryKey: false, - Description: "description_9", - DataType: schemapb.DataType_STRING, - }, - { - FieldID: 108, - Name: "field_binary_vector", - IsPrimaryKey: false, - Description: "description_10", - DataType: schemapb.DataType_VECTOR_BINARY, - }, - { - FieldID: 109, - Name: "field_float_vector", - IsPrimaryKey: false, - Description: "description_11", - DataType: schemapb.DataType_VECTOR_FLOAT, + base := Base{ + Version: 1, + CommitID: 1, + TenantID: 1, + Schema: &etcdpb.CollectionMeta{ + ID: 1, + CreateTime: 1, + SegmentIDs: []int64{0, 1}, + PartitionTags: []string{"partition_0", "partition_1"}, + Schema: &schemapb.CollectionSchema{ + Name: "schema", + Description: "schema", + AutoID: true, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 0, + Name: "row_id", + IsPrimaryKey: false, + Description: "row_id", + DataType: schemapb.DataType_INT64, + }, + { + FieldID: 1, + Name: "Ts", + IsPrimaryKey: false, + Description: "Ts", + DataType: 
schemapb.DataType_INT64,
+ },
+ {
+ FieldID: 100,
+ Name: "field_bool",
+ IsPrimaryKey: false,
+ Description: "description_2",
+ DataType: schemapb.DataType_BOOL,
+ },
+ {
+ FieldID: 101,
+ Name: "field_int8",
+ IsPrimaryKey: false,
+ Description: "description_3",
+ DataType: schemapb.DataType_INT8,
+ },
+ {
+ FieldID: 102,
+ Name: "field_int16",
+ IsPrimaryKey: false,
+ Description: "description_4",
+ DataType: schemapb.DataType_INT16,
+ },
+ {
+ FieldID: 103,
+ Name: "field_int32",
+ IsPrimaryKey: false,
+ Description: "description_5",
+ DataType: schemapb.DataType_INT32,
+ },
+ {
+ FieldID: 104,
+ Name: "field_int64",
+ IsPrimaryKey: false,
+ Description: "description_6",
+ DataType: schemapb.DataType_INT64,
+ },
+ {
+ FieldID: 105,
+ Name: "field_float",
+ IsPrimaryKey: false,
+ Description: "description_7",
+ DataType: schemapb.DataType_FLOAT,
+ },
+ {
+ FieldID: 106,
+ Name: "field_double",
+ IsPrimaryKey: false,
+ Description: "description_8",
+ DataType: schemapb.DataType_DOUBLE,
+ },
+ {
+ FieldID: 107,
+ Name: "field_string",
+ IsPrimaryKey: false,
+ Description: "description_9",
+ DataType: schemapb.DataType_STRING,
+ },
+ {
+ FieldID: 108,
+ Name: "field_binary_vector",
+ IsPrimaryKey: false,
+ Description: "description_10",
+ DataType: schemapb.DataType_VECTOR_BINARY,
+ },
+ {
+ FieldID: 109,
+ Name: "field_float_vector",
+ IsPrimaryKey: false,
+ Description: "description_11",
+ DataType: schemapb.DataType_VECTOR_FLOAT,
+ },
+ },
+ },
+ },
}
- insertCodec := NewInsertCodec(Schema)
+ insertCodec := &InsertCodec{
+ base,
+ make([]func() error, 0),
+ }
insertDataFirst := &InsertData{
Data: map[int64]FieldData{
0: &Int64FieldData{
@@ -260,7 +268,58 @@ func TestInsertCodec(t *testing.T) {
assert.Nil(t, insertCodec.Close())
}
func TestDDCodec(t *testing.T) {
- dataDefinitionCodec := NewDataDefinitionCodec(int64(1))
+ base := Base{
+ Version: 1,
+ CommitID: 1,
+ TenantID: 1,
+ Schema: &etcdpb.CollectionMeta{
+ ID: 1,
+ CreateTime: 1,
+ SegmentIDs: []int64{0, 1},
+ PartitionTags: []string{"partition_0", "partition_1"},
+ Schema: &schemapb.CollectionSchema{
+ Name: "schema",
+ Description: "schema",
+ AutoID: true,
+ Fields: []*schemapb.FieldSchema{
+ {
+ Name: "field_1",
+ IsPrimaryKey: false,
+ Description: "description_1",
+ DataType: schemapb.DataType_INT32,
+ },
+ {
+ Name: "field_2",
+ IsPrimaryKey: false,
+ Description: "description_1",
+ DataType: schemapb.DataType_INT64,
+ },
+ {
+ Name: "field_3",
+ IsPrimaryKey: false,
+ Description: "description_1",
+ DataType: schemapb.DataType_STRING,
+ },
+ {
+ Name: "field_4",
+ IsPrimaryKey: false,
+ Description: "description_1",
+ DataType: schemapb.DataType_STRING,
+ },
+ {
+ Name: "field_5",
+ IsPrimaryKey: false,
+ Description: "description_1",
+ DataType: schemapb.DataType_STRING,
+ },
+ },
+ },
+ },
+ }
+ dataDefinitionCodec := &DataDefinitionCodec{
+ base,
+ make([]func() error, 0),
+ }
ts := []Timestamp{
1,
2,
@@ -292,7 +351,9 @@
}
func TestIndexCodec(t *testing.T) {
- indexCodec := NewIndexCodec()
+ indexCodec := &IndexCodec{
+ Base{},
+ }
blobs := []*Blob{
{
"12345",