mirror of https://github.com/milvus-io/milvus.git
Add meta cache in Proxy and use meta cache to generate id in InsertTask
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/4973/head^2
parent
2514e8b8a9
commit
aabe9afd75
|
@ -34,6 +34,7 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb.
|
|||
},
|
||||
},
|
||||
manipulationMsgStream: p.manipulationMsgStream,
|
||||
rowIDAllocator: p.idAllocator,
|
||||
}
|
||||
|
||||
var cancel func()
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/allocator"
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
|
||||
)
|
||||
|
||||
type MetaCache interface {
|
||||
Hit(collectionName string) bool
|
||||
Get(collectionName string) (*servicepb.CollectionDescription, error)
|
||||
Update(collectionName string) error
|
||||
//Write(collectionName string, schema *servicepb.CollectionDescription) error
|
||||
}
|
||||
|
||||
var globalMetaCache MetaCache
|
||||
|
||||
type SimpleMetaCache struct {
|
||||
mu sync.RWMutex
|
||||
metas map[string]*servicepb.CollectionDescription // collection name to schema
|
||||
masterClient masterpb.MasterClient
|
||||
reqIDAllocator *allocator.IDAllocator
|
||||
tsoAllocator *allocator.TimestampAllocator
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (smc *SimpleMetaCache) Hit(collectionName string) bool {
|
||||
smc.mu.RLock()
|
||||
defer smc.mu.RUnlock()
|
||||
_, ok := smc.metas[collectionName]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (smc *SimpleMetaCache) Get(collectionName string) (*servicepb.CollectionDescription, error) {
|
||||
smc.mu.RLock()
|
||||
defer smc.mu.RUnlock()
|
||||
schema, ok := smc.metas[collectionName]
|
||||
if !ok {
|
||||
return nil, errors.New("collection meta miss")
|
||||
}
|
||||
return schema, nil
|
||||
}
|
||||
|
||||
func (smc *SimpleMetaCache) Update(collectionName string) error {
|
||||
reqID, err := smc.reqIDAllocator.AllocOne()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ts, err := smc.tsoAllocator.AllocOne()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req := &internalpb.DescribeCollectionRequest{
|
||||
MsgType: internalpb.MsgType_kDescribeCollection,
|
||||
ReqID: reqID,
|
||||
Timestamp: ts,
|
||||
ProxyID: 0,
|
||||
CollectionName: &servicepb.CollectionName{
|
||||
CollectionName: collectionName,
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := smc.masterClient.DescribeCollection(smc.ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
smc.mu.Lock()
|
||||
defer smc.mu.Unlock()
|
||||
smc.metas[collectionName] = resp
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func newSimpleMetaCache(ctx context.Context,
|
||||
mCli masterpb.MasterClient,
|
||||
idAllocator *allocator.IDAllocator,
|
||||
tsoAllocator *allocator.TimestampAllocator) *SimpleMetaCache {
|
||||
return &SimpleMetaCache{
|
||||
metas: make(map[string]*servicepb.CollectionDescription),
|
||||
masterClient: mCli,
|
||||
reqIDAllocator: idAllocator,
|
||||
tsoAllocator: tsoAllocator,
|
||||
ctx: ctx,
|
||||
}
|
||||
}
|
||||
|
||||
func initGlobalMetaCache(ctx context.Context,
|
||||
mCli masterpb.MasterClient,
|
||||
idAllocator *allocator.IDAllocator,
|
||||
tsoAllocator *allocator.TimestampAllocator) {
|
||||
globalMetaCache = newSimpleMetaCache(ctx, mCli, idAllocator, tsoAllocator)
|
||||
}
|
|
@ -122,6 +122,7 @@ func (p *Proxy) startProxy() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
initGlobalMetaCache(p.proxyLoopCtx, p.masterClient, p.idAllocator, p.tsoAllocator)
|
||||
p.manipulationMsgStream.Start()
|
||||
p.queryMsgStream.Start()
|
||||
p.queryResultMsgStream.Start()
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"log"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/allocator"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
|
@ -34,6 +35,7 @@ type InsertTask struct {
|
|||
result *servicepb.IntegerRangeResponse
|
||||
manipulationMsgStream *msgstream.PulsarMsgStream
|
||||
ctx context.Context
|
||||
rowIDAllocator *allocator.IDAllocator
|
||||
}
|
||||
|
||||
func (it *InsertTask) SetTs(ts Timestamp) {
|
||||
|
@ -61,6 +63,28 @@ func (it *InsertTask) PreExecute() error {
|
|||
}
|
||||
|
||||
func (it *InsertTask) Execute() error {
|
||||
collectionName := it.BaseInsertTask.CollectionName
|
||||
if !globalMetaCache.Hit(collectionName) {
|
||||
err := globalMetaCache.Update(collectionName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
description, err := globalMetaCache.Get(collectionName)
|
||||
if err != nil || description == nil {
|
||||
return err
|
||||
}
|
||||
autoID := description.Schema.AutoID
|
||||
if autoID || true {
|
||||
rowNums := len(it.BaseInsertTask.RowData)
|
||||
rowIDBegin, rowIDEnd, _ := it.rowIDAllocator.Alloc(uint32(rowNums))
|
||||
it.BaseInsertTask.RowIDs = make([]UniqueID, rowNums)
|
||||
for i := rowIDBegin; i < rowIDEnd; i++ {
|
||||
offset := i - rowIDBegin
|
||||
it.BaseInsertTask.RowIDs[offset] = i
|
||||
}
|
||||
}
|
||||
|
||||
var tsMsg msgstream.TsMsg = &it.BaseInsertTask
|
||||
msgPack := &msgstream.MsgPack{
|
||||
BeginTs: it.BeginTs(),
|
||||
|
@ -68,7 +92,7 @@ func (it *InsertTask) Execute() error {
|
|||
Msgs: make([]msgstream.TsMsg, 1),
|
||||
}
|
||||
msgPack.Msgs[0] = tsMsg
|
||||
err := it.manipulationMsgStream.Produce(msgPack)
|
||||
err = it.manipulationMsgStream.Produce(msgPack)
|
||||
it.result = &servicepb.IntegerRangeResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
||||
|
@ -379,18 +403,14 @@ func (dct *DescribeCollectionTask) PreExecute() error {
|
|||
}
|
||||
|
||||
func (dct *DescribeCollectionTask) Execute() error {
|
||||
resp, err := dct.masterClient.DescribeCollection(dct.ctx, &dct.DescribeCollectionRequest)
|
||||
if err != nil {
|
||||
log.Printf("describe collection failed, error= %v", err)
|
||||
dct.result = &servicepb.CollectionDescription{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: "internal error",
|
||||
},
|
||||
if !globalMetaCache.Hit(dct.CollectionName.CollectionName) {
|
||||
err := globalMetaCache.Update(dct.CollectionName.CollectionName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
dct.result = resp
|
||||
}
|
||||
var err error
|
||||
dct.result, err = globalMetaCache.Get(dct.CollectionName.CollectionName)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue