Simplify dd buffer & add lock (#5513)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/5779/head
congqixia 2021-05-31 18:18:32 +08:00 committed by zhenshan.cao
parent 6ea11f67cb
commit 31c4a4f5ed
1 changed files with 38 additions and 76 deletions

View File

@ -56,7 +56,9 @@ type ddData struct {
}
type ddBuffer struct {
ddData map[UniqueID]*ddData // collection ID
// ddData map[UniqueID]*ddData // collection ID
ddData ddData
sync.Mutex
}
type ddRecords struct {
@ -64,15 +66,26 @@ type ddRecords struct {
partitionRecords map[UniqueID]interface{}
}
func (d *ddBuffer) size(collectionID UniqueID) int {
if d.ddData == nil || len(d.ddData) <= 0 {
return 0
func (d *ddBuffer) getData() ddData {
d.Lock()
defer d.Unlock()
r := ddData{
ddRequestString: d.ddData.ddRequestString,
timestamps: d.ddData.timestamps,
eventTypes: d.ddData.eventTypes,
}
d.ddData.ddRequestString = make([]string, 0, 10)
d.ddData.timestamps = make([]Timestamp, 0, 10)
d.ddData.eventTypes = make([]storage.EventTypeCode, 0, 10)
return r
}
if data, ok := d.ddData[collectionID]; ok {
return len(data.ddRequestString)
}
return 0
func (d *ddBuffer) append(request string, timestamp Timestamp, eventType storage.EventTypeCode) {
d.Lock()
defer d.Unlock()
d.ddData.ddRequestString = append(d.ddData.ddRequestString, request)
d.ddData.timestamps = append(d.ddData.timestamps, timestamp)
d.ddData.eventTypes = append(d.ddData.eventTypes, eventType)
}
func (ddNode *ddNode) Name() string {
@ -164,16 +177,16 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
seg, _ := ddNode.replica.getSegmentByID(segID)
collID := seg.collectionID
if ddNode.ddBuffer.size(collID) > 0 {
log.Debug(".. ddl buffer not empty, flushing ...")
ddNode.flushMap.Store(collID, ddNode.ddBuffer.ddData[collID])
delete(ddNode.ddBuffer.ddData, collID)
buf := ddNode.ddBuffer.getData()
if len(buf.ddRequestString) == 0 {
fmsg.ddlFlushedCh <- []*datapb.DDLBinlogMeta{}
} else {
ddNode.flushMap.Store(collID, &buf)
log.Debug(".. ddl buffer not empty, flushing ...")
binlogMetaCh := make(chan *datapb.DDLBinlogMeta)
go flush(collID, ddNode.flushMap, ddNode.kv, ddNode.idAllocator, binlogMetaCh)
go ddNode.flushComplete(binlogMetaCh, collID, fmsg.ddlFlushedCh)
} else {
fmsg.ddlFlushedCh <- make([]*datapb.DDLBinlogMeta, 0)
}
log.Debug(".. notifying insertbuffer ...")
@ -321,18 +334,7 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
timestamp: msg.Base.Timestamp,
})
_, ok := ddNode.ddBuffer.ddData[collectionID]
if !ok {
ddNode.ddBuffer.ddData[collectionID] = &ddData{
ddRequestString: make([]string, 0),
timestamps: make([]Timestamp, 0),
eventTypes: make([]storage.EventTypeCode, 0),
}
}
ddNode.ddBuffer.ddData[collectionID].ddRequestString = append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.CreateCollectionRequest.String())
ddNode.ddBuffer.ddData[collectionID].timestamps = append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Base.Timestamp)
ddNode.ddBuffer.ddData[collectionID].eventTypes = append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.CreateCollectionEventType)
ddNode.ddBuffer.append(msg.CreateCollectionRequest.String(), msg.Base.Timestamp, storage.CreateCollectionEventType)
}
/*
@ -358,19 +360,7 @@ func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
timestamp: msg.Base.Timestamp,
})
_, ok := ddNode.ddBuffer.ddData[collectionID]
if !ok {
ddNode.ddBuffer.ddData[collectionID] = &ddData{
ddRequestString: make([]string, 0),
timestamps: make([]Timestamp, 0),
eventTypes: make([]storage.EventTypeCode, 0),
}
}
ddNode.ddBuffer.ddData[collectionID].ddRequestString = append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.DropCollectionRequest.String())
ddNode.ddBuffer.ddData[collectionID].timestamps = append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Base.Timestamp)
ddNode.ddBuffer.ddData[collectionID].eventTypes = append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.DropCollectionEventType)
ddNode.ddBuffer.append(msg.DropCollectionRequest.String(), msg.Base.Timestamp, storage.DropCollectionEventType)
ddNode.ddMsg.gcRecord.collections = append(ddNode.ddMsg.gcRecord.collections, collectionID)
}
@ -380,7 +370,7 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
defer sp.Finish()
partitionID := msg.PartitionID
collectionID := msg.CollectionID
// collectionID := msg.CollectionID
// add partition
if _, ok := ddNode.ddRecords.partitionRecords[partitionID]; ok {
@ -395,23 +385,7 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
timestamp: msg.Base.Timestamp,
})
_, ok := ddNode.ddBuffer.ddData[collectionID]
if !ok {
ddNode.ddBuffer.ddData[collectionID] = &ddData{
ddRequestString: make([]string, 0),
timestamps: make([]Timestamp, 0),
eventTypes: make([]storage.EventTypeCode, 0),
}
}
ddNode.ddBuffer.ddData[collectionID].ddRequestString =
append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.CreatePartitionRequest.String())
ddNode.ddBuffer.ddData[collectionID].timestamps =
append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Base.Timestamp)
ddNode.ddBuffer.ddData[collectionID].eventTypes =
append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.CreatePartitionEventType)
ddNode.ddBuffer.append(msg.CreatePartitionRequest.String(), msg.Base.Timestamp, storage.CreatePartitionEventType)
}
func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
@ -419,7 +393,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
msg.SetTraceCtx(ctx)
defer sp.Finish()
partitionID := msg.PartitionID
collectionID := msg.CollectionID
// collectionID := msg.CollectionID
// remove partition
if _, ok := ddNode.ddRecords.partitionRecords[partitionID]; !ok {
@ -436,23 +410,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
timestamp: msg.Base.Timestamp,
})
_, ok := ddNode.ddBuffer.ddData[collectionID]
if !ok {
ddNode.ddBuffer.ddData[collectionID] = &ddData{
ddRequestString: make([]string, 0),
timestamps: make([]Timestamp, 0),
eventTypes: make([]storage.EventTypeCode, 0),
}
}
ddNode.ddBuffer.ddData[collectionID].ddRequestString =
append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.DropPartitionRequest.String())
ddNode.ddBuffer.ddData[collectionID].timestamps =
append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Base.Timestamp)
ddNode.ddBuffer.ddData[collectionID].eventTypes =
append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.DropPartitionEventType)
ddNode.ddBuffer.append(msg.DropPartitionRequest.String(), msg.Base.Timestamp, storage.DropPartitionEventType)
}
func newDDNode(ctx context.Context, inFlushCh <-chan *flushMsg,
@ -487,7 +445,11 @@ func newDDNode(ctx context.Context, inFlushCh <-chan *flushMsg,
BaseNode: baseNode,
ddRecords: ddRecords,
ddBuffer: &ddBuffer{
ddData: make(map[UniqueID]*ddData),
ddData: ddData{
ddRequestString: make([]string, 0, 10),
timestamps: make([]Timestamp, 0, 10),
eventTypes: make([]storage.EventTypeCode, 0, 10),
},
},
inFlushCh: inFlushCh,