From a0244b7683e473160d15932e052f8aa98f6189e1 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Thu, 24 Dec 2020 16:53:31 +0800 Subject: [PATCH] Add segment expire logic for proxy Signed-off-by: zhenshan.cao --- internal/allocator/allocator.go | 66 +++- internal/allocator/id.go | 31 +- internal/allocator/segment.go | 307 ++++++++++++------- internal/allocator/timestamp.go | 40 ++- internal/master/meta_table.go | 8 +- internal/master/segment_assigner.go | 2 +- internal/proxy/proxy.go | 9 +- internal/proxy/proxy_test.go | 6 +- internal/proxy/repack_func.go | 25 +- internal/proxy/timetick.go | 4 + internal/writenode/data_sync_service_test.go | 2 +- 11 files changed, 336 insertions(+), 164 deletions(-) diff --git a/internal/allocator/allocator.go b/internal/allocator/allocator.go index fe4870fc3a..9cd7e935d7 100644 --- a/internal/allocator/allocator.go +++ b/internal/allocator/allocator.go @@ -58,6 +58,7 @@ type segRequest struct { partition string segInfo map[UniqueID]uint32 channelID int32 + timestamp Timestamp } type syncRequest struct { @@ -121,16 +122,18 @@ type Allocator struct { masterClient masterpb.MasterClient countPerRPC uint32 - toDoReqs []request - - syncReqs []request + toDoReqs []request + canDoReqs []request + syncReqs []request tChan tickerChan forceSyncChan chan request - syncFunc func() + syncFunc func() bool processFunc func(req request) error - checkFunc func(timeout bool) bool + + checkSyncFunc func(timeout bool) bool + pickCanDoFunc func() } func (ta *Allocator) Start() error { @@ -145,7 +148,6 @@ func (ta *Allocator) Start() error { } func (ta *Allocator) connectMaster() error { - log.Printf("Connected to master, master_addr=%s", ta.masterAddress) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() conn, err := grpc.DialContext(ctx, ta.masterAddress, grpc.WithInsecure(), grpc.WithBlock()) @@ -182,7 +184,13 @@ func (ta *Allocator) mainLoop() { ta.finishSyncRequest() case <-ta.tChan.Chan(): - ta.sync(true) + ta.pickCanDo() + ta.finishRequest() + if ta.sync(true) { + ta.pickCanDo() + ta.finishRequest() + } + ta.failRemainRequest() case first := <-ta.reqs: ta.toDoReqs = append(ta.toDoReqs, first) @@ -190,9 +198,13 @@ func (ta *Allocator) mainLoop() { for i := 0; i < pending; i++ { ta.toDoReqs = append(ta.toDoReqs, <-ta.reqs) } - ta.sync(false) - + ta.pickCanDo() ta.finishRequest() + if ta.sync(false) { + ta.pickCanDo() + ta.finishRequest() + } + ta.failRemainRequest() case <-loopCtx.Done(): return @@ -201,19 +213,32 @@ func (ta *Allocator) mainLoop() { } } -func (ta *Allocator) sync(timeout bool) { - if ta.syncFunc == nil { +func (ta *Allocator) pickCanDo() { + if ta.pickCanDoFunc == nil { return } - if ta.checkFunc == nil || !ta.checkFunc(timeout) { - return + ta.pickCanDoFunc() +} + +func (ta *Allocator) sync(timeout bool) bool { + if ta.syncFunc == nil || ta.checkSyncFunc == nil { + ta.canDoReqs = ta.toDoReqs + ta.toDoReqs = ta.toDoReqs[0:0] + return true + } + if !timeout && len(ta.toDoReqs) == 0 { + return false + } + if !ta.checkSyncFunc(timeout) { + return false } - ta.syncFunc() + ret := ta.syncFunc() if !timeout { ta.tChan.Reset() } + return ret } func (ta *Allocator) finishSyncRequest() { @@ -225,14 +250,23 @@ func (ta *Allocator) finishSyncRequest() { ta.syncReqs = ta.syncReqs[0:0] } -func (ta *Allocator) finishRequest() { +func (ta *Allocator) failRemainRequest() { for _, req := range ta.toDoReqs { + if req != nil { + req.Notify(errors.New("failed: unexpected error")) + } + } + ta.toDoReqs = []request{} +} + +func (ta 
*Allocator) finishRequest() {
+	for _, req := range ta.canDoReqs {
 		if req != nil {
 			err := ta.processFunc(req)
 			req.Notify(err)
 		}
 	}
-	ta.toDoReqs = ta.toDoReqs[0:0]
+	ta.canDoReqs = []request{}
 }
 
 func (ta *Allocator) revokeRequest(err error) {
diff --git a/internal/allocator/id.go b/internal/allocator/id.go
index fc1d7c06b5..65890d92c8 100644
--- a/internal/allocator/id.go
+++ b/internal/allocator/id.go
@@ -21,6 +21,8 @@ type IDAllocator struct {
 
 	idStart UniqueID
 	idEnd   UniqueID
+
+	PeerID UniqueID
 }
 
 func NewIDAllocator(ctx context.Context, masterAddr string) (*IDAllocator, error) {
@@ -37,16 +39,17 @@ func NewIDAllocator(ctx context.Context, masterAddr string) (*IDAllocator, error
 	a.tChan = &emptyTicker{}
 	a.Allocator.syncFunc = a.syncID
 	a.Allocator.processFunc = a.processFunc
-	a.Allocator.checkFunc = a.checkFunc
+	a.Allocator.checkSyncFunc = a.checkSyncFunc
+	a.Allocator.pickCanDoFunc = a.pickCanDoFunc
 	a.init()
 	return a, nil
 }
 
-func (ia *IDAllocator) syncID() {
+func (ia *IDAllocator) syncID() bool {
 	fmt.Println("syncID")
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	req := &internalpb.IDRequest{
-		PeerID: 1,
+		PeerID: ia.PeerID,
 		Role:   internalpb.PeerRole_Proxy,
 		Count:  ia.countPerRPC,
 	}
@@ -55,22 +58,32 @@
 	cancel()
 	if err != nil {
 		log.Println("syncID Failed!!!!!")
-		return
+		return false
 	}
 	ia.idStart = resp.GetID()
 	ia.idEnd = ia.idStart + int64(resp.GetCount())
+	return true
 }
 
-func (ia *IDAllocator) checkFunc(timeout bool) bool {
-	if timeout {
-		return timeout
-	}
+func (ia *IDAllocator) checkSyncFunc(timeout bool) bool {
+	return timeout || len(ia.toDoReqs) > 0
+}
+
+func (ia *IDAllocator) pickCanDoFunc() {
+	total := uint32(ia.idEnd - ia.idStart)
 	need := uint32(0)
+	idx := 0
 	for _, req := range ia.toDoReqs {
 		iReq := req.(*idRequest)
 		need += iReq.count
+		if need <= total {
+			ia.canDoReqs = append(ia.canDoReqs, req)
+			idx++
+		} else {
+			break
+		}
 	}
-	return ia.idStart+int64(need) >= ia.idEnd
+	ia.toDoReqs = ia.toDoReqs[idx:]
 }
 
 func (ia *IDAllocator) processFunc(req request) error {
diff --git a/internal/allocator/segment.go b/internal/allocator/segment.go
index 2209747b45..7d1b535947 100644
--- a/internal/allocator/segment.go
+++ b/internal/allocator/segment.go
@@ -5,10 +5,9 @@ import (
 	"context"
 	"fmt"
 	"log"
-	"sort"
 	"time"
 
-	"github.com/cznic/mathutil"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
@@ -16,42 +15,101 @@ import (
 
 const (
 	SegCountPerRPC     = 20000
-	ActiveTimeDuration = 100 // Second
+	ActiveTimeDuration = 100 //second
 )
 
+type segInfo struct {
+	segID      UniqueID
+	count      uint32
+	expireTime Timestamp
+}
+
 type assignInfo struct {
 	collName       string
 	partitionTag   string
 	channelID      int32
-	segInfo        map[UniqueID]uint32 // segmentID->count map
-	expireTime     time.Time
+	segID          UniqueID
+	segInfos       *list.List
+	segCapacity    uint32
 	lastInsertTime time.Time
 }
 
-func (info *assignInfo) IsExpired(now time.Time) bool {
-	return now.Sub(info.expireTime) >= 0
+func (info *segInfo) IsExpired(ts Timestamp) bool {
+	return ts > info.expireTime || info.count <= 0
+}
+
+func (info *segInfo) Capacity(ts Timestamp) uint32 {
+	if info.IsExpired(ts) {
+		return 0
+	}
+	return info.count
+}
+
+func (info *segInfo) Assign(ts Timestamp, count uint32) uint32 {
+	if info.IsExpired(ts) {
+		return 0
+	}
+	ret := uint32(0)
+	if info.count >= count {
+		info.count -= count
+		ret = count
+	} else {
+		ret = info.count
+		info.count = 0
+	}
+	return ret
+}
+
+func (info *assignInfo) RemoveExpired(ts Timestamp) {
+	for e := info.segInfos.Front(); e != nil; e = e.Next() {
+		segInfo := e.Value.(*segInfo)
+		if segInfo.IsExpired(ts) {
+			info.segInfos.Remove(e)
+		}
+	}
+}
+
+func (info *assignInfo) Capacity(ts Timestamp) uint32 {
+	ret := uint32(0)
+	for e := info.segInfos.Front(); e != nil; e = e.Next() {
+		segInfo := e.Value.(*segInfo)
+		ret += segInfo.Capacity(ts)
+	}
+	return ret
+}
+
+func (info *assignInfo) Assign(ts Timestamp, count uint32) (map[UniqueID]uint32, error) {
+	capacity := info.Capacity(ts)
+	if capacity < count {
+		errMsg := fmt.Sprintf("AssignSegment Failed: capacity:%d is less than count:%d", capacity, count)
+		return nil, errors.New(errMsg)
+	}
+
+	result := make(map[UniqueID]uint32)
+	for e := info.segInfos.Front(); e != nil && count != 0; e = e.Next() {
+		segInfo := e.Value.(*segInfo)
+		cur := segInfo.Assign(ts, count)
+		count -= cur
+		if cur > 0 {
+			result[segInfo.segID] += cur
+		}
+	}
+	return result, nil
 }
 
 func (info *assignInfo) IsActive(now time.Time) bool {
 	return now.Sub(info.lastInsertTime) <= ActiveTimeDuration*time.Second
 }
 
-func (info *assignInfo) IsEnough(count uint32) bool {
-	total := uint32(0)
-	for _, count := range info.segInfo {
-		total += count
-	}
-	return total >= count
-}
-
 type SegIDAssigner struct {
 	Allocator
 	assignInfos map[string]*list.List // collectionName -> *list.List
 	segReqs     []*internalpb.SegIDRequest
-	canDoReqs   []request
+	getTickFunc func() Timestamp
+	PeerID      UniqueID
 }
 
-func NewSegIDAssigner(ctx context.Context, masterAddr string) (*SegIDAssigner, error) {
+func NewSegIDAssigner(ctx context.Context, masterAddr string, getTickFunc func() Timestamp) (*SegIDAssigner, error) {
 	ctx1, cancel := context.WithCancel(ctx)
 	sa := &SegIDAssigner{
 		Allocator: Allocator{reqs: make(chan request, maxConcurrentRequests),
@@ -61,72 +119,80 @@ func NewSegIDAssigner(ctx context.Context, masterAddr string) (*SegIDAssigner, e
 			countPerRPC: SegCountPerRPC,
 		},
 		assignInfos: make(map[string]*list.List),
+		getTickFunc: getTickFunc,
 	}
 	sa.tChan = &ticker{
 		updateInterval: time.Second,
 	}
 	sa.Allocator.syncFunc = sa.syncSegments
 	sa.Allocator.processFunc = sa.processFunc
-	sa.Allocator.checkFunc = sa.checkFunc
+	sa.Allocator.checkSyncFunc = sa.checkSyncFunc
+	sa.Allocator.pickCanDoFunc = sa.pickCanDoFunc
 	return sa, nil
 }
 
 func (sa *SegIDAssigner) collectExpired() {
-	now := time.Now()
+	ts := sa.getTickFunc()
+	//now := time.Now()
 	for _, info := range sa.assignInfos {
 		for e := info.Front(); e != nil; e = e.Next() {
 			assign := e.Value.(*assignInfo)
-			if !assign.IsActive(now) || !assign.IsExpired(now) {
-				continue
+			assign.RemoveExpired(ts)
+			if assign.Capacity(ts) == 0 {
+				info.Remove(e)
+				//if assign.IsActive(now) {
+				//	sa.segReqs = append(sa.segReqs, &internalpb.SegIDRequest{
+				//		ChannelID:    assign.channelID,
+				//		Count:        0, // intend to set zero
+				//		CollName:     assign.collName,
+				//		PartitionTag: assign.partitionTag,
+				//	})
+				//} else {
+				//	info.Remove(e)
+				//}
 			}
-			sa.segReqs = append(sa.segReqs, &internalpb.SegIDRequest{
-				ChannelID:    assign.channelID,
-				Count:        sa.countPerRPC,
-				CollName:     assign.collName,
-				PartitionTag: assign.partitionTag,
-			})
 		}
 	}
 }
 
-func (sa *SegIDAssigner) checkToDoReqs() {
+func (sa *SegIDAssigner) pickCanDoFunc() {
 	if sa.toDoReqs == nil {
 		return
 	}
-	now := time.Now()
+	records := make(map[string]map[string]map[int32]uint32)
+	newTodoReqs := sa.toDoReqs[0:0]
 	for _, req := range sa.toDoReqs {
 		segRequest := req.(*segRequest)
+		colName := segRequest.colName
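+		// below: tally the requested row count per (collection, partition,
+		// channel) so the capacity check sees the running total for the
+		// channel, not just this request in isolation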
+		partition := segRequest.partition
+		channelID := segRequest.channelID
+
+		if _, ok := records[colName]; !ok {
+			records[colName] = make(map[string]map[int32]uint32)
+		}
+		if _, ok := records[colName][partition]; !ok {
+			records[colName][partition] = make(map[int32]uint32)
+		}
+
+		if _, ok := records[colName][partition][channelID]; !ok {
+			records[colName][partition][channelID] = 0
+		}
+
+		records[colName][partition][channelID] += segRequest.count
 		assign := sa.getAssign(segRequest.colName, segRequest.partition, segRequest.channelID)
-		if assign == nil || assign.IsExpired(now) || !assign.IsEnough(segRequest.count) {
+		if assign == nil || assign.Capacity(segRequest.timestamp) < records[colName][partition][channelID] {
 			sa.segReqs = append(sa.segReqs, &internalpb.SegIDRequest{
 				ChannelID:    segRequest.channelID,
 				Count:        segRequest.count,
 				CollName:     segRequest.colName,
 				PartitionTag: segRequest.partition,
 			})
+			newTodoReqs = append(newTodoReqs, req)
+		} else {
+			sa.canDoReqs = append(sa.canDoReqs, req)
 		}
 	}
-}
-
-func (sa *SegIDAssigner) removeSegInfo(colName, partition string, channelID int32) {
-	assignInfos, ok := sa.assignInfos[colName]
-	if !ok {
-		return
-	}
-
-	cnt := assignInfos.Len()
-	if cnt == 0 {
-		return
-	}
-
-	for e := assignInfos.Front(); e != nil; e = e.Next() {
-		assign := e.Value.(*assignInfo)
-		if assign.partitionTag != partition || assign.channelID != channelID {
-			continue
-		}
-		assignInfos.Remove(e)
-	}
-
+	sa.toDoReqs = newTodoReqs
 }
 
 func (sa *SegIDAssigner) getAssign(colName, partition string, channelID int32) *assignInfo {
@@ -145,72 +211,109 @@ func (sa *SegIDAssigner) getAssign(colName, partition string, channelID int32) *
 	return nil
 }
 
-func (sa *SegIDAssigner) checkFunc(timeout bool) bool {
-	if timeout {
-		sa.collectExpired()
-	} else {
-		sa.checkToDoReqs()
-	}
-
-	return len(sa.segReqs) != 0
+func (sa *SegIDAssigner) checkSyncFunc(timeout bool) bool {
+	sa.collectExpired()
+	return timeout || len(sa.segReqs) != 0
 }
 
-func (sa *SegIDAssigner) syncSegments() {
+func (sa *SegIDAssigner) checkSegReqEqual(req1, req2 *internalpb.SegIDRequest) bool {
+	if req1 == nil || req2 == nil {
+		return false
+	}
+
+	if req1 == req2 {
+		return true
+	}
+	return req1.CollName == req2.CollName && req1.PartitionTag == req2.PartitionTag && req1.ChannelID == req2.ChannelID
+}
+
+func (sa *SegIDAssigner) reduceSegReqs() {
+
 	if len(sa.segReqs) == 0 {
 		return
 	}
+	var newSegReqs []*internalpb.SegIDRequest
+	for _, req1 := range sa.segReqs {
+		var req2 *internalpb.SegIDRequest
+		for _, req3 := range newSegReqs {
+			if sa.checkSegReqEqual(req1, req3) {
+				req2 = req3
+				break
+			}
+		}
+		if req2 == nil { // not found
+			newSegReqs = append(newSegReqs, req1)
+		} else {
+			req2.Count += req1.Count
+		}
+	}
+
+	for _, req := range newSegReqs {
+		if req.Count == 0 {
+			req.Count = sa.countPerRPC
+		}
+	}
+	sa.segReqs = newSegReqs
+}
+
+func (sa *SegIDAssigner) syncSegments() bool {
+	if len(sa.segReqs) == 0 {
+		return true
+	}
+	sa.reduceSegReqs()
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 	req := &internalpb.AssignSegIDRequest{
-		PeerID:        1,
+		PeerID:        sa.PeerID,
 		Role:          internalpb.PeerRole_Proxy,
 		PerChannelReq: sa.segReqs,
 	}
-	sa.segReqs = sa.segReqs[0:0]
-	fmt.Println("OOOOO", req.PerChannelReq)
+	sa.segReqs = []*internalpb.SegIDRequest{}
 	resp, err := sa.masterClient.AssignSegmentID(ctx, req)
-	//if resp.Status.GetErrorCode() != commonpb.ErrorCode_SUCCESS {
-	//	log.Println("GRPC AssignSegmentID Failed", resp, err)
-	//	return
-	//}
-
-	now := time.Now()
-	expiredTime := 
now.Add(time.Millisecond * time.Duration(1000)) - for _, info := range resp.PerChannelAssignment { - sa.removeSegInfo(info.CollName, info.PartitionTag, info.ChannelID) + if err != nil { + log.Println("GRPC AssignSegmentID Failed", resp, err) + return false } + now := time.Now() + success := false for _, info := range resp.PerChannelAssignment { + if info.Status.GetErrorCode() != commonpb.ErrorCode_SUCCESS { + log.Println("SyncSegment Error:", info.Status.Reason) + continue + } assign := sa.getAssign(info.CollName, info.PartitionTag, info.ChannelID) + segInfo := &segInfo{ + segID: info.SegID, + count: info.Count, + expireTime: info.ExpireTime, + } if assign == nil { colInfos, ok := sa.assignInfos[info.CollName] if !ok { colInfos = list.New() } - segInfo := make(map[UniqueID]uint32) - segInfo[info.SegID] = info.Count - newAssign := &assignInfo{ + segInfos := list.New() + + segInfos.PushBack(segInfo) + assign = &assignInfo{ collName: info.CollName, partitionTag: info.PartitionTag, channelID: info.ChannelID, - segInfo: segInfo, + segInfos: segInfos, } - colInfos.PushBack(newAssign) + colInfos.PushBack(assign) sa.assignInfos[info.CollName] = colInfos } else { - assign.segInfo[info.SegID] = info.Count - assign.expireTime = expiredTime - assign.lastInsertTime = now + assign.segInfos.PushBack(segInfo) } + assign.lastInsertTime = now + success = true } - - if err != nil { - log.Println("syncSemgnet Failed!!!!!") - return - } + return success } func (sa *SegIDAssigner) processFunc(req request) error { @@ -219,43 +322,19 @@ func (sa *SegIDAssigner) processFunc(req request) error { if assign == nil { return errors.New("Failed to GetSegmentID") } - - keys := make([]UniqueID, len(assign.segInfo)) - i := 0 - for key := range assign.segInfo { - keys[i] = key - i++ - } - reqCount := segRequest.count - - resultSegInfo := make(map[UniqueID]uint32) - sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) - for _, key := range keys { - if reqCount <= 0 { - break - } - cur := assign.segInfo[key] - minCnt := mathutil.MinUint32(cur, reqCount) - resultSegInfo[key] = minCnt - cur -= minCnt - reqCount -= minCnt - if cur <= 0 { - delete(assign.segInfo, key) - } else { - assign.segInfo[key] = cur - } - } - segRequest.segInfo = resultSegInfo - return nil + result, err := assign.Assign(segRequest.timestamp, segRequest.count) + segRequest.segInfo = result + return err } -func (sa *SegIDAssigner) GetSegmentID(colName, partition string, channelID int32, count uint32) (map[UniqueID]uint32, error) { +func (sa *SegIDAssigner) GetSegmentID(colName, partition string, channelID int32, count uint32, ts Timestamp) (map[UniqueID]uint32, error) { req := &segRequest{ baseRequest: baseRequest{done: make(chan error), valid: false}, colName: colName, partition: partition, channelID: channelID, count: count, + timestamp: ts, } sa.reqs <- req req.Wait() diff --git a/internal/allocator/timestamp.go b/internal/allocator/timestamp.go index 035fc27ebc..8f69a0dfd4 100644 --- a/internal/allocator/timestamp.go +++ b/internal/allocator/timestamp.go @@ -19,6 +19,7 @@ type TimestampAllocator struct { Allocator lastTsBegin Timestamp lastTsEnd Timestamp + PeerID UniqueID } func NewTimestampAllocator(ctx context.Context, masterAddr string) (*TimestampAllocator, error) { @@ -36,26 +37,36 @@ func NewTimestampAllocator(ctx context.Context, masterAddr string) (*TimestampAl } a.Allocator.syncFunc = a.syncTs a.Allocator.processFunc = a.processFunc - a.Allocator.checkFunc = a.checkFunc + a.Allocator.checkSyncFunc = a.checkSyncFunc + 
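+	// pickCanDoFunc moves requests that fit inside the cached
+	// [lastTsBegin, lastTsEnd) window into canDoReqs; the remainder
+	// stay queued until the next syncTs widens the window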
a.Allocator.pickCanDoFunc = a.pickCanDoFunc return a, nil } -func (ta *TimestampAllocator) checkFunc(timeout bool) bool { - if timeout { - return true - } - need := uint32(0) - for _, req := range ta.toDoReqs { - iReq := req.(*tsoRequest) - need += iReq.count - } - return ta.lastTsBegin+Timestamp(need) >= ta.lastTsEnd +func (ta *TimestampAllocator) checkSyncFunc(timeout bool) bool { + return timeout || len(ta.toDoReqs) > 0 } -func (ta *TimestampAllocator) syncTs() { +func (ta *TimestampAllocator) pickCanDoFunc() { + total := uint32(ta.lastTsEnd - ta.lastTsBegin) + need := uint32(0) + idx := 0 + for _, req := range ta.toDoReqs { + tReq := req.(*tsoRequest) + need += tReq.count + if need <= total { + ta.canDoReqs = append(ta.canDoReqs, req) + idx++ + } else { + break + } + } + ta.toDoReqs = ta.toDoReqs[idx:] +} + +func (ta *TimestampAllocator) syncTs() bool { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) req := &internalpb.TsoRequest{ - PeerID: 1, + PeerID: ta.PeerID, Role: internalpb.PeerRole_Proxy, Count: ta.countPerRPC, } @@ -64,10 +75,11 @@ func (ta *TimestampAllocator) syncTs() { cancel() if err != nil { log.Println("syncTimestamp Failed!!!!!") - return + return false } ta.lastTsBegin = resp.GetTimestamp() ta.lastTsEnd = ta.lastTsBegin + uint64(resp.GetCount()) + return true } func (ta *TimestampAllocator) processFunc(req request) error { diff --git a/internal/master/meta_table.go b/internal/master/meta_table.go index 216583a005..89e8267968 100644 --- a/internal/master/meta_table.go +++ b/internal/master/meta_table.go @@ -377,7 +377,7 @@ func (mt *metaTable) DeletePartition(collID UniqueID, tag string) error { for _, s := range collMeta.SegmentIDs { sm, ok := mt.segID2Meta[s] if !ok { - return errors.Errorf("can't find segment id = %d", s) + return errors.Errorf("DeletePartition:can't find segment id = %d", s) } if sm.PartitionTag != tag { seg = append(seg, s) @@ -444,7 +444,7 @@ func (mt *metaTable) GetSegmentByID(segID UniqueID) (*pb.SegmentMeta, error) { sm, ok := mt.segID2Meta[segID] if !ok { - return nil, errors.Errorf("can't find segment id = %d", segID) + return nil, errors.Errorf("GetSegmentByID:can't find segment id = %d", segID) } return &sm, nil } @@ -455,7 +455,7 @@ func (mt *metaTable) DeleteSegment(segID UniqueID) error { segMeta, ok := mt.segID2Meta[segID] if !ok { - return errors.Errorf("can't find segment. id = " + strconv.FormatInt(segID, 10)) + return errors.Errorf("DeleteSegment:can't find segment. 
id = " + strconv.FormatInt(segID, 10)) } collMeta, ok := mt.collID2Meta[segMeta.CollectionID] @@ -483,7 +483,7 @@ func (mt *metaTable) CloseSegment(segID UniqueID, closeTs Timestamp) error { segMeta, ok := mt.segID2Meta[segID] if !ok { - return errors.Errorf("can't find segment id = " + strconv.FormatInt(segID, 10)) + return errors.Errorf("CloseSegment:can't find segment id = " + strconv.FormatInt(segID, 10)) } segMeta.CloseTime = closeTs diff --git a/internal/master/segment_assigner.go b/internal/master/segment_assigner.go index 6c9fd4c907..3585392152 100644 --- a/internal/master/segment_assigner.go +++ b/internal/master/segment_assigner.go @@ -98,7 +98,7 @@ func (assigner *SegmentAssigner) Assign(segmentID UniqueID, numRows int) (*Assig return res, err } physicalTs, logicalTs := tsoutil.ParseTS(ts) - expirePhysicalTs := physicalTs.Add(time.Duration(assigner.segmentExpireDuration)) + expirePhysicalTs := physicalTs.Add(time.Duration(assigner.segmentExpireDuration) * time.Millisecond) expireTs := tsoutil.ComposeTS(expirePhysicalTs.UnixNano()/int64(time.Millisecond), int64(logicalTs)) status.lastExpireTime = expireTs status.assignments = append(status.assignments, &Assignment{ diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 02995dbffc..f4232bc822 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -69,18 +69,21 @@ func CreateProxy(ctx context.Context) (*Proxy, error) { return nil, err } p.idAllocator = idAllocator + p.idAllocator.PeerID = Params.ProxyID() tsoAllocator, err := allocator.NewTimestampAllocator(p.proxyLoopCtx, masterAddr) if err != nil { return nil, err } p.tsoAllocator = tsoAllocator + p.tsoAllocator.PeerID = Params.ProxyID() - segAssigner, err := allocator.NewSegIDAssigner(p.proxyLoopCtx, masterAddr) + segAssigner, err := allocator.NewSegIDAssigner(p.proxyLoopCtx, masterAddr, p.lastTick) if err != nil { panic(err) } p.segAssigner = segAssigner + p.segAssigner.PeerID = Params.ProxyID() p.manipulationMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamInsertBufSize()) p.manipulationMsgStream.SetPulsarClient(pulsarAddress) @@ -105,6 +108,10 @@ func (p *Proxy) AddStartCallback(callbacks ...func()) { p.startCallbacks = append(p.startCallbacks, callbacks...) 
} +func (p *Proxy) lastTick() Timestamp { + return p.tick.LastTick() +} + func (p *Proxy) startProxy() error { err := p.connectMaster() if err != nil { diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 01e8e74fd1..29c32f94e9 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -10,6 +10,9 @@ import ( "strings" "sync" "testing" + "time" + + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "go.uber.org/zap" "google.golang.org/grpc" @@ -399,8 +402,9 @@ func TestProxy_AssignSegID(t *testing.T) { collectionName := "CreateCollection1" createCollection(t, collectionName) testNum := 1 + futureTS := tsoutil.ComposeTS(time.Now().Add(time.Second*-1000).UnixNano()/int64(time.Millisecond), 0) for i := 0; i < testNum; i++ { - segID, err := proxyServer.segAssigner.GetSegmentID(collectionName, Params.defaultPartitionTag(), int32(i), 200000) + segID, err := proxyServer.segAssigner.GetSegmentID(collectionName, Params.defaultPartitionTag(), int32(i), 200000, futureTS) assert.Nil(t, err) fmt.Println("segID", segID) } diff --git a/internal/proxy/repack_func.go b/internal/proxy/repack_func.go index 83a44e5b27..44139999e0 100644 --- a/internal/proxy/repack_func.go +++ b/internal/proxy/repack_func.go @@ -4,6 +4,8 @@ import ( "log" "sort" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -18,7 +20,8 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg, result := make(map[int32]*msgstream.MsgPack) - channelCountMap := make(map[UniqueID]map[int32]uint32) // reqID --> channelID to count + channelCountMap := make(map[UniqueID]map[int32]uint32) // reqID --> channelID to count + channelMaxTSMap := make(map[UniqueID]map[int32]Timestamp) // reqID --> channelID to max Timestamp reqSchemaMap := make(map[UniqueID][]string) for i, request := range tsMsgs { @@ -45,12 +48,23 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg, channelCountMap[reqID] = make(map[int32]uint32) } + if _, ok := channelMaxTSMap[reqID]; !ok { + channelMaxTSMap[reqID] = make(map[int32]Timestamp) + } + if _, ok := reqSchemaMap[reqID]; !ok { reqSchemaMap[reqID] = []string{insertRequest.CollectionName, insertRequest.PartitionTag} } - for _, channelID := range keys { + for idx, channelID := range keys { channelCountMap[reqID][channelID]++ + if _, ok := channelMaxTSMap[reqID][channelID]; !ok { + channelMaxTSMap[reqID][channelID] = typeutil.ZeroTimestamp + } + ts := insertRequest.Timestamps[idx] + if channelMaxTSMap[reqID][channelID] < ts { + channelMaxTSMap[reqID][channelID] = ts + } } } @@ -64,7 +78,12 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg, schema := reqSchemaMap[reqID] collName, partitionTag := schema[0], schema[1] for channelID, count := range countInfo { - mapInfo, err := segIDAssigner.GetSegmentID(collName, partitionTag, channelID, count) + ts, ok := channelMaxTSMap[reqID][channelID] + if !ok { + ts = typeutil.ZeroTimestamp + log.Println("Warning: did not get max Timstamp!") + } + mapInfo, err := segIDAssigner.GetSegmentID(collName, partitionTag, channelID, count, ts) if err != nil { return nil, err } diff --git a/internal/proxy/timetick.go b/internal/proxy/timetick.go index ffbfb50771..34b79ec26c 100644 --- a/internal/proxy/timetick.go +++ b/internal/proxy/timetick.go @@ -104,6 +104,10 @@ func (tt *timeTick) tickLoop() { } } +func (tt *timeTick) LastTick() Timestamp { + 
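+	// the proxy's segment assigner reads this value (wired up as
+	// Proxy.lastTick) as the reference time for expiring cached
+	// segment assignments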
return tt.lastTick +} + func (tt *timeTick) Start() error { tt.lastTick = 0 ts, err := tt.tsoAllocator.AllocOne() diff --git a/internal/writenode/data_sync_service_test.go b/internal/writenode/data_sync_service_test.go index 13bb6513c5..92d9e29c09 100644 --- a/internal/writenode/data_sync_service_test.go +++ b/internal/writenode/data_sync_service_test.go @@ -24,7 +24,7 @@ import ( // NOTE: start pulsar before test func TestDataSyncService_Start(t *testing.T) { newMeta() - const ctxTimeInMillisecond = 200 + const ctxTimeInMillisecond = 2000 const closeWithDeadline = true var ctx context.Context
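
---

A minimal, self-contained sketch of the expire-aware assignment this patch
introduces (simplified stand-in types; an illustration only, not part of the
commit). A segment becomes unusable once the request timestamp passes its
master-issued expireTime or its reserved rows run out, and a request is
satisfied by walking the per-channel segInfo list:

	package main

	import (
		"container/list"
		"fmt"
	)

	type Timestamp = uint64

	type segInfo struct {
		segID      int64
		count      uint32
		expireTime Timestamp
	}

	// a segment is unusable once the request timestamp passes its
	// master-issued expireTime, or once its reserved rows are used up
	func (s *segInfo) IsExpired(ts Timestamp) bool {
		return ts > s.expireTime || s.count <= 0
	}

	// Assign hands out up to count rows from this segment, never more
	// than remain, and nothing at all if the segment has expired
	func (s *segInfo) Assign(ts Timestamp, count uint32) uint32 {
		if s.IsExpired(ts) {
			return 0
		}
		if s.count >= count {
			s.count -= count
			return count
		}
		ret := s.count
		s.count = 0
		return ret
	}

	func main() {
		segs := list.New()
		segs.PushBack(&segInfo{segID: 1, count: 100, expireTime: 50})
		segs.PushBack(&segInfo{segID: 2, count: 100, expireTime: 200})

		need, ts := uint32(150), Timestamp(120) // segment 1 is already expired at ts=120
		got := map[int64]uint32{}
		for e := segs.Front(); e != nil && need > 0; e = e.Next() {
			s := e.Value.(*segInfo)
			if n := s.Assign(ts, need); n > 0 {
				got[s.segID] += n
				need -= n
			}
		}
		fmt.Println(got, "unassigned:", need) // map[2:100] unassigned: 50
	}

When the unassigned remainder is non-zero, the real assigner queues a fresh
SegIDRequest to the master, exactly as pickCanDoFunc does above.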