From db94d7771f04eb2b28298049befaa2c3c8b47f83 Mon Sep 17 00:00:00 2001 From: godchen Date: Sat, 24 Jul 2021 09:25:22 +0800 Subject: [PATCH] Read vector from disk (#6707) * Read vector from disk Signed-off-by: godchen * go fmt Signed-off-by: godchen * fix git action error Signed-off-by: godchen * fix error Signed-off-by: godchen * fix test error Signed-off-by: godchen * fix action error Signed-off-by: godchen * fix caculate error Signed-off-by: godchen * change var name Signed-off-by: godchen * remove unused method Signed-off-by: godchen * remove unused method Signed-off-by: godchen * fix error Signed-off-by: godchen * fix len error Signed-off-by: godchen * remove unused code Signed-off-by: godchen * change bytes to float method Signed-off-by: godchen * change float to bytes method Signed-off-by: godchen * fix action error Signed-off-by: godchen --- .../datanode/flow_graph_insert_buffer_node.go | 78 +++++++----- .../flow_graph_insert_buffer_node_test.go | 10 +- internal/datanode/param_table.go | 4 +- internal/indexnode/indexnode_test.go | 12 +- internal/querycoord/mock_test.go | 8 +- internal/querynode/load_service_test.go | 8 +- internal/querynode/query_collection.go | 114 +++++++++++------ internal/querynode/query_service.go | 34 +++++- internal/querynode/query_service_test.go | 3 +- internal/querynode/segment.go | 47 ++----- internal/querynode/segment_loader.go | 54 ++------ internal/storage/data_codec.go | 115 +++--------------- internal/storage/data_codec_test.go | 78 ++++++------ internal/storage/data_sorter_test.go | 24 ++-- internal/storage/local_chunk_manager.go | 18 +-- internal/storage/minio_chunk_manager.go | 4 +- internal/storage/print_binlog_test.go | 48 ++++---- internal/storage/types.go | 4 +- internal/storage/vector_chunk_manager.go | 68 ++++------- internal/storage/vector_chunk_manager_test.go | 25 ++-- internal/util/typeutil/convension.go | 17 +++ 21 files changed, 365 insertions(+), 408 deletions(-) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 46e9a88c72..5fe9f7451a 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -83,10 +83,10 @@ type segmentFlushUnit struct { type insertBuffer struct { insertData map[UniqueID]*InsertData // SegmentID to InsertData - maxSize int32 + maxSize int64 } -func (ib *insertBuffer) size(segmentID UniqueID) int32 { +func (ib *insertBuffer) size(segmentID UniqueID) int64 { if ib.insertData == nil || len(ib.insertData) <= 0 { return 0 } @@ -95,16 +95,32 @@ func (ib *insertBuffer) size(segmentID UniqueID) int32 { return 0 } - var maxSize int32 = 0 + var maxSize int64 = 0 for _, data := range idata.Data { fdata, ok := data.(*storage.FloatVectorFieldData) - if ok && int32(fdata.NumRows) > maxSize { - maxSize = int32(fdata.NumRows) + if ok { + totalNumRows := int64(0) + if fdata.NumRows != nil { + for _, numRow := range fdata.NumRows { + totalNumRows += numRow + } + } + if totalNumRows > maxSize { + maxSize = totalNumRows + } } bdata, ok := data.(*storage.BinaryVectorFieldData) - if ok && int32(bdata.NumRows) > maxSize { - maxSize = int32(bdata.NumRows) + if ok { + totalNumRows := int64(0) + if bdata.NumRows != nil { + for _, numRow := range bdata.NumRows { + totalNumRows += numRow + } + } + if totalNumRows > maxSize { + maxSize = totalNumRows + } } } @@ -112,7 +128,7 @@ func (ib *insertBuffer) size(segmentID UniqueID) int32 { } func (ib *insertBuffer) full(segmentID UniqueID) bool { - 
log.Debug("Segment size", zap.Any("segment", segmentID), zap.Int32("size", ib.size(segmentID)), zap.Int32("maxsize", ib.maxSize)) + log.Debug("Segment size", zap.Any("segment", segmentID), zap.Int64("size", ib.size(segmentID)), zap.Int64("maxsize", ib.maxSize)) return ib.size(segmentID) >= ib.maxSize } @@ -247,7 +263,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.FloatVectorFieldData{ - NumRows: 0, + NumRows: make([]int64, 0, 1), Data: make([]float32, 0), Dim: dim, } @@ -269,7 +285,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } } pos += offset - fieldData.NumRows += len(msg.RowIDs) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case schemapb.DataType_BinaryVector: var dim int @@ -289,7 +305,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{ - NumRows: 0, + NumRows: make([]int64, 0, 1), Data: make([]byte, 0), Dim: dim, } @@ -303,12 +319,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { offset = len(bv) } pos += offset - fieldData.NumRows += len(msg.RowData) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case schemapb.DataType_Bool: if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.BoolFieldData{ - NumRows: 0, + NumRows: make([]int64, 0, 1), Data: make([]bool, 0), } } @@ -324,12 +340,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows += len(msg.RowIDs) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case schemapb.DataType_Int8: if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.Int8FieldData{ - NumRows: 0, + NumRows: make([]int64, 0, 1), Data: make([]int8, 0), } } @@ -344,12 +360,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { fieldData.Data = append(fieldData.Data, v) } pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows += len(msg.RowIDs) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case schemapb.DataType_Int16: if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.Int16FieldData{ - NumRows: 0, + NumRows: make([]int64, 0, 1), Data: make([]int16, 0), } } @@ -364,12 +380,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { fieldData.Data = append(fieldData.Data, v) } pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows += len(msg.RowIDs) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case schemapb.DataType_Int32: if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.Int32FieldData{ - NumRows: 0, + NumRows: make([]int64, 0, 1), Data: make([]int32, 0), } } @@ -379,17 +395,17 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { for _, blob := range msg.RowData { buf := bytes.NewReader(blob.GetValue()[pos:]) if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { - log.Error("binary.Read int32 wrong", zap.Error(err)) + log.Error("binary.Read int64 wrong", zap.Error(err)) } fieldData.Data = append(fieldData.Data, v) } pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows += len(msg.RowIDs) + fieldData.NumRows = append(fieldData.NumRows, 
int64(len(msg.RowData))) case schemapb.DataType_Int64: if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.Int64FieldData{ - NumRows: 0, + NumRows: make([]int64, 0, 1), Data: make([]int64, 0), } } @@ -398,12 +414,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { switch field.FieldID { case 0: // rowIDs fieldData.Data = append(fieldData.Data, msg.RowIDs...) - fieldData.NumRows += len(msg.RowIDs) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case 1: // Timestamps for _, ts := range msg.Timestamps { fieldData.Data = append(fieldData.Data, int64(ts)) } - fieldData.NumRows += len(msg.Timestamps) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) default: var v int64 for _, blob := range msg.RowData { @@ -414,13 +430,13 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { fieldData.Data = append(fieldData.Data, v) } pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows += len(msg.RowIDs) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) } case schemapb.DataType_Float: if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.FloatFieldData{ - NumRows: 0, + NumRows: make([]int64, 0, 1), Data: make([]float32, 0), } } @@ -435,12 +451,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { fieldData.Data = append(fieldData.Data, v) } pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows += len(msg.RowIDs) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case schemapb.DataType_Double: if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.DoubleFieldData{ - NumRows: 0, + NumRows: make([]int64, 0, 1), Data: make([]float64, 0), } } @@ -456,7 +472,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows += len(msg.RowIDs) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) } } @@ -475,7 +491,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { log.Debug("......") break } - log.Debug("seg buffer status", zap.Int64("segmentID", k), zap.Int32("buffer size", ibNode.insertBuffer.size(k))) + log.Debug("seg buffer status", zap.Int64("segmentID", k), zap.Int64("buffer size", ibNode.insertBuffer.size(k))) stopSign++ } } @@ -486,7 +502,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // If full, auto flush if ibNode.insertBuffer.full(segToFlush) { log.Debug(". 
Insert Buffer full, auto flushing ", - zap.Int32("num of rows", ibNode.insertBuffer.size(segToFlush))) + zap.Int64("num of rows", ibNode.insertBuffer.size(segToFlush))) collMeta, err := ibNode.getCollMetabySegID(segToFlush, iMsg.timeRange.timestampMax) if err != nil { diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 0c43ca8203..683ee318d5 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -151,15 +151,15 @@ func TestFlushSegment(t *testing.T) { Data: make(map[storage.FieldID]storage.FieldData), } insertData.Data[0] = &storage.Int64FieldData{ - NumRows: 10, + NumRows: []int64{10}, Data: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, } insertData.Data[1] = &storage.Int64FieldData{ - NumRows: 10, + NumRows: []int64{10}, Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, } insertData.Data[107] = &storage.FloatFieldData{ - NumRows: 10, + NumRows: []int64{10}, Data: make([]float32, 10), } flushMap.Store(segmentID, insertData) @@ -374,9 +374,9 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { if i == 1 { assert.Equal(t, test.expectedSegID, flushUnit[0].segID) - assert.Equal(t, int32(0), iBNode.insertBuffer.size(UniqueID(i+1))) + assert.Equal(t, int64(0), iBNode.insertBuffer.size(UniqueID(i+1))) } else { - assert.Equal(t, int32(1), iBNode.insertBuffer.size(UniqueID(i+1))) + assert.Equal(t, int64(1), iBNode.insertBuffer.size(UniqueID(i+1))) } } diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index caf296c7ff..22b2f817eb 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -30,7 +30,7 @@ type ParamTable struct { Port int FlowGraphMaxQueueLength int32 FlowGraphMaxParallelism int32 - FlushInsertBufferSize int32 + FlushInsertBufferSize int64 InsertBinlogRootPath string StatsBinlogRootPath string Log log.Config @@ -124,7 +124,7 @@ func (p *ParamTable) initFlowGraphMaxParallelism() { // ---- flush configs ---- func (p *ParamTable) initFlushInsertBufferSize() { - p.FlushInsertBufferSize = p.ParseInt32("datanode.flush.insertBufSize") + p.FlushInsertBufferSize = p.ParseInt64("datanode.flush.insertBufSize") } func (p *ParamTable) initInsertBinlogRootPath() { diff --git a/internal/indexnode/indexnode_test.go b/internal/indexnode/indexnode_test.go index a100188e4c..92185b2e9f 100644 --- a/internal/indexnode/indexnode_test.go +++ b/internal/indexnode/indexnode_test.go @@ -86,11 +86,11 @@ func TestIndexNode(t *testing.T) { tsData[i] = int64(i + 100) } data[tsFieldID] = &storage.Int64FieldData{ - NumRows: nb, + NumRows: []int64{nb}, Data: tsData, } data[floatVectorFieldID] = &storage.FloatVectorFieldData{ - NumRows: nb, + NumRows: []int64{nb}, Data: generateFloatVectors(), Dim: dim, } @@ -201,11 +201,11 @@ func TestIndexNode(t *testing.T) { tsData[i] = int64(i + 100) } data[tsFieldID] = &storage.Int64FieldData{ - NumRows: nb, + NumRows: []int64{nb}, Data: tsData, } data[binaryVectorFieldID] = &storage.BinaryVectorFieldData{ - NumRows: nb, + NumRows: []int64{nb}, Data: generateBinaryVectors(), Dim: dim, } @@ -313,11 +313,11 @@ func TestIndexNode(t *testing.T) { tsData[i] = int64(i + 100) } data[tsFieldID] = &storage.Int64FieldData{ - NumRows: nb, + NumRows: []int64{nb}, Data: tsData, } data[floatVectorFieldID] = &storage.FloatVectorFieldData{ - NumRows: nb, + NumRows: []int64{nb}, Data: generateFloatVectors(), Dim: dim, } diff --git a/internal/querycoord/mock_test.go 
b/internal/querycoord/mock_test.go index 10b333e611..7db7a22e38 100644 --- a/internal/querycoord/mock_test.go +++ b/internal/querycoord/mock_test.go @@ -146,19 +146,19 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID insertData := &storage.InsertData{ Data: map[int64]storage.FieldData{ 0: &storage.Int64FieldData{ - NumRows: msgLength, + NumRows: []int64{msgLength}, Data: idData, }, 1: &storage.Int64FieldData{ - NumRows: msgLength, + NumRows: []int64{msgLength}, Data: timestamps, }, 100: &storage.Int64FieldData{ - NumRows: msgLength, + NumRows: []int64{msgLength}, Data: fieldAgeData, }, 101: &storage.FloatVectorFieldData{ - NumRows: msgLength, + NumRows: []int64{msgLength}, Data: fieldVecData, Dim: DIM, }, diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go index 7f9f14ade8..806efbc6e8 100644 --- a/internal/querynode/load_service_test.go +++ b/internal/querynode/load_service_test.go @@ -772,20 +772,20 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID insertData := &storage.InsertData{ Data: map[int64]storage.FieldData{ 0: &storage.Int64FieldData{ - NumRows: msgLength, + NumRows: []int64{msgLength}, Data: idData, }, 1: &storage.Int64FieldData{ - NumRows: msgLength, + NumRows: []int64{msgLength}, Data: timestamps, }, 100: &storage.FloatVectorFieldData{ - NumRows: msgLength, + NumRows: []int64{msgLength}, Data: fieldVecData, Dim: DIM, }, 101: &storage.Int32FieldData{ - NumRows: msgLength, + NumRows: []int64{msgLength}, Data: fieldAgeData, }, }, diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index a7d51271ea..a4c7c54ce8 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -12,6 +12,7 @@ package querynode import ( + "bytes" "context" "encoding/binary" "fmt" @@ -27,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/schemapb" @@ -63,6 +65,8 @@ type queryCollection struct { queryMsgStream msgstream.MsgStream queryResultMsgStream msgstream.MsgStream + + vcm *storage.VectorChunkManager } type ResultEntityIds []UniqueID @@ -72,13 +76,17 @@ func newQueryCollection(releaseCtx context.Context, collectionID UniqueID, historical *historical, streaming *streaming, - factory msgstream.Factory) *queryCollection { + factory msgstream.Factory, + lcm storage.ChunkManager, + rcm storage.ChunkManager) *queryCollection { unsolvedMsg := make([]queryMsg, 0) queryStream, _ := factory.NewQueryMsgStream(releaseCtx) queryResultStream, _ := factory.NewQueryMsgStream(releaseCtx) + vcm := storage.NewVectorChunkManager(lcm, rcm) + qc := &queryCollection{ releaseCtx: releaseCtx, cancel: cancel, @@ -93,6 +101,8 @@ func newQueryCollection(releaseCtx context.Context, queryMsgStream: queryStream, queryResultMsgStream: queryResultStream, + + vcm: vcm, } qc.register() @@ -1059,55 +1069,79 @@ func (q *queryCollection) search(msg queryMsg) error { } func (q *queryCollection) fillVectorFieldsData(segment *Segment, result *segcorepb.RetrieveResults) error { + collection, _ := q.streaming.replica.getCollectionByID(q.collectionID) + schema := &etcdpb.CollectionMeta{ + ID: q.collectionID, + Schema: collection.schema} + 
schemaHelper, err := typeutil.CreateSchemaHelper(collection.schema) + if err != nil { + return err + } for _, resultFieldData := range result.FieldsData { vecFieldInfo, err := segment.getVectorFieldInfo(resultFieldData.FieldId) if err != nil { continue } - - // if vector raw data is in memory, result should has been filled in valid vector raw data - if vecFieldInfo.getRawDataInMemory() { - continue - } - - // load vector field data - if err = q.historical.loader.loadSegmentVectorFieldData(vecFieldInfo); err != nil { - return err - } + log.Debug("FillVectorFieldData", zap.Any("fieldID", resultFieldData.FieldId)) for i, offset := range result.Offset { - var success bool - for _, path := range vecFieldInfo.fieldBinlog.Binlogs { - rawData := vecFieldInfo.getRawData(path) - - var numRows, dim int64 - switch fieldData := rawData.(type) { - case *storage.FloatVectorFieldData: - numRows = int64(fieldData.NumRows) - dim = int64(fieldData.Dim) - if offset < numRows { - copy(resultFieldData.GetVectors().GetFloatVector().Data[int64(i)*dim:int64(i+1)*dim], fieldData.Data[offset*dim:(offset+1)*dim]) - success = true - } else { - offset -= numRows - } - case *storage.BinaryVectorFieldData: - numRows = int64(fieldData.NumRows) - dim = int64(fieldData.Dim) - if offset < numRows { - x := resultFieldData.GetVectors().GetData().(*schemapb.VectorField_BinaryVector) - copy(x.BinaryVector[int64(i)*dim/8:int64(i+1)*dim/8], fieldData.Data[offset*dim/8:(offset+1)*dim/8]) - success = true - } else { - offset -= numRows - } - default: - return fmt.Errorf("unexpected field data type") - } - if success { + var vecPath string + for index, idBinlogRowSize := range segment.idBinlogRowSizes { + if offset < idBinlogRowSize { + vecPath = vecFieldInfo.fieldBinlog.Binlogs[index] break + } else { + offset -= idBinlogRowSize } } + log.Debug("FillVectorFieldData", zap.Any("path", vecPath)) + err := q.vcm.DownloadVectorFile(vecPath, schema) + if err != nil { + return err + } + + dim := resultFieldData.GetVectors().GetDim() + log.Debug("FillVectorFieldData", zap.Any("dim", dim)) + schema, err := schemaHelper.GetFieldFromID(resultFieldData.FieldId) + if err != nil { + return err + } + dataType := schema.DataType + log.Debug("FillVectorFieldData", zap.Any("datatype", dataType)) + + switch dataType { + case schemapb.DataType_BinaryVector: + rowBytes := dim / 8 + x := resultFieldData.GetVectors().GetData().(*schemapb.VectorField_BinaryVector) + content := make([]byte, rowBytes) + _, err := q.vcm.ReadAt(vecPath, content, offset*rowBytes) + if err != nil { + return err + } + log.Debug("FillVectorFieldData", zap.Any("binaryVectorResult", content)) + + resultLen := dim / 8 + copy(x.BinaryVector[i*int(resultLen):(i+1)*int(resultLen)], content) + case schemapb.DataType_FloatVector: + x := resultFieldData.GetVectors().GetData().(*schemapb.VectorField_FloatVector) + rowBytes := dim * 4 + content := make([]byte, rowBytes) + _, err := q.vcm.ReadAt(vecPath, content, offset*rowBytes) + if err != nil { + return err + } + floatResult := make([]float32, dim) + buf := bytes.NewReader(content) + err = binary.Read(buf, binary.LittleEndian, &floatResult) + if err != nil { + return err + } + log.Debug("FillVectorFieldData", zap.Any("floatVectorResult", floatResult)) + + resultLen := dim + copy(x.FloatVector.Data[i*int(resultLen):(i+1)*int(resultLen)], floatResult) + } + } } return nil diff --git a/internal/querynode/query_service.go b/internal/querynode/query_service.go index 6b5a56e115..e5c8388832 100644 --- a/internal/querynode/query_service.go +++ 
b/internal/querynode/query_service.go @@ -17,8 +17,10 @@ import ( "go.uber.org/zap" + miniokv "github.com/milvus-io/milvus/internal/kv/minio" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/storage" ) type queryService struct { @@ -31,6 +33,9 @@ type queryService struct { queryCollections map[UniqueID]*queryCollection factory msgstream.Factory + + lcm storage.ChunkManager + rcm storage.ChunkManager } func newQueryService(ctx context.Context, @@ -39,6 +44,28 @@ func newQueryService(ctx context.Context, factory msgstream.Factory) *queryService { queryServiceCtx, queryServiceCancel := context.WithCancel(ctx) + + path, err := Params.Load("storage.path") + if err != nil { + panic(err) + } + lcm := storage.NewLocalChunkManager(path) + + option := &miniokv.Option{ + Address: Params.MinioEndPoint, + AccessKeyID: Params.MinioAccessKeyID, + SecretAccessKeyID: Params.MinioSecretAccessKey, + UseSSL: Params.MinioUseSSLStr, + CreateBucket: true, + BucketName: Params.MinioBucketName, + } + + client, err := miniokv.NewMinIOKV(ctx, option) + if err != nil { + panic(err) + } + rcm := storage.NewMinioChunkManager(client) + return &queryService{ ctx: queryServiceCtx, cancel: queryServiceCancel, @@ -49,6 +76,9 @@ func newQueryService(ctx context.Context, queryCollections: make(map[UniqueID]*queryCollection), factory: factory, + + lcm: lcm, + rcm: rcm, } } @@ -73,7 +103,9 @@ func (q *queryService) addQueryCollection(collectionID UniqueID) { collectionID, q.historical, q.streaming, - q.factory) + q.factory, + q.lcm, + q.rcm) q.queryCollections[collectionID] = qc } diff --git a/internal/querynode/query_service_test.go b/internal/querynode/query_service_test.go index bc9b28ec0f..86bf931a1f 100644 --- a/internal/querynode/query_service_test.go +++ b/internal/querynode/query_service_test.go @@ -145,7 +145,6 @@ func TestSearch_Search(t *testing.T) { node.historical, node.streaming, msFactory) - node.queryService.addQueryCollection(collectionID) // load segment err = node.historical.replica.addSegment(segmentID, defaultPartitionID, collectionID, "", segmentTypeSealed, true) @@ -155,6 +154,8 @@ func TestSearch_Search(t *testing.T) { err = loadFields(segment, DIM, N) assert.NoError(t, err) + node.queryService.addQueryCollection(collectionID) + err = sendSearchRequest(node.queryNodeLoopCtx, DIM) assert.NoError(t, err) diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 267a0964b3..0af77891fe 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -36,7 +36,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/segcorepb" - "github.com/milvus-io/milvus/internal/storage" ) type segmentType int32 @@ -49,47 +48,15 @@ const ( ) type VectorFieldInfo struct { - mu sync.RWMutex - fieldBinlog *datapb.FieldBinlog - rawDataInMemory bool - rawData map[string]storage.FieldData // map[binlogPath]FieldData + fieldBinlog *datapb.FieldBinlog } func newVectorFieldInfo(fieldBinlog *datapb.FieldBinlog) *VectorFieldInfo { return &VectorFieldInfo{ - fieldBinlog: fieldBinlog, - rawDataInMemory: false, - rawData: make(map[string]storage.FieldData), + fieldBinlog: fieldBinlog, } } -func (v *VectorFieldInfo) setRawData(binlogPath string, data storage.FieldData) { - v.mu.Lock() - defer v.mu.Unlock() - v.rawData[binlogPath] = data -} - -func (v *VectorFieldInfo) getRawData(binlogPath string) 
storage.FieldData { - v.mu.Lock() - defer v.mu.Unlock() - if data, ok := v.rawData[binlogPath]; ok { - return data - } - return nil -} - -func (v *VectorFieldInfo) setRawDataInMemory(flag bool) { - v.mu.Lock() - defer v.mu.Unlock() - v.rawDataInMemory = flag -} - -func (v *VectorFieldInfo) getRawDataInMemory() bool { - v.mu.Lock() - defer v.mu.Unlock() - return v.rawDataInMemory -} - //-------------------------------------------------------------------------------------- type Segment struct { segmentPtr C.CSegmentInterface @@ -116,6 +83,8 @@ type Segment struct { paramMutex sync.RWMutex // guards index indexInfos map[int64]*indexInfo + idBinlogRowSizes []int64 + vectorFieldMutex sync.RWMutex // guards vectorFieldInfos vectorFieldInfos map[UniqueID]*VectorFieldInfo } @@ -137,6 +106,14 @@ func (s *Segment) getEnableIndex() bool { return s.enableIndex } +func (s *Segment) setIDBinlogRowSizes(sizes []int64) { + s.idBinlogRowSizes = sizes +} + +func (s *Segment) getIDBinlogRowSizes() []int64 { + return s.idBinlogRowSizes +} + func (s *Segment) setRecentlyModified(modify bool) { s.rmMutex.Lock() defer s.rmMutex.Unlock() diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index b8d6206bae..cbaa95c07b 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" @@ -242,10 +243,6 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog } blobs = append(blobs, blob) } - // mark the flag that vector raw data will be loaded into memory - if vecFieldInfo, err := segment.getVectorFieldInfo(fb.FieldID); err == nil { - vecFieldInfo.setRawDataInMemory(true) - } } _, _, insertData, err := iCodec.Deserialize(blobs) @@ -255,7 +252,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog } for fieldID, value := range insertData.Data { - var numRows int + var numRows []int64 var data interface{} switch fieldData := value.(type) { case *storage.BoolFieldData: @@ -291,7 +288,14 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog default: return errors.New("unexpected field data type") } - err = segment.segmentLoadFieldData(fieldID, numRows, data) + if fieldID == rootcoord.TimeStampField { + segment.setIDBinlogRowSizes(numRows) + } + totalNumRows := int64(0) + for _, numRow := range numRows { + totalNumRows += numRow + } + err = segment.segmentLoadFieldData(fieldID, int(totalNumRows), data) if err != nil { // TODO: return or continue? 
return err @@ -301,44 +305,6 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog return nil } -func (loader *segmentLoader) loadSegmentVectorFieldData(info *VectorFieldInfo) error { - iCodec := storage.InsertCodec{} - defer func() { - err := iCodec.Close() - if err != nil { - log.Error(err.Error()) - } - }() - for _, path := range info.fieldBinlog.Binlogs { - if data := info.getRawData(path); data != nil { - continue - } - - log.Debug("load vector raw data", zap.String("path", path)) - - binLog, err := loader.minioKV.Load(path) - if err != nil { - return err - } - - blob := &storage.Blob{ - Key: path, - Value: []byte(binLog), - } - - insertFieldData, err := iCodec.DeserializeOneVectorBinlog(blob) - if err != nil { - log.Error(err.Error()) - return err - } - - // save raw data into segment.vectorFieldInfo - info.setRawData(path, insertFieldData.Data) - } - - return nil -} - func newSegmentLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface, etcdKV *etcdkv.EtcdKV) *segmentLoader { option := &minioKV.Option{ Address: Params.MinioEndPoint, diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 86acd5f822..ca0f07aa86 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -72,44 +72,44 @@ func (b Blob) GetValue() []byte { type FieldData interface{} type BoolFieldData struct { - NumRows int + NumRows []int64 Data []bool } type Int8FieldData struct { - NumRows int + NumRows []int64 Data []int8 } type Int16FieldData struct { - NumRows int + NumRows []int64 Data []int16 } type Int32FieldData struct { - NumRows int + NumRows []int64 Data []int32 } type Int64FieldData struct { - NumRows int + NumRows []int64 Data []int64 } type FloatFieldData struct { - NumRows int + NumRows []int64 Data []float32 } type DoubleFieldData struct { - NumRows int + NumRows []int64 Data []float64 } type StringFieldData struct { - NumRows int + NumRows []int64 Data []string } type BinaryVectorFieldData struct { - NumRows int + NumRows []int64 Data []byte Dim int } type FloatVectorFieldData struct { - NumRows int + NumRows []int64 Data []float32 Dim int } @@ -134,12 +134,6 @@ type InsertData struct { Infos []BlobInfo } -type InsertFieldData struct { - ID FieldID - Data FieldData - Infos []BlobInfo -} - // Blob key example: // ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx} type InsertCodec struct { @@ -299,7 +293,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length - boolFieldData.NumRows += length + boolFieldData.NumRows = append(boolFieldData.NumRows, int64(length)) resultData.Data[fieldID] = boolFieldData case schemapb.DataType_Int8: if resultData.Data[fieldID] == nil { @@ -316,7 +310,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length - int8FieldData.NumRows += length + int8FieldData.NumRows = append(int8FieldData.NumRows, int64(length)) resultData.Data[fieldID] = int8FieldData case schemapb.DataType_Int16: if resultData.Data[fieldID] == nil { @@ -333,7 +327,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length - int16FieldData.NumRows += length + int16FieldData.NumRows = append(int16FieldData.NumRows, int64(length)) 
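The per-blob lengths appended here, together with setIDBinlogRowSizes on the timestamp field, are exactly what the retrieve path consumes later: fillVectorFieldsData walks segment.idBinlogRowSizes to turn a segment-wide row offset into the binlog that holds the row plus a local offset inside that binlog. A minimal standalone sketch of that mapping follows; findBinlog is a hypothetical name for illustration, not a function from this patch:

    package main

    import "fmt"

    // findBinlog maps a segment-wide row offset to the index of the binlog
    // that holds the row, plus the row's local offset inside that binlog.
    // rowCounts has the same shape as the NumRows []int64 slices this patch
    // introduces: one row count per binlog, in write order.
    func findBinlog(rowCounts []int64, offset int64) (idx int, local int64, ok bool) {
        for i, n := range rowCounts {
            if offset < n {
                return i, offset, true
            }
            offset -= n
        }
        return 0, 0, false // offset is beyond the last row
    }

    func main() {
        sizes := []int64{4, 2, 5} // three binlogs with 4, 2 and 5 rows
        idx, local, _ := findBinlog(sizes, 5)
        fmt.Println(idx, local) // 1 1: row 5 lives in binlog 1, at local row 1
    }

Keeping one count per binlog, rather than a single running total, is what makes this lookup possible without reopening any binlog file.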
resultData.Data[fieldID] = int16FieldData case schemapb.DataType_Int32: if resultData.Data[fieldID] == nil { @@ -350,7 +344,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length - int32FieldData.NumRows += length + int32FieldData.NumRows = append(int32FieldData.NumRows, int64(length)) resultData.Data[fieldID] = int32FieldData case schemapb.DataType_Int64: if resultData.Data[fieldID] == nil { @@ -367,7 +361,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length - int64FieldData.NumRows += length + int64FieldData.NumRows = append(int64FieldData.NumRows, int64(length)) resultData.Data[fieldID] = int64FieldData case schemapb.DataType_Float: if resultData.Data[fieldID] == nil { @@ -384,7 +378,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length - floatFieldData.NumRows += length + floatFieldData.NumRows = append(floatFieldData.NumRows, int64(length)) resultData.Data[fieldID] = floatFieldData case schemapb.DataType_Double: if resultData.Data[fieldID] == nil { @@ -401,7 +395,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length - doubleFieldData.NumRows += length + doubleFieldData.NumRows = append(doubleFieldData.NumRows, int64(length)) resultData.Data[fieldID] = doubleFieldData case schemapb.DataType_String: if resultData.Data[fieldID] == nil { @@ -413,7 +407,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length - stringFieldData.NumRows += length + stringFieldData.NumRows = append(stringFieldData.NumRows, int64(length)) for i := 0; i < length; i++ { singleString, err := eventReader.GetOneStringFromPayload(i) if err != nil { @@ -438,7 +432,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length - binaryVectorFieldData.NumRows += length + binaryVectorFieldData.NumRows = append(binaryVectorFieldData.NumRows, int64(length)) resultData.Data[fieldID] = binaryVectorFieldData case schemapb.DataType_FloatVector: if resultData.Data[fieldID] == nil { @@ -456,7 +450,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return InvalidUniqueID, InvalidUniqueID, nil, err } totalLength += length - floatVectorFieldData.NumRows += length + floatVectorFieldData.NumRows = append(floatVectorFieldData.NumRows, int64(length)) resultData.Data[fieldID] = floatVectorFieldData default: return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("undefined data type %d", dataType) @@ -474,75 +468,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return pID, sID, resultData, nil } -func (insertCodec *InsertCodec) DeserializeOneVectorBinlog(blob *Blob) (data *InsertFieldData, err error) { - resultData := &InsertFieldData{ - ID: InvalidUniqueID, - } - binlogReader, err := NewBinlogReader(blob.Value) - if err != nil { - return nil, err - } - - dataType := binlogReader.PayloadDataType - fieldID := binlogReader.FieldID - totalLength := 0 - for { - eventReader, err := binlogReader.NextEventReader() - if err != nil { - return nil, err - } - if 
eventReader == nil { - break - } - switch dataType { - case schemapb.DataType_BinaryVector: - if resultData.ID == InvalidUniqueID { - resultData.ID = fieldID - resultData.Data = &BinaryVectorFieldData{} - } - binaryVectorFieldData := resultData.Data.(*BinaryVectorFieldData) - var singleData []byte - singleData, binaryVectorFieldData.Dim, err = eventReader.GetBinaryVectorFromPayload() - if err != nil { - return nil, err - } - binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - return nil, err - } - totalLength += length - binaryVectorFieldData.NumRows += length - resultData.Data = binaryVectorFieldData - case schemapb.DataType_FloatVector: - if resultData.ID == InvalidUniqueID { - resultData.ID = fieldID - resultData.Data = &FloatVectorFieldData{} - } - floatVectorFieldData := resultData.Data.(*FloatVectorFieldData) - var singleData []float32 - singleData, floatVectorFieldData.Dim, err = eventReader.GetFloatVectorFromPayload() - if err != nil { - return nil, err - } - floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - return nil, err - } - totalLength += length - floatVectorFieldData.NumRows += length - resultData.Data = floatVectorFieldData - default: - return nil, fmt.Errorf("undefined data type %d", dataType) - } - } - if err = binlogReader.Close(); err != nil { - return nil, err - } - return resultData, nil -} - func (insertCodec *InsertCodec) Close() error { for _, closeFunc := range insertCodec.readerCloseFunc { err := closeFunc() diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index fc2003dcec..57842c8ea1 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -142,52 +142,52 @@ func TestInsertCodec(t *testing.T) { insertData1 := &InsertData{ Data: map[int64]FieldData{ RowIDField: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{3, 4}, }, TimestampField: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{3, 4}, }, BoolField: &BoolFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []bool{true, false}, }, Int8Field: &Int8FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int8{3, 4}, }, Int16Field: &Int16FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int16{3, 4}, }, Int32Field: &Int32FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int32{3, 4}, }, Int64Field: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{3, 4}, }, FloatField: &FloatFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float32{3, 4}, }, DoubleField: &DoubleFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float64{3, 4}, }, StringField: &StringFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []string{"3", "4"}, }, BinaryVectorField: &BinaryVectorFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []byte{0, 255}, Dim: 8, }, FloatVectorField: &FloatVectorFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float32{4, 5, 6, 7, 4, 5, 6, 7}, Dim: 4, }, @@ -197,52 +197,52 @@ func TestInsertCodec(t *testing.T) { insertData2 := &InsertData{ Data: map[int64]FieldData{ RowIDField: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{1, 2}, }, TimestampField: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{1, 2}, }, BoolField: &BoolFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []bool{true, false}, }, Int8Field: 
&Int8FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int8{1, 2}, }, Int16Field: &Int16FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int16{1, 2}, }, Int32Field: &Int32FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int32{1, 2}, }, Int64Field: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{1, 2}, }, FloatField: &FloatFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float32{1, 2}, }, DoubleField: &DoubleFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float64{1, 2}, }, StringField: &StringFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []string{"1", "2"}, }, BinaryVectorField: &BinaryVectorFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []byte{0, 255}, Dim: 8, }, FloatVectorField: &FloatVectorFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float32{0, 1, 2, 3, 0, 1, 2, 3}, Dim: 4, }, @@ -265,18 +265,18 @@ func TestInsertCodec(t *testing.T) { assert.Nil(t, err) assert.Equal(t, UniqueID(PartitionID), partID) assert.Equal(t, UniqueID(SegmentID), segID) - assert.Equal(t, 4, resultData.Data[RowIDField].(*Int64FieldData).NumRows) - assert.Equal(t, 4, resultData.Data[TimestampField].(*Int64FieldData).NumRows) - assert.Equal(t, 4, resultData.Data[BoolField].(*BoolFieldData).NumRows) - assert.Equal(t, 4, resultData.Data[Int8Field].(*Int8FieldData).NumRows) - assert.Equal(t, 4, resultData.Data[Int16Field].(*Int16FieldData).NumRows) - assert.Equal(t, 4, resultData.Data[Int32Field].(*Int32FieldData).NumRows) - assert.Equal(t, 4, resultData.Data[Int64Field].(*Int64FieldData).NumRows) - assert.Equal(t, 4, resultData.Data[FloatField].(*FloatFieldData).NumRows) - assert.Equal(t, 4, resultData.Data[DoubleField].(*DoubleFieldData).NumRows) - assert.Equal(t, 4, resultData.Data[StringField].(*StringFieldData).NumRows) - assert.Equal(t, 4, resultData.Data[BinaryVectorField].(*BinaryVectorFieldData).NumRows) - assert.Equal(t, 4, resultData.Data[FloatVectorField].(*FloatVectorFieldData).NumRows) + assert.Equal(t, []int64{2, 2}, resultData.Data[RowIDField].(*Int64FieldData).NumRows) + assert.Equal(t, []int64{2, 2}, resultData.Data[TimestampField].(*Int64FieldData).NumRows) + assert.Equal(t, []int64{2, 2}, resultData.Data[BoolField].(*BoolFieldData).NumRows) + assert.Equal(t, []int64{2, 2}, resultData.Data[Int8Field].(*Int8FieldData).NumRows) + assert.Equal(t, []int64{2, 2}, resultData.Data[Int16Field].(*Int16FieldData).NumRows) + assert.Equal(t, []int64{2, 2}, resultData.Data[Int32Field].(*Int32FieldData).NumRows) + assert.Equal(t, []int64{2, 2}, resultData.Data[Int64Field].(*Int64FieldData).NumRows) + assert.Equal(t, []int64{2, 2}, resultData.Data[FloatField].(*FloatFieldData).NumRows) + assert.Equal(t, []int64{2, 2}, resultData.Data[DoubleField].(*DoubleFieldData).NumRows) + assert.Equal(t, []int64{2, 2}, resultData.Data[StringField].(*StringFieldData).NumRows) + assert.Equal(t, []int64{2, 2}, resultData.Data[BinaryVectorField].(*BinaryVectorFieldData).NumRows) + assert.Equal(t, []int64{2, 2}, resultData.Data[FloatVectorField].(*FloatVectorFieldData).NumRows) assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[RowIDField].(*Int64FieldData).Data) assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[TimestampField].(*Int64FieldData).Data) assert.Equal(t, []bool{true, false, true, false}, resultData.Data[BoolField].(*BoolFieldData).Data) @@ -412,15 +412,15 @@ func TestSchemaError(t *testing.T) { insertData := &InsertData{ Data: map[int64]FieldData{ RowIDField: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{3, 4}, }, 
TimestampField: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{3, 4}, }, BoolField: &BoolFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []bool{true, false}, }, }, diff --git a/internal/storage/data_sorter_test.go b/internal/storage/data_sorter_test.go index 37d461d7ef..e5305af9c5 100644 --- a/internal/storage/data_sorter_test.go +++ b/internal/storage/data_sorter_test.go @@ -123,52 +123,52 @@ func TestDataSorter(t *testing.T) { insertDataFirst := &InsertData{ Data: map[int64]FieldData{ 0: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{6, 4}, }, 1: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{3, 4}, }, 100: &BoolFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []bool{true, false}, }, 101: &Int8FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int8{3, 4}, }, 102: &Int16FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int16{3, 4}, }, 103: &Int32FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int32{3, 4}, }, 104: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{3, 4}, }, 105: &FloatFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float32{3, 4}, }, 106: &DoubleFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float64{3, 4}, }, 107: &StringFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []string{"3", "4"}, }, 108: &BinaryVectorFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []byte{0, 255}, Dim: 8, }, 109: &FloatVectorFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, Dim: 8, }, diff --git a/internal/storage/local_chunk_manager.go b/internal/storage/local_chunk_manager.go index e40df69874..11add30031 100644 --- a/internal/storage/local_chunk_manager.go +++ b/internal/storage/local_chunk_manager.go @@ -25,15 +25,12 @@ type LocalChunkManager struct { } func NewLocalChunkManager(localPath string) *LocalChunkManager { - if _, err := os.Stat(localPath); os.IsNotExist(err) { - os.MkdirAll(localPath, os.ModePerm) - } return &LocalChunkManager{ localPath: localPath, } } -func (lcm *LocalChunkManager) Load(key string) (string, error) { +func (lcm *LocalChunkManager) GetPath(key string) (string, error) { if !lcm.Exist(key) { return "", errors.New("local file cannot be found with key:" + key) } @@ -42,8 +39,15 @@ func (lcm *LocalChunkManager) Load(key string) (string, error) { } func (lcm *LocalChunkManager) Write(key string, content []byte) error { - path := path.Join(lcm.localPath, key) - err := ioutil.WriteFile(path, content, 0644) + filePath := path.Join(lcm.localPath, key) + dir := path.Dir(filePath) + if _, err := os.Stat(dir); os.IsNotExist(err) { + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + return err + } + } + err := ioutil.WriteFile(filePath, content, 0644) if err != nil { return err } @@ -59,7 +63,7 @@ func (lcm *LocalChunkManager) Exist(key string) bool { return true } -func (lcm *LocalChunkManager) ReadAll(key string) ([]byte, error) { +func (lcm *LocalChunkManager) Read(key string) ([]byte, error) { path := path.Join(lcm.localPath, key) file, err := os.Open(path) if err != nil { diff --git a/internal/storage/minio_chunk_manager.go b/internal/storage/minio_chunk_manager.go index 8b9fcb184c..39de4ee0f7 100644 --- a/internal/storage/minio_chunk_manager.go +++ b/internal/storage/minio_chunk_manager.go @@ -27,7 +27,7 @@ func NewMinioChunkManager(minio *miniokv.MinIOKV) *MinioChunkManager { } } -func (mcm *MinioChunkManager) Load(key string) (string, error) { +func (mcm 
*MinioChunkManager) GetPath(key string) (string, error) { if !mcm.Exist(key) { return "", errors.New("minio file manage cannot be found with key:" + key) } @@ -42,7 +42,7 @@ func (mcm *MinioChunkManager) Exist(key string) bool { return mcm.minio.Exist(key) } -func (mcm *MinioChunkManager) ReadAll(key string) ([]byte, error) { +func (mcm *MinioChunkManager) Read(key string) ([]byte, error) { results, err := mcm.minio.Load(key) return []byte(results), err } diff --git a/internal/storage/print_binlog_test.go b/internal/storage/print_binlog_test.go index 800913c431..f812785a0c 100644 --- a/internal/storage/print_binlog_test.go +++ b/internal/storage/print_binlog_test.go @@ -172,52 +172,52 @@ func TestPrintBinlogFiles(t *testing.T) { insertDataFirst := &InsertData{ Data: map[int64]FieldData{ 0: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{3, 4}, }, 1: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{3, 4}, }, 100: &BoolFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []bool{true, false}, }, 101: &Int8FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int8{3, 4}, }, 102: &Int16FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int16{3, 4}, }, 103: &Int32FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int32{3, 4}, }, 104: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{3, 4}, }, 105: &FloatFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float32{3, 4}, }, 106: &DoubleFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float64{3, 4}, }, 107: &StringFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []string{"3", "4"}, }, 108: &BinaryVectorFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []byte{0, 255}, Dim: 8, }, 109: &FloatVectorFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7}, Dim: 8, }, @@ -227,52 +227,52 @@ func TestPrintBinlogFiles(t *testing.T) { insertDataSecond := &InsertData{ Data: map[int64]FieldData{ 0: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{1, 2}, }, 1: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{1, 2}, }, 100: &BoolFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []bool{true, false}, }, 101: &Int8FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int8{1, 2}, }, 102: &Int16FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int16{1, 2}, }, 103: &Int32FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int32{1, 2}, }, 104: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{1, 2}, }, 105: &FloatFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float32{1, 2}, }, 106: &DoubleFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float64{1, 2}, }, 107: &StringFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []string{"1", "2"}, }, 108: &BinaryVectorFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []byte{0, 255}, Dim: 8, }, 109: &FloatVectorFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7}, Dim: 8, }, diff --git a/internal/storage/types.go b/internal/storage/types.go index 51fb662314..2cdcf3455d 100644 --- a/internal/storage/types.go +++ b/internal/storage/types.go @@ -12,9 +12,9 @@ package storage type ChunkManager interface { - Load(key string) (string, error) + GetPath(key string) (string, error) Write(key string, content []byte) error Exist(key string) bool - ReadAll(key string) ([]byte, error) + Read(key string) ([]byte, error) ReadAt(key string, p []byte, off 
int64) (n int, err error) } diff --git a/internal/storage/vector_chunk_manager.go b/internal/storage/vector_chunk_manager.go index c8d2a4022b..367da739b0 100644 --- a/internal/storage/vector_chunk_manager.go +++ b/internal/storage/vector_chunk_manager.go @@ -12,8 +12,9 @@ package storage import ( + "bytes" "encoding/binary" - "math" + "errors" "github.com/milvus-io/milvus/internal/proto/etcdpb" ) @@ -21,35 +22,29 @@ import ( type VectorChunkManager struct { localChunkManager ChunkManager remoteChunkManager ChunkManager - - insertCodec *InsertCodec } -func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager ChunkManager, schema *etcdpb.CollectionMeta) *VectorChunkManager { - insertCodec := NewInsertCodec(schema) +func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager ChunkManager) *VectorChunkManager { return &VectorChunkManager{ localChunkManager: localChunkManager, remoteChunkManager: remoteChunkManager, - insertCodec: insertCodec, } } -func (vcm *VectorChunkManager) Load(key string) (string, error) { - if vcm.localChunkManager.Exist(key) { - return vcm.localChunkManager.Load(key) - } - content, err := vcm.remoteChunkManager.ReadAll(key) +func (vcm *VectorChunkManager) DownloadVectorFile(key string, schema *etcdpb.CollectionMeta) error { + insertCodec := NewInsertCodec(schema) + content, err := vcm.remoteChunkManager.Read(key) if err != nil { - return "", err + return err } blob := &Blob{ Key: key, Value: content, } - _, _, data, err := vcm.insertCodec.Deserialize([]*Blob{blob}) + _, _, data, err := insertCodec.Deserialize([]*Blob{blob}) if err != nil { - return "", err + return err } for _, singleData := range data.Data { @@ -59,16 +54,23 @@ func (vcm *VectorChunkManager) Load(key string) (string, error) { } floatVector, ok := singleData.(*FloatVectorFieldData) if ok { - floatData := floatVector.Data - result := make([]byte, 0) - for _, singleFloat := range floatData { - result = append(result, Float32ToByte(singleFloat)...) 
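In the hunk continuing below, a single binary.Write over the whole []float32 slice replaces the removed per-element Float32ToByte loop; encoding/binary serializes fixed-size slices element by element in the requested byte order, so the bytes on disk are unchanged. A short standalone check of that equivalence, assuming the little-endian layout the patch uses:

    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
        "math"
    )

    func main() {
        data := []float32{1.5, -2.25, 3}

        // New approach: one binary.Write over the whole slice.
        buf := new(bytes.Buffer)
        if err := binary.Write(buf, binary.LittleEndian, data); err != nil {
            panic(err)
        }

        // Old approach: convert each float32 by hand.
        manual := make([]byte, 0, len(data)*4)
        for _, f := range data {
            b := make([]byte, 4)
            binary.LittleEndian.PutUint32(b, math.Float32bits(f))
            manual = append(manual, b...)
        }

        fmt.Println(bytes.Equal(buf.Bytes(), manual)) // true
    }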
+ buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, floatVector.Data) + if err != nil { + return err } - vcm.localChunkManager.Write(key, result) + vcm.localChunkManager.Write(key, buf.Bytes()) } } - vcm.insertCodec.Close() - return vcm.localChunkManager.Load(key) + insertCodec.Close() + return nil +} + +func (vcm *VectorChunkManager) GetPath(key string) (string, error) { + if vcm.localChunkManager.Exist(key) { + return vcm.localChunkManager.GetPath(key) + } + return vcm.localChunkManager.GetPath(key) } func (vcm *VectorChunkManager) Write(key string, content []byte) error { @@ -79,31 +81,13 @@ func (vcm *VectorChunkManager) Exist(key string) bool { return vcm.localChunkManager.Exist(key) } -func (vcm *VectorChunkManager) ReadAll(key string) ([]byte, error) { +func (vcm *VectorChunkManager) Read(key string) ([]byte, error) { if vcm.localChunkManager.Exist(key) { - return vcm.localChunkManager.ReadAll(key) + return vcm.localChunkManager.Read(key) } - _, err := vcm.Load(key) - if err != nil { - return nil, err - } - return vcm.localChunkManager.ReadAll(key) + return nil, errors.New("the vector file doesn't exist, please call download first") } func (vcm *VectorChunkManager) ReadAt(key string, p []byte, off int64) (n int, err error) { return vcm.localChunkManager.ReadAt(key, p, off) } - -func Float32ToByte(float float32) []byte { - bits := math.Float32bits(float) - bytes := make([]byte, 4) - binary.LittleEndian.PutUint32(bytes, bits) - - return bytes -} - -func ByteToFloat32(bytes []byte) float32 { - bits := binary.LittleEndian.Uint32(bytes) - - return math.Float32frombits(bits) -} diff --git a/internal/storage/vector_chunk_manager_test.go b/internal/storage/vector_chunk_manager_test.go index 2285906907..168b4bc3f9 100644 --- a/internal/storage/vector_chunk_manager_test.go +++ b/internal/storage/vector_chunk_manager_test.go @@ -22,6 +22,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/stretchr/testify/assert" ) @@ -44,7 +45,7 @@ func TestVectorChunkManager(t *testing.T) { lcm := NewLocalChunkManager(localPath) schema := initSchema() - vcm := NewVectorChunkManager(lcm, rcm, schema) + vcm := NewVectorChunkManager(lcm, rcm) assert.NotNil(t, vcm) binlogs := initBinlogFile(schema) @@ -52,22 +53,22 @@ func TestVectorChunkManager(t *testing.T) { for _, binlog := range binlogs { rcm.Write(binlog.Key, binlog.Value) } - _, err = vcm.Load("108") + err = vcm.DownloadVectorFile("108", schema) assert.Nil(t, err) - _, err = vcm.Load("109") + err = vcm.DownloadVectorFile("109", schema) assert.Nil(t, err) - content, err := vcm.ReadAll("108") + content, err := vcm.Read("108") assert.Nil(t, err) assert.Equal(t, []byte{0, 255}, content) - content, err = vcm.ReadAll("109") + content, err = vcm.Read("109") assert.Nil(t, err) floatResult := make([]float32, 0) for i := 0; i < len(content)/4; i++ { - singleData := ByteToFloat32(content[i*4 : i*4+4]) + singleData := typeutil.ByteToFloat32(content[i*4 : i*4+4]) floatResult = append(floatResult, singleData) } assert.Equal(t, []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666}, floatResult) @@ -79,7 +80,7 @@ func TestVectorChunkManager(t *testing.T) { floatResult = make([]float32, 0) for i := 0; i < len(content)/4; i++ { - singleData := ByteToFloat32(content[i*4 : i*4+4]) + singleData := typeutil.ByteToFloat32(content[i*4 : i*4+4]) 
floatResult = append(floatResult, singleData) } assert.Equal(t, []float32{0, 111, 222, 333, 444, 555, 777, 666}, floatResult) @@ -163,24 +164,24 @@ func initBinlogFile(schema *etcdpb.CollectionMeta) []*Blob { insertData := &InsertData{ Data: map[int64]FieldData{ 0: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{3, 4}, }, 1: &Int64FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int64{3, 4}, }, 101: &Int8FieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []int8{3, 4}, }, 108: &BinaryVectorFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []byte{0, 255}, Dim: 8, }, 109: &FloatVectorFieldData{ - NumRows: 2, + NumRows: []int64{2}, Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666}, Dim: 8, }, diff --git a/internal/util/typeutil/convension.go b/internal/util/typeutil/convension.go index 55d3af8b5f..eb5b3bf6c9 100644 --- a/internal/util/typeutil/convension.go +++ b/internal/util/typeutil/convension.go @@ -14,9 +14,26 @@ package typeutil import ( "encoding/binary" "fmt" + "math" "reflect" ) +// Float32ToByte converts a float32 to a byte slice. +func Float32ToByte(float float32) []byte { + bits := math.Float32bits(float) + bytes := make([]byte, 4) + binary.LittleEndian.PutUint32(bytes, bits) + + return bytes +} + +// ByteToFloat32 converts a byte slice to a float32. +func ByteToFloat32(bytes []byte) float32 { + bits := binary.LittleEndian.Uint32(bytes) + + return math.Float32frombits(bits) +} + // BytesToUint64 converts a byte slice to uint64. func BytesToInt64(b []byte) (int64, error) { if len(b) != 8 {
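Taken together, the pieces of this patch give the read path its shape: DownloadVectorFile deserializes a remote vector binlog once and caches the raw little-endian vector bytes on local disk, and each retrieved row is then served by a single ReadAt of rowBytes (dim*4 for float vectors, dim/8 for binary ones) at offset row*rowBytes. A compact sketch of that per-row fetch under those assumptions; readAt here stands in for ChunkManager.ReadAt, and all names are illustrative rather than the Milvus API:

    package main

    import (
        "encoding/binary"
        "fmt"
        "math"
    )

    // readFloatRow fetches one dim-dimensional float32 vector by row offset
    // with a single positional read, the access pattern fillVectorFieldsData
    // uses against the locally cached binlog bytes.
    func readFloatRow(readAt func(p []byte, off int64) (int, error), dim, row int64) ([]float32, error) {
        rowBytes := dim * 4
        raw := make([]byte, rowBytes)
        if _, err := readAt(raw, row*rowBytes); err != nil {
            return nil, err
        }
        out := make([]float32, dim)
        for i := range out {
            bits := binary.LittleEndian.Uint32(raw[i*4 : i*4+4])
            out[i] = math.Float32frombits(bits)
        }
        return out, nil
    }

    func main() {
        // A fake two-row, dim-2 vector file backed by a byte slice.
        file := make([]byte, 0, 16)
        for _, f := range []float32{1, 2, 3, 4} {
            b := make([]byte, 4)
            binary.LittleEndian.PutUint32(b, math.Float32bits(f))
            file = append(file, b...)
        }
        readAt := func(p []byte, off int64) (int, error) { return copy(p, file[off:]), nil }

        row, _ := readFloatRow(readAt, 2, 1)
        fmt.Println(row) // [3 4]
    }

The same arithmetic explains the chunk manager test above: a dim-8 float vector file holds 32 bytes per row, so fetching the second row is one 32-byte read at offset 32.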