From aabe9afd75179ea69ed73862bf472875eaa7cead Mon Sep 17 00:00:00 2001 From: dragondriver Date: Fri, 20 Nov 2020 17:53:31 +0800 Subject: [PATCH] Add meta cache in Proxy and use meta cache to generate id in InsertTask Signed-off-by: dragondriver --- internal/proxy/grpc_service.go | 1 + internal/proxy/meta_cache.go | 98 ++++++++++++++++++++++++++++++++++ internal/proxy/proxy.go | 1 + internal/proxy/task.go | 42 +++++++++++---- 4 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 internal/proxy/meta_cache.go diff --git a/internal/proxy/grpc_service.go b/internal/proxy/grpc_service.go index bb8d924179..488a4dc4e3 100644 --- a/internal/proxy/grpc_service.go +++ b/internal/proxy/grpc_service.go @@ -34,6 +34,7 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb. }, }, manipulationMsgStream: p.manipulationMsgStream, + rowIDAllocator: p.idAllocator, } var cancel func() diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go new file mode 100644 index 0000000000..afc4f78e06 --- /dev/null +++ b/internal/proxy/meta_cache.go @@ -0,0 +1,98 @@ +package proxy + +import ( + "context" + "sync" + + "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" + "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" +) + +type MetaCache interface { + Hit(collectionName string) bool + Get(collectionName string) (*servicepb.CollectionDescription, error) + Update(collectionName string) error + //Write(collectionName string, schema *servicepb.CollectionDescription) error +} + +var globalMetaCache MetaCache + +type SimpleMetaCache struct { + mu sync.RWMutex + metas map[string]*servicepb.CollectionDescription // collection name to schema + masterClient masterpb.MasterClient + reqIDAllocator *allocator.IDAllocator + tsoAllocator *allocator.TimestampAllocator + ctx context.Context +} + +func (smc *SimpleMetaCache) Hit(collectionName string) bool { + smc.mu.RLock() + defer smc.mu.RUnlock() + _, ok := smc.metas[collectionName] + return ok +} + +func (smc *SimpleMetaCache) Get(collectionName string) (*servicepb.CollectionDescription, error) { + smc.mu.RLock() + defer smc.mu.RUnlock() + schema, ok := smc.metas[collectionName] + if !ok { + return nil, errors.New("collection meta miss") + } + return schema, nil +} + +func (smc *SimpleMetaCache) Update(collectionName string) error { + reqID, err := smc.reqIDAllocator.AllocOne() + if err != nil { + return err + } + ts, err := smc.tsoAllocator.AllocOne() + if err != nil { + return err + } + req := &internalpb.DescribeCollectionRequest{ + MsgType: internalpb.MsgType_kDescribeCollection, + ReqID: reqID, + Timestamp: ts, + ProxyID: 0, + CollectionName: &servicepb.CollectionName{ + CollectionName: collectionName, + }, + } + + resp, err := smc.masterClient.DescribeCollection(smc.ctx, req) + if err != nil { + return err + } + + smc.mu.Lock() + defer smc.mu.Unlock() + smc.metas[collectionName] = resp + + return nil +} + +func newSimpleMetaCache(ctx context.Context, + mCli masterpb.MasterClient, + idAllocator *allocator.IDAllocator, + tsoAllocator *allocator.TimestampAllocator) *SimpleMetaCache { + return &SimpleMetaCache{ + metas: make(map[string]*servicepb.CollectionDescription), + masterClient: mCli, + reqIDAllocator: idAllocator, + tsoAllocator: tsoAllocator, + ctx: ctx, + } +} + +func initGlobalMetaCache(ctx context.Context, + mCli masterpb.MasterClient, + idAllocator *allocator.IDAllocator, + tsoAllocator *allocator.TimestampAllocator) { + globalMetaCache = newSimpleMetaCache(ctx, mCli, idAllocator, tsoAllocator) +} diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index d4cdf5ff14..e13117cf73 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -122,6 +122,7 @@ func (p *Proxy) startProxy() error { if err != nil { return err } + initGlobalMetaCache(p.proxyLoopCtx, p.masterClient, p.idAllocator, p.tsoAllocator) p.manipulationMsgStream.Start() p.queryMsgStream.Start() p.queryResultMsgStream.Start() diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 1aaa00366d..12dc1551ac 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -5,6 +5,7 @@ import ( "errors" "log" + "github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" @@ -34,6 +35,7 @@ type InsertTask struct { result *servicepb.IntegerRangeResponse manipulationMsgStream *msgstream.PulsarMsgStream ctx context.Context + rowIDAllocator *allocator.IDAllocator } func (it *InsertTask) SetTs(ts Timestamp) { @@ -61,6 +63,28 @@ func (it *InsertTask) PreExecute() error { } func (it *InsertTask) Execute() error { + collectionName := it.BaseInsertTask.CollectionName + if !globalMetaCache.Hit(collectionName) { + err := globalMetaCache.Update(collectionName) + if err != nil { + return err + } + } + description, err := globalMetaCache.Get(collectionName) + if err != nil || description == nil { + return err + } + autoID := description.Schema.AutoID + if autoID || true { + rowNums := len(it.BaseInsertTask.RowData) + rowIDBegin, rowIDEnd, _ := it.rowIDAllocator.Alloc(uint32(rowNums)) + it.BaseInsertTask.RowIDs = make([]UniqueID, rowNums) + for i := rowIDBegin; i < rowIDEnd; i++ { + offset := i - rowIDBegin + it.BaseInsertTask.RowIDs[offset] = i + } + } + var tsMsg msgstream.TsMsg = &it.BaseInsertTask msgPack := &msgstream.MsgPack{ BeginTs: it.BeginTs(), @@ -68,7 +92,7 @@ func (it *InsertTask) Execute() error { Msgs: make([]msgstream.TsMsg, 1), } msgPack.Msgs[0] = tsMsg - err := it.manipulationMsgStream.Produce(msgPack) + err = it.manipulationMsgStream.Produce(msgPack) it.result = &servicepb.IntegerRangeResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -379,18 +403,14 @@ func (dct *DescribeCollectionTask) PreExecute() error { } func (dct *DescribeCollectionTask) Execute() error { - resp, err := dct.masterClient.DescribeCollection(dct.ctx, &dct.DescribeCollectionRequest) - if err != nil { - log.Printf("describe collection failed, error= %v", err) - dct.result = &servicepb.CollectionDescription{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "internal error", - }, + if !globalMetaCache.Hit(dct.CollectionName.CollectionName) { + err := globalMetaCache.Update(dct.CollectionName.CollectionName) + if err != nil { + return err } - } else { - dct.result = resp } + var err error + dct.result, err = globalMetaCache.Get(dct.CollectionName.CollectionName) return err }