rewrite dabloom (#3629)

* rewrite dabloom

Signed-off-by: groot <yihua.mo@zilliz.com>

* refine code

Signed-off-by: groot <yihua.mo@zilliz.com>

* unittest

Signed-off-by: groot <yihua.mo@zilliz.com>
Signed-off-by: shengjun.li <shengjun.li@zilliz.com>
pull/3745/head
groot 2020-09-07 16:26:29 +08:00 committed by shengjun.li
parent 08d5c145f0
commit 11c68b20c2
11 changed files with 275 additions and 291 deletions

View File

@ -29,9 +29,6 @@ namespace codec {
const char* BLOOM_FILTER_POSTFIX = ".blf";
constexpr unsigned int BLOOM_FILTER_CAPACITY = 500000;
constexpr double BLOOM_FILTER_ERROR_RATE = 0.01;
std::string
IdBloomFilterFormat::FilePostfix() {
std::string str = BLOOM_FILTER_POSTFIX;
@ -43,13 +40,18 @@ IdBloomFilterFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string
segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
try {
const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX;
scaling_bloom_t* bloom_filter =
new_scaling_bloom_from_file(BLOOM_FILTER_CAPACITY, BLOOM_FILTER_ERROR_RATE, full_file_path.c_str());
fiu_do_on("bloom_filter_nullptr", bloom_filter = nullptr);
if (bloom_filter == nullptr) {
return Status(SERVER_UNEXPECTED_ERROR, "Fail to read bloom filter from file: " + full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open bloom filter file: " + full_file_path);
}
id_bloom_filter_ptr = std::make_shared<segment::IdBloomFilter>(bloom_filter);
id_bloom_filter_ptr = std::make_shared<segment::IdBloomFilter>();
auto status = id_bloom_filter_ptr->Read(fs_ptr);
if (!status.ok()) {
fs_ptr->reader_ptr_->Close();
return status;
}
fs_ptr->reader_ptr_->Close();
} catch (std::exception& ex) {
std::string msg = "Failed to read bloom filter file, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
@ -64,9 +66,17 @@ IdBloomFilterFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::strin
const segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
try {
const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX;
if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) {
return Status(SERVER_UNEXPECTED_ERROR, "Fail to write bloom filter to file: " + full_file_path);
if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open bloom filter file: " + full_file_path);
}
auto status = id_bloom_filter_ptr->Write(fs_ptr);
if (!status.ok()) {
fs_ptr->writer_ptr_->Close();
return status;
}
fs_ptr->writer_ptr_->Close();
} catch (std::exception& ex) {
std::string msg = "Failed to write bloom filter file, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
@ -76,25 +86,5 @@ IdBloomFilterFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::strin
return Status::OK();
}
Status
IdBloomFilterFormat::Create(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
try {
const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX;
scaling_bloom_t* bloom_filter =
new_scaling_bloom(BLOOM_FILTER_CAPACITY, BLOOM_FILTER_ERROR_RATE, full_file_path.c_str());
if (bloom_filter == nullptr) {
return Status(SERVER_UNEXPECTED_ERROR, "Failed to read bloom filter from file: " + full_file_path);
}
id_bloom_filter_ptr = std::make_shared<segment::IdBloomFilter>(bloom_filter);
} catch (std::exception& ex) {
std::string msg = "Failed to create bloom filter file, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
return Status(SERVER_UNEXPECTED_ERROR, msg);
}
return Status::OK();
}
} // namespace codec
} // namespace milvus

View File

@ -42,10 +42,6 @@ class IdBloomFilterFormat {
Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const segment::IdBloomFilterPtr& id_bloom_filter_ptr);
Status
Create(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
segment::IdBloomFilterPtr& id_bloom_filter_ptr);
// No copy and move
IdBloomFilterFormat(const IdBloomFilterFormat&) = delete;
IdBloomFilterFormat(IdBloomFilterFormat&&) = delete;

View File

@ -256,8 +256,7 @@ MemCollection::ApplyDeleteToFile() {
}
}
segment::IdBloomFilterPtr bloom_filter;
STATUS_CHECK(segment_writer->CreateBloomFilter(bloom_filter_file_path, bloom_filter));
segment::IdBloomFilterPtr bloom_filter = std::make_shared<segment::IdBloomFilter>(uids.size());
std::vector<engine::offset_t> delete_docs_offset;
for (size_t i = 0; i < uids.size(); i++) {
if (std::binary_search(ids_to_check.begin(), ids_to_check.end(), uids[i])) {

View File

@ -16,6 +16,7 @@
// under the License.
#include "segment/IdBloomFilter.h"
#include "db/Utils.h"
#include "utils/Log.h"
#include "utils/Status.h"
@ -24,46 +25,70 @@
namespace milvus {
namespace segment {
IdBloomFilter::IdBloomFilter(scaling_bloom_t* bloom_filter) : bloom_filter_(bloom_filter) {
constexpr double BLOOM_FILTER_ERROR_RATE = 0.01;
constexpr int64_t CAPACITY_EXPAND = 1024;
IdBloomFilter::IdBloomFilter(int64_t capacity) : capacity_(capacity + CAPACITY_EXPAND) {
}
IdBloomFilter::~IdBloomFilter() {
const std::lock_guard<std::mutex> lock(mutex_);
if (bloom_filter_) {
free_scaling_bloom(bloom_filter_);
}
FreeBloomFilter();
}
scaling_bloom_t*
IdBloomFilter::GetBloomFilter() {
const std::lock_guard<std::mutex> lock(mutex_);
if (bloom_filter_ == nullptr) {
bloom_filter_ = new_scaling_bloom(capacity_, BLOOM_FILTER_ERROR_RATE);
}
return bloom_filter_;
}
void
IdBloomFilter::FreeBloomFilter() {
if (bloom_filter_) {
free_scaling_bloom(bloom_filter_);
bloom_filter_ = nullptr;
}
}
bool
IdBloomFilter::Check(engine::idx_t uid) {
std::string s = std::to_string(uid);
const std::lock_guard<std::mutex> lock(mutex_);
return scaling_bloom_check(bloom_filter_, s.c_str(), s.size());
scaling_bloom_t* bloom_filter = GetBloomFilter();
if (bloom_filter == nullptr) {
return true; // bloom filter doesn't work, always return true
}
return scaling_bloom_check(bloom_filter, s.c_str(), s.size());
}
Status
IdBloomFilter::Add(engine::idx_t uid) {
std::string s = std::to_string(uid);
const std::lock_guard<std::mutex> lock(mutex_);
if (scaling_bloom_add(bloom_filter_, s.c_str(), s.size(), uid) == -1) {
scaling_bloom_t* bloom_filter = GetBloomFilter();
if (bloom_filter == nullptr) {
return Status(DB_ERROR, "bloom filter is null pointer"); // bloom filter doesn't work
}
if (scaling_bloom_add(bloom_filter, s.c_str(), s.size(), uid) == -1) {
// Counter overflow does not affect bloom filter's normal functionality
LOG_ENGINE_WARNING_ << "Warning adding id=" << s << " to bloom filter: 4 bit counter Overflow";
// return Status(DB_BLOOM_FILTER_ERROR, "Bloom filter error: 4 bit counter Overflow");
}
return Status::OK();
}
Status
IdBloomFilter::Remove(engine::idx_t uid) {
std::string s = std::to_string(uid);
const std::lock_guard<std::mutex> lock(mutex_);
if (scaling_bloom_remove(bloom_filter_, s.c_str(), s.size(), uid) == -1) {
scaling_bloom_t* bloom_filter = GetBloomFilter();
if (bloom_filter == nullptr) {
return Status(DB_ERROR, "bloom filter is null pointer"); // bloom filter doesn't work
}
if (scaling_bloom_remove(bloom_filter, s.c_str(), s.size(), uid) == -1) {
// Should never go in here, but just to be safe
LOG_ENGINE_WARNING_ << "Warning removing id=" << s << " in bloom filter: Decrementing zero in counter";
// return Status(DB_BLOOM_FILTER_ERROR, "Error removing in bloom filter: Decrementing zero in counter");
@ -81,5 +106,63 @@ IdBloomFilter::Size() {
return bloom_filter_ ? bloom_filter_->num_bytes : 0;
}
double
IdBloomFilter::ErrorRate() const {
return BLOOM_FILTER_ERROR_RATE;
}
Status
IdBloomFilter::Write(const storage::FSHandlerPtr& fs_ptr) {
scaling_bloom_t* bloom_filter = GetBloomFilter();
try {
fs_ptr->writer_ptr_->Write(&(bloom_filter->capacity), sizeof(bloom_filter->capacity));
fs_ptr->writer_ptr_->Write(&(bloom_filter->error_rate), sizeof(bloom_filter->error_rate));
fs_ptr->writer_ptr_->Write(&(bloom_filter->bitmap->bytes), sizeof(bloom_filter->bitmap->bytes));
fs_ptr->writer_ptr_->Write(bloom_filter->bitmap->array, bloom_filter->bitmap->bytes);
} catch (std::exception& ex) {
std::string err_msg = "Failed to write bloom filter: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
Status
IdBloomFilter::Read(const storage::FSHandlerPtr& fs_ptr) {
FreeBloomFilter();
try {
unsigned int capacity = 0;
fs_ptr->reader_ptr_->Read(&capacity, sizeof(capacity));
capacity_ = capacity;
double error_rate = 0.0;
fs_ptr->reader_ptr_->Read(&error_rate, sizeof(error_rate));
size_t bitmap_bytes = 0;
fs_ptr->reader_ptr_->Read(&bitmap_bytes, sizeof(bitmap_bytes));
bloom_filter_ = new_scaling_bloom(capacity, error_rate);
if (bitmap_bytes != bloom_filter_->bitmap->bytes) {
FreeBloomFilter();
return Status(DB_ERROR, "Invalid bloom filter file");
}
fs_ptr->reader_ptr_->Read(bloom_filter_->bitmap->array, bitmap_bytes);
} catch (std::exception& ex) {
std::string err_msg = "Failed to read bloom filter: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << err_msg;
FreeBloomFilter();
return Status(SERVER_UNEXPECTED_ERROR, err_msg);
}
return Status::OK();
}
} // namespace segment
} // namespace milvus

View File

@ -19,24 +19,25 @@
#include <memory>
#include <mutex>
#include <string>
#include "cache/DataObj.h"
#include "dablooms/dablooms.h"
#include "db/Types.h"
#include "storage/FSHandler.h"
#include "utils/Status.h"
namespace milvus {
namespace segment {
constexpr int64_t DEFAULT_BLOOM_FILTER_CAPACITY = 500000;
class IdBloomFilter : public cache::DataObj {
public:
explicit IdBloomFilter(scaling_bloom_t* bloom_filter);
explicit IdBloomFilter(int64_t capacity = DEFAULT_BLOOM_FILTER_CAPACITY);
~IdBloomFilter();
scaling_bloom_t*
GetBloomFilter();
bool
Check(engine::idx_t uid);
@ -49,8 +50,14 @@ class IdBloomFilter : public cache::DataObj {
int64_t
Size() override;
// const std::string&
// GetName() const;
double
ErrorRate() const;
Status
Write(const storage::FSHandlerPtr& fs_ptr);
Status
Read(const storage::FSHandlerPtr& fs_ptr);
// No copy and move
IdBloomFilter(const IdBloomFilter&) = delete;
@ -62,9 +69,15 @@ class IdBloomFilter : public cache::DataObj {
operator=(IdBloomFilter&&) = delete;
private:
scaling_bloom_t* bloom_filter_;
// const std::string name_ = "bloom_filter";
std::mutex mutex_;
scaling_bloom_t*
GetBloomFilter();
void
FreeBloomFilter();
private:
scaling_bloom_t* bloom_filter_ = nullptr;
int64_t capacity_ = 0;
};
using IdBloomFilterPtr = std::shared_ptr<IdBloomFilter>;

View File

@ -477,8 +477,7 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
if (id_bloom_filter_ptr) {
segment_ptr_->SetBloomFilter(id_bloom_filter_ptr);
// TODO: disable cache for solving bloom filter ptr problem
// cache::CpuCacheMgr::GetInstance().InsertItem(file_path, id_bloom_filter_ptr); // put into cache
cache::CpuCacheMgr::GetInstance().InsertItem(file_path, id_bloom_filter_ptr); // put into cache
}
} catch (std::exception& e) {
std::string err_msg = "Failed to load bloom filter: " + std::string(e.what());

View File

@ -145,12 +145,9 @@ SegmentWriter::WriteBloomFilter() {
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, segment_file);
auto& ss_codec = codec::Codec::instance();
segment::IdBloomFilterPtr bloom_filter_ptr;
STATUS_CHECK(ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr));
auto uids = reinterpret_cast<int64_t*>(uid_data->data_.data());
int64_t row_count = segment_ptr_->GetRowCount();
segment::IdBloomFilterPtr bloom_filter_ptr = std::make_shared<segment::IdBloomFilter>(row_count);
for (int64_t i = 0; i < row_count; i++) {
bloom_filter_ptr->Add(uids[i]);
}
@ -169,14 +166,6 @@ SegmentWriter::WriteBloomFilter() {
return Status::OK();
}
Status
SegmentWriter::CreateBloomFilter(const std::string& file_path, IdBloomFilterPtr& bloom_filter_ptr) {
auto& ss_codec = codec::Codec::instance();
STATUS_CHECK(ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr));
return Status::OK();
}
Status
SegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilterPtr& id_bloom_filter_ptr) {
if (id_bloom_filter_ptr == nullptr) {

View File

@ -42,9 +42,6 @@ class SegmentWriter {
Status
AddChunk(const engine::DataChunkPtr& chunk_ptr, int64_t from, int64_t to);
Status
CreateBloomFilter(const std::string& file_path, IdBloomFilterPtr& bloom_filter_ptr);
Status
WriteBloomFilter(const std::string& file_path, const IdBloomFilterPtr& bloom_filter_ptr);

View File

@ -29,84 +29,39 @@ const char *dablooms_version(void)
void free_bitmap(bitmap_t *bitmap)
{
if ((munmap(bitmap->array, bitmap->bytes)) < 0) {
perror("Error, unmapping memory");
}
close(bitmap->fd);
free(bitmap->array);
bitmap->bytes = 0;
free(bitmap);
}
bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size)
{
int fd = bitmap->fd;
struct stat fileStat;
fstat(fd, &fileStat);
size_t size = fileStat.st_size;
/* grow file if necessary */
if (size < new_size) {
if (ftruncate(fd, new_size) < 0) {
perror("Error increasing file size with ftruncate");
free_bitmap(bitmap);
close(fd);
return NULL;
}
}
lseek(fd, 0, SEEK_SET);
/* resize if mmap exists and possible on this os, else new mmap */
if (bitmap->array != NULL) {
#if __linux
bitmap->array = (char *)mremap(bitmap->array, old_size, new_size, MREMAP_MAYMOVE);
if (bitmap->array == MAP_FAILED) {
perror("Error resizing mmap");
free_bitmap(bitmap);
close(fd);
return NULL;
}
#else
if (munmap(bitmap->array, bitmap->bytes) < 0) {
perror("Error unmapping memory");
free_bitmap(bitmap);
close(fd);
return NULL;
}
bitmap->array = NULL;
#endif
}
if (bitmap->array == NULL) {
bitmap->array = (char *)mmap(0, new_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (bitmap->array == MAP_FAILED) {
perror("Error init mmap");
free_bitmap(bitmap);
close(fd);
return NULL;
}
bitmap->array = (char*)realloc(bitmap->array, new_size);
} else {
bitmap->array = (char*)malloc(new_size);
}
bitmap->bytes = new_size;
if (bitmap->array != NULL) {
bitmap->bytes = new_size;
}
return bitmap;
}
/* Create a new bitmap, not full featured, simple to give
* us a means of interacting with the 4 bit counters */
bitmap_t *new_bitmap(int fd, size_t bytes)
bitmap_t *new_bitmap(size_t bytes)
{
bitmap_t *bitmap;
if ((bitmap = (bitmap_t *)malloc(sizeof(bitmap_t))) == NULL) {
return NULL;
}
bitmap->bytes = bytes;
bitmap->fd = fd;
bitmap->array = NULL;
if ((bitmap = bitmap_resize(bitmap, 0, bytes)) == NULL) {
if ((bitmap->array = (char*)malloc(bytes)) == NULL) {
return NULL;
}
bitmap->bytes = bytes;
return bitmap;
}
@ -201,18 +156,6 @@ void hash_func(counting_bloom_t *bloom, const char *key, size_t key_len, uint32_
}
}
int free_counting_bloom(counting_bloom_t *bloom)
{
if (bloom != NULL) {
free(bloom->hashes);
bloom->hashes = NULL;
free_bitmap(bloom->bitmap);
free(bloom);
bloom = NULL;
}
return 0;
}
counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate, long offset)
{
counting_bloom_t *bloom;
@ -225,8 +168,8 @@ counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate,
bloom->capacity = capacity;
bloom->error_rate = error_rate;
bloom->offset = offset + sizeof(counting_bloom_header_t);
bloom->nfuncs = (int) ceil(log(1 / error_rate) / log(2));
bloom->counts_per_func = (int) ceil(capacity * fabs(log(error_rate)) / (bloom->nfuncs * pow(log(2), 2)));
bloom->nfuncs = (size_t) ceil(log(1 / error_rate) / log(2));
bloom->counts_per_func = (unsigned int) ceil(capacity * fabs(log(error_rate)) / (bloom->nfuncs * pow(log(2), 2)));
bloom->size = bloom->nfuncs * bloom->counts_per_func;
/* rounding-up integer divide by 2 of bloom->size */
bloom->num_bytes = ((bloom->size + 1) / 2) + sizeof(counting_bloom_header_t);
@ -235,23 +178,6 @@ counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate,
return bloom;
}
counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate, const char *filename)
{
counting_bloom_t *cur_bloom;
int fd;
if ((fd = open(filename, O_RDWR | O_CREAT | O_TRUNC, (mode_t)0600)) < 0) {
perror("Error, Opening File Failed");
fprintf(stderr, " %s \n", filename);
return NULL;
}
cur_bloom = counting_bloom_init(capacity, error_rate, 0);
cur_bloom->bitmap = new_bitmap(fd, cur_bloom->num_bytes);
cur_bloom->header = (counting_bloom_header_t *)(cur_bloom->bitmap->array);
return cur_bloom;
}
int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len)
{
unsigned int index, i, offset;
@ -313,10 +239,6 @@ int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len)
int free_scaling_bloom(scaling_bloom_t *bloom)
{
if(close(bloom->fd) == -1) {
std::cerr << " Close fd " << bloom->fd << "Failed: " << strerror(errno) << std::endl;
}
int i;
for (i = bloom->num_blooms - 1; i >= 0; i--) {
free(bloom->blooms[i]->hashes);
@ -366,54 +288,9 @@ counting_bloom_t *new_counting_bloom_from_scale(scaling_bloom_t *bloom)
return cur_bloom;
}
counting_bloom_t *new_counting_bloom_from_file(unsigned int capacity, double error_rate, const char *filename)
{
int fd;
off_t size;
counting_bloom_t *bloom;
if ((fd = open(filename, O_RDWR, (mode_t)0600)) < 0) {
fprintf(stderr, "Error, Could not open file %s: %s\n", filename, strerror(errno));
return NULL;
}
if ((size = lseek(fd, 0, SEEK_END)) < 0) {
perror("Error, calling lseek() to tell file size");
close(fd);
return NULL;
}
if (size == 0) {
fprintf(stderr, "Error, File size zero\n");
}
bloom = counting_bloom_init(capacity, error_rate, 0);
if (size != bloom->num_bytes) {
free_counting_bloom(bloom);
fprintf(stderr, "Error, Actual filesize and expected filesize are not equal\n");
return NULL;
}
if ((bloom->bitmap = new_bitmap(fd, size)) == NULL) {
fprintf(stderr, "Error, Could not create bitmap with file\n");
free_counting_bloom(bloom);
return NULL;
}
bloom->header = (counting_bloom_header_t *)(bloom->bitmap->array);
return bloom;
}
uint64_t scaling_bloom_clear_seqnums(scaling_bloom_t *bloom)
{
uint64_t seqnum;
if (bloom->header->disk_seqnum != 0) {
// disk_seqnum cleared on disk before any other changes
bloom->header->disk_seqnum = 0;
bitmap_flush(bloom->bitmap);
}
seqnum = bloom->header->mem_seqnum;
uint64_t seqnum = bloom->header->mem_seqnum;
bloom->header->mem_seqnum = 0;
return seqnum;
}
@ -494,32 +371,18 @@ int scaling_bloom_flush(scaling_bloom_t *bloom)
if (bitmap_flush(bloom->bitmap) != 0) {
return -1;
}
// all changes written to disk before disk_seqnum set
if (bloom->header->disk_seqnum == 0) {
bloom->header->disk_seqnum = bloom->header->mem_seqnum;
return bitmap_flush(bloom->bitmap);
}
return 0;
return bitmap_flush(bloom->bitmap);
}
uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom)
{
return bloom->header->mem_seqnum;
}
uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom)
{
return bloom->header->disk_seqnum;
}
scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate, const char *filename, int fd)
scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate)
{
scaling_bloom_t *bloom;
if ((bloom = (scaling_bloom_t *)malloc(sizeof(scaling_bloom_t))) == NULL) {
return NULL;
}
if ((bloom->bitmap = new_bitmap(fd, sizeof(scaling_bloom_header_t))) == NULL) {
if ((bloom->bitmap = new_bitmap(sizeof(scaling_bloom_header_t))) == NULL) {
fprintf(stderr, "Error, Could not create bitmap with file\n");
free_scaling_bloom(bloom);
return NULL;
@ -530,26 +393,17 @@ scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate, co
bloom->error_rate = error_rate;
bloom->num_blooms = 0;
bloom->num_bytes = sizeof(scaling_bloom_header_t);
bloom->fd = fd;
bloom->blooms = NULL;
return bloom;
}
scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate, const char *filename)
scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate)
{
scaling_bloom_t *bloom;
counting_bloom_t *cur_bloom;
int fd;
if ((fd = open(filename, O_RDWR | O_CREAT | O_TRUNC, (mode_t)0600)) < 0) {
perror("Error, Opening File Failed");
fprintf(stderr, " %s \n", filename);
return NULL;
}
bloom = scaling_bloom_init(capacity, error_rate, filename, fd);
bloom = scaling_bloom_init(capacity, error_rate);
if (!(cur_bloom = new_counting_bloom_from_scale(bloom))) {
fprintf(stderr, "Error, Could not create counting bloom\n");
@ -562,40 +416,3 @@ scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate, con
bloom->header->mem_seqnum = 1;
return bloom;
}
scaling_bloom_t *new_scaling_bloom_from_file(unsigned int capacity, double error_rate, const char *filename)
{
int fd;
off_t size;
scaling_bloom_t *bloom;
counting_bloom_t *cur_bloom;
if ((fd = open(filename, O_RDWR, (mode_t)0600)) < 0) {
fprintf(stderr, "Error, Could not open file %s: %s\n", filename, strerror(errno));
return NULL;
}
if ((size = lseek(fd, 0, SEEK_END)) < 0) {
perror("Error, calling lseek() to tell file size");
close(fd);
return NULL;
}
if (size == 0) {
fprintf(stderr, "Error, File size zero\n");
}
bloom = scaling_bloom_init(capacity, error_rate, filename, fd);
size -= sizeof(scaling_bloom_header_t);
while (size) {
cur_bloom = new_counting_bloom_from_scale(bloom);
// leave count and id as they were set in the file
size -= cur_bloom->num_bytes;
if (size < 0) {
free_scaling_bloom(bloom);
fprintf(stderr, "Error, Actual filesize and expected filesize are not equal\n");
return NULL;
}
}
return bloom;
}

View File

@ -9,13 +9,12 @@ const char *dablooms_version(void);
typedef struct {
size_t bytes;
int fd;
char *array;
} bitmap_t;
bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size);
bitmap_t *new_bitmap(int fd, size_t bytes);
bitmap_t *new_bitmap(size_t bytes);
int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset);
int bitmap_decrement(bitmap_t *bitmap, unsigned int index, long offset);
@ -30,7 +29,6 @@ typedef struct {
uint32_t _pad;
} counting_bloom_header_t;
typedef struct {
counting_bloom_header_t *header;
unsigned int capacity;
@ -44,9 +42,6 @@ typedef struct {
bitmap_t *bitmap;
} counting_bloom_t;
int free_counting_bloom(counting_bloom_t *bloom);
counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate, const char *filename);
counting_bloom_t *new_counting_bloom_from_file(unsigned int capacity, double error_rate, const char *filename);
int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len);
int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len);
int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len);
@ -54,7 +49,6 @@ int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len);
typedef struct {
uint64_t max_id;
uint64_t mem_seqnum;
uint64_t disk_seqnum;
} scaling_bloom_header_t;
typedef struct {
@ -63,18 +57,13 @@ typedef struct {
unsigned int num_blooms;
size_t num_bytes;
double error_rate;
int fd;
counting_bloom_t **blooms;
bitmap_t *bitmap;
} scaling_bloom_t;
scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate, const char *filename);
scaling_bloom_t *new_scaling_bloom_from_file(unsigned int capacity, double error_rate, const char *filename);
scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate);
int free_scaling_bloom(scaling_bloom_t *bloom);
int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id);
int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id);
int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len);
int scaling_bloom_flush(scaling_bloom_t *bloom);
uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom);
uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom);
#endif

View File

@ -14,7 +14,10 @@
#include <gtest/gtest.h>
#include <string>
#include <experimental/filesystem>
#include "codecs/Codec.h"
#include "db/IDGenerator.h"
#include "db/utils.h"
#include "db/SnapshotVisitor.h"
#include "db/Types.h"
@ -23,9 +26,13 @@
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "segment/IdBloomFilter.h"
#include "storage/disk/DiskIOReader.h"
#include "storage/disk/DiskIOWriter.h"
#include "utils/Json.h"
using SegmentVisitor = milvus::engine::SegmentVisitor;
using IdBloomFilter = milvus::segment::IdBloomFilter;
namespace {
milvus::Status
@ -156,3 +163,108 @@ TEST_F(SegmentTest, SegmentTest) {
status = db_->DropCollection(c1);
ASSERT_TRUE(status.ok());
}
TEST(BloomFilterTest, BloomFilterTest) {
std::string file_path = "/tmp/milvus_bloom.blf";
milvus::storage::IOReaderPtr reader_ptr = std::make_shared<milvus::storage::DiskIOReader>();
milvus::storage::IOWriterPtr writer_ptr = std::make_shared<milvus::storage::DiskIOWriter>();
milvus::storage::OperationPtr operation_ptr = nullptr;
auto fs_ptr = std::make_shared<milvus::storage::FSHandler>(reader_ptr, writer_ptr, operation_ptr);
const int64_t id_count = 100000;
milvus::engine::SafeIDGenerator id_gen;
std::vector<int64_t> id_array;
std::vector<int64_t> removed_id_array;
auto error_rate_check_1 = [&](IdBloomFilter& filter, int64_t repeat) -> void {
int64_t wrong_check = 0;
for (int64_t i = 0; i < repeat; ++i) {
auto id = id_gen.GetNextIDNumber();
bool res = filter.Check(id);
if (res) {
wrong_check++;
}
}
double error_rate = filter.ErrorRate();
double wrong_rate = (double)wrong_check/id_count;
ASSERT_LT(wrong_rate, error_rate);
};
auto error_rate_check_2 = [&](IdBloomFilter& filter, const std::vector<int64_t>& id_array) -> void {
int64_t wrong_check = 0;
for (auto id : id_array) {
bool res = filter.Check(id);
if (res) {
wrong_check++;
}
}
double error_rate = filter.ErrorRate();
double wrong_rate = (double)wrong_check/id_count;
ASSERT_LT(wrong_rate, error_rate);
};
{
IdBloomFilter filter(id_count);
// insert some ids
for (int64_t i = 0; i < id_count; ++i) {
auto id = id_gen.GetNextIDNumber();
filter.Add(id);
id_array.push_back(id);
}
// check inserted ids
for (auto id : id_array) {
bool res = filter.Check(id);
ASSERT_TRUE(res);
}
// check non-exist ids
error_rate_check_1(filter, id_count);
// remove some ids
std::vector<int64_t> temp_array;
for (auto id : id_array) {
if (id % 7 == 0) {
filter.Remove(id);
removed_id_array.push_back(id);
} else {
temp_array.push_back(id);
}
}
id_array.swap(temp_array);
// check removed ids
error_rate_check_2(filter, removed_id_array);
fs_ptr->writer_ptr_->Open(file_path);
auto status = filter.Write(fs_ptr);
ASSERT_TRUE(status.ok());
fs_ptr->writer_ptr_->Close();
}
{
IdBloomFilter filter(0);
fs_ptr->reader_ptr_->Open(file_path);
auto status = filter.Read(fs_ptr);
ASSERT_TRUE(status.ok());
fs_ptr->reader_ptr_->Close();
// check inserted ids
for (auto id : id_array) {
bool res = filter.Check(id);
ASSERT_TRUE(res);
}
// check non-exist ids
error_rate_check_1(filter, id_count);
// check removed ids
error_rate_check_2(filter, removed_id_array);
}
std::experimental::filesystem::remove(file_path);
}