mirror of https://github.com/milvus-io/milvus.git
488 lines
13 KiB
Go
488 lines
13 KiB
Go
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
package querynode
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"path"
|
|
"strconv"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus/internal/common"
|
|
"github.com/milvus-io/milvus/internal/kv"
|
|
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
|
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
|
|
"github.com/milvus-io/milvus/internal/log"
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
|
"github.com/milvus-io/milvus/internal/storage"
|
|
"github.com/milvus-io/milvus/internal/types"
|
|
"github.com/milvus-io/milvus/internal/util/funcutil"
|
|
)
|
|
|
|
const (
|
|
queryNodeSegmentMetaPrefix = "queryNode-segmentMeta"
|
|
)
|
|
|
|
// segmentLoader is only responsible for loading the field data from binlog
|
|
type segmentLoader struct {
|
|
historicalReplica ReplicaInterface
|
|
|
|
dataCoord types.DataCoord
|
|
|
|
minioKV kv.DataKV // minio minioKV
|
|
etcdKV *etcdkv.EtcdKV
|
|
|
|
indexLoader *indexLoader
|
|
}
|
|
|
|
func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest) error {
|
|
// no segment needs to load, return
|
|
if len(req.Infos) == 0 {
|
|
return nil
|
|
}
|
|
|
|
newSegments := make(map[UniqueID]*Segment)
|
|
segmentGC := func() {
|
|
for _, s := range newSegments {
|
|
deleteSegment(s)
|
|
}
|
|
}
|
|
|
|
segmentFieldBinLogs := make(map[UniqueID][]*datapb.FieldBinlog)
|
|
segmentIndexedFieldIDs := make(map[UniqueID][]FieldID)
|
|
segmentSizes := make(map[UniqueID]int64)
|
|
|
|
// prepare and estimate segments size
|
|
for _, info := range req.Infos {
|
|
segmentID := info.SegmentID
|
|
partitionID := info.PartitionID
|
|
collectionID := info.CollectionID
|
|
|
|
collection, err := loader.historicalReplica.getCollectionByID(collectionID)
|
|
if err != nil {
|
|
segmentGC()
|
|
return err
|
|
}
|
|
segment := newSegment(collection, segmentID, partitionID, collectionID, "", segmentTypeSealed, true)
|
|
newSegments[segmentID] = segment
|
|
fieldBinlog, indexedFieldID, err := loader.getFieldAndIndexInfo(segment, info)
|
|
if err != nil {
|
|
segmentGC()
|
|
return err
|
|
}
|
|
segmentSize, err := loader.estimateSegmentSize(segment, fieldBinlog, indexedFieldID)
|
|
if err != nil {
|
|
segmentGC()
|
|
return err
|
|
}
|
|
segmentFieldBinLogs[segmentID] = fieldBinlog
|
|
segmentIndexedFieldIDs[segmentID] = indexedFieldID
|
|
segmentSizes[segmentID] = segmentSize
|
|
}
|
|
|
|
// check memory limit
|
|
err := loader.checkSegmentSize(req.Infos[0].CollectionID, segmentSizes)
|
|
if err != nil {
|
|
segmentGC()
|
|
return err
|
|
}
|
|
|
|
// start to load
|
|
for _, info := range req.Infos {
|
|
segmentID := info.SegmentID
|
|
if newSegments[segmentID] == nil || segmentFieldBinLogs[segmentID] == nil || segmentIndexedFieldIDs[segmentID] == nil {
|
|
segmentGC()
|
|
return errors.New(fmt.Sprintln("unexpected error, cannot find load infos, this error should not happen, collectionID = ", req.Infos[0].CollectionID))
|
|
}
|
|
err = loader.loadSegmentInternal(newSegments[segmentID],
|
|
segmentFieldBinLogs[segmentID],
|
|
segmentIndexedFieldIDs[segmentID],
|
|
info)
|
|
if err != nil {
|
|
segmentGC()
|
|
return err
|
|
}
|
|
}
|
|
|
|
// set segments
|
|
for _, s := range newSegments {
|
|
err := loader.historicalReplica.setSegment(s)
|
|
if err != nil {
|
|
segmentGC()
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
|
|
fieldBinLogs []*datapb.FieldBinlog,
|
|
indexFieldIDs []FieldID,
|
|
segmentLoadInfo *querypb.SegmentLoadInfo) error {
|
|
log.Debug("loading insert...")
|
|
err := loader.loadSegmentFieldsData(segment, fieldBinLogs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
pkIDField, err := loader.historicalReplica.getPKFieldIDByCollectionID(segment.collectionID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if pkIDField == common.InvalidFieldID {
|
|
log.Warn("segment primary key field doesn't exist when load segment")
|
|
} else {
|
|
log.Debug("loading bloom filter...")
|
|
pkStatsBinlogs := loader.filterPKStatsBinlogs(segmentLoadInfo.Statslogs, pkIDField)
|
|
err = loader.loadSegmentBloomFilter(segment, pkStatsBinlogs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
log.Debug("loading delta...")
|
|
err = loader.loadDeltaLogs(segment, segmentLoadInfo.Deltalogs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, id := range indexFieldIDs {
|
|
log.Debug("loading index...")
|
|
err = loader.indexLoader.loadIndex(segment, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (loader *segmentLoader) filterPKStatsBinlogs(fieldBinlogs []*datapb.FieldBinlog, pkFieldID int64) []string {
|
|
result := make([]string, 0)
|
|
for _, fieldBinlog := range fieldBinlogs {
|
|
if fieldBinlog.FieldID == pkFieldID {
|
|
result = append(result, fieldBinlog.Binlogs...)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (loader *segmentLoader) filterFieldBinlogs(fieldBinlogs []*datapb.FieldBinlog, skipFieldIDs []int64) []*datapb.FieldBinlog {
|
|
result := make([]*datapb.FieldBinlog, 0)
|
|
for _, fieldBinlog := range fieldBinlogs {
|
|
if !funcutil.SliceContain(skipFieldIDs, fieldBinlog.FieldID) {
|
|
result = append(result, fieldBinlog)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlogs []*datapb.FieldBinlog) error {
|
|
iCodec := storage.InsertCodec{}
|
|
defer func() {
|
|
err := iCodec.Close()
|
|
if err != nil {
|
|
log.Warn(err.Error())
|
|
}
|
|
}()
|
|
blobs := make([]*storage.Blob, 0)
|
|
for _, fb := range fieldBinlogs {
|
|
log.Debug("load segment fields data",
|
|
zap.Int64("segmentID", segment.segmentID),
|
|
zap.Any("fieldID", fb.FieldID),
|
|
zap.String("paths", fmt.Sprintln(fb.Binlogs)),
|
|
)
|
|
for _, path := range fb.Binlogs {
|
|
p := path
|
|
binLog, err := loader.minioKV.Load(path)
|
|
if err != nil {
|
|
// TODO: return or continue?
|
|
return err
|
|
}
|
|
blob := &storage.Blob{
|
|
Key: p,
|
|
Value: []byte(binLog),
|
|
}
|
|
blobs = append(blobs, blob)
|
|
}
|
|
}
|
|
|
|
_, _, insertData, err := iCodec.Deserialize(blobs)
|
|
if err != nil {
|
|
log.Warn(err.Error())
|
|
return err
|
|
}
|
|
|
|
for fieldID, value := range insertData.Data {
|
|
var numRows []int64
|
|
var data interface{}
|
|
switch fieldData := value.(type) {
|
|
case *storage.BoolFieldData:
|
|
numRows = fieldData.NumRows
|
|
data = fieldData.Data
|
|
case *storage.Int8FieldData:
|
|
numRows = fieldData.NumRows
|
|
data = fieldData.Data
|
|
case *storage.Int16FieldData:
|
|
numRows = fieldData.NumRows
|
|
data = fieldData.Data
|
|
case *storage.Int32FieldData:
|
|
numRows = fieldData.NumRows
|
|
data = fieldData.Data
|
|
case *storage.Int64FieldData:
|
|
numRows = fieldData.NumRows
|
|
data = fieldData.Data
|
|
case *storage.FloatFieldData:
|
|
numRows = fieldData.NumRows
|
|
data = fieldData.Data
|
|
case *storage.DoubleFieldData:
|
|
numRows = fieldData.NumRows
|
|
data = fieldData.Data
|
|
case *storage.StringFieldData:
|
|
numRows = fieldData.NumRows
|
|
data = fieldData.Data
|
|
case *storage.FloatVectorFieldData:
|
|
numRows = fieldData.NumRows
|
|
data = fieldData.Data
|
|
case *storage.BinaryVectorFieldData:
|
|
numRows = fieldData.NumRows
|
|
data = fieldData.Data
|
|
default:
|
|
return errors.New("unexpected field data type")
|
|
}
|
|
if fieldID == common.TimeStampField {
|
|
segment.setIDBinlogRowSizes(numRows)
|
|
}
|
|
totalNumRows := int64(0)
|
|
for _, numRow := range numRows {
|
|
totalNumRows += numRow
|
|
}
|
|
err = segment.segmentLoadFieldData(fieldID, int(totalNumRows), data)
|
|
if err != nil {
|
|
// TODO: return or continue?
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (loader *segmentLoader) loadSegmentBloomFilter(segment *Segment, binlogPaths []string) error {
|
|
if len(binlogPaths) == 0 {
|
|
log.Info("there are no stats logs saved with segment", zap.Any("segmentID", segment.segmentID))
|
|
return nil
|
|
}
|
|
|
|
values, err := loader.minioKV.MultiLoad(binlogPaths)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
blobs := make([]*storage.Blob, 0)
|
|
for i := 0; i < len(values); i++ {
|
|
blobs = append(blobs, &storage.Blob{Value: []byte(values[i])})
|
|
}
|
|
|
|
stats, err := storage.DeserializeStats(blobs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, stat := range stats {
|
|
if stat.BF == nil {
|
|
log.Warn("stat log with nil bloom filter", zap.Int64("segmentID", segment.segmentID), zap.Any("stat", stat))
|
|
continue
|
|
}
|
|
err = segment.pkFilter.Merge(stat.BF)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb.DeltaLogInfo) error {
|
|
if len(deltaLogs) == 0 {
|
|
log.Info("there are no delta logs saved with segment", zap.Any("segmentID", segment.segmentID))
|
|
return nil
|
|
}
|
|
dCodec := storage.DeleteCodec{}
|
|
blobs := make([]*storage.Blob, 0)
|
|
for _, deltaLog := range deltaLogs {
|
|
value, err := loader.minioKV.Load(deltaLog.DeltaLogPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
blob := &storage.Blob{
|
|
Key: deltaLog.DeltaLogPath,
|
|
Value: []byte(value),
|
|
}
|
|
blobs = append(blobs, blob)
|
|
}
|
|
_, _, deltaData, err := dCodec.Deserialize(blobs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = segment.segmentLoadDeletedRecord(deltaData.Pks, deltaData.Tss, deltaData.RowCount)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// JoinIDPath joins ids to path format.
|
|
func JoinIDPath(ids ...UniqueID) string {
|
|
idStr := make([]string, len(ids))
|
|
for _, id := range ids {
|
|
idStr = append(idStr, strconv.FormatInt(id, 10))
|
|
}
|
|
return path.Join(idStr...)
|
|
}
|
|
|
|
func (loader *segmentLoader) getFieldAndIndexInfo(segment *Segment,
|
|
segmentLoadInfo *querypb.SegmentLoadInfo) ([]*datapb.FieldBinlog, []FieldID, error) {
|
|
collectionID := segment.collectionID
|
|
vectorFieldIDs, err := loader.historicalReplica.getVecFieldIDsByCollectionID(collectionID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if len(vectorFieldIDs) <= 0 {
|
|
return nil, nil, fmt.Errorf("no vector field in collection %d", collectionID)
|
|
}
|
|
|
|
// add VectorFieldInfo for vector fields
|
|
for _, fieldBinlog := range segmentLoadInfo.BinlogPaths {
|
|
if funcutil.SliceContain(vectorFieldIDs, fieldBinlog.FieldID) {
|
|
vectorFieldInfo := newVectorFieldInfo(fieldBinlog)
|
|
segment.setVectorFieldInfo(fieldBinlog.FieldID, vectorFieldInfo)
|
|
}
|
|
}
|
|
|
|
indexedFieldIDs := make([]FieldID, 0)
|
|
for _, vecFieldID := range vectorFieldIDs {
|
|
err = loader.indexLoader.setIndexInfo(collectionID, segment, vecFieldID)
|
|
if err != nil {
|
|
log.Warn(err.Error())
|
|
continue
|
|
}
|
|
indexedFieldIDs = append(indexedFieldIDs, vecFieldID)
|
|
}
|
|
|
|
// we don't need to load raw data for indexed vector field
|
|
fieldBinlogs := loader.filterFieldBinlogs(segmentLoadInfo.BinlogPaths, indexedFieldIDs)
|
|
return fieldBinlogs, indexedFieldIDs, nil
|
|
}
|
|
|
|
func (loader *segmentLoader) estimateSegmentSize(segment *Segment,
|
|
fieldBinLogs []*datapb.FieldBinlog,
|
|
indexFieldIDs []FieldID) (int64, error) {
|
|
segmentSize := int64(0)
|
|
// get fields data size, if len(indexFieldIDs) == 0, vector field would be involved in fieldBinLogs
|
|
for _, fb := range fieldBinLogs {
|
|
log.Debug("estimate segment fields size",
|
|
zap.Any("collectionID", segment.collectionID),
|
|
zap.Any("segmentID", segment.ID()),
|
|
zap.Any("fieldID", fb.FieldID),
|
|
zap.Any("paths", fb.Binlogs),
|
|
)
|
|
for _, binlogPath := range fb.Binlogs {
|
|
logSize, err := storage.EstimateMemorySize(loader.minioKV, binlogPath)
|
|
if err != nil {
|
|
logSize, err = storage.GetBinlogSize(loader.minioKV, binlogPath)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
segmentSize += logSize
|
|
}
|
|
}
|
|
// get index size
|
|
for _, fieldID := range indexFieldIDs {
|
|
indexSize, err := loader.indexLoader.estimateIndexBinlogSize(segment, fieldID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
segmentSize += indexSize
|
|
}
|
|
return segmentSize, nil
|
|
}
|
|
|
|
func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentSizes map[UniqueID]int64) error {
|
|
const thresholdFactor = 0.9
|
|
usedMem, err := getUsedMemory()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
totalMem, err := getTotalMemory()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
segmentTotalSize := int64(0)
|
|
for _, size := range segmentSizes {
|
|
segmentTotalSize += size
|
|
}
|
|
|
|
for segmentID, size := range segmentSizes {
|
|
log.Debug("memory stats when load segment",
|
|
zap.Any("collectionIDs", collectionID),
|
|
zap.Any("segmentID", segmentID),
|
|
zap.Any("totalMem", totalMem),
|
|
zap.Any("usedMem", usedMem),
|
|
zap.Any("segmentTotalSize", segmentTotalSize),
|
|
zap.Any("currentSegmentSize", size),
|
|
zap.Any("thresholdFactor", thresholdFactor),
|
|
)
|
|
if int64(usedMem)+segmentTotalSize+size > int64(float64(totalMem)*thresholdFactor) {
|
|
return errors.New(fmt.Sprintln("load segment failed, OOM if load, "+
|
|
"collectionID = ", collectionID, ", ",
|
|
"usedMem = ", usedMem, ", ",
|
|
"segmentTotalSize = ", segmentTotalSize, ", ",
|
|
"currentSegmentSize = ", size, ", ",
|
|
"totalMem = ", totalMem, ", ",
|
|
"thresholdFactor = ", thresholdFactor,
|
|
))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func newSegmentLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface, etcdKV *etcdkv.EtcdKV) *segmentLoader {
|
|
option := &minioKV.Option{
|
|
Address: Params.MinioEndPoint,
|
|
AccessKeyID: Params.MinioAccessKeyID,
|
|
SecretAccessKeyID: Params.MinioSecretAccessKey,
|
|
UseSSL: Params.MinioUseSSLStr,
|
|
CreateBucket: true,
|
|
BucketName: Params.MinioBucketName,
|
|
}
|
|
|
|
client, err := minioKV.NewMinIOKV(ctx, option)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
iLoader := newIndexLoader(ctx, rootCoord, indexCoord, replica)
|
|
return &segmentLoader{
|
|
historicalReplica: replica,
|
|
|
|
minioKV: client,
|
|
etcdKV: etcdKV,
|
|
|
|
indexLoader: iLoader,
|
|
}
|
|
}
|