Add limits on the number of partitions and fields, and change the default partition tag to "_default"

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2020-11-25 18:39:05 +08:00 committed by yefu.chen
parent f12366342f
commit f4c643f1bd
54 changed files with 373 additions and 165 deletions
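
For orientation, here is a minimal standalone sketch of the two new guards this commit introduces (partition count and field count). The helper names and constants below are illustrative only and are not part of the diff, which instead wires the limits through ParamTable and the master/proxy YAML config shown in the hunks that follow:

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Limits introduced by this commit; the values mirror master.yaml and proxy.yaml below.
const (
	maxPartitionNum     = int64(4096) // master.maxPartitionNum
	defaultPartitionTag = "_default"  // master.defaultPartitionTag
	maxFieldNum         = int64(64)   // proxy.maxFieldNum
)

// checkPartitionCount mirrors the new guard in metaTable.AddPartition:
// the number of partition tags already on a collection is capped.
func checkPartitionCount(existingTags []string) error {
	if int64(len(existingTags)) > maxPartitionNum {
		return errors.New("maximum partition's number should be limited to " + strconv.FormatInt(maxPartitionNum, 10))
	}
	return nil
}

// checkFieldCount mirrors the new guard in CreateCollectionTask.PreExecute.
func checkFieldCount(numFields int) error {
	if int64(numFields) > maxFieldNum {
		return errors.New("maximum field's number should be limited to " + strconv.FormatInt(maxFieldNum, 10))
	}
	return nil
}

func main() {
	fmt.Println(checkPartitionCount(make([]string, 4097))) // over the limit -> error
	fmt.Println(checkFieldCount(32))                       // within the limit -> <nil>
	fmt.Println(defaultPartitionTag)                       // "_default" replaces "default" as the implicit partition tag
}
```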

View File

@ -56,8 +56,8 @@ verifiers: cppcheck fmt lint ruleguard
# Builds various components locally.
build-go:
@echo "Building each component's binary to './'"
@echo "Building reader ..."
@mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/reader $(PWD)/cmd/reader/reader.go 1>/dev/null
@echo "Building query node ..."
@mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null
@echo "Building master ..."
@mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/master $(PWD)/cmd/master/main.go 1>/dev/null
@echo "Building proxy ..."
@ -72,7 +72,7 @@ build-cpp-with-unittest:
# Runs the tests.
unittest: test-cpp test-go
#TODO: proxy master reader writer's unittest
#TODO: proxy master query node writer's unittest
test-go:
@echo "Running go unittests..."
@(env bash $(PWD)/scripts/run_go_unittest.sh)
@ -83,14 +83,14 @@ test-cpp: build-cpp-with-unittest
#TODO: build each component to docker
docker: verifiers
@echo "Building reader docker image '$(TAG)'"
@echo "Building query node docker image '$(TAG)'"
@echo "Building proxy docker image '$(TAG)'"
@echo "Building master docker image '$(TAG)'"
# Builds each component and installs it to $GOPATH/bin.
install: all
@echo "Installing binary to './bin'"
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/reader $(GOPATH)/bin/reader
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/querynode $(GOPATH)/bin/querynode
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/master $(GOPATH)/bin/master
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/proxy $(GOPATH)/bin/proxy
@mkdir -p $(LIBRARY_PATH) && cp -f $(PWD)/internal/core/output/lib/* $(LIBRARY_PATH)
@ -100,6 +100,6 @@ clean:
@echo "Cleaning up all the generated files"
@find . -name '*.test' | xargs rm -fv
@find . -name '*~' | xargs rm -fv
@rm -rvf reader
@rm -rvf querynode
@rm -rvf master
@rm -rvf proxy

View File

@ -6,14 +6,14 @@ import (
"os/signal"
"syscall"
"github.com/zilliztech/milvus-distributed/internal/reader"
"github.com/zilliztech/milvus-distributed/internal/querynode"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
reader.Init()
querynode.Init()
sc := make(chan os.Signal, 1)
signal.Notify(sc,
@ -28,7 +28,7 @@ func main() {
cancel()
}()
reader.StartQueryNode(ctx)
querynode.StartQueryNode(ctx)
switch sig {
case syscall.SIGTERM:

View File

@ -20,4 +20,7 @@ master:
minIDAssignCnt: 1024
maxIDAssignCnt: 16384
# old name: segmentExpireDuration: 2000
IDAssignExpiration: 2000 # ms
IDAssignExpiration: 2000 # ms
maxPartitionNum: 4096
defaultPartitionTag: _default

View File

@ -28,3 +28,4 @@ proxy:
bufSize: 512
maxNameLength: 255
maxFieldNum: 64

View File

@ -9,7 +9,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
reader:
queryNode:
stats:
publishInterval: 1000 # milliseconds
@ -19,10 +19,6 @@ reader:
maxParallelism: 1024
msgStream:
dm: # TODO: rm dm
#streamBufSize: 1024 # msgPack chan buffer size
recvBufSize: 1024 # msgPack chan buffer size
pulsarBufSize: 1024 # pulsar chan buffer size
insert:
#streamBufSize: 1024 # msgPack chan buffer size
recvBufSize: 1024 # msgPack chan buffer size

View File

@ -18,6 +18,7 @@
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
#include <knowhere/index/vector_index/VecIndexFactory.h>
#include <cstdint>
#include <boost/concept_check.hpp>
CSegmentBase
NewSegment(CCollection collection, uint64_t segment_id) {
@ -41,7 +42,7 @@ DeleteSegment(CSegmentBase segment) {
//////////////////////////////////////////////////////////////////
int
CStatus
Insert(CSegmentBase c_segment,
int64_t reserved_offset,
int64_t size,
@ -57,11 +58,22 @@ Insert(CSegmentBase c_segment,
dataChunk.sizeof_per_row = sizeof_per_row;
dataChunk.count = count;
auto res = segment->Insert(reserved_offset, size, row_ids, timestamps, dataChunk);
try {
auto res = segment->Insert(reserved_offset, size, row_ids, timestamps, dataChunk);
auto status = CStatus();
status.error_code = Success;
status.error_msg = "";
return status;
} catch (std::runtime_error& e) {
auto status = CStatus();
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
return status;
}
// TODO: delete print
// std::cout << "do segment insert, sizeof_per_row = " << sizeof_per_row << std::endl;
return res.code();
}
int64_t
@ -73,13 +85,24 @@ PreInsert(CSegmentBase c_segment, int64_t size) {
return segment->PreInsert(size);
}
int
CStatus
Delete(
CSegmentBase c_segment, int64_t reserved_offset, int64_t size, const int64_t* row_ids, const uint64_t* timestamps) {
auto segment = (milvus::segcore::SegmentBase*)c_segment;
auto res = segment->Delete(reserved_offset, size, row_ids, timestamps);
return res.code();
try {
auto res = segment->Delete(reserved_offset, size, row_ids, timestamps);
auto status = CStatus();
status.error_code = Success;
status.error_msg = "";
return status;
} catch (std::runtime_error& e) {
auto status = CStatus();
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
return status;
}
}
int64_t
@ -91,7 +114,7 @@ PreDelete(CSegmentBase c_segment, int64_t size) {
return segment->PreDelete(size);
}
int
CStatus
Search(CSegmentBase c_segment,
CPlan c_plan,
CPlaceholderGroup* c_placeholder_groups,
@ -107,14 +130,22 @@ Search(CSegmentBase c_segment,
}
milvus::segcore::QueryResult query_result;
auto res = segment->Search(plan, placeholder_groups.data(), timestamps, num_groups, query_result);
auto status = CStatus();
try {
auto res = segment->Search(plan, placeholder_groups.data(), timestamps, num_groups, query_result);
status.error_code = Success;
status.error_msg = "";
} catch (std::runtime_error& e) {
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
}
// result_ids and result_distances have been allocated memory in goLang,
// so we don't need to malloc here.
memcpy(result_ids, query_result.result_ids_.data(), query_result.get_row_count() * sizeof(int64_t));
memcpy(result_distances, query_result.result_distances_.data(), query_result.get_row_count() * sizeof(float));
return res.code();
return status;
}
//////////////////////////////////////////////////////////////////

View File

@ -14,12 +14,24 @@ extern "C" {
#endif
#include <stdbool.h>
#include "segcore/collection_c.h"
#include "segcore/plan_c.h"
#include <stdlib.h>
#include <stdint.h>
#include "segcore/collection_c.h"
#include "segcore/plan_c.h"
typedef void* CSegmentBase;
enum ErrorCode {
Success = 0,
UnexpectedException = 1,
};
typedef struct CStatus {
int error_code;
const char* error_msg;
} CStatus;
CSegmentBase
NewSegment(CCollection collection, uint64_t segment_id);
@ -28,7 +40,7 @@ DeleteSegment(CSegmentBase segment);
//////////////////////////////////////////////////////////////////
int
CStatus
Insert(CSegmentBase c_segment,
int64_t reserved_offset,
int64_t size,
@ -41,14 +53,14 @@ Insert(CSegmentBase c_segment,
int64_t
PreInsert(CSegmentBase c_segment, int64_t size);
int
CStatus
Delete(
CSegmentBase c_segment, int64_t reserved_offset, int64_t size, const int64_t* row_ids, const uint64_t* timestamps);
int64_t
PreDelete(CSegmentBase c_segment, int64_t size);
int
CStatus
Search(CSegmentBase c_segment,
CPlan plan,
CPlaceholderGroup* placeholder_groups,
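
The header above replaces the plain int return codes of Insert/Delete/Search with CStatus. As a rough, self-contained illustration of the ownership contract (the C side strdup's error_msg on failure and the Go caller frees it), here is a hedged cgo sketch; the FailingCall stub and statusToErr helper are invented for this example and do not exist in the repository:

```go
package main

/*
#include <stdlib.h>
#include <string.h>

// Trimmed-down copy of the CStatus contract from segment_c.h, for illustration only.
typedef struct CStatus {
    int error_code;
    const char* error_msg;
} CStatus;

// Stand-in for Insert/Delete/Search: on failure the C side strdup's a message
// that the Go caller is expected to free.
CStatus FailingCall() {
    CStatus s;
    s.error_code = 1; // UnexpectedException
    s.error_msg = strdup("unexpected exception");
    return s;
}
*/
import "C"

import (
	"errors"
	"fmt"
	"strconv"
	"unsafe"
)

// statusToErr shows the checking pattern the Go wrappers in segment.go adopt:
// inspect error_code, copy error_msg into Go memory, then free the C string.
func statusToErr(status C.CStatus) error {
	if status.error_code == 0 {
		return nil
	}
	msg := C.GoString(status.error_msg)
	C.free(unsafe.Pointer(status.error_msg))
	return errors.New("C runtime error detected, error code = " + strconv.Itoa(int(status.error_code)) + ", error msg = " + msg)
}

func main() {
	fmt.Println(statusToErr(C.FailingCall()))
}
```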

View File

@ -65,7 +65,7 @@ TEST(CApiTest, InsertTest) {
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
assert(res == 0);
assert(res.error_code == Success);
DeleteCollection(collection);
DeleteSegment(segment);
@ -82,7 +82,7 @@ TEST(CApiTest, DeleteTest) {
auto offset = PreDelete(segment, 3);
auto del_res = Delete(segment, offset, 3, delete_row_ids, delete_timestamps);
assert(del_res == 0);
assert(del_res.error_code == Success);
DeleteCollection(collection);
DeleteSegment(segment);
@ -116,7 +116,7 @@ TEST(CApiTest, SearchTest) {
auto offset = PreInsert(segment, N);
auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
assert(ins_res == 0);
assert(ins_res.error_code == Success);
const char* dsl_string = R"(
{
@ -163,7 +163,7 @@ TEST(CApiTest, SearchTest) {
float result_distances[100];
auto sea_res = Search(segment, plan, placeholderGroups.data(), timestamps.data(), 1, result_ids, result_distances);
assert(sea_res == 0);
assert(sea_res.error_code == Success);
DeletePlan(plan);
DeletePlaceholderGroup(placeholderGroup);
@ -199,7 +199,7 @@ TEST(CApiTest, BuildIndexTest) {
auto offset = PreInsert(segment, N);
auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
assert(ins_res == 0);
assert(ins_res.error_code == Success);
// TODO: add index ptr
Close(segment);
@ -250,7 +250,7 @@ TEST(CApiTest, BuildIndexTest) {
float result_distances[100];
auto sea_res = Search(segment, plan, placeholderGroups.data(), timestamps.data(), 1, result_ids, result_distances);
assert(sea_res == 0);
assert(sea_res.error_code == Success);
DeletePlan(plan);
DeletePlaceholderGroup(placeholderGroup);
@ -315,7 +315,7 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) {
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
assert(res == 0);
assert(res.error_code == Success);
auto memory_usage_size = GetMemoryUsageInBytes(segment);
@ -482,7 +482,7 @@ TEST(CApiTest, GetDeletedCountTest) {
auto offset = PreDelete(segment, 3);
auto del_res = Delete(segment, offset, 3, delete_row_ids, delete_timestamps);
assert(del_res == 0);
assert(del_res.error_code == Success);
// TODO: assert(deleted_count == len(delete_row_ids))
auto deleted_count = GetDeletedCount(segment);
@ -502,7 +502,7 @@ TEST(CApiTest, GetRowCountTest) {
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto offset = PreInsert(segment, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
assert(res == 0);
assert(res.error_code == Success);
auto row_count = GetRowCount(segment);
assert(row_count == N);

View File

@ -220,7 +220,7 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionMeta) error {
}
if len(coll.PartitionTags) == 0 {
coll.PartitionTags = append(coll.PartitionTags, "default")
coll.PartitionTags = append(coll.PartitionTags, Params.DefaultPartitionTag)
}
_, ok := mt.collName2ID[coll.Schema.Name]
if ok {
@ -292,6 +292,10 @@ func (mt *metaTable) AddPartition(collID UniqueID, tag string) error {
return errors.Errorf("can't find collection. id = " + strconv.FormatInt(collID, 10))
}
// number of partition tags (except _default) should be limited to 4096 by default
if int64(len(coll.PartitionTags)) > Params.MaxPartitionNum {
return errors.New("maximum partition's number should be limit to " + strconv.FormatInt(Params.MaxPartitionNum, 10))
}
for _, t := range coll.PartitionTags {
if t == tag {
return errors.Errorf("partition already exists.")
@ -326,17 +330,29 @@ func (mt *metaTable) DeletePartition(collID UniqueID, tag string) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if tag == Params.DefaultPartitionTag {
return errors.New("default partition cannot be deleted")
}
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return errors.Errorf("can't find collection. id = " + strconv.FormatInt(collID, 10))
}
// check tag exists
exist := false
pt := make([]string, 0, len(collMeta.PartitionTags))
for _, t := range collMeta.PartitionTags {
if t != tag {
pt = append(pt, t)
} else {
exist = true
}
}
if !exist {
return errors.New("partition " + tag + " does not exist")
}
if len(pt) == len(collMeta.PartitionTags) {
return nil
}

View File

@ -3,6 +3,7 @@ package master
import (
"context"
"reflect"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
@ -238,6 +239,10 @@ func TestMetaTable_DeletePartition(t *testing.T) {
assert.Equal(t, 1, len(meta.collName2ID))
assert.Equal(t, 1, len(meta.collID2Meta))
assert.Equal(t, 1, len(meta.segID2Meta))
// delete not exist
err = meta.DeletePartition(100, "not_exist")
assert.NotNil(t, err)
}
func TestMetaTable_Segment(t *testing.T) {
@ -366,3 +371,39 @@ func TestMetaTable_UpdateSegment(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, seg.NumRows, int64(210))
}
func TestMetaTable_AddPartition_Limit(t *testing.T) {
Init()
Params.MaxPartitionNum = 256 // adding 4096 partitions is too slow
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root")
_, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
meta, err := NewMetaTable(etcdKV)
assert.Nil(t, err)
defer meta.client.Close()
colMeta := pb.CollectionMeta{
ID: 100,
Schema: &schemapb.CollectionSchema{
Name: "coll1",
},
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{},
}
err = meta.AddCollection(&colMeta)
assert.Nil(t, err)
for i := 0; i < int(Params.MaxPartitionNum); i++ {
err := meta.AddPartition(100, "partition_"+strconv.Itoa(i))
assert.Nil(t, err)
}
err = meta.AddPartition(100, "partition_limit")
assert.NotNil(t, err)
}

View File

@ -43,6 +43,9 @@ type ParamTable struct {
K2SChannelNames []string
QueryNodeStatsChannelName string
MsgChannelSubName string
MaxPartitionNum int64
DefaultPartitionTag string
}
var Params ParamTable
@ -91,6 +94,8 @@ func (p *ParamTable) Init() {
p.initK2SChannelNames()
p.initQueryNodeStatsChannelName()
p.initMsgChannelSubName()
p.initMaxPartitionNum()
p.initDefaultPartitionTag()
}
func (p *ParamTable) initAddress() {
@ -411,3 +416,24 @@ func (p *ParamTable) initK2SChannelNames() {
}
p.K2SChannelNames = channels
}
func (p *ParamTable) initMaxPartitionNum() {
str, err := p.Load("master.maxPartitionNum")
if err != nil {
panic(err)
}
maxPartitionNum, err := strconv.ParseInt(str, 10, 64)
if err != nil {
panic(err)
}
p.MaxPartitionNum = maxPartitionNum
}
func (p *ParamTable) initDefaultPartitionTag() {
defaultTag, err := p.Load("master.defaultPartitionTag")
if err != nil {
panic(err)
}
p.DefaultPartitionTag = defaultTag
}

View File

@ -191,10 +191,12 @@ func (t *showPartitionTask) Execute() error {
return errors.New("null request")
}
partitions := make([]string, 0)
for _, collection := range t.mt.collID2Meta {
partitions = append(partitions, collection.PartitionTags...)
collMeta, err := t.mt.GetCollectionByName(t.req.CollectionName.CollectionName)
if err != nil {
return err
}
partitions := make([]string, 0)
partitions = append(partitions, collMeta.PartitionTags...)
stringListResponse := servicepb.StringListResponse{
Status: &commonpb.Status{

View File

@ -60,6 +60,9 @@ func TestMaster_Partition(t *testing.T) {
K2SChannelNames: []string{"k2s0", "k2s1"},
QueryNodeStatsChannelName: "statistic",
MsgChannelSubName: Params.MsgChannelSubName,
MaxPartitionNum: int64(4096),
DefaultPartitionTag: "_default",
}
port := 10000 + rand.Intn(1000)
@ -212,7 +215,7 @@ func TestMaster_Partition(t *testing.T) {
//assert.Equal(t, collMeta.PartitionTags[0], "partition1")
//assert.Equal(t, collMeta.PartitionTags[1], "partition2")
assert.ElementsMatch(t, []string{"default", "partition1", "partition2"}, collMeta.PartitionTags)
assert.ElementsMatch(t, []string{"_default", "partition1", "partition2"}, collMeta.PartitionTags)
showPartitionReq := internalpb.ShowPartitionRequest{
MsgType: internalpb.MsgType_kShowPartitions,
@ -224,7 +227,7 @@ func TestMaster_Partition(t *testing.T) {
stringList, err := cli.ShowPartitions(ctx, &showPartitionReq)
assert.Nil(t, err)
assert.ElementsMatch(t, []string{"default", "partition1", "partition2"}, stringList.Values)
assert.ElementsMatch(t, []string{"_default", "partition1", "partition2"}, stringList.Values)
showPartitionReq = internalpb.ShowPartitionRequest{
MsgType: internalpb.MsgType_kShowPartitions,

View File

@ -261,6 +261,9 @@ func startupMaster() {
K2SChannelNames: []string{"k2s0", "k2s1"},
QueryNodeStatsChannelName: "statistic",
MsgChannelSubName: Params.MsgChannelSubName,
MaxPartitionNum: int64(4096),
DefaultPartitionTag: "_default",
}
master, err = CreateServer(ctx)

View File

@ -70,7 +70,7 @@ func (ms *PulsarMsgStream) CreatePulsarProducers(channels []string) {
for i := 0; i < len(channels); i++ {
pp, err := (*ms.client).CreateProducer(pulsar.ProducerOptions{Topic: channels[i]})
if err != nil {
log.Printf("Failed to create reader producer %s, error = %v", channels[i], err)
log.Printf("Failed to create querynode producer %s, error = %v", channels[i], err)
}
ms.producers = append(ms.producers, &pp)
}

View File

@ -463,3 +463,15 @@ func (pt *ParamTable) MaxNameLength() int64 {
}
return maxNameLength
}
func (pt *ParamTable) MaxFieldNum() int64 {
str, err := pt.Load("proxy.maxFieldNum")
if err != nil {
panic(err)
}
maxFieldNum, err := strconv.ParseInt(str, 10, 64)
if err != nil {
panic(err)
}
return maxFieldNum
}

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"log"
"strconv"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/allocator"
@ -164,6 +165,10 @@ func (cct *CreateCollectionTask) SetTs(ts Timestamp) {
}
func (cct *CreateCollectionTask) PreExecute() error {
if int64(len(cct.schema.Fields)) > Params.MaxFieldNum() {
return errors.New("maximum field's number should be limited to " + strconv.FormatInt(Params.MaxFieldNum(), 10))
}
// validate collection name
if err := ValidateCollectionName(cct.schema.Name); err != nil {
return err

View File

@ -68,7 +68,7 @@ func ValidatePartitionTag(partitionTag string, strictCheck bool) error {
if strictCheck {
firstChar := partitionTag[0]
if firstChar != '_' && !isAlpha(firstChar) {
if firstChar != '_' && !isAlpha(firstChar) && !isNumber(firstChar) {
msg := invalidMsg + "The first character of a partition tag must be an underscore or letter."
return errors.New(msg)
}

View File

@ -1,4 +1,4 @@
package reader
package querynode
/*

View File

@ -1,4 +1,4 @@
package reader
package querynode
/*

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"
@ -175,7 +175,7 @@ func TestDataSyncService_Start(t *testing.T) {
// pulsar produce
const receiveBufSize = 1024
producerChannels := []string{"insert"}
producerChannels := Params.insertChannelNames()
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarClient(pulsarURL)

View File

@ -1,4 +1,4 @@
package reader
package querynode
type deleteNode struct {
BaseNode

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"log"
@ -29,8 +29,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
// TODO: add error handling
}
// TODO: add time range check
var iMsg = insertMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
timeRange: TimeRange{

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"fmt"
@ -106,6 +106,7 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
if err != nil {
log.Println("cannot find segment:", segmentID)
// TODO: add error handling
wg.Done()
return
}
@ -116,8 +117,9 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
err = targetSegment.segmentInsert(offsets, &ids, &timestamps, &records)
if err != nil {
log.Println("insert failed")
log.Println(err)
// TODO: add error handling
wg.Done()
return
}

View File

@ -1,4 +1,4 @@
package reader
package querynode
type key2SegNode struct {
BaseNode

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"
@ -9,8 +9,8 @@ import (
)
func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
receiveBufSize := Params.dmReceiveBufSize()
pulsarBufSize := Params.dmPulsarBufSize()
receiveBufSize := Params.insertReceiveBufSize()
pulsarBufSize := Params.insertPulsarBufSize()
msgStreamURL, err := Params.pulsarAddress()
if err != nil {
@ -18,7 +18,7 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
}
consumeChannels := Params.insertChannelNames()
consumeSubName := "insertSub"
consumeSubName := Params.msgChannelSubName()
insertStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarClient(msgStreamURL)

View File

@ -1,4 +1,4 @@
package reader
package querynode
import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"

View File

@ -1,4 +1,4 @@
package reader
package querynode
type schemaUpdateNode struct {
BaseNode

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"log"

View File

@ -1,4 +1,4 @@
package reader
package querynode
/*

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"
@ -111,8 +111,8 @@ func isSegmentChannelRangeInQueryNodeChannelRange(segment *etcdpb.SegmentMeta) b
}
Params.Init()
var queryNodeChannelStart = Params.topicStart()
var queryNodeChannelEnd = Params.topicEnd()
var queryNodeChannelStart = Params.insertChannelRange()[0]
var queryNodeChannelEnd = Params.insertChannelRange()[1]
if segment.ChannelStart >= int32(queryNodeChannelStart) && segment.ChannelEnd <= int32(queryNodeChannelEnd) {
return true
@ -167,6 +167,7 @@ func (mService *metaService) processSegmentCreate(id string, value string) {
seg := mService.segmentUnmarshal(value)
if !isSegmentChannelRangeInQueryNodeChannelRange(seg) {
log.Println("Illegal segment channel range")
return
}

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"
@ -93,7 +93,7 @@ func TestMetaService_isSegmentChannelRangeInQueryNodeChannelRange(t *testing.T)
CollectionID: UniqueID(0),
PartitionTag: "partition0",
ChannelStart: 0,
ChannelEnd: 128,
ChannelEnd: 1,
OpenTime: Timestamp(0),
CloseTime: Timestamp(math.MaxUint64),
NumRows: UniqueID(0),
@ -264,10 +264,9 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
PartitionTags: []string{"default"},
}
colMetaBlob, err := proto.Marshal(&collectionMeta)
assert.NoError(t, err)
colMetaBlob := proto.MarshalTextString(&collectionMeta)
err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob))
err := (*node.replica).addCollection(&collectionMeta, string(colMetaBlob))
assert.NoError(t, err)
err = (*node.replica).addPartition(UniqueID(0), "default")
@ -276,7 +275,7 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
id := "0"
value := `partition_tag: "default"
channel_start: 0
channel_end: 128
channel_end: 1
close_time: 18446744073709551615
`
@ -331,7 +330,7 @@ func TestMetaService_processCreate(t *testing.T) {
key2 := "by-dev/segment/0"
msg2 := `partition_tag: "default"
channel_start: 0
channel_end: 128
channel_end: 1
close_time: 18446744073709551615
`
@ -388,10 +387,9 @@ func TestMetaService_processSegmentModify(t *testing.T) {
PartitionTags: []string{"default"},
}
colMetaBlob, err := proto.Marshal(&collectionMeta)
assert.NoError(t, err)
colMetaBlob := proto.MarshalTextString(&collectionMeta)
err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob))
err := (*node.replica).addCollection(&collectionMeta, string(colMetaBlob))
assert.NoError(t, err)
err = (*node.replica).addPartition(UniqueID(0), "default")
@ -400,7 +398,7 @@ func TestMetaService_processSegmentModify(t *testing.T) {
id := "0"
value := `partition_tag: "default"
channel_start: 0
channel_end: 128
channel_end: 1
close_time: 18446744073709551615
`
@ -411,7 +409,7 @@ func TestMetaService_processSegmentModify(t *testing.T) {
newValue := `partition_tag: "default"
channel_start: 0
channel_end: 128
channel_end: 1
close_time: 18446744073709551615
`
@ -581,7 +579,7 @@ func TestMetaService_processModify(t *testing.T) {
key2 := "by-dev/segment/0"
msg2 := `partition_tag: "p1"
channel_start: 0
channel_end: 128
channel_end: 1
close_time: 18446744073709551615
`
@ -637,7 +635,7 @@ func TestMetaService_processModify(t *testing.T) {
msg4 := `partition_tag: "p1"
channel_start: 0
channel_end: 128
channel_end: 1
close_time: 18446744073709551615
`
@ -694,10 +692,9 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
PartitionTags: []string{"default"},
}
colMetaBlob, err := proto.Marshal(&collectionMeta)
assert.NoError(t, err)
colMetaBlob := proto.MarshalTextString(&collectionMeta)
err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob))
err := (*node.replica).addCollection(&collectionMeta, string(colMetaBlob))
assert.NoError(t, err)
err = (*node.replica).addPartition(UniqueID(0), "default")
@ -706,7 +703,7 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
id := "0"
value := `partition_tag: "default"
channel_start: 0
channel_end: 128
channel_end: 1
close_time: 18446744073709551615
`
@ -810,7 +807,7 @@ func TestMetaService_processDelete(t *testing.T) {
key2 := "by-dev/segment/0"
msg2 := `partition_tag: "default"
channel_start: 0
channel_end: 128
channel_end: 1
close_time: 18446744073709551615
`

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"log"
@ -46,35 +46,37 @@ func (p *ParamTable) queryNodeID() int {
return id
}
// TODO: func (p *ParamTable) DmChannelRange() []int {
func (p *ParamTable) topicStart() int {
topicStart, err := p.Load("reader.topicstart")
func (p *ParamTable) insertChannelRange() []int {
insertChannelRange, err := p.Load("msgChannel.channelRange.insert")
if err != nil {
panic(err)
}
topicStartNum, err := strconv.Atoi(topicStart)
if err != nil {
panic(err)
}
return topicStartNum
}
func (p *ParamTable) topicEnd() int {
topicEnd, err := p.Load("reader.topicend")
channelRange := strings.Split(insertChannelRange, ",")
if len(channelRange) != 2 {
panic("Illegal channel range num")
}
channelBegin, err := strconv.Atoi(channelRange[0])
if err != nil {
panic(err)
}
topicEndNum, err := strconv.Atoi(topicEnd)
channelEnd, err := strconv.Atoi(channelRange[1])
if err != nil {
panic(err)
}
return topicEndNum
if channelBegin < 0 || channelEnd < 0 {
panic("Illegal channel range value")
}
if channelBegin > channelEnd {
panic("Illegal channel range value")
}
return []int{channelBegin, channelEnd}
}
// advanced params
// stats
func (p *ParamTable) statsPublishInterval() int {
timeInterval, err := p.Load("reader.stats.publishInterval")
timeInterval, err := p.Load("queryNode.stats.publishInterval")
if err != nil {
panic(err)
}
@ -87,7 +89,7 @@ func (p *ParamTable) statsPublishInterval() int {
// dataSync:
func (p *ParamTable) flowGraphMaxQueueLength() int32 {
queueLength, err := p.Load("reader.dataSync.flowGraph.maxQueueLength")
queueLength, err := p.Load("queryNode.dataSync.flowGraph.maxQueueLength")
if err != nil {
panic(err)
}
@ -99,7 +101,7 @@ func (p *ParamTable) flowGraphMaxQueueLength() int32 {
}
func (p *ParamTable) flowGraphMaxParallelism() int32 {
maxParallelism, err := p.Load("reader.dataSync.flowGraph.maxParallelism")
maxParallelism, err := p.Load("queryNode.dataSync.flowGraph.maxParallelism")
if err != nil {
panic(err)
}
@ -111,9 +113,8 @@ func (p *ParamTable) flowGraphMaxParallelism() int32 {
}
// msgStream
// TODO: func (p *ParamTable) insertStreamBufSize() int64
func (p *ParamTable) dmReceiveBufSize() int64 {
revBufSize, err := p.Load("reader.msgStream.dm.recvBufSize")
func (p *ParamTable) insertReceiveBufSize() int64 {
revBufSize, err := p.Load("queryNode.msgStream.insert.recvBufSize")
if err != nil {
panic(err)
}
@ -124,8 +125,8 @@ func (p *ParamTable) dmReceiveBufSize() int64 {
return int64(bufSize)
}
func (p *ParamTable) dmPulsarBufSize() int64 {
pulsarBufSize, err := p.Load("reader.msgStream.dm.pulsarBufSize")
func (p *ParamTable) insertPulsarBufSize() int64 {
pulsarBufSize, err := p.Load("queryNode.msgStream.insert.pulsarBufSize")
if err != nil {
panic(err)
}
@ -137,7 +138,7 @@ func (p *ParamTable) dmPulsarBufSize() int64 {
}
func (p *ParamTable) searchReceiveBufSize() int64 {
revBufSize, err := p.Load("reader.msgStream.search.recvBufSize")
revBufSize, err := p.Load("queryNode.msgStream.search.recvBufSize")
if err != nil {
panic(err)
}
@ -149,7 +150,7 @@ func (p *ParamTable) searchReceiveBufSize() int64 {
}
func (p *ParamTable) searchPulsarBufSize() int64 {
pulsarBufSize, err := p.Load("reader.msgStream.search.pulsarBufSize")
pulsarBufSize, err := p.Load("queryNode.msgStream.search.pulsarBufSize")
if err != nil {
panic(err)
}
@ -161,7 +162,7 @@ func (p *ParamTable) searchPulsarBufSize() int64 {
}
func (p *ParamTable) searchResultReceiveBufSize() int64 {
revBufSize, err := p.Load("reader.msgStream.searchResult.recvBufSize")
revBufSize, err := p.Load("queryNode.msgStream.searchResult.recvBufSize")
if err != nil {
panic(err)
}
@ -173,7 +174,7 @@ func (p *ParamTable) searchResultReceiveBufSize() int64 {
}
func (p *ParamTable) statsReceiveBufSize() int64 {
revBufSize, err := p.Load("reader.msgStream.stats.recvBufSize")
revBufSize, err := p.Load("queryNode.msgStream.stats.recvBufSize")
if err != nil {
panic(err)
}
@ -307,3 +308,20 @@ func (p *ParamTable) searchResultChannelNames() []string {
}
return channels
}
func (p *ParamTable) msgChannelSubName() string {
// TODO: subName = namePrefix + "-" + queryNodeID, queryNodeID is assigned by master
name, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix")
if err != nil {
log.Panic(err)
}
return name
}
func (p *ParamTable) statsChannelName() string {
channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats")
if err != nil {
panic(err)
}
return channels
}

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"strings"
@ -26,16 +26,12 @@ func TestParamTable_QueryNodeID(t *testing.T) {
assert.Equal(t, id, 0)
}
func TestParamTable_TopicStart(t *testing.T) {
func TestParamTable_insertChannelRange(t *testing.T) {
Params.Init()
topicStart := Params.topicStart()
assert.Equal(t, topicStart, 0)
}
func TestParamTable_TopicEnd(t *testing.T) {
Params.Init()
topicEnd := Params.topicEnd()
assert.Equal(t, topicEnd, 128)
channelRange := Params.insertChannelRange()
assert.Equal(t, len(channelRange), 2)
assert.Equal(t, channelRange[0], 0)
assert.Equal(t, channelRange[1], 1)
}
func TestParamTable_statsServiceTimeInterval(t *testing.T) {
@ -50,9 +46,9 @@ func TestParamTable_statsMsgStreamReceiveBufSize(t *testing.T) {
assert.Equal(t, bufSize, int64(64))
}
func TestParamTable_dmMsgStreamReceiveBufSize(t *testing.T) {
func TestParamTable_insertMsgStreamReceiveBufSize(t *testing.T) {
Params.Init()
bufSize := Params.dmReceiveBufSize()
bufSize := Params.insertReceiveBufSize()
assert.Equal(t, bufSize, int64(1024))
}
@ -74,9 +70,9 @@ func TestParamTable_searchPulsarBufSize(t *testing.T) {
assert.Equal(t, bufSize, int64(512))
}
func TestParamTable_dmPulsarBufSize(t *testing.T) {
func TestParamTable_insertPulsarBufSize(t *testing.T) {
Params.Init()
bufSize := Params.dmPulsarBufSize()
bufSize := Params.insertPulsarBufSize()
assert.Equal(t, bufSize, int64(1024))
}
@ -91,3 +87,36 @@ func TestParamTable_flowGraphMaxParallelism(t *testing.T) {
maxParallelism := Params.flowGraphMaxParallelism()
assert.Equal(t, maxParallelism, int32(1024))
}
func TestParamTable_insertChannelNames(t *testing.T) {
Params.Init()
names := Params.insertChannelNames()
assert.Equal(t, len(names), 1)
assert.Equal(t, names[0], "insert-0")
}
func TestParamTable_searchChannelNames(t *testing.T) {
Params.Init()
names := Params.searchChannelNames()
assert.Equal(t, len(names), 1)
assert.Equal(t, names[0], "search-0")
}
func TestParamTable_searchResultChannelNames(t *testing.T) {
Params.Init()
names := Params.searchResultChannelNames()
assert.Equal(t, len(names), 1)
assert.Equal(t, names[0], "searchResult-0")
}
func TestParamTable_msgChannelSubName(t *testing.T) {
Params.Init()
name := Params.msgChannelSubName()
assert.Equal(t, name, "queryNode")
}
func TestParamTable_statsChannelName(t *testing.T) {
Params.Init()
name := Params.statsChannelName()
assert.Equal(t, name, "query-node-stats")
}

View File

@ -1,4 +1,4 @@
package reader
package querynode
/*

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"

View File

@ -1,4 +1,4 @@
package reader
package querynode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include

View File

@ -1,4 +1,4 @@
package reader
package querynode
/*

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"

View File

@ -1,4 +1,4 @@
package reader
package querynode
import "C"
import (
@ -43,7 +43,7 @@ func newSearchService(ctx context.Context, replica *collectionReplica) *searchSe
}
consumeChannels := Params.searchChannelNames()
consumeSubName := "subSearch"
consumeSubName := Params.msgChannelSubName()
searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
searchStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"

View File

@ -1,4 +1,4 @@
package reader
package querynode
/*
@ -109,7 +109,7 @@ func (s *Segment) segmentPreDelete(numOfRecords int) int64 {
//-------------------------------------------------------------------------------------- dm & search functions
func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps *[]Timestamp, records *[]*commonpb.Blob) error {
/*
int
CStatus
Insert(CSegmentBase c_segment,
long int reserved_offset,
signed long int size,
@ -148,8 +148,12 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
cSizeofPerRow,
cNumOfRows)
if status != 0 {
return errors.New("Insert failed, error code = " + strconv.Itoa(int(status)))
errorCode := status.error_code
if errorCode != 0 {
errorMsg := C.GoString(status.error_msg)
defer C.free(unsafe.Pointer(status.error_msg))
return errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
}
s.recentlyModified = true
@ -158,7 +162,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
func (s *Segment) segmentDelete(offset int64, entityIDs *[]UniqueID, timestamps *[]Timestamp) error {
/*
int
CStatus
Delete(CSegmentBase c_segment,
long int reserved_offset,
long size,
@ -172,8 +176,12 @@ func (s *Segment) segmentDelete(offset int64, entityIDs *[]UniqueID, timestamps
var status = C.Delete(s.segmentPtr, cOffset, cSize, cEntityIdsPtr, cTimestampsPtr)
if status != 0 {
return errors.New("Delete failed, error code = " + strconv.Itoa(int(status)))
errorCode := status.error_code
if errorCode != 0 {
errorMsg := C.GoString(status.error_msg)
defer C.free(unsafe.Pointer(status.error_msg))
return errors.New("Delete failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
}
return nil
@ -187,7 +195,8 @@ func (s *Segment) segmentSearch(plan *Plan,
numQueries int64,
topK int64) error {
/*
void* Search(void* plan,
CStatus
Search(void* plan,
void* placeholder_groups,
uint64_t* timestamps,
int num_groups,
@ -211,16 +220,20 @@ func (s *Segment) segmentSearch(plan *Plan,
var cNumGroups = C.int(len(placeHolderGroups))
var status = C.Search(s.segmentPtr, plan.cPlan, cPlaceHolder, cTimestamp, cNumGroups, cNewResultIds, cNewResultDistances)
if status != 0 {
return errors.New("search failed, error code = " + strconv.Itoa(int(status)))
errorCode := status.error_code
if errorCode != 0 {
errorMsg := C.GoString(status.error_msg)
defer C.free(unsafe.Pointer(status.error_msg))
return errors.New("Search failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
}
cNumQueries := C.long(numQueries)
cTopK := C.long(topK)
// reduce search result
status = C.MergeInto(cNumQueries, cTopK, cResultDistances, cResultIds, cNewResultDistances, cNewResultIds)
if status != 0 {
return errors.New("merge search result failed, error code = " + strconv.Itoa(int(status)))
mergeStatus := C.MergeInto(cNumQueries, cTopK, cResultDistances, cResultIds, cNewResultDistances, cNewResultIds)
if mergeStatus != 0 {
return errors.New("merge search result failed, error code = " + strconv.Itoa(int(mergeStatus)))
}
return nil
}

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"
@ -463,7 +463,6 @@ func TestSegment_segmentInsert(t *testing.T) {
err := segment.segmentInsert(offset, &ids, &timestamps, &records)
assert.NoError(t, err)
deleteSegment(segment)
deleteCollection(collection)
}
@ -640,7 +639,7 @@ func TestSegment_segmentSearch(t *testing.T) {
pulsarURL, _ := Params.pulsarAddress()
const receiveBufSize = 1024
searchProducerChannels := []string{"search"}
searchProducerChannels := Params.searchChannelNames()
searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
searchStream.SetPulsarClient(pulsarURL)
searchStream.CreatePulsarProducers(searchProducerChannels)

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"
@ -36,7 +36,7 @@ func (sService *statsService) start() {
if err != nil {
log.Fatal(err)
}
producerChannels := []string{"statistic"}
producerChannels := []string{Params.statsChannelName()}
statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize)
statsStream.SetPulsarClient(msgStreamURL)

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"context"
@ -171,7 +171,7 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
const receiveBufSize = 1024
// start pulsar
producerChannels := []string{"statistic"}
producerChannels := []string{Params.statsChannelName()}
statsStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
statsStream.SetPulsarClient(pulsarURL)

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"sync"

View File

@ -1,4 +1,4 @@
package reader
package querynode
import (
"testing"

View File

@ -1,4 +1,4 @@
package reader
package querynode
import "github.com/zilliztech/milvus-distributed/internal/util/typeutil"

View File

@ -13,5 +13,5 @@ SCRIPTS_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
# ignore Minio, S3 unittests
MILVUS_DIR="${SCRIPTS_DIR}/../internal/"
echo $MILVUS_DIR
#go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/reader/..." "${MILVUS_DIR}/proxy/..." -failfast
go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/reader/..." -failfast
#go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/proxy/..." -failfast
go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast