enhance: Use partitionID when deleting by partitionKey (#38231)

When deleting by partition_key, Milvus generates L0 segments
globally. During L0 compaction, those L0 segments touch all
partitions collection-wide. Due to the false-positive rate of segment
bloom filters, L0 compactions append spurious deltalogs to completely
irrelevant partitions, which causes partition deletion amplification.

This PR uses the partition_key to set the targeted partitionID when
producing deleteMsgs into MsgStreams. This narrows the scope of L0
segments down to the partition level and removes the false-positive
influence collection-wide.
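
As a rough illustration of the idea (a minimal, self-contained Go
sketch, not the Milvus implementation): assume partition-key values are
placed onto physical partitions by a simple hash-mod rule, and the
resolved partitionID is stamped onto a hypothetical delete message. In
the real code path this resolution goes through assignPartitionKeys and
getPartitionIDs in the proxy, as shown in the diff below.

package main

import (
	"fmt"
	"hash/fnv"
)

// deleteMsg is a stand-in for the internal DeleteMsg; only the fields that
// matter for this sketch are shown.
type deleteMsg struct {
	CollectionID int64
	PartitionID  int64
	PrimaryKeys  []int64
}

// resolvePartition maps a partition-key value onto one of the collection's
// physical partitions. A plain hash-mod placement is assumed here purely
// for illustration.
func resolvePartition(partitionKey string, partitionIDs []int64) int64 {
	h := fnv.New32a()
	h.Write([]byte(partitionKey))
	return partitionIDs[int(h.Sum32())%len(partitionIDs)]
}

func main() {
	// Hypothetical physical partitions backing a partition-key collection.
	partitions := []int64{100, 101, 102, 103}

	// Once the partition key pins the delete to a single partition, the
	// produced delete message carries that partitionID instead of targeting
	// the whole collection.
	target := resolvePartition("tenant-42", partitions)
	msg := deleteMsg{CollectionID: 1, PartitionID: target, PrimaryKeys: []int64{7, 8, 9}}
	fmt.Printf("delete scoped to partition %d: %+v\n", msg.PartitionID, msg)
}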

However, due to the DeleteMsg structure, only one partition can be
labeled per deleteMsg, so this enhancement does not apply when a user
deletes across two or more partition_keys in a single deletion; in
that case the delete falls back to the collection-wide scope (see the
sketch below).
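
A minimal sketch of that fallback rule, assuming a placeholder value
standing in for common.AllPartitionsID: the narrowed scope applies only
when the delete resolves to exactly one partition, otherwise the delete
message keeps the collection-wide scope.

package main

import "fmt"

// allPartitionsID stands in for common.AllPartitionsID; the concrete value
// here is a placeholder for this sketch only.
const allPartitionsID int64 = -1

// targetPartition mirrors the rule introduced by this PR: a deleteMsg can be
// labeled with only one partition, so the partition-level scope is used only
// when exactly one partition is resolved; otherwise it falls back to all
// partitions.
func targetPartition(resolved []int64) int64 {
	if len(resolved) == 1 {
		return resolved[0]
	}
	return allPartitionsID
}

func main() {
	fmt.Println(targetPartition([]int64{101}))      // 101: scoped to one partition
	fmt.Println(targetPartition([]int64{101, 102})) // -1: spans two partitions, fall back
	fmt.Println(targetPartition(nil))               // -1: nothing resolved, fall back
}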

See also: #34665

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2024-12-20 11:18:46 +08:00 committed by GitHub
parent a728646534
commit ca7ec23198
9 changed files with 798 additions and 617 deletions


@@ -209,7 +209,6 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
zap.Int64("partitionID", r.GetPartitionID()),
zap.String("channelName", r.GetChannelName()),
zap.Uint32("count", r.GetCount()),
zap.String("segment level", r.GetLevel().String()),
)
// Load the collection info from Root Coordinator, if it is not found in server meta.


@@ -171,7 +171,7 @@ message SegmentIDRequest {
int64 partitionID = 4;
bool isImport = 5; // deprecated
int64 importTaskID = 6; // deprecated
SegmentLevel level = 7;
SegmentLevel level = 7; // deprecated
}
message AllocSegmentRequest {


@@ -273,7 +273,7 @@ func getPartitionIDs(ctx context.Context, dbName string, collectionName string,
useRegexp := Params.ProxyCfg.PartitionNameRegexp.GetAsBool()
partitionsSet := typeutil.NewSet[int64]()
partitionsSet := typeutil.NewUniqueSet()
for _, partitionName := range partitionNames {
if useRegexp {
// Legacy feature, use partition name as regexp
@@ -298,9 +298,7 @@ func getPartitionIDs(ctx context.Context, dbName string, collectionName string,
// TODO change after testcase updated: return nil, merr.WrapErrPartitionNotFound(partitionName)
return nil, fmt.Errorf("partition name %s not found", partitionName)
}
if !partitionsSet.Contain(partitionID) {
partitionsSet.Insert(partitionID)
}
partitionsSet.Insert(partitionID)
}
}
return partitionsSet.Collect(), nil
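
A side note on the hunk above: the explicit Contain check is redundant
because inserting into a unique set is already idempotent. A tiny
self-contained sketch with a map-based stand-in (not typeutil.UniqueSet
itself):

package main

import "fmt"

// uniqueSet is a map-based stand-in for typeutil.UniqueSet: inserting an
// element that is already present is a no-op, so no Contain check is needed
// before Insert.
type uniqueSet map[int64]struct{}

func (s uniqueSet) Insert(id int64) { s[id] = struct{}{} }

func (s uniqueSet) Collect() []int64 {
	out := make([]int64, 0, len(s))
	for id := range s {
		out = append(out, id)
	}
	return out
}

func main() {
	partitions := uniqueSet{}
	for _, id := range []int64{100, 101, 100, 102} {
		partitions.Insert(id) // duplicates are absorbed silently
	}
	fmt.Println(len(partitions.Collect())) // 3
}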


@@ -53,11 +53,10 @@ type deleteTask struct {
idAllocator allocator.Interface
// delete info
primaryKeys *schemapb.IDs
collectionID UniqueID
partitionID UniqueID
dbID UniqueID
partitionKeyMode bool
primaryKeys *schemapb.IDs
collectionID UniqueID
partitionID UniqueID
dbID UniqueID
// set by scheduler
ts Timestamp
@@ -135,7 +134,6 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
func (dt *deleteTask) Execute(ctx context.Context) (err error) {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute")
defer sp.End()
// log := log.Ctx(ctx)
if len(dt.req.GetExpr()) == 0 {
return merr.WrapErrParameterInvalid("valid expr", "empty expr", "invalid expression")
@@ -232,13 +230,13 @@ func repackDeleteMsgByHash(
commonpbutil.WithTimeStamp(ts),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
CollectionID: collectionID,
PartitionID: partitionID,
ShardName: vchannel,
CollectionName: collectionName,
PartitionName: partitionName,
DbName: dbName,
CollectionID: collectionID,
PartitionID: partitionID,
PrimaryKeys: &schemapb.IDs{},
ShardName: vchannel,
},
}
}
@@ -296,11 +294,11 @@ type deleteRunner struct {
limiter types.Limiter
// delete info
schema *schemaInfo
dbID UniqueID
collectionID UniqueID
partitionID UniqueID
partitionKeyMode bool
schema *schemaInfo
dbID UniqueID
collectionID UniqueID
partitionIDs []UniqueID
plan *planpb.PlanNode
// for query
msgID int64
@@ -349,29 +347,52 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
return ErrWithLog(log, "Failed to get collection schema", err)
}
dr.partitionKeyMode = dr.schema.IsPartitionKeyCollection()
// get partitionIDs of delete
dr.partitionID = common.AllPartitionsID
if len(dr.req.PartitionName) > 0 {
if dr.partitionKeyMode {
dr.plan, err = planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.GetExpr(), dr.req.GetExprTemplateValues())
if err != nil {
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err))
}
if planparserv2.IsAlwaysTruePlan(dr.plan) {
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr()))
}
// Set partitionIDs, could be empty if no partition name specified and no partition key
partName := dr.req.GetPartitionName()
if dr.schema.IsPartitionKeyCollection() {
if len(partName) > 0 {
return errors.New("not support manually specifying the partition names if partition key mode is used")
}
partName := dr.req.GetPartitionName()
expr, err := exprutil.ParseExprFromPlan(dr.plan)
if err != nil {
return err
}
partitionKeys := exprutil.ParseKeys(expr, exprutil.PartitionKey)
hashedPartitionNames, err := assignPartitionKeys(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), partitionKeys)
if err != nil {
return err
}
dr.partitionIDs, err = getPartitionIDs(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), hashedPartitionNames)
if err != nil {
return err
}
} else if len(partName) > 0 {
// static validation
if err := validatePartitionTag(partName, true); err != nil {
return ErrWithLog(log, "Invalid partition name", err)
}
// dynamic validation
partID, err := globalMetaCache.GetPartitionID(ctx, dr.req.GetDbName(), collName, partName)
if err != nil {
return ErrWithLog(log, "Failed to get partition id", err)
}
dr.partitionID = partID
dr.partitionIDs = []UniqueID{partID} // only one partID
}
// hash primary keys to channels
// set vchannels
channelNames, err := dr.chMgr.getVChannels(dr.collectionID)
if err != nil {
return ErrWithLog(log, "Failed to get primary keys from expr", err)
return ErrWithLog(log, "Failed to get vchannels from collection", err)
}
dr.vChannels = channelNames
@@ -385,16 +406,7 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
}
func (dr *deleteRunner) Run(ctx context.Context) error {
plan, err := planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.GetExpr(), dr.req.GetExprTemplateValues())
if err != nil {
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err))
}
if planparserv2.IsAlwaysTruePlan(plan) {
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr()))
}
isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, plan)
isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, dr.plan)
if isSimple {
// if could get delete.primaryKeys from delete expr
err := dr.simpleDelete(ctx, pk, numRow)
@@ -404,7 +416,7 @@ func (dr *deleteRunner) Run(ctx context.Context) error {
} else {
// if get complex delete expr
// need query from querynode before delete
err = dr.complexDelete(ctx, plan)
err := dr.complexDelete(ctx, dr.plan)
if err != nil {
log.Ctx(ctx).Warn("complex delete failed,but delete some data", zap.Int64("count", dr.result.DeleteCnt), zap.String("expr", dr.req.GetExpr()))
return err
@@ -413,21 +425,21 @@ func (dr *deleteRunner) Run(ctx context.Context) error {
return nil
}
func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs) (*deleteTask, error) {
func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs, partitionID UniqueID) (*deleteTask, error) {
dt := &deleteTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
req: dr.req,
idAllocator: dr.idAllocator,
chMgr: dr.chMgr,
chTicker: dr.chTicker,
collectionID: dr.collectionID,
partitionID: dr.partitionID,
partitionKeyMode: dr.partitionKeyMode,
vChannels: dr.vChannels,
primaryKeys: primaryKeys,
dbID: dr.dbID,
ctx: ctx,
Condition: NewTaskCondition(ctx),
req: dr.req,
idAllocator: dr.idAllocator,
chMgr: dr.chMgr,
chTicker: dr.chTicker,
collectionID: dr.collectionID,
partitionID: partitionID,
vChannels: dr.vChannels,
primaryKeys: primaryKeys,
dbID: dr.dbID,
}
var enqueuedTask task = dt
if streamingutil.IsStreamingServiceEnabled() {
enqueuedTask = &deleteTaskByStreamingService{deleteTask: dt}
@@ -445,30 +457,9 @@ func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs)
// make sure it concurrent safe
func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) executeFunc {
return func(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channel string) error {
var partitionIDs []int64
// optimize query when partitionKey on
if dr.partitionKeyMode {
expr, err := exprutil.ParseExprFromPlan(plan)
if err != nil {
return err
}
partitionKeys := exprutil.ParseKeys(expr, exprutil.PartitionKey)
hashedPartitionNames, err := assignPartitionKeys(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), partitionKeys)
if err != nil {
return err
}
partitionIDs, err = getPartitionIDs(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), hashedPartitionNames)
if err != nil {
return err
}
} else if dr.partitionID != common.InvalidFieldID {
partitionIDs = []int64{dr.partitionID}
}
log := log.Ctx(ctx).With(
zap.Int64("collectionID", dr.collectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64s("partitionIDs", dr.partitionIDs),
zap.String("channel", channel),
zap.Int64("nodeID", nodeID))
@@ -494,7 +485,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
ReqID: paramtable.GetNodeID(),
DbID: 0, // TODO
CollectionID: dr.collectionID,
PartitionIDs: partitionIDs,
PartitionIDs: dr.partitionIDs,
SerializedExprPlan: serializedPlan,
OutputFieldsId: outputFieldIDs,
GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dr.ts, dr.ts, dr.req.GetConsistencyLevel()),
@@ -515,7 +506,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
taskCh := make(chan *deleteTask, 256)
var receiveErr error
go func() {
receiveErr = dr.receiveQueryResult(ctx, client, taskCh, partitionIDs)
receiveErr = dr.receiveQueryResult(ctx, client, taskCh)
close(taskCh)
}()
var allQueryCnt int64
@@ -543,7 +534,15 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
}
}
func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask, partitionIDs []int64) error {
func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) error {
// If a complex delete tries to delete multiple partitions in the filter, use AllPartitionID
// otherwise use the target partitionID, which can come from partition name(UDF) or a partition key expression
// TODO: Get partitionID from Query results
msgPartitionID := common.AllPartitionsID
if len(dr.partitionIDs) == 1 {
msgPartitionID = dr.partitionIDs[0]
}
for {
result, err := client.Recv()
if err != nil {
@@ -561,14 +560,14 @@ func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.Q
}
if dr.limiter != nil {
err := dr.limiter.Alloc(ctx, dr.dbID, map[int64][]int64{dr.collectionID: partitionIDs}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds()))
err := dr.limiter.Alloc(ctx, dr.dbID, map[int64][]int64{dr.collectionID: dr.partitionIDs}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds()))
if err != nil {
log.Ctx(ctx).Warn("query stream for delete failed because rate limiter", zap.Int64("msgID", dr.msgID), zap.Error(err))
return err
}
}
task, err := dr.produce(ctx, result.GetIds())
task, err := dr.produce(ctx, result.GetIds(), msgPartitionID)
if err != nil {
log.Ctx(ctx).Warn("produce delete task failed", zap.Error(err))
return err
@@ -615,12 +614,16 @@ func (dr *deleteRunner) complexDelete(ctx context.Context, plan *planpb.PlanNode
}
func (dr *deleteRunner) simpleDelete(ctx context.Context, pk *schemapb.IDs, numRow int64) error {
partitionID := common.AllPartitionsID
if len(dr.partitionIDs) == 1 {
partitionID = dr.partitionIDs[0]
}
log.Ctx(ctx).Debug("get primary keys from expr",
zap.Int64("len of primary keys", numRow),
zap.Int64("collectionID", dr.collectionID),
zap.Int64("partitionID", dr.partitionID))
zap.Int64("partitionID", partitionID))
task, err := dr.produce(ctx, pk)
task, err := dr.produce(ctx, pk, partitionID)
if err != nil {
log.Ctx(ctx).Warn("produce delete task failed")
return err
@@ -634,70 +637,71 @@ func (dr *deleteRunner) simpleDelete(ctx context.Context, pk *schemapb.IDs, numR
return err
}
func getPrimaryKeysFromPlan(schema *schemapb.CollectionSchema, plan *planpb.PlanNode) (bool, *schemapb.IDs, int64) {
// simple delete request need expr with "pk in [a, b]"
func getPrimaryKeysFromPlan(schema *schemapb.CollectionSchema, plan *planpb.PlanNode) (isSimpleDelete bool, pks *schemapb.IDs, pkCount int64) {
var err error
// simple delete request with "pk in [a, b]"
termExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_TermExpr)
if ok {
if !termExpr.TermExpr.GetColumnInfo().GetIsPrimaryKey() {
return false, nil, 0
}
ids, rowNum, err := getPrimaryKeysFromTermExpr(schema, termExpr)
pks, pkCount, err = getPrimaryKeysFromTermExpr(schema, termExpr)
if err != nil {
return false, nil, 0
}
return true, ids, rowNum
return true, pks, pkCount
}
// simple delete if expr with "pk == a"
// simple delete with "pk == a"
unaryRangeExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_UnaryRangeExpr)
if ok {
if unaryRangeExpr.UnaryRangeExpr.GetOp() != planpb.OpType_Equal || !unaryRangeExpr.UnaryRangeExpr.GetColumnInfo().GetIsPrimaryKey() {
return false, nil, 0
}
ids, err := getPrimaryKeysFromUnaryRangeExpr(schema, unaryRangeExpr)
pks, err = getPrimaryKeysFromUnaryRangeExpr(schema, unaryRangeExpr)
if err != nil {
return false, nil, 0
}
return true, ids, 1
return true, pks, 1
}
return false, nil, 0
}
func getPrimaryKeysFromUnaryRangeExpr(schema *schemapb.CollectionSchema, unaryRangeExpr *planpb.Expr_UnaryRangeExpr) (res *schemapb.IDs, err error) {
res = &schemapb.IDs{}
func getPrimaryKeysFromUnaryRangeExpr(schema *schemapb.CollectionSchema, unaryRangeExpr *planpb.Expr_UnaryRangeExpr) (pks *schemapb.IDs, err error) {
pks = &schemapb.IDs{}
switch unaryRangeExpr.UnaryRangeExpr.GetColumnInfo().GetDataType() {
case schemapb.DataType_Int64:
res.IdField = &schemapb.IDs_IntId{
pks.IdField = &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{unaryRangeExpr.UnaryRangeExpr.GetValue().GetInt64Val()},
},
}
case schemapb.DataType_VarChar:
res.IdField = &schemapb.IDs_StrId{
pks.IdField = &schemapb.IDs_StrId{
StrId: &schemapb.StringArray{
Data: []string{unaryRangeExpr.UnaryRangeExpr.GetValue().GetStringVal()},
},
}
default:
return res, fmt.Errorf("invalid field data type specifyed in simple delete expr")
return pks, fmt.Errorf("invalid field data type specifyed in simple delete expr")
}
return res, nil
return pks, nil
}
func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *planpb.Expr_TermExpr) (res *schemapb.IDs, rowNum int64, err error) {
res = &schemapb.IDs{}
rowNum = int64(len(termExpr.TermExpr.Values))
func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *planpb.Expr_TermExpr) (pks *schemapb.IDs, pkCount int64, err error) {
pks = &schemapb.IDs{}
pkCount = int64(len(termExpr.TermExpr.Values))
switch termExpr.TermExpr.ColumnInfo.GetDataType() {
case schemapb.DataType_Int64:
ids := make([]int64, 0)
for _, v := range termExpr.TermExpr.Values {
ids = append(ids, v.GetInt64Val())
}
res.IdField = &schemapb.IDs_IntId{
pks.IdField = &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: ids,
},
@@ -707,14 +711,14 @@ func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *pla
for _, v := range termExpr.TermExpr.Values {
ids = append(ids, v.GetStringVal())
}
res.IdField = &schemapb.IDs_StrId{
pks.IdField = &schemapb.IDs_StrId{
StrId: &schemapb.StringArray{
Data: ids,
},
}
default:
return res, 0, fmt.Errorf("invalid field data type specifyed in simple delete expr")
return pks, 0, fmt.Errorf("invalid field data type specifyed in simple delete expr")
}
return res, rowNum, nil
return pks, pkCount, nil
}

File diff suppressed because it is too large.


@@ -123,7 +123,7 @@ func TestDeleteNotExistName(t *testing.T) {
cp := hp.NewCreateCollectionParams(hp.Int64Vec)
_, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, cp, hp.TNewFieldsOption(), hp.TNewSchemaOption())
_, errDelete = mc.Delete(ctx, client.NewDeleteOption(schema.CollectionName).WithPartition("aaa"))
_, errDelete = mc.Delete(ctx, client.NewDeleteOption(schema.CollectionName).WithPartition("aaa").WithExpr("int64 < 10"))
common.CheckErr(t, errDelete, false, "partition not found[partition=aaa]")
}


@@ -21,13 +21,9 @@ import (
"fmt"
"time"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
@@ -35,68 +31,6 @@ import (
"github.com/milvus-io/milvus/tests/integration"
)
func (s *LevelZeroSuite) createCollection(collection string) {
schema := integration.ConstructSchema(collection, s.dim, false)
marshaledSchema, err := proto.Marshal(schema)
s.Require().NoError(err)
status, err := s.Cluster.Proxy.CreateCollection(context.TODO(), &milvuspb.CreateCollectionRequest{
CollectionName: collection,
Schema: marshaledSchema,
ShardsNum: 1,
})
s.Require().NoError(err)
s.Require().True(merr.Ok(status))
log.Info("CreateCollection result", zap.Any("status", status))
}
func (s *LevelZeroSuite) generateSegment(collection string, numRows int, startPk int64, seal bool) {
log.Info("=========================Start generate one segment=========================")
pkColumn := integration.NewInt64FieldDataWithStart(integration.Int64Field, numRows, startPk)
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, numRows, s.dim)
hashKeys := integration.GenerateHashKeys(numRows)
insertResult, err := s.Cluster.Proxy.Insert(context.TODO(), &milvuspb.InsertRequest{
CollectionName: collection,
FieldsData: []*schemapb.FieldData{pkColumn, fVecColumn},
HashKeys: hashKeys,
NumRows: uint32(numRows),
})
s.Require().NoError(err)
s.True(merr.Ok(insertResult.GetStatus()))
s.Require().EqualValues(numRows, insertResult.GetInsertCnt())
s.Require().EqualValues(numRows, len(insertResult.GetIDs().GetIntId().GetData()))
if seal {
log.Info("=========================Start to flush =========================",
zap.String("collection", collection),
zap.Int("numRows", numRows),
zap.Int64("startPK", startPk),
)
flushResp, err := s.Cluster.Proxy.Flush(context.TODO(), &milvuspb.FlushRequest{
CollectionNames: []string{collection},
})
s.NoError(err)
segmentLongArr, has := flushResp.GetCollSegIDs()[collection]
s.Require().True(has)
segmentIDs := segmentLongArr.GetData()
s.Require().NotEmpty(segmentLongArr)
s.Require().True(has)
flushTs, has := flushResp.GetCollFlushTs()[collection]
s.True(has)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
s.WaitForFlush(ctx, segmentIDs, flushTs, "", collection)
log.Info("=========================Finish to generate one segment=========================",
zap.String("collection", collection),
zap.Int("numRows", numRows),
zap.Int64("startPK", startPk),
)
}
}
func (s *LevelZeroSuite) TestDeleteOnGrowing() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
@@ -113,7 +47,9 @@ func (s *LevelZeroSuite) TestDeleteOnGrowing() {
)
collectionName := "TestLevelZero_" + funcutil.GenRandomStr()
s.createCollection(collectionName)
s.schema = integration.ConstructSchema(collectionName, s.dim, false)
req := s.buildCreateCollectionRequest(collectionName, s.schema, 0)
s.createCollection(req)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
@@ -134,9 +70,9 @@ func (s *LevelZeroSuite) TestDeleteOnGrowing() {
s.Require().NoError(err)
s.WaitForLoad(ctx, collectionName)
s.generateSegment(collectionName, 1, 0, true)
s.generateSegment(collectionName, 2, 1, true)
s.generateSegment(collectionName, 2, 3, false)
s.generateSegment(collectionName, 1, 0, true, -1)
s.generateSegment(collectionName, 2, 1, true, -1)
s.generateSegment(collectionName, 2, 3, false, -1)
checkRowCount := func(rowCount int) {
// query


@@ -0,0 +1,171 @@
package levelzero
import (
"context"
"fmt"
"time"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/milvus-io/milvus/tests/integration"
)
func (s *LevelZeroSuite) TestDeletePartitionKeyHint() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
const (
indexType = integration.IndexFaissIvfFlat
metricType = metric.L2
vecType = schemapb.DataType_FloatVector
)
collectionName := "TestLevelZero_" + funcutil.GenRandomStr()
// create a collection with partition key field "partition_key"
s.schema = integration.ConstructSchema(collectionName, s.dim, false)
s.schema.Fields = append(s.schema.Fields, &schemapb.FieldSchema{
FieldID: 102,
Name: "partition_key",
DataType: schemapb.DataType_Int64,
IsPartitionKey: true,
})
req := s.buildCreateCollectionRequest(collectionName, s.schema, 2)
s.createCollection(req)
c := s.Cluster
// create index and load
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(s.dim, indexType, metricType),
})
err = merr.CheckRPCCall(createIndexStatus, err)
s.NoError(err)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
CollectionName: collectionName,
})
err = merr.CheckRPCCall(loadStatus, err)
s.Require().NoError(err)
s.WaitForLoad(ctx, collectionName)
// Generate 2 growing segments with 2 differenct partition key 0, 1001, with exactlly same PK start from 0
s.generateSegment(collectionName, 1000, 0, false, 0)
s.generateSegment(collectionName, 1001, 0, false, 1001)
segments, err := s.Cluster.MetaWatcher.ShowSegments()
s.Require().NoError(err)
s.Require().EqualValues(len(segments), 2)
for _, segment := range segments {
s.Require().EqualValues(commonpb.SegmentState_Growing, segment.GetState())
s.Require().EqualValues(commonpb.SegmentLevel_L1, segment.GetLevel())
}
L1SegIDs := lo.Map(segments, func(seg *datapb.SegmentInfo, _ int) int64 {
return seg.GetID()
})
L1SegIDSet := typeutil.NewUniqueSet(L1SegIDs...)
checkRowCount := func(rowCount int) {
// query
queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{
CollectionName: collectionName,
OutputFields: []string{"count(*)"},
})
err = merr.CheckRPCCall(queryResult, err)
s.NoError(err)
s.EqualValues(rowCount, queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
}
checkRowCount(2001)
// delete all data belongs to partition_key == 1001
// expr: partition_key == 1001 && pk >= 0
// - for previous implementation, the delete pk >= 0 will touch every segments and leave only 1 numRows
// - for latest enhancements, the expr "pk >= 0" will only touch partitions that contains partition key == 1001
deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{
CollectionName: collectionName,
Expr: fmt.Sprintf("partition_key == 1001 && %s >= 0", integration.Int64Field),
})
err = merr.CheckRPCCall(deleteResult, err)
s.NoError(err)
checkRowCount(1000)
// Flush will generates 2 Flushed L1 segments and 1 Flushed L0 segment
s.Flush(collectionName)
segments, err = s.Cluster.MetaWatcher.ShowSegments()
s.Require().NoError(err)
s.Require().EqualValues(len(segments), 3)
for _, segment := range segments {
s.Require().EqualValues(commonpb.SegmentState_Flushed, segment.GetState())
// L1 segments
if L1SegIDSet.Contain(segment.GetID()) {
s.Require().EqualValues(commonpb.SegmentLevel_L1, segment.GetLevel())
} else { // L0 segment with 1001 delete entries count
s.Require().EqualValues(commonpb.SegmentLevel_L0, segment.GetLevel())
s.EqualValues(1001, segment.Deltalogs[0].GetBinlogs()[0].GetEntriesNum())
}
}
l0Dropped := func() bool {
segments, err := s.Cluster.MetaWatcher.ShowSegments()
s.Require().NoError(err)
s.Require().EqualValues(len(segments), 3)
for _, segment := range segments {
// Return if L0 segments not compacted
if !L1SegIDSet.Contain(segment.GetID()) && segment.GetState() == commonpb.SegmentState_Flushed {
return false
}
// If L0 segment compacted
if !L1SegIDSet.Contain(segment.GetID()) && segment.GetState() == commonpb.SegmentState_Dropped {
// find the segment belong to partition_key == 1001
// check for the deltalog entries count == 1001
if segment.GetLevel() == datapb.SegmentLevel_L1 && segment.GetNumOfRows() == 1001 {
s.True(L1SegIDSet.Contain(segment.GetID()))
s.EqualValues(1001, segment.Deltalogs[0].GetBinlogs()[0].GetEntriesNum())
}
// find segment of another partition_key == 0
// check compaction doesn't touch it even though delete expression will delete it all
if segment.GetLevel() == datapb.SegmentLevel_L1 && segment.GetNumOfRows() == 1000 {
s.True(L1SegIDSet.Contain(segment.GetID()))
s.Empty(segment.Deltalogs)
}
return true
}
}
return false
}
checkL0CompactionTouchOnePartition := func() {
failT := time.NewTimer(3 * time.Minute)
checkT := time.NewTicker(1 * time.Second)
for {
select {
case <-failT.C:
s.FailNow("L0 compaction timeout")
case <-checkT.C:
if l0Dropped() {
failT.Stop()
return
}
}
}
}
checkL0CompactionTouchOnePartition()
}


@@ -17,10 +17,18 @@
package levelzero
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/tests/integration"
)
@@ -28,20 +36,114 @@ import (
type LevelZeroSuite struct {
integration.MiniClusterSuite
dim int
schema *schemapb.CollectionSchema
dim int
}
func (s *LevelZeroSuite) SetupSuite() {
paramtable.Init()
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableStatsTask.Key, "false")
s.MiniClusterSuite.SetupSuite()
s.dim = 768
paramtable.Init()
}
func (s *LevelZeroSuite) TearDownSuite() {
s.MiniClusterSuite.TearDownSuite()
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableStatsTask.Key)
}
func TestLevelZero(t *testing.T) {
suite.Run(t, new(LevelZeroSuite))
}
func (s *LevelZeroSuite) buildCreateCollectionRequest(
collection string,
schema *schemapb.CollectionSchema,
numPartitions int64,
) *milvuspb.CreateCollectionRequest {
marshaledSchema, err := proto.Marshal(schema)
s.Require().NoError(err)
return &milvuspb.CreateCollectionRequest{
CollectionName: collection,
Schema: marshaledSchema,
ShardsNum: 1,
NumPartitions: numPartitions,
}
}
func (s *LevelZeroSuite) createCollection(req *milvuspb.CreateCollectionRequest) {
status, err := s.Cluster.Proxy.CreateCollection(context.TODO(), req)
s.Require().NoError(err)
s.Require().True(merr.Ok(status))
log.Info("CreateCollection result", zap.Any("status", status))
}
// For PrimaryKey field, startPK will be the start PK of this generation
// For PartitionKey field, partitikonKey will be the same in this generation
func (s *LevelZeroSuite) buildFieldDataBySchema(schema *schemapb.CollectionSchema, numRows int, startPK int64, partitionKey int64) []*schemapb.FieldData {
var fieldData []*schemapb.FieldData
for _, field := range schema.Fields {
switch field.DataType {
case schemapb.DataType_Int64:
if field.IsPartitionKey {
fieldData = append(fieldData, integration.NewInt64SameFieldData(field.Name, numRows, partitionKey))
} else {
fieldData = append(fieldData, integration.NewInt64FieldDataWithStart(field.Name, numRows, startPK))
}
case schemapb.DataType_FloatVector:
fieldData = append(fieldData, integration.NewFloatVectorFieldData(field.Name, numRows, s.dim))
default:
s.Fail("not supported yet")
}
}
return fieldData
}
func (s *LevelZeroSuite) generateSegment(collection string, numRows int, startPk int64, seal bool, partitionKey int64) {
log.Info("=========================Start generate one segment=========================")
fieldData := s.buildFieldDataBySchema(s.schema, numRows, startPk, partitionKey)
hashKeys := integration.GenerateHashKeys(numRows)
insertResult, err := s.Cluster.Proxy.Insert(context.TODO(), &milvuspb.InsertRequest{
CollectionName: collection,
FieldsData: fieldData,
HashKeys: hashKeys,
NumRows: uint32(numRows),
})
s.Require().NoError(err)
s.True(merr.Ok(insertResult.GetStatus()))
s.Require().EqualValues(numRows, insertResult.GetInsertCnt())
s.Require().EqualValues(numRows, len(insertResult.GetIDs().GetIntId().GetData()))
if seal {
log.Info("=========================Start to flush =========================",
zap.String("collection", collection),
zap.Int("numRows", numRows),
zap.Int64("startPK", startPk),
)
s.Flush(collection)
log.Info("=========================Finish to generate one segment=========================",
zap.String("collection", collection),
zap.Int("numRows", numRows),
zap.Int64("startPK", startPk),
)
}
}
func (s *LevelZeroSuite) Flush(collection string) {
flushResp, err := s.Cluster.Proxy.Flush(context.TODO(), &milvuspb.FlushRequest{
CollectionNames: []string{collection},
})
s.NoError(err)
segmentLongArr, has := flushResp.GetCollSegIDs()[collection]
s.Require().True(has)
segmentIDs := segmentLongArr.GetData() // segmentIDs might be empty
// s.Require().NotEmpty(segmentLongArr)
flushTs, has := flushResp.GetCollFlushTs()[collection]
s.True(has)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
s.WaitForFlush(ctx, segmentIDs, flushTs, "", collection)
}