mirror of https://github.com/milvus-io/milvus.git
Create segments by query node num
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
parent 5d5a20dce2
commit 567232b10f
@@ -52,7 +52,7 @@ reader:
writer:
clientid: 0
stopflag: -2
-readerqueuesize: 1024
+readerqueuesize: 10000
searchbyidchansize: 10000
topicstart: 0
topicend: 128
@@ -20,8 +20,8 @@ Collection::AddIndex(const grpc::IndexParam& index_param) {
auto& index_name = index_param.index_name();
auto& field_name = index_param.field_name();

-assert(!index_name.empty());
-assert(!field_name.empty());
+Assert(!index_name.empty());
+Assert(!field_name.empty());

auto index_type = knowhere::IndexEnum::INDEX_FAISS_IVFPQ;
auto index_mode = knowhere::IndexMode::MODE_CPU;

@@ -72,7 +72,7 @@ Collection::AddIndex(const grpc::IndexParam& index_param) {
dim = field.get_dim();
}
}
-assert(dim != 0);
+Assert(dim != 0);

index_conf = milvus::knowhere::Config{
{knowhere::meta::DIM, dim},
@@ -7,6 +7,7 @@
#include <mutex>
#include <shared_mutex>
#include <vector>
+#include "EasyAssert.h"
namespace milvus::dog_segment {

// we don't use std::array because capacity of concurrent_vector wastes too much memory

@@ -18,7 +19,7 @@ namespace milvus::dog_segment {
// }
// FixedVector(const FixedVector<Type>& placeholder_vec)
// : std::vector<Type>(placeholder_vec.placeholder_size_), is_placeholder_(false) {
-// // assert(placeholder_vec.is_placeholder_);
+// // Assert(placeholder_vec.is_placeholder_);
// }
// FixedVector(FixedVector<Type>&&) = delete;
//

@@ -58,14 +59,14 @@ class ThreadSafeVector {
}
const Type&
operator[](int64_t index) const {
-assert(index < size_);
+Assert(index < size_);
std::shared_lock lck(mutex_);
return vec_[index];
}

Type&
operator[](int64_t index) {
-assert(index < size_);
+Assert(index < size_);
std::shared_lock lck(mutex_);
return vec_[index];
}

@@ -105,7 +106,7 @@ class ConcurrentVector : public VectorBase {
public:

explicit ConcurrentVector(ssize_t dim = 1) : Dim(is_scalar ? 1 : dim), SizePerChunk(Dim * ElementsPerChunk) {
-assert(is_scalar ? dim == 1 : dim != 1);
+Assert(is_scalar ? dim == 1 : dim != 1);
}

void

@@ -171,7 +172,7 @@ class ConcurrentVector : public VectorBase {

const Type&
operator[](ssize_t element_index) const {
-assert(Dim == 1);
+Assert(Dim == 1);
auto chunk_id = element_index / ElementsPerChunk;
auto chunk_offset = element_index % ElementsPerChunk;
return get_chunk(chunk_id)[chunk_offset];

@@ -190,7 +191,7 @@ class ConcurrentVector : public VectorBase {
return;
}
auto chunk_max_size = chunks_.size();
-assert(chunk_id < chunk_max_size);
+Assert(chunk_id < chunk_max_size);
Chunk& chunk = chunks_[chunk_id];
auto ptr = chunk.data();
std::copy_n(source + source_offset * Dim, element_count * Dim, ptr + chunk_offset * Dim);
@@ -0,0 +1,19 @@
#pragma once
#include <string_view>

namespace milvus::impl {
inline
void EasyAssertInfo(bool value, std::string_view expr_str, std::string_view filename, int lineno,
std::string_view extra_info) {
if (!value) {
std::string info;
info += "Assert \"" + std::string(expr_str) + "\"";
info += " at " + std::string(filename) + ":" + std::to_string(lineno);
info += " => " + std::string(extra_info);
throw std::runtime_error(info);
}
}
}

#define AssertInfo(expr, info) impl::EasyAssertInfo(bool(expr), #expr, __FILE__, __LINE__, (info))
#define Assert(expr) AssertInfo((expr), "")
@@ -19,7 +19,7 @@ IndexMeta::AddEntry(const std::string& index_name, const std::string& field_name
throw std::invalid_argument("duplicate index_name");
}
// TODO: support multiple indexes for single field
-assert(!lookups_.count(field_name));
+Assert(!lookups_.count(field_name));
lookups_[field_name] = index_name;
entries_[index_name] = std::move(entry);

@@ -28,7 +28,7 @@ IndexMeta::AddEntry(const std::string& index_name, const std::string& field_name

Status
IndexMeta::DropEntry(const std::string& index_name) {
-assert(entries_.count(index_name));
+Assert(entries_.count(index_name));
auto entry = std::move(entries_[index_name]);
if(lookups_[entry.field_name] == index_name) {
lookups_.erase(entry.field_name);

@@ -46,9 +46,9 @@ void IndexMeta::VerifyEntry(const Entry &entry) {
auto& field_meta = schema[entry.field_name];
// TODO checking
if(field_meta.is_vector()) {
-assert(entry.type == knowhere::IndexEnum::INDEX_FAISS_IVFPQ);
+Assert(entry.type == knowhere::IndexEnum::INDEX_FAISS_IVFPQ);
} else {
-assert(false);
+Assert(false);
}
}
@@ -41,7 +41,9 @@ class IndexMeta {
}

const Entry& lookup_by_field(const std::string& field_name) {
+AssertInfo(lookups_.count(field_name), field_name);
auto index_name = lookups_.at(field_name);
+AssertInfo(entries_.count(index_name), index_name);
return entries_.at(index_name);
}
private:
@@ -8,6 +8,7 @@
// #include "knowhere/index/Index.h"
#include "utils/Status.h"
#include "dog_segment/IndexMeta.h"
+#include "EasyAssert.h"

namespace milvus::dog_segment {
using Timestamp = uint64_t; // TODO: use TiKV-like timestamp

@@ -40,7 +41,7 @@ field_sizeof(DataType data_type, int dim = 1) {
case DataType::VECTOR_FLOAT:
return sizeof(float) * dim;
case DataType::VECTOR_BINARY: {
-assert(dim % 8 == 0);
+Assert(dim % 8 == 0);
return dim / 8;
}
default: {

@@ -62,7 +63,7 @@ struct FieldMeta {

bool
is_vector() const {
-assert(type_ != DataType::NONE);
+Assert(type_ != DataType::NONE);
return type_ == DataType::VECTOR_BINARY || type_ == DataType::VECTOR_FLOAT;
}

@@ -141,6 +142,8 @@ class Schema {

const FieldMeta&
operator[](int field_index) const {
+Assert(field_index >= 0);
+Assert(field_index < fields_.size());
return fields_[field_index];
}
@@ -9,7 +9,6 @@
#include <knowhere/index/vector_index/VecIndexFactory.h>
#include <faiss/utils/distances.h>


namespace milvus::dog_segment {
int
TestABI() {

@@ -18,7 +17,6 @@ TestABI() {

std::unique_ptr<SegmentBase>
CreateSegment(SchemaPtr schema) {

auto segment = std::make_unique<SegmentNaive>(schema);
return segment;
}

@@ -26,10 +24,10 @@ CreateSegment(SchemaPtr schema) {
SegmentNaive::Record::Record(const Schema &schema) : uids_(1), timestamps_(1) {
for (auto &field : schema) {
if (field.is_vector()) {
-assert(field.get_data_type() == DataType::VECTOR_FLOAT);
+Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
entity_vec_.emplace_back(std::make_shared<ConcurrentVector<float>>(field.get_dim()));
} else {
-assert(field.get_data_type() == DataType::INT32);
+Assert(field.get_data_type() == DataType::INT32);
entity_vec_.emplace_back(std::make_shared<ConcurrentVector<int32_t, true>>());
}
}

@@ -73,7 +71,7 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
for (auto iter = iter_b; iter != iter_e; ++iter) {
auto offset = iter->second;
if (record_.timestamps_[offset] < query_timestamp) {
-assert(offset < insert_barrier);
+Assert(offset < insert_barrier);
the_offset = std::max(the_offset, offset);
}
}

@@ -102,7 +100,7 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
continue;
}
if (record_.timestamps_[offset] < query_timestamp) {
-assert(offset < insert_barrier);
+Assert(offset < insert_barrier);
the_offset = std::max(the_offset, offset);
}
}

@@ -123,12 +121,13 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
Status
SegmentNaive::Insert(int64_t reserved_begin, int64_t size, const int64_t *uids_raw, const Timestamp *timestamps_raw,
const DogDataChunk &entities_raw) {
-assert(entities_raw.count == size);
+Assert(entities_raw.count == size);
if (entities_raw.sizeof_per_row != schema_->get_total_sizeof()) {
std::string msg = "entity length = " + std::to_string(entities_raw.sizeof_per_row) +
", schema length = " + std::to_string(schema_->get_total_sizeof());
throw std::runtime_error(msg);
}

auto raw_data = reinterpret_cast<const char *>(entities_raw.raw_data);
// std::vector<char> entities(raw_data, raw_data + size * len_per_row);
@@ -185,13 +184,13 @@ SegmentNaive::Insert(int64_t reserved_begin, int64_t size, const int64_t *uids_r
// go.detach();
// const auto& schema = *schema_;
// auto record_ptr = GetMutableRecord();
-// assert(record_ptr);
+// Assert(record_ptr);
// auto& record = *record_ptr;
// auto data_chunk = ColumnBasedDataChunk::from(row_values, schema);
//
// // TODO: use shared_lock for better concurrency
// std::lock_guard lck(mutex_);
-// assert(state_ == SegmentState::Open);
+// Assert(state_ == SegmentState::Open);
// auto ack_id = ack_count_.load();
// record.uids_.grow_by(primary_keys, primary_keys + size);
// for (int64_t i = 0; i < size; ++i) {

@@ -263,19 +262,20 @@ SegmentNaive::QueryImpl(query::QueryPtr query_info, Timestamp timestamp, QueryRe
auto ins_barrier = get_barrier(record_, timestamp);
auto del_barrier = get_barrier(deleted_record_, timestamp);
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier, true);
-assert(bitmap_holder);
-assert(bitmap_holder->bitmap_ptr->capacity() == ins_barrier);
+Assert(bitmap_holder);
+Assert(bitmap_holder->bitmap_ptr->capacity() == ins_barrier);

auto field_offset = schema_->get_offset(query_info->field_name);
auto &field = schema_->operator[](query_info->field_name);

-assert(field.get_data_type() == DataType::VECTOR_FLOAT);
+Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
auto bitmap = bitmap_holder->bitmap_ptr;
auto topK = query_info->topK;
auto num_queries = query_info->num_queries;
auto the_offset_opt = schema_->get_offset(query_info->field_name);
-assert(the_offset_opt.has_value());
+Assert(the_offset_opt.has_value());
+Assert(the_offset_opt.value() < record_.entity_vec_.size());
auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_.at(the_offset_opt.value()));
auto index_entry = index_meta_->lookup_by_field(query_info->field_name);
auto conf = index_entry.config;

@@ -354,10 +354,10 @@ SegmentNaive::QueryBruteForceImpl(query::QueryPtr query_info, Timestamp timestam
auto ins_barrier = get_barrier(record_, timestamp);
auto del_barrier = get_barrier(deleted_record_, timestamp);
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
-assert(bitmap_holder);
+Assert(bitmap_holder);

auto &field = schema_->operator[](query_info->field_name);
-assert(field.get_data_type() == DataType::VECTOR_FLOAT);
+Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
auto bitmap = bitmap_holder->bitmap_ptr;
auto topK = query_info->topK;

@@ -366,7 +366,8 @@ SegmentNaive::QueryBruteForceImpl(query::QueryPtr query_info, Timestamp timestam
// TODO: optimize

auto the_offset_opt = schema_->get_offset(query_info->field_name);
-assert(the_offset_opt.has_value());
+Assert(the_offset_opt.has_value());
+Assert(the_offset_opt.value() < record_.entity_vec_.size());
auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_.at(the_offset_opt.value()));

std::vector<int64_t> final_uids(total_count);

@@ -415,17 +416,18 @@ SegmentNaive::QuerySlowImpl(query::QueryPtr query_info, Timestamp timestamp, Que
auto ins_barrier = get_barrier(record_, timestamp);
auto del_barrier = get_barrier(deleted_record_, timestamp);
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
-assert(bitmap_holder);
+Assert(bitmap_holder);

auto &field = schema_->operator[](query_info->field_name);
-assert(field.get_data_type() == DataType::VECTOR_FLOAT);
+Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
auto bitmap = bitmap_holder->bitmap_ptr;
auto topK = query_info->topK;
auto num_queries = query_info->num_queries;
// TODO: optimize
auto the_offset_opt = schema_->get_offset(query_info->field_name);
-assert(the_offset_opt.has_value());
+Assert(the_offset_opt.has_value());
+Assert(the_offset_opt.value() < record_.entity_vec_.size());
auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_.at(the_offset_opt.value()));
std::vector<std::priority_queue<std::pair<float, int>>> records(num_queries);
@@ -521,7 +523,7 @@ SegmentNaive::Close() {
template<typename Type>
knowhere::IndexPtr SegmentNaive::BuildVecIndexImpl(const IndexMeta::Entry &entry) {
auto offset_opt = schema_->get_offset(entry.field_name);
-assert(offset_opt.has_value());
+Assert(offset_opt.has_value());
auto offset = offset_opt.value();
auto field = (*schema_)[offset];
auto dim = field.get_dim();

@@ -563,8 +565,8 @@ SegmentNaive::BuildIndex(IndexMetaPtr remote_index_meta) {
}
}

-assert(dim != 0);
-assert(!index_field_name.empty());
+Assert(dim != 0);
+Assert(!index_field_name.empty());

auto index_meta = std::make_shared<IndexMeta>(schema_);
// TODO: this is merge of query conf and insert conf

@@ -587,19 +589,21 @@ SegmentNaive::BuildIndex(IndexMetaPtr remote_index_meta) {
if(record_.ack_responder_.GetAck() < 1024 * 4) {
return Status(SERVER_BUILD_INDEX_ERROR, "too few elements");
}

index_meta_ = remote_index_meta;
for (auto&[index_name, entry]: index_meta_->get_entries()) {
-assert(entry.index_name == index_name);
+Assert(entry.index_name == index_name);
const auto &field = (*schema_)[entry.field_name];

if (field.is_vector()) {
-assert(field.get_data_type() == engine::DataType::VECTOR_FLOAT);
+Assert(field.get_data_type() == engine::DataType::VECTOR_FLOAT);
auto index_ptr = BuildVecIndexImpl<float>(entry);
indexings_[index_name] = index_ptr;
} else {
throw std::runtime_error("unimplemented");
}
}

index_ready_ = true;
return Status::OK();
}

@@ -610,7 +614,7 @@ SegmentNaive::GetMemoryUsageInBytes() {
if(index_ready_) {
auto& index_entries = index_meta_->get_entries();
for(auto [index_name, entry]: index_entries) {
-assert(schema_->operator[](entry.field_name).is_vector());
+Assert(schema_->operator[](entry.field_name).is_vector());
auto vec_ptr = std::static_pointer_cast<knowhere::VecIndex>(indexings_[index_name]);
total_bytes += vec_ptr->IndexSize();
}
@@ -14,6 +14,7 @@
#include "query/GeneralQuery.h"
#include "utils/Status.h"
#include "dog_segment/DeletedRecord.h"
+#include "EasyAssert.h"

namespace milvus::dog_segment {
struct ColumnBasedDataChunk {

@@ -27,7 +28,7 @@ struct ColumnBasedDataChunk {
auto align = source.sizeof_per_row;
for (auto &field : schema) {
auto len = field.get_sizeof();
-assert(len % sizeof(float) == 0);
+Assert(len % sizeof(float) == 0);
std::vector<float> new_col(len * count / sizeof(float));
for (int64_t i = 0; i < count; ++i) {
memcpy(new_col.data() + i * len / sizeof(float), raw_data + i * align, len);
@@ -53,8 +53,9 @@ func (pc PulsarClient) Listener(ssChan chan mock.SegmentStats) error {
if err != nil {
log.Println("SegmentUnMarshal Failed")
}
-fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
-msg.ID(), m.SegementID)
+//fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+// msg.ID(), m.SegementID)
+fmt.Println("Received SegmentStats -- segmentID:", m.SegementID, ",memSize:", m.MemorySize, ",memRate:", m.MemoryRate)
ssChan <- m
consumer.Ack(msg)
}
@@ -3,10 +3,8 @@ package mock
import (
"bytes"
"encoding/gob"
+"github.com/golang/protobuf/proto"
-"time"

masterpb "github.com/czs007/suvlim/pkg/master/grpc/master"
-"github.com/golang/protobuf/proto"
)

type SegmentStats struct {

@@ -59,7 +57,7 @@ type Segment struct {
Rows int64 `json:"rows"`
}

-func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment {
+func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStart int, chEnd int, openTime uint64, closeTime uint64) Segment {
return Segment{
SegmentID: id,
CollectionID: collectioID,

@@ -67,8 +65,8 @@ func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStar
PartitionTag: ptag,
ChannelStart: chStart,
ChannelEnd: chEnd,
-OpenTimeStamp: uint64(openTime.Unix()),
-CloseTimeStamp: uint64(closeTime.Unix()),
+OpenTimeStamp: openTime,
+CloseTimeStamp: closeTime,
}
}
func Segment2JSON(s Segment) (string, error) {
@@ -61,16 +61,19 @@ func SegmentStatsController() {
}
}

+func GetPhysicalTimeNow() uint64 {
+return uint64(time.Now().UnixNano() / int64(time.Millisecond))
+}

func ComputeCloseTime(segmentCloseLog *map[uint64]uint64, ss mock.SegmentStats, kvbase kv.Base) error {
+segmentID := ss.SegementID
+if _, ok := (*segmentCloseLog)[segmentID]; ok {
+// This segment has been closed
+log.Println("Segment", segmentID, "has been closed")
+return nil
+}

if int(ss.MemorySize) > int(conf.Config.Master.SegmentThreshole*0.8) {
-currentTime := time.Now()
+currentTime := GetPhysicalTimeNow()
memRate := int(ss.MemoryRate)
if memRate == 0 {
//memRate = 1

@@ -80,34 +83,54 @@ func ComputeCloseTime(segmentCloseLog *map[uint64]uint64, ss mock.SegmentStats,
sec := float64(conf.Config.Master.SegmentThreshole*0.2) / float64(memRate)
data, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID)))
if err != nil {
log.Println("Load segment failed")
return err
}
seg, err := mock.JSON2Segment(data)
if err != nil {
log.Println("JSON2Segment failed")
return err
}
-segmentLogicTime := seg.CloseTimeStamp << 46 >> 46
-seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix()) << 18 + segmentLogicTime
-fmt.Println("memRate = ", memRate, ",sec = ", sec ,",Close time = ", seg.CloseTimeStamp)

+seg.CloseTimeStamp = currentTime + uint64(sec * 1000)
+// Reduce time gap between Proxy and Master
+seg.CloseTimeStamp = seg.CloseTimeStamp + uint64(5 * 1000)
+fmt.Println("Close segment = ", seg.SegmentID, ",Close time = ", seg.CloseTimeStamp)

updateData, err := mock.Segment2JSON(*seg)
if err != nil {
log.Println("Update segment, Segment2JSON failed")
return err
}
-kvbase.Save("segment/"+strconv.Itoa(int(ss.SegementID)), updateData)
+err = kvbase.Save("segment/"+strconv.Itoa(int(ss.SegementID)), updateData)
+if err != nil {
+log.Println("Save segment failed")
+return err
+}

+(*segmentCloseLog)[segmentID] = seg.CloseTimeStamp

//create new segment
newSegID := id.New().Uint64()
-newSeg := mock.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0))
+newSeg := mock.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, 1 << 46 - 1)
newSegData, err := mock.Segment2JSON(*&newSeg)
if err != nil {
log.Println("Create new segment, Segment2JSON failed")
return err
}

//save to kv store
-kvbase.Save("segment/"+strconv.Itoa(int(newSegID)), newSegData)
+err = kvbase.Save("segment/"+strconv.Itoa(int(newSegID)), newSegData)
+if err != nil {
+log.Println("Save segment failed")
+return err
+}

// update collection data
c, _ := kvbase.Load("collection/" + strconv.Itoa(int(seg.CollectionID)))
collection, err := mock.JSON2Collection(c)
if err != nil {
log.Println("JSON2Segment failed")
return err
}
segIDs := collection.SegmentIDs
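For readers following the close-time rule above: the patch estimates how long the remaining ~20% of the segment threshold takes to fill at the current write rate, converts that to milliseconds, and adds a fixed 5-second buffer for the Proxy/Master time gap. A minimal, self-contained Go sketch of that arithmetic; the numeric values are illustrative assumptions, not project configuration:

package main

import "fmt"

// Illustrative sketch of the close-time rule in ComputeCloseTime above.
// The threshold, write rate, and current time are assumed example values,
// not values taken from the project's configuration.
func main() {
	segmentThreshold := 512.0 * 1024 * 1024 // assumed segment threshold, bytes
	memRate := 4 * 1024 * 1024              // assumed bytes written per second
	nowMs := uint64(1_700_000_000_000)      // assumed physical time, milliseconds

	// Seconds until the remaining 20% of the threshold fills at the current rate.
	sec := segmentThreshold * 0.2 / float64(memRate)

	// Close time = now + estimated fill time, plus a fixed 5s buffer to absorb
	// the time gap between Proxy and Master (as in the patch).
	closeTimeMs := nowMs + uint64(sec*1000) + uint64(5*1000)

	fmt.Println("sec =", sec, "closeTimeMs =", closeTimeMs)
}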
@@ -115,9 +138,14 @@ func ComputeCloseTime(segmentCloseLog *map[uint64]uint64, ss mock.SegmentStats,
collection.SegmentIDs = segIDs
cData, err := mock.Collection2JSON(*collection)
if err != nil {
log.Println("Collection2JSON failed")
return err
}
+err = kvbase.Save("collection/"+strconv.Itoa(int(seg.CollectionID)), cData)
+if err != nil {
+log.Println("Save collection failed")
+return err
+}
kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), cData)
}
return nil
}

@@ -234,8 +262,8 @@ func CollectionController(ch chan *messagepb.Mapping) {
time.Now(), fieldMetas, []uint64{sID, s2ID},
[]string{"default"})
cm := mock.GrpcMarshal(&c)
-s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, 511, time.Now(), time.Unix(1<<36-1, 0))
-s2 := mock.NewSegment(s2ID, cID, collection.CollectionName, "default", 512, 1023, time.Now(), time.Unix(1<<36-1, 0))
+s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, 511, GetPhysicalTimeNow(), 1 << 46 - 1)
+s2 := mock.NewSegment(s2ID, cID, collection.CollectionName, "default", 512, 1023, GetPhysicalTimeNow(), 1 << 46 - 1)
collectionData, _ := mock.Collection2JSON(*cm)
segmentData, err := mock.Segment2JSON(s)
if err != nil {
@@ -270,37 +298,75 @@ func WriteCollection2Datastore(collection *messagepb.Mapping) error {
})
defer cli.Close()
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
-sID := id.New().Uint64()

cID := id.New().Uint64()

fieldMetas := []*messagepb.FieldMeta{}
if collection.Schema != nil {
fieldMetas = collection.Schema.FieldMetas
}

+queryNodeNum := conf.Config.Master.QueryNodeNum
+topicNum := conf.Config.Pulsar.TopicNum
+var topicNumPerQueryNode int
+
+if topicNum % queryNodeNum != 0 {
+topicNumPerQueryNode = topicNum / queryNodeNum + 1
+} else {
+topicNumPerQueryNode = topicNum / queryNodeNum
+}
+
+fmt.Println("QueryNodeNum = ", queryNodeNum)
+fmt.Println("TopicNum = ", topicNum)
+fmt.Println("TopicNumPerQueryNode = ", topicNumPerQueryNode)
+
+sIDs := make([]uint64, queryNodeNum)
+
+for i := 0; i < queryNodeNum; i++ {
+// For generating different id
+time.Sleep(1000 * time.Millisecond)
+
+sIDs[i] = id.New().Uint64()
+}

c := mock.NewCollection(cID, collection.CollectionName,
-time.Now(), fieldMetas, []uint64{sID},
+time.Now(), fieldMetas, sIDs,
[]string{"default"})
cm := mock.GrpcMarshal(&c)
-s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, conf.Config.Pulsar.TopicNum, time.Now(), time.Unix(1<<46-1, 0))

collectionData, err := mock.Collection2JSON(*cm)
if err != nil {
log.Fatal(err)
return err
}
-segmentData, err := mock.Segment2JSON(s)
-if err != nil {
-log.Fatal(err)
-return err
-}

err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData)
if err != nil {
log.Fatal(err)
return err
}
-err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData)
-if err != nil {
-log.Fatal(err)
-return err

+for i := 0; i < queryNodeNum; i++ {
+chStart := i * topicNumPerQueryNode
+chEnd := (i + 1) * topicNumPerQueryNode
+if chEnd > topicNum {
+chEnd = topicNum - 1
+}
+s := mock.NewSegment(sIDs[i], cID, collection.CollectionName, "default", chStart, chEnd, GetPhysicalTimeNow(), 1 << 46 - 1)
+
+segmentData, err := mock.Segment2JSON(s)
+if err != nil {
+log.Fatal(err)
+return err
+}
+
+err = kvbase.Save("segment/"+strconv.FormatUint(sIDs[i], 10), segmentData)
+if err != nil {
+log.Fatal(err)
+return err
+}
+}

return nil

}
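The WriteCollection2Datastore change above is the core of this commit: Pulsar topics (channels) are split across query nodes with a ceiling division, and one segment is created per query node covering its channel range. A standalone Go sketch of that partitioning, using assumed example values (3 query nodes, 128 topics); the clamp of chEnd to topicNum - 1 mirrors the patch as written:

package main

import "fmt"

// Sketch of the topic-to-query-node split used when creating segments above:
// ceiling division for topics per node, then one channel range per node.
// queryNodeNum and topicNum are assumed example values.
func main() {
	queryNodeNum := 3
	topicNum := 128

	topicNumPerQueryNode := topicNum / queryNodeNum
	if topicNum%queryNodeNum != 0 {
		topicNumPerQueryNode++ // ceiling division, as in the patch
	}

	for i := 0; i < queryNodeNum; i++ {
		chStart := i * topicNumPerQueryNode
		chEnd := (i + 1) * topicNumPerQueryNode
		if chEnd > topicNum {
			chEnd = topicNum - 1 // same clamp as the patch
		}
		fmt.Printf("query node %d -> channels %d to %d\n", i, chStart, chEnd)
	}
}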
@@ -36,9 +36,9 @@ func GetSegmentObjId(key string) string {
func isCollectionObj(key string) bool {
prefix := path.Join(conf.Config.Etcd.Rootpath, CollectonPrefix) + "/"
prefix = strings.TrimSpace(prefix)
-println("prefix is :$", prefix)
+// println("prefix is :$", prefix)
index := strings.Index(key, prefix)
-println("index is :", index)
+// println("index is :", index)
return index == 0
}

@@ -54,8 +54,15 @@ func isSegmentChannelRangeInQueryNodeChannelRange(segment *mock.Segment) bool {
log.Printf("Illegal segment channel range")
return false
}
-// TODO: add query node channel range check
-return true
+
+var queryNodeChannelStart = conf.Config.Reader.TopicStart
+var queryNodeChannelEnd = conf.Config.Reader.TopicEnd
+
+if segment.ChannelStart >= queryNodeChannelStart && segment.ChannelEnd <= queryNodeChannelEnd {
+return true
+}
+
+return false
}

func printCollectionStruct(obj *mock.Collection) {
@@ -104,10 +111,9 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
}
printSegmentStruct(segment)

-// TODO: fix this after channel range config finished
-//if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
-// return
-//}
+if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
+return
+}

collection := node.GetCollectionByID(segment.CollectionID)
if collection != nil {

@@ -125,7 +131,7 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
}

func (node *QueryNode) processCreate(key string, msg string) {
-println("process create", key, ":", msg)
+println("process create", key)
if isCollectionObj(key) {
objID := GetCollectionObjId(key)
node.processCollectionCreate(objID, msg)

@@ -138,19 +144,18 @@ func (node *QueryNode) processCreate(key string, msg string) {
}

func (node *QueryNode) processSegmentModify(id string, value string) {
-println("Modify Segment: ", id)
+// println("Modify Segment: ", id)

segment, err := mock.JSON2Segment(value)
if err != nil {
println("error of json 2 segment")
println(err.Error())
}
-printSegmentStruct(segment)
+// printSegmentStruct(segment)

-// TODO: fix this after channel range config finished
-//if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
-// return
-//}
+if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
+return
+}

seg, err := node.GetSegmentBySegmentID(int64(segment.SegmentID)) // todo change to uint64
if seg != nil {

@@ -159,13 +164,13 @@ func (node *QueryNode) processSegmentModify(id string, value string) {
}

func (node *QueryNode) processCollectionModify(id string, value string) {
-println("Modify Collection: ", id)
+// println("Modify Collection: ", id)
collection, err := mock.JSON2Collection(value)
if err != nil {
println("error of json 2 collection")
println(err.Error())
}
-printCollectionStruct(collection)
+// printCollectionStruct(collection)

goCollection := node.GetCollectionByID(collection.ID)
if goCollection != nil {

@@ -175,7 +180,7 @@ func (node *QueryNode) processCollectionModify(id string, value string) {
}

func (node *QueryNode) processModify(key string, msg string) {
-println("process modify")
+// println("process modify")
if isCollectionObj(key) {
objID := GetCollectionObjId(key)
node.processCollectionModify(objID, msg)

@@ -214,7 +219,7 @@ func (node *QueryNode) processResp(resp clientv3.WatchResponse) error {
if err != nil {
return err
}
-println("processResp!!!!!\n")
+// println("processResp!!!!!\n")

for _, ev := range resp.Events {
if ev.IsCreate() {
@@ -16,7 +16,6 @@ import "C"
import (
"encoding/json"
"fmt"
"github.com/czs007/suvlim/conf"
"github.com/stretchr/testify/assert"
"log"
"sort"

@@ -333,13 +332,17 @@ func (node *QueryNode) RunSearch(wg *sync.WaitGroup) {
node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
fmt.Println("Do Search...")
-var status = node.Search(node.messageClient.SearchMsg)
-if status.ErrorCode != 0 {
-fmt.Println("Search Failed")
-node.PublishFailedSearchResult()
+for {
+if node.messageClient.SearchMsg[0].Timestamp < node.queryNodeTimeSync.ServiceTimeSync {
var status = node.Search(node.messageClient.SearchMsg)
if status.ErrorCode != 0 {
fmt.Println("Search Failed")
node.PublishFailedSearchResult()
}
+break
}
}
default:

}
}
wg.Done()
@@ -580,8 +583,8 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
// So the ServiceTimeSync is always less than searchTimestamp.
// Here, we manually make searchTimestamp's logic time minus `conf.Config.Timesync.Interval` milliseconds.
// Which means `searchTimestamp.logicTime = searchTimestamp.logicTime - conf.Config.Timesync.Interval`.
-var logicTimestamp = searchTimestamp << 46 >> 46
-searchTimestamp = (searchTimestamp >> 18 - uint64(conf.Config.Timesync.Interval)) << 18 + logicTimestamp
+// var logicTimestamp = searchTimestamp << 46 >> 46
+// searchTimestamp = (searchTimestamp >> 18 - uint64(conf.Config.Timesync.Interval)) << 18 + logicTimestamp

var vector = msg.Records
// We now only the first Json is valid.
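The commented-out lines above, together with the ">> 18" and "<< 46 >> 46" shifts used elsewhere in this patch, suggest a hybrid timestamp: physical time in the upper bits and an 18-bit logical counter in the low bits. That layout is inferred from the shifts rather than stated explicitly; a small Go sketch of the assumed packing and unpacking:

package main

import "fmt"

// The ">> 18" and "<< 46 >> 46" shifts in this patch read as a hybrid
// timestamp: physical time in the upper 46 bits and an 18-bit logical
// counter in the lower bits. This packing is an assumption inferred from
// the shifts, shown here only to make the bit arithmetic concrete.
func main() {
	physicalMs := uint64(1_700_000_000_000) // assumed physical time in ms
	logical := uint64(42)                   // assumed logical counter (< 2^18)

	ts := physicalMs<<18 | logical

	fmt.Println("physical part:", ts>>18)    // recovers physicalMs
	fmt.Println("logical part:", ts<<46>>46) // recovers the low 18 bits
}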
@@ -590,7 +593,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
// 1. Timestamp check
// TODO: return or wait? Or adding graceful time
if searchTimestamp > node.queryNodeTimeSync.ServiceTimeSync {
-fmt.Println("Invalid query time, timestamp = ", searchTimestamp, ", SearchTimeSync = ", node.queryNodeTimeSync.ServiceTimeSync)
+fmt.Println("Invalid query time, timestamp = ", searchTimestamp >> 18, ", SearchTimeSync = ", node.queryNodeTimeSync.ServiceTimeSync >> 18)
return msgPb.Status{ErrorCode: 1}
}

@@ -599,6 +602,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {

// 3. Do search in all segments
for _, segment := range node.SegmentsMap {
+fmt.Println("Search in segment:", segment.SegmentId, ",segment rows:", segment.GetRowCount())
var res, err = segment.SegmentSearch(query, searchTimestamp, vector)
if err != nil {
fmt.Println(err.Error())
@@ -83,15 +83,16 @@ func (s *Segment) CloseSegment(collection* Collection) error {
}

// Build index after closing segment
-s.SegmentStatus = SegmentIndexing
-fmt.Println("Building index...")
-s.buildIndex(collection)
+//s.SegmentStatus = SegmentIndexing
+//fmt.Println("Building index...")
+//s.buildIndex(collection)

// TODO: remove redundant segment indexed status
// Change segment status to indexed
-s.SegmentStatus = SegmentIndexed
-fmt.Println("Segment closed and indexed")
+//s.SegmentStatus = SegmentIndexed
+//fmt.Println("Segment closed and indexed")

+fmt.Println("Segment closed")
return nil
}
@@ -19,11 +19,10 @@ func (node *QueryNode) SegmentsManagement() {
for _, partition := range collection.Partitions {
for _, segment := range partition.Segments {
if segment.SegmentStatus != SegmentOpened {
log.Println("Segment have been closed")
continue
}

-fmt.Println("timeNow = ", timeNow, "SegmentCloseTime = ", segment.SegmentCloseTime)
+// fmt.Println("timeNow = ", timeNow, "SegmentCloseTime = ", segment.SegmentCloseTime)
if timeNow >= segment.SegmentCloseTime {
go segment.CloseSegment(collection)
}