Make users get vectors without using local storage (#7134)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
godchen 2021-08-18 16:30:11 +08:00 committed by GitHub
parent 11d16a8479
commit 8cc24e09c9
9 changed files with 190 additions and 71 deletions

View File

@@ -116,8 +116,9 @@ dataNode:
   clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
   clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
 
-storage:
+localStorage:
   path: /var/lib/milvus/data/
+  enabled: true
 
 log:
   level: debug # info, warn, error, panic, fatal
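
Note: the key moves from storage to localStorage and gains an enabled switch. Presumably a deployment that wants query nodes to fetch vectors straight from remote (MinIO) storage, without caching anything on local disk, would set something like:

    localStorage:
      path: /var/lib/milvus/data/   # still configured, but not used for vector reads when caching is off
      enabled: false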

View File

@@ -179,7 +179,7 @@ func (h *historical) removeGlobalSegmentIDsByPartitionIds(partitionIDs []UniqueID
 	}
 }
 
-func (h *historical) retrieve(collID UniqueID, partIDs []UniqueID, vcm *storage.VectorChunkManager,
+func (h *historical) retrieve(collID UniqueID, partIDs []UniqueID, vcm storage.ChunkManager,
 	plan *RetrievePlan) ([]*segcorepb.RetrieveResults, []UniqueID, error) {
 	retrieveResults := make([]*segcorepb.RetrieveResults, 0)
@@ -202,11 +202,6 @@ func (h *historical) retrieve(collID UniqueID, partIDs []UniqueID, vcm *storage.VectorChunkManager,
 		}
 	}
 
-	col, err := h.replica.getCollectionByID(collID)
-	if err != nil {
-		return nil, nil, err
-	}
-
 	for _, partID := range retrievePartIDs {
 		segIDs, err := h.replica.getSegmentIDs(partID)
 		if err != nil {
@@ -222,7 +217,7 @@ func (h *historical) retrieve(collID UniqueID, partIDs []UniqueID, vcm *storage.VectorChunkManager,
 			return retrieveResults, retrieveSegmentIDs, err
 		}
 
-		if err = seg.fillVectorFieldsData(collID, col.schema, vcm, result); err != nil {
+		if err = seg.fillVectorFieldsData(collID, vcm, result); err != nil {
 			return retrieveResults, retrieveSegmentIDs, err
 		}
 		retrieveResults = append(retrieveResults, result)
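
The retrieve path now depends on the storage.ChunkManager abstraction instead of the concrete *storage.VectorChunkManager. The interface definition is not shown in this commit; judging from the methods VectorChunkManager implements further down in this diff, it presumably looks roughly like:

    // Sketch of storage.ChunkManager as implied by this diff; the
    // authoritative definition lives elsewhere in internal/storage.
    type ChunkManager interface {
        GetPath(key string) (string, error)
        Write(key string, content []byte) error
        Exist(key string) bool
        Read(key string) ([]byte, error)
        ReadAt(key string, p []byte, off int64) (int, error)
    }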

View File

@@ -49,6 +49,7 @@ type queryCollection struct {
 	cancel context.CancelFunc
 
 	collectionID UniqueID
+	collection   *Collection
 	historical   *historical
 	streaming    *streaming
@@ -64,7 +65,7 @@ type queryCollection struct {
 	queryMsgStream       msgstream.MsgStream
 	queryResultMsgStream msgstream.MsgStream
 
-	vcm *storage.VectorChunkManager
+	vcm storage.ChunkManager
 }
 
 type ResultEntityIds []UniqueID
@@ -75,21 +76,22 @@ func newQueryCollection(releaseCtx context.Context,
 	historical *historical,
 	streaming *streaming,
 	factory msgstream.Factory,
-	lcm storage.ChunkManager,
-	rcm storage.ChunkManager) *queryCollection {
+	vcm storage.ChunkManager,
+) *queryCollection {
 
 	unsolvedMsg := make([]queryMsg, 0)
 
 	queryStream, _ := factory.NewQueryMsgStream(releaseCtx)
 	queryResultStream, _ := factory.NewQueryMsgStream(releaseCtx)
 
-	vcm := storage.NewVectorChunkManager(lcm, rcm)
+	collection, _ := streaming.replica.getCollectionByID(collectionID)
 
 	qc := &queryCollection{
 		releaseCtx:   releaseCtx,
 		cancel:       cancel,
 		collectionID: collectionID,
+		collection:   collection,
 		historical:   historical,
 		streaming:    streaming,
@@ -784,12 +786,7 @@ func (q *queryCollection) search(msg queryMsg) error {
 	searchTimestamp := searchMsg.BeginTs()
 	travelTimestamp := searchMsg.TravelTimestamp
 
-	collectionID := searchMsg.CollectionID
-	collection, err := q.streaming.replica.getCollectionByID(collectionID)
-	if err != nil {
-		return err
-	}
-	schema, err := typeutil.CreateSchemaHelper(collection.schema)
+	schema, err := typeutil.CreateSchemaHelper(q.collection.schema)
 	if err != nil {
 		return err
 	}
@@ -797,13 +794,13 @@ func (q *queryCollection) search(msg queryMsg) error {
 	var plan *SearchPlan
 	if searchMsg.GetDslType() == commonpb.DslType_BoolExprV1 {
 		expr := searchMsg.SerializedExprPlan
-		plan, err = createSearchPlanByExpr(collection, expr)
+		plan, err = createSearchPlanByExpr(q.collection, expr)
 		if err != nil {
 			return err
 		}
 	} else {
 		dsl := searchMsg.Dsl
-		plan, err = createSearchPlan(collection, dsl)
+		plan, err = createSearchPlan(q.collection, dsl)
 		if err != nil {
 			return err
 		}
@@ -841,13 +838,13 @@ func (q *queryCollection) search(msg queryMsg) error {
 	if len(searchMsg.PartitionIDs) > 0 {
 		globalSealedSegments = q.historical.getGlobalSegmentIDsByPartitionIds(searchMsg.PartitionIDs)
 	} else {
-		globalSealedSegments = q.historical.getGlobalSegmentIDsByCollectionID(collectionID)
+		globalSealedSegments = q.historical.getGlobalSegmentIDsByCollectionID(q.collection.id)
 	}
 
 	searchResults := make([]*SearchResult, 0)
 
 	// historical search
-	hisSearchResults, sealedSegmentSearched, err1 := q.historical.search(searchRequests, collectionID, searchMsg.PartitionIDs, plan, travelTimestamp)
+	hisSearchResults, sealedSegmentSearched, err1 := q.historical.search(searchRequests, q.collection.id, searchMsg.PartitionIDs, plan, travelTimestamp)
 	if err1 != nil {
 		log.Warn(err1.Error())
 		return err1
@@ -857,9 +854,9 @@ func (q *queryCollection) search(msg queryMsg) error {
 	// streaming search
 	var err2 error
-	for _, channel := range collection.getVChannels() {
+	for _, channel := range q.collection.getVChannels() {
 		var strSearchResults []*SearchResult
-		strSearchResults, err2 = q.streaming.search(searchRequests, collectionID, searchMsg.PartitionIDs, channel, plan, travelTimestamp)
+		strSearchResults, err2 = q.streaming.search(searchRequests, q.collection.id, searchMsg.PartitionIDs, channel, plan, travelTimestamp)
 		if err2 != nil {
 			log.Warn(err2.Error())
 			return err2
@@ -913,14 +910,14 @@ func (q *queryCollection) search(msg queryMsg) error {
 				SlicedNumCount:           1,
 				MetricType:               plan.getMetricType(),
 				SealedSegmentIDsSearched: sealedSegmentSearched,
-				ChannelIDsSearched:       collection.getVChannels(),
+				ChannelIDsSearched:       q.collection.getVChannels(),
 				GlobalSealedSegmentIDs:   globalSealedSegments,
 			},
 		}
 		log.Debug("QueryNode Empty SearchResultMsg",
-			zap.Any("collectionID", collection.ID()),
+			zap.Any("collectionID", q.collection.id),
 			zap.Any("msgID", searchMsg.ID()),
-			zap.Any("vChannels", collection.getVChannels()),
+			zap.Any("vChannels", q.collection.getVChannels()),
 			zap.Any("sealedSegmentSearched", sealedSegmentSearched),
 		)
 		err = q.publishQueryResult(searchResultMsg, searchMsg.CollectionID)
@@ -1004,14 +1001,14 @@ func (q *queryCollection) search(msg queryMsg) error {
 				SlicedNumCount:           1,
 				MetricType:               plan.getMetricType(),
 				SealedSegmentIDsSearched: sealedSegmentSearched,
-				ChannelIDsSearched:       collection.getVChannels(),
+				ChannelIDsSearched:       q.collection.getVChannels(),
 				GlobalSealedSegmentIDs:   globalSealedSegments,
 			},
 		}
 		log.Debug("QueryNode SearchResultMsg",
-			zap.Any("collectionID", collection.ID()),
+			zap.Any("collectionID", q.collection.id),
 			zap.Any("msgID", searchMsg.ID()),
-			zap.Any("vChannels", collection.getVChannels()),
+			zap.Any("vChannels", q.collection.getVChannels()),
 			zap.Any("sealedSegmentSearched", sealedSegmentSearched),
 		)

View File

@@ -62,7 +62,7 @@ func TestQueryCollection_withoutVChannel(t *testing.T) {
 	assert.Nil(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
-	queryCollection := newQueryCollection(ctx, cancel, 0, historical, streaming, factory, nil, nil)
+	queryCollection := newQueryCollection(ctx, cancel, 0, historical, streaming, factory, nil)
 
 	producerChannels := []string{"testResultChannel"}
 	queryCollection.queryResultMsgStream.AsProducer(producerChannels)

View File

@@ -14,12 +14,14 @@ package querynode
 import "C"
 
 import (
 	"context"
+	"strconv"
 
 	"go.uber.org/zap"
 
 	miniokv "github.com/milvus-io/milvus/internal/kv/minio"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
+	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/storage"
 )
@@ -34,8 +36,9 @@ type queryService struct {
 	factory msgstream.Factory
 
-	lcm storage.ChunkManager
-	rcm storage.ChunkManager
+	lcm               storage.ChunkManager
+	rcm               storage.ChunkManager
+	localCacheEnabled bool
 }
 
 func newQueryService(ctx context.Context,
@@ -45,10 +48,14 @@ func newQueryService(ctx context.Context,
 	queryServiceCtx, queryServiceCancel := context.WithCancel(ctx)
 
-	path, err := Params.Load("storage.path")
+	//TODO godchen: change this to configuration
+	path, err := Params.Load("localStorage.Path")
 	if err != nil {
-		panic(err)
+		path = "/tmp/milvus/data"
 	}
+	enabled, _ := Params.Load("localStorage.enabled")
+	localCacheEnabled, _ := strconv.ParseBool(enabled)
 
 	lcm := storage.NewLocalChunkManager(path)
 
 	option := &miniokv.Option{
option := &miniokv.Option{
@@ -77,8 +84,9 @@ func newQueryService(ctx context.Context,
 		factory: factory,
 
-		lcm: lcm,
-		rcm: rcm,
+		lcm:               lcm,
+		rcm:               rcm,
+		localCacheEnabled: localCacheEnabled,
 	}
 }
@@ -96,6 +104,13 @@ func (q *queryService) addQueryCollection(collectionID UniqueID) {
 		log.Warn("query collection already exists", zap.Any("collectionID", collectionID))
 		return
 	}
+	collection, _ := q.historical.replica.getCollectionByID(collectionID)
+	vcm := storage.NewVectorChunkManager(q.lcm, q.rcm,
+		&etcdpb.CollectionMeta{
+			ID:     collection.id,
+			Schema: collection.schema,
+		}, q.localCacheEnabled)
+
 	ctx1, cancel := context.WithCancel(q.ctx)
 	qc := newQueryCollection(ctx1,
@@ -104,8 +119,8 @@ func (q *queryService) addQueryCollection(collectionID UniqueID) {
 		q.historical,
 		q.streaming,
 		q.factory,
-		q.lcm,
-		q.rcm)
+		vcm,
+	)
 	q.queryCollections[collectionID] = qc
 }
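
Each query collection now receives a VectorChunkManager that is already bound to its collection meta, so per-read schema arguments disappear from the call sites. A minimal wiring sketch under the same assumptions as the code above (lcm, rcm, and collectionSchema stand in for values the query service already holds):

    // Illustrative only: build a vector chunk manager for one collection.
    meta := &etcdpb.CollectionMeta{ID: collectionID, Schema: collectionSchema}
    vcm := storage.NewVectorChunkManager(lcm, rcm, meta, true) // true: cache vectors on local disk
    data, err := vcm.Read(vectorFilePath)                      // downloads and caches on a local miss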

View File

@@ -318,8 +318,8 @@ func (s *Segment) getEntityByIds(plan *RetrievePlan) (*segcorepb.RetrieveResults
 	return result, nil
 }
 
-func (s *Segment) fillVectorFieldsData(collectionID UniqueID, schema *schemapb.CollectionSchema,
-	vcm *storage.VectorChunkManager, result *segcorepb.RetrieveResults) error {
+func (s *Segment) fillVectorFieldsData(collectionID UniqueID,
+	vcm storage.ChunkManager, result *segcorepb.RetrieveResults) error {
 
 	for _, fieldData := range result.FieldsData {
 		log.Debug("FillVectorFieldData for fieldID", zap.Any("fieldID", fieldData.FieldId))
@@ -352,10 +352,6 @@ func (s *Segment) fillVectorFieldsData(collectionID UniqueID,
 		}
 	}
 	log.Debug("FillVectorFieldData", zap.Any("path", vecPath))
 
-	err := vcm.DownloadVectorFile(vecPath, collectionID, schema)
-	if err != nil {
-		return err
-	}
 
 	switch fieldData.Type {
 	case schemapb.DataType_BinaryVector:
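
With the explicit DownloadVectorFile step gone, the type-switch branches read vector bytes through the ChunkManager, which fetches and deserializes the remote binlog on demand. A hypothetical helper (not from this commit; readBinaryVector, offset, and dim are illustrative) shows the shape of such a read:

    // readBinaryVector is a hypothetical sketch: fetch one binary vector's
    // bytes via the ChunkManager without any explicit download step.
    func readBinaryVector(vcm storage.ChunkManager, vecPath string, offset int64, dim int) ([]byte, error) {
        buf := make([]byte, dim/8) // a binary vector stores one bit per dimension
        if _, err := vcm.ReadAt(vecPath, buf, offset); err != nil {
            return nil, err
        }
        return buf, nil
    }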

View File

@@ -13,6 +13,7 @@ package storage
 import (
 	"errors"
+	"io"
 
 	miniokv "github.com/milvus-io/milvus/internal/kv/minio"
 )
@@ -47,6 +48,19 @@ func (mcm *MinioChunkManager) Read(key string) ([]byte, error) {
 	return []byte(results), err
 }
 
-func (mcm *MinioChunkManager) ReadAt(key string, p []byte, off int64) (n int, err error) {
-	return 0, errors.New("Minio file manager cannot readat")
+func (mcm *MinioChunkManager) ReadAt(key string, p []byte, off int64) (int, error) {
+	results, err := mcm.minio.Load(key)
+	if err != nil {
+		return -1, err
+	}
+
+	if off < 0 || int64(len([]byte(results))) < off {
+		return 0, errors.New("MinioChunkManager: invalid offset")
+	}
+	n := copy(p, []byte(results)[off:])
+	if n < len(p) {
+		return n, io.EOF
+	}
+	return n, nil
 }
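
ReadAt now follows io.ReaderAt conventions: it fills p starting at off and returns io.EOF when fewer than len(p) bytes remain past the offset. A caller-side sketch (the object key is illustrative):

    buf := make([]byte, 16)
    n, err := mcm.ReadAt("some/object/key", buf, 32)
    if err == io.EOF {
        // fewer than 16 bytes were available past offset 32; buf[:n] is valid
    } else if err != nil {
        // the load failed or the offset was out of range
    }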

View File

@@ -15,34 +15,38 @@ import (
 	"bytes"
 	"encoding/binary"
 	"errors"
+	"io"
 
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 )
 
 type VectorChunkManager struct {
 	localChunkManager  ChunkManager
 	remoteChunkManager ChunkManager
+
+	schema *etcdpb.CollectionMeta
+
+	localCacheEnable bool
 }
 
-func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager ChunkManager) *VectorChunkManager {
+func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager ChunkManager, schema *etcdpb.CollectionMeta, localCacheEnable bool) *VectorChunkManager {
 	return &VectorChunkManager{
 		localChunkManager:  localChunkManager,
 		remoteChunkManager: remoteChunkManager,
+		schema:             schema,
+		localCacheEnable:   localCacheEnable,
 	}
 }
 
-func (vcm *VectorChunkManager) DownloadVectorFile(key string, collectionID UniqueID, schema *schemapb.CollectionSchema) error {
+func (vcm *VectorChunkManager) downloadVectorFile(key string) ([]byte, error) {
 	if vcm.localChunkManager.Exist(key) {
-		return nil
+		return vcm.localChunkManager.Read(key)
 	}
-	insertCodec := NewInsertCodec(&etcdpb.CollectionMeta{
-		ID:     collectionID,
-		Schema: schema,
-	})
+	insertCodec := NewInsertCodec(vcm.schema)
 	content, err := vcm.remoteChunkManager.Read(key)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	blob := &Blob{
 		Key: key,
@@ -51,36 +55,40 @@ func (vcm *VectorChunkManager) downloadVectorFile(key string) ([]byte, error) {
 	_, _, data, err := insertCodec.Deserialize([]*Blob{blob})
 	if err != nil {
-		return err
+		return nil, err
 	}
-	defer insertCodec.Close()
+	var results []byte
 	for _, singleData := range data.Data {
 		binaryVector, ok := singleData.(*BinaryVectorFieldData)
 		if ok {
-			vcm.localChunkManager.Write(key, binaryVector.Data)
+			results = binaryVector.Data
 		}
 		floatVector, ok := singleData.(*FloatVectorFieldData)
 		if ok {
 			buf := new(bytes.Buffer)
 			err := binary.Write(buf, binary.LittleEndian, floatVector.Data)
 			if err != nil {
-				return err
+				return nil, err
 			}
-			vcm.localChunkManager.Write(key, buf.Bytes())
+			results = buf.Bytes()
 		}
 	}
-	return nil
+	insertCodec.Close()
+	return results, nil
 }
 
 func (vcm *VectorChunkManager) GetPath(key string) (string, error) {
-	if vcm.localChunkManager.Exist(key) {
+	if vcm.localChunkManager.Exist(key) && vcm.localCacheEnable {
 		return vcm.localChunkManager.GetPath(key)
 	}
-	return vcm.localChunkManager.GetPath(key)
+	return vcm.remoteChunkManager.GetPath(key)
 }
 
 func (vcm *VectorChunkManager) Write(key string, content []byte) error {
+	if !vcm.localCacheEnable {
+		return errors.New("Cannot write local file for local cache is not allowed")
+	}
 	return vcm.localChunkManager.Write(key, content)
 }
@@ -89,12 +97,54 @@ func (vcm *VectorChunkManager) Exist(key string) bool {
 }
 
 func (vcm *VectorChunkManager) Read(key string) ([]byte, error) {
-	if vcm.localChunkManager.Exist(key) {
+	if vcm.localCacheEnable {
+		if vcm.localChunkManager.Exist(key) {
+			return vcm.localChunkManager.Read(key)
+		}
+		bytes, err := vcm.downloadVectorFile(key)
+		if err != nil {
+			return nil, err
+		}
+		err = vcm.localChunkManager.Write(key, bytes)
+		if err != nil {
+			return nil, err
+		}
 		return vcm.localChunkManager.Read(key)
 	}
-	return nil, errors.New("the vector file doesn't exist, please call download first")
+	return vcm.downloadVectorFile(key)
 }
 
-func (vcm *VectorChunkManager) ReadAt(key string, p []byte, off int64) (n int, err error) {
-	return vcm.localChunkManager.ReadAt(key, p, off)
+func (vcm *VectorChunkManager) ReadAt(key string, p []byte, off int64) (int, error) {
+	if vcm.localCacheEnable {
+		if vcm.localChunkManager.Exist(key) {
+			return vcm.localChunkManager.ReadAt(key, p, off)
+		}
+		bytes, err := vcm.downloadVectorFile(key)
+		if err != nil {
+			return -1, err
+		}
+		err = vcm.localChunkManager.Write(key, bytes)
+		if err != nil {
+			return -1, err
+		}
+		return vcm.localChunkManager.ReadAt(key, p, off)
+	}
+	bytes, err := vcm.downloadVectorFile(key)
+	if err != nil {
+		return -1, err
+	}
+	if bytes == nil {
+		return 0, errors.New("vectorChunkManager: data downloaded is nil")
+	}
+	if off < 0 || int64(len(bytes)) < off {
+		return 0, errors.New("vectorChunkManager: invalid offset")
+	}
+	n := copy(p, bytes[off:])
+	if n < len(p) {
+		return n, io.EOF
+	}
+	return n, nil
 }
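
Read and ReadAt now have two modes, which is the point of this PR: with localCacheEnable set, the first access downloads and deserializes the remote binlog, writes the raw vector bytes to local disk, and serves later reads from that cache; with it unset, every read deserializes directly from remote storage and never touches the local disk. A usage sketch under the assumptions of the tests below (lcm, rcm, and meta as set up there):

    vcm := storage.NewVectorChunkManager(lcm, rcm, meta, false) // no local storage involved
    vec, err := vcm.Read("108")                                 // vector bytes straight from MinIO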

View File

@@ -45,7 +45,7 @@ func TestVectorChunkManager(t *testing.T) {
 	lcm := NewLocalChunkManager(localPath)
 
 	meta := initMeta()
-	vcm := NewVectorChunkManager(lcm, rcm)
+	vcm := NewVectorChunkManager(lcm, rcm, meta, false)
 	assert.NotNil(t, vcm)
 
 	binlogs := initBinlogFile(meta)
@@ -53,11 +53,62 @@ func TestVectorChunkManager(t *testing.T) {
 	for _, binlog := range binlogs {
 		rcm.Write(binlog.Key, binlog.Value)
 	}
 
-	err = vcm.DownloadVectorFile("108", meta.ID, meta.Schema)
+	content, err := vcm.Read("108")
 	assert.Nil(t, err)
+	assert.Equal(t, []byte{0, 255}, content)
 
-	err = vcm.DownloadVectorFile("109", meta.ID, meta.Schema)
+	content, err = vcm.Read("109")
 	assert.Nil(t, err)
+	floatResult := make([]float32, 0)
+	for i := 0; i < len(content)/4; i++ {
+		singleData := typeutil.ByteToFloat32(content[i*4 : i*4+4])
+		floatResult = append(floatResult, singleData)
+	}
+	assert.Equal(t, []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666}, floatResult)
+
+	content = make([]byte, 8*4)
+	byteLen, err := vcm.ReadAt("109", content, 8*4)
+	assert.Nil(t, err)
+	assert.Equal(t, 32, byteLen)
+
+	floatResult = make([]float32, 0)
+	for i := 0; i < len(content)/4; i++ {
+		singleData := typeutil.ByteToFloat32(content[i*4 : i*4+4])
+		floatResult = append(floatResult, singleData)
+	}
+	assert.Equal(t, []float32{0, 111, 222, 333, 444, 555, 777, 666}, floatResult)
+
+	os.Remove(path.Join(localPath, "108"))
+	os.Remove(path.Join(localPath, "109"))
+}
+
+func TestVectorChunkManagerWithLocalCache(t *testing.T) {
+	Params.Init()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	bucketName := "fantastic-tech-test"
+	minIOKV, err := newMinIOKVClient(ctx, bucketName)
+	assert.Nil(t, err)
+	defer minIOKV.RemoveWithPrefix("")
+
+	rcm := NewMinioChunkManager(minIOKV)
+
+	localPath := "/tmp/milvus/data"
+	lcm := NewLocalChunkManager(localPath)
+
+	meta := initMeta()
+	vcm := NewVectorChunkManager(lcm, rcm, meta, true)
+	assert.NotNil(t, vcm)
+
+	binlogs := initBinlogFile(meta)
+	assert.NotNil(t, binlogs)
+	for _, binlog := range binlogs {
+		rcm.Write(binlog.Key, binlog.Value)
+	}
+
+	content, err := vcm.Read("108")
+	assert.Nil(t, err)