mirror of https://github.com/milvus-io/milvus.git
Fix sqlite insert error (#3191)
* Fix sqlite transaction error Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com> * Update header reference in ut Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com>pull/3188/head^2
parent
22ea1e3423
commit
d30f4c8585
|
@ -17,9 +17,6 @@
|
|||
#include <vector>
|
||||
|
||||
#include "db/meta/MetaSession.h"
|
||||
#include "db/meta/backend/MockEngine.h"
|
||||
#include "db/meta/backend/MySqlEngine.h"
|
||||
#include "db/meta/backend/SqliteEngine.h"
|
||||
#include "db/snapshot/Resources.h"
|
||||
#include "utils/Exception.h"
|
||||
|
||||
|
@ -77,20 +74,14 @@ class MetaAdapter {
|
|||
template <typename ResourceT>
|
||||
Status
|
||||
Apply(snapshot::ResourceContextPtr<ResourceT> resp, int64_t& result_id) {
|
||||
result_id = 0;
|
||||
|
||||
auto session = CreateSession();
|
||||
session->Apply<ResourceT>(resp);
|
||||
STATUS_CHECK(session->Apply<ResourceT>(resp));
|
||||
|
||||
std::vector<int64_t> result_ids;
|
||||
auto status = session->Commit(result_ids);
|
||||
|
||||
if (!status.ok()) {
|
||||
throw Exception(status.code(), status.message());
|
||||
}
|
||||
STATUS_CHECK(session->Commit(result_ids));
|
||||
|
||||
if (result_ids.size() != 1) {
|
||||
throw Exception(1, "Result id is not equal one ... ");
|
||||
return Status(DB_ERROR, "Result id is wrong");
|
||||
}
|
||||
|
||||
result_id = result_ids.at(0);
|
||||
|
|
|
@ -11,15 +11,18 @@
|
|||
|
||||
#include "db/meta/MetaFactory.h"
|
||||
|
||||
#include "db/Utils.h"
|
||||
#include "utils/Exception.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
#include <cstdlib>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
#include "db/Utils.h"
|
||||
#include "db/meta/backend/MockEngine.h"
|
||||
#include "db/meta/backend/MySqlEngine.h"
|
||||
#include "db/meta/backend/SqliteEngine.h"
|
||||
#include "utils/Exception.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
namespace milvus::engine {
|
||||
|
||||
DBMetaOptions
|
||||
|
@ -61,7 +64,6 @@ MetaFactory::Build(const DBMetaOptions& meta_options) {
|
|||
return std::make_shared<meta::MetaAdapter>(engine);
|
||||
} else if (strcasecmp(uri_info.dialect_.c_str(), "sqlite") == 0) {
|
||||
LOG_ENGINE_INFO_ << "Using Sqlite";
|
||||
/* options.backend_uri_ = "mock://:@:/"; */
|
||||
auto engine = std::make_shared<meta::SqliteEngine>(meta_options);
|
||||
return std::make_shared<meta::MetaAdapter>(engine);
|
||||
} else {
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
|
||||
#include <vector>
|
||||
|
||||
#include "db/meta/backend/MetaContext.h"
|
||||
#include "utils/StringHelpFunctions.h"
|
||||
|
||||
namespace milvus::engine::meta {
|
||||
|
|
|
@ -11,6 +11,8 @@
|
|||
|
||||
#include "db/meta/backend/MetaSchema.h"
|
||||
|
||||
#include "utils/StringHelpFunctions.h"
|
||||
|
||||
namespace milvus::engine::meta {
|
||||
|
||||
bool
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "utils/StringHelpFunctions.h"
|
||||
|
||||
namespace milvus::engine::meta {
|
||||
|
||||
class MetaField {
|
||||
|
@ -26,7 +24,7 @@ class MetaField {
|
|||
: name_(name), type_(type), setting_(setting) {
|
||||
}
|
||||
|
||||
std::string
|
||||
const std::string&
|
||||
name() const {
|
||||
return name_;
|
||||
}
|
||||
|
|
|
@ -11,7 +11,6 @@
|
|||
|
||||
#include "db/meta/backend/SqliteEngine.h"
|
||||
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
@ -123,10 +122,6 @@ static const MetaSchema FIELDELEMENT_SCHEMA(TABLE_FIELD_ELEMENT,
|
|||
|
||||
/////////////////////////////////////////////////////
|
||||
static AttrsMapList* QueryData = nullptr;
|
||||
static AttrsMapList* InsertData = nullptr;
|
||||
static std::vector<int64_t>* InsertedIDs;
|
||||
static AttrsMapList* UpdateData = nullptr;
|
||||
static AttrsMapList* DeleteData = nullptr;
|
||||
|
||||
static int
|
||||
QueryCallback(void* data, int argc, char** argv, char** azColName) {
|
||||
|
@ -146,18 +141,32 @@ QueryCallback(void* data, int argc, char** argv, char** azColName) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string
|
||||
ErrorMsg(sqlite3* db) {
|
||||
std::stringstream ss;
|
||||
ss << "(code:" << sqlite3_errcode(db) << ", extend code: " << sqlite3_extended_errcode(db)
|
||||
<< ", sys errno:" << sqlite3_system_errno(db) << ", sys err msg:" << strerror(errno)
|
||||
<< ", msg:" << sqlite3_errmsg(db) << ")";
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
SqliteEngine::SqliteEngine(const DBMetaOptions& options) : options_(options) {
|
||||
std::string meta_path = options_.path_ + "/meta.sqlite";
|
||||
int rc = sqlite3_open(meta_path.c_str(), &db_);
|
||||
if (rc) {
|
||||
std::string err = "Cannot open Sqlite database: ";
|
||||
err += sqlite3_errmsg(db_);
|
||||
if (sqlite3_open(meta_path.c_str(), &db_) != SQLITE_OK) {
|
||||
std::string err = "Cannot open Sqlite database: " + ErrorMsg(db_);
|
||||
std::cerr << err << std::endl;
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
|
||||
sqlite3_extended_result_codes(db_, 1); // allow extend error code
|
||||
if (SQLITE_OK != sqlite3_exec(db_, "pragma journal_mode = WAL", nullptr, nullptr, nullptr)) {
|
||||
std::string errs = "Sqlite configure wal journal mode to WAL failed: " + ErrorMsg(db_);
|
||||
std::cerr << errs << std::endl;
|
||||
throw std::runtime_error(errs);
|
||||
}
|
||||
|
||||
Initialize();
|
||||
}
|
||||
|
||||
|
@ -200,12 +209,8 @@ SqliteEngine::Query(const MetaQueryContext& context, AttrsMapList& attrs) {
|
|||
std::lock_guard<std::mutex> lock(meta_mutex_);
|
||||
|
||||
QueryData = &attrs;
|
||||
auto rc = sqlite3_exec(db_, sql.c_str(), QueryCallback, NULL, NULL);
|
||||
|
||||
if (rc != SQLITE_OK) {
|
||||
std::string err = "Query fail:";
|
||||
err += sqlite3_errmsg(db_);
|
||||
std::cerr << err << std::endl;
|
||||
if (SQLITE_OK != sqlite3_exec(db_, sql.c_str(), QueryCallback, nullptr, nullptr)) {
|
||||
std::string err = "Query fail:" + ErrorMsg(db_);
|
||||
return Status(DB_META_QUERY_FAILED, err);
|
||||
}
|
||||
|
||||
|
@ -225,15 +230,14 @@ SqliteEngine::ExecuteTransaction(const std::vector<MetaApplyContext>& sql_contex
|
|||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(meta_mutex_);
|
||||
int rc = sqlite3_exec(db_, "BEGIN", NULL, NULL, NULL);
|
||||
if (rc != SQLITE_OK) {
|
||||
std::string sql_err =
|
||||
std::string("(code:") + std::to_string(sqlite3_errcode(db_)) + ", msg:" + sqlite3_errmsg(db_) + ")";
|
||||
if (SQLITE_OK != sqlite3_exec(db_, "BEGIN", nullptr, nullptr, nullptr)) {
|
||||
std::string sql_err = "Sqlite begin transaction failed: " + ErrorMsg(db_);
|
||||
return Status(DB_ERROR, sql_err);
|
||||
}
|
||||
|
||||
int rc = SQLITE_OK;
|
||||
for (size_t i = 0; i < sql_contexts.size(); i++) {
|
||||
rc = sqlite3_exec(db_, sqls[i].c_str(), NULL, NULL, NULL);
|
||||
rc = sqlite3_exec(db_, sqls[i].c_str(), nullptr, nullptr, nullptr);
|
||||
if (rc != SQLITE_OK) {
|
||||
break;
|
||||
}
|
||||
|
@ -249,18 +253,16 @@ SqliteEngine::ExecuteTransaction(const std::vector<MetaApplyContext>& sql_contex
|
|||
} else if (sql_contexts[i].op_ == oUpdate || sql_contexts[i].op_ == oDelete) {
|
||||
result_ids.push_back(sql_contexts[i].id_);
|
||||
} else {
|
||||
sqlite3_exec(db_, "ROLLBACK", NULL, NULL, NULL);
|
||||
return Status(SERVER_UNEXPECTED_ERROR, "Unknown Op");
|
||||
sqlite3_exec(db_, "ROLLBACK", nullptr, nullptr, nullptr);
|
||||
return Status(DB_ERROR, "Unknown Op");
|
||||
}
|
||||
}
|
||||
if (SQLITE_OK != rc) {
|
||||
std::string err = "Execute Fail:";
|
||||
err += sqlite3_errmsg(db_);
|
||||
sqlite3_exec(db_, "ROLLBACK", NULL, NULL, NULL);
|
||||
return Status(SERVER_UNEXPECTED_ERROR, err);
|
||||
if (SQLITE_OK != rc || SQLITE_OK != sqlite3_exec(db_, "COMMIT", nullptr, nullptr, nullptr)) {
|
||||
std::string err = "Execute Fail:" + ErrorMsg(db_);
|
||||
sqlite3_exec(db_, "ROLLBACK", nullptr, nullptr, nullptr);
|
||||
return Status(DB_ERROR, err);
|
||||
}
|
||||
|
||||
sqlite3_exec(db_, "COMMIT", NULL, NULL, NULL);
|
||||
return Status();
|
||||
}
|
||||
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
#include "codecs/Codec.h"
|
||||
#include "db/Utils.h"
|
||||
#include "db/meta/MetaFactory.h"
|
||||
#include "db/meta/MetaSession.h"
|
||||
#include "db/snapshot/ResourceContext.h"
|
||||
#include "db/snapshot/ResourceTypes.h"
|
||||
#include "db/snapshot/Resources.h"
|
||||
|
@ -211,7 +210,7 @@ class Store : public std::enable_shared_from_this<Store> {
|
|||
IDS_TYPE
|
||||
AllActiveCollectionCommitIds(ID_TYPE collection_id, bool reversed = true) const {
|
||||
IDS_TYPE ids, selected_ids;
|
||||
adapter_->SelectResourceIDs<CollectionCommit, int64_t>(selected_ids, meta::F_COLLECTON_ID, {collection_id});
|
||||
adapter_->SelectResourceIDs<CollectionCommit, int64_t>(selected_ids, CollectionIdField::Name, {collection_id});
|
||||
|
||||
if (!reversed) {
|
||||
ids = selected_ids;
|
||||
|
|
|
@ -36,6 +36,9 @@
|
|||
#ifdef MILVUS_GPU_VERSION
|
||||
#include "knowhere/index/vector_index/helpers/FaissGpuResourceMgr.h"
|
||||
#endif
|
||||
#include "db/meta/backend/MockEngine.h"
|
||||
#include "db/meta/backend/MySqlEngine.h"
|
||||
#include "db/meta/backend/SqliteEngine.h"
|
||||
#include "scheduler/ResourceFactory.h"
|
||||
#include "scheduler/SchedInst.h"
|
||||
#include "utils/CommonUtil.h"
|
||||
|
|
Loading…
Reference in New Issue