enhance: Add `metautil.Channel` to convert string comparison to int comparison (#32749)

See also #32748

This PR:

- Add a `metautil.Channel` utility which converts a virtual channel name
into its physical channel name, collectionID and shard idx
- Add a channel mapper interface & implementation that maps the limited
set of physical channel names to int indexes
- Apply `metautil.Channel` filter in querynode segment manager logic
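
As a quick illustration, here is a minimal sketch of the intended usage. The types and
functions are the ones added in the new `metautil` file below; the standalone `main`
wrapper and the sample channel name are only for illustration:

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/metautil"
)

func main() {
	// One mapper is enough per process; it interns physical channel names as ints.
	mapper := metautil.NewDynChannelMapper()

	// Parse virtual channel names into (physical channel idx, collectionID, shard idx).
	ch1, err := metautil.ParseChannel("by-dev-rootcoord-dml_0_449413615133917325v0", mapper)
	if err != nil {
		panic(err)
	}
	ch2, _ := metautil.ParseChannel("by-dev-rootcoord-dml_0_449413615133917325v0", mapper)

	// Equal compares three integers instead of the whole channel string.
	fmt.Println(ch1.Equal(ch2))     // true
	fmt.Println(ch1.PhysicalName()) // by-dev-rootcoord-dml_0
	fmt.Println(ch1.VirtualName())  // by-dev-rootcoord-dml_0_449413615133917325v0
}
```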

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2024-05-07 19:13:35 +08:00 committed by GitHub
parent 6843d6d376
commit 40728ce83d
20 changed files with 523 additions and 159 deletions


@ -18,6 +18,7 @@ package delegator
import (
"context"
"fmt"
"path"
"strconv"
"testing"
@ -61,6 +62,8 @@ type DelegatorDataSuite struct {
tsafeManager tsafe.Manager
loader *segments.MockLoader
mq *msgstream.MockMsgStream
channel metautil.Channel
mapper metautil.ChannelMapper
delegator *shardDelegator
rootPath string
@ -71,6 +74,15 @@ func (s *DelegatorDataSuite) SetupSuite() {
paramtable.Init()
paramtable.SetNodeID(1)
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.Key, "1")
s.collectionID = 1000
s.replicaID = 65535
s.vchannelName = "rootcoord-dml_1000v0"
s.version = 2000
var err error
s.mapper = metautil.NewDynChannelMapper()
s.channel, err = metautil.ParseChannel(s.vchannelName, s.mapper)
s.Require().NoError(err)
}
func (s *DelegatorDataSuite) TearDownSuite() {
@ -78,10 +90,6 @@ func (s *DelegatorDataSuite) TearDownSuite() {
}
func (s *DelegatorDataSuite) SetupTest() {
s.collectionID = 1000
s.replicaID = 65535
s.vchannelName = "rootcoord-dml_1000_v0"
s.version = 2000
s.workerManager = &cluster.MockManager{}
s.manager = segments.NewManager()
s.tsafeManager = tsafe.NewTSafeReplica()
@ -290,9 +298,10 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
defer cancel()
err := s.delegator.LoadGrowing(ctx, []*querypb.SegmentLoadInfo{
{
SegmentID: 1001,
CollectionID: s.collectionID,
PartitionID: 500,
SegmentID: 1001,
CollectionID: s.collectionID,
PartitionID: 500,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
}, 0)
s.Require().NoError(err)
@ -308,6 +317,7 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
@ -334,6 +344,7 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 5000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 5000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
@ -390,6 +401,7 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
Version: 1,
@ -424,6 +436,7 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
Version: 2,
@ -482,6 +495,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
@ -563,6 +577,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
Deltalogs: []*datapb.FieldBinlog{},
Level: datapb.SegmentLevel_L0,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
@ -578,6 +593,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
@ -724,6 +740,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 2},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 2},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
@ -750,6 +767,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
@ -788,6 +806,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
@ -832,6 +851,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
@ -896,9 +916,10 @@ func (s *DelegatorDataSuite) TestReleaseSegment() {
defer cancel()
err := s.delegator.LoadGrowing(ctx, []*querypb.SegmentLoadInfo{
{
SegmentID: 1001,
CollectionID: s.collectionID,
PartitionID: 500,
SegmentID: 1001,
CollectionID: s.collectionID,
PartitionID: 500,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
}, 0)
s.Require().NoError(err)
@ -914,6 +935,7 @@ func (s *DelegatorDataSuite) TestReleaseSegment() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
@ -1066,7 +1088,7 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() {
ms.EXPECT().Partition().Return(1)
ms.EXPECT().InsertCount().Return(0)
ms.EXPECT().Indexes().Return(nil)
ms.EXPECT().Shard().Return(s.vchannelName)
ms.EXPECT().Shard().Return(s.channel)
ms.EXPECT().Level().Return(datapb.SegmentLevel_L1)
s.manager.Segment.Put(context.Background(), segments.SegmentTypeGrowing, ms)
}


@ -18,6 +18,7 @@ package querynodev2
import (
"context"
"fmt"
"testing"
"github.com/samber/lo"
@ -121,9 +122,10 @@ func (suite *LocalWorkerTestSuite) TestLoadSegment() {
CollectionID: suite.collectionID,
Infos: lo.Map(suite.segmentIDs, func(segID int64, _ int) *querypb.SegmentLoadInfo {
return &querypb.SegmentLoadInfo{
CollectionID: suite.collectionID,
PartitionID: suite.partitionIDs[segID%2],
SegmentID: segID,
CollectionID: suite.collectionID,
PartitionID: suite.partitionIDs[segID%2],
SegmentID: segID,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
}
}),
Schema: schema,


@ -41,10 +41,14 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/cache"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// TODO maybe move to manager and change segment constructor
var channelMapper = metautil.NewDynChannelMapper()
// SegmentFilter is the interface for segment selection criteria.
type SegmentFilter interface {
Filter(segment Segment) bool
@ -109,8 +113,14 @@ func WithPartition(partitionID typeutil.UniqueID) SegmentFilter {
}
func WithChannel(channel string) SegmentFilter {
ac, err := metautil.ParseChannel(channel, channelMapper)
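// A channel name that fails to parse can never equal any segment's parsed channel,
// so return a filter that matches nothing.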
if err != nil {
return SegmentFilterFunc(func(segment Segment) bool {
return false
})
}
return SegmentFilterFunc(func(segment Segment) bool {
return segment.Shard() == channel
return segment.Shard().Equal(ac)
})
}


@ -34,7 +34,7 @@ func (s *ManagerSuite) SetupSuite() {
s.segmentIDs = []int64{1, 2, 3, 4}
s.collectionIDs = []int64{100, 200, 300, 400}
s.partitionIDs = []int64{10, 11, 12, 13}
s.channels = []string{"dml1", "dml2", "dml3", "dml4"}
s.channels = []string{"by-dev-rootcoord-dml_0_100v0", "by-dev-rootcoord-dml_1_200v0", "by-dev-rootcoord-dml_2_300v0", "by-dev-rootcoord-dml_3_400v0"}
s.types = []SegmentType{SegmentTypeSealed, SegmentTypeGrowing, SegmentTypeSealed, SegmentTypeSealed}
s.levels = []datapb.SegmentLevel{datapb.SegmentLevel_Legacy, datapb.SegmentLevel_Legacy, datapb.SegmentLevel_L1, datapb.SegmentLevel_L0}
}


@ -1387,7 +1387,7 @@ func genSearchPlanAndRequests(collection *Collection, segments []int64, indexTyp
iReq, _ := genSearchRequest(nq, indexType, collection)
queryReq := &querypb.SearchRequest{
Req: iReq,
DmlChannels: []string{"dml"},
DmlChannels: []string{fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", collection.ID())},
SegmentIDs: segments,
Scope: querypb.DataScope_Historical,
}
@ -1448,7 +1448,7 @@ func genInsertMsg(collection *Collection, partitionID, segment int64, numRows in
CollectionID: collection.ID(),
PartitionID: partitionID,
SegmentID: segment,
ShardName: "dml",
ShardName: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", collection.ID()),
Timestamps: genSimpleTimestampFieldData(numRows),
RowIDs: genSimpleRowIDField(numRows),
FieldsData: fieldsData,


@ -9,6 +9,8 @@ import (
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
metautil "github.com/milvus-io/milvus/pkg/util/metautil"
mock "github.com/stretchr/testify/mock"
msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@ -1327,14 +1329,14 @@ func (_c *MockSegment_Search_Call) RunAndReturn(run func(context.Context, *Searc
}
// Shard provides a mock function with given fields:
func (_m *MockSegment) Shard() string {
func (_m *MockSegment) Shard() metautil.Channel {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
var r0 metautil.Channel
if rf, ok := ret.Get(0).(func() metautil.Channel); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
r0 = ret.Get(0).(metautil.Channel)
}
return r0
@ -1357,12 +1359,12 @@ func (_c *MockSegment_Shard_Call) Run(run func()) *MockSegment_Shard_Call {
return _c
}
func (_c *MockSegment_Shard_Call) Return(_a0 string) *MockSegment_Shard_Call {
func (_c *MockSegment_Shard_Call) Return(_a0 metautil.Channel) *MockSegment_Shard_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSegment_Shard_Call) RunAndReturn(run func() string) *MockSegment_Shard_Call {
func (_c *MockSegment_Shard_Call) RunAndReturn(run func() metautil.Channel) *MockSegment_Shard_Call {
_c.Call.Return(run)
return _c
}


@ -18,6 +18,7 @@ package segments
import (
"context"
"fmt"
"log"
"math"
"testing"
@ -82,8 +83,8 @@ func (suite *ReduceSuite) SetupTest() {
SegmentID: suite.segmentID,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
)


@ -18,6 +18,7 @@ package segments
import (
"context"
"fmt"
"io"
"testing"
@ -91,8 +92,8 @@ func (suite *RetrieveSuite) SetupTest() {
SegmentID: suite.segmentID,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
)
@ -120,7 +121,7 @@ func (suite *RetrieveSuite) SetupTest() {
SegmentID: suite.segmentID + 1,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
)


@ -18,6 +18,7 @@ package segments
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/suite"
@ -82,8 +83,8 @@ func (suite *SearchSuite) SetupTest() {
SegmentID: suite.segmentID,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
)
@ -111,7 +112,7 @@ func (suite *SearchSuite) SetupTest() {
SegmentID: suite.segmentID + 1,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
)


@ -58,6 +58,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -87,23 +88,31 @@ type baseSegment struct {
bloomFilterSet *pkoracle.BloomFilterSet
loadInfo *atomic.Pointer[querypb.SegmentLoadInfo]
isLazyLoad bool
channel metautil.Channel
resourceUsageCache *atomic.Pointer[ResourceUsage]
needUpdatedVersion *atomic.Int64 // only for lazy load mode update index
}
func newBaseSegment(collection *Collection, segmentType SegmentType, version int64, loadInfo *querypb.SegmentLoadInfo) baseSegment {
return baseSegment{
collection: collection,
loadInfo: atomic.NewPointer[querypb.SegmentLoadInfo](loadInfo),
version: atomic.NewInt64(version),
isLazyLoad: isLazyLoad(collection, segmentType),
segmentType: segmentType,
bloomFilterSet: pkoracle.NewBloomFilterSet(loadInfo.GetSegmentID(), loadInfo.GetPartitionID(), segmentType),
func newBaseSegment(collection *Collection, segmentType SegmentType, version int64, loadInfo *querypb.SegmentLoadInfo) (baseSegment, error) {
channel, err := metautil.ParseChannel(loadInfo.GetInsertChannel(), channelMapper)
if err != nil {
return baseSegment{}, err
}
bs := baseSegment{
collection: collection,
loadInfo: atomic.NewPointer[querypb.SegmentLoadInfo](loadInfo),
version: atomic.NewInt64(version),
segmentType: segmentType,
bloomFilterSet: pkoracle.NewBloomFilterSet(loadInfo.GetSegmentID(), loadInfo.GetPartitionID(), segmentType),
channel: channel,
isLazyLoad: isLazyLoad(collection, segmentType),
resourceUsageCache: atomic.NewPointer[ResourceUsage](nil),
needUpdatedVersion: atomic.NewInt64(0),
}
return bs, nil
}
// isLazyLoad checks whether the segment should be lazily loaded
@ -139,8 +148,8 @@ func (s *baseSegment) ResourceGroup() string {
return s.collection.GetResourceGroup()
}
func (s *baseSegment) Shard() string {
return s.loadInfo.Load().GetInsertChannel()
func (s *baseSegment) Shard() metautil.Channel {
return s.channel
}
func (s *baseSegment) Type() SegmentType {
@ -258,6 +267,12 @@ func NewSegment(ctx context.Context,
if loadInfo.GetLevel() == datapb.SegmentLevel_L0 {
return NewL0Segment(collection, segmentType, version, loadInfo)
}
base, err := newBaseSegment(collection, segmentType, version, loadInfo)
if err != nil {
return nil, err
}
var cSegType C.SegmentType
var locker *state.LoadStateLock
switch segmentType {
@ -272,7 +287,7 @@ func NewSegment(ctx context.Context,
}
var newPtr C.CSegmentInterface
_, err := GetDynamicPool().Submit(func() (any, error) {
_, err = GetDynamicPool().Submit(func() (any, error) {
status := C.NewSegment(collection.collectionPtr, cSegType, C.int64_t(loadInfo.GetSegmentID()), &newPtr)
err := HandleCStatus(ctx, &status, "NewSegmentFailed",
zap.Int64("collectionID", loadInfo.GetCollectionID()),
@ -294,7 +309,7 @@ func NewSegment(ctx context.Context,
)
segment := &LocalSegment{
baseSegment: newBaseSegment(collection, segmentType, version, loadInfo),
baseSegment: base,
ptrLock: locker,
ptr: newPtr,
lastDeltaTimestamp: atomic.NewUint64(0),
@ -326,6 +341,10 @@ func NewSegmentV2(
if loadInfo.GetLevel() == datapb.SegmentLevel_L0 {
return NewL0Segment(collection, segmentType, version, loadInfo)
}
base, err := newBaseSegment(collection, segmentType, version, loadInfo)
if err != nil {
return nil, err
}
var segmentPtr C.CSegmentInterface
var status C.CStatus
var locker *state.LoadStateLock
@ -360,7 +379,7 @@ func NewSegmentV2(
}
segment := &LocalSegment{
baseSegment: newBaseSegment(collection, segmentType, version, loadInfo),
baseSegment: base,
ptrLock: locker,
ptr: segmentPtr,
lastDeltaTimestamp: atomic.NewUint64(0),


@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -46,7 +47,7 @@ type Segment interface {
ResourceGroup() string
Collection() int64
Partition() int64
Shard() string
Shard() metautil.Channel
Version() int64
CASVersion(int64, int64) bool
StartPosition() *msgpb.MsgPosition


@ -58,8 +58,13 @@ func NewL0Segment(collection *Collection,
zap.Int64("segmentID", loadInfo.GetSegmentID()),
zap.String("segmentType", segmentType.String()))
base, err := newBaseSegment(collection, segmentType, version, loadInfo)
if err != nil {
return nil, err
}
segment := &L0Segment{
baseSegment: newBaseSegment(collection, segmentType, version, loadInfo),
baseSegment: base,
}
// level 0 segments are always in memory


@ -399,7 +399,7 @@ func (loader *segmentLoaderV2) LoadSegment(ctx context.Context,
log := log.Ctx(ctx).With(
zap.Int64("collectionID", segment.Collection()),
zap.Int64("partitionID", segment.Partition()),
zap.String("shard", segment.Shard()),
zap.String("shard", segment.Shard().VirtualName()),
zap.Int64("segmentID", segment.ID()),
)
log.Info("start loading segment files",
@ -1075,7 +1075,7 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context,
log := log.Ctx(ctx).With(
zap.Int64("collectionID", segment.Collection()),
zap.Int64("partitionID", segment.Partition()),
zap.String("shard", segment.Shard()),
zap.String("shard", segment.Shard().VirtualName()),
zap.Int64("segmentID", segment.ID()),
)
log.Info("start loading segment files",


@ -18,6 +18,7 @@ package segments
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
@ -120,12 +121,13 @@ func (suite *SegmentLoaderSuite) TestLoad() {
suite.NoError(err)
_, err = suite.loader.Load(ctx, suite.collectionID, SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
suite.NoError(err)
@ -141,12 +143,13 @@ func (suite *SegmentLoaderSuite) TestLoad() {
suite.NoError(err)
_, err = suite.loader.Load(ctx, suite.collectionID, SegmentTypeGrowing, 0, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID + 1,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
SegmentID: suite.segmentID + 1,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
suite.NoError(err)
}
@ -175,12 +178,13 @@ func (suite *SegmentLoaderSuite) TestLoadFail() {
}
_, err = suite.loader.Load(ctx, suite.collectionID, SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
suite.Error(err)
}
@ -203,12 +207,13 @@ func (suite *SegmentLoaderSuite) TestLoadMultipleSegments() {
)
suite.NoError(err)
loadInfos = append(loadInfos, &querypb.SegmentLoadInfo{
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
}
@ -237,12 +242,13 @@ func (suite *SegmentLoaderSuite) TestLoadMultipleSegments() {
)
suite.NoError(err)
loadInfos = append(loadInfos, &querypb.SegmentLoadInfo{
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
}
@ -288,13 +294,14 @@ func (suite *SegmentLoaderSuite) TestLoadWithIndex() {
)
suite.NoError(err)
loadInfos = append(loadInfos, &querypb.SegmentLoadInfo{
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
IndexInfos: []*querypb.FieldIndexInfo{indexInfo},
NumOfRows: int64(msgLength),
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
IndexInfos: []*querypb.FieldIndexInfo{indexInfo},
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
}
@ -326,12 +333,13 @@ func (suite *SegmentLoaderSuite) TestLoadBloomFilter() {
suite.NoError(err)
loadInfos = append(loadInfos, &querypb.SegmentLoadInfo{
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
}
@ -373,13 +381,14 @@ func (suite *SegmentLoaderSuite) TestLoadDeltaLogs() {
suite.NoError(err)
loadInfos = append(loadInfos, &querypb.SegmentLoadInfo{
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
Deltalogs: deltaLogs,
NumOfRows: int64(msgLength),
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
Deltalogs: deltaLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
}
@ -425,13 +434,14 @@ func (suite *SegmentLoaderSuite) TestLoadDupDeltaLogs() {
suite.NoError(err)
loadInfos = append(loadInfos, &querypb.SegmentLoadInfo{
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
Deltalogs: deltaLogs,
NumOfRows: int64(msgLength),
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
Deltalogs: deltaLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
}
@ -469,6 +479,7 @@ func (suite *SegmentLoaderSuite) TestLoadIndex() {
IndexFilePaths: []string{},
},
},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
}
segment := &LocalSegment{
baseSegment: baseSegment{
@ -507,12 +518,13 @@ func (suite *SegmentLoaderSuite) TestLoadWithMmap() {
suite.NoError(err)
_, err = suite.loader.Load(ctx, suite.collectionID, SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
suite.NoError(err)
}
@ -545,13 +557,14 @@ func (suite *SegmentLoaderSuite) TestPatchEntryNum() {
)
suite.NoError(err)
loadInfo := &querypb.SegmentLoadInfo{
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
IndexInfos: []*querypb.FieldIndexInfo{indexInfo},
NumOfRows: int64(msgLength),
SegmentID: segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
IndexInfos: []*querypb.FieldIndexInfo{indexInfo},
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
}
// mock legacy binlog entry num is zero case
@ -593,12 +606,13 @@ func (suite *SegmentLoaderSuite) TestRunOutMemory() {
suite.NoError(err)
_, err = suite.loader.Load(ctx, suite.collectionID, SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
suite.Error(err)
@ -614,32 +628,35 @@ func (suite *SegmentLoaderSuite) TestRunOutMemory() {
suite.NoError(err)
_, err = suite.loader.Load(ctx, suite.collectionID, SegmentTypeGrowing, 0, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID + 1,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
SegmentID: suite.segmentID + 1,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
suite.Error(err)
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.MmapDirPath.Key, "./mmap")
_, err = suite.loader.Load(ctx, suite.collectionID, SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
suite.Error(err)
_, err = suite.loader.Load(ctx, suite.collectionID, SegmentTypeGrowing, 0, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID + 1,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
SegmentID: suite.segmentID + 1,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
BinlogPaths: binlogs,
Statslogs: statsLogs,
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
suite.Error(err)
}
@ -720,10 +737,11 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
})
suite.segmentManager.EXPECT().UpdateBy(mock.Anything, mock.Anything, mock.Anything).Return(0)
infos = suite.loader.prepare(context.Background(), SegmentTypeSealed, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
NumOfRows: 100,
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
NumOfRows: 100,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, []int64{suite.segmentID}, 0)
@ -748,10 +766,11 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
return nil
})
infos = suite.loader.prepare(context.Background(), SegmentTypeSealed, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
NumOfRows: 100,
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
NumOfRows: 100,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, []int64{suite.segmentID}, 0)
@ -766,10 +785,11 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
return nil
})
suite.loader.prepare(context.Background(), SegmentTypeSealed, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
NumOfRows: 100,
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
NumOfRows: 100,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
ctx, cancel := context.WithCancel(context.Background())
@ -806,6 +826,7 @@ func (suite *SegmentLoaderDetailSuite) TestRequestResource() {
},
},
},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
suite.NoError(err)
@ -944,6 +965,7 @@ func (suite *SegmentLoaderV2Suite) TestLoad() {
CollectionID: suite.collectionID,
NumOfRows: int64(msgLength),
StorageVersion: 3,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
suite.NoError(err)
@ -953,6 +975,7 @@ func (suite *SegmentLoaderV2Suite) TestLoad() {
CollectionID: suite.collectionID,
NumOfRows: int64(msgLength),
StorageVersion: 3,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
suite.NoError(err)


@ -2,6 +2,7 @@ package segments
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/suite"
@ -69,7 +70,7 @@ func (suite *SegmentSuite) SetupTest() {
CollectionID: suite.collectionID,
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
NumOfRows: int64(msgLength),
BinlogPaths: []*datapb.FieldBinlog{
@ -111,7 +112,7 @@ func (suite *SegmentSuite) SetupTest() {
SegmentID: suite.segmentID + 1,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
)


@ -18,6 +18,7 @@ package querynodev2
import (
"context"
"fmt"
"os"
"sync/atomic"
"testing"
@ -231,8 +232,8 @@ func (suite *QueryNodeSuite) TestStop() {
SegmentID: 100,
PartitionID: 10,
CollectionID: 1,
InsertChannel: "test_stop_channel",
Level: datapb.SegmentLevel_Legacy,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", 1),
},
)
suite.NoError(err)


@ -598,7 +598,7 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *querypb.GetSegmen
info := &querypb.SegmentInfo{
SegmentID: segment.ID(),
SegmentState: segment.Type(),
DmChannel: segment.Shard(),
DmChannel: segment.Shard().VirtualName(),
PartitionID: segment.Partition(),
CollectionID: segment.Collection(),
NodeID: node.GetNodeID(),
@ -1165,7 +1165,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
ID: s.ID(),
Collection: s.Collection(),
Partition: s.Partition(),
Channel: s.Shard(),
Channel: s.Shard().VirtualName(),
Version: s.Version(),
LastDeltaTimestamp: s.LastDeltaTimestamp(),
IndexInfo: lo.SliceToMap(s.Indexes(), func(info *segments.IndexedFieldInfo) (int64, *querypb.FieldIndexInfo) {


@ -52,6 +52,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -72,6 +73,8 @@ type ServiceSuite struct {
// Test channel
vchannel string
pchannel string
channel metautil.Channel
mapper metautil.ChannelMapper
position *msgpb.MsgPosition
// Dependency
@ -100,9 +103,14 @@ func (suite *ServiceSuite) SetupSuite() {
suite.flushedSegmentIDs = []int64{4, 5, 6}
suite.droppedSegmentIDs = []int64{7, 8, 9}
var err error
suite.mapper = metautil.NewDynChannelMapper()
// channel data
suite.vchannel = "test-channel"
suite.vchannel = "by-dev-rootcoord-dml_0_111v0"
suite.pchannel = funcutil.ToPhysicalChannel(suite.vchannel)
suite.channel, err = metautil.ParseChannel(suite.vchannel, suite.mapper)
suite.Require().NoError(err)
suite.position = &msgpb.MsgPosition{
ChannelName: suite.vchannel,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
@ -472,7 +480,7 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() {
l0Segment.EXPECT().Level().Return(datapb.SegmentLevel_L0)
l0Segment.EXPECT().Type().Return(commonpb.SegmentState_Sealed)
l0Segment.EXPECT().Indexes().Return(nil)
l0Segment.EXPECT().Shard().Return(suite.vchannel)
l0Segment.EXPECT().Shard().Return(suite.channel)
l0Segment.EXPECT().Release(ctx).Return()
suite.node.manager.Segment.Put(ctx, segments.SegmentTypeSealed, l0Segment)


@ -0,0 +1,152 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metautil
import (
"fmt"
"regexp"
"strconv"
"sync"
"github.com/milvus-io/milvus/pkg/util/merr"
)
const (
rgnPhysicalName = `PhysicalName`
rgnCollectionID = `CollectionID`
rgnShardIdx = `ShardIdx`
)
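// channelNameFormat matches virtual channel names of the form {physicalName}_{collectionID}v{shardIdx}.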
var channelNameFormat = regexp.MustCompile(fmt.Sprintf(`^(?P<%s>.*)_(?P<%s>\d+)v(?P<%s>\d+)$`, rgnPhysicalName, rgnCollectionID, rgnShardIdx))
// ChannelMapper is the interface that provides physical channel name mapping functions.
type ChannelMapper interface {
ChannelIdx(string) int
ChannelName(int) string
}
// dynamicChannelMapper implements ChannelMapper.
// It provides dynamically growing indexing for services without a global channel name list.
type dynamicChannelMapper struct {
mut sync.RWMutex
nameIdx map[string]int
channels []string
}
func (m *dynamicChannelMapper) channelIdx(name string) (int, bool) {
m.mut.RLock()
defer m.mut.RUnlock()
idx, ok := m.nameIdx[name]
return idx, ok
}
func (m *dynamicChannelMapper) ChannelIdx(name string) int {
idx, ok := m.channelIdx(name)
if ok {
return idx
}
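	// Slow path: re-check under the write lock in case another goroutine
	// registered the name concurrently (double-checked locking).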
m.mut.Lock()
defer m.mut.Unlock()
idx, ok = m.nameIdx[name]
if ok {
return idx
}
idx = len(m.channels)
m.channels = append(m.channels, name)
m.nameIdx[name] = idx
return idx
}
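// ChannelName returns the physical channel name registered at idx;
// idx is expected to come from a previous ChannelIdx call.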
func (m *dynamicChannelMapper) ChannelName(idx int) string {
m.mut.RLock()
defer m.mut.RUnlock()
return m.channels[idx]
}
func NewDynChannelMapper() *dynamicChannelMapper {
return &dynamicChannelMapper{
nameIdx: make(map[string]int),
}
}
// Channel struct maintains the parsed channel information.
type Channel struct {
	ChannelMapper

	channelIdx   int   // index of the physical channel name in the mapper
	collectionID int64 // collection the virtual channel belongs to
	shardIdx     int64 // shard index within the collection
}
func (c Channel) PhysicalName() string {
return c.ChannelName(c.channelIdx)
}
func (c Channel) VirtualName() string {
return fmt.Sprintf("%s_%dv%d", c.PhysicalName(), c.collectionID, c.shardIdx)
}
func (c Channel) Equal(ac Channel) bool {
return c.channelIdx == ac.channelIdx &&
c.collectionID == ac.collectionID &&
c.shardIdx == ac.shardIdx
}
func (c Channel) EqualString(str string) bool {
ac, err := ParseChannel(str, c.ChannelMapper)
if err != nil {
return false
}
return c.Equal(ac)
}
func ParseChannel(virtualName string, mapper ChannelMapper) (Channel, error) {
if !channelNameFormat.MatchString(virtualName) {
return Channel{}, merr.WrapErrParameterInvalidMsg("virtual channel name(%s) is not valid", virtualName)
}
matches := channelNameFormat.FindStringSubmatch(virtualName)
physicalName := matches[channelNameFormat.SubexpIndex(rgnPhysicalName)]
collectionIDRaw := matches[channelNameFormat.SubexpIndex(rgnCollectionID)]
shardIdxRaw := matches[channelNameFormat.SubexpIndex(rgnShardIdx)]
collectionID, err := strconv.ParseInt(collectionIDRaw, 10, 64)
if err != nil {
return Channel{}, err
}
shardIdx, err := strconv.ParseInt(shardIdxRaw, 10, 64)
if err != nil {
return Channel{}, err
}
return NewChannel(physicalName, collectionID, shardIdx, mapper), nil
}
// NewChannel returns a Channel instance with the provided physical channel name and other information.
func NewChannel(physicalName string, collectionID int64, idx int64, mapper ChannelMapper) Channel {
c := Channel{
ChannelMapper: mapper,
collectionID: collectionID,
shardIdx: idx,
}
c.channelIdx = c.ChannelIdx(physicalName)
return c
}


@ -0,0 +1,115 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metautil
import (
"testing"
"github.com/stretchr/testify/suite"
)
type ChannelSuite struct {
suite.Suite
}
func (s *ChannelSuite) TestParseChannel() {
type testCase struct {
tag string
virtualName string
expectError bool
expPhysical string
expCollectionID int64
expShardIdx int64
}
cases := []testCase{
{
tag: "valid_virtual1",
virtualName: "by-dev-rootcoord-dml_0_449413615133917325v0",
expectError: false,
expPhysical: "by-dev-rootcoord-dml_0",
expCollectionID: 449413615133917325,
expShardIdx: 0,
},
{
tag: "valid_virtual2",
virtualName: "by-dev-rootcoord-dml_1_449413615133917325v1",
expectError: false,
expPhysical: "by-dev-rootcoord-dml_1",
expCollectionID: 449413615133917325,
expShardIdx: 1,
},
{
tag: "bad_format",
virtualName: "by-dev-rootcoord-dml_2",
expectError: true,
},
{
tag: "non_int_collection_id",
virtualName: "by-dev-rootcoord-dml_0_collectionnamev0",
expectError: true,
},
{
tag: "non_int_shard_idx",
virtualName: "by-dev-rootcoord-dml_1_449413615133917325vunknown",
expectError: true,
},
}
mapper := NewDynChannelMapper()
for _, tc := range cases {
s.Run(tc.tag, func() {
channel, err := ParseChannel(tc.virtualName, mapper)
if tc.expectError {
s.Error(err)
return
}
s.Equal(tc.expPhysical, channel.PhysicalName())
s.Equal(tc.expCollectionID, channel.collectionID)
s.Equal(tc.expShardIdx, channel.shardIdx)
s.Equal(tc.virtualName, channel.VirtualName())
})
}
}
func (s *ChannelSuite) TestCompare() {
virtualName1 := "by-dev-rootcoord-dml_0_449413615133917325v0"
virtualName2 := "by-dev-rootcoord-dml_1_449413615133917325v1"
mapper := NewDynChannelMapper()
channel1, err := ParseChannel(virtualName1, mapper)
s.Require().NoError(err)
channel2, err := ParseChannel(virtualName2, mapper)
s.Require().NoError(err)
channel3, err := ParseChannel(virtualName1, mapper)
s.Require().NoError(err)
s.True(channel1.Equal(channel1))
s.False(channel1.Equal(channel2))
s.False(channel2.Equal(channel1))
s.True(channel1.Equal(channel3))
s.True(channel1.EqualString(virtualName1))
s.False(channel1.EqualString(virtualName2))
s.False(channel1.EqualString("abc"))
}
func TestChannel(t *testing.T) {
suite.Run(t, new(ChannelSuite))
}