mirror of https://github.com/milvus-io/milvus.git
Cover error cases that proxy is unhealthy (#7951)
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/7976/head
parent
18431c1549
commit
d2e18fd53b
|
@ -155,7 +155,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat
|
|||
zap.Uint64("timestamp", request.Base.Timestamp),
|
||||
zap.String("db", request.DbName),
|
||||
zap.String("collection", request.CollectionName),
|
||||
zap.Any("schema", request.Schema))
|
||||
)
|
||||
}()
|
||||
|
||||
err = cct.WaitToFinish()
|
||||
|
|
|
@ -1236,6 +1236,26 @@ func TestProxy(t *testing.T) {
|
|||
resp, err := proxy.GetMetrics(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
|
||||
// get from cache
|
||||
resp, err = proxy.GetMetrics(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
|
||||
// failed to parse metric type
|
||||
resp, err = proxy.GetMetrics(ctx, &milvuspb.GetMetricsRequest{
|
||||
Base: &commonpb.MsgBase{},
|
||||
Request: "not in json format",
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
|
||||
// not implemented metric
|
||||
notImplemented, err := metricsinfo.ConstructRequestByMetricType("not implemented")
|
||||
assert.NoError(t, err)
|
||||
resp, err = proxy.GetMetrics(ctx, notImplemented)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("release partition", func(t *testing.T) {
|
||||
|
@ -1404,6 +1424,17 @@ func TestProxy(t *testing.T) {
|
|||
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("Delete", func(t *testing.T) {
|
||||
_, err := proxy.Delete(ctx, &milvuspb.DeleteRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionName: partitionName,
|
||||
Expr: "",
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("drop collection", func(t *testing.T) {
|
||||
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
@ -1459,5 +1490,229 @@ func TestProxy(t *testing.T) {
|
|||
assert.Equal(t, 0, len(resp.CollectionNames))
|
||||
})
|
||||
|
||||
// proxy unhealthy
|
||||
//
|
||||
//notStateCode := "not state code"
|
||||
//proxy.stateCode.Store(notStateCode)
|
||||
//
|
||||
//t.Run("GetComponentStates fail", func(t *testing.T) {
|
||||
// _, err := proxy.GetComponentStates(ctx)
|
||||
// assert.Error(t, err)
|
||||
//})
|
||||
|
||||
proxy.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
|
||||
t.Run("ReleaseDQLMessageStream fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.ReleaseDQLMessageStream(ctx, &proxypb.ReleaseDQLMessageStreamRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("CreateCollection fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("DropCollection fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("HasCollection fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("LoadCollection fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("ReleaseCollection fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("DescribeCollection fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("GetCollectionStatistics fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.GetCollectionStatistics(ctx, &milvuspb.GetCollectionStatisticsRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("ShowCollections fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("CreatePartition fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("DropPartition fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("HasPartition fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("LoadPartitions fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("ReleasePartitions fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.ReleasePartitions(ctx, &milvuspb.ReleasePartitionsRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("GetPartitionStatistics fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("ShowPartitions fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("CreateIndex fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("DescribeIndex fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("DropIndex fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.DropIndex(ctx, &milvuspb.DropIndexRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("GetIndexBuildProgress fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.GetIndexBuildProgress(ctx, &milvuspb.GetIndexBuildProgressRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("GetIndexState fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.GetIndexState(ctx, &milvuspb.GetIndexStateRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("Insert fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.Insert(ctx, &milvuspb.InsertRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("Delete fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.Delete(ctx, &milvuspb.DeleteRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("Search fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.Search(ctx, &milvuspb.SearchRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("Flush fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.Flush(ctx, &milvuspb.FlushRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("Query fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.Query(ctx, &milvuspb.QueryRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("GetPersistentSegmentInfo fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.GetPersistentSegmentInfo(ctx, &milvuspb.GetPersistentSegmentInfoRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("GetQuerySegmentInfo fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("RegisterLink fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.RegisterLink(ctx, &milvuspb.RegisterLinkRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("GetMetrics fail, unhealthy", func(t *testing.T) {
|
||||
resp, err := proxy.GetMetrics(ctx, &milvuspb.GetMetricsRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
proxy.UpdateStateCode(internalpb.StateCode_Healthy)
|
||||
|
||||
// queue full
|
||||
|
||||
ddParallel := proxy.sched.ddQueue.maxTaskNum
|
||||
proxy.sched.ddQueue.maxTaskNum = 0
|
||||
|
||||
t.Run("failed to create collection, dd queue full", func(t *testing.T) {
|
||||
resp, err := proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
// TODO(dragondriver): other tasks
|
||||
|
||||
proxy.sched.ddQueue.maxTaskNum = ddParallel
|
||||
|
||||
// timeout
|
||||
|
||||
timeout := time.Nanosecond
|
||||
shortCtx, shortCancel := context.WithTimeout(ctx, timeout)
|
||||
defer shortCancel()
|
||||
time.Sleep(timeout)
|
||||
|
||||
t.Run("failed to create collection, timeout", func(t *testing.T) {
|
||||
resp, err := proxy.CreateCollection(shortCtx, &milvuspb.CreateCollectionRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
// TODO(dragondriver): other tasks
|
||||
|
||||
cancel()
|
||||
}
|
||||
|
|
|
@ -372,9 +372,9 @@ func newDqTaskQueue(tsoAllocatorIns tsoAllocator, idAllocatorIns idAllocatorInte
|
|||
}
|
||||
|
||||
type taskScheduler struct {
|
||||
ddQueue taskQueue
|
||||
ddQueue *ddTaskQueue
|
||||
dmQueue *dmTaskQueue
|
||||
dqQueue taskQueue
|
||||
dqQueue *dqTaskQueue
|
||||
|
||||
wg sync.WaitGroup
|
||||
ctx context.Context
|
||||
|
|
Loading…
Reference in New Issue