mirror of https://github.com/milvus-io/milvus.git
Using new structure for tasktable
Former-commit-id: 6742f21a429da87456ded0a910d248948dc948b4pull/191/head
parent
4d6516fac7
commit
80b9c79c5d
|
@ -14,6 +14,8 @@ Please mark all change in change log and use the ticket from JIRA.
|
|||
- \#92 - Speed up CMake build process
|
||||
|
||||
## Feature
|
||||
- \#115 - Using new structure for tasktable
|
||||
|
||||
## Task
|
||||
|
||||
# Milvus 0.5.0 (2019-10-21)
|
||||
|
|
|
@ -34,27 +34,30 @@ namespace scheduler {
|
|||
|
||||
class BuildMgr {
|
||||
public:
|
||||
explicit BuildMgr(int64_t numoftasks) : numoftasks_(numoftasks) {
|
||||
explicit BuildMgr(int64_t concurrent_limit) : available_(concurrent_limit) {
|
||||
}
|
||||
|
||||
public:
|
||||
void
|
||||
Put() {
|
||||
++numoftasks_;
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
++available_;
|
||||
}
|
||||
|
||||
void
|
||||
take() {
|
||||
--numoftasks_;
|
||||
}
|
||||
|
||||
int64_t
|
||||
numoftasks() {
|
||||
return (int64_t)numoftasks_;
|
||||
bool
|
||||
Take() {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
if (available_ < 1) {
|
||||
return false;
|
||||
} else {
|
||||
--available_;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::atomic_long numoftasks_;
|
||||
std::int64_t available_;
|
||||
std::mutex mutex_;
|
||||
};
|
||||
|
||||
using BuildMgrPtr = std::shared_ptr<BuildMgr>;
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <deque>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
template <typename T>
|
||||
class CircleQueue {
|
||||
using value_type = T;
|
||||
using atomic_size_type = std::atomic_ullong;
|
||||
using size_type = uint64_t;
|
||||
using const_reference = const value_type&;
|
||||
#define MEMORY_ORDER (std::memory_order::memory_order_seq_cst)
|
||||
|
||||
public:
|
||||
explicit CircleQueue(size_type cap) : data_(cap, nullptr), capacity_(cap), front_() {
|
||||
front_.store(cap - 1, MEMORY_ORDER);
|
||||
}
|
||||
|
||||
CircleQueue() = delete;
|
||||
CircleQueue(const CircleQueue& q) = delete;
|
||||
CircleQueue(CircleQueue&& q) = delete;
|
||||
|
||||
public:
|
||||
const_reference operator[](size_type n) {
|
||||
return data_[n % capacity_];
|
||||
}
|
||||
|
||||
size_type
|
||||
front() {
|
||||
return front_.load(MEMORY_ORDER);
|
||||
}
|
||||
|
||||
size_type
|
||||
rear() {
|
||||
return rear_;
|
||||
}
|
||||
|
||||
size_type
|
||||
size() {
|
||||
return size_;
|
||||
}
|
||||
|
||||
size_type
|
||||
capacity() {
|
||||
return capacity_;
|
||||
}
|
||||
|
||||
void
|
||||
set_front(uint64_t last_finish) {
|
||||
if (last_finish == rear_) {
|
||||
throw;
|
||||
}
|
||||
front_.store(last_finish % capacity_, MEMORY_ORDER);
|
||||
}
|
||||
|
||||
void
|
||||
put(const value_type& x) {
|
||||
if ((rear_) % capacity_ == front_.load(MEMORY_ORDER)) {
|
||||
throw;
|
||||
}
|
||||
data_[rear_] = x;
|
||||
rear_ = ++rear_ % capacity_;
|
||||
if (size_ < capacity_) {
|
||||
++size_;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
put(value_type&& x) {
|
||||
if ((rear_) % capacity_ == front_.load(MEMORY_ORDER)) {
|
||||
throw;
|
||||
}
|
||||
data_[rear_] = std::move(x);
|
||||
rear_ = ++rear_ % capacity_;
|
||||
if (size_ < capacity_) {
|
||||
++size_;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<value_type> data_;
|
||||
size_type capacity_;
|
||||
atomic_size_type front_;
|
||||
size_type rear_ = 0;
|
||||
size_type size_ = 0;
|
||||
#undef MEMORY_ORDER
|
||||
};
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
|
@ -20,6 +20,7 @@
|
|||
#include "event/TaskTableUpdatedEvent.h"
|
||||
#include "scheduler/SchedInst.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
|
||||
#include <ctime>
|
||||
#include <sstream>
|
||||
|
@ -153,7 +154,42 @@ TaskTableItem::Dump() const {
|
|||
|
||||
std::vector<uint64_t>
|
||||
TaskTable::PickToLoad(uint64_t limit) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
#if 1
|
||||
TimeRecorder rc("");
|
||||
std::vector<uint64_t> indexes;
|
||||
bool cross = false;
|
||||
|
||||
uint64_t available_begin = table_.front() + 1;
|
||||
for (uint64_t i = 0, loaded_count = 0, pick_count = 0; i < table_.size() && pick_count < limit; ++i) {
|
||||
auto index = available_begin + i;
|
||||
if (not table_[index])
|
||||
break;
|
||||
if (index % table_.capacity() == table_.rear())
|
||||
break;
|
||||
if (not cross && table_[index]->IsFinish()) {
|
||||
table_.set_front(index);
|
||||
} else if (table_[index]->state == TaskTableItemState::LOADED) {
|
||||
cross = true;
|
||||
++loaded_count;
|
||||
if (loaded_count > 2)
|
||||
return std::vector<uint64_t>();
|
||||
} else if (table_[index]->state == TaskTableItemState::START) {
|
||||
auto task = table_[index]->task;
|
||||
|
||||
// if task is a build index task, limit it
|
||||
if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") {
|
||||
if (not BuildMgrInst::GetInstance()->Take()) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
cross = true;
|
||||
indexes.push_back(index);
|
||||
++pick_count;
|
||||
}
|
||||
}
|
||||
rc.ElapseFromBegin("PickToLoad ");
|
||||
return indexes;
|
||||
#else
|
||||
size_t count = 0;
|
||||
for (uint64_t j = last_finish_ + 1; j < table_.size(); ++j) {
|
||||
if (not table_[j]) {
|
||||
|
@ -197,34 +233,44 @@ TaskTable::PickToLoad(uint64_t limit) {
|
|||
}
|
||||
}
|
||||
return indexes;
|
||||
#endif
|
||||
}
|
||||
|
||||
std::vector<uint64_t>
|
||||
TaskTable::PickToExecute(uint64_t limit) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
TimeRecorder rc("");
|
||||
std::vector<uint64_t> indexes;
|
||||
bool cross = false;
|
||||
for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) {
|
||||
if (not cross && table_[i]->IsFinish()) {
|
||||
last_finish_ = i;
|
||||
} else if (table_[i]->state == TaskTableItemState::LOADED) {
|
||||
uint64_t available_begin = table_.front() + 1;
|
||||
for (uint64_t i = 0, pick_count = 0; i < table_.size() && pick_count < limit; ++i) {
|
||||
uint64_t index = available_begin + i;
|
||||
if (not table_[index]) {
|
||||
break;
|
||||
}
|
||||
if (index % table_.capacity() == table_.rear()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (not cross && table_[index]->IsFinish()) {
|
||||
table_.set_front(index);
|
||||
} else if (table_[index]->state == TaskTableItemState::LOADED) {
|
||||
cross = true;
|
||||
indexes.push_back(i);
|
||||
++count;
|
||||
indexes.push_back(index);
|
||||
++pick_count;
|
||||
}
|
||||
}
|
||||
rc.ElapseFromBegin("PickToExecute ");
|
||||
return indexes;
|
||||
}
|
||||
|
||||
void
|
||||
TaskTable::Put(TaskPtr task) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
auto item = std::make_shared<TaskTableItem>();
|
||||
item->id = id_++;
|
||||
item->task = std::move(task);
|
||||
item->state = TaskTableItemState::START;
|
||||
item->timestamp.start = get_current_timestamp();
|
||||
table_.push_back(item);
|
||||
table_.put(std::move(item));
|
||||
if (subscriber_) {
|
||||
subscriber_();
|
||||
}
|
||||
|
@ -232,14 +278,13 @@ TaskTable::Put(TaskPtr task) {
|
|||
|
||||
void
|
||||
TaskTable::Put(std::vector<TaskPtr>& tasks) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
for (auto& task : tasks) {
|
||||
auto item = std::make_shared<TaskTableItem>();
|
||||
item->id = id_++;
|
||||
item->task = std::move(task);
|
||||
item->state = TaskTableItemState::START;
|
||||
item->timestamp.start = get_current_timestamp();
|
||||
table_.push_back(item);
|
||||
table_.put(std::move(item));
|
||||
}
|
||||
if (subscriber_) {
|
||||
subscriber_();
|
||||
|
@ -248,26 +293,25 @@ TaskTable::Put(std::vector<TaskPtr>& tasks) {
|
|||
|
||||
TaskTableItemPtr
|
||||
TaskTable::Get(uint64_t index) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
return table_[index];
|
||||
}
|
||||
|
||||
// void
|
||||
// TaskTable::Clear() {
|
||||
//// find first task is NOT (done or moved), erase from begin to it;
|
||||
//// auto iterator = table_.begin();
|
||||
//// while (iterator->state == TaskTableItemState::EXECUTED or
|
||||
//// iterator->state == TaskTableItemState::MOVED)
|
||||
//// iterator++;
|
||||
//// table_.erase(table_.begin(), iterator);
|
||||
//}
|
||||
size_t
|
||||
TaskTable::TaskToExecute() {
|
||||
size_t count = 0;
|
||||
auto begin = table_.front() + 1;
|
||||
for (size_t i = 0; i < table_.size(); ++i) {
|
||||
auto index = begin + i;
|
||||
if (table_[index]->state == TaskTableItemState::LOADED) {
|
||||
++count;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
json
|
||||
TaskTable::Dump() const {
|
||||
json ret;
|
||||
for (auto& item : table_) {
|
||||
ret.push_back(item->Dump());
|
||||
}
|
||||
json ret{{"error.message", "not support yet."}};
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "CircleQueue.h"
|
||||
#include "event/Event.h"
|
||||
#include "interface/interfaces.h"
|
||||
#include "task/SearchTask.h"
|
||||
|
@ -99,7 +100,8 @@ using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
|
|||
|
||||
class TaskTable : public interface::dumpable {
|
||||
public:
|
||||
TaskTable() = default;
|
||||
TaskTable() : table_(1ULL << 16ULL) {
|
||||
}
|
||||
|
||||
TaskTable(const TaskTable&) = delete;
|
||||
TaskTable(TaskTable&&) = delete;
|
||||
|
@ -128,20 +130,9 @@ class TaskTable : public interface::dumpable {
|
|||
TaskTableItemPtr
|
||||
Get(uint64_t index);
|
||||
|
||||
/*
|
||||
* TODO(wxyu): BIG GC
|
||||
* Remove sequence task which is DONE or MOVED from front;
|
||||
* Called by ?
|
||||
*/
|
||||
// void
|
||||
// Clear();
|
||||
|
||||
/*
|
||||
* Return true if task table empty, otherwise false;
|
||||
*/
|
||||
inline bool
|
||||
Empty() {
|
||||
return table_.empty();
|
||||
inline size_t
|
||||
Capacity() {
|
||||
return table_.capacity();
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -152,22 +143,14 @@ class TaskTable : public interface::dumpable {
|
|||
return table_.size();
|
||||
}
|
||||
|
||||
size_t
|
||||
TaskToExecute();
|
||||
|
||||
public:
|
||||
TaskTableItemPtr& operator[](uint64_t index) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
const TaskTableItemPtr& operator[](uint64_t index) {
|
||||
return table_[index];
|
||||
}
|
||||
|
||||
std::deque<TaskTableItemPtr>::iterator
|
||||
begin() {
|
||||
return table_.begin();
|
||||
}
|
||||
|
||||
std::deque<TaskTableItemPtr>::iterator
|
||||
end() {
|
||||
return table_.end();
|
||||
}
|
||||
|
||||
public:
|
||||
std::vector<uint64_t>
|
||||
PickToLoad(uint64_t limit);
|
||||
|
@ -249,8 +232,7 @@ class TaskTable : public interface::dumpable {
|
|||
|
||||
private:
|
||||
std::uint64_t id_ = 0;
|
||||
mutable std::mutex mutex_;
|
||||
std::deque<TaskTableItemPtr> table_;
|
||||
CircleQueue<TaskTableItemPtr> table_;
|
||||
std::function<void(void)> subscriber_ = nullptr;
|
||||
|
||||
// cache last finish avoid Pick task from begin always
|
||||
|
|
|
@ -123,12 +123,7 @@ Resource::Dump() const {
|
|||
|
||||
uint64_t
|
||||
Resource::NumOfTaskToExec() {
|
||||
uint64_t count = 0;
|
||||
for (auto& task : task_table_) {
|
||||
if (task->state == TaskTableItemState::LOADED)
|
||||
++count;
|
||||
}
|
||||
return count;
|
||||
return task_table_.TaskToExecute();
|
||||
}
|
||||
|
||||
TaskTableItemPtr
|
||||
|
|
|
@ -15,11 +15,10 @@
|
|||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include "scheduler/TaskTable.h"
|
||||
#include "scheduler/task/TestTask.h"
|
||||
#include "scheduler/tasklabel/DefaultLabel.h"
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
/************ TaskTableBaseTest ************/
|
||||
|
||||
|
@ -28,15 +27,11 @@ class TaskTableItemTest : public ::testing::Test {
|
|||
void
|
||||
SetUp() override {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> states{
|
||||
milvus::scheduler::TaskTableItemState::INVALID,
|
||||
milvus::scheduler::TaskTableItemState::START,
|
||||
milvus::scheduler::TaskTableItemState::LOADING,
|
||||
milvus::scheduler::TaskTableItemState::LOADED,
|
||||
milvus::scheduler::TaskTableItemState::EXECUTING,
|
||||
milvus::scheduler::TaskTableItemState::EXECUTED,
|
||||
milvus::scheduler::TaskTableItemState::MOVING,
|
||||
milvus::scheduler::TaskTableItemState::MOVED};
|
||||
for (auto &state : states) {
|
||||
milvus::scheduler::TaskTableItemState::INVALID, milvus::scheduler::TaskTableItemState::START,
|
||||
milvus::scheduler::TaskTableItemState::LOADING, milvus::scheduler::TaskTableItemState::LOADED,
|
||||
milvus::scheduler::TaskTableItemState::EXECUTING, milvus::scheduler::TaskTableItemState::EXECUTED,
|
||||
milvus::scheduler::TaskTableItemState::MOVING, milvus::scheduler::TaskTableItemState::MOVED};
|
||||
for (auto& state : states) {
|
||||
auto item = std::make_shared<milvus::scheduler::TaskTableItem>();
|
||||
item->state = state;
|
||||
items_.emplace_back(item);
|
||||
|
@ -59,9 +54,9 @@ TEST_F(TaskTableItemTest, DESTRUCT) {
|
|||
}
|
||||
|
||||
TEST_F(TaskTableItemTest, IS_FINISH) {
|
||||
for (auto &item : items_) {
|
||||
if (item->state == milvus::scheduler::TaskTableItemState::EXECUTED
|
||||
|| item->state == milvus::scheduler::TaskTableItemState::MOVED) {
|
||||
for (auto& item : items_) {
|
||||
if (item->state == milvus::scheduler::TaskTableItemState::EXECUTED ||
|
||||
item->state == milvus::scheduler::TaskTableItemState::MOVED) {
|
||||
ASSERT_TRUE(item->IsFinish());
|
||||
} else {
|
||||
ASSERT_FALSE(item->IsFinish());
|
||||
|
@ -70,13 +65,13 @@ TEST_F(TaskTableItemTest, IS_FINISH) {
|
|||
}
|
||||
|
||||
TEST_F(TaskTableItemTest, DUMP) {
|
||||
for (auto &item : items_) {
|
||||
for (auto& item : items_) {
|
||||
ASSERT_FALSE(item->Dump().empty());
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TaskTableItemTest, LOAD) {
|
||||
for (auto &item : items_) {
|
||||
for (auto& item : items_) {
|
||||
auto before_state = item->state;
|
||||
auto ret = item->Load();
|
||||
if (before_state == milvus::scheduler::TaskTableItemState::START) {
|
||||
|
@ -90,7 +85,7 @@ TEST_F(TaskTableItemTest, LOAD) {
|
|||
}
|
||||
|
||||
TEST_F(TaskTableItemTest, LOADED) {
|
||||
for (auto &item : items_) {
|
||||
for (auto& item : items_) {
|
||||
auto before_state = item->state;
|
||||
auto ret = item->Loaded();
|
||||
if (before_state == milvus::scheduler::TaskTableItemState::LOADING) {
|
||||
|
@ -104,7 +99,7 @@ TEST_F(TaskTableItemTest, LOADED) {
|
|||
}
|
||||
|
||||
TEST_F(TaskTableItemTest, EXECUTE) {
|
||||
for (auto &item : items_) {
|
||||
for (auto& item : items_) {
|
||||
auto before_state = item->state;
|
||||
auto ret = item->Execute();
|
||||
if (before_state == milvus::scheduler::TaskTableItemState::LOADED) {
|
||||
|
@ -118,7 +113,7 @@ TEST_F(TaskTableItemTest, EXECUTE) {
|
|||
}
|
||||
|
||||
TEST_F(TaskTableItemTest, EXECUTED) {
|
||||
for (auto &item : items_) {
|
||||
for (auto& item : items_) {
|
||||
auto before_state = item->state;
|
||||
auto ret = item->Executed();
|
||||
if (before_state == milvus::scheduler::TaskTableItemState::EXECUTING) {
|
||||
|
@ -132,7 +127,7 @@ TEST_F(TaskTableItemTest, EXECUTED) {
|
|||
}
|
||||
|
||||
TEST_F(TaskTableItemTest, MOVE) {
|
||||
for (auto &item : items_) {
|
||||
for (auto& item : items_) {
|
||||
auto before_state = item->state;
|
||||
auto ret = item->Move();
|
||||
if (before_state == milvus::scheduler::TaskTableItemState::LOADED) {
|
||||
|
@ -146,7 +141,7 @@ TEST_F(TaskTableItemTest, MOVE) {
|
|||
}
|
||||
|
||||
TEST_F(TaskTableItemTest, MOVED) {
|
||||
for (auto &item : items_) {
|
||||
for (auto& item : items_) {
|
||||
auto before_state = item->state;
|
||||
auto ret = item->Moved();
|
||||
if (before_state == milvus::scheduler::TaskTableItemState::MOVING) {
|
||||
|
@ -180,9 +175,7 @@ class TaskTableBaseTest : public ::testing::Test {
|
|||
|
||||
TEST_F(TaskTableBaseTest, SUBSCRIBER) {
|
||||
bool flag = false;
|
||||
auto callback = [&]() {
|
||||
flag = true;
|
||||
};
|
||||
auto callback = [&]() { flag = true; };
|
||||
empty_table_.RegisterSubscriber(callback);
|
||||
empty_table_.Put(task1_);
|
||||
ASSERT_TRUE(flag);
|
||||
|
@ -210,12 +203,6 @@ TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
|
|||
empty_table_.Put(tasks);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, EMPTY) {
|
||||
ASSERT_TRUE(empty_table_.Empty());
|
||||
empty_table_.Put(task1_);
|
||||
ASSERT_FALSE(empty_table_.Empty());
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, SIZE) {
|
||||
ASSERT_EQ(empty_table_.Size(), 0);
|
||||
empty_table_.Put(task1_);
|
||||
|
@ -237,7 +224,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD) {
|
|||
|
||||
auto indexes = empty_table_.PickToLoad(1);
|
||||
ASSERT_EQ(indexes.size(), 1);
|
||||
ASSERT_EQ(indexes[0], 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) {
|
||||
|
@ -250,9 +237,9 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) {
|
|||
|
||||
auto indexes = empty_table_.PickToLoad(3);
|
||||
ASSERT_EQ(indexes.size(), 3);
|
||||
ASSERT_EQ(indexes[0], 2);
|
||||
ASSERT_EQ(indexes[1], 3);
|
||||
ASSERT_EQ(indexes[2], 4);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
ASSERT_EQ(indexes[1]% empty_table_.Capacity(), 3);
|
||||
ASSERT_EQ(indexes[2]% empty_table_.Capacity(), 4);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) {
|
||||
|
@ -266,14 +253,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) {
|
|||
// first pick, non-cache
|
||||
auto indexes = empty_table_.PickToLoad(1);
|
||||
ASSERT_EQ(indexes.size(), 1);
|
||||
ASSERT_EQ(indexes[0], 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
|
||||
// second pick, iterate from 2
|
||||
// invalid state change
|
||||
empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START;
|
||||
indexes = empty_table_.PickToLoad(1);
|
||||
ASSERT_EQ(indexes.size(), 1);
|
||||
ASSERT_EQ(indexes[0], 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) {
|
||||
|
@ -287,7 +274,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) {
|
|||
|
||||
auto indexes = empty_table_.PickToExecute(1);
|
||||
ASSERT_EQ(indexes.size(), 1);
|
||||
ASSERT_EQ(indexes[0], 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) {
|
||||
|
@ -302,8 +289,8 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) {
|
|||
|
||||
auto indexes = empty_table_.PickToExecute(3);
|
||||
ASSERT_EQ(indexes.size(), 2);
|
||||
ASSERT_EQ(indexes[0], 2);
|
||||
ASSERT_EQ(indexes[1], 3);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
ASSERT_EQ(indexes[1] % empty_table_.Capacity(), 3);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) {
|
||||
|
@ -318,14 +305,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) {
|
|||
// first pick, non-cache
|
||||
auto indexes = empty_table_.PickToExecute(1);
|
||||
ASSERT_EQ(indexes.size(), 1);
|
||||
ASSERT_EQ(indexes[0], 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
|
||||
// second pick, iterate from 2
|
||||
// invalid state change
|
||||
empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START;
|
||||
indexes = empty_table_.PickToExecute(1);
|
||||
ASSERT_EQ(indexes.size(), 1);
|
||||
ASSERT_EQ(indexes[0], 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
}
|
||||
|
||||
/************ TaskTableAdvanceTest ************/
|
||||
|
@ -356,8 +343,8 @@ class TaskTableAdvanceTest : public ::testing::Test {
|
|||
|
||||
TEST_F(TaskTableAdvanceTest, LOAD) {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> before_state;
|
||||
for (auto &task : table1_) {
|
||||
before_state.push_back(task->state);
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
before_state.push_back(table1_[i]->state);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
|
@ -375,8 +362,8 @@ TEST_F(TaskTableAdvanceTest, LOAD) {
|
|||
|
||||
TEST_F(TaskTableAdvanceTest, LOADED) {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> before_state;
|
||||
for (auto &task : table1_) {
|
||||
before_state.push_back(task->state);
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
before_state.push_back(table1_[i]->state);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
|
@ -394,8 +381,8 @@ TEST_F(TaskTableAdvanceTest, LOADED) {
|
|||
|
||||
TEST_F(TaskTableAdvanceTest, EXECUTE) {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> before_state;
|
||||
for (auto &task : table1_) {
|
||||
before_state.push_back(task->state);
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
before_state.push_back(table1_[i]->state);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
|
@ -413,8 +400,8 @@ TEST_F(TaskTableAdvanceTest, EXECUTE) {
|
|||
|
||||
TEST_F(TaskTableAdvanceTest, EXECUTED) {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> before_state;
|
||||
for (auto &task : table1_) {
|
||||
before_state.push_back(task->state);
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
before_state.push_back(table1_[i]->state);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
|
@ -432,8 +419,8 @@ TEST_F(TaskTableAdvanceTest, EXECUTED) {
|
|||
|
||||
TEST_F(TaskTableAdvanceTest, MOVE) {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> before_state;
|
||||
for (auto &task : table1_) {
|
||||
before_state.push_back(task->state);
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
before_state.push_back(table1_[i]->state);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
|
@ -451,8 +438,8 @@ TEST_F(TaskTableAdvanceTest, MOVE) {
|
|||
|
||||
TEST_F(TaskTableAdvanceTest, MOVED) {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> before_state;
|
||||
for (auto &task : table1_) {
|
||||
before_state.push_back(task->state);
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
before_state.push_back(table1_[i]->state);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
|
@ -467,4 +454,3 @@ TEST_F(TaskTableAdvanceTest, MOVED) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue