Add tSafeReplica and task scheduler unittests (#7587)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/7608/head
bigsheeper 2021-09-08 20:50:00 +08:00 committed by GitHub
parent 074b610811
commit d3a18a66b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 260 additions and 0 deletions

View File

@ -0,0 +1,73 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
func TestQueryNodeFlowGraph_consumerFlowGraph(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
fac, err := genFactory()
assert.NoError(t, err)
fg := newQueryNodeFlowGraph(ctx,
loadTypeCollection,
defaultCollectionID,
defaultPartitionID,
streaming.replica,
streaming.tSafeReplica,
defaultVChannel,
fac)
err = fg.consumerFlowGraph(defaultVChannel, defaultSubName)
assert.NoError(t, err)
}
func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
fac, err := genFactory()
assert.NoError(t, err)
fg := newQueryNodeFlowGraph(ctx,
loadTypeCollection,
defaultCollectionID,
defaultPartitionID,
streaming.replica,
streaming.tSafeReplica,
defaultVChannel,
fac)
position := &internalpb.MsgPosition{
ChannelName: defaultVChannel,
MsgID: []byte{},
MsgGroup: defaultSubName,
Timestamp: 0,
}
err = fg.seekQueryNodeFlowGraph(position)
assert.Error(t, err)
}

View File

@ -0,0 +1,59 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestBaseTaskQueue_addUnissuedTask(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := &taskScheduler{
ctx: ctx,
cancel: cancel,
}
t.Run("test full", func(t *testing.T) {
taskQueue := newLoadAndReleaseTaskQueue(s)
task := &mockTask{}
for i := 0; i < maxTaskNum; i++ {
err := taskQueue.addUnissuedTask(task)
assert.NoError(t, err)
}
err := taskQueue.addUnissuedTask(task)
assert.Error(t, err)
})
t.Run("add task to front", func(t *testing.T) {
taskQueue := newLoadAndReleaseTaskQueue(s)
mt := &mockTask{
timestamp: 1000,
}
err := taskQueue.addUnissuedTask(mt)
fmt.Println(taskQueue.unissuedTasks.Back().Value.(task).Timestamp())
assert.NoError(t, err)
err = taskQueue.addUnissuedTask(mt)
fmt.Println(taskQueue.unissuedTasks.Back().Value.(task).Timestamp())
assert.NoError(t, err)
mt2 := &mockTask{
timestamp: 0,
}
err = taskQueue.addUnissuedTask(mt2)
assert.NoError(t, err)
})
}

View File

@ -0,0 +1,75 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"context"
"errors"
"testing"
)
type mockTask struct {
baseTask
preExecuteError bool
executeError bool
timestamp Timestamp
}
func (m *mockTask) Timestamp() Timestamp {
return m.timestamp
}
func (m *mockTask) OnEnqueue() error {
return nil
}
func (m *mockTask) PreExecute(ctx context.Context) error {
if m.preExecuteError {
return errors.New("test error")
}
return nil
}
func (m *mockTask) Execute(ctx context.Context) error {
if m.executeError {
return errors.New("test error")
}
return nil
}
func (m *mockTask) PostExecute(ctx context.Context) error {
return nil
}
func TestTaskScheduler(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts := newTaskScheduler(ctx)
ts.Start()
task := &mockTask{
baseTask: baseTask{
ctx: ctx,
done: make(chan error, 1024),
},
preExecuteError: true,
executeError: false,
}
ts.processTask(task, ts.queue)
task.preExecuteError = false
task.executeError = true
ts.processTask(task, ts.queue)
ts.Close()
}

View File

@ -0,0 +1,53 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestTSafeReplica_valid(t *testing.T) {
replica := newTSafeReplica()
replica.addTSafe(defaultVChannel)
watcher := newTSafeWatcher()
replica.registerTSafeWatcher(defaultVChannel, watcher)
timestamp := Timestamp(1000)
replica.setTSafe(defaultVChannel, defaultCollectionID, timestamp)
time.Sleep(20 * time.Millisecond)
resT := replica.getTSafe(defaultVChannel)
assert.Equal(t, timestamp, resT)
replica.removeTSafe(defaultVChannel)
}
func TestTSafeReplica_invalid(t *testing.T) {
replica := newTSafeReplica()
watcher := newTSafeWatcher()
replica.registerTSafeWatcher(defaultVChannel, watcher)
timestamp := Timestamp(1000)
replica.setTSafe(defaultVChannel, defaultCollectionID, timestamp)
time.Sleep(20 * time.Millisecond)
resT := replica.getTSafe(defaultVChannel)
assert.Equal(t, Timestamp(0), resT)
replica.removeTSafe(defaultVChannel)
replica.addTSafe(defaultVChannel)
replica.addTSafe(defaultVChannel)
}