Refactor segment allocator, use collectionID and partitionID

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/4973/head^2
zhenshan.cao 2021-01-31 10:15:54 +08:00 committed by yefu.chen
parent 854accf95b
commit c2914dd113
4 changed files with 100 additions and 74 deletions

View File

@ -192,7 +192,7 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
return errors.New("nil producer in msg stream")
}
reBucketValues := make([][]int32, len(tsMsgs))
for channelID, tsMsg := range tsMsgs {
for idx, tsMsg := range tsMsgs {
hashValues := tsMsg.HashKeys()
bucketValues := make([]int32, len(hashValues))
for index, hashValue := range hashValues {
@ -203,12 +203,12 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
if channelIDInt >= int64(len(ms.producers)) {
return errors.New("Failed to produce pulsar msg to unKnow channel")
}
bucketValues[index] = int32(channelIDInt)
bucketValues[index] = int32(hashValue % uint32(len(ms.producers)))
continue
}
bucketValues[index] = int32(hashValue % uint32(len(ms.producers)))
}
reBucketValues[channelID] = bucketValues
reBucketValues[idx] = bucketValues
}
var result map[int32]*MsgPack

View File

@ -132,6 +132,22 @@ func (m *InsertChannelsMap) closeInsertMsgStream(collID UniqueID) error {
return nil
}
func (m *InsertChannelsMap) getInsertChannels(collID UniqueID) ([]string, error) {
m.mtx.RLock()
defer m.mtx.RUnlock()
loc, ok := m.collectionID2InsertChannels[collID]
if !ok {
return nil, errors.New("cannot find collection with id: " + strconv.Itoa(int(collID)))
}
if m.droppedBitMap[loc] != 0 {
return nil, errors.New("insert message stream already closed")
}
ret := append([]string(nil), m.insertChannels[loc]...)
return ret, nil
}
func (m *InsertChannelsMap) getInsertMsgStream(collID UniqueID) (msgstream.MsgStream, error) {
m.mtx.RLock()
defer m.mtx.RUnlock()

View File

@ -23,7 +23,8 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
channelCountMap := make(map[UniqueID]map[int32]uint32) // reqID --> channelID to count
channelMaxTSMap := make(map[UniqueID]map[int32]Timestamp) // reqID --> channelID to max Timestamp
reqSchemaMap := make(map[UniqueID][]string)
reqSchemaMap := make(map[UniqueID][]UniqueID) // reqID --> channelID [2]UniqueID {CollectionID, PartitionID}
channelNamesMap := make(map[UniqueID][]string) // collectionID --> channelNames
for i, request := range tsMsgs {
if request.Type() != commonpb.MsgType_kInsert {
@ -54,7 +55,7 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
}
if _, ok := reqSchemaMap[reqID]; !ok {
reqSchemaMap[reqID] = []string{insertRequest.CollectionName, insertRequest.PartitionName}
reqSchemaMap[reqID] = []UniqueID{insertRequest.CollectionID, insertRequest.PartitionID}
}
for idx, channelID := range keys {
@ -68,6 +69,22 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
}
}
collID := insertRequest.CollectionID
if _, ok := channelNamesMap[collID]; !ok {
channelNames, err := globalInsertChannelsMap.getInsertChannels(collID)
if err != nil {
return nil, err
}
channelNamesMap[collID] = channelNames
}
}
var getChannelName = func(collID UniqueID, channelID int32) string {
if _, ok := channelNamesMap[collID]; !ok {
return ""
}
names := channelNamesMap[collID]
return names[channelID]
}
reqSegCountMap := make(map[UniqueID]map[int32]map[UniqueID]uint32)
@ -77,14 +94,18 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
reqSegCountMap[reqID] = make(map[int32]map[UniqueID]uint32)
}
schema := reqSchemaMap[reqID]
collName, partitionTag := schema[0], schema[1]
collID, partitionID := schema[0], schema[1]
for channelID, count := range countInfo {
ts, ok := channelMaxTSMap[reqID][channelID]
if !ok {
ts = typeutil.ZeroTimestamp
log.Println("Warning: did not get max Timstamp!")
}
mapInfo, err := segIDAssigner.GetSegmentID(collName, partitionTag, channelID, count, ts)
channelName := getChannelName(collID, channelID)
if channelName == "" {
return nil, errors.New("ProxyNode, repack_func, can not found channelName")
}
mapInfo, err := segIDAssigner.GetSegmentID(collID, partitionID, channelName, count, ts)
if err != nil {
return nil, err
}
@ -156,7 +177,9 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
keys := hashKeys[i]
reqID := insertRequest.Base.MsgID
collectionName := insertRequest.CollectionName
partitionTag := insertRequest.PartitionName
collectionID := insertRequest.CollectionID
partitionID := insertRequest.PartitionID
partitionName := insertRequest.PartitionName
proxyID := insertRequest.Base.SourceID
for index, key := range keys {
ts := insertRequest.Timestamps[index]
@ -175,13 +198,16 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
Timestamp: ts,
SourceID: proxyID,
},
CollectionID: collectionID,
PartitionID: partitionID,
CollectionName: collectionName,
PartitionName: partitionTag,
PartitionName: partitionName,
SegmentID: segmentID,
ChannelID: strconv.FormatInt(int64(key), 10),
Timestamps: []uint64{ts},
RowIDs: []int64{rowID},
RowData: []*commonpb.Blob{row},
// todo rename to ChannelName
ChannelID: strconv.FormatInt(int64(key), 10),
Timestamps: []uint64{ts},
RowIDs: []int64{rowID},
RowData: []*commonpb.Blob{row},
}
insertMsg := &msgstream.InsertMsg{
InsertRequest: sliceRequest,

View File

@ -5,7 +5,6 @@ import (
"context"
"fmt"
"log"
"strconv"
"time"
"github.com/zilliztech/milvus-distributed/internal/allocator"
@ -25,14 +24,12 @@ type Allocator = allocator.Allocator
type segRequest struct {
allocator.BaseRequest
count uint32
colName string
partitionName string
collID UniqueID
partitionID UniqueID
segInfo map[UniqueID]uint32
channelID int32
timestamp Timestamp
count uint32
collID UniqueID
partitionID UniqueID
segInfo map[UniqueID]uint32
channelName string
timestamp Timestamp
}
type segInfo struct {
@ -44,9 +41,7 @@ type segInfo struct {
type assignInfo struct {
collID UniqueID
partitionID UniqueID
collName string
partitionName string
channelID int32
channelName string
segID UniqueID
segInfos *list.List
segCapacity uint32
@ -122,7 +117,7 @@ func (info *assignInfo) IsActive(now time.Time) bool {
type SegIDAssigner struct {
Allocator
assignInfos map[string]*list.List // collectionName -> *list.List
assignInfos map[UniqueID]*list.List // collectionID -> *list.List
segReqs []*datapb.SegIDRequest
getTickFunc func() Timestamp
PeerID UniqueID
@ -140,7 +135,7 @@ func NewSegIDAssigner(ctx context.Context, client DataServiceClient, getTickFunc
},
countPerRPC: SegCountPerRPC,
serviceClient: client,
assignInfos: make(map[string]*list.List),
assignInfos: make(map[UniqueID]*list.List),
getTickFunc: getTickFunc,
}
sa.TChan = &allocator.Ticker{
@ -160,7 +155,6 @@ func (sa *SegIDAssigner) SetServiceClient(client DataServiceClient) {
func (sa *SegIDAssigner) collectExpired() {
ts := sa.getTickFunc()
//now := time.Now()
for _, info := range sa.assignInfos {
for e := info.Front(); e != nil; e = e.Next() {
assign := e.Value.(*assignInfo)
@ -176,36 +170,33 @@ func (sa *SegIDAssigner) pickCanDoFunc() {
if sa.ToDoReqs == nil {
return
}
records := make(map[string]map[string]map[int32]uint32)
records := make(map[UniqueID]map[UniqueID]map[string]uint32)
newTodoReqs := sa.ToDoReqs[0:0]
for _, req := range sa.ToDoReqs {
segRequest := req.(*segRequest)
colName := segRequest.colName
partitionName := segRequest.partitionName
channelID := segRequest.channelID
collID := segRequest.collID
partitionID := segRequest.partitionID
channelName := segRequest.channelName
if _, ok := records[colName]; !ok {
records[colName] = make(map[string]map[int32]uint32)
if _, ok := records[collID]; !ok {
records[collID] = make(map[UniqueID]map[string]uint32)
}
if _, ok := records[colName][partitionName]; !ok {
records[colName][partitionName] = make(map[int32]uint32)
if _, ok := records[collID][partitionID]; !ok {
records[collID][partitionID] = make(map[string]uint32)
}
if _, ok := records[colName][partitionName][channelID]; !ok {
records[colName][partitionName][channelID] = 0
if _, ok := records[collID][partitionID][channelName]; !ok {
records[collID][partitionID][channelName] = 0
}
records[colName][partitionName][channelID] += segRequest.count
assign := sa.getAssign(segRequest.colName, segRequest.partitionName, segRequest.channelID)
if assign == nil || assign.Capacity(segRequest.timestamp) < records[colName][partitionName][channelID] {
partitionID, _ := typeutil.Hash32String(segRequest.colName)
records[collID][partitionID][channelName] += segRequest.count
assign := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName)
if assign == nil || assign.Capacity(segRequest.timestamp) < records[collID][partitionID][channelName] {
sa.segReqs = append(sa.segReqs, &datapb.SegIDRequest{
ChannelName: strconv.FormatUint(uint64(segRequest.channelID), 10),
Count: segRequest.count,
CollName: segRequest.colName,
PartitionName: segRequest.partitionName,
CollectionID: 0,
PartitionID: partitionID,
ChannelName: channelName,
Count: segRequest.count,
CollectionID: collID,
PartitionID: partitionID,
})
newTodoReqs = append(newTodoReqs, req)
} else {
@ -215,15 +206,15 @@ func (sa *SegIDAssigner) pickCanDoFunc() {
sa.ToDoReqs = newTodoReqs
}
func (sa *SegIDAssigner) getAssign(colName, partitionName string, channelID int32) *assignInfo {
assignInfos, ok := sa.assignInfos[colName]
func (sa *SegIDAssigner) getAssign(collID UniqueID, partitionID UniqueID, channelName string) *assignInfo {
assignInfos, ok := sa.assignInfos[collID]
if !ok {
return nil
}
for e := assignInfos.Front(); e != nil; e = e.Next() {
info := e.Value.(*assignInfo)
if info.partitionName != partitionName || info.channelID != channelID {
if info.partitionID != partitionID || info.channelName != channelName {
continue
}
return info
@ -244,7 +235,7 @@ func (sa *SegIDAssigner) checkSegReqEqual(req1, req2 *datapb.SegIDRequest) bool
if req1 == req2 {
return true
}
return req1.CollName == req2.CollName && req1.PartitionName == req2.PartitionName && req1.ChannelName == req2.ChannelName
return req1.CollectionID == req2.CollectionID && req1.PartitionID == req2.PartitionID && req1.ChannelName == req2.ChannelName
}
func (sa *SegIDAssigner) reduceSegReqs() {
@ -305,19 +296,14 @@ func (sa *SegIDAssigner) syncSegments() bool {
log.Println("SyncSegment Error:", info.Status.Reason)
continue
}
// FIXME: use channelName
channel, err := strconv.Atoi(info.ChannelName)
if err != nil {
return false
}
assign := sa.getAssign(info.CollName, info.PartitionName, int32(channel))
assign := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName)
segInfo := &segInfo{
segID: info.SegID,
count: info.Count,
expireTime: info.ExpireTime,
}
if assign == nil {
colInfos, ok := sa.assignInfos[info.CollName]
colInfos, ok := sa.assignInfos[info.CollectionID]
if !ok {
colInfos = list.New()
}
@ -325,15 +311,13 @@ func (sa *SegIDAssigner) syncSegments() bool {
segInfos.PushBack(segInfo)
assign = &assignInfo{
collID: info.CollectionID,
partitionID: info.PartitionID,
channelID: int32(channel),
segInfos: segInfos,
partitionName: info.PartitionName,
collName: info.CollName,
collID: info.CollectionID,
partitionID: info.PartitionID,
channelName: info.ChannelName,
segInfos: segInfos,
}
colInfos.PushBack(assign)
sa.assignInfos[info.CollName] = colInfos
sa.assignInfos[info.CollectionID] = colInfos
} else {
assign.segInfos.PushBack(segInfo)
}
@ -345,7 +329,7 @@ func (sa *SegIDAssigner) syncSegments() bool {
func (sa *SegIDAssigner) processFunc(req allocator.Request) error {
segRequest := req.(*segRequest)
assign := sa.getAssign(segRequest.colName, segRequest.partitionName, segRequest.channelID)
assign := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName)
if assign == nil {
return errors.New("Failed to GetSegmentID")
}
@ -354,14 +338,14 @@ func (sa *SegIDAssigner) processFunc(req allocator.Request) error {
return err
}
func (sa *SegIDAssigner) GetSegmentID(colName, partitionName string, channelID int32, count uint32, ts Timestamp) (map[UniqueID]uint32, error) {
func (sa *SegIDAssigner) GetSegmentID(collID UniqueID, partitionID UniqueID, channelName string, count uint32, ts Timestamp) (map[UniqueID]uint32, error) {
req := &segRequest{
BaseRequest: allocator.BaseRequest{Done: make(chan error), Valid: false},
colName: colName,
partitionName: partitionName,
channelID: channelID,
count: count,
timestamp: ts,
BaseRequest: allocator.BaseRequest{Done: make(chan error), Valid: false},
collID: collID,
partitionID: partitionID,
channelName: channelName,
count: count,
timestamp: ts,
}
sa.Reqs <- req
req.Wait()