|
|
|
@ -248,8 +248,6 @@ SqliteMetaImpl::ValidateMetaSchema() {
|
|
|
|
|
Status
|
|
|
|
|
SqliteMetaImpl::SqlQuery(const std::string& sql, AttrsMapList* res) {
|
|
|
|
|
try {
|
|
|
|
|
LOG_ENGINE_DEBUG_ << sql;
|
|
|
|
|
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(sqlite_mutex_);
|
|
|
|
|
|
|
|
|
|
int (* call_back)(void*, int, char**, char**) = nullptr;
|
|
|
|
@ -285,8 +283,6 @@ SqliteMetaImpl::SqlTransaction(const std::vector<std::string>& sql_statements) {
|
|
|
|
|
|
|
|
|
|
int rc = SQLITE_OK;
|
|
|
|
|
for (auto& sql : sql_statements) {
|
|
|
|
|
LOG_ENGINE_DEBUG_ << sql;
|
|
|
|
|
|
|
|
|
|
rc = sqlite3_exec(db_, sql.c_str(), nullptr, nullptr, nullptr);
|
|
|
|
|
if (rc != SQLITE_OK) {
|
|
|
|
|
break;
|
|
|
|
@ -341,6 +337,7 @@ SqliteMetaImpl::Initialize() {
|
|
|
|
|
// create meta tables
|
|
|
|
|
auto create_schema = [&](const MetaSchema& schema) {
|
|
|
|
|
std::string create_table_str = "CREATE TABLE IF NOT EXISTS " + schema.name() + "(" + schema.ToString() + ");";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "Initialize: " << create_table_str;
|
|
|
|
|
std::vector<std::string> statements = {create_table_str};
|
|
|
|
|
auto status = SqlTransaction(statements);
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
@ -371,6 +368,7 @@ SqliteMetaImpl::CreateCollection(CollectionSchema& collection_schema) {
|
|
|
|
|
fiu_do_on("SqliteMetaImpl.CreateCollection.throw_exception", throw std::exception());
|
|
|
|
|
std::string statement = "SELECT state FROM " + std::string(META_TABLES) + " WHERE table_id = "
|
|
|
|
|
+ Quote(collection_schema.collection_id_) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "CreateCollection: " << statement;
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
|
if (res.size() == 1) {
|
|
|
|
@ -409,7 +407,7 @@ SqliteMetaImpl::CreateCollection(CollectionSchema& collection_schema) {
|
|
|
|
|
+ ", " + Quote(owner_collection) + ", " + Quote(partition_tag) + ", " + Quote(version)
|
|
|
|
|
+ ", " + flush_lsn + ");";
|
|
|
|
|
|
|
|
|
|
LOG_ENGINE_DEBUG_ << statement;
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "CreateCollection: " << statement;
|
|
|
|
|
|
|
|
|
|
fiu_do_on("SqliteMetaImpl.CreateCollection.insert_throw_exception", throw std::exception());
|
|
|
|
|
auto status = SqlTransaction({statement});
|
|
|
|
@ -436,6 +434,7 @@ SqliteMetaImpl::DescribeCollection(CollectionSchema& collection_schema) {
|
|
|
|
|
+ std::string(META_TABLES) + " WHERE table_id = "
|
|
|
|
|
+ Quote(collection_schema.collection_id_) + " AND state <> "
|
|
|
|
|
+ std::to_string(CollectionSchema::TO_DELETE) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "DescribeCollection: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -482,6 +481,7 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not
|
|
|
|
|
statement = "SELECT id FROM " + std::string(META_TABLES) + " WHERE table_id = " + Quote(collection_id)
|
|
|
|
|
+ " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";";
|
|
|
|
|
}
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "HasCollection: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -511,6 +511,7 @@ SqliteMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_
|
|
|
|
|
} else {
|
|
|
|
|
statement += ";";
|
|
|
|
|
}
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "AllCollections: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -564,6 +565,7 @@ SqliteMetaImpl::DropCollections(const std::vector<std::string>& collection_id_ar
|
|
|
|
|
}
|
|
|
|
|
statement += ");";
|
|
|
|
|
statements.emplace_back(statement);
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "DropCollections: " << statement;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
@ -611,6 +613,7 @@ SqliteMetaImpl::DeleteCollectionFiles(const std::vector<std::string>& collection
|
|
|
|
|
}
|
|
|
|
|
statement += (") AND file_type <> " + std::to_string(SegmentSchema::TO_DELETE) + ";");
|
|
|
|
|
statements.emplace_back(statement);
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "DeleteCollectionFiles: " << statement;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto status = SqlTransaction(statements);
|
|
|
|
@ -673,6 +676,7 @@ SqliteMetaImpl::CreateCollectionFile(SegmentSchema& file_schema) {
|
|
|
|
|
+ Quote(collection_id) + ", " + Quote(segment_id) + ", " + engine_type + ", "
|
|
|
|
|
+ Quote(file_id) + ", " + file_type + ", " + file_size + ", " + row_count
|
|
|
|
|
+ ", " + updated_time + ", " + created_on + ", " + date + ", " + flush_lsn + ");";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "CreateCollectionFile: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -708,6 +712,7 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::
|
|
|
|
|
" row_count, date, created_on FROM " + std::string(META_TABLEFILES)
|
|
|
|
|
+ " WHERE table_id = " + Quote(collection_id) + " AND (" + idStr + ")"
|
|
|
|
|
+ " AND file_type <> " + std::to_string(SegmentSchema::TO_DELETE) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "GetCollectionFiles: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -761,6 +766,7 @@ SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id, Fil
|
|
|
|
|
" row_count, date, created_on FROM " + std::string(META_TABLEFILES)
|
|
|
|
|
+ " WHERE segment_id = " + Quote(segment_id) + " AND file_type <> "
|
|
|
|
|
+ std::to_string(SegmentSchema::TO_DELETE) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "GetCollectionFilesBySegmentId: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
{
|
|
|
|
@ -819,6 +825,7 @@ SqliteMetaImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t f
|
|
|
|
|
|
|
|
|
|
std::string statement = "UPDATE " + std::string(META_TABLES) + " SET flag = " + std::to_string(flag)
|
|
|
|
|
+ " WHERE table_id = " + Quote(collection_id) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "UpdateCollectionFlag: " << statement;
|
|
|
|
|
|
|
|
|
|
auto status = SqlTransaction({statement});
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
@ -841,6 +848,7 @@ SqliteMetaImpl::UpdateCollectionFlushLSN(const std::string& collection_id, uint6
|
|
|
|
|
|
|
|
|
|
std::string statement = "UPDATE " + std::string(META_TABLES) + " SET flush_lsn = "
|
|
|
|
|
+ std::to_string(flush_lsn) + " WHERE table_id = " + Quote(collection_id) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "UpdateCollectionFlushLSN: " << statement;
|
|
|
|
|
|
|
|
|
|
auto status = SqlTransaction({statement});
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
@ -864,6 +872,7 @@ SqliteMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t
|
|
|
|
|
|
|
|
|
|
std::string statement = "SELECT flush_lsn FROM " + std::string(META_TABLES) + " WHERE table_id = "
|
|
|
|
|
+ Quote(collection_id) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "GetCollectionFlushLSN: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -893,6 +902,7 @@ SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
|
|
|
|
|
|
|
|
|
|
std::string statement = "SELECT state FROM " + std::string(META_TABLES) + " WHERE table_id = "
|
|
|
|
|
+ Quote(file_schema.collection_id_) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "UpdateCollectionFile: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -925,6 +935,7 @@ SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
|
|
|
|
|
+ " ,file_type = " + file_type + " ,file_size = " + file_size + " ,row_count = " + row_count
|
|
|
|
|
+ " ,updated_time = " + updated_time + " ,created_on = " + created_on + " ,date = " + date
|
|
|
|
|
+ " WHERE id = " + id + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "UpdateCollectionFile: " << statement;
|
|
|
|
|
|
|
|
|
|
status = SqlTransaction({statement});
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
@ -959,6 +970,7 @@ SqliteMetaImpl::UpdateCollectionFiles(SegmentsSchema& files) {
|
|
|
|
|
std::string statement = "SELECT id FROM " + std::string(META_TABLES)
|
|
|
|
|
+ " WHERE table_id = " + Quote(file.collection_id_) + " AND state <> "
|
|
|
|
|
+ std::to_string(CollectionSchema::TO_DELETE) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "UpdateCollectionFiles: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -999,6 +1011,7 @@ SqliteMetaImpl::UpdateCollectionFiles(SegmentsSchema& files) {
|
|
|
|
|
+ " ,row_count = " + row_count + " ,updated_time = " + updated_time
|
|
|
|
|
+ " ,created_on = " + created_on + " ,date = " + date + " WHERE id = " + id + ";";
|
|
|
|
|
statements.emplace_back(statement);
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "UpdateCollectionFiles: " << statement;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto status = SqlTransaction(statements);
|
|
|
|
@ -1026,6 +1039,7 @@ SqliteMetaImpl::UpdateCollectionFilesRowCount(SegmentsSchema& files) {
|
|
|
|
|
|
|
|
|
|
std::string statement = "UPDATE " + std::string(META_TABLEFILES) + " SET row_count = " + row_count
|
|
|
|
|
+ " , updated_time = " + updated_time + " WHERE file_id = " + file.file_id_ + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "UpdateCollectionFilesRowCount: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -1053,6 +1067,7 @@ SqliteMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Co
|
|
|
|
|
std::string statement = "SELECT id, state, dimension, created_on FROM " + std::string(META_TABLES)
|
|
|
|
|
+ " WHERE table_id = " + Quote(collection_id)
|
|
|
|
|
+ " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "UpdateCollectionIndex: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -1073,6 +1088,7 @@ SqliteMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Co
|
|
|
|
|
+ " ,engine_type = " + std::to_string(index.engine_type_) + " ,index_params = "
|
|
|
|
|
+ Quote(index.extra_params_.dump()) + " ,metric_type = " + std::to_string(index.metric_type_)
|
|
|
|
|
+ " WHERE table_id = " + Quote(collection_id) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "UpdateCollectionIndex: " << statement;
|
|
|
|
|
|
|
|
|
|
auto status = SqlTransaction({statement});
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
@ -1101,6 +1117,7 @@ SqliteMetaImpl::UpdateCollectionFilesToIndex(const std::string& collection_id) {
|
|
|
|
|
+ std::to_string(SegmentSchema::TO_INDEX) + " WHERE table_id = " + Quote(collection_id)
|
|
|
|
|
+ " AND row_count >= " + std::to_string(meta::BUILD_INDEX_THRESHOLD)
|
|
|
|
|
+ " AND file_type = " + std::to_string(SegmentSchema::RAW) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "UpdateCollectionFilesToIndex: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -1127,6 +1144,7 @@ SqliteMetaImpl::DescribeCollectionIndex(const std::string& collection_id, Collec
|
|
|
|
|
std::string statement = "SELECT engine_type, index_params, metric_type FROM "
|
|
|
|
|
+ std::string(META_TABLES) + " WHERE table_id = " + Quote(collection_id)
|
|
|
|
|
+ " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "DescribeCollectionIndex: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -1165,6 +1183,7 @@ SqliteMetaImpl::DropCollectionIndex(const std::string& collection_id) {
|
|
|
|
|
+ Quote(collection_id) + " AND file_type = " + std::to_string(SegmentSchema::INDEX)
|
|
|
|
|
+ ";";
|
|
|
|
|
statements.emplace_back(statement);
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "DropCollectionIndex: " << statement;
|
|
|
|
|
|
|
|
|
|
// set all backup file to raw
|
|
|
|
|
statement = "UPDATE " + std::string(META_TABLEFILES) + " SET file_type = "
|
|
|
|
@ -1172,6 +1191,7 @@ SqliteMetaImpl::DropCollectionIndex(const std::string& collection_id) {
|
|
|
|
|
+ std::to_string(utils::GetMicroSecTimeStamp()) + " WHERE table_id = "
|
|
|
|
|
+ Quote(collection_id) + " AND file_type = " + std::to_string(SegmentSchema::BACKUP) + ";";
|
|
|
|
|
statements.emplace_back(statement);
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "DropCollectionIndex: " << statement;
|
|
|
|
|
|
|
|
|
|
// set collection index type to raw
|
|
|
|
|
statement = "UPDATE " + std::string(META_TABLES) + " SET engine_type = (CASE WHEN metric_type in ("
|
|
|
|
@ -1182,6 +1202,7 @@ SqliteMetaImpl::DropCollectionIndex(const std::string& collection_id) {
|
|
|
|
|
+ " ELSE " + std::to_string((int32_t)EngineType::FAISS_IDMAP) + " END)"
|
|
|
|
|
+ " , index_params = '{}' WHERE table_id = " + Quote(collection_id) + ";";
|
|
|
|
|
statements.emplace_back(statement);
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "DropCollectionIndex: " << statement;
|
|
|
|
|
|
|
|
|
|
auto status = SqlTransaction(statements);
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
@ -1261,6 +1282,7 @@ SqliteMetaImpl::HasPartition(const std::string& collection_id, const std::string
|
|
|
|
|
+ " WHERE owner_table = " + Quote(collection_id)
|
|
|
|
|
+ " AND partition_tag = " + Quote(valid_tag)
|
|
|
|
|
+ " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "HasPartition: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -1297,6 +1319,7 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id,
|
|
|
|
|
+ std::string(META_TABLES) + " WHERE owner_table = "
|
|
|
|
|
+ Quote(collection_id) + " AND state <> "
|
|
|
|
|
+ std::to_string(CollectionSchema::TO_DELETE) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "ShowPartitions: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -1338,6 +1361,7 @@ SqliteMetaImpl::CountPartitions(const std::string& collection_id, int64_t& parti
|
|
|
|
|
std::string statement = "SELECT count(*) FROM " + std::string(META_TABLES)
|
|
|
|
|
+ " WHERE owner_table = " + Quote(collection_id)
|
|
|
|
|
+ " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "CountPartitions: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -1373,6 +1397,7 @@ SqliteMetaImpl::GetPartitionName(const std::string& collection_id, const std::st
|
|
|
|
|
+ " WHERE owner_table = " + Quote(collection_id)
|
|
|
|
|
+ " AND partition_tag = " + Quote(valid_tag)
|
|
|
|
|
+ " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "GetPartitionName: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -1404,6 +1429,7 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil
|
|
|
|
|
+ " AND (file_type = " + std::to_string(SegmentSchema::RAW)
|
|
|
|
|
+ " OR file_type = " + std::to_string(SegmentSchema::TO_INDEX)
|
|
|
|
|
+ " OR file_type = " + std::to_string(SegmentSchema::INDEX) + ");";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "FilesToSearch: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -1511,6 +1537,7 @@ SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::s
|
|
|
|
|
statement += (" AND (file_type = " + std::to_string(SegmentSchema::RAW));
|
|
|
|
|
statement += (" OR file_type = " + std::to_string(SegmentSchema::TO_INDEX));
|
|
|
|
|
statement += (" OR file_type = " + std::to_string(SegmentSchema::INDEX) + ");");
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "FilesToSearchEx: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -1584,6 +1611,7 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& file
|
|
|
|
|
+ " WHERE table_id = " + Quote(collection_id)
|
|
|
|
|
+ " AND file_type = " + std::to_string(SegmentSchema::RAW)
|
|
|
|
|
+ " ORDER BY row_count DESC;";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "FilesToMerge: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -1651,6 +1679,7 @@ SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) {
|
|
|
|
|
std::string statement = "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, "
|
|
|
|
|
"date, engine_type, created_on, updated_time FROM " + Quote(META_TABLEFILES)
|
|
|
|
|
+ " WHERE file_type = " + std::to_string(SegmentSchema::TO_INDEX) + ";";
|
|
|
|
|
// LOG_ENGINE_DEBUG_ << "FilesToIndex: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -1738,6 +1767,7 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector<
|
|
|
|
|
"date, engine_type, created_on, updated_time FROM " + Quote(META_TABLEFILES)
|
|
|
|
|
+ " WHERE table_id = " + Quote(collection_id)
|
|
|
|
|
+ " AND file_type in (" + types + ");";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "FilesByType: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -1889,6 +1919,7 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collect
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
statement += (") AND file_type in (" + types + ");");
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "FilesByTypeEx: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -1997,6 +2028,7 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_hol
|
|
|
|
|
idStr = idStr.substr(0, idStr.size() - 4); // remove the last " OR "
|
|
|
|
|
|
|
|
|
|
statement += (" WHERE (" + idStr + ")");
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "FilesByID: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -2152,6 +2184,7 @@ SqliteMetaImpl::CleanUpShadowFiles() {
|
|
|
|
|
+ std::to_string(SegmentSchema::NEW) + ","
|
|
|
|
|
+ std::to_string(SegmentSchema::NEW_MERGE) + ","
|
|
|
|
|
+ std::to_string(SegmentSchema::NEW_INDEX) + ");";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "CleanUpShadowFiles: " << statement;
|
|
|
|
|
|
|
|
|
|
auto status = SqlTransaction({statement});
|
|
|
|
|
fiu_do_on("SqliteMetaImpl.CleanUpShadowFiles.fail_commited", status = Status(DB_ERROR, ""));
|
|
|
|
@ -2263,6 +2296,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/
|
|
|
|
|
idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); // remove the last " OR "
|
|
|
|
|
statement = "DELETE FROM " + std::string(META_TABLEFILES) + " WHERE " + idsToDeleteStr + ";";
|
|
|
|
|
statements.emplace_back(statement);
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -2299,6 +2333,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/
|
|
|
|
|
++remove_collections;
|
|
|
|
|
|
|
|
|
|
statement = "DELETE FROM " + std::string(META_TABLES) + " WHERE id = " + resRow["id"] + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement;
|
|
|
|
|
status = SqlTransaction({statement});
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
|
return HandleException("Failed to clean up with ttl", status.message().c_str());
|
|
|
|
@ -2355,6 +2390,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/
|
|
|
|
|
for (auto& segment_id : segment_ids) {
|
|
|
|
|
std::string statement = "SELECT id FROM " + std::string(META_TABLEFILES)
|
|
|
|
|
+ " WHERE segment_id = " + Quote(segment_id.first) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -2392,6 +2428,7 @@ SqliteMetaImpl::Count(const std::string& collection_id, uint64_t& result) {
|
|
|
|
|
+ " AND (file_type = " + std::to_string(SegmentSchema::RAW)
|
|
|
|
|
+ " OR file_type = " + std::to_string(SegmentSchema::TO_INDEX)
|
|
|
|
|
+ " OR file_type = " + std::to_string(SegmentSchema::INDEX) + ");";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "Count: " << statement;
|
|
|
|
|
|
|
|
|
|
// to ensure UpdateCollectionFiles to be a atomic operation
|
|
|
|
|
std::lock_guard<std::mutex> meta_lock(operation_mutex_);
|
|
|
|
@ -2427,6 +2464,10 @@ SqliteMetaImpl::DropAll() {
|
|
|
|
|
statement + FIELDS_SCHEMA.name() + ";",
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
for (auto& sql : statements) {
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "DropAll: " << sql;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto status = SqlTransaction(statements);
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
|
return HandleException("Failed to drop all", status.message().c_str());
|
|
|
|
@ -2453,6 +2494,7 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
|
|
|
|
|
std::string statement = "SELECT id, file_size FROM " + std::string(META_TABLEFILES)
|
|
|
|
|
+ " WHERE file_type <> " + std::to_string(SegmentSchema::TO_DELETE)
|
|
|
|
|
+ " ORDER BY id ASC LIMIT 10;";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "DiscardFiles: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
@ -2485,6 +2527,7 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
|
|
|
|
|
+ " SET file_type = " + std::to_string(SegmentSchema::TO_DELETE)
|
|
|
|
|
+ " ,updated_time = " + std::to_string(utils::GetMicroSecTimeStamp())
|
|
|
|
|
+ " WHERE " + idsToDiscardStr + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "DiscardFiles: " << statement;
|
|
|
|
|
|
|
|
|
|
status = SqlTransaction({statement});
|
|
|
|
|
fiu_do_on("SqliteMetaImpl.DiscardFiles.fail_commited", status = Status(DB_ERROR, ""));
|
|
|
|
@ -2521,6 +2564,7 @@ SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
|
|
|
|
|
|
|
|
|
|
if (first_create) { // first time to get global lsn
|
|
|
|
|
statement = "INSERT INTO " + std::string(META_ENVIRONMENT) + " VALUES(" + std::to_string(lsn) + ");";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "SetGlobalLastLSN: " << statement;
|
|
|
|
|
|
|
|
|
|
status = SqlTransaction({statement});
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
@ -2528,6 +2572,7 @@ SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
|
|
|
|
|
}
|
|
|
|
|
} else if (lsn > last_lsn) {
|
|
|
|
|
statement = "UPDATE " + std::string(META_ENVIRONMENT) + " SET global_lsn = " + std::to_string(lsn) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "SetGlobalLastLSN: " << statement;
|
|
|
|
|
|
|
|
|
|
status = SqlTransaction({statement});
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
@ -2535,7 +2580,6 @@ SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "Update global lsn = " << lsn;
|
|
|
|
|
} catch (std::exception& e) {
|
|
|
|
|
std::string msg = "Exception update global lsn = " + lsn;
|
|
|
|
|
return HandleException(msg, e.what());
|
|
|
|
@ -2550,6 +2594,7 @@ SqliteMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
|
|
|
|
|
server::MetricCollector metric;
|
|
|
|
|
|
|
|
|
|
std::string statement = "SELECT global_lsn FROM " + std::string(META_ENVIRONMENT) + ";";
|
|
|
|
|
LOG_ENGINE_DEBUG_ << "GetGlobalLastLSN: " << statement;
|
|
|
|
|
|
|
|
|
|
AttrsMapList res;
|
|
|
|
|
auto status = SqlQuery(statement, &res);
|
|
|
|
|