Former-commit-id: 2825bffeb532d4022ea0a36dfa46bba387295598
pull/191/head
zhiru 2019-06-30 13:28:38 +08:00
parent ecb69a2d41
commit f08fb195c7
11 changed files with 271 additions and 128 deletions

View File

@ -113,21 +113,13 @@ link_directories(${MILVUS_BINARY_DIR})
set(MILVUS_ENGINE_INCLUDE ${PROJECT_SOURCE_DIR}/include)
set(MILVUS_ENGINE_SRC ${PROJECT_SOURCE_DIR}/src)
#set(MILVUS_THIRD_PARTY ${CMAKE_CURRENT_SOURCE_DIR}/third_party)
#set(MILVUS_THIRD_PARTY_BUILD ${CMAKE_CURRENT_SOURCE_DIR}/third_party/build)
add_compile_definitions(PROFILER=${PROFILER})
include_directories(${MILVUS_ENGINE_INCLUDE})
include_directories(${MILVUS_ENGINE_SRC})
include_directories(/usr/local/cuda/include)
#include_directories(${MILVUS_THIRD_PARTY_BUILD}/include)
link_directories(${CMAKE_CURRRENT_BINARY_DIR})
#link_directories(${MILVUS_THIRD_PARTY_BUILD}/lib)
#link_directories(${MILVUS_THIRD_PARTY_BUILD}/lib64)
#execute_process(COMMAND bash build.sh
# WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/third_party)
add_subdirectory(src)

View File

@ -1,8 +0,0 @@
sudo apt-get install mysql-server
sudo apt-get install libmysqlclient-dev
sudo ln -s libmysqlclient.so libmysqlclient_r.so
Install MySQL++
./configure --enable-thread-check LDFLAGS='-pthread'
make
sudo make install

View File

@ -139,7 +139,7 @@ DBImpl::DBImpl(const Options& options)
meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
mem_mgr_ = std::make_shared<MemManager>(meta_ptr_, options_);
// mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_));
if (options.mode != "read_only") {
if (options.mode != Options::MODE::READ_ONLY) {
StartTimerTasks();
}
}
@ -600,7 +600,7 @@ void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
meta_ptr_->Archive();
int ttl = 1;
if (options_.mode == "cluster") {
if (options_.mode == Options::MODE::CLUSTER) {
ttl = meta::D_SEC;
// ENGINE_LOG_DEBUG << "Server mode is cluster. Clean up files with ttl = " << std::to_string(ttl) << "seconds.";
}

View File

@ -29,15 +29,8 @@ DBMetaOptions DBMetaOptionsFactory::Build(const std::string& path) {
p = ss.str();
}
// std::string uri;
// const char* uri_p = getenv("MILVUS_DB_META_URI");
// if (uri_p) {
// uri = uri_p;
// }
DBMetaOptions meta;
meta.path = p;
// meta.backend_uri = uri;
return meta;
}
@ -54,14 +47,9 @@ std::shared_ptr<meta::DBMetaImpl> DBMetaImplFactory::Build() {
}
std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOptions,
const std::string& mode) {
const int& mode) {
std::string uri = metaOptions.backend_uri;
// if (uri.empty()) {
// //Default to sqlite if uri is empty
//// return std::make_shared<meta::DBMetaImpl>(new meta::DBMetaImpl(metaOptions));
// return std::shared_ptr<meta::DBMetaImpl>(new meta::DBMetaImpl(metaOptions));
// }
std::string dialectRegex = "(.*)";
std::string usernameRegex = "(.*)";

View File

@ -28,7 +28,7 @@ struct OptionsFactory {
struct DBMetaImplFactory {
static std::shared_ptr<meta::DBMetaImpl> Build();
static std::shared_ptr<meta::Meta> Build(const DBMetaOptions& metaOptions, const std::string& mode);
static std::shared_ptr<meta::Meta> Build(const DBMetaOptions& metaOptions, const int& mode);
};
struct DBFactory {

View File

@ -21,12 +21,12 @@ public:
password_(passWord),
server_(serverIp),
port_(port),
maxPoolSize_(maxPoolSize)
max_pool_size_(maxPoolSize)
{
conns_in_use_ = 0;
maxIdleTime_ = 10; //10 seconds
max_idle_time_ = 10; //10 seconds
}
// The destructor. We _must_ call ConnectionPool::clear() here,
@ -41,12 +41,10 @@ public:
// connections actually in use, not those created. Also note that
// we keep our own count; ConnectionPool::size() isn't the same!
mysqlpp::Connection* grab() override {
while (conns_in_use_ > maxPoolSize_) {
// cout.put('R'); cout.flush(); // indicate waiting for release
while (conns_in_use_ > max_pool_size_) {
sleep(1);
}
// ENGINE_LOG_DEBUG << "conns_in_use_ in grab: " << conns_in_use_ << std::endl;
++conns_in_use_;
return mysqlpp::ConnectionPool::grab();
}
@ -68,7 +66,7 @@ public:
}
void set_max_idle_time(int max_idle) {
maxIdleTime_ = max_idle;
max_idle_time_ = max_idle;
}
protected:
@ -77,7 +75,6 @@ protected:
mysqlpp::Connection* create() override {
// Create connection using the parameters we were passed upon
// creation.
// cout.put('C'); cout.flush(); // indicate connection creation
mysqlpp::Connection* conn = new mysqlpp::Connection();
conn->set_option(new mysqlpp::ReconnectOption(true));
conn->connect(db_.empty() ? 0 : db_.c_str(),
@ -91,12 +88,11 @@ protected:
void destroy(mysqlpp::Connection* cp) override {
// Our superclass can't know how we created the Connection, so
// it delegates destruction to us, to be safe.
// cout.put('D'); cout.flush(); // indicate connection destruction
delete cp;
}
unsigned int max_idle_time() override {
return maxIdleTime_;
return max_idle_time_;
}
private:
@ -107,7 +103,7 @@ private:
std::string db_, user_, password_, server_;
int port_;
int maxPoolSize_;
int max_pool_size_;
unsigned int maxIdleTime_;
unsigned int max_idle_time_;
};

View File

@ -103,7 +103,7 @@ namespace meta {
return Status::OK();
}
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const std::string& mode)
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int& mode)
: options_(options_),
mode_(mode) {
Initialize();
@ -157,7 +157,7 @@ namespace meta {
// connectionPtr->set_option(new mysqlpp::ReconnectOption(true));
int threadHint = std::thread::hardware_concurrency();
int maxPoolSize = threadHint == 0 ? 8 : threadHint;
mySQLConnectionPool_ = std::make_shared<MySQLConnectionPool>(dbName, username, password, serverAddress, port, maxPoolSize);
mysql_connection_pool_ = std::make_shared<MySQLConnectionPool>(dbName, username, password, serverAddress, port, maxPoolSize);
// std::cout << "MySQL++ thread aware:" << std::to_string(connectionPtr->thread_aware()) << std::endl;
ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(maxPoolSize);
try {
@ -165,9 +165,9 @@ namespace meta {
CleanUp();
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: connections in use = " << mySQLConnectionPool_->getConnectionsInUse();
// ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: connections in use = " << mysql_connection_pool_->getConnectionsInUse();
// if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) {
// return Status::Error("DB connection failed: ", connectionPtr->error());
// }
@ -192,6 +192,11 @@ namespace meta {
"files_cnt BIGINT DEFAULT 0 NOT NULL, " <<
"engine_type INT DEFAULT 1 NOT NULL, " <<
"store_raw_data BOOL DEFAULT false NOT NULL);";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
}
if (!InitializeQuery.exec()) {
return Status::DBTransactionError("Initialization Error", InitializeQuery.error());
}
@ -206,6 +211,11 @@ namespace meta {
"updated_time BIGINT NOT NULL, " <<
"created_on BIGINT NOT NULL, " <<
"date INT DEFAULT -1 NOT NULL);";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
}
if (!InitializeQuery.exec()) {
return Status::DBTransactionError("Initialization Error", InitializeQuery.error());
}
@ -282,10 +292,10 @@ namespace meta {
dateListStr = dateListStr.substr(0, dateListStr.size() - 2); //remove the last ", "
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropPartitionsByDates connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropPartitionsByDates connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query dropPartitionsByDatesQuery = connectionPtr->query();
@ -295,6 +305,10 @@ namespace meta {
"WHERE table_id = " << quote << table_id << " AND " <<
"date in (" << dateListStr << ");";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropPartitionsByDates: " << dropPartitionsByDatesQuery.str();
}
if (!dropPartitionsByDatesQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY DATES";
return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES",
@ -323,10 +337,10 @@ namespace meta {
MetricCollector metric;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTable connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTable connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query createTableQuery = connectionPtr->query();
@ -337,6 +351,11 @@ namespace meta {
createTableQuery << "SELECT state FROM Tables " <<
"WHERE table_id = " << quote << table_schema.table_id_ << ";";
// ENGINE_LOG_DEBUG << "Create Table : " << createTableQuery.str();
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
}
StoreQueryResult res = createTableQuery.store();
assert(res && res.num_rows() <= 1);
if (res.num_rows() == 1) {
@ -370,6 +389,11 @@ namespace meta {
"(" << id << ", " << quote << table_id << ", " << state << ", " << dimension << ", " <<
created_on << ", " << files_cnt << ", " << engine_type << ", " << store_raw_data << ");";
// ENGINE_LOG_DEBUG << "Create Table : " << createTableQuery.str();
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
}
if (SimpleResult res = createTableQuery.execute()) {
table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?
// std::cout << table_schema.id_ << std::endl;
@ -420,10 +444,10 @@ namespace meta {
MetricCollector metric;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTable connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTable connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
//soft delete table
@ -433,6 +457,10 @@ namespace meta {
"SET state = " << std::to_string(TableSchema::TO_DELETE) << " " <<
"WHERE table_id = " << quote << table_id << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTable: " << deleteTableQuery.str();
}
if (!deleteTableQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE";
return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error());
@ -441,9 +469,7 @@ namespace meta {
} //Scoped Connection
// ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER);
// opt.mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single");
if (mode_ != "single") {
if (mode_ != Options::MODE::SINGLE) {
DeleteTableFiles(table_id);
}
@ -465,10 +491,10 @@ namespace meta {
MetricCollector metric;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTableFiles connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
//soft delete table files
@ -480,6 +506,10 @@ namespace meta {
"WHERE table_id = " << quote << table_id << " AND " <<
"file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTableFiles: " << deleteTableFilesQuery.str();
}
if (!deleteTableFilesQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE FILES";
return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableFilesQuery.error());
@ -509,10 +539,10 @@ namespace meta {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DescribeTable connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DescribeTable connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query describeTableQuery = connectionPtr->query();
@ -520,6 +550,11 @@ namespace meta {
"FROM Tables " <<
"WHERE table_id = " << quote << table_schema.table_id_ << " " <<
"AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTable: " << describeTableQuery.str();
}
res = describeTableQuery.store();
} //Scoped Connection
@ -535,7 +570,7 @@ namespace meta {
table_schema.engine_type_ = resRow["engine_type"];
table_schema.store_raw_data_ = (resRow["store_raw_data"].compare("true") == 0);
table_schema.store_raw_data_ = (resRow["store_raw_data"] == 1);
}
else {
return Status::NotFound("Table " + table_schema.table_id_ + " not found");
@ -568,10 +603,10 @@ namespace meta {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::HasTable connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::HasTable connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query hasTableQuery = connectionPtr->query();
@ -581,6 +616,11 @@ namespace meta {
"WHERE table_id = " << quote << table_id << " " <<
"AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " <<
"AS " << quote << "check" << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasTable: " << hasTableQuery.str();
}
res = hasTableQuery.store();
} //Scoped Connection
@ -612,16 +652,21 @@ namespace meta {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::AllTables connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::AllTables connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query allTablesQuery = connectionPtr->query();
allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " <<
"FROM Tables " <<
"WHERE state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allTablesQuery.str();
}
res = allTablesQuery.store();
} //Scoped Connection
@ -640,7 +685,7 @@ namespace meta {
table_schema.engine_type_ = resRow["engine_type"];
table_schema.store_raw_data_ = (resRow["store_raw_data"].compare("true") == 0);
table_schema.store_raw_data_ = (resRow["store_raw_data"] == 1);
table_schema_array.emplace_back(table_schema);
}
@ -695,10 +740,10 @@ namespace meta {
std::string date = std::to_string(file_schema.date_);
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTableFile connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTableFile connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query createTableFileQuery = connectionPtr->query();
@ -708,6 +753,10 @@ namespace meta {
quote << file_id << ", " << file_type << ", " << size << ", " <<
updated_time << ", " << created_on << ", " << date << ");";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTableFile: " << createTableFileQuery.str();
}
if (SimpleResult res = createTableFileQuery.execute()) {
file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?
@ -759,16 +808,21 @@ namespace meta {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToIndex connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToIndex connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query filesToIndexQuery = connectionPtr->query();
filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " <<
"FROM TableFiles " <<
"WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << filesToIndexQuery.str();
}
res = filesToIndexQuery.store();
} //Scoped Connection
@ -839,10 +893,10 @@ namespace meta {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToSearch connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToSearch connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
if (partition.empty()) {
@ -854,6 +908,11 @@ namespace meta {
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();
}
res = filesToSearchQuery.store();
} else {
@ -874,6 +933,11 @@ namespace meta {
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();
}
res = filesToSearchQuery.store();
}
@ -944,10 +1008,10 @@ namespace meta {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToMerge connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToMerge connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query filesToMergeQuery = connectionPtr->query();
@ -956,6 +1020,11 @@ namespace meta {
"WHERE table_id = " << quote << table_id << " AND " <<
"file_type = " << std::to_string(TableFileSchema::RAW) << " " <<
"ORDER BY size DESC" << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToMerge: " << filesToMergeQuery.str();
}
res = filesToMergeQuery.store();
} //Scoped Connection
@ -1033,10 +1102,10 @@ namespace meta {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::GetTableFiles connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::GetTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query getTableFileQuery = connectionPtr->query();
@ -1044,6 +1113,11 @@ namespace meta {
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"(" << idStr << ");";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFiles: " << getTableFileQuery.str();
}
res = getTableFileQuery.store();
} //Scoped Connection
@ -1112,10 +1186,10 @@ namespace meta {
try {
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Archive connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Archive connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query archiveQuery = connectionPtr->query();
@ -1123,6 +1197,11 @@ namespace meta {
"SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
"WHERE created_on < " << std::to_string(now - usecs) << " AND " <<
"file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Archive: " << archiveQuery.str();
}
if (!archiveQuery.exec()) {
return Status::DBTransactionError("QUERY ERROR DURING ARCHIVE", archiveQuery.error());
}
@ -1159,16 +1238,21 @@ namespace meta {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Size connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Size connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query getSizeQuery = connectionPtr->query();
getSizeQuery << "SELECT SUM(size) AS sum " <<
"FROM TableFiles " <<
"WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Size: " << getSizeQuery.str();
}
res = getSizeQuery.store();
} //Scoped Connection
@ -1216,10 +1300,10 @@ namespace meta {
bool status;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DiscardFiles connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DiscardFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query discardFilesQuery = connectionPtr->query();
@ -1228,7 +1312,12 @@ namespace meta {
"WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
"ORDER BY id ASC " <<
"LIMIT 10;";
// std::cout << discardFilesQuery.str() << std::endl;
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();
}
// std::cout << discardFilesQuery.str() << std::endl;
StoreQueryResult res = discardFilesQuery.store();
assert(res);
@ -1258,6 +1347,10 @@ namespace meta {
"updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " <<
"WHERE " << idsToDiscardStr << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();
}
status = discardFilesQuery.exec();
if (!status) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DISCARDING FILES";
@ -1289,10 +1382,10 @@ namespace meta {
MetricCollector metric;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFile connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFile connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query updateTableFileQuery = connectionPtr->query();
@ -1301,6 +1394,11 @@ namespace meta {
//clean thread will delete the file later
updateTableFileQuery << "SELECT state FROM Tables " <<
"WHERE table_id = " << quote << file_schema.table_id_ << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();
}
StoreQueryResult res = updateTableFileQuery.store();
assert(res && res.num_rows() <= 1);
@ -1334,7 +1432,11 @@ namespace meta {
"date = " << date << " " <<
"WHERE id = " << id << ";";
// std::cout << updateTableFileQuery.str() << std::endl;
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str();
}
// std::cout << updateTableFileQuery.str() << std::endl;
if (!updateTableFileQuery.exec()) {
ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
@ -1366,10 +1468,10 @@ namespace meta {
MetricCollector metric;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFiles connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query updateTableFilesQuery = connectionPtr->query();
@ -1386,6 +1488,11 @@ namespace meta {
"WHERE table_id = " << quote << file_schema.table_id_ << " " <<
"AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " <<
"AS " << quote << "check" << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();
}
StoreQueryResult res = updateTableFilesQuery.store();
assert(res && res.num_rows() == 1);
@ -1421,6 +1528,10 @@ namespace meta {
"date = " << date << " " <<
"WHERE id = " << id << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str();
}
if (!updateTableFilesQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES";
return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES",
@ -1454,13 +1565,13 @@ namespace meta {
{
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use before creating ScopedConnection = "
// << mySQLConnectionPool_->getConnectionsInUse();
// << mysql_connection_pool_->getConnectionsInUse();
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use after creating ScopedConnection = "
// << mySQLConnectionPool_->getConnectionsInUse();
// << mysql_connection_pool_->getConnectionsInUse();
// }
Query cleanUpFilesWithTTLQuery = connectionPtr->query();
@ -1468,6 +1579,11 @@ namespace meta {
"FROM TableFiles " <<
"WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " AND " <<
"updated_time < " << std::to_string(now - seconds * US_PS) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
}
StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
assert(res);
@ -1509,6 +1625,11 @@ namespace meta {
idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR "
cleanUpFilesWithTTLQuery << "DELETE FROM TableFiles WHERE " <<
idsToDeleteStr << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
}
if (!cleanUpFilesWithTTLQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL";
return Status::DBTransactionError("CleanUpFilesWithTTL Error",
@ -1532,19 +1653,24 @@ namespace meta {
{
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean tables: connection in use before creating ScopedConnection = "
// << mySQLConnectionPool_->getConnectionsInUse();
// << mysql_connection_pool_->getConnectionsInUse();
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean tables: connection in use after creating ScopedConnection = "
// << mySQLConnectionPool_->getConnectionsInUse();
// << mysql_connection_pool_->getConnectionsInUse();
// }
Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT id, table_id " <<
"FROM Tables " <<
"WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
}
StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
assert(res);
// std::cout << res.num_rows() << std::endl;
@ -1568,6 +1694,11 @@ namespace meta {
idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR "
cleanUpFilesWithTTLQuery << "DELETE FROM Tables WHERE " <<
idsToDeleteStr << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
}
if (!cleanUpFilesWithTTLQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL";
return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL",
@ -1594,16 +1725,20 @@ namespace meta {
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
try {
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUp: connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUp: connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
ENGINE_LOG_DEBUG << "Remove table file type as NEW";
Query cleanUpQuery = connectionPtr->query();
cleanUpQuery << "DELETE FROM TableFiles WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
}
if (!cleanUpQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES";
return Status::DBTransactionError("Clean up Error", cleanUpQuery.error());
@ -1640,10 +1775,10 @@ namespace meta {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Count: connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::Count: connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query countQuery = connectionPtr->query();
@ -1653,6 +1788,11 @@ namespace meta {
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Count: " << countQuery.str();
}
res = countQuery.store();
} //Scoped Connection
@ -1687,14 +1827,19 @@ namespace meta {
}
try {
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropAll: connection in use = " << mySQLConnectionPool_->getConnectionsInUse();
// if (mysql_connection_pool_->getConnectionsInUse() <= 0) {
// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropAll: connection in use = " << mysql_connection_pool_->getConnectionsInUse();
// }
Query dropTableQuery = connectionPtr->query();
dropTableQuery << "DROP TABLE IF EXISTS Tables, TableFiles;";
if (options_.sql_echo) {
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str();
}
if (dropTableQuery.exec()) {
return Status::OK();
}

View File

@ -22,7 +22,7 @@ namespace meta {
class MySQLMetaImpl : public Meta {
public:
MySQLMetaImpl(const DBMetaOptions& options_, const std::string& mode);
MySQLMetaImpl(const DBMetaOptions& options_, const int& mode);
virtual Status CreateTable(TableSchema& table_schema) override;
virtual Status DescribeTable(TableSchema& group_info_) override;
@ -77,9 +77,9 @@ namespace meta {
Status Initialize();
const DBMetaOptions options_;
const std::string mode_;
const int mode_;
std::shared_ptr<MySQLConnectionPool> mySQLConnectionPool_;
std::shared_ptr<MySQLConnectionPool> mysql_connection_pool_;
bool safe_grab = false;
// std::mutex connectionMutex_;

View File

@ -45,15 +45,23 @@ struct DBMetaOptions {
std::string path;
std::string backend_uri;
ArchiveConf archive_conf = ArchiveConf("delete");
bool sql_echo = false;
}; // DBMetaOptions
struct Options {
typedef enum {
SINGLE,
CLUSTER,
READ_ONLY
} MODE;
Options();
uint16_t memory_sync_interval = 1; //unit: second
uint16_t merge_trigger_number = 2;
size_t index_trigger_size = ONE_GB; //unit: byte
DBMetaOptions meta;
std::string mode;
int mode = MODE::SINGLE;
}; // Options

View File

@ -23,9 +23,30 @@ DBWrapper::DBWrapper() {
if(index_size > 0) {//ensure larger than zero, unit is MB
opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
}
std::string sql_echo = config.GetValue(CONFIG_DB_SQL_ECHO, "off");
if (sql_echo == "on") {
opt.meta.sql_echo = true;
}
else if (sql_echo == "off") {
opt.meta.sql_echo = false;
}
else {
std::cout << "ERROR: sql_echo specified in db_config is not one of ['on', 'off']" << std::endl;
kill(0, SIGUSR1);
}
ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER);
opt.mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single");
if (opt.mode != "single" && opt.mode != "cluster" && opt.mode != "read_only") {
std::string mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single");
if (mode == "single") {
opt.mode = zilliz::milvus::engine::Options::MODE::SINGLE;
}
else if (mode == "cluster") {
opt.mode = zilliz::milvus::engine::Options::MODE::CLUSTER;
}
else if (mode == "read_only") {
opt.mode = zilliz::milvus::engine::Options::MODE::READ_ONLY;
}
else {
std::cout << "ERROR: mode specified in server_config is not one of ['single', 'cluster', 'read_only']" << std::endl;
kill(0, SIGUSR1);
}

View File

@ -27,6 +27,7 @@ static const std::string CONFIG_DB_PATH = "db_path";
static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold";
static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold";
static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold";
static const std::string CONFIG_DB_SQL_ECHO = "sql_echo";
static const std::string CONFIG_LOG = "log_config";