Do not drop collections or partitions in flow graph

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2021-02-03 18:12:48 +08:00 committed by yefu.chen
parent fd9e42d3ed
commit f12a0490bb
21 changed files with 259 additions and 935 deletions

View File

@ -23,10 +23,6 @@ type Collection struct {
partitions []*Partition
}
//func (c *Collection) Name() string {
// return c.schema.Name
//}
func (c *Collection) ID() UniqueID {
return c.id
}

View File

@ -43,17 +43,14 @@ type collectionReplica interface {
getVecFieldsByCollectionID(collectionID UniqueID) ([]int64, error)
// partition
// Partition tags in different collections are not unique,
// so partition api should specify the target collection.
// TODO: remove collection ID, add a `map[partitionID]partition` to replica implement
getPartitionNum(collectionID UniqueID) (int, error)
addPartition2(collectionID UniqueID, partitionTag string) error
addPartition(collectionID UniqueID, partitionID UniqueID) error
removePartition(collectionID UniqueID, partitionTag string) error
addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error
removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error
getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error)
removePartition(collectionID UniqueID, partitionID UniqueID) error
addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionInfo) error
removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionInfo) error
getPartitionByID(collectionID UniqueID, partitionID UniqueID) (*Partition, error)
hasPartition(collectionID UniqueID, partitionTag string) bool
hasPartition(collectionID UniqueID, partitionID UniqueID) bool
enablePartitionDM(collectionID UniqueID, partitionID UniqueID) error
disablePartitionDM(collectionID UniqueID, partitionID UniqueID) error
getEnablePartitionDM(collectionID UniqueID, partitionID UniqueID) (bool, error)
@ -61,7 +58,6 @@ type collectionReplica interface {
// segment
getSegmentNum() int
getSegmentStatistics() []*internalpb2.SegmentStats
addSegment2(segmentID UniqueID, partitionTag string, collectionID UniqueID, segType segmentType) error
addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error
removeSegment(segmentID UniqueID) error
getSegmentByID(segmentID UniqueID) (*Segment, error)
@ -197,21 +193,6 @@ func (colReplica *collectionReplicaImpl) getPartitionNum(collectionID UniqueID)
return len(collection.partitions), nil
}
func (colReplica *collectionReplicaImpl) addPartition2(collectionID UniqueID, partitionTag string) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
collection, err := colReplica.getCollectionByIDPrivate(collectionID)
if err != nil {
return err
}
var newPartition = newPartition2(partitionTag)
*collection.Partitions() = append(*collection.Partitions(), newPartition)
return nil
}
func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionID UniqueID) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
@ -227,14 +208,14 @@ func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, par
return nil
}
func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, partitionTag string) error {
func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, partitionID UniqueID) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
return colReplica.removePartitionPrivate(collectionID, partitionTag)
return colReplica.removePartitionPrivate(collectionID, partitionID)
}
func (colReplica *collectionReplicaImpl) removePartitionPrivate(collectionID UniqueID, partitionTag string) error {
func (colReplica *collectionReplicaImpl) removePartitionPrivate(collectionID UniqueID, partitionID UniqueID) error {
collection, err := colReplica.getCollectionByIDPrivate(collectionID)
if err != nil {
return err
@ -242,7 +223,7 @@ func (colReplica *collectionReplicaImpl) removePartitionPrivate(collectionID Uni
var tmpPartitions = make([]*Partition, 0)
for _, p := range *collection.Partitions() {
if p.Tag() == partitionTag {
if p.ID() == partitionID {
for _, s := range *p.Segments() {
deleteSegment(colReplica.segments[s.ID()])
delete(colReplica.segments, s.ID())
@ -257,30 +238,30 @@ func (colReplica *collectionReplicaImpl) removePartitionPrivate(collectionID Uni
}
// deprecated
func (colReplica *collectionReplicaImpl) addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error {
func (colReplica *collectionReplicaImpl) addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionInfo) error {
if !colReplica.hasCollection(colMeta.ID) {
err := errors.New("Cannot find collection, id = " + strconv.FormatInt(colMeta.ID, 10))
return err
}
pToAdd := make([]string, 0)
for _, partitionTag := range colMeta.PartitionTags {
if !colReplica.hasPartition(colMeta.ID, partitionTag) {
pToAdd = append(pToAdd, partitionTag)
pToAdd := make([]UniqueID, 0)
for _, partitionID := range colMeta.PartitionIDs {
if !colReplica.hasPartition(colMeta.ID, partitionID) {
pToAdd = append(pToAdd, partitionID)
}
}
for _, tag := range pToAdd {
err := colReplica.addPartition2(colMeta.ID, tag)
for _, id := range pToAdd {
err := colReplica.addPartition(colMeta.ID, id)
if err != nil {
log.Println(err)
}
fmt.Println("add partition: ", tag)
fmt.Println("add partition: ", id)
}
return nil
}
func (colReplica *collectionReplicaImpl) removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error {
func (colReplica *collectionReplicaImpl) removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionInfo) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
@ -289,37 +270,30 @@ func (colReplica *collectionReplicaImpl) removePartitionsByCollectionMeta(colMet
return err
}
pToDel := make([]string, 0)
pToDel := make([]UniqueID, 0)
for _, partition := range col.partitions {
hasPartition := false
for _, tag := range colMeta.PartitionTags {
if partition.partitionTag == tag {
for _, id := range colMeta.PartitionIDs {
if partition.ID() == id {
hasPartition = true
}
}
if !hasPartition {
pToDel = append(pToDel, partition.partitionTag)
pToDel = append(pToDel, partition.ID())
}
}
for _, tag := range pToDel {
err := colReplica.removePartitionPrivate(col.ID(), tag)
for _, id := range pToDel {
err := colReplica.removePartitionPrivate(col.ID(), id)
if err != nil {
log.Println(err)
}
fmt.Println("delete partition: ", tag)
fmt.Println("delete partition: ", id)
}
return nil
}
func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return colReplica.getPartitionByTagPrivate(collectionID, partitionTag)
}
func (colReplica *collectionReplicaImpl) getPartitionByID(collectionID UniqueID, partitionID UniqueID) (*Partition, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
@ -327,21 +301,6 @@ func (colReplica *collectionReplicaImpl) getPartitionByID(collectionID UniqueID,
return colReplica.getPartitionByIDPrivate(collectionID, partitionID)
}
func (colReplica *collectionReplicaImpl) getPartitionByTagPrivate(collectionID UniqueID, partitionTag string) (*Partition, error) {
collection, err := colReplica.getCollectionByIDPrivate(collectionID)
if err != nil {
return nil, err
}
for _, p := range *collection.Partitions() {
if p.Tag() == partitionTag {
return p, nil
}
}
return nil, errors.New("cannot find partition, tag = " + partitionTag)
}
func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(collectionID UniqueID, partitionID UniqueID) (*Partition, error) {
collection, err := colReplica.getCollectionByIDPrivate(collectionID)
if err != nil {
@ -357,7 +316,7 @@ func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(collectionID Un
return nil, errors.New("cannot find partition, id = " + strconv.FormatInt(partitionID, 10))
}
func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, partitionTag string) bool {
func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, partitionID UniqueID) bool {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
@ -368,7 +327,7 @@ func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, par
}
for _, p := range *collection.Partitions() {
if p.Tag() == partitionTag {
if p.ID() == partitionID {
return true
}
}
@ -446,28 +405,6 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.S
return statisticData
}
func (colReplica *collectionReplicaImpl) addSegment2(segmentID UniqueID, partitionTag string, collectionID UniqueID, segType segmentType) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
collection, err := colReplica.getCollectionByIDPrivate(collectionID)
if err != nil {
return err
}
partition, err2 := colReplica.getPartitionByTagPrivate(collectionID, partitionTag)
if err2 != nil {
return err2
}
var newSegment = newSegment2(collection, segmentID, partitionTag, collectionID, segType)
colReplica.segments[segmentID] = newSegment
*partition.Segments() = append(*partition.Segments(), newSegment)
return nil
}
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()

View File

@ -61,18 +61,18 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) {
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
partitionTags := []string{"a", "b", "c"}
for _, tag := range partitionTags {
err := node.replica.addPartition2(collectionID, tag)
partitionIDs := []UniqueID{1, 2, 3}
for _, id := range partitionIDs {
err := node.replica.addPartition(collectionID, id)
assert.NoError(t, err)
partition, err := node.replica.getPartitionByTag(collectionID, tag)
partition, err := node.replica.getPartitionByID(collectionID, id)
assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, tag)
assert.Equal(t, partition.ID(), id)
}
partitionNum, err := node.replica.getPartitionNum(collectionID)
assert.NoError(t, err)
assert.Equal(t, partitionNum, len(partitionTags)+1) // _default
assert.Equal(t, partitionNum, len(partitionIDs)+1) // _default
node.Stop()
}
@ -81,13 +81,13 @@ func TestCollectionReplica_addPartition(t *testing.T) {
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
partitionTags := []string{"a", "b", "c"}
for _, tag := range partitionTags {
err := node.replica.addPartition2(collectionID, tag)
partitionIDs := []UniqueID{1, 2, 3}
for _, id := range partitionIDs {
err := node.replica.addPartition(collectionID, id)
assert.NoError(t, err)
partition, err := node.replica.getPartitionByTag(collectionID, tag)
partition, err := node.replica.getPartitionByID(collectionID, id)
assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, tag)
assert.Equal(t, partition.ID(), id)
}
node.Stop()
}
@ -97,15 +97,15 @@ func TestCollectionReplica_removePartition(t *testing.T) {
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
partitionTags := []string{"a", "b", "c"}
partitionIDs := []UniqueID{1, 2, 3}
for _, tag := range partitionTags {
err := node.replica.addPartition2(collectionID, tag)
for _, id := range partitionIDs {
err := node.replica.addPartition(collectionID, id)
assert.NoError(t, err)
partition, err := node.replica.getPartitionByTag(collectionID, tag)
partition, err := node.replica.getPartitionByID(collectionID, id)
assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, tag)
err = node.replica.removePartition(collectionID, tag)
assert.Equal(t, partition.ID(), id)
err = node.replica.removePartition(collectionID, id)
assert.NoError(t, err)
}
node.Stop()
@ -117,18 +117,18 @@ func TestCollectionReplica_addPartitionsByCollectionMeta(t *testing.T) {
initTestMeta(t, node, collectionID, 0)
collectionMeta := genTestCollectionMeta(collectionID, false)
collectionMeta.PartitionTags = []string{"p0", "p1", "p2"}
collectionMeta.PartitionIDs = []UniqueID{0, 1, 2}
err := node.replica.addPartitionsByCollectionMeta(collectionMeta)
assert.NoError(t, err)
partitionNum, err := node.replica.getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, len(collectionMeta.PartitionTags)+1)
hasPartition := node.replica.hasPartition(UniqueID(0), "p0")
assert.Equal(t, partitionNum, len(collectionMeta.PartitionIDs)+1)
hasPartition := node.replica.hasPartition(UniqueID(0), UniqueID(0))
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p1")
hasPartition = node.replica.hasPartition(UniqueID(0), UniqueID(1))
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p2")
hasPartition = node.replica.hasPartition(UniqueID(0), UniqueID(2))
assert.Equal(t, hasPartition, true)
node.Stop()
@ -140,19 +140,19 @@ func TestCollectionReplica_removePartitionsByCollectionMeta(t *testing.T) {
initTestMeta(t, node, collectionID, 0)
collectionMeta := genTestCollectionMeta(collectionID, false)
collectionMeta.PartitionTags = []string{"p0"}
collectionMeta.PartitionIDs = []UniqueID{0}
err := node.replica.addPartitionsByCollectionMeta(collectionMeta)
assert.NoError(t, err)
partitionNum, err := node.replica.getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, len(collectionMeta.PartitionTags)+1)
assert.Equal(t, partitionNum, len(collectionMeta.PartitionIDs)+1)
hasPartition := node.replica.hasPartition(UniqueID(0), "p0")
hasPartition := node.replica.hasPartition(UniqueID(0), UniqueID(0))
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p1")
hasPartition = node.replica.hasPartition(UniqueID(0), UniqueID(1))
assert.Equal(t, hasPartition, false)
hasPartition = node.replica.hasPartition(UniqueID(0), "p2")
hasPartition = node.replica.hasPartition(UniqueID(0), UniqueID(2))
assert.Equal(t, hasPartition, false)
node.Stop()
@ -165,12 +165,12 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) {
collectionMeta := genTestCollectionMeta(collectionID, false)
for _, tag := range collectionMeta.PartitionTags {
err := node.replica.addPartition2(collectionID, tag)
for _, id := range collectionMeta.PartitionIDs {
err := node.replica.addPartition(collectionID, id)
assert.NoError(t, err)
partition, err := node.replica.getPartitionByTag(collectionID, tag)
partition, err := node.replica.getPartitionByID(collectionID, id)
assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, tag)
assert.Equal(t, partition.ID(), id)
assert.NotNil(t, partition)
}
node.Stop()
@ -182,11 +182,11 @@ func TestCollectionReplica_hasPartition(t *testing.T) {
initTestMeta(t, node, collectionID, 0)
collectionMeta := genTestCollectionMeta(collectionID, false)
err := node.replica.addPartition2(collectionID, collectionMeta.PartitionTags[0])
err := node.replica.addPartition(collectionID, collectionMeta.PartitionIDs[0])
assert.NoError(t, err)
hasPartition := node.replica.hasPartition(collectionID, "default")
hasPartition := node.replica.hasPartition(collectionID, defaultPartitionID)
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(collectionID, "default1")
hasPartition = node.replica.hasPartition(collectionID, defaultPartitionID+1)
assert.Equal(t, hasPartition, false)
node.Stop()
}
@ -198,9 +198,8 @@ func TestCollectionReplica_addSegment(t *testing.T) {
initTestMeta(t, node, collectionID, 0)
const segmentNum = 3
tag := "default"
for i := 0; i < segmentNum; i++ {
err := node.replica.addSegment2(UniqueID(i), tag, collectionID, segTypeGrowing)
err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segTypeGrowing)
assert.NoError(t, err)
targetSeg, err := node.replica.getSegmentByID(UniqueID(i))
assert.NoError(t, err)
@ -216,10 +215,9 @@ func TestCollectionReplica_removeSegment(t *testing.T) {
initTestMeta(t, node, collectionID, 0)
const segmentNum = 3
tag := "default"
for i := 0; i < segmentNum; i++ {
err := node.replica.addSegment2(UniqueID(i), tag, collectionID, segTypeGrowing)
err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segTypeGrowing)
assert.NoError(t, err)
targetSeg, err := node.replica.getSegmentByID(UniqueID(i))
assert.NoError(t, err)
@ -237,10 +235,9 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) {
initTestMeta(t, node, collectionID, 0)
const segmentNum = 3
tag := "default"
for i := 0; i < segmentNum; i++ {
err := node.replica.addSegment2(UniqueID(i), tag, collectionID, segTypeGrowing)
err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segTypeGrowing)
assert.NoError(t, err)
targetSeg, err := node.replica.getSegmentByID(UniqueID(i))
assert.NoError(t, err)
@ -256,10 +253,9 @@ func TestCollectionReplica_hasSegment(t *testing.T) {
initTestMeta(t, node, collectionID, 0)
const segmentNum = 3
tag := "default"
for i := 0; i < segmentNum; i++ {
err := node.replica.addSegment2(UniqueID(i), tag, collectionID, segTypeGrowing)
err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segTypeGrowing)
assert.NoError(t, err)
targetSeg, err := node.replica.getSegmentByID(UniqueID(i))
assert.NoError(t, err)

View File

@ -61,12 +61,12 @@ func TestDataSyncService_Start(t *testing.T) {
Timestamp: uint64(i + 1000),
SourceID: 0,
},
CollectionID: UniqueID(0),
PartitionName: "default",
SegmentID: int64(0),
ChannelID: "0",
Timestamps: []uint64{uint64(i + 1000), uint64(i + 1000)},
RowIDs: []int64{int64(i), int64(i)},
CollectionID: UniqueID(0),
PartitionID: defaultPartitionID,
SegmentID: int64(0),
ChannelID: "0",
Timestamps: []uint64{uint64(i + 1000), uint64(i + 1000)},
RowIDs: []int64{int64(i), int64(i)},
RowData: []*commonpb.Blob{
{Value: rawData},
{Value: rawData},

View File

@ -38,7 +38,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
var ddMsg = ddMsg{
collectionRecords: make(map[UniqueID][]metaOperateRecord),
partitionRecords: make(map[string][]metaOperateRecord),
partitionRecords: make(map[UniqueID][]metaOperateRecord),
timeRange: TimeRange{
timestampMin: msMsg.TimestampMin(),
timestampMax: msMsg.TimestampMax(),
@ -102,7 +102,8 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
}
// add default partition
err = ddNode.replica.addPartition2(collectionID, Params.DefaultPartitionTag)
// TODO: allocate default partition id in master
err = ddNode.replica.addPartition(collectionID, UniqueID(2021))
if err != nil {
log.Println(err)
return
@ -118,12 +119,6 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
collectionID := msg.CollectionID
//err := ddNode.replica.removeCollection(collectionID)
//if err != nil {
// log.Println(err)
// return
//}
ddNode.ddMsg.collectionRecords[collectionID] = append(ddNode.ddMsg.collectionRecords[collectionID],
metaOperateRecord{
createOrDrop: false,
@ -135,17 +130,15 @@ func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
collectionID := msg.CollectionID
partitionName := msg.PartitionName
partitionID := msg.PartitionID
err := ddNode.replica.addPartition2(collectionID, partitionName)
// TODO:: add partition by partitionID
//err := ddNode.replica.addPartition(collectionID, msg.PartitionID)
err := ddNode.replica.addPartition(collectionID, partitionID)
if err != nil {
log.Println(err)
return
}
ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName],
ddNode.ddMsg.partitionRecords[partitionID] = append(ddNode.ddMsg.partitionRecords[partitionID],
metaOperateRecord{
createOrDrop: true,
timestamp: msg.Base.Timestamp,
@ -154,22 +147,16 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
collectionID := msg.CollectionID
partitionName := msg.PartitionName
partitionID := msg.PartitionID
//err := ddNode.replica.removePartition(collectionID, partitionTag)
//if err != nil {
// log.Println(err)
// return
//}
ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName],
ddNode.ddMsg.partitionRecords[partitionID] = append(ddNode.ddMsg.partitionRecords[partitionID],
metaOperateRecord{
createOrDrop: false,
timestamp: msg.Base.Timestamp,
})
ddNode.ddMsg.gcRecord.partitions = append(ddNode.ddMsg.gcRecord.partitions, partitionWithID{
partitionTag: partitionName,
partitionID: partitionID,
collectionID: collectionID,
})
}

View File

@ -21,27 +21,31 @@ func (gcNode *gcNode) Operate(in []*Msg) []*Msg {
// TODO: add error handling
}
gcMsg, ok := (*in[0]).(*gcMsg)
_, ok := (*in[0]).(*gcMsg)
if !ok {
log.Println("type assertion failed for gcMsg")
// TODO: add error handling
}
// drop collections
for _, collectionID := range gcMsg.gcRecord.collections {
err := gcNode.replica.removeCollection(collectionID)
if err != nil {
log.Println(err)
}
}
// Use `releasePartition` and `releaseCollection`,
// because if we drop collections or partitions here, query service doesn't know this behavior,
// which would lead the wrong result of `showCollections` or `showPartition`
// drop partitions
for _, partition := range gcMsg.gcRecord.partitions {
err := gcNode.replica.removePartition(partition.collectionID, partition.partitionTag)
if err != nil {
log.Println(err)
}
}
//// drop collections
//for _, collectionID := range gcMsg.gcRecord.collections {
// err := gcNode.replica.removeCollection(collectionID)
// if err != nil {
// log.Println(err)
// }
//}
//
//// drop partitions
//for _, partition := range gcMsg.gcRecord.partitions {
// err := gcNode.replica.removePartition(partition.collectionID, partition.partitionID)
// if err != nil {
// log.Println(err)
// }
//}
return nil
}

View File

@ -81,7 +81,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
// check if segment exists, if not, create this segment
if !iNode.replica.hasSegment(task.SegmentID) {
err := iNode.replica.addSegment2(task.SegmentID, task.PartitionName, task.CollectionID, segTypeGrowing)
err := iNode.replica.addSegment(task.SegmentID, task.PartitionID, task.CollectionID, segTypeGrowing)
if err != nil {
log.Println(err)
continue

View File

@ -15,7 +15,7 @@ type key2SegMsg struct {
type ddMsg struct {
collectionRecords map[UniqueID][]metaOperateRecord
partitionRecords map[string][]metaOperateRecord
partitionRecords map[UniqueID][]metaOperateRecord
gcRecord *gcRecord
timeRange TimeRange
}
@ -63,17 +63,16 @@ type DeletePreprocessData struct {
count int32
}
// TODO: replace partitionWithID by partition id
// TODO: delete collection id
type partitionWithID struct {
partitionTag string
partitionID UniqueID
collectionID UniqueID
}
type gcRecord struct {
// collections and partitions to be dropped
collections []UniqueID
// TODO: use partition id
partitions []partitionWithID
partitions []partitionWithID
}
func (ksMsg *key2SegMsg) TimeTick() Timestamp {

View File

@ -19,6 +19,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
@ -89,7 +90,7 @@ import (
// SourceID: 0,
// },
// CollectionID: UniqueID(collectionID),
// PartitionName: "default",
// PartitionID: defaultPartitionID,
// SegmentID: segmentID,
// ChannelID: "0",
// Timestamps: timestamps,
@ -173,8 +174,6 @@ import (
// log.Print("marshal placeholderGroup failed")
// }
// query := milvuspb.SearchRequest{
// CollectionName: "collection0",
// PartitionNames: []string{"default"},
// Dsl: dslString,
// PlaceholderGroup: placeGroupByte,
// }
@ -425,7 +424,7 @@ import (
// SourceID: 0,
// },
// CollectionID: UniqueID(collectionID),
// PartitionName: "default",
// PartitionID: defaultPartitionID,
// SegmentID: segmentID,
// ChannelID: "0",
// Timestamps: timestamps,
@ -498,8 +497,6 @@ import (
// log.Print("marshal placeholderGroup failed")
// }
// query := milvuspb.SearchRequest{
// CollectionName: "collection0",
// PartitionNames: []string{"default"},
// Dsl: dslString,
// PlaceholderGroup: placeGroupByte,
// }
@ -674,6 +671,72 @@ import (
//}
///////////////////////////////////////////////////////////////////////////////////////////////////////////
func genETCDCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionMeta {
var fieldVec schemapb.FieldSchema
if isBinary {
fieldVec = schemapb.FieldSchema{
FieldID: UniqueID(100),
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_BINARY,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "metric_type",
Value: "JACCARD",
},
},
}
} else {
fieldVec = schemapb.FieldSchema{
FieldID: UniqueID(100),
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "metric_type",
Value: "L2",
},
},
}
}
fieldInt := schemapb.FieldSchema{
FieldID: UniqueID(101),
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
}
schema := schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
Schema: &schema,
CreateTime: Timestamp(0),
PartitionIDs: []UniqueID{defaultPartitionID},
}
return &collectionMeta
}
func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, keyPrefix string) ([]*internalpb2.StringList, []int64, error) {
const (
msgLength = 1000
@ -725,7 +788,7 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID
}
// buffer data to binLogs
collMeta := genTestCollectionMeta(collectionID, false)
collMeta := genETCDCollectionMeta(collectionID, false)
collMeta.Schema.Fields = append(collMeta.Schema.Fields, &schemapb.FieldSchema{
FieldID: 0,
Name: "uid",
@ -870,7 +933,7 @@ func generateIndex(segmentID UniqueID) ([]string, error) {
return indexPaths, nil
}
func doInsert(ctx context.Context, collectionID UniqueID, partitionTag string, segmentID UniqueID) error {
func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) error {
const msgLength = 1000
const DIM = 16
@ -906,12 +969,12 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionTag string, s
Timestamp: uint64(i + 1000),
SourceID: 0,
},
CollectionID: collectionID,
PartitionName: partitionTag,
SegmentID: segmentID,
ChannelID: "0",
Timestamps: []uint64{uint64(i + 1000)},
RowIDs: []int64{int64(i)},
CollectionID: collectionID,
PartitionID: partitionID,
SegmentID: segmentID,
ChannelID: "0",
Timestamps: []uint64{uint64(i + 1000)},
RowIDs: []int64{int64(i)},
RowData: []*commonpb.Blob{
{Value: rawData},
},

View File

@ -6,16 +6,14 @@ import (
"log"
"path"
"reflect"
"strconv"
"strings"
"time"
"github.com/golang/protobuf/proto"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"go.etcd.io/etcd/clientv3"
)
const (
@ -91,23 +89,7 @@ func isSegmentObj(key string) bool {
return index == 0
}
func isSegmentChannelRangeInQueryNodeChannelRange(segment *etcdpb.SegmentMeta) bool {
if segment.ChannelStart > segment.ChannelEnd {
log.Printf("Illegal segment channel range")
return false
}
var queryNodeChannelStart = Params.InsertChannelRange[0]
var queryNodeChannelEnd = Params.InsertChannelRange[1]
if segment.ChannelStart >= int32(queryNodeChannelStart) && segment.ChannelEnd <= int32(queryNodeChannelEnd) {
return true
}
return false
}
func printCollectionStruct(obj *etcdpb.CollectionMeta) {
func printCollectionStruct(obj *etcdpb.CollectionInfo) {
v := reflect.ValueOf(obj)
v = reflect.Indirect(v)
typeOfS := v.Type()
@ -120,7 +102,7 @@ func printCollectionStruct(obj *etcdpb.CollectionMeta) {
}
}
func printSegmentStruct(obj *etcdpb.SegmentMeta) {
func printSegmentStruct(obj *datapb.SegmentInfo) {
v := reflect.ValueOf(obj)
v = reflect.Indirect(v)
typeOfS := v.Type()
@ -140,8 +122,8 @@ func (mService *metaService) processCollectionCreate(id string, value string) {
if err != nil {
log.Println(err)
}
for _, partitionTag := range col.PartitionTags {
err = mService.replica.addPartition2(col.ID, partitionTag)
for _, partitionID := range col.PartitionIDs {
err = mService.replica.addPartition(col.ID, partitionID)
if err != nil {
log.Println(err)
}
@ -153,14 +135,11 @@ func (mService *metaService) processSegmentCreate(id string, value string) {
//println("Create Segment: ", id)
seg := mService.segmentUnmarshal(value)
if !isSegmentChannelRangeInQueryNodeChannelRange(seg) {
log.Println("Illegal segment channel range")
return
}
// TODO: what if seg == nil? We need to notify master and return rpc request failed
if seg != nil {
err := mService.replica.addSegment2(seg.SegmentID, seg.PartitionTag, seg.CollectionID, segTypeGrowing)
// TODO: get partition id from segment meta
err := mService.replica.addSegment(seg.SegmentID, seg.PartitionID, seg.CollectionID, segTypeGrowing)
if err != nil {
log.Println(err)
return
@ -181,122 +160,6 @@ func (mService *metaService) processCreate(key string, msg string) {
}
}
func (mService *metaService) processSegmentModify(id string, value string) {
seg := mService.segmentUnmarshal(value)
if !isSegmentChannelRangeInQueryNodeChannelRange(seg) {
return
}
if seg != nil {
targetSegment, err := mService.replica.getSegmentByID(seg.SegmentID)
if err != nil {
log.Println(err)
return
}
// TODO: do modify
fmt.Println(targetSegment)
}
}
func (mService *metaService) processCollectionModify(id string, value string) {
//println("Modify Collection: ", id)
col := mService.collectionUnmarshal(value)
if col != nil {
err := mService.replica.addPartitionsByCollectionMeta(col)
if err != nil {
log.Println(err)
}
err = mService.replica.removePartitionsByCollectionMeta(col)
if err != nil {
log.Println(err)
}
}
}
func (mService *metaService) processModify(key string, msg string) {
if isCollectionObj(key) {
objID := GetCollectionObjID(key)
mService.processCollectionModify(objID, msg)
} else if isSegmentObj(key) {
objID := GetSegmentObjID(key)
mService.processSegmentModify(objID, msg)
} else {
println("can not process modify msg:", key)
}
}
func (mService *metaService) processSegmentDelete(id string) {
//println("Delete segment: ", id)
var segmentID, err = strconv.ParseInt(id, 10, 64)
if err != nil {
log.Println("Cannot parse segment id:" + id)
}
err = mService.replica.removeSegment(segmentID)
if err != nil {
log.Println(err)
return
}
}
func (mService *metaService) processCollectionDelete(id string) {
//println("Delete collection: ", id)
var collectionID, err = strconv.ParseInt(id, 10, 64)
if err != nil {
log.Println("Cannot parse collection id:" + id)
}
err = mService.replica.removeCollection(collectionID)
if err != nil {
log.Println(err)
return
}
}
func (mService *metaService) processDelete(key string) {
//println("process delete")
if isCollectionObj(key) {
objID := GetCollectionObjID(key)
mService.processCollectionDelete(objID)
} else if isSegmentObj(key) {
objID := GetSegmentObjID(key)
mService.processSegmentDelete(objID)
} else {
println("can not process delete msg:", key)
}
}
func (mService *metaService) processResp(resp clientv3.WatchResponse) error {
err := resp.Err()
if err != nil {
return err
}
for _, ev := range resp.Events {
if ev.IsCreate() {
key := string(ev.Kv.Key)
msg := string(ev.Kv.Value)
mService.processCreate(key, msg)
} else if ev.IsModify() {
key := string(ev.Kv.Key)
msg := string(ev.Kv.Value)
mService.processModify(key, msg)
} else if ev.Type == mvccpb.DELETE {
key := string(ev.Kv.Key)
mService.processDelete(key)
} else {
println("Unrecognized etcd msg!")
}
}
return nil
}
func (mService *metaService) loadCollections() error {
keys, values, err := mService.kvBase.LoadWithPrefix(CollectionPrefix)
if err != nil {
@ -326,8 +189,8 @@ func (mService *metaService) loadSegments() error {
}
//----------------------------------------------------------------------- Unmarshal and Marshal
func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionMeta {
col := etcdpb.CollectionMeta{}
func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionInfo {
col := etcdpb.CollectionInfo{}
err := proto.UnmarshalText(value, &col)
if err != nil {
log.Println(err)
@ -336,7 +199,7 @@ func (mService *metaService) collectionUnmarshal(value string) *etcdpb.Collectio
return &col
}
func (mService *metaService) collectionMarshal(col *etcdpb.CollectionMeta) string {
func (mService *metaService) collectionMarshal(col *etcdpb.CollectionInfo) string {
value := proto.MarshalTextString(col)
if value == "" {
log.Println("marshal collection failed")
@ -345,8 +208,8 @@ func (mService *metaService) collectionMarshal(col *etcdpb.CollectionMeta) strin
return value
}
func (mService *metaService) segmentUnmarshal(value string) *etcdpb.SegmentMeta {
seg := etcdpb.SegmentMeta{}
func (mService *metaService) segmentUnmarshal(value string) *datapb.SegmentInfo {
seg := datapb.SegmentInfo{}
err := proto.UnmarshalText(value, &seg)
if err != nil {
log.Println(err)

View File

@ -1,12 +1,11 @@
package querynode
import (
"math"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
)
func TestMetaService_start(t *testing.T) {
@ -65,36 +64,6 @@ func TestMetaService_isSegmentObj(t *testing.T) {
assert.Equal(t, b2, false)
}
// TestMetaService_isSegmentChannelRangeInQueryNodeChannelRange verifies that
// a segment whose channel span starts at 0 is accepted for this query node,
// while one spanning [128, 256) is rejected.
func TestMetaService_isSegmentChannelRangeInQueryNodeChannelRange(t *testing.T) {
	inRange := etcdpb.SegmentMeta{
		SegmentID:    UniqueID(0),
		CollectionID: UniqueID(0),
		PartitionTag: "partition0",
		ChannelStart: 0,
		ChannelEnd:   1,
		OpenTime:     Timestamp(0),
		CloseTime:    Timestamp(math.MaxUint64),
		NumRows:      UniqueID(0),
	}
	assert.Equal(t, isSegmentChannelRangeInQueryNodeChannelRange(&inRange), true)

	outOfRange := etcdpb.SegmentMeta{
		SegmentID:    UniqueID(0),
		CollectionID: UniqueID(0),
		PartitionTag: "partition0",
		ChannelStart: 128,
		ChannelEnd:   256,
		OpenTime:     Timestamp(0),
		CloseTime:    Timestamp(math.MaxUint64),
		NumRows:      UniqueID(0),
	}
	assert.Equal(t, isSegmentChannelRangeInQueryNodeChannelRange(&outOfRange), false)
}
func TestMetaService_printCollectionStruct(t *testing.T) {
collectionID := UniqueID(0)
collectionMeta := genTestCollectionMeta(collectionID, false)
@ -102,14 +71,11 @@ func TestMetaService_printCollectionStruct(t *testing.T) {
}
func TestMetaService_printSegmentStruct(t *testing.T) {
var s = etcdpb.SegmentMeta{
var s = datapb.SegmentInfo{
SegmentID: UniqueID(0),
CollectionID: UniqueID(0),
PartitionTag: "partition0",
ChannelStart: 128,
ChannelEnd: 256,
PartitionID: defaultPartitionID,
OpenTime: Timestamp(0),
CloseTime: Timestamp(math.MaxUint64),
NumRows: UniqueID(0),
}
@ -146,8 +112,7 @@ func TestMetaService_processCollectionCreate(t *testing.T) {
>
>
>
segmentIDs: 0
partition_tags: "default"
partitionIDs: 2021
`
node.metaService.processCollectionCreate(id, value)
@ -168,10 +133,7 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
id := "0"
value := `partition_tag: "default"
channel_start: 0
channel_end: 1
close_time: 18446744073709551615
value := `partitionID: 2021
`
(*node.metaService).processSegmentCreate(id, value)
@ -212,8 +174,7 @@ func TestMetaService_processCreate(t *testing.T) {
>
>
>
segmentIDs: 0
partition_tags: "default"
partitionIDs: 2021
`
(*node.metaService).processCreate(key1, msg1)
@ -225,10 +186,7 @@ func TestMetaService_processCreate(t *testing.T) {
assert.Equal(t, collection.ID(), UniqueID(0))
key2 := Params.MetaRootPath + "/segment/0"
msg2 := `partition_tag: "default"
channel_start: 0
channel_end: 1
close_time: 18446744073709551615
msg2 := `partitionID: 2021
`
(*node.metaService).processCreate(key2, msg2)
@ -238,430 +196,6 @@ func TestMetaService_processCreate(t *testing.T) {
node.Stop()
}
// TestMetaService_processSegmentModify creates a segment from a text-proto
// meta payload, then replays an (identical) payload through the modify path
// and checks the segment is still registered under the same ID.
func TestMetaService_processSegmentModify(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
segmentID := UniqueID(0)
initTestMeta(t, node, collectionID, segmentID)
node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
// Create the segment via a meta create event.
id := "0"
value := `partition_tag: "default"
channel_start: 0
channel_end: 1
close_time: 18446744073709551615
`
(*node.metaService).processSegmentCreate(id, value)
s, err := node.replica.getSegmentByID(segmentID)
assert.NoError(t, err)
assert.Equal(t, s.segmentID, segmentID)
// Replay the same payload through the modify path; the segment must survive.
newValue := `partition_tag: "default"
channel_start: 0
channel_end: 1
close_time: 18446744073709551615
`
// TODO: modify segment for testing processCollectionModify
(*node.metaService).processSegmentModify(id, newValue)
seg, err := node.replica.getSegmentByID(segmentID)
assert.NoError(t, err)
assert.Equal(t, seg.segmentID, segmentID)
node.Stop()
}
// TestMetaService_processCollectionModify creates a collection with
// partitions p0/p1/p2, then sends a modified meta listing p1/p2/p3 and
// verifies the replica adds p3 and drops p0 (total stays at 3).
func TestMetaService_processCollectionModify(t *testing.T) {
node := newQueryNodeMock()
node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
// Initial collection meta: partitions p0, p1, p2.
id := "0"
value := `schema: <
name: "test"
fields: <
fieldID:100
name: "vec"
data_type: VECTOR_FLOAT
type_params: <
key: "dim"
value: "16"
>
index_params: <
key: "metric_type"
value: "L2"
>
>
fields: <
fieldID:101
name: "age"
data_type: INT32
type_params: <
key: "dim"
value: "1"
>
>
>
segmentIDs: 0
partition_tags: "p0"
partition_tags: "p1"
partition_tags: "p2"
`
(*node.metaService).processCollectionCreate(id, value)
collectionNum := node.replica.getCollectionNum()
assert.Equal(t, collectionNum, 1)
collection, err := node.replica.getCollectionByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
// After creation: exactly p0, p1, p2 exist.
partitionNum, err := node.replica.getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, 3)
hasPartition := node.replica.hasPartition(UniqueID(0), "p0")
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p1")
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p2")
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p3")
assert.Equal(t, hasPartition, false)
// Modified meta: same schema, partitions now p1, p2, p3.
newValue := `schema: <
name: "test"
fields: <
fieldID:100
name: "vec"
data_type: VECTOR_FLOAT
type_params: <
key: "dim"
value: "16"
>
index_params: <
key: "metric_type"
value: "L2"
>
>
fields: <
fieldID:101
name: "age"
data_type: INT32
type_params: <
key: "dim"
value: "1"
>
>
>
segmentIDs: 0
partition_tags: "p1"
partition_tags: "p2"
partition_tags: "p3"
`
(*node.metaService).processCollectionModify(id, newValue)
collection, err = node.replica.getCollectionByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
// After modify: p0 removed, p3 added, count unchanged.
partitionNum, err = node.replica.getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, 3)
hasPartition = node.replica.hasPartition(UniqueID(0), "p0")
assert.Equal(t, hasPartition, false)
hasPartition = node.replica.hasPartition(UniqueID(0), "p1")
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p2")
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p3")
assert.Equal(t, hasPartition, true)
node.Stop()
}
// TestMetaService_processModify drives the key-based modify dispatcher:
// it creates a collection (p0/p1/p2) and a segment through processCreate,
// then sends modified metas through processModify and checks the partition
// set becomes p1/p2/p3 and the segment remains registered.
func TestMetaService_processModify(t *testing.T) {
node := newQueryNodeMock()
node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
// Create collection 0 with partitions p0, p1, p2 via its etcd key.
key1 := Params.MetaRootPath + "/collection/0"
msg1 := `schema: <
name: "test"
fields: <
fieldID:100
name: "vec"
data_type: VECTOR_FLOAT
type_params: <
key: "dim"
value: "16"
>
index_params: <
key: "metric_type"
value: "L2"
>
>
fields: <
fieldID:101
name: "age"
data_type: INT32
type_params: <
key: "dim"
value: "1"
>
>
>
segmentIDs: 0
partition_tags: "p0"
partition_tags: "p1"
partition_tags: "p2"
`
(*node.metaService).processCreate(key1, msg1)
collectionNum := node.replica.getCollectionNum()
assert.Equal(t, collectionNum, 1)
collection, err := node.replica.getCollectionByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
// After creation: exactly p0, p1, p2 exist.
partitionNum, err := node.replica.getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, 3)
hasPartition := node.replica.hasPartition(UniqueID(0), "p0")
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p1")
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p2")
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p3")
assert.Equal(t, hasPartition, false)
// Create segment 0 under partition p1 via its etcd key.
key2 := Params.MetaRootPath + "/segment/0"
msg2 := `partition_tag: "p1"
channel_start: 0
channel_end: 1
close_time: 18446744073709551615
`
(*node.metaService).processCreate(key2, msg2)
s, err := node.replica.getSegmentByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, s.segmentID, UniqueID(0))
// modify
// TODO: use different index for testing processCollectionModify
// Modified collection meta: partitions become p1, p2, p3.
msg3 := `schema: <
name: "test"
fields: <
fieldID:100
name: "vec"
data_type: VECTOR_FLOAT
type_params: <
key: "dim"
value: "16"
>
index_params: <
key: "metric_type"
value: "L2"
>
>
fields: <
fieldID:101
name: "age"
data_type: INT32
type_params: <
key: "dim"
value: "1"
>
>
>
segmentIDs: 0
partition_tags: "p1"
partition_tags: "p2"
partition_tags: "p3"
`
(*node.metaService).processModify(key1, msg3)
collection, err = node.replica.getCollectionByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
// After modify: p0 removed, p3 added, count unchanged.
partitionNum, err = node.replica.getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, 3)
hasPartition = node.replica.hasPartition(UniqueID(0), "p0")
assert.Equal(t, hasPartition, false)
hasPartition = node.replica.hasPartition(UniqueID(0), "p1")
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p2")
assert.Equal(t, hasPartition, true)
hasPartition = node.replica.hasPartition(UniqueID(0), "p3")
assert.Equal(t, hasPartition, true)
// Replay the segment meta through the modify path; the segment must survive.
msg4 := `partition_tag: "p1"
channel_start: 0
channel_end: 1
close_time: 18446744073709551615
`
(*node.metaService).processModify(key2, msg4)
seg, err := node.replica.getSegmentByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, seg.segmentID, UniqueID(0))
node.Stop()
}
// TestMetaService_processSegmentDelete creates a segment from a meta event
// and verifies the matching delete event removes it from the replica.
func TestMetaService_processSegmentDelete(t *testing.T) {
	node := newQueryNodeMock()
	collectionID := UniqueID(0)
	initTestMeta(t, node, collectionID, 0)
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)

	segIDStr := "0"
	segMeta := `partition_tag: "default"
channel_start: 0
channel_end: 1
close_time: 18446744073709551615
`
	(*node.metaService).processSegmentCreate(segIDStr, segMeta)
	created, err := node.replica.getSegmentByID(UniqueID(0))
	assert.NoError(t, err)
	assert.Equal(t, created.segmentID, UniqueID(0))

	// Deleting the same ID must leave the replica with zero segments.
	(*node.metaService).processSegmentDelete("0")
	assert.Equal(t, node.replica.getSegmentNum(), 0)
	node.Stop()
}
// TestMetaService_processCollectionDelete creates a collection from a
// text-proto meta payload and verifies the matching delete event removes it
// from the replica.
func TestMetaService_processCollectionDelete(t *testing.T) {
node := newQueryNodeMock()
node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
// Create collection 0 with a single "default" partition.
id := "0"
value := `schema: <
name: "test"
fields: <
fieldID:100
name: "vec"
data_type: VECTOR_FLOAT
type_params: <
key: "dim"
value: "16"
>
index_params: <
key: "metric_type"
value: "L2"
>
>
fields: <
fieldID:101
name: "age"
data_type: INT32
type_params: <
key: "dim"
value: "1"
>
>
>
segmentIDs: 0
partition_tags: "default"
`
(*node.metaService).processCollectionCreate(id, value)
collectionNum := node.replica.getCollectionNum()
assert.Equal(t, collectionNum, 1)
collection, err := node.replica.getCollectionByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
// Deleting the same ID must leave the replica with zero collections.
(*node.metaService).processCollectionDelete(id)
collectionNum = node.replica.getCollectionNum()
assert.Equal(t, collectionNum, 0)
node.Stop()
}
// TestMetaService_processDelete drives the key-based delete dispatcher:
// after creating a collection and a segment through processCreate, deleting
// the collection key must remove both the collection and its segments.
func TestMetaService_processDelete(t *testing.T) {
node := newQueryNodeMock()
node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
// Create collection 0 via its etcd key.
key1 := Params.MetaRootPath + "/collection/0"
msg1 := `schema: <
name: "test"
fields: <
fieldID:100
name: "vec"
data_type: VECTOR_FLOAT
type_params: <
key: "dim"
value: "16"
>
index_params: <
key: "metric_type"
value: "L2"
>
>
fields: <
fieldID:101
name: "age"
data_type: INT32
type_params: <
key: "dim"
value: "1"
>
>
>
segmentIDs: 0
partition_tags: "default"
`
(*node.metaService).processCreate(key1, msg1)
collectionNum := node.replica.getCollectionNum()
assert.Equal(t, collectionNum, 1)
collection, err := node.replica.getCollectionByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
// Create segment 0 via its etcd key.
key2 := Params.MetaRootPath + "/segment/0"
msg2 := `partition_tag: "default"
channel_start: 0
channel_end: 1
close_time: 18446744073709551615
`
(*node.metaService).processCreate(key2, msg2)
seg, err := node.replica.getSegmentByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, seg.segmentID, UniqueID(0))
// Deleting the collection key must cascade to its segments as well.
(*node.metaService).processDelete(key1)
collectionsSize := node.replica.getCollectionNum()
assert.Equal(t, collectionsSize, 0)
mapSize := node.replica.getSegmentNum()
assert.Equal(t, mapSize, 0)
node.Stop()
}
// TestMetaService_processResp waits for one etcd watch response (unless the
// node's context is cancelled first) and feeds it through processResp.
func TestMetaService_processResp(t *testing.T) {
	node := newQueryNodeMock()
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)

	watchCh := (*node.metaService).kvBase.WatchWithPrefix("")
	select {
	case <-node.queryNodeLoopCtx.Done():
		return
	case resp := <-watchCh:
		_ = (*node.metaService).processResp(resp)
	}
	node.Stop()
}
func TestMetaService_loadCollections(t *testing.T) {
node := newQueryNodeMock()
node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)

View File

@ -58,10 +58,9 @@ type ParamTable struct {
StatsChannelName string
StatsReceiveBufSize int64
GracefulTime int64
MsgChannelSubName string
DefaultPartitionTag string
SliceIndex int
GracefulTime int64
MsgChannelSubName string
SliceIndex int
}
var Params ParamTable
@ -133,7 +132,6 @@ func (p *ParamTable) Init() {
p.initGracefulTime()
p.initMsgChannelSubName()
p.initDefaultPartitionTag()
p.initSliceIndex()
p.initFlowGraphMaxQueueLength()
@ -458,15 +456,6 @@ func (p *ParamTable) initDDChannelNames() {
p.DDChannelNames = ret
}
func (p *ParamTable) initDefaultPartitionTag() {
defaultTag, err := p.Load("common.defaultPartitionTag")
if err != nil {
panic(err)
}
p.DefaultPartitionTag = defaultTag
}
func (p *ParamTable) initSliceIndex() {
queryNodeID := p.QueryNodeID
queryNodeIDList := p.QueryNodeIDList()

View File

@ -165,8 +165,3 @@ func TestParamTable_ddChannelName(t *testing.T) {
contains := strings.Contains(names[0], "data-definition-0")
assert.Equal(t, contains, true)
}
func TestParamTable_defaultPartitionTag(t *testing.T) {
tag := Params.DefaultPartitionTag
assert.Equal(t, tag, "_default")
}

View File

@ -13,33 +13,19 @@ package querynode
import "C"
type Partition struct {
partitionTag string
id UniqueID
segments []*Segment
enableDM bool
id UniqueID
segments []*Segment
enableDM bool
}
// ID returns the partition's unique identifier.
func (p *Partition) ID() UniqueID {
return p.id
}
// Tag returns the partition's tag string.
func (p *Partition) Tag() string {
	return p.partitionTag
}
// Segments returns a pointer to the partition's segment slice.
func (p *Partition) Segments() *[]*Segment {
	return &p.segments
}
// newPartition2 builds a Partition identified by its tag, with data
// manipulation (DM) disabled until explicitly enabled.
func newPartition2(partitionTag string) *Partition {
	return &Partition{
		partitionTag: partitionTag,
		enableDM:     false,
	}
}
func newPartition(partitionID UniqueID) *Partition {
var newPartition = &Partition{
id: partitionID,

View File

@ -19,7 +19,7 @@ func TestPartition_Segments(t *testing.T) {
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := node.replica.addSegment2(UniqueID(i), targetPartition.partitionTag, collection.ID(), segTypeGrowing)
err := node.replica.addSegment(UniqueID(i), targetPartition.ID(), collection.ID(), segTypeGrowing)
assert.NoError(t, err)
}
@ -28,7 +28,7 @@ func TestPartition_Segments(t *testing.T) {
}
func TestPartition_newPartition(t *testing.T) {
partitionTag := "default"
partition := newPartition2(partitionTag)
assert.Equal(t, partition.partitionTag, partitionTag)
partitionID := defaultPartitionID
partition := newPartition(partitionID)
assert.Equal(t, partition.ID(), defaultPartitionID)
}

View File

@ -20,6 +20,8 @@ import (
const ctxTimeInMillisecond = 5000
const closeWithDeadline = true
const defaultPartitionID = UniqueID(2021)
type queryServiceMock struct{}
func setup() {
@ -27,7 +29,7 @@ func setup() {
Params.MetaRootPath = "/etcd/test/root/querynode"
}
func genTestCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionMeta {
func genTestCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionInfo {
var fieldVec schemapb.FieldSchema
if isBinary {
fieldVec = schemapb.FieldSchema{
@ -76,21 +78,18 @@ func genTestCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.Collect
DataType: schemapb.DataType_INT32,
}
collectionName := rand.Int63n(1000000)
schema := schemapb.CollectionSchema{
Name: "collection-" + strconv.FormatInt(collectionName, 10),
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
collectionMeta := etcdpb.CollectionInfo{
ID: collectionID,
Schema: &schema,
CreateTime: Timestamp(0),
PartitionIDs: []UniqueID{defaultPartitionID},
}
return &collectionMeta
@ -111,10 +110,10 @@ func initTestMeta(t *testing.T, node *QueryNode, collectionID UniqueID, segmentI
assert.Equal(t, collection.ID(), collectionID)
assert.Equal(t, node.replica.getCollectionNum(), 1)
err = node.replica.addPartition2(collection.ID(), collectionMeta.PartitionTags[0])
err = node.replica.addPartition(collection.ID(), collectionMeta.PartitionIDs[0])
assert.NoError(t, err)
err = node.replica.addSegment2(segmentID, collectionMeta.PartitionTags[0], collectionID, segTypeGrowing)
err = node.replica.addSegment(segmentID, collectionMeta.PartitionIDs[0], collectionID, segTypeGrowing)
assert.NoError(t, err)
}

View File

@ -18,7 +18,7 @@ func TestReduce_AllFunc(t *testing.T) {
collectionMeta := genTestCollectionMeta(collectionID, false)
collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing)
segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing)
const DIM = 16
var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}

View File

@ -239,7 +239,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
return errors.New("unmarshal query failed")
}
collectionID := searchMsg.CollectionID
partitionTagsInQuery := query.PartitionNames
collection, err := ss.replica.getCollectionByID(collectionID)
if err != nil {
span.LogFields(oplog.Error(err))
@ -263,29 +262,30 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
searchResults := make([]*SearchResult, 0)
matchedSegments := make([]*Segment, 0)
//fmt.Println("search msg's partitionTag = ", partitionTagsInQuery)
//fmt.Println("search msg's partitionID = ", partitionIDsInQuery)
var partitionTagsInCol []string
var partitionIDsInCol []UniqueID
for _, partition := range collection.partitions {
partitionTag := partition.partitionTag
partitionTagsInCol = append(partitionTagsInCol, partitionTag)
partitionID := partition.ID()
partitionIDsInCol = append(partitionIDsInCol, partitionID)
}
var searchPartitionTag []string
if len(partitionTagsInQuery) == 0 {
searchPartitionTag = partitionTagsInCol
var searchPartitionIDs []UniqueID
partitionIDsInQuery := searchMsg.PartitionIDs
if len(partitionIDsInQuery) == 0 {
searchPartitionIDs = partitionIDsInCol
} else {
for _, tag := range partitionTagsInCol {
for _, toMatchTag := range partitionTagsInQuery {
re := regexp.MustCompile("^" + toMatchTag + "$")
if re.MatchString(tag) {
searchPartitionTag = append(searchPartitionTag, tag)
for _, id := range partitionIDsInCol {
for _, toMatchID := range partitionIDsInQuery {
re := regexp.MustCompile("^" + strconv.FormatInt(toMatchID, 10) + "$")
if re.MatchString(strconv.FormatInt(id, 10)) {
searchPartitionIDs = append(searchPartitionIDs, id)
}
}
}
}
for _, partitionTag := range searchPartitionTag {
partition, _ := ss.replica.getPartitionByTag(collectionID, partitionTag)
for _, partitionID := range searchPartitionIDs {
partition, _ := ss.replica.getPartitionByID(collectionID, partitionID)
for _, segment := range partition.segments {
//fmt.Println("dsl = ", dsl)

View File

@ -61,8 +61,6 @@ func TestSearch_Search(t *testing.T) {
}
query := milvuspb.SearchRequest{
CollectionName: "collection0",
PartitionNames: []string{"default"},
Dsl: dslString,
PlaceholderGroup: placeGroupByte,
}
@ -137,12 +135,12 @@ func TestSearch_Search(t *testing.T) {
Timestamp: uint64(10 + 1000),
SourceID: 0,
},
CollectionID: UniqueID(0),
PartitionName: "default",
SegmentID: int64(0),
ChannelID: "0",
Timestamps: []uint64{uint64(i + 1000)},
RowIDs: []int64{int64(i)},
CollectionID: UniqueID(0),
PartitionID: defaultPartitionID,
SegmentID: int64(0),
ChannelID: "0",
Timestamps: []uint64{uint64(i + 1000)},
RowIDs: []int64{int64(i)},
RowData: []*commonpb.Blob{
{Value: rawData},
},
@ -256,8 +254,6 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
}
query := milvuspb.SearchRequest{
CollectionName: "collection0",
PartitionNames: []string{"default"},
Dsl: dslString,
PlaceholderGroup: placeGroupByte,
}
@ -336,12 +332,12 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
Timestamp: uint64(i + 1000),
SourceID: 0,
},
CollectionID: UniqueID(0),
PartitionName: "default",
SegmentID: int64(segmentID),
ChannelID: "0",
Timestamps: []uint64{uint64(i + 1000)},
RowIDs: []int64{int64(i)},
CollectionID: UniqueID(0),
PartitionID: defaultPartitionID,
SegmentID: int64(segmentID),
ChannelID: "0",
Timestamps: []uint64{uint64(i + 1000)},
RowIDs: []int64{int64(i)},
RowData: []*commonpb.Blob{
{Value: rawData},
},

View File

@ -36,7 +36,6 @@ type Segment struct {
segmentPtr C.CSegmentInterface
segmentID UniqueID
partitionTag string // TODO: use partitionID
partitionID UniqueID
collectionID UniqueID
lastMemSize int64
@ -81,25 +80,6 @@ func (s *Segment) getType() segmentType {
return s.segmentType
}
func newSegment2(collection *Collection, segmentID int64, partitionTag string, collectionID UniqueID, segType segmentType) *Segment {
/*
CSegmentInterface
NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type);
*/
initIndexParam := make(map[int64]indexParam)
segmentPtr := C.NewSegment(collection.collectionPtr, C.ulong(segmentID), segType)
var newSegment = &Segment{
segmentPtr: segmentPtr,
segmentType: segType,
segmentID: segmentID,
partitionTag: partitionTag,
collectionID: collectionID,
indexParam: initIndexParam,
}
return newSegment
}
func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, collectionID UniqueID, segType segmentType) *Segment {
/*
CSegmentInterface

View File

@ -22,7 +22,7 @@ func TestSegment_newSegment(t *testing.T) {
assert.Equal(t, collection.ID(), collectionID)
segmentID := UniqueID(0)
segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing)
segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing)
assert.Equal(t, segmentID, segment.segmentID)
deleteSegment(segment)
deleteCollection(collection)
@ -36,7 +36,7 @@ func TestSegment_deleteSegment(t *testing.T) {
assert.Equal(t, collection.ID(), collectionID)
segmentID := UniqueID(0)
segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing)
segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing)
assert.Equal(t, segmentID, segment.segmentID)
deleteSegment(segment)
@ -52,7 +52,7 @@ func TestSegment_getRowCount(t *testing.T) {
assert.Equal(t, collection.ID(), collectionID)
segmentID := UniqueID(0)
segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing)
segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing)
assert.Equal(t, segmentID, segment.segmentID)
ids := []int64{1, 2, 3}
@ -99,7 +99,7 @@ func TestSegment_getDeletedCount(t *testing.T) {
assert.Equal(t, collection.ID(), collectionID)
segmentID := UniqueID(0)
segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing)
segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing)
assert.Equal(t, segmentID, segment.segmentID)
ids := []int64{1, 2, 3}
@ -152,7 +152,7 @@ func TestSegment_getMemSize(t *testing.T) {
assert.Equal(t, collection.ID(), collectionID)
segmentID := UniqueID(0)
segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing)
segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing)
assert.Equal(t, segmentID, segment.segmentID)
ids := []int64{1, 2, 3}
@ -199,7 +199,7 @@ func TestSegment_segmentInsert(t *testing.T) {
collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
assert.Equal(t, collection.ID(), collectionID)
segmentID := UniqueID(0)
segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing)
segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing)
assert.Equal(t, segmentID, segment.segmentID)
ids := []int64{1, 2, 3}
@ -242,7 +242,7 @@ func TestSegment_segmentDelete(t *testing.T) {
assert.Equal(t, collection.ID(), collectionID)
segmentID := UniqueID(0)
segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing)
segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing)
assert.Equal(t, segmentID, segment.segmentID)
ids := []int64{1, 2, 3}
@ -291,7 +291,7 @@ func TestSegment_segmentSearch(t *testing.T) {
assert.Equal(t, collection.ID(), collectionID)
segmentID := UniqueID(0)
segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing)
segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing)
assert.Equal(t, segmentID, segment.segmentID)
ids := []int64{1, 2, 3}
@ -372,7 +372,7 @@ func TestSegment_segmentPreInsert(t *testing.T) {
assert.Equal(t, collection.ID(), collectionID)
segmentID := UniqueID(0)
segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing)
segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing)
assert.Equal(t, segmentID, segment.segmentID)
const DIM = 16
@ -410,7 +410,7 @@ func TestSegment_segmentPreDelete(t *testing.T) {
assert.Equal(t, collection.ID(), collectionID)
segmentID := UniqueID(0)
segment := newSegment2(collection, segmentID, Params.DefaultPartitionTag, collectionID, segTypeGrowing)
segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeGrowing)
assert.Equal(t, segmentID, segment.segmentID)
ids := []int64{1, 2, 3}