diff --git a/configs/advanced/write_node.yaml b/configs/advanced/write_node.yaml
index 4ea1770b7d..619ece9330 100644
--- a/configs/advanced/write_node.yaml
+++ b/configs/advanced/write_node.yaml
@@ -33,3 +33,8 @@ writeNode:
   #streamBufSize: 1024 # msgPack chan buffer size
   recvBufSize: 1024 # msgPack chan buffer size
   pulsarBufSize: 1024 # pulsar chan buffer size
+
+  flush:
+    # max buffer size to flush
+    insertBufSize: 20
+    ddBufSize: 20
diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 6e2cd2bcdb..42860387dd 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -29,6 +29,7 @@ minio:
   accessKeyID: minioadmin
   secretAccessKey: minioadmin
   useSSL: false
+  bucketName: "A-bucket"
 
 pulsar:
   address: localhost
diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
index b7b84ad5ce..c5913f833e 100644
--- a/internal/storage/data_codec.go
+++ b/internal/storage/data_codec.go
@@ -13,6 +13,11 @@ import (
     "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 )
 
+const (
+    Ts  = "ts"
+    DDL = "ddl"
+)
+
 type (
     UniqueID = typeutil.UniqueID
     FieldID  = typeutil.UniqueID
@@ -54,13 +59,6 @@ func (b Blob) GetValue() []byte {
     return b.Value
 }
 
-type Base struct {
-    Version  int
-    CommitID int
-    TenantID UniqueID
-    Schema   *etcdpb.CollectionMeta
-}
-
 type FieldData interface{}
 
 type BoolFieldData struct {
@@ -122,10 +120,14 @@ type InsertData struct {
 // Blob key example:
 // ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
 type InsertCodec struct {
-    Base
+    Schema          *etcdpb.CollectionMeta
     readerCloseFunc []func() error
 }
 
+func NewInsertCodec(schema *etcdpb.CollectionMeta) *InsertCodec {
+    return &InsertCodec{Schema: schema}
+}
+
 func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) {
     var blobs []*Blob
     var writer *InsertBinlogWriter
@@ -425,20 +427,59 @@ func (insertCodec *InsertCodec) Close() error {
 }
 
 // Blob key example:
-// ${tenant}/data_definition_log/${collection_id}/${field_type}/${log_idx}
+// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
+// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
 type DataDefinitionCodec struct {
-    Base
+    collectionID    int64
     readerCloseFunc []func() error
 }
 
+func NewDataDefinitionCodec(collectionID int64) *DataDefinitionCodec {
+    return &DataDefinitionCodec{collectionID: collectionID}
+}
+
 func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) {
-    writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.Schema.ID)
+    writer, err := NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.collectionID)
     if err != nil {
         return nil, err
     }
 
     var blobs []*Blob
 
+    eventWriter, err := writer.NextCreateCollectionEventWriter()
+    if err != nil {
+        return nil, err
+    }
+    var int64Ts []int64
+    for _, singleTs := range ts {
+        int64Ts = append(int64Ts, int64(singleTs))
+    }
+    err = eventWriter.AddInt64ToPayload(int64Ts)
+    if err != nil {
+        return nil, err
+    }
+    eventWriter.SetStartTimestamp(ts[0])
+    eventWriter.SetEndTimestamp(ts[len(ts)-1])
+    writer.SetStartTimeStamp(ts[0])
+    writer.SetEndTimeStamp(ts[len(ts)-1])
+    err = writer.Close()
+    if err != nil {
+        return nil, err
+    }
+    buffer, err := writer.GetBuffer()
+    if err != nil {
+        return nil, err
+    }
+    blobs = append(blobs, &Blob{
+        Key:   Ts,
+        Value: buffer,
+    })
+
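+    // The timestamp column is flushed above as the "ts" blob; the
+    // string-typed writer created below serializes the DDL requests
+    // themselves into the "ddl" blob (see the blob key comment on
+    // DataDefinitionCodec).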
+    writer, err = NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.collectionID)
+    if err != nil {
+        return nil, err
+    }
+
     for pos, req := range ddRequests {
         switch eventTypes[pos] {
         case CreateCollectionEventType:
@@ -493,46 +534,12 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
     if err != nil {
         return nil, err
     }
-    buffer, err := writer.GetBuffer()
-    if err != nil {
-        return nil, err
-    }
-    blobs = append(blobs, &Blob{
-        Key:   "",
-        Value: buffer,
-    })
-
-    writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.Schema.ID)
-    if err != nil {
-        return nil, err
-    }
-
-    eventWriter, err := writer.NextCreateCollectionEventWriter()
-    if err != nil {
-        return nil, err
-    }
-    var int64Ts []int64
-    for _, singleTs := range ts {
-        int64Ts = append(int64Ts, int64(singleTs))
-    }
-    err = eventWriter.AddInt64ToPayload(int64Ts)
-    if err != nil {
-        return nil, err
-    }
-    eventWriter.SetStartTimestamp(ts[0])
-    eventWriter.SetEndTimestamp(ts[len(ts)-1])
-    writer.SetStartTimeStamp(ts[0])
-    writer.SetEndTimeStamp(ts[len(ts)-1])
-    err = writer.Close()
-    if err != nil {
-        return nil, err
-    }
     buffer, err = writer.GetBuffer()
     if err != nil {
         return nil, err
     }
     blobs = append(blobs, &Blob{
-        Key:   "",
+        Key:   DDL,
         Value: buffer,
     })
 
@@ -620,7 +627,10 @@ func (dataDefinitionCodec *DataDefinitionCodec) Close() error {
 //func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, error) {}
 
 type IndexCodec struct {
-    Base
+}
+
+func NewIndexCodec() *IndexCodec {
+    return &IndexCodec{}
 }
 
 func (indexCodec *IndexCodec) Serialize(blobs []*Blob) ([]*Blob, error) {
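For orientation, a hedged usage sketch of the codec API after this change (assuming storage.Timestamp is the exported alias used in Serialize's signature; the request strings, the collection ID, and the repeated event type are placeholders — only CreateCollectionEventType is visible in this diff):

    import "github.com/zilliztech/milvus-distributed/internal/storage"

    func serializeDDL() ([]*storage.Blob, error) {
        // One codec per collection; no shared Base struct anymore.
        codec := storage.NewDataDefinitionCodec(int64(1))
        ts := []storage.Timestamp{1, 2}
        reqs := []string{"create collection req", "create collection req"}
        types := []storage.EventTypeCode{
            storage.CreateCollectionEventType, // placeholder event type
            storage.CreateCollectionEventType,
        }
        // Two blobs come back: Key "ts" (int64 timestamps) and
        // Key "ddl" (string-encoded requests).
        return codec.Serialize(ts, reqs, types)
    }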
diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go
index 8a112ec7b8..9a4cd72d2f 100644
--- a/internal/storage/data_codec_test.go
+++ b/internal/storage/data_codec_test.go
@@ -10,112 +10,104 @@ import (
 )
 
 func TestInsertCodec(t *testing.T) {
-    base := Base{
-        Version:  1,
-        CommitID: 1,
-        TenantID: 1,
-        Schema: &etcdpb.CollectionMeta{
-            ID:            1,
-            CreateTime:    1,
-            SegmentIDs:    []int64{0, 1},
-            PartitionTags: []string{"partition_0", "partition_1"},
-            Schema: &schemapb.CollectionSchema{
-                Name:        "schema",
-                Description: "schema",
-                AutoID:      true,
-                Fields: []*schemapb.FieldSchema{
-                    {
-                        FieldID:      0,
-                        Name:         "row_id",
-                        IsPrimaryKey: false,
-                        Description:  "row_id",
-                        DataType:     schemapb.DataType_INT64,
-                    },
-                    {
-                        FieldID:      1,
-                        Name:         "Ts",
-                        IsPrimaryKey: false,
-                        Description:  "Ts",
-                        DataType:     schemapb.DataType_INT64,
-                    },
-                    {
-                        FieldID:      100,
-                        Name:         "field_bool",
-                        IsPrimaryKey: false,
-                        Description:  "description_2",
-                        DataType:     schemapb.DataType_BOOL,
-                    },
-                    {
-                        FieldID:      101,
-                        Name:         "field_int8",
-                        IsPrimaryKey: false,
-                        Description:  "description_3",
-                        DataType:     schemapb.DataType_INT8,
-                    },
-                    {
-                        FieldID:      102,
-                        Name:         "field_int16",
-                        IsPrimaryKey: false,
-                        Description:  "description_4",
-                        DataType:     schemapb.DataType_INT16,
-                    },
-                    {
-                        FieldID:      103,
-                        Name:         "field_int32",
-                        IsPrimaryKey: false,
-                        Description:  "description_5",
-                        DataType:     schemapb.DataType_INT32,
-                    },
-                    {
-                        FieldID:      104,
-                        Name:         "field_int64",
-                        IsPrimaryKey: false,
-                        Description:  "description_6",
-                        DataType:     schemapb.DataType_INT64,
-                    },
-                    {
-                        FieldID:      105,
-                        Name:         "field_float",
-                        IsPrimaryKey: false,
-                        Description:  "description_7",
-                        DataType:     schemapb.DataType_FLOAT,
-                    },
-                    {
-                        FieldID:      106,
-                        Name:         "field_double",
-                        IsPrimaryKey: false,
-                        Description:  "description_8",
-                        DataType:     schemapb.DataType_DOUBLE,
-                    },
-                    {
-                        FieldID:      107,
-                        Name:         "field_string",
-                        IsPrimaryKey: false,
-                        Description:  "description_9",
-                        DataType:     schemapb.DataType_STRING,
-                    },
-                    {
-                        FieldID:      108,
-                        Name:         "field_binary_vector",
-                        IsPrimaryKey: false,
-                        Description:  "description_10",
-                        DataType:     schemapb.DataType_VECTOR_BINARY,
-                    },
-                    {
-                        FieldID:      109,
-                        Name:         "field_float_vector",
-                        IsPrimaryKey: false,
-                        Description:  "description_11",
-                        DataType:     schemapb.DataType_VECTOR_FLOAT,
-                    },
+    Schema := &etcdpb.CollectionMeta{
+        ID:            1,
+        CreateTime:    1,
+        SegmentIDs:    []int64{0, 1},
+        PartitionTags: []string{"partition_0", "partition_1"},
+        Schema: &schemapb.CollectionSchema{
+            Name:        "schema",
+            Description: "schema",
+            AutoID:      true,
+            Fields: []*schemapb.FieldSchema{
+                {
+                    FieldID:      0,
+                    Name:         "row_id",
+                    IsPrimaryKey: false,
+                    Description:  "row_id",
+                    DataType:     schemapb.DataType_INT64,
+                },
+                {
+                    FieldID:      1,
+                    Name:         "Ts",
+                    IsPrimaryKey: false,
+                    Description:  "Ts",
+                    DataType:     schemapb.DataType_INT64,
+                },
+                {
+                    FieldID:      100,
+                    Name:         "field_bool",
+                    IsPrimaryKey: false,
+                    Description:  "description_2",
+                    DataType:     schemapb.DataType_BOOL,
+                },
+                {
+                    FieldID:      101,
+                    Name:         "field_int8",
+                    IsPrimaryKey: false,
+                    Description:  "description_3",
+                    DataType:     schemapb.DataType_INT8,
+                },
+                {
+                    FieldID:      102,
+                    Name:         "field_int16",
+                    IsPrimaryKey: false,
+                    Description:  "description_4",
+                    DataType:     schemapb.DataType_INT16,
+                },
+                {
+                    FieldID:      103,
+                    Name:         "field_int32",
+                    IsPrimaryKey: false,
+                    Description:  "description_5",
+                    DataType:     schemapb.DataType_INT32,
+                },
+                {
+                    FieldID:      104,
+                    Name:         "field_int64",
+                    IsPrimaryKey: false,
+                    Description:  "description_6",
+                    DataType:     schemapb.DataType_INT64,
+                },
+                {
+                    FieldID:      105,
+                    Name:         "field_float",
+                    IsPrimaryKey: false,
+                    Description:  "description_7",
+                    DataType:     schemapb.DataType_FLOAT,
+                },
+                {
+                    FieldID:      106,
+                    Name:         "field_double",
+                    IsPrimaryKey: false,
+                    Description:  "description_8",
+                    DataType:     schemapb.DataType_DOUBLE,
+                },
+                {
+                    FieldID:      107,
+                    Name:         "field_string",
+                    IsPrimaryKey: false,
+                    Description:  "description_9",
+                    DataType:     schemapb.DataType_STRING,
+                },
+                {
+                    FieldID:      108,
+                    Name:         "field_binary_vector",
+                    IsPrimaryKey: false,
+                    Description:  "description_10",
+                    DataType:     schemapb.DataType_VECTOR_BINARY,
+                },
+                {
+                    FieldID:      109,
+                    Name:         "field_float_vector",
+                    IsPrimaryKey: false,
+                    Description:  "description_11",
+                    DataType:     schemapb.DataType_VECTOR_FLOAT,
                 },
             },
         },
     }
-    insertCodec := &InsertCodec{
-        base,
-        make([]func() error, 0),
-    }
+    insertCodec := NewInsertCodec(Schema)
     insertDataFirst := &InsertData{
         Data: map[int64]FieldData{
             0: &Int64FieldData{
@@ -268,58 +260,7 @@ func TestInsertCodec(t *testing.T) {
     assert.Nil(t, insertCodec.Close())
 }
 
 func TestDDCodec(t *testing.T) {
-    base := Base{
-        Version:  1,
-        CommitID: 1,
-        TenantID: 1,
-        Schema: &etcdpb.CollectionMeta{
-            ID:            1,
-            CreateTime:    1,
-            SegmentIDs:    []int64{0, 1},
-            PartitionTags: []string{"partition_0", "partition_1"},
-            Schema: &schemapb.CollectionSchema{
-                Name:        "schema",
-                Description: "schema",
-                AutoID:      true,
-                Fields: []*schemapb.FieldSchema{
-                    {
-                        Name:         "field_1",
-                        IsPrimaryKey: false,
-                        Description:  "description_1",
-                        DataType:     schemapb.DataType_INT32,
-                    },
-                    {
-                        Name:         "field_2",
-                        IsPrimaryKey: false,
-                        Description:  "description_1",
-                        DataType:     schemapb.DataType_INT64,
-                    },
-                    {
-                        Name:         "field_3",
-                        IsPrimaryKey: false,
-                        Description:  "description_1",
-                        DataType:     schemapb.DataType_STRING,
-                    },
-                    {
-                        Name:         "field_3",
-                        IsPrimaryKey: false,
-                        Description:  "description_1",
-                        DataType:     schemapb.DataType_STRING,
-                    },
-                    {
-                        Name:         "field_3",
-                        IsPrimaryKey: false,
-                        Description:  "description_1",
-                        DataType:     schemapb.DataType_STRING,
-                    },
-                },
-            },
-        },
-    }
-    dataDefinitionCodec := &DataDefinitionCodec{
-        base,
-        make([]func() error, 0),
-    }
+    dataDefinitionCodec := NewDataDefinitionCodec(int64(1))
     ts := []Timestamp{
         1,
         2,
@@ -351,9 +292,7 @@ func TestDDCodec(t *testing.T) {
 }
 
 func TestIndexCodec(t *testing.T) {
-    indexCodec := &IndexCodec{
-        Base{},
-    }
+    indexCodec := NewIndexCodec()
     blobs := []*Blob{
         {
             "12345",
diff --git a/internal/util/typeutil/hash.go b/internal/util/typeutil/hash.go
index 0973274c2f..4014d3803b 100644
--- a/internal/util/typeutil/hash.go
+++ b/internal/util/typeutil/hash.go
@@ -24,3 +24,12 @@ func Hash32Uint64(v uint64) (uint32, error) {
 func Hash32Int64(v int64) (uint32, error) {
     return Hash32Uint64(uint64(v))
 }
+
+func Hash32String(s string) (int64, error) {
+    b := []byte(s)
+    v, err := Hash32Bytes(b)
+    if err != nil {
+        return 0, err
+    }
+    return int64(v), nil
+}
diff --git a/internal/util/typeutil/hash_test.go b/internal/util/typeutil/hash_test.go
index 035659131c..ef177b10ae 100644
--- a/internal/util/typeutil/hash_test.go
+++ b/internal/util/typeutil/hash_test.go
@@ -1,6 +1,7 @@
 package typeutil
 
 import (
+    "log"
     "testing"
     "unsafe"
 
@@ -29,3 +30,20 @@ func TestHash32_Uint64(t *testing.T) {
     t.Log(h2)
     assert.Equal(t, h, h2)
 }
+
+func TestHash32_String(t *testing.T) {
+    var u string = "ok"
+    h, err := Hash32String(u)
+    assert.Nil(t, err)
+
+    t.Log(h)
+    log.Println(h)
+
+    b := []byte(u)
+    h2, err := Hash32Bytes(b)
+    assert.Nil(t, err)
+    log.Println(h2)
+
+    assert.Equal(t, uint32(h), h2)
+}
diff --git a/internal/writenode/data_sync_service.go b/internal/writenode/data_sync_service.go
index ad156d46ec..6e3a07265c 100644
--- a/internal/writenode/data_sync_service.go
+++ b/internal/writenode/data_sync_service.go
@@ -41,7 +41,7 @@ func (dsService *dataSyncService) initNodes() {
 
     var ddNode Node = newDDNode()
     var filterDmNode Node = newFilteredDmNode()
-    var insertBufferNode Node = newInsertBufferNode()
+    var insertBufferNode Node = newInsertBufferNode(dsService.ctx)
 
     dsService.fg.AddNode(&dmStreamNode)
     dsService.fg.AddNode(&ddStreamNode)
diff --git a/internal/writenode/data_sync_service_test.go b/internal/writenode/data_sync_service_test.go
index e36ca0315b..13bb6513c5 100644
--- a/internal/writenode/data_sync_service_test.go
+++ b/internal/writenode/data_sync_service_test.go
@@ -57,7 +57,8 @@ func TestDataSyncService_Start(t *testing.T) {
     }
 
     // Binary vector
-    var bvector = [2]byte{0, 255}
+    // Dimension of binary vector is 32
+    var bvector = [4]byte{255, 255, 255, 0}
     for _, ele := range bvector {
         bs := make([]byte, 4)
         binary.LittleEndian.PutUint32(bs, uint32(ele))
@@ -66,7 +67,7 @@ func TestDataSyncService_Start(t *testing.T) {
 
     // Bool
     bb := make([]byte, 4)
-    var fieldBool = false
+    var fieldBool = true
     var fieldBoolInt uint32
     if fieldBool {
         fieldBoolInt = 1
@@ -256,7 +257,7 @@ func newMeta() {
                 TypeParams: []*commonpb.KeyValuePair{
                     {
                         Key:   "dim",
-                        Value: "8",
+                        Value: "32",
                     },
                     {
                         Key: "col1_f2_tk2",
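The two test tweaks above go together: with dim = 32, a binary vector occupies 32/8 = 4 payload bytes, and the test widens each byte into a 4-byte little-endian word before appending it to the row. A standalone sketch of that arithmetic (stdlib only; values taken from the test):

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        const dim = 32
        bvector := [dim / 8]byte{255, 255, 255, 0} // one bit per dimension

        var row []byte
        for _, ele := range bvector {
            bs := make([]byte, 4)
            binary.LittleEndian.PutUint32(bs, uint32(ele)) // widen byte -> 4-byte word
            row = append(row, bs...)
        }
        // dim/8 = 4 payload bytes travel as 4 words = 16 bytes on the wire,
        // which is why the reader below consumes dim/8 words per row.
        fmt.Println(len(row)) // 16
    }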
"github.com/zilliztech/milvus-distributed/internal/kv/etcd" + miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/storage" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.etcd.io/etcd/clientv3" ) @@ -29,11 +36,14 @@ type ( BaseNode kvClient *etcdkv.EtcdKV insertBuffer *insertBuffer + minIOKV kv.Base + minioPrifex string + idAllocator *allocator.IDAllocator } insertBuffer struct { insertData map[UniqueID]*InsertData // SegmentID to InsertData - maxSize int // GOOSE TODO set from write_node.yaml + maxSize int } ) @@ -75,23 +85,23 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { log.Println("=========== insert buffer Node Operating") if len(in) != 1 { - log.Println("Invalid operate message input in insertBuffertNode, input length = ", len(in)) + log.Println("Error: Invalid operate message input in insertBuffertNode, input length = ", len(in)) // TODO: add error handling } iMsg, ok := (*in[0]).(*insertMsg) if !ok { - log.Println("type assertion failed for insertMsg") + log.Println("Error: type assertion failed for insertMsg") // TODO: add error handling } for _, task := range iMsg.insertMessages { if len(task.RowIDs) != len(task.Timestamps) || len(task.RowIDs) != len(task.RowData) { - log.Println("Error, misaligned messages detected") + log.Println("Error: misaligned messages detected") continue } // iMsg is insertMsg - // 1. iMsg -> binLogs -> buffer + // 1. iMsg -> buffer for _, msg := range iMsg.insertMessages { currentSegID := msg.GetSegmentID() @@ -102,10 +112,21 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { } } - idata.Data[1] = msg.BeginTimestamp + // Timestamps + _, ok = idata.Data[1].(*storage.Int64FieldData) + if !ok { + idata.Data[1] = &storage.Int64FieldData{ + Data: []int64{}, + NumRows: 0, + } + } + tsData := idata.Data[1].(*storage.Int64FieldData) + for _, ts := range msg.Timestamps { + tsData.Data = append(tsData.Data, int64(ts)) + } + tsData.NumRows += len(msg.Timestamps) // 1.1 Get CollectionMeta from etcd - // GOOSE TODO get meta from metaTable segMeta := etcdpb.SegmentMeta{} key := path.Join(SegmentPrefix, strconv.FormatInt(currentSegID, 10)) @@ -145,23 +166,32 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { // TODO: add error handling } - data := make([]float32, 0) + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.FloatVectorFieldData{ + NumRows: 0, + Data: make([]float32, 0), + Dim: dim, + } + } + + fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData) + for _, blob := range msg.RowData { - for j := pos; j < dim; j++ { - v := binary.LittleEndian.Uint32(blob.GetValue()[j*4:]) - data = append(data, math.Float32frombits(v)) + for j := 0; j < dim; j++ { + v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) + fieldData.Data = append(fieldData.Data, math.Float32frombits(v)) pos++ } } - idata.Data[field.FieldID] = storage.FloatVectorFieldData{ - NumRows: len(msg.RowIDs), - Data: data, - Dim: dim, - } + fieldData.NumRows += len(msg.RowIDs) + log.Println("Float vector data:", + idata.Data[field.FieldID].(*storage.FloatVectorFieldData).Data, + "NumRows:", + idata.Data[field.FieldID].(*storage.FloatVectorFieldData).NumRows, + "Dim:", + idata.Data[field.FieldID].(*storage.FloatVectorFieldData).Dim) - log.Println("aaaaaaaa", idata) case 
             case schemapb.DataType_VECTOR_BINARY:
-                // GOOSE TODO
                 var dim int
                 for _, t := range field.TypeParams {
                     if t.Key == "dim" {
@@ -177,101 +207,216 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
                     // TODO: add error handling
                 }
 
-                data := make([]byte, 0)
+                if _, ok := idata.Data[field.FieldID]; !ok {
+                    idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
+                        NumRows: 0,
+                        Data:    make([]byte, 0),
+                        Dim:     dim,
+                    }
+                }
+
+                fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)
+
                 for _, blob := range msg.RowData {
-                    for d := 0; d < dim/4; d++ {
+                    for d := 0; d < dim/8; d++ {
                         v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
-                        data = append(data, byte(v))
+                        fieldData.Data = append(fieldData.Data, byte(v))
                         pos++
                     }
                 }
 
-                idata.Data[field.FieldID] = storage.BinaryVectorFieldData{
-                    NumRows: len(data) * 8 / dim,
-                    Data:    data,
-                    Dim:     dim,
-                }
-                log.Println("aaaaaaaa", idata)
+
+                fieldData.NumRows += len(msg.RowData)
+
+                log.Println(
+                    "Binary vector data:",
+                    idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).Data,
+                    "NumRows:",
+                    idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).NumRows,
+                    "Dim:",
+                    idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).Dim)
 
             case schemapb.DataType_BOOL:
-                data := make([]bool, 0)
+                if _, ok := idata.Data[field.FieldID]; !ok {
+                    idata.Data[field.FieldID] = &storage.BoolFieldData{
+                        NumRows: 0,
+                        Data:    make([]bool, 0),
+                    }
+                }
+
+                fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
                 for _, blob := range msg.RowData {
                     boolInt := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
                     if boolInt == 1 {
-                        data = append(data, true)
+                        fieldData.Data = append(fieldData.Data, true)
                     } else {
-                        data = append(data, false)
+                        fieldData.Data = append(fieldData.Data, false)
                     }
                     pos++
                 }
-                idata.Data[field.FieldID] = data
-                log.Println("aaaaaaaa", idata)
+
+                fieldData.NumRows += len(msg.RowIDs)
+
+                log.Println("Bool data:",
+                    idata.Data[field.FieldID].(*storage.BoolFieldData).Data)
 
             case schemapb.DataType_INT8:
-                data := make([]int8, 0)
+                if _, ok := idata.Data[field.FieldID]; !ok {
+                    idata.Data[field.FieldID] = &storage.Int8FieldData{
+                        NumRows: 0,
+                        Data:    make([]int8, 0),
+                    }
+                }
+
+                fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
                 for _, blob := range msg.RowData {
                     v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
-                    data = append(data, int8(v))
+                    fieldData.Data = append(fieldData.Data, int8(v))
                     pos++
                 }
-                idata.Data[field.FieldID] = data
-                log.Println("aaaaaaaa", idata)
+
+                fieldData.NumRows += len(msg.RowIDs)
+
+                log.Println("Int8 data:",
+                    idata.Data[field.FieldID].(*storage.Int8FieldData).Data)
 
             case schemapb.DataType_INT16:
-                data := make([]int16, 0)
+                if _, ok := idata.Data[field.FieldID]; !ok {
+                    idata.Data[field.FieldID] = &storage.Int16FieldData{
+                        NumRows: 0,
+                        Data:    make([]int16, 0),
+                    }
+                }
+
+                fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
                 for _, blob := range msg.RowData {
                     v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
-                    data = append(data, int16(v))
+                    fieldData.Data = append(fieldData.Data, int16(v))
                     pos++
                 }
-                idata.Data[field.FieldID] = data
-                log.Println("aaaaaaaa", idata)
+
+                fieldData.NumRows += len(msg.RowIDs)
+
+                log.Println("Int16 data:",
+                    idata.Data[field.FieldID].(*storage.Int16FieldData).Data)
 
             case schemapb.DataType_INT32:
-                data := make([]int32, 0)
+                if _, ok := idata.Data[field.FieldID]; !ok {
+                    idata.Data[field.FieldID] = &storage.Int32FieldData{
+                        NumRows: 0,
+                        Data:    make([]int32, 0),
+                    }
+                }
+
+                fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
                 for _, blob := range msg.RowData {
                     v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
-                    data = append(data, int32(v))
+                    fieldData.Data = append(fieldData.Data, int32(v))
                     pos++
                 }
-                idata.Data[field.FieldID] = data
-                log.Println("aaaaaaaa", idata)
+
+                fieldData.NumRows += len(msg.RowIDs)
+
+                log.Println("Int32 data:",
+                    idata.Data[field.FieldID].(*storage.Int32FieldData).Data)
 
             case schemapb.DataType_INT64:
-                data := make([]int64, 0)
+                if _, ok := idata.Data[field.FieldID]; !ok {
+                    idata.Data[field.FieldID] = &storage.Int64FieldData{
+                        NumRows: 0,
+                        Data:    make([]int64, 0),
+                    }
+                }
+
+                fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
                 for _, blob := range msg.RowData {
                     v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
-                    data = append(data, int64(v))
+                    fieldData.Data = append(fieldData.Data, int64(v))
                     pos++
                 }
-                idata.Data[field.FieldID] = data
-                log.Println("aaaaaaaa", idata)
+
+                fieldData.NumRows += len(msg.RowIDs)
+
+                log.Println("Int64 data:",
+                    idata.Data[field.FieldID].(*storage.Int64FieldData).Data)
 
             case schemapb.DataType_FLOAT:
-                data := make([]float32, 0)
+                if _, ok := idata.Data[field.FieldID]; !ok {
+                    idata.Data[field.FieldID] = &storage.FloatFieldData{
+                        NumRows: 0,
+                        Data:    make([]float32, 0),
+                    }
+                }
+
+                fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
                 for _, blob := range msg.RowData {
                     v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
-                    data = append(data, math.Float32frombits(v))
+                    fieldData.Data = append(fieldData.Data, math.Float32frombits(v))
                     pos++
                 }
-                idata.Data[field.FieldID] = data
-                log.Println("aaaaaaaa", idata)
+
+                fieldData.NumRows += len(msg.RowIDs)
+
+                log.Println("Float32 data:",
+                    idata.Data[field.FieldID].(*storage.FloatFieldData).Data)
 
             case schemapb.DataType_DOUBLE:
-                // GOOSE TODO pos
-                data := make([]float64, 0)
+                if _, ok := idata.Data[field.FieldID]; !ok {
+                    idata.Data[field.FieldID] = &storage.DoubleFieldData{
+                        NumRows: 0,
+                        Data:    make([]float64, 0),
+                    }
+                }
+
+                fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
                 for _, blob := range msg.RowData {
                     v := binary.LittleEndian.Uint64(blob.GetValue()[pos*4:])
-                    data = append(data, math.Float64frombits(v))
+                    fieldData.Data = append(fieldData.Data, math.Float64frombits(v))
                     pos++
                 }
-                idata.Data[field.FieldID] = data
-                log.Println("aaaaaaaa", idata)
+
+                fieldData.NumRows += len(msg.RowIDs)
+
+                log.Println("Float64 data:",
+                    idata.Data[field.FieldID].(*storage.DoubleFieldData).Data)
             }
         }
 
         // 1.3 store in buffer
         ibNode.insertBuffer.insertData[currentSegID] = idata
 
-        // 1.4 Send hardTimeTick msg
+        // 1.4 Send hardTimeTick msg, GOOSE TODO
 
         // 1.5 if full
         //   1.5.1 generate binlogs
-        // GOOSE TODO partitionTag -> partitionID
-        //   1.5.2 binLogs -> minIO/S3
         if ibNode.insertBuffer.full(currentSegID) {
-            continue
+            // partitionTag -> partitionID
+            partitionTag := msg.GetPartitionTag()
+            partitionID, err := typeutil.Hash32String(partitionTag)
+            if err != nil {
+                log.Println("Error: failed to convert partitionTag to partitionID")
+            }
+
+            inCodec := storage.NewInsertCodec(&collMeta)
+
+            // buffer data to binlogs
+            binLogs, err := inCodec.Serialize(partitionID,
+                currentSegID, ibNode.insertBuffer.insertData[currentSegID])
+            if err != nil {
+                log.Println("Error: failed to generate binlogs")
+            }
+            for _, v := range binLogs {
+                log.Println("key ", v.Key, "- value ", v.Value)
+            }
+
+            log.Println("=========", binLogs)
+
+            // clear buffer
+            delete(ibNode.insertBuffer.insertData, currentSegID)
+
+            // 1.5.2 binLogs -> minIO/S3
+            collIDStr := strconv.FormatInt(segMeta.GetCollectionID(), 10)
+            partitionIDStr := strconv.FormatInt(partitionID, 10)
+            segIDStr := strconv.FormatInt(currentSegID, 10)
+            keyPrefix := path.Join(ibNode.minioPrefix, collIDStr, partitionIDStr, segIDStr)
+
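+            // Resulting object key layout:
+            //   ${InsertLogRootPath}/${collID}/${partitionID}/${segID}/${blob.Key}/${log_idx}
+            // matching the insert-binlog blob key comment in data_codec.go.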
+            for _, blob := range binLogs {
+                uid, err := ibNode.idAllocator.AllocOne()
+                if err != nil {
+                    log.Println("Error: failed to allocate ID")
+                    // GOOSE TODO error handle
+                }
+
+                key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10))
+                err = ibNode.minIOKV.Save(key, string(blob.Value[:]))
+                if err != nil {
+                    log.Println("Error: failed to save to MinIO")
+                    // GOOSE TODO error handle
+                }
+            }
         }
     }
 
@@ -283,7 +428,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
     return nil
 }
 
-func newInsertBufferNode() *insertBufferNode {
+func newInsertBufferNode(ctx context.Context) *insertBufferNode {
     maxQueueLength := Params.FlowGraphMaxQueueLength
     maxParallelism := Params.FlowGraphMaxParallelism
 
@@ -292,8 +437,7 @@ func newInsertBufferNode() *insertBufferNode {
     baseNode.SetMaxQueueLength(maxQueueLength)
     baseNode.SetMaxParallelism(maxParallelism)
 
-    // GOOSE TODO maxSize read from yaml
-    maxSize := 10
+    maxSize := Params.FlushInsertBufSize
     iBuffer := &insertBuffer{
         insertData: make(map[UniqueID]*InsertData),
         maxSize:    maxSize,
@@ -309,9 +453,28 @@ func newInsertBufferNode() *insertBufferNode {
     })
     kvClient := etcdkv.NewEtcdKV(cli, MetaRootPath)
 
+    // MinIO
+    minioEndPoint := Params.MinioAddress
+    minioAccessKeyID := Params.MinioAccessKeyID
+    minioSecretAccessKey := Params.MinioSecretAccessKey
+    minioUseSSL := Params.MinioUseSSL
+    minioBucketName := Params.MinioBucketName
+
+    minioClient, _ := minio.New(minioEndPoint, &minio.Options{
+        Creds:  credentials.NewStaticV4(minioAccessKeyID, minioSecretAccessKey, ""),
+        Secure: minioUseSSL,
+    })
+    minIOKV, _ := miniokv.NewMinIOKV(ctx, minioClient, minioBucketName)
+    minioPrefix := Params.InsertLogRootPath
+
+    idAllocator, _ := allocator.NewIDAllocator(ctx, Params.MasterAddress)
+
     return &insertBufferNode{
         BaseNode:     baseNode,
         kvClient:     kvClient,
         insertBuffer: iBuffer,
+        minIOKV:      minIOKV,
+        minioPrefix:  minioPrefix,
+        idAllocator:  idAllocator,
     }
 }
diff --git a/internal/writenode/param_table.go b/internal/writenode/param_table.go
index 793a367b3a..375733cd07 100644
--- a/internal/writenode/param_table.go
+++ b/internal/writenode/param_table.go
@@ -12,6 +12,7 @@ type ParamTable struct {
     paramtable.BaseTable
 
     PulsarAddress string
+    MasterAddress string
 
     WriteNodeID  UniqueID
     WriteNodeNum int
@@ -35,8 +36,19 @@ type ParamTable struct {
     DefaultPartitionTag string
     SliceIndex          int
 
-    EtcdAddress  string
-    MetaRootPath string
+    EtcdAddress          string
+    MetaRootPath         string
+    MinioAddress         string
+    MinioAccessKeyID     string
+    MinioSecretAccessKey string
+    MinioUseSSL          bool
+    MinioBucketName      string
+
+    FlushInsertBufSize int
+    FlushDdBufSize     int
+
+    InsertLogRootPath string
+    DdLogRootPath     string
 }
 
 var Params ParamTable
@@ -62,9 +74,12 @@ func (p *ParamTable) Init() {
         panic(err)
     }
 
+    p.initMasterAddress()
     p.initPulsarAddress()
     p.initEtcdAddress()
     p.initMetaRootPath()
+    p.initInsertLogRootPath()
+    p.initDdLogRootPath()
 
     p.initWriteNodeID()
     p.initWriteNodeNum()
@@ -85,6 +100,15 @@ func (p *ParamTable) Init() {
     p.initDDChannelNames()
     p.initDDReceiveBufSize()
     p.initDDPulsarBufSize()
+
+    p.initMinioAddress()
+    p.initMinioAccessKeyID()
+    p.initMinioSecretAccessKey()
+    p.initMinioUseSSL()
+    p.initMinioBucketName()
+
+    p.initFlushInsertBufSize()
+    p.initFlushDdBufSize()
 }
 
 func (p *ParamTable) initWriteNodeID() {
@@ -107,6 +131,14 @@ func (p *ParamTable) initPulsarAddress() {
     p.PulsarAddress = url
 }
 
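+// initMasterAddress loads the master address from the "_MasterAddress"
+// key and panics when it is missing, mirroring initPulsarAddress above.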
+func (p *ParamTable) initMasterAddress() {
+    addr, err := p.Load("_MasterAddress")
+    if err != nil {
+        panic(err)
+    }
+    p.MasterAddress = addr
+}
+
 func (p *ParamTable) initInsertChannelRange() {
     insertChannelRange, err := p.Load("msgChannel.channelRange.insert")
     if err != nil {
@@ -266,3 +298,83 @@ func (p *ParamTable) initMetaRootPath() {
     }
     p.MetaRootPath = rootPath + "/" + subPath
 }
+
+func (p *ParamTable) initInsertLogRootPath() {
+    rootPath, err := p.Load("etcd.rootPath")
+    if err != nil {
+        panic(err)
+    }
+    p.InsertLogRootPath = rootPath + "/insert_log"
+}
+
+func (p *ParamTable) initDdLogRootPath() {
+    rootPath, err := p.Load("etcd.rootPath")
+    if err != nil {
+        panic(err)
+    }
+    p.DdLogRootPath = rootPath + "/data_definition_log"
+}
+
+func (p *ParamTable) initMinioAddress() {
+    endpoint, err := p.Load("_MinioAddress")
+    if err != nil {
+        panic(err)
+    }
+    p.MinioAddress = endpoint
+}
+
+func (p *ParamTable) initMinioAccessKeyID() {
+    keyID, err := p.Load("minio.accessKeyID")
+    if err != nil {
+        panic(err)
+    }
+    p.MinioAccessKeyID = keyID
+}
+
+func (p *ParamTable) initMinioSecretAccessKey() {
+    key, err := p.Load("minio.secretAccessKey")
+    if err != nil {
+        panic(err)
+    }
+    p.MinioSecretAccessKey = key
+}
+
+func (p *ParamTable) initMinioUseSSL() {
+    usessl, err := p.Load("minio.useSSL")
+    if err != nil {
+        panic(err)
+    }
+    p.MinioUseSSL, _ = strconv.ParseBool(usessl)
+}
+
+func (p *ParamTable) initMinioBucketName() {
+    bucketName, err := p.Load("minio.bucketName")
+    if err != nil {
+        panic(err)
+    }
+    p.MinioBucketName = bucketName
+}
+
+func (p *ParamTable) initFlushInsertBufSize() {
+    sizeStr, err := p.Load("writenode.flush.insertBufSize")
+    if err != nil {
+        panic(err)
+    }
+
+    p.FlushInsertBufSize, err = strconv.Atoi(sizeStr)
+    if err != nil {
+        panic(err)
+    }
+}
+
+func (p *ParamTable) initFlushDdBufSize() {
+    sizeStr, err := p.Load("writenode.flush.ddBufSize")
+    if err != nil {
+        panic(err)
+    }
+
+    p.FlushDdBufSize, err = strconv.Atoi(sizeStr)
+    if err != nil {
+        panic(err)
+    }
+}
diff --git a/internal/writenode/param_table_test.go b/internal/writenode/param_table_test.go
index d29c1442b0..ed9405ccb6 100644
--- a/internal/writenode/param_table_test.go
+++ b/internal/writenode/param_table_test.go
@@ -66,4 +66,44 @@ func TestParamTable_WriteNode(t *testing.T) {
         name := Params.WriteNodeTimeTickChannelName
         assert.Equal(t, name, "writeNodeTimeTick")
     })
+
+    t.Run("Test minioAccessKeyID", func(t *testing.T) {
+        id := Params.MinioAccessKeyID
+        assert.Equal(t, id, "minioadmin")
+    })
+
+    t.Run("Test minioSecretAccessKey", func(t *testing.T) {
+        key := Params.MinioSecretAccessKey
+        assert.Equal(t, key, "minioadmin")
+    })
+
+    t.Run("Test MinioUseSSL", func(t *testing.T) {
+        useSSL := Params.MinioUseSSL
+        assert.Equal(t, useSSL, false)
+    })
+
+    t.Run("Test MinioBucketName", func(t *testing.T) {
+        name := Params.MinioBucketName
+        assert.Equal(t, name, "A-bucket")
+    })
+
+    t.Run("Test FlushInsertBufSize", func(t *testing.T) {
+        size := Params.FlushInsertBufSize
+        assert.Equal(t, size, 20)
+    })
+
+    t.Run("Test FlushDdBufSize", func(t *testing.T) {
+        size := Params.FlushDdBufSize
+        assert.Equal(t, size, 20)
+    })
+
+    t.Run("Test InsertLogRootPath", func(t *testing.T) {
+        rootPath := Params.InsertLogRootPath
+        assert.Equal(t, rootPath, "by-dev/insert_log")
+    })
+
+    t.Run("Test DdLogRootPath", func(t *testing.T) {
+        rootPath := Params.DdLogRootPath
+        assert.Equal(t, rootPath, "by-dev/data_definition_log")
+    })
 }
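End to end, a flushed insert binlog lands in MinIO under ${InsertLogRootPath}/${collectionID}/${partitionID}/${segmentID}/${fieldID}/${logIdx}. A standalone sketch of that key construction (stdlib only; fnv stands in for the 32-bit hash behind typeutil.Hash32String, which may differ, and all IDs are made up):

    package main

    import (
        "fmt"
        "hash/fnv"
        "path"
        "strconv"
    )

    // hash32 mirrors the shape of typeutil.Hash32String: a 32-bit hash
    // widened to int64. (Assumption: the real hash function may differ.)
    func hash32(s string) int64 {
        h := fnv.New32a()
        h.Write([]byte(s))
        return int64(h.Sum32())
    }

    func main() {
        insertLogRootPath := "by-dev/insert_log" // etcd.rootPath + "/insert_log"
        collectionID, segmentID := int64(1), int64(42)
        partitionID := hash32("partition_0") // partitionTag -> partitionID
        logIdx := int64(0)                   // from the ID allocator in the real code

        key := path.Join(insertLogRootPath,
            strconv.FormatInt(collectionID, 10),
            strconv.FormatInt(partitionID, 10),
            strconv.FormatInt(segmentID, 10),
            "100", // blob.Key, i.e. the field ID for insert binlogs
            strconv.FormatInt(logIdx, 10))
        fmt.Println(key)
    }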