mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: longjiquan <jiquan.long@zilliz.com>hotfix-2.2.14
parent
49ef097bc3
commit
bb9064fb94
|
@ -57,7 +57,7 @@ type Channel interface {
|
|||
getCollectionID() UniqueID
|
||||
getCollectionSchema(collectionID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error)
|
||||
getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error)
|
||||
getChannelName(segID UniqueID) string
|
||||
getChannelName() string
|
||||
|
||||
listAllSegmentIDs() []UniqueID
|
||||
listNotFlushedSegmentIDs() []UniqueID
|
||||
|
@ -177,7 +177,7 @@ func (c *ChannelMeta) getCollectionAndPartitionID(segID UniqueID) (collID, parti
|
|||
return 0, 0, fmt.Errorf("cannot find segment, id = %d", segID)
|
||||
}
|
||||
|
||||
func (c *ChannelMeta) getChannelName(segID UniqueID) string {
|
||||
func (c *ChannelMeta) getChannelName() string {
|
||||
return c.channelName
|
||||
}
|
||||
|
||||
|
|
|
@ -477,6 +477,12 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Auto with manual flush", func(t *testing.T) {
|
||||
param := Params.DataNodeCfg.MaxParallelSyncTaskNum
|
||||
Params.DataNodeCfg.MaxParallelSyncTaskNum = 100000
|
||||
defer func() {
|
||||
Params.DataNodeCfg.MaxParallelSyncTaskNum = param
|
||||
}()
|
||||
|
||||
tmp := Params.DataNodeCfg.FlushInsertBufferSize
|
||||
Params.DataNodeCfg.FlushInsertBufferSize = 200
|
||||
defer func() {
|
||||
|
|
|
@ -91,6 +91,7 @@ var _ flushManager = (*rendezvousFlushManager)(nil)
|
|||
type orderFlushQueue struct {
|
||||
sync.Once
|
||||
segmentID UniqueID
|
||||
channel string
|
||||
injectCh chan *taskInjection
|
||||
|
||||
// MsgID => flushTask
|
||||
|
@ -106,9 +107,10 @@ type orderFlushQueue struct {
|
|||
}
|
||||
|
||||
// newOrderFlushQueue creates an orderFlushQueue
|
||||
func newOrderFlushQueue(segID UniqueID, f notifyMetaFunc) *orderFlushQueue {
|
||||
func newOrderFlushQueue(segID UniqueID, channel string, f notifyMetaFunc) *orderFlushQueue {
|
||||
q := &orderFlushQueue{
|
||||
segmentID: segID,
|
||||
channel: channel,
|
||||
notifyFunc: f,
|
||||
injectCh: make(chan *taskInjection, 100),
|
||||
}
|
||||
|
@ -129,6 +131,7 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *internalpb.MsgPosition) *flush
|
|||
t := actual.(*flushTaskRunner)
|
||||
// not loaded means the task runner is new, do initializtion
|
||||
if !loaded {
|
||||
getOrCreateFlushTaskCounter().increase(q.channel)
|
||||
// take over injection if task queue is handling it
|
||||
q.injectMut.Lock()
|
||||
q.runningTasks++
|
||||
|
@ -150,6 +153,7 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *internalpb.MsgPosition) *flush
|
|||
func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInjectionFunc) {
|
||||
// delete task from working map
|
||||
q.working.Delete(getSyncTaskID(pack.pos))
|
||||
getOrCreateFlushTaskCounter().decrease(q.channel)
|
||||
// after descreasing working count, check whether flush queue is empty
|
||||
q.injectMut.Lock()
|
||||
q.runningTasks--
|
||||
|
@ -277,7 +281,7 @@ type rendezvousFlushManager struct {
|
|||
|
||||
// getFlushQueue gets or creates an orderFlushQueue for segment id if not found
|
||||
func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQueue {
|
||||
newQueue := newOrderFlushQueue(segmentID, m.notifyFunc)
|
||||
newQueue := newOrderFlushQueue(segmentID, m.getChannelName(), m.notifyFunc)
|
||||
actual, _ := m.dispatcher.LoadOrStore(segmentID, newQueue)
|
||||
// all operation on dispatcher is private, assertion ok guaranteed
|
||||
queue := actual.(*orderFlushQueue)
|
||||
|
@ -342,16 +346,8 @@ func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flush
|
|||
|
||||
// isFull return true if the task pool is full
|
||||
func (m *rendezvousFlushManager) isFull() bool {
|
||||
var num int
|
||||
m.dispatcher.Range(func(_, q any) bool {
|
||||
queue := q.(*orderFlushQueue)
|
||||
queue.working.Range(func(_, _ any) bool {
|
||||
num++
|
||||
return true
|
||||
})
|
||||
return true
|
||||
})
|
||||
return num >= Params.DataNodeCfg.MaxParallelSyncTaskNum
|
||||
return getOrCreateFlushTaskCounter().getOrZero(m.getChannelName()) >=
|
||||
int32(Params.DataNodeCfg.MaxParallelSyncTaskNum)
|
||||
}
|
||||
|
||||
// flushBufferData notifies flush manager insert buffer data.
|
||||
|
|
|
@ -63,7 +63,7 @@ func TestOrderFlushQueue_Execute(t *testing.T) {
|
|||
|
||||
size := 1000
|
||||
finish.Add(size)
|
||||
q := newOrderFlushQueue(1, func(*segmentFlushPack) {
|
||||
q := newOrderFlushQueue(1, "", func(*segmentFlushPack) {
|
||||
counter.Inc()
|
||||
finish.Done()
|
||||
})
|
||||
|
@ -104,7 +104,7 @@ func TestOrderFlushQueue_Order(t *testing.T) {
|
|||
size := 1000
|
||||
finish.Add(size)
|
||||
resultList := make([][]byte, 0, size)
|
||||
q := newOrderFlushQueue(1, func(pack *segmentFlushPack) {
|
||||
q := newOrderFlushQueue(1, "", func(pack *segmentFlushPack) {
|
||||
counter.Inc()
|
||||
resultList = append(resultList, pack.pos.MsgID)
|
||||
finish.Done()
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package datanode
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
type flushTaskCounter struct {
|
||||
inner sync.Map // channel -> counter (*atomic.Int32)
|
||||
}
|
||||
|
||||
func (c *flushTaskCounter) getOrZero(channel string) int32 {
|
||||
counter, exist := c.inner.Load(channel)
|
||||
if !exist {
|
||||
return 0
|
||||
}
|
||||
return counter.(*atomic.Int32).Load()
|
||||
}
|
||||
|
||||
func (c *flushTaskCounter) increaseImpl(channel string, delta int32) {
|
||||
counter, _ := c.inner.LoadOrStore(channel, atomic.NewInt32(0))
|
||||
counter.(*atomic.Int32).Add(delta)
|
||||
}
|
||||
|
||||
func (c *flushTaskCounter) increase(channel string) {
|
||||
c.increaseImpl(channel, 1)
|
||||
}
|
||||
|
||||
func (c *flushTaskCounter) decrease(channel string) {
|
||||
c.increaseImpl(channel, -1)
|
||||
}
|
||||
|
||||
func (c *flushTaskCounter) close() {
|
||||
allChannels := make([]string, 0)
|
||||
c.inner.Range(func(channel any, _ any) bool {
|
||||
allChannels = append(allChannels, channel.(string))
|
||||
return false
|
||||
})
|
||||
for _, channel := range allChannels {
|
||||
c.inner.Delete(channel)
|
||||
}
|
||||
}
|
||||
|
||||
func newFlushTaskCounter() *flushTaskCounter {
|
||||
return &flushTaskCounter{}
|
||||
}
|
||||
|
||||
var (
|
||||
globalFlushTaskCounter *flushTaskCounter
|
||||
flushTaskCounterOnce sync.Once
|
||||
)
|
||||
|
||||
func getOrCreateFlushTaskCounter() *flushTaskCounter {
|
||||
flushTaskCounterOnce.Do(func() {
|
||||
globalFlushTaskCounter = newFlushTaskCounter()
|
||||
})
|
||||
return globalFlushTaskCounter
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package datanode
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func Test_flushTaskCounter_getOrZero(t *testing.T) {
|
||||
c := newFlushTaskCounter()
|
||||
defer c.close()
|
||||
|
||||
assert.Zero(t, c.getOrZero("non-exist"))
|
||||
|
||||
n := 10
|
||||
channel := "channel"
|
||||
assert.Zero(t, c.getOrZero(channel))
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
c.increase(channel)
|
||||
}
|
||||
assert.Equal(t, int32(n), c.getOrZero(channel))
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
c.decrease(channel)
|
||||
}
|
||||
assert.Zero(t, c.getOrZero(channel))
|
||||
}
|
Loading…
Reference in New Issue