MS-502 Update tasktable_test in scheduler

Former-commit-id: 019eb3180cfdd09ba7bc016e9b7990c35da4028e
pull/191/head
wxyu 2019-09-07 11:00:33 +08:00
parent c651fc8b3c
commit d3e91efce1
4 changed files with 372 additions and 41 deletions

View File

@ -87,6 +87,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-482 - Change search stream transport to unary in grpc - MS-482 - Change search stream transport to unary in grpc
- MS-487 - Define metric type in CreateTable - MS-487 - Define metric type in CreateTable
- MS-488 - Improve code format in scheduler - MS-488 - Improve code format in scheduler
- MS-502 - Update tasktable_test in scheduler
## New Feature ## New Feature
- MS-343 - Implement ResourceMgr - MS-343 - Implement ResourceMgr

View File

@ -136,7 +136,7 @@ std::vector<uint64_t>
TaskTable::PickToLoad(uint64_t limit) { TaskTable::PickToLoad(uint64_t limit) {
std::vector<uint64_t> indexes; std::vector<uint64_t> indexes;
bool cross = false; bool cross = false;
for (uint64_t i = last_finish_, count = 0; i < table_.size() && count < limit; ++i) { for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) {
if (not cross && table_[i]->IsFinish()) { if (not cross && table_[i]->IsFinish()) {
last_finish_ = i; last_finish_ = i;
} else if (table_[i]->state == TaskTableItemState::START) { } else if (table_[i]->state == TaskTableItemState::START) {
@ -152,7 +152,7 @@ std::vector<uint64_t>
TaskTable::PickToExecute(uint64_t limit) { TaskTable::PickToExecute(uint64_t limit) {
std::vector<uint64_t> indexes; std::vector<uint64_t> indexes;
bool cross = false; bool cross = false;
for (uint64_t i = last_finish_, count = 0; i < table_.size() && count < limit; ++i) { for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) {
if (not cross && table_[i]->IsFinish()) { if (not cross && table_[i]->IsFinish()) {
last_finish_ = i; last_finish_ = i;
} else if (table_[i]->state == TaskTableItemState::LOADED) { } else if (table_[i]->state == TaskTableItemState::LOADED) {
@ -200,15 +200,15 @@ TaskTable::Get(uint64_t index) {
return table_[index]; return table_[index];
} }
void //void
TaskTable::Clear() { //TaskTable::Clear() {
// find first task is NOT (done or moved), erase from begin to it; //// find first task is NOT (done or moved), erase from begin to it;
// auto iterator = table_.begin(); //// auto iterator = table_.begin();
// while (iterator->state == TaskTableItemState::EXECUTED or //// while (iterator->state == TaskTableItemState::EXECUTED or
// iterator->state == TaskTableItemState::MOVED) //// iterator->state == TaskTableItemState::MOVED)
// iterator++; //// iterator++;
// table_.erase(table_.begin(), iterator); //// table_.erase(table_.begin(), iterator);
} //}
std::string std::string

View File

@ -40,10 +40,10 @@ struct TaskTimestamp {
}; };
struct TaskTableItem { struct TaskTableItem {
TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex() {} TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {}
TaskTableItem(const TaskTableItem &src) TaskTableItem(const TaskTableItem &src) = delete;
: id(src.id), state(src.state), mutex() {} TaskTableItem(TaskTableItem &&) = delete;
uint64_t id; // auto increment from 0; uint64_t id; // auto increment from 0;
TaskPtr task; // the task; TaskPtr task; // the task;
@ -114,8 +114,8 @@ public:
* Remove sequence task which is DONE or MOVED from front; * Remove sequence task which is DONE or MOVED from front;
* Called by ? * Called by ?
*/ */
void // void
Clear(); // Clear();
/* /*
* Return true if task table empty, otherwise false; * Return true if task table empty, otherwise false;
@ -229,7 +229,9 @@ private:
std::function<void(void)> subscriber_ = nullptr; std::function<void(void)> subscriber_ = nullptr;
// cache last finish avoid Pick task from begin always // cache last finish avoid Pick task from begin always
uint64_t last_finish_ = 0; // pick from (last_finish_ + 1)
// init with -1, pick from (last_finish_ + 1) = 0
uint64_t last_finish_ = -1;
}; };

View File

@ -5,30 +5,37 @@
using namespace zilliz::milvus::engine; using namespace zilliz::milvus::engine;
/************ TaskTableBaseTest ************/
class TaskTableItemTest : public ::testing::Test { class TaskTableItemTest : public ::testing::Test {
protected: protected:
void void
SetUp() override { SetUp() override {
item1_.id = 0; std::vector<TaskTableItemState> states{
item1_.state = TaskTableItemState::MOVED; TaskTableItemState::INVALID,
item1_.priority = 10; TaskTableItemState::START,
TaskTableItemState::LOADING,
TaskTableItemState::LOADED,
TaskTableItemState::EXECUTING,
TaskTableItemState::EXECUTED,
TaskTableItemState::MOVING,
TaskTableItemState::MOVED};
for (auto &state : states) {
auto item = std::make_shared<TaskTableItem>();
item->state = state;
items_.emplace_back(item);
}
} }
TaskTableItem default_; TaskTableItem default_;
TaskTableItem item1_; std::vector<TaskTableItemPtr> items_;
}; };
TEST_F(TaskTableItemTest, construct) { TEST_F(TaskTableItemTest, construct) {
ASSERT_EQ(default_.id, 0); ASSERT_EQ(default_.id, 0);
ASSERT_EQ(default_.task, nullptr);
ASSERT_EQ(default_.state, TaskTableItemState::INVALID); ASSERT_EQ(default_.state, TaskTableItemState::INVALID);
ASSERT_EQ(default_.priority, 0);
}
TEST_F(TaskTableItemTest, copy) {
TaskTableItem another(item1_);
ASSERT_EQ(another.id, item1_.id);
ASSERT_EQ(another.state, item1_.state);
ASSERT_EQ(another.priority, item1_.priority);
} }
TEST_F(TaskTableItemTest, destruct) { TEST_F(TaskTableItemTest, destruct) {
@ -36,6 +43,107 @@ TEST_F(TaskTableItemTest, destruct) {
delete p_item; delete p_item;
} }
TEST_F(TaskTableItemTest, is_finish) {
for (auto &item : items_) {
if (item->state == TaskTableItemState::EXECUTED
|| item->state == TaskTableItemState::MOVED) {
ASSERT_TRUE(item->IsFinish());
} else {
ASSERT_FALSE(item->IsFinish());
}
}
}
TEST_F(TaskTableItemTest, dump) {
for (auto &item : items_) {
ASSERT_FALSE(item->Dump().empty());
}
}
TEST_F(TaskTableItemTest, load) {
for (auto &item : items_) {
auto before_state = item->state;
auto ret = item->Load();
if (before_state == TaskTableItemState::START) {
ASSERT_TRUE(ret);
ASSERT_EQ(item->state, TaskTableItemState::LOADING);
} else {
ASSERT_FALSE(ret);
ASSERT_EQ(item->state, before_state);
}
}
}
TEST_F(TaskTableItemTest, loaded) {
for (auto &item : items_) {
auto before_state = item->state;
auto ret = item->Loaded();
if (before_state == TaskTableItemState::LOADING) {
ASSERT_TRUE(ret);
ASSERT_EQ(item->state, TaskTableItemState::LOADED);
} else {
ASSERT_FALSE(ret);
ASSERT_EQ(item->state, before_state);
}
}
}
TEST_F(TaskTableItemTest, execute) {
for (auto &item : items_) {
auto before_state = item->state;
auto ret = item->Execute();
if (before_state == TaskTableItemState::LOADED) {
ASSERT_TRUE(ret);
ASSERT_EQ(item->state, TaskTableItemState::EXECUTING);
} else {
ASSERT_FALSE(ret);
ASSERT_EQ(item->state, before_state);
}
}
}
TEST_F(TaskTableItemTest, executed) {
for (auto &item : items_) {
auto before_state = item->state;
auto ret = item->Executed();
if (before_state == TaskTableItemState::EXECUTING) {
ASSERT_TRUE(ret);
ASSERT_EQ(item->state, TaskTableItemState::EXECUTED);
} else {
ASSERT_FALSE(ret);
ASSERT_EQ(item->state, before_state);
}
}
}
TEST_F(TaskTableItemTest, move) {
for (auto &item : items_) {
auto before_state = item->state;
auto ret = item->Move();
if (before_state == TaskTableItemState::LOADED) {
ASSERT_TRUE(ret);
ASSERT_EQ(item->state, TaskTableItemState::MOVING);
} else {
ASSERT_FALSE(ret);
ASSERT_EQ(item->state, before_state);
}
}
}
TEST_F(TaskTableItemTest, moved) {
for (auto &item : items_) {
auto before_state = item->state;
auto ret = item->Moved();
if (before_state == TaskTableItemState::MOVING) {
ASSERT_TRUE(ret);
ASSERT_EQ(item->state, TaskTableItemState::MOVED);
} else {
ASSERT_FALSE(ret);
ASSERT_EQ(item->state, before_state);
}
}
}
/************ TaskTableBaseTest ************/ /************ TaskTableBaseTest ************/
@ -55,6 +163,16 @@ protected:
TaskTable empty_table_; TaskTable empty_table_;
}; };
TEST_F(TaskTableBaseTest, subscriber) {
bool flag = false;
auto callback = [&]() {
flag = true;
};
empty_table_.RegisterSubscriber(callback);
empty_table_.Put(task1_);
ASSERT_TRUE(flag);
}
TEST_F(TaskTableBaseTest, put_task) { TEST_F(TaskTableBaseTest, put_task) {
empty_table_.Put(task1_); empty_table_.Put(task1_);
@ -78,6 +196,125 @@ TEST_F(TaskTableBaseTest, put_empty_batch) {
empty_table_.Put(tasks); empty_table_.Put(tasks);
} }
TEST_F(TaskTableBaseTest, empty) {
ASSERT_TRUE(empty_table_.Empty());
empty_table_.Put(task1_);
ASSERT_FALSE(empty_table_.Empty());
}
TEST_F(TaskTableBaseTest, size) {
ASSERT_EQ(empty_table_.Size(), 0);
empty_table_.Put(task1_);
ASSERT_EQ(empty_table_.Size(), 1);
}
TEST_F(TaskTableBaseTest, operator_) {
empty_table_.Put(task1_);
ASSERT_EQ(empty_table_.Get(0), empty_table_[0]);
}
TEST_F(TaskTableBaseTest, pick_to_load) {
const size_t NUM_TASKS = 10;
for (size_t i = 0; i < NUM_TASKS; ++i) {
empty_table_.Put(task1_);
}
empty_table_[0]->state = TaskTableItemState::MOVED;
empty_table_[1]->state = TaskTableItemState::EXECUTED;
auto indexes = empty_table_.PickToLoad(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 2);
}
TEST_F(TaskTableBaseTest, pick_to_load_limit) {
const size_t NUM_TASKS = 10;
for (size_t i = 0; i < NUM_TASKS; ++i) {
empty_table_.Put(task1_);
}
empty_table_[0]->state = TaskTableItemState::MOVED;
empty_table_[1]->state = TaskTableItemState::EXECUTED;
auto indexes = empty_table_.PickToLoad(3);
ASSERT_EQ(indexes.size(), 3);
ASSERT_EQ(indexes[0], 2);
ASSERT_EQ(indexes[1], 3);
ASSERT_EQ(indexes[2], 4);
}
TEST_F(TaskTableBaseTest, pick_to_load_cache) {
const size_t NUM_TASKS = 10;
for (size_t i = 0; i < NUM_TASKS; ++i) {
empty_table_.Put(task1_);
}
empty_table_[0]->state = TaskTableItemState::MOVED;
empty_table_[1]->state = TaskTableItemState::EXECUTED;
// first pick, non-cache
auto indexes = empty_table_.PickToLoad(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 2);
// second pick, iterate from 2
// invalid state change
empty_table_[1]->state = TaskTableItemState::START;
indexes = empty_table_.PickToLoad(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 2);
}
TEST_F(TaskTableBaseTest, pick_to_execute) {
const size_t NUM_TASKS = 10;
for (size_t i = 0; i < NUM_TASKS; ++i) {
empty_table_.Put(task1_);
}
empty_table_[0]->state = TaskTableItemState::MOVED;
empty_table_[1]->state = TaskTableItemState::EXECUTED;
empty_table_[2]->state = TaskTableItemState::LOADED;
auto indexes = empty_table_.PickToExecute(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 2);
}
TEST_F(TaskTableBaseTest, pick_to_execute_limit) {
const size_t NUM_TASKS = 10;
for (size_t i = 0; i < NUM_TASKS; ++i) {
empty_table_.Put(task1_);
}
empty_table_[0]->state = TaskTableItemState::MOVED;
empty_table_[1]->state = TaskTableItemState::EXECUTED;
empty_table_[2]->state = TaskTableItemState::LOADED;
empty_table_[3]->state = TaskTableItemState::LOADED;
auto indexes = empty_table_.PickToExecute(3);
ASSERT_EQ(indexes.size(), 2);
ASSERT_EQ(indexes[0], 2);
ASSERT_EQ(indexes[1], 3);
}
TEST_F(TaskTableBaseTest, pick_to_execute_cache) {
const size_t NUM_TASKS = 10;
for (size_t i = 0; i < NUM_TASKS; ++i) {
empty_table_.Put(task1_);
}
empty_table_[0]->state = TaskTableItemState::MOVED;
empty_table_[1]->state = TaskTableItemState::EXECUTED;
empty_table_[2]->state = TaskTableItemState::LOADED;
// first pick, non-cache
auto indexes = empty_table_.PickToExecute(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 2);
// second pick, iterate from 2
// invalid state change
empty_table_[1]->state = TaskTableItemState::START;
indexes = empty_table_.PickToExecute(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 2);
}
/************ TaskTableAdvanceTest ************/ /************ TaskTableAdvanceTest ************/
class TaskTableAdvanceTest : public ::testing::Test { class TaskTableAdvanceTest : public ::testing::Test {
@ -104,25 +341,116 @@ protected:
}; };
TEST_F(TaskTableAdvanceTest, load) { TEST_F(TaskTableAdvanceTest, load) {
table1_.Load(1); std::vector<TaskTableItemState> before_state;
table1_.Loaded(2); for (auto &task : table1_) {
before_state.push_back(task->state);
}
ASSERT_EQ(table1_.Get(1)->state, TaskTableItemState::LOADING); for (size_t i = 0; i < table1_.Size(); ++i) {
ASSERT_EQ(table1_.Get(2)->state, TaskTableItemState::LOADED); table1_.Load(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
if (before_state[i] == TaskTableItemState::START) {
ASSERT_EQ(table1_.Get(i)->state, TaskTableItemState::LOADING);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
}
}
}
TEST_F(TaskTableAdvanceTest, loaded) {
std::vector<TaskTableItemState> before_state;
for (auto &task : table1_) {
before_state.push_back(task->state);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
table1_.Loaded(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
if (before_state[i] == TaskTableItemState::LOADING) {
ASSERT_EQ(table1_.Get(i)->state, TaskTableItemState::LOADED);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
}
}
} }
TEST_F(TaskTableAdvanceTest, execute) { TEST_F(TaskTableAdvanceTest, execute) {
table1_.Execute(3); std::vector<TaskTableItemState> before_state;
table1_.Executed(4); for (auto &task : table1_) {
before_state.push_back(task->state);
}
ASSERT_EQ(table1_.Get(3)->state, TaskTableItemState::EXECUTING); for (size_t i = 0; i < table1_.Size(); ++i) {
ASSERT_EQ(table1_.Get(4)->state, TaskTableItemState::EXECUTED); table1_.Execute(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
if (before_state[i] == TaskTableItemState::LOADED) {
ASSERT_EQ(table1_.Get(i)->state, TaskTableItemState::EXECUTING);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
}
}
}
TEST_F(TaskTableAdvanceTest, executed) {
std::vector<TaskTableItemState> before_state;
for (auto &task : table1_) {
before_state.push_back(task->state);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
table1_.Executed(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
if (before_state[i] == TaskTableItemState::EXECUTING) {
ASSERT_EQ(table1_.Get(i)->state, TaskTableItemState::EXECUTED);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
}
}
} }
TEST_F(TaskTableAdvanceTest, move) { TEST_F(TaskTableAdvanceTest, move) {
table1_.Move(3); std::vector<TaskTableItemState> before_state;
table1_.Moved(6); for (auto &task : table1_) {
before_state.push_back(task->state);
}
ASSERT_EQ(table1_.Get(3)->state, TaskTableItemState::MOVING); for (size_t i = 0; i < table1_.Size(); ++i) {
ASSERT_EQ(table1_.Get(6)->state, TaskTableItemState::MOVED); table1_.Move(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
if (before_state[i] == TaskTableItemState::LOADED) {
ASSERT_EQ(table1_.Get(i)->state, TaskTableItemState::MOVING);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
}
}
} }
TEST_F(TaskTableAdvanceTest, moved) {
std::vector<TaskTableItemState> before_state;
for (auto &task : table1_) {
before_state.push_back(task->state);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
table1_.Moved(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
if (before_state[i] == TaskTableItemState::MOVING) {
ASSERT_EQ(table1_.Get(i)->state, TaskTableItemState::MOVED);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
}
}
}