Fix upgrade cause panic (#23835)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
2.2.7
Xiaofan 2023-05-01 23:02:37 -07:00 committed by GitHub
parent 204962c0b9
commit 944d665b27
7 changed files with 119 additions and 40 deletions

View File

@@ -626,7 +626,10 @@ func (s *Server) watchNodes(revision int64) {
)
s.nodeMgr.Add(session.NewNodeInfo(nodeID, addr))
s.nodeUpEventChan <- nodeID
s.notifyNodeUp <- struct{}{}
select {
case s.notifyNodeUp <- struct{}{}:
default:
}
case sessionutil.SessionUpdateEvent:
nodeID := event.Session.ServerID
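The change above replaces the unconditional send on notifyNodeUp with a select that falls through to a default case, so the watch loop no longer blocks when a notification is already queued. A minimal, self-contained sketch of that non-blocking send pattern follows; the channel and helper names are illustrative, not the actual Server fields:

package main

import "fmt"

func main() {
	// notify stands in for a buffered notification channel such as notifyNodeUp.
	notify := make(chan struct{}, 1)

	signal := func() {
		select {
		case notify <- struct{}{}:
			fmt.Println("notification queued")
		default:
			// A notification is already pending; dropping this one is safe because
			// the consumer only needs to learn that node membership changed.
			fmt.Println("notification already pending, skipped")
		}
	}

	signal() // queued
	signal() // skipped instead of blocking the caller
}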

View File

@@ -20,14 +20,16 @@ import (
"container/heap"
"context"
"fmt"
"strconv"
"strings"
"sync"
"github.com/milvus-io/milvus/internal/metrics"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
type dmlMsgStream struct {
@@ -309,3 +311,32 @@ func (d *dmlChannels) removeChannels(names ...string) {
func genChannelName(prefix string, idx int64) string {
return fmt.Sprintf("%s_%d", prefix, idx)
}
func parseChannelNameIndex(channeName string) int {
index := strings.LastIndex(channeName, "_")
if index < 0 {
log.Error("invalid channel name", zap.String("chanName", channeName))
panic("invalid channel name: " + channeName)
}
index, err := strconv.Atoi(channeName[index+1:])
if err != nil {
log.Error("invalid channel name", zap.String("chanName", channeName), zap.Error(err))
panic("invalid channel name: " + channeName)
}
return index
}
func getNeedChanNum(setNum int, chanMap map[typeutil.UniqueID][]string) int {
// find the largest channel index currently in use
maxChanUsed := setNum
for _, chanNames := range chanMap {
for _, chanName := range chanNames {
index := parseChannelNameIndex(chanName)
if maxChanUsed < index+1 {
maxChanUsed = index + 1
}
}
}
return maxChanUsed
}
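getNeedChanNum now sizes the DML channel set by the largest index embedded in any channel name that is already in use, rather than by how many distinct names exist. A standalone sketch of the same sizing rule, using plain int64 keys instead of typeutil.UniqueID and illustrative channel names:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseIndex mirrors parseChannelNameIndex: read the numeric suffix after the last "_".
func parseIndex(name string) int {
	i := strings.LastIndex(name, "_")
	if i < 0 {
		panic("invalid channel name: " + name)
	}
	idx, err := strconv.Atoi(name[i+1:])
	if err != nil {
		panic("invalid channel name: " + name)
	}
	return idx
}

func main() {
	configured := 2 // stands in for Params.RootCoordCfg.DmlChannelNum
	existing := map[int64][]string{
		100: {"rootcoord-dml_4", "rootcoord-dml_8"},
	}

	need := configured
	for _, names := range existing {
		for _, n := range names {
			if idx := parseIndex(n) + 1; idx > need {
				need = idx
			}
		}
	}
	fmt.Println(need) // 9: covers index 8 recovered from existing metadata
}

Since genChannelName produces names of the form prefix_idx starting at 0 (the test below expects "rootcoord-dml_0", "rootcoord-dml_1", ...), allocating max(configured, highest index + 1) channels keeps the numbering contiguous while still covering every name an older deployment may have persisted.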

View File

@@ -110,10 +110,10 @@ func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp {
func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync {
// if the number of channels already in use is greater than the configured default,
// keep the old channels
defaultChanNum := getNeedChanNum(int(Params.RootCoordCfg.DmlChannelNum), chanMap)
channelNum := getNeedChanNum(int(Params.RootCoordCfg.DmlChannelNum), chanMap)
// initialize dml channels used for insert
dmlChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDml, int64(defaultChanNum))
dmlChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDml, int64(channelNum))
// recover physical channels for all collections
for collID, chanNames := range chanMap {

View File

@@ -118,3 +118,80 @@ func Test_ttHistogram_get(t *testing.T) {
assert.Equal(t, typeutil.ZeroTimestamp, h.get("ch1"))
assert.Equal(t, typeutil.ZeroTimestamp, h.get("ch2"))
}
func TestTimetickSyncWithExistChannels(t *testing.T) {
ctx := context.Background()
sourceID := int64(100)
factory := dependency.NewDefaultFactory(true)
//chanMap := map[typeutil.UniqueID][]string{
// int64(1): {"rootcoord-dml_0"},
//}
Params.RootCoordCfg.DmlChannelNum = 2
Params.CommonCfg.RootCoordDml = "rootcoord-dml"
Params.CommonCfg.RootCoordDelta = "rootcoord-delta"
chans := map[UniqueID][]string{}
chans[UniqueID(100)] = []string{"rootcoord-dml_4", "rootcoord-dml_8"}
chans[UniqueID(102)] = []string{"rootcoord-dml_2", "rootcoord-dml_9"}
ttSync := newTimeTickSync(ctx, sourceID, factory, chans)
var wg sync.WaitGroup
wg.Add(1)
t.Run("sendToChannel", func(t *testing.T) {
defer wg.Done()
ttSync.sendToChannel()
ttSync.sess2ChanTsMap[1] = nil
ttSync.sendToChannel()
msg := &internalpb.ChannelTimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
},
}
ttSync.sess2ChanTsMap[1] = newChanTsMsg(msg, 1)
ttSync.sendToChannel()
})
wg.Add(1)
t.Run("assign channels", func(t *testing.T) {
defer wg.Done()
channels := ttSync.getDmlChannelNames(int(4))
assert.Equal(t, channels, []string{"rootcoord-dml_0", "rootcoord-dml_1", "rootcoord-dml_3", "rootcoord-dml_5"})
channels = ttSync.getDmlChannelNames(int(4))
assert.Equal(t, channels, []string{"rootcoord-dml_6", "rootcoord-dml_7", "rootcoord-dml_0", "rootcoord-dml_1"})
})
// test get new channels
}
func TestTimetickSyncInvalidName(t *testing.T) {
ctx := context.Background()
sourceID := int64(100)
factory := dependency.NewDefaultFactory(true)
//chanMap := map[typeutil.UniqueID][]string{
// int64(1): {"rootcoord-dml_0"},
//}
Params.RootCoordCfg.DmlChannelNum = 2
Params.CommonCfg.RootCoordDml = "rootcoord-dml"
Params.CommonCfg.RootCoordDelta = "rootcoord-delta"
chans := map[UniqueID][]string{}
chans[UniqueID(100)] = []string{"rootcoord-dml4"}
assert.Panics(t, func() {
newTimeTickSync(ctx, sourceID, factory, chans)
})
chans = map[UniqueID][]string{}
chans[UniqueID(102)] = []string{"rootcoord-dml_a"}
assert.Panics(t, func() {
newTimeTickSync(ctx, sourceID, factory, chans)
})
}

View File

@@ -137,16 +137,3 @@ func getTravelTs(req TimeTravelRequest) Timestamp {
func isMaxTs(ts Timestamp) bool {
return ts == typeutil.MaxTimestamp
}
func getNeedChanNum(setNum int, chanMap map[typeutil.UniqueID][]string) int {
chanNames := typeutil.NewSet[string]()
for _, chanName := range chanMap {
chanNames.Insert(chanName...)
}
ret := chanNames.Len()
if setNum > chanNames.Len() {
ret = setNum
}
return ret
}
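The version removed above sized the channel set by the number of distinct names (capped below by setNum), so sparse indices recovered from an older deployment could yield fewer channels than the highest index actually referenced, which is consistent with the upgrade panic this commit addresses. A small sketch of the old behaviour on the same data as the new-logic example above:

package main

import "fmt"

func main() {
	setNum := 2 // configured DmlChannelNum
	chanMap := map[int64][]string{
		100: {"rootcoord-dml_4", "rootcoord-dml_8"},
	}

	// Old logic: count distinct names, ignoring the indices embedded in them.
	distinct := map[string]struct{}{}
	for _, names := range chanMap {
		for _, n := range names {
			distinct[n] = struct{}{}
		}
	}
	ret := len(distinct)
	if setNum > ret {
		ret = setNum
	}
	fmt.Println(ret) // 2, although "rootcoord-dml_8" needs at least 9 channels
}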

View File

@@ -150,22 +150,3 @@ func Test_isMaxTs(t *testing.T) {
})
}
}
func Test_GetNeedChanNum(t *testing.T) {
chanMap := map[typeutil.UniqueID][]string{
int64(1): {"rootcoord-dml_101"},
int64(2): {"rootcoord-dml_102"},
int64(3): {"rootcoord-dml_103"},
}
num := getNeedChanNum(2, chanMap)
assert.Equal(t, num, 3)
chanMap = map[typeutil.UniqueID][]string{
int64(1): {"rootcoord-dml_101", "rootcoord-dml_102"},
int64(2): {"rootcoord-dml_102", "rootcoord-dml_101"},
int64(3): {"rootcoord-dml_103", "rootcoord-dml_102"},
}
num = getNeedChanNum(2, chanMap)
assert.Equal(t, num, 3)
}

View File

@@ -370,7 +370,7 @@ class TestCompactionOperation(TestcaseBase):
3.delete and flush (new insert)
4.compact
5.load and search
expected: Triggre two types compaction
expected: Trigger two types compaction
"""
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
ids = []
@@ -545,10 +545,10 @@ class TestCompactionOperation(TestcaseBase):
c_plans = collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)[0]
# waiting for handoff completed and search
cost = 60
cost = 180
start = time()
while True:
sleep(5)
sleep(1)
segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
if len(segment_info) != 0 and segment_info[0].segmentID == c_plans.plans[0].target:
log.debug(segment_info)