Add sealed segment, segment type and manager, send loadIndexReq to service's buffer

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2021-01-19 10:29:16 +08:00 committed by yefu.chen
parent b874a55c35
commit aaf8b0ad40
15 changed files with 404 additions and 279 deletions

View File

@ -15,6 +15,12 @@
extern "C" {
#endif
// Type tag for a segcore segment, passed through the C API as the
// `seg_type` argument of NewSegment().
enum SegmentType {
// Zero value: unset/unrecognized segment type.
Invalid = 0,
// Mutable segment that still accepts inserts/deletes (SegmentGrowing).
Growing = 1,
// Immutable segment whose field data is loaded in bulk (SegmentSealed).
Sealed = 2,
};
enum ErrorCode {
Success = 0,
UnexpectedException = 1,

View File

@ -28,7 +28,7 @@ extern "C" {
#include <stdint.h>
#include "segcore/collection_c.h"
#include "common/status_c.h"
#include "common/type_c.h"
typedef void* CIndex;
typedef void* CIndexQueryResult;

View File

@ -77,9 +77,9 @@ class SegmentGrowing : public SegmentInternalInterface {
get_deleted_count() const = 0;
};
using SegmentBasePtr = std::unique_ptr<SegmentGrowing>;
using SegmentGrowingPtr = std::unique_ptr<SegmentGrowing>;
SegmentBasePtr
SegmentGrowingPtr
CreateGrowingSegment(SchemaPtr schema, int64_t chunk_size = 32 * 1024);
} // namespace segcore

View File

@ -9,12 +9,14 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <memory>
#include "SegmentInterface.h"
#include "common/LoadInfo.h"
namespace milvus::segcore {
class SegmentSealed {
class SegmentSealed : public SegmentInterface {
public:
virtual const Schema&
get_schema() = 0;
@ -26,4 +28,11 @@ class SegmentSealed {
LoadFieldData(const LoadFieldDataInfo& info) = 0;
};
using SegmentSealedPtr = std::unique_ptr<SegmentSealed>;
/// @brief Factory for sealed (immutable, bulk-loaded) segments.
/// @param schema collection schema the segment will hold data for.
/// @param chunk_size row-chunk granularity, defaults to 32K like the
///        growing-segment factory.
/// @return owning pointer to the new segment.
///
/// Marked `inline` because this is a function *definition* in a header:
/// without it, including this header from more than one translation unit
/// violates the One Definition Rule and fails at link time.
///
/// TODO(review): currently a stub — it always returns nullptr, so callers
/// such as NewSegment(..., Sealed) receive a null handle.
inline SegmentSealedPtr
CreateSealedSegment(SchemaPtr schema, int64_t chunk_size = 32 * 1024) {
    return nullptr;
}
} // namespace milvus::segcore

View File

@ -18,7 +18,7 @@ extern "C" {
#include <stdint.h>
#include "segcore/collection_c.h"
#include "common/status_c.h"
#include "common/type_c.h"
typedef void* CLoadIndexInfo;
typedef void* CBinarySet;

View File

@ -16,7 +16,7 @@ extern "C" {
#include <stdbool.h>
#include <stdint.h>
#include "segcore/collection_c.h"
#include "common/status_c.h"
#include "common/type_c.h"
typedef void* CPlan;
typedef void* CPlaceholderGroup;

View File

@ -16,7 +16,7 @@ extern "C" {
#include <stdbool.h>
#include <stdint.h>
#include "segcore/segment_c.h"
#include "common/status_c.h"
#include "common/type_c.h"
typedef void* CMarshaledHits;

View File

@ -10,29 +10,42 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <cstring>
#include <cstdint>
#include "segcore/SegmentGrowing.h"
#include "segcore/SegmentSealed.h"
#include "segcore/Collection.h"
#include "segcore/segment_c.h"
#include "common/LoadInfo.h"
#include "common/type_c.h"
#include <knowhere/index/vector_index/VecIndex.h>
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
#include <knowhere/index/vector_index/VecIndexFactory.h>
#include <cstdint>
#include <boost/concept_check.hpp>
#include "common/LoadInfo.h"
CSegmentBase
NewSegment(CCollection collection, uint64_t segment_id) {
// Creates a segcore segment of the requested `seg_type` (see enum
// SegmentType) over the collection's schema and returns it as an opaque C
// handle. Ownership transfers to the caller, who must free it with
// DeleteSegment().
CSegmentInterface
NewSegment(CCollection collection, uint64_t segment_id, int seg_type) {
auto col = (milvus::segcore::Collection*)collection;
// Owned locally until release(); holds whichever concrete segment is built.
std::unique_ptr<milvus::segcore::SegmentInterface> segment;
switch (seg_type) {
case Invalid:
std::cout << "invalid segment type" << std::endl;
break;
case Growing:
segment = milvus::segcore::CreateGrowingSegment(col->get_schema());
break;
case Sealed:
// NOTE(review): CreateSealedSegment is currently a stub returning nullptr,
// so a Sealed request hands the caller a null handle — confirm callers
// tolerate that before relying on this path.
segment = milvus::segcore::CreateSealedSegment(col->get_schema());
break;
default:
// Unknown values are only logged; execution falls through to return nullptr.
std::cout << "invalid segment type" << std::endl;
}
std::cout << "create segment " << segment_id << std::endl;
// release() yields nullptr when no segment was constructed above.
return (void*)segment.release();
}
void
DeleteSegment(CSegmentBase segment) {
DeleteSegment(CSegmentInterface segment) {
auto s = (milvus::segcore::SegmentGrowing*)segment;
std::cout << "delete segment " << std::endl;
@ -48,7 +61,7 @@ DeleteQueryResult(CQueryResult query_result) {
//////////////////////////////////////////////////////////////////
CStatus
Insert(CSegmentBase c_segment,
Insert(CSegmentInterface c_segment,
int64_t reserved_offset,
int64_t size,
const int64_t* row_ids,
@ -79,15 +92,18 @@ Insert(CSegmentBase c_segment,
}
int64_t
PreInsert(CSegmentBase c_segment, int64_t size) {
PreInsert(CSegmentInterface c_segment, int64_t size) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
return segment->PreInsert(size);
}
CStatus
Delete(
CSegmentBase c_segment, int64_t reserved_offset, int64_t size, const int64_t* row_ids, const uint64_t* timestamps) {
Delete(CSegmentInterface c_segment,
int64_t reserved_offset,
int64_t size,
const int64_t* row_ids,
const uint64_t* timestamps) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
try {
@ -106,14 +122,15 @@ Delete(
}
int64_t
PreDelete(CSegmentBase c_segment, int64_t size) {
PreDelete(CSegmentInterface c_segment, int64_t size) {
// TODO: use dynamic cast, and return c status
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
return segment->PreDelete(size);
}
CStatus
Search(CSegmentBase c_segment,
Search(CSegmentInterface c_segment,
CPlan c_plan,
CPlaceholderGroup* c_placeholder_groups,
uint64_t* timestamps,
@ -153,7 +170,7 @@ Search(CSegmentBase c_segment,
}
CStatus
FillTargetEntry(CSegmentBase c_segment, CPlan c_plan, CQueryResult c_result) {
FillTargetEntry(CSegmentInterface c_segment, CPlan c_plan, CQueryResult c_result) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto plan = (milvus::query::Plan*)c_plan;
auto result = (milvus::QueryResult*)c_result;
@ -171,7 +188,7 @@ FillTargetEntry(CSegmentBase c_segment, CPlan c_plan, CQueryResult c_result) {
}
CStatus
UpdateSegmentIndex(CSegmentBase c_segment, CLoadIndexInfo c_load_index_info) {
UpdateSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info) {
auto status = CStatus();
try {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
@ -189,26 +206,26 @@ UpdateSegmentIndex(CSegmentBase c_segment, CLoadIndexInfo c_load_index_info) {
//////////////////////////////////////////////////////////////////
int
Close(CSegmentBase c_segment) {
Close(CSegmentInterface c_segment) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto status = segment->Close();
return status.code();
}
int
BuildIndex(CCollection c_collection, CSegmentBase c_segment) {
BuildIndex(CCollection c_collection, CSegmentInterface c_segment) {
PanicInfo("unimplemented");
}
bool
IsOpened(CSegmentBase c_segment) {
IsOpened(CSegmentInterface c_segment) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto status = segment->get_state();
return status == milvus::segcore::SegmentGrowing::SegmentState::Open;
}
int64_t
GetMemoryUsageInBytes(CSegmentBase c_segment) {
GetMemoryUsageInBytes(CSegmentInterface c_segment) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto mem_size = segment->GetMemoryUsageInBytes();
return mem_size;
@ -217,14 +234,14 @@ GetMemoryUsageInBytes(CSegmentBase c_segment) {
//////////////////////////////////////////////////////////////////
int64_t
GetRowCount(CSegmentBase c_segment) {
GetRowCount(CSegmentInterface c_segment) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto row_count = segment->get_row_count();
return row_count;
}
int64_t
GetDeletedCount(CSegmentBase c_segment) {
GetDeletedCount(CSegmentInterface c_segment) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto deleted_count = segment->get_deleted_count();
return deleted_count;

View File

@ -9,6 +9,8 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
@ -17,26 +19,27 @@ extern "C" {
#include <stdlib.h>
#include <stdint.h>
#include "common/type_c.h"
#include "segcore/plan_c.h"
#include "segcore/load_index_c.h"
#include "common/status_c.h"
typedef void* CSegmentBase;
typedef void* CSegmentInterface;
typedef void* CQueryResult;
CSegmentBase
NewSegment(CCollection collection, uint64_t segment_id);
CSegmentInterface
NewSegment(CCollection collection, uint64_t segment_id, int seg_type);
void
DeleteSegment(CSegmentBase segment);
DeleteSegment(CSegmentInterface segment);
void
DeleteQueryResult(CQueryResult query_result);
//////////////////////////////////////////////////////////////////
// interface for growing segment
CStatus
Insert(CSegmentBase c_segment,
Insert(CSegmentInterface c_segment,
int64_t reserved_offset,
int64_t size,
const int64_t* row_ids,
@ -45,50 +48,65 @@ Insert(CSegmentBase c_segment,
int sizeof_per_row,
int64_t count);
// interface for growing segment
int64_t
PreInsert(CSegmentBase c_segment, int64_t size);
PreInsert(CSegmentInterface c_segment, int64_t size);
// interface for growing segment
CStatus
Delete(
CSegmentBase c_segment, int64_t reserved_offset, int64_t size, const int64_t* row_ids, const uint64_t* timestamps);
Delete(CSegmentInterface c_segment,
int64_t reserved_offset,
int64_t size,
const int64_t* row_ids,
const uint64_t* timestamps);
// interface for growing segment
int64_t
PreDelete(CSegmentBase c_segment, int64_t size);
PreDelete(CSegmentInterface c_segment, int64_t size);
// common interface
CStatus
Search(CSegmentBase c_segment,
Search(CSegmentInterface c_segment,
CPlan plan,
CPlaceholderGroup* placeholder_groups,
uint64_t* timestamps,
int num_groups,
CQueryResult* result);
// common interface
CStatus
FillTargetEntry(CSegmentBase c_segment, CPlan c_plan, CQueryResult result);
FillTargetEntry(CSegmentInterface c_segment, CPlan c_plan, CQueryResult result);
// deprecated
CStatus
UpdateSegmentIndex(CSegmentBase c_segment, CLoadIndexInfo c_load_index_info);
UpdateSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info);
//////////////////////////////////////////////////////////////////
// deprecated
int
Close(CSegmentBase c_segment);
Close(CSegmentInterface c_segment);
// deprecated
int
BuildIndex(CCollection c_collection, CSegmentBase c_segment);
BuildIndex(CCollection c_collection, CSegmentInterface c_segment);
// deprecated
bool
IsOpened(CSegmentBase c_segment);
IsOpened(CSegmentInterface c_segment);
// common interface
int64_t
GetMemoryUsageInBytes(CSegmentBase c_segment);
GetMemoryUsageInBytes(CSegmentInterface c_segment);
//////////////////////////////////////////////////////////////////
// common interface
int64_t
GetRowCount(CSegmentBase c_segment);
GetRowCount(CSegmentInterface c_segment);
// ???
int64_t
GetDeletedCount(CSegmentBase c_segment);
GetDeletedCount(CSegmentInterface c_segment);
#ifdef __cplusplus
}

View File

@ -52,7 +52,7 @@ TEST(CApiTest, GetCollectionNameTest) {
TEST(CApiTest, SegmentTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
DeleteCollection(collection);
DeleteSegment(segment);
}
@ -60,7 +60,7 @@ TEST(CApiTest, SegmentTest) {
TEST(CApiTest, InsertTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
@ -95,7 +95,7 @@ TEST(CApiTest, InsertTest) {
TEST(CApiTest, DeleteTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
long delete_row_ids[] = {100000, 100001, 100002};
unsigned long delete_timestamps[] = {0, 0, 0};
@ -112,7 +112,7 @@ TEST(CApiTest, DeleteTest) {
TEST(CApiTest, SearchTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
@ -201,7 +201,7 @@ TEST(CApiTest, SearchTest) {
// TEST(CApiTest, BuildIndexTest) {
// auto schema_tmp_conf = "";
// auto collection = NewCollection(schema_tmp_conf);
// auto segment = NewSegment(collection, 0);
// auto segment = NewSegment(collection, 0, 1);
//
// std::vector<char> raw_data;
// std::vector<uint64_t> timestamps;
@ -285,7 +285,7 @@ TEST(CApiTest, SearchTest) {
TEST(CApiTest, IsOpenedTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto is_opened = IsOpened(segment);
assert(is_opened);
@ -297,7 +297,7 @@ TEST(CApiTest, IsOpenedTest) {
TEST(CApiTest, CloseTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto status = Close(segment);
assert(status == 0);
@ -309,7 +309,7 @@ TEST(CApiTest, CloseTest) {
TEST(CApiTest, GetMemoryUsageInBytesTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto old_memory_usage_size = GetMemoryUsageInBytes(segment);
std::cout << "old_memory_usage_size = " << old_memory_usage_size << std::endl;
@ -428,7 +428,7 @@ generate_index(
// TEST(CApiTest, TestSearchPreference) {
// auto schema_tmp_conf = "";
// auto collection = NewCollection(schema_tmp_conf);
// auto segment = NewSegment(collection, 0);
// auto segment = NewSegment(collection, 0, 1);
//
// auto beg = chrono::high_resolution_clock::now();
// auto next = beg;
@ -547,7 +547,7 @@ generate_index(
TEST(CApiTest, GetDeletedCountTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
long delete_row_ids[] = {100000, 100001, 100002};
unsigned long delete_timestamps[] = {0, 0, 0};
@ -568,7 +568,7 @@ TEST(CApiTest, GetDeletedCountTest) {
TEST(CApiTest, GetRowCountTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
int N = 10000;
auto [raw_data, timestamps, uids] = generate_data(N);
@ -592,7 +592,7 @@ TEST(CApiTest, GetRowCountTest) {
// "\u003e\ncreate_time: 1600416765\nsegment_ids: 6873737669791618215\npartition_tags: \"default\"\n";
//
// auto collection = NewCollection(schema_string.data());
// auto segment = NewSegment(collection, 0);
// auto segment = NewSegment(collection, 0, 1);
// DeleteCollection(collection);
// DeleteSegment(segment);
//}
@ -629,7 +629,7 @@ TEST(CApiTest, MergeInto) {
TEST(CApiTest, Reduce) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
@ -845,7 +845,7 @@ TEST(CApiTest, UpdateSegmentIndex_Without_Predicate) {
std::string schema_string = generate_collection_shema("L2", "16", false);
auto collection = NewCollection(schema_string.c_str());
auto schema = ((segcore::Collection*)collection)->get_schema();
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto N = 1000 * 1000;
auto dataset = DataGen(schema, N);
@ -970,7 +970,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Range) {
std::string schema_string = generate_collection_shema("L2", "16", false);
auto collection = NewCollection(schema_string.c_str());
auto schema = ((segcore::Collection*)collection)->get_schema();
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto N = 1000 * 1000;
auto dataset = DataGen(schema, N);
@ -1108,7 +1108,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Term) {
std::string schema_string = generate_collection_shema("L2", "16", false);
auto collection = NewCollection(schema_string.c_str());
auto schema = ((segcore::Collection*)collection)->get_schema();
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto N = 1000 * 1000;
auto dataset = DataGen(schema, N);
@ -1245,7 +1245,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Range) {
std::string schema_string = generate_collection_shema("JACCARD", "16", true);
auto collection = NewCollection(schema_string.c_str());
auto schema = ((segcore::Collection*)collection)->get_schema();
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto N = 1000 * 1000;
auto dataset = DataGen(schema, N);
@ -1384,7 +1384,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Term) {
std::string schema_string = generate_collection_shema("JACCARD", "16", true);
auto collection = NewCollection(schema_string.c_str());
auto schema = ((segcore::Collection*)collection)->get_schema();
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto N = 1000 * 1000;
auto dataset = DataGen(schema, N);

View File

@ -1,184 +0,0 @@
package querynode
import (
"errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) {
if node.searchService == nil || node.searchService.searchMsgStream == nil {
errMsg := "null search service or null search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
// add request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
// add result channel
producerChannels := []string{in.ResultChannelID}
resultStream.CreatePulsarProducers(producerChannels)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
}
func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
if node.searchService == nil || node.searchService.searchMsgStream == nil {
errMsg := "null search service or null search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
// remove request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
// TODO: searchStream.RemovePulsarConsumers(producerChannels)
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
// remove result channel
producerChannels := []string{in.ResultChannelID}
// TODO: resultStream.RemovePulsarProducer(producerChannels)
resultStream.CreatePulsarProducers(producerChannels)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
}
func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
if node.dataSyncService == nil || node.dataSyncService.dmStream == nil {
errMsg := "null data sync service or null data manipulation stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
fgDMMsgStream, ok := node.dataSyncService.dmStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for dm message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
// add request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := in.ChannelIDs
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
}
func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) {
// TODO: support db
fieldIDs := in.FieldIDs
for _, segmentID := range in.SegmentIDs {
indexID := UniqueID(0) // TODO: ???
err := node.segManager.loadSegment(segmentID, &fieldIDs)
if err != nil {
// TODO: return or continue?
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
return status, err
}
err = node.segManager.loadIndex(segmentID, indexID)
if err != nil {
// TODO: return or continue?
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
return status, err
}
}
return nil, nil
}
func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
// TODO: implement
return nil, nil
}
func (node *QueryNode) GetPartitionState(in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) {
// TODO: implement
return nil, nil
}

View File

@ -26,6 +26,7 @@ type loadIndexService struct {
fieldIndexes map[string][]*internalpb2.IndexStats
fieldStatsChan chan []*internalpb2.FieldStats
loadIndexReqChan chan []msgstream.TsMsg
loadIndexMsgStream msgstream.MsgStream
queryNodeID UniqueID
@ -65,6 +66,9 @@ func newLoadIndexService(ctx context.Context, replica collectionReplica) *loadIn
var stream msgstream.MsgStream = loadIndexStream
// init index load requests channel size by message receive buffer size
indexLoadChanSize := receiveBufSize
return &loadIndexService{
ctx: ctx1,
cancel: cancel,
@ -74,15 +78,14 @@ func newLoadIndexService(ctx context.Context, replica collectionReplica) *loadIn
fieldIndexes: make(map[string][]*internalpb2.IndexStats),
fieldStatsChan: make(chan []*internalpb2.FieldStats, 1),
loadIndexReqChan: make(chan []msgstream.TsMsg, indexLoadChanSize),
loadIndexMsgStream: stream,
queryNodeID: Params.QueryNodeID,
}
}
func (lis *loadIndexService) start() {
lis.loadIndexMsgStream.Start()
func (lis *loadIndexService) consume() {
for {
select {
case <-lis.ctx.Done():
@ -93,7 +96,21 @@ func (lis *loadIndexService) start() {
log.Println("null msg pack")
continue
}
for _, msg := range messages.Msgs {
lis.loadIndexReqChan <- messages.Msgs
}
}
}
func (lis *loadIndexService) start() {
lis.loadIndexMsgStream.Start()
go lis.consume()
for {
select {
case <-lis.ctx.Done():
return
case messages := <-lis.loadIndexReqChan:
for _, msg := range messages {
err := lis.execute(msg)
if err != nil {
log.Println(err)

View File

@ -14,13 +14,13 @@ import "C"
import (
"context"
"errors"
"fmt"
"io"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
@ -42,7 +42,6 @@ type QueryNode struct {
queryNodeLoopCancel context.CancelFunc
QueryNodeID uint64
grpcServer *grpc.Server
replica collectionReplica
@ -70,7 +69,6 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) Node {
}
func newQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
ctx1, cancel := context.WithCancel(ctx)
q := &QueryNode{
queryNodeLoopCtx: ctx1,
@ -81,6 +79,7 @@ func newQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
metaService: nil,
searchService: nil,
statsService: nil,
segManager: nil,
}
var err error
@ -114,12 +113,15 @@ func newQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
func (node *QueryNode) Start() error {
// todo add connectMaster logic
// init services and manager
node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica)
node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan)
node.segManager = newSegmentManager(node.queryNodeLoopCtx, node.replica, node.loadIndexService.loadIndexReqChan)
// start services
go node.dataSyncService.start()
go node.searchService.start()
go node.metaService.start()
@ -152,5 +154,179 @@ func (node *QueryNode) Close() {
if node.closer != nil {
node.closer.Close()
}
}
// AddQueryChannel wires the query node into a search request/result channel
// pair: it subscribes the search message stream to the request channel and
// registers the search-result stream as a producer on the result channel.
// Returns an UNEXPECTED_ERROR status (plus a matching error) if the search
// service is missing or either stream is not a Pulsar stream.
func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) {
if node.searchService == nil || node.searchService.searchMsgStream == nil {
errMsg := "null search service or null search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
// The concrete Pulsar stream type is required for consumer/producer setup.
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
// add request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
// add result channel
producerChannels := []string{in.ResultChannelID}
resultStream.CreatePulsarProducers(producerChannels)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
}
// RemoveQueryChannel is meant to detach the query node from a search
// request/result channel pair.
// NOTE(review): the msgstream API has no remove/unsubscribe yet (see the
// TODOs below), so this currently *re-creates* the consumers/producers
// instead of removing them — it does not actually detach anything.
func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
if node.searchService == nil || node.searchService.searchMsgStream == nil {
errMsg := "null search service or null search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
// The concrete Pulsar stream type is required for consumer/producer setup.
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
// remove request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
// TODO: searchStream.RemovePulsarConsumers(producerChannels)
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
// remove result channel
producerChannels := []string{in.ResultChannelID}
// TODO: resultStream.RemovePulsarProducer(producerChannels)
resultStream.CreatePulsarProducers(producerChannels)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
}
// WatchDmChannels subscribes the data-sync service's data-manipulation (dm)
// stream to the channels listed in the request. Returns UNEXPECTED_ERROR if
// the data-sync service or its stream is missing, or if the stream is not a
// Pulsar stream.
func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
if node.dataSyncService == nil || node.dataSyncService.dmStream == nil {
errMsg := "null data sync service or null data manipulation stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
fgDMMsgStream, ok := node.dataSyncService.dmStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for dm message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
// add request channel
// NOTE(review): buffer size comes from SearchPulsarBufSize even though this
// is the dm stream — looks like a copy-paste; confirm a dedicated dm buffer
// size param isn't intended.
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := in.ChannelIDs
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
}
// LoadSegments loads the requested field data, then the index, for every
// segment listed in the request via the segment manager. On the first
// failure it stops and returns an UNEXPECTED_ERROR status together with the
// underlying error; on success it returns a SUCCESS status.
func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) {
	// TODO: support db
	fieldIDs := in.FieldIDs
	for _, segmentID := range in.SegmentIDs {
		indexID := UniqueID(0) // TODO: ???
		err := node.segManager.loadSegment(segmentID, &fieldIDs)
		if err != nil {
			// TODO: return or continue?
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
		err = node.segManager.loadIndex(segmentID, indexID)
		if err != nil {
			// TODO: return or continue?
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
	}
	// Fix: report success explicitly instead of returning (nil, nil) —
	// matches the other QueryNode handlers and spares callers a nil check
	// on the status.
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}
// ReleaseSegments is not yet implemented.
// NOTE(review): returns (nil, nil) — callers must nil-check the status until
// this is filled in.
func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
// TODO: implement
return nil, nil
}
// GetPartitionState is not yet implemented.
// NOTE(review): returns (nil, nil) — callers must nil-check the response until
// this is filled in.
func (node *QueryNode) GetPartitionState(in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) {
// TODO: implement
return nil, nil
}

View File

@ -25,7 +25,8 @@ import (
type indexParam = map[string]string
type Segment struct {
segmentPtr C.CSegmentBase
segmentPtr C.CSegmentInterface
segmentType C.enum_SegmentType
segmentID UniqueID
partitionTag string // TODO: use partitionID
collectionID UniqueID
@ -58,11 +59,14 @@ func (s *Segment) GetRecentlyModified() bool {
//-------------------------------------------------------------------------------------- constructor and destructor
func newSegment(collection *Collection, segmentID int64, partitionTag string, collectionID UniqueID) *Segment {
/*
CSegmentBase
newSegment(CPartition partition, unsigned long segment_id);
CSegmentInterface
NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type);
*/
initIndexParam := make(map[int64]indexParam)
segmentPtr := C.NewSegment(collection.collectionPtr, C.ulong(segmentID))
// TODO: replace by param
//var segmentType C.enum_SegmentType = C.Growing
var segmentType C.int = 1
segmentPtr := C.NewSegment(collection.collectionPtr, C.ulong(segmentID), segmentType)
var newSegment = &Segment{
segmentPtr: segmentPtr,
segmentID: segmentID,
@ -77,7 +81,7 @@ func newSegment(collection *Collection, segmentID int64, partitionTag string, co
func deleteSegment(segment *Segment) {
/*
void
deleteSegment(CSegmentBase segment);
deleteSegment(CSegmentInterface segment);
*/
cPtr := segment.segmentPtr
C.DeleteSegment(cPtr)
@ -87,7 +91,7 @@ func deleteSegment(segment *Segment) {
func (s *Segment) getRowCount() int64 {
/*
long int
getRowCount(CSegmentBase c_segment);
getRowCount(CSegmentInterface c_segment);
*/
var rowCount = C.GetRowCount(s.segmentPtr)
return int64(rowCount)
@ -96,7 +100,7 @@ func (s *Segment) getRowCount() int64 {
func (s *Segment) getDeletedCount() int64 {
/*
long int
getDeletedCount(CSegmentBase c_segment);
getDeletedCount(CSegmentInterface c_segment);
*/
var deletedCount = C.GetDeletedCount(s.segmentPtr)
return int64(deletedCount)
@ -105,7 +109,7 @@ func (s *Segment) getDeletedCount() int64 {
func (s *Segment) getMemSize() int64 {
/*
long int
GetMemoryUsageInBytes(CSegmentBase c_segment);
GetMemoryUsageInBytes(CSegmentInterface c_segment);
*/
var memoryUsageInBytes = C.GetMemoryUsageInBytes(s.segmentPtr)
@ -116,7 +120,7 @@ func (s *Segment) getMemSize() int64 {
func (s *Segment) segmentPreInsert(numOfRecords int) int64 {
/*
long int
PreInsert(CSegmentBase c_segment, long int size);
PreInsert(CSegmentInterface c_segment, long int size);
*/
var offset = C.PreInsert(s.segmentPtr, C.long(int64(numOfRecords)))
@ -126,7 +130,7 @@ func (s *Segment) segmentPreInsert(numOfRecords int) int64 {
func (s *Segment) segmentPreDelete(numOfRecords int) int64 {
/*
long int
PreDelete(CSegmentBase c_segment, long int size);
PreDelete(CSegmentInterface c_segment, long int size);
*/
var offset = C.PreDelete(s.segmentPtr, C.long(int64(numOfRecords)))
@ -137,7 +141,7 @@ func (s *Segment) segmentPreDelete(numOfRecords int) int64 {
func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps *[]Timestamp, records *[]*commonpb.Blob) error {
/*
CStatus
Insert(CSegmentBase c_segment,
Insert(CSegmentInterface c_segment,
long int reserved_offset,
signed long int size,
const long* primary_keys,
@ -190,7 +194,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
func (s *Segment) segmentDelete(offset int64, entityIDs *[]UniqueID, timestamps *[]Timestamp) error {
/*
CStatus
Delete(CSegmentBase c_segment,
Delete(CSegmentInterface c_segment,
long int reserved_offset,
long size,
const long* primary_keys,

View File

@ -6,24 +6,56 @@ import (
"fmt"
"github.com/zilliztech/milvus-distributed/internal/kv"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/querynode/client"
"github.com/zilliztech/milvus-distributed/internal/storage"
)
// segmentManager loads segments and their built indexes for the query node,
// and forwards load-index requests to the load-index service through
// loadIndexReqChan.
//
// NOTE(review): the kv and iCodec fields each appear twice below; this looks
// like old/new diff lines merged into one listing (value vs. pointer
// *storage.InsertCodec). As written this will not compile — confirm against
// the committed file and keep only the pointer-codec pair.
type segmentManager struct {
	replica collectionReplica

	loadIndexReqChan chan []msgstream.TsMsg

	// TODO: replace by client instead of grpc client
	dataClient         datapb.DataServiceClient
	indexBuilderClient indexpb.IndexServiceClient

	queryNodeClient *client.Client
	kv              kv.Base // minio kv
	iCodec          storage.InsertCodec
	kv              kv.Base // minio kv
	iCodec          *storage.InsertCodec
}
// newSegmentManager constructs a segmentManager backed by a MinIO kv store.
// It panics if the MinIO client cannot be created, because the query node
// cannot serve segments without object storage.
func newSegmentManager(ctx context.Context, replica collectionReplica, loadIndexReqChan chan []msgstream.TsMsg) *segmentManager {
	minioKV, err := miniokv.NewMinIOKV(ctx, &miniokv.Option{
		Address:           Params.MinioEndPoint,
		AccessKeyID:       Params.MinioAccessKeyID,
		SecretAccessKeyID: Params.MinioSecretAccessKey,
		UseSSL:            Params.MinioUseSSLStr,
		BucketName:        Params.MinioBucketName,
		CreateBucket:      true,
	})
	if err != nil {
		panic(err)
	}

	mgr := &segmentManager{
		replica:          replica,
		loadIndexReqChan: loadIndexReqChan,

		// TODO: init clients
		dataClient:         nil,
		indexBuilderClient: nil,

		kv:     minioKV,
		iCodec: &storage.InsertCodec{},
	}
	return mgr
}
func (s *segmentManager) loadSegment(segmentID UniqueID, fieldIDs *[]int64) error {
@ -136,15 +168,45 @@ func (s *segmentManager) loadIndex(segmentID UniqueID, indexID UniqueID) error {
if !ok {
return errors.New(fmt.Sprint("cannot found index params in segment ", segmentID, " with field = ", vecFieldID))
}
err := s.queryNodeClient.LoadIndex(pathResponse.IndexFilePaths, segmentID, vecFieldID, "", targetIndexParam)
if err != nil {
return err
}
// non-blocking send
go s.sendLoadIndex(pathResponse.IndexFilePaths, segmentID, vecFieldID, "", targetIndexParam)
}
return nil
}
// sendLoadIndex packs an index-load request into a LoadIndexMsg and pushes it
// onto the load-index request channel, where the load-index service's buffer
// picks it up. Runs as a fire-and-forget goroutine from loadIndex, so the
// channel send may block until the consumer drains it.
//
// indexPaths  - storage paths of the built index files
// segmentID   - segment the index belongs to
// fieldID     - vector field the index was built on
// fieldName   - field name (may be empty; the consumer resolves it)
// indexParams - index build parameters, flattened into key/value pairs
func (s *segmentManager) sendLoadIndex(indexPaths []string,
	segmentID int64,
	fieldID int64,
	fieldName string,
	indexParams map[string]string) {
	var indexParamsKV []*commonpb.KeyValuePair
	for key, value := range indexParams {
		indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{
			Key:   key,
			Value: value,
		})
	}

	loadIndexRequest := internalPb.LoadIndex{
		Base: &commonpb.MsgBase{
			// Fixed: this message is a load-index request, not a search
			// result; the load-index service filters incoming messages by
			// this type, so kSearchResult would be silently dropped.
			MsgType: commonpb.MsgType_kLoadIndex,
		},
		SegmentID:   segmentID,
		FieldName:   fieldName,
		FieldID:     fieldID,
		IndexPaths:  indexPaths,
		IndexParams: indexParamsKV,
	}

	loadIndexMsg := &msgstream.LoadIndexMsg{
		LoadIndex: loadIndexRequest,
	}

	messages := []msgstream.TsMsg{loadIndexMsg}
	s.loadIndexReqChan <- messages
}
func (s *segmentManager) releaseSegment(in *queryPb.ReleaseSegmentRequest) error {
// TODO: implement
// TODO: release specific field, we need segCore supply relevant interface