mirror of https://github.com/milvus-io/milvus.git
Fix invalidate metacache error
Signed-off-by: godchen <qingxiang.chen@zilliz.com>pull/4973/head^2
parent
e384222803
commit
0f620ab149
|
@ -49,8 +49,7 @@ func (c *Client) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.Re
|
|||
}
|
||||
|
||||
func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
|
||||
_, err := c.proxyServiceClient.InvalidateCollectionMetaCache(c.ctx, request)
|
||||
return nil, err
|
||||
return c.proxyServiceClient.InvalidateCollectionMetaCache(c.ctx, request)
|
||||
}
|
||||
|
||||
func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
|
||||
|
|
|
@ -839,7 +839,7 @@ func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Sta
|
|||
if err != nil {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: "Create collection failed: " + err.Error(),
|
||||
Reason: "Drop collection failed: " + err.Error(),
|
||||
}, nil
|
||||
}
|
||||
return &commonpb.Status{
|
||||
|
|
|
@ -177,10 +177,8 @@ func (m *InsertChannelsMap) closeAllMsgStream() {
|
|||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
for loc, stream := range m.insertMsgStreams {
|
||||
if m.droppedBitMap[loc] == 0 && m.usageHistogram[loc] >= 1 {
|
||||
stream.Close()
|
||||
}
|
||||
for _, stream := range m.insertMsgStreams {
|
||||
stream.Close()
|
||||
}
|
||||
|
||||
m.collectionID2InsertChannels = make(map[UniqueID]int)
|
||||
|
|
|
@ -188,6 +188,9 @@ func (m *MetaCache) GetPartitionID(collectionName string, partitionName string)
|
|||
}
|
||||
}
|
||||
partInfo := m.collInfo[collectionName].partInfo
|
||||
if partInfo == nil {
|
||||
partInfo = map[string]typeutil.UniqueID{}
|
||||
}
|
||||
|
||||
for i := 0; i < len(partitions.PartitionIDs); i++ {
|
||||
_, ok := partInfo[partitions.PartitionNames[i]]
|
||||
|
|
|
@ -280,13 +280,19 @@ func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateC
|
|||
|
||||
err = s.sched.InvalidateCollectionMetaCacheTaskQueue.Enqueue(t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
err = t.WaitToFinish()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return t.response, nil
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ type NaiveNodeIDAllocatorImpl struct {
|
|||
func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID {
|
||||
allocator.mtx.Lock()
|
||||
defer func() {
|
||||
// allocator.now++
|
||||
allocator.now++
|
||||
allocator.mtx.Unlock()
|
||||
}()
|
||||
return allocator.now
|
||||
|
@ -32,6 +32,6 @@ func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID {
|
|||
|
||||
func NewNodeIDAllocator() NodeIDAllocator {
|
||||
return &NaiveNodeIDAllocatorImpl{
|
||||
now: 1,
|
||||
now: 0,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,17 +2,12 @@ package proxyservice
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
|
||||
)
|
||||
|
||||
type TaskEnum = int
|
||||
|
@ -73,7 +68,6 @@ func (t *RegisterLinkTask) PreExecute() error {
|
|||
|
||||
func (t *RegisterLinkTask) Execute() error {
|
||||
info, err := t.nodeInfos.Pick()
|
||||
fmt.Println("info: ", info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -135,6 +129,7 @@ func (t *RegisterNodeTask) PostExecute() error {
|
|||
type InvalidateCollectionMetaCacheTask struct {
|
||||
Condition
|
||||
request *proxypb.InvalidateCollMetaCacheRequest
|
||||
response *commonpb.Status
|
||||
nodeInfos *GlobalNodeInfoTable
|
||||
}
|
||||
|
||||
|
@ -157,6 +152,9 @@ func (t *InvalidateCollectionMetaCacheTask) Execute() error {
|
|||
return errors.New(status.Reason)
|
||||
}
|
||||
}
|
||||
t.response = &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -83,8 +83,8 @@ func (ttBarrier *softTimeTickBarrier) Start() error {
|
|||
case ttmsgs := <-ttBarrier.ttStream.Chan():
|
||||
//log.Println("ttmsgs: ", ttmsgs)
|
||||
ttBarrier.peerMtx.RLock()
|
||||
//log.Println("peer2LastTt map: ", ttBarrier.peer2LastTt)
|
||||
//log.Println("len(ttmsgs.Msgs): ", len(ttmsgs.Msgs))
|
||||
log.Println("peer2LastTt map: ", ttBarrier.peer2LastTt)
|
||||
log.Println("len(ttmsgs.Msgs): ", len(ttmsgs.Msgs))
|
||||
if len(ttmsgs.Msgs) > 0 {
|
||||
for _, timetickmsg := range ttmsgs.Msgs {
|
||||
ttmsg := timetickmsg.(*ms.TimeTickMsg)
|
||||
|
|
Loading…
Reference in New Issue