diff --git a/.gitignore b/.gitignore
index d0406b0029..9c3ac5f981 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,10 +1,28 @@
-#proxy
+# CLion generated files
+core/cmake-build-debug/
+core/cmake-build-debug/*
+core/cmake-build-release/
+core/cmake-build-release/*
+core/cmake_build/
+core/cmake_build/*
+core/build/
+core/build/*
+core/.idea/
+.idea/
+.idea/*
+
+# vscode generated files
+.vscode
+
+cmake-build-debug
+cmake-build-release
+cmake_build
+
+# proxy
 proxy/cmake_build
 proxy/cmake-build-debug
 proxy/thirdparty/grpc-src
 proxy/thirdparty/grpc-build
-.idea/
-.idea/*
 
 # Compiled source
 *.a
diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt
index dbed0fb41a..b9f7fb662d 100644
--- a/core/CMakeLists.txt
+++ b/core/CMakeLists.txt
@@ -1,4 +1,7 @@
 project(sulvim_core)
+
+set(CMAKE_POSITION_INDEPENDENT_CODE ON)
+
 cmake_minimum_required(VERSION 3.16)
 set( CMAKE_CXX_STANDARD 17 )
 set( CMAKE_CXX_STANDARD_REQUIRED on )
diff --git a/core/core.go b/core/core.go
new file mode 100644
index 0000000000..68c1882ff3
--- /dev/null
+++ b/core/core.go
@@ -0,0 +1,3 @@
+package core
+
+import "C"
\ No newline at end of file
diff --git a/core/src/dog_segment/CMakeLists.txt b/core/src/dog_segment/CMakeLists.txt
index e258c28ba9..7205a0d808 100644
--- a/core/src/dog_segment/CMakeLists.txt
+++ b/core/src/dog_segment/CMakeLists.txt
@@ -1,10 +1,17 @@
 set(DOG_SEGMENT_FILES
     SegmentNaive.cpp
+    Collection.cpp
+    cwrap.cpp
     )
 # Third Party dablooms file
 #aux_source_directory( ${MILVUS_THIRDPARTY_SRC}/dablooms THIRDPARTY_DABLOOMS_FILES )
-add_library(milvus_dog_segment
+add_library(milvus_dog_segment SHARED
     ${DOG_SEGMENT_FILES}
     )
 #add_dependencies( segment sqlite mysqlpp )
-target_link_libraries(milvus_dog_segment tbb milvus_utils pthread)
\ No newline at end of file
+
+target_link_libraries(milvus_dog_segment tbb milvus_utils pthread)
+
+#add_executable(main main.cpp)
+#target_link_libraries(main
+#        milvus_dog_segment)
diff --git a/core/src/dog_segment/Collection.cpp b/core/src/dog_segment/Collection.cpp
new file mode 100644
index 0000000000..0373659df1
--- /dev/null
+++ b/core/src/dog_segment/Collection.cpp
@@ -0,0 +1,2 @@
+#include "Collection.h"
+
diff --git a/core/src/dog_segment/Collection.h b/core/src/dog_segment/Collection.h
index d7973e25cc..44cf7e8f4b 100644
--- a/core/src/dog_segment/Collection.h
+++ b/core/src/dog_segment/Collection.h
@@ -1,37 +1,43 @@
 #pragma once
 
 #include "SegmentDefs.h"
+#include "SegmentBase.h"
 
 //////////////////////////////////////////////////////////////////
+namespace milvus::dog_segment {
+
 class Partition {
 public:
-    const std::deque<SegmentBasePtr>& segments() const {
+    explicit Partition(std::string& partition_name): partition_name_(partition_name) {}
+
+    const std::vector<SegmentBasePtr> &segments() const {
         return segments_;
     }
 
 private:
-    std::string name_;
-    std::deque<SegmentBasePtr> segments_;
+    std::string partition_name_;
+    std::vector<SegmentBasePtr> segments_;
 };
 
-using PartitionPtr = std::shard_ptr<Partition>;
+using PartitionPtr = std::shared_ptr<Partition>;
 
 //////////////////////////////////////////////////////////////////
 class Collection {
 public:
-    explicit Collection(std::string name): name_(name){}
+    explicit Collection(std::string &collection_name, std::string &schema)
+        : collection_name_(collection_name), schema_json_(schema) {}
 
     // TODO: set index
-    set_index() {}
+    void set_index() {}
 
-    set_schema(std::string config) {
+    void parse() {
         // TODO: config to schema
-        schema_ = null;
     }
 
 public:
+
     // std::vector<SegmentBasePtr> Insert() {
     //     for (auto partition: partitions_) {
    //         for (auto segment: partition.segments()) {
@@ -43,9 +49,12 @@ public:
     // }
 
 private:
-    // TODO: Index ptr
-    IndexPtr index_ = nullptr;
-    std::string name_;
-    SchemaPtr schema_;
+    // TODO: add Index ptr
+    // IndexPtr index_ = nullptr;
+    std::string collection_name_;
+    std::string schema_json_;
+    milvus::dog_segment::SchemaPtr schema_;
     std::vector<PartitionPtr> partitions_;
 };
+
+}
diff --git a/core/src/dog_segment/SegmentBase.h b/core/src/dog_segment/SegmentBase.h
index a5f53c68c9..e5d899de99 100644
--- a/core/src/dog_segment/SegmentBase.h
+++ b/core/src/dog_segment/SegmentBase.h
@@ -84,7 +84,12 @@ class SegmentBase {
     uint64_t segment_id_;
 };
 
+using SegmentBasePtr = std::shared_ptr<SegmentBase>;
+
 std::unique_ptr<SegmentBase> CreateSegment(SchemaPtr ptr);
 
+// TODO: Delete this after schema parse function done
+SegmentBase* CreateSegment();
+
 }  // namespace engine
 }  // namespace milvus
diff --git a/core/src/dog_segment/SegmentNaive.cpp b/core/src/dog_segment/SegmentNaive.cpp
index cbb9afdc45..3621022b7c 100644
--- a/core/src/dog_segment/SegmentNaive.cpp
+++ b/core/src/dog_segment/SegmentNaive.cpp
@@ -130,6 +130,10 @@ class SegmentNaive : public SegmentBase {
 public:
     friend std::unique_ptr<SegmentBase> CreateSegment(SchemaPtr schema);
+
+    friend SegmentBase*
+    CreateSegment();
+
 private:
     SchemaPtr schema_;
     std::shared_mutex mutex_;
@@ -151,6 +155,15 @@ CreateSegment(SchemaPtr schema) {
     return segment;
 }
 
+SegmentBase* CreateSegment() {
+    auto segment = new SegmentNaive();
+    auto schema = std::make_shared<Schema>();
+    schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16);
+    schema->AddField("age", DataType::INT32);
+    segment->schema_ = schema;
+    segment->entity_vecs_.resize(schema->size());
+    return segment;
+}
 
 Status
 SegmentNaive::Insert(int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps,
@@ -158,6 +171,7 @@ SegmentNaive::Insert(int64_t size, const uint64_t* primary_keys, const Timestamp
     const auto& schema = *schema_;
     auto data_chunk = ColumnBasedDataChunk::from(row_values, schema);
 
+    std::cout << "key:" << std::endl;
     // insert datas
     // TODO: use shared_lock
     std::lock_guard lck(mutex_);
@@ -166,6 +180,7 @@ SegmentNaive::Insert(int64_t size, const uint64_t* primary_keys, const Timestamp
     uids_.grow_by(primary_keys, primary_keys + size);
     for(int64_t i = 0; i < size; ++i) {
         auto key = primary_keys[i];
+        std::cout << key << std::endl;
         auto internal_index = i + ack_id;
         internal_indexes_[key] = internal_index;
     }
diff --git a/core/src/dog_segment/cwrap.cpp b/core/src/dog_segment/cwrap.cpp
new file mode 100644
index 0000000000..654816b21e
--- /dev/null
+++ b/core/src/dog_segment/cwrap.cpp
@@ -0,0 +1,40 @@
+#include "SegmentBase.h"
+#include "cwrap.h"
+
+CSegmentBase
+SegmentBaseInit() {
+    std::cout << "Hello milvus" << std::endl;
+    auto seg = milvus::dog_segment::CreateSegment();
+    return (void*)seg;
+}
+
+//int32_t Insert(CSegmentBase c_segment, signed long int size, const unsigned long* primary_keys, const unsigned long int* timestamps, DogDataChunk values) {
+//    auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
+//    milvus::dog_segment::DogDataChunk dataChunk{};
+//
+//    dataChunk.raw_data = values.raw_data;
+//    dataChunk.sizeof_per_row = values.sizeof_per_row;
+//    dataChunk.count = values.count;
+//
+//    auto res = segment->Insert(size, primary_keys, timestamps, dataChunk);
+//    return res.code();
+//}
+
+int Insert(CSegmentBase c_segment,
+           signed long int size,
+           const unsigned long* primary_keys,
+           const unsigned long int* timestamps,
+           void* raw_data,
+           int sizeof_per_row,
+           signed long int count) {
+    auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
+    milvus::dog_segment::DogDataChunk dataChunk{};
+
+    dataChunk.raw_data = raw_data;
+    dataChunk.sizeof_per_row = sizeof_per_row;
+    dataChunk.count = count;
+
+    auto res = segment->Insert(size, primary_keys, timestamps, dataChunk);
+    return res.code();
+}
+
diff --git a/core/src/dog_segment/cwrap.h b/core/src/dog_segment/cwrap.h
new file mode 100644
index 0000000000..63ed8eb8fb
--- /dev/null
+++ b/core/src/dog_segment/cwrap.h
@@ -0,0 +1,27 @@
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+//struct DogDataChunk {
+//    void* raw_data;      // schema
+//    int sizeof_per_row;  // alignment
+//    signed long int count;
+//};
+
+typedef void* CSegmentBase;
+
+CSegmentBase SegmentBaseInit();
+
+//int32_t Insert(CSegmentBase c_segment, signed long int size, const unsigned long* primary_keys, const unsigned long int* timestamps, DogDataChunk values);
+
+int Insert(CSegmentBase c_segment,
+           signed long int size,
+           const unsigned long* primary_keys,
+           const unsigned long int* timestamps,
+           void* raw_data,
+           int sizeof_per_row,
+           signed long int count);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/core/src/dog_segment/main.cpp b/core/src/dog_segment/main.cpp
new file mode 100644
index 0000000000..5781a3c6d4
--- /dev/null
+++ b/core/src/dog_segment/main.cpp
@@ -0,0 +1,67 @@
+#include <iostream>
+#include <vector>
+#include <random>
+#include <cstdint>
+#include "cwrap.h"
+
+//int main() {
+//    auto s = SegmentBaseInit();
+//
+//    std::vector<char> raw_data;
+//    std::vector<uint64_t> timestamps;
+//    std::vector<uint64_t> uids;
+//    int N = 10000;
+//    std::default_random_engine e(67);
+//    for(int i = 0; i < N; ++i) {
+//        uids.push_back(100000 + i);
+//        timestamps.push_back(0);
+//        // append vec
+//        float vec[16];
+//        for(auto &x: vec) {
+//            x = e() % 2000 * 0.001 - 1.0;
+//        }
+//        raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
+//        int age = e() % 100;
+//        raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
+//    }
+//
+//    auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
+//
+//    DogDataChunk dogDataChunk{};
+//    dogDataChunk.count = N;
+//    dogDataChunk.raw_data = raw_data.data();
+//    dogDataChunk.sizeof_per_row = (int)line_sizeof;
+//
+//    auto res = Insert(s, N, uids.data(), timestamps.data(), dogDataChunk);
+//
+//    std::cout << res << std::endl;
+//}
+
+int main() {
+    auto s = SegmentBaseInit();
+
+    std::vector<char> raw_data;
+    std::vector<uint64_t> timestamps;
+    std::vector<uint64_t> uids;
+    int N = 10000;
+    std::default_random_engine e(67);
+    for(int i = 0; i < N; ++i) {
+        uids.push_back(100000 + i);
+        timestamps.push_back(0);
+        // append vec
+        float vec[16];
+        for(auto &x: vec) {
+            x = e() % 2000 * 0.001 - 1.0;
+        }
+        raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
+        int age = e() % 100;
+        raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
+    }
+
+    auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
+
+    auto res = Insert(s, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
+
+    std::cout << res << std::endl;
+}
+
diff --git a/core/src/dog_segment/main.go b/core/src/dog_segment/main.go
new file mode 100644
index 0000000000..e0c8c1a4b4
--- /dev/null
+++ b/core/src/dog_segment/main.go
@@ -0,0 +1,46 @@
+package main
+
+/*
+
+#cgo CFLAGS: -I./
+
+#cgo LDFLAGS: -L/home/sheep/workspace/milvus/sheep/suvlim/core/cmake-build-debug/src/dog_segment -lmilvus_dog_segment -Wl,-rpath=/home/sheep/workspace/milvus/sheep/suvlim/core/cmake-build-debug/src/dog_segment
+
+#include "cwrap.h"
+
+*/
+import "C"
+import (
+	"fmt"
+	"unsafe"
+)
+
+func testInsert() {
+	const DIM = 4
+	const N = 3
+
+	var ids = [N]uint64{1, 2, 3}
+	var timestamps = [N]uint64{0, 0, 0}
+
+	var vec = [DIM]float32{1.1, 2.2, 3.3, 4.4}
+	var rawData []int8
+
+	for i := 0; i <= N; i++ {
+		for _, ele := range vec {
+			rawData = append(rawData, int8(ele))
+		}
+		rawData = append(rawData, int8(i))
+	}
+
+	var segment = C.SegmentBaseInit()
+	fmt.Println(segment)
+
+	const sizeofPerRow = 4 + DIM * 4
+	var res = C.Insert(segment, N, (*C.ulong)(&ids[0]), (*C.ulong)(&timestamps[0]), unsafe.Pointer(&rawData[0]), C.int(sizeofPerRow), C.long(N))
+	fmt.Println(res)
+}
+
+func main() {
+	fmt.Println("Test milvus segment base:")
+	testInsert()
+}
diff --git a/pulsar/go_client.go b/pulsar/go_client.go
index 76feac6150..9ab5eac15c 100644
--- a/pulsar/go_client.go
+++ b/pulsar/go_client.go
@@ -47,10 +47,10 @@ type MessageClient struct {
 	consumer pulsar.Consumer
 
 	// batch messages
-	insertMsg  []*schema.InsertMsg
-	deleteMsg  []*schema.DeleteMsg
-	searchMsg  []*schema.SearchMsg
-	timeMsg    []*schema.TimeSyncMsg
+	InsertMsg  []*schema.InsertMsg
+	DeleteMsg  []*schema.DeleteMsg
+	SearchMsg  []*schema.SearchMsg
+	timeMsg    []*schema.TimeSyncMsg
 	key2segMsg []*schema.Key2SegMsg
 }
@@ -150,9 +150,9 @@ const (
 
 func (mc *MessageClient) PrepareBatchMsg(jobType JobType) {
 	// assume the channel not full
-	mc.insertMsg = make([]*schema.InsertMsg, 1000)
-	mc.deleteMsg = make([]*schema.DeleteMsg, 1000)
-	mc.searchMsg = make([]*schema.SearchMsg, 1000)
+	mc.InsertMsg = make([]*schema.InsertMsg, 1000)
+	mc.DeleteMsg = make([]*schema.DeleteMsg, 1000)
+	mc.SearchMsg = make([]*schema.SearchMsg, 1000)
 	mc.timeMsg = make([]*schema.TimeSyncMsg, 1000)
 	mc.key2segMsg = make([]*schema.Key2SegMsg, 1000)
@@ -167,11 +167,11 @@ func (mc *MessageClient) PrepareBatchMsg(jobType JobType) {
 	// get message from channel to slice
 	for i := 0; i < insertLen; i++ {
 		msg := <- mc.insertChan
-		mc.insertMsg[i] = msg
+		mc.InsertMsg[i] = msg
 	}
 	for i := 0; i < deleteLen; i++ {
 		msg := <- mc.deleteChan
-		mc.deleteMsg[i] = msg
+		mc.DeleteMsg[i] = msg
 	}
 	for i := 0; i < timeLen; i++ {
 		msg := <- mc.timeSyncChan
@@ -185,7 +185,7 @@ func (mc *MessageClient) PrepareBatchMsg(jobType JobType) {
 
 		for i := 0; i < searchLen; i++ {
 			msg := <-mc.searchChan
-			mc.searchMsg[i] = msg
+			mc.SearchMsg[i] = msg
 		}
 	}
 }
diff --git a/pulsar/query_node.go b/pulsar/query_node.go
index c52157191a..0a4962102a 100644
--- a/pulsar/query_node.go
+++ b/pulsar/query_node.go
@@ -13,9 +13,9 @@ type QueryNode struct {
 
 func (qn *QueryNode)doQueryNode(wg sync.WaitGroup) {
 	wg.Add(3)
-	go qn.insert_query(qn.mc.insertMsg, wg)
-	go qn.delete_query(qn.mc.deleteMsg, wg)
-	go qn.search_query(qn.mc.searchMsg, wg)
+	go qn.insert_query(qn.mc.InsertMsg, wg)
+	go qn.delete_query(qn.mc.DeleteMsg, wg)
+	go qn.search_query(qn.mc.SearchMsg, wg)
 	wg.Wait()
 }
diff --git a/pulsar/storage_node.go b/pulsar/storage_node.go
index 357c6abdcb..3ff8178356 100644
--- a/pulsar/storage_node.go
+++ b/pulsar/storage_node.go
@@ -13,8 +13,8 @@ type WriteNode struct {
 
 func (wn *WriteNode)doWriteNode(wg sync.WaitGroup) {
 	wg.Add(2)
-	go wn.insert_write(wn.mc.insertMsg, wg)
-	go wn.delete_write(wn.mc.deleteMsg, wg)
+	go wn.insert_write(wn.mc.InsertMsg, wg)
+	go wn.delete_write(wn.mc.DeleteMsg, wg)
 	wg.Wait()
 }
diff --git a/reader/collection.go b/reader/collection.go
index 5af0ebbbab..dfe7945756 100644
--- a/reader/collection.go
+++ b/reader/collection.go
@@ -8,6 +8,7 @@ import (
 type Collection struct {
 	CollectionPtr  *C.Collection
 	CollectionName string
+	Partitions     []*Partition
 }
 
 // TODO: Schema
@@ -44,13 +45,3 @@ func (c *Collection) GetSegments() ([]*Segment, error) {
 	return segments, nil
 }
-
-func (c *Collection) CreateSegment() error {
-	status := C.CreateSegment(c.CollectionPtr)
-
-	if status != 0 {
-		return errors.New("create segment failed")
-	}
-
-	return nil
-}
diff --git a/reader/index.go b/reader/index.go
index 6e20ba2544..c1809e3a96 100644
--- a/reader/index.go
+++ b/reader/index.go
@@ -1,6 +1,6 @@
 package reader
 
-import "../pulsar/schema"
+import "suvlim/pulsar/schema"
 
 type IndexConfig struct {}
 
diff --git a/reader/partition.go b/reader/partition.go
new file mode 100644
index 0000000000..97b698d8a9
--- /dev/null
+++ b/reader/partition.go
@@ -0,0 +1,32 @@
+package reader
+
+import "C"
+import "errors"
+
+type Partition struct {
+	PartitionPtr  *C.CPartition
+	PartitionName string
+	Segments      []*Segment
+}
+
+func (c *Collection) NewPartition(partitionName string) (*Partition, error) {
+	cName := C.CString(partitionName)
+	partitionPtr, status := C.NewPartition(c.CollectionPtr, cName)
+
+	if status != 0 {
+		return nil, errors.New("create partition failed")
+	}
+
+	return &Partition{PartitionPtr: partitionPtr, PartitionName: partitionName}, nil
+}
+
+func (c *Collection) DeletePartition(partitionName string) error {
+	cName := C.CString(partitionName)
+	status := C.DeletePartition(c.CollectionPtr, cName)
+
+	if status != 0 {
+		return errors.New("delete partition failed")
+	}
+
+	return nil
+}
diff --git a/reader/query_node.go b/reader/query_node.go
index 7982881b7a..782b98a5bb 100644
--- a/reader/query_node.go
+++ b/reader/query_node.go
@@ -2,7 +2,12 @@ package reader
 
 import "C"
 import (
-	"context"
+	"errors"
+	"fmt"
+	"suvlim/pulsar"
+	"suvlim/pulsar/schema"
+	"sync"
+	"time"
 )
 
 type QueryNodeTimeSync struct {
@@ -13,11 +18,13 @@ type QueryNodeTimeSync struct {
 
 type QueryNode struct {
 	Collections       []*Collection
+	messageClient     pulsar.MessageClient
 	queryNodeTimeSync *QueryNodeTimeSync
 }
 
-func NewQueryNode(ctx context.Context, timeSync uint64) *QueryNode {
-	ctx = context.Background()
+func NewQueryNode(timeSync uint64) *QueryNode {
+	mc := pulsar.MessageClient{}
+
 	queryNodeTimeSync := &QueryNodeTimeSync {
 		deleteTimeSync: timeSync,
 		insertTimeSync: timeSync,
@@ -26,12 +33,189 @@
 
 	return &QueryNode{
 		Collections:       nil,
+		messageClient:     mc,
 		queryNodeTimeSync: queryNodeTimeSync,
 	}
 }
 
+func (node *QueryNode)doQueryNode(wg *sync.WaitGroup) {
+	wg.Add(3)
+	go node.Insert(node.messageClient.InsertMsg, wg)
+	go node.Delete(node.messageClient.DeleteMsg, wg)
+	go node.Search(node.messageClient.SearchMsg, wg)
+	wg.Wait()
+}
+
+func (node *QueryNode) PrepareBatchMsg() {
+	node.messageClient.PrepareBatchMsg(pulsar.JobType(0))
+}
+
+func (node *QueryNode) StartMessageClient() {
+	topics := []string{"insert", "delete"}
+	node.messageClient.InitClient("pulsar://localhost:6650", topics)
+
+	go node.messageClient.ReceiveMessage()
+}
+
 func (node *QueryNode) AddNewCollection(collectionName string, schema CollectionSchema) error {
 	var collection, err = NewCollection(collectionName, schema)
 	node.Collections = append(node.Collections, collection)
 	return err
 }
+
+func (node *QueryNode) GetSegmentByEntityId(entityId int64) *Segment {
+	// TODO: get id2segment info from pulsar
+	return nil
+}
+
+func (node *QueryNode) GetTargetSegment(collectionName *string, partitionTag *string) (*Segment, error) {
+	var targetPartition *Partition
+
+	for _, collection := range node.Collections {
+		if *collectionName == collection.CollectionName {
+			for _, partition := range collection.Partitions {
+				if *partitionTag == partition.PartitionName {
+					targetPartition = partition
+					break
+				}
+			}
+		}
+	}
+
+	if targetPartition == nil {
+		return nil, errors.New("cannot find target partition")
+	}
+
+	for _, segment := range targetPartition.Segments {
+		var segmentStatus = segment.GetStatus()
+		if segmentStatus == 0 {
+			return segment, nil
+		}
+	}
+
+	return nil, errors.New("cannot find target segment")
+}
+
+func (node *QueryNode) GetTimeSync() uint64 {
+	// TODO: Add time sync
+	return 0
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+func (node *QueryNode) InitQueryNodeCollection() {
+	// TODO: remove hard code, add collection creation request
+	var collection, _ = NewCollection("collection1", "fakeSchema")
+	node.Collections = append(node.Collections, collection)
+	var partition, _ = collection.NewPartition("partition1")
+	collection.Partitions = append(collection.Partitions, partition)
+	var segment, _ = partition.NewSegment()
+	partition.Segments = append(partition.Segments, segment)
+}
+
+func (node *QueryNode) SegmentsManagement() {
+	var timeSync = node.GetTimeSync()
+	for _, collection := range node.Collections {
+		for _, partition := range collection.Partitions {
+			for _, segment := range partition.Segments {
+				if timeSync >= segment.SegmentCloseTime {
+					segment.Close()
+					// TODO: add atomic segment id
+					var newSegment, _ = partition.NewSegment()
+					newSegment.SegmentCloseTime = timeSync + SEGMENT_LIFETIME
+					partition.Segments = append(partition.Segments, newSegment)
+				}
+			}
+		}
+	}
+}
+
+func (node *QueryNode) SegmentService() {
+	for {
+		time.Sleep(200 * time.Millisecond)
+		node.SegmentsManagement()
+		fmt.Println("do segments management in 200ms")
+	}
+}
+
+///////////////////////////////////////////////////////////////////////////////////////////////////
+func (node *QueryNode) Insert(insertMessages []*schema.InsertMsg, wg *sync.WaitGroup) schema.Status {
+	var collectionName = insertMessages[0].CollectionName
+	var partitionTag = insertMessages[0].PartitionTag
+	var clientId = insertMessages[0].ClientId
+
+	// TODO: prevent Memory copy
+	var entityIds []int64
+	var timestamps []uint64
+	var vectorRecords [][]*schema.FieldValue
+	for _, msg := range insertMessages {
+		entityIds = append(entityIds, msg.EntityId)
+		timestamps = append(timestamps, msg.Timestamp)
+		vectorRecords = append(vectorRecords, msg.Fields)
+	}
+
+	var targetSegment, err = node.GetTargetSegment(&collectionName, &partitionTag)
+	if err != nil {
+		// TODO: throw runtime error
+		fmt.Println(err.Error())
+		return schema.Status{}
+	}
+
+	var result = SegmentInsert(targetSegment, collectionName, partitionTag, &entityIds, &timestamps, vectorRecords)
+
+	wg.Done()
+	return publishResult(&result, clientId)
+}
+
+func (node *QueryNode) Delete(deleteMessages []*schema.DeleteMsg, wg *sync.WaitGroup) schema.Status {
+	var collectionName = deleteMessages[0].CollectionName
+	var clientId = deleteMessages[0].ClientId
+
+	// TODO: prevent Memory copy
+	var entityIds []int64
+	var timestamps []uint64
+	for _, msg := range deleteMessages {
+		entityIds = append(entityIds, msg.EntityId)
+		timestamps = append(timestamps, msg.Timestamp)
+	}
+
+	if entityIds == nil {
+		// TODO: throw runtime error
+		fmt.Println("no entities found")
+		return schema.Status{}
+	}
+	// TODO: are all entities from one batch in the same segment?
+	var targetSegment = node.GetSegmentByEntityId(entityIds[0])
+
+	var result = SegmentDelete(targetSegment, collectionName, &entityIds, &timestamps)
+
+	wg.Done()
+	return publishResult(&result, clientId)
+}
+
+func (node *QueryNode) Search(searchMessages []*schema.SearchMsg, wg *sync.WaitGroup) schema.Status {
+	var collectionName = searchMessages[0].CollectionName
+	var partitionTag = searchMessages[0].PartitionTag
+	var clientId = searchMessages[0].ClientId
+	var queryString = searchMessages[0].VectorParam.Json
+
+	// TODO: prevent Memory copy
+	var records []schema.VectorRecord
+	var timestamps []int64
+	for _, msg := range searchMessages {
+		records = append(records, *msg.VectorParam.RowRecord)
+		timestamps = append(timestamps, msg.Timestamp)
+	}
+
+	var targetSegment, err = node.GetTargetSegment(&collectionName, &partitionTag)
+	if err != nil {
+		// TODO: throw runtime error
+		fmt.Println(err.Error())
+		return schema.Status{}
+	}
+
+	var result = SegmentSearch(targetSegment, collectionName, queryString, &timestamps, &records)
+
+	wg.Done()
+	return publishResult(&result, clientId)
+}
diff --git a/reader/reader.go b/reader/reader.go
index f56c7f1ab0..b1cb05c8e7 100644
--- a/reader/reader.go
+++ b/reader/reader.go
@@ -1,73 +1,22 @@
 package reader
 
-import "../pulsar/schema"
+import (
+	"fmt"
+	"sync"
+	"time"
+)
 
-func milvusInsertMock(collectionName string, partitionTag string, entityIds *[]int64, timestamps *[]int64, dataChunk [][]*schema.FieldValue) ResultEntityIds {
-	return ResultEntityIds{}
-}
+func startQueryNode() {
+	qn := NewQueryNode(0)
+	qn.InitQueryNodeCollection()
+	go qn.SegmentService()
+	qn.StartMessageClient()
 
-func milvusDeleteMock(collectionName string, entityIds *[]int64, timestamps *[]int64) ResultEntityIds {
-	return ResultEntityIds{}
-}
-
-func milvusSearchMock(collectionName string, queryString string, timestamps *[]int64, vectorRecord *[]schema.VectorRecord) ResultEntityIds {
-	return ResultEntityIds{}
-}
-
-type dataChunkSchema struct {
-	FieldName string
-	DataType  schema.DataType
-	Dim       int
-}
-
-func insert(insertMessages []*schema.InsertMsg) schema.Status {
-	var collectionName = insertMessages[0].CollectionName
-	var partitionTag = insertMessages[0].PartitionTag
-	var clientId = insertMessages[0].ClientId
-
-	// TODO: prevent Memory copy
-	var entityIds []int64
-	var timestamps []int64
-	var vectorRecords [][]*schema.FieldValue
-	for _, msg := range insertMessages {
-		entityIds = append(entityIds, msg.EntityId)
-		timestamps = append(timestamps, msg.Timestamp)
-		vectorRecords = append(vectorRecords, msg.Fields)
+	var wg sync.WaitGroup
+	for {
+		time.Sleep(200 * time.Millisecond)
+		qn.PrepareBatchMsg()
+		qn.doQueryNode(&wg)
+		fmt.Println("do a batch in 200ms")
 	}
-
-	var result = milvusInsertMock(collectionName, partitionTag, &entityIds, &timestamps, vectorRecords)
-	return publishResult(&result, clientId)
-}
-
-func delete(deleteMessages []*schema.DeleteMsg) schema.Status {
-	var collectionName = deleteMessages[0].CollectionName
-	var clientId = deleteMessages[0].ClientId
-
-	// TODO: prevent Memory copy
-	var entityIds []int64
-	var timestamps []int64
-	for _, msg := range deleteMessages {
-		entityIds = append(entityIds, msg.EntityId)
-		timestamps = append(timestamps, msg.Timestamp)
-	}
-
-	var result = milvusDeleteMock(collectionName, &entityIds, &timestamps)
-	return publishResult(&result, clientId)
-}
-
-func search(searchMessages []*schema.SearchMsg) schema.Status {
-	var collectionName = searchMessages[0].CollectionName
-	var clientId int64 = searchMessages[0].ClientId
-
var queryString = searchMessages[0].VectorParam.Json - - // TODO: prevent Memory copy - var records []schema.VectorRecord - var timestamps []int64 - for _, msg := range searchMessages { - records = append(records, *msg.VectorParam.RowRecord) - timestamps = append(timestamps, msg.Timestamp) - } - - var result = milvusSearchMock(collectionName, queryString, ×tamps, &records) - return publishResult(&result, clientId) } diff --git a/reader/result.go b/reader/result.go index b09b4ba5ed..e3def0cd17 100644 --- a/reader/result.go +++ b/reader/result.go @@ -1,8 +1,8 @@ package reader import ( - "../pulsar/schema" "fmt" + "suvlim/pulsar/schema" ) type ResultEntityIds []int64 diff --git a/reader/segment.go b/reader/segment.go index 68dec2741f..346b3400a4 100644 --- a/reader/segment.go +++ b/reader/segment.go @@ -1,13 +1,40 @@ package reader import "C" +import ( + "errors" + "suvlim/pulsar/schema" +) + +const SEGMENT_LIFETIME = 20000 type Segment struct { - Id string - Status int + SegmentPtr *C.SegmentBase + SegmentId int32 SegmentCloseTime uint64 } +func (p *Partition) NewSegment() (*Segment, error) { + // TODO: add segment id + segmentPtr, status := C.CreateSegment(p.PartitionPtr) + + if status != 0 { + return nil, errors.New("create segment failed") + } + + return &Segment{SegmentPtr: segmentPtr}, nil +} + +func (p *Partition) DeleteSegment() error { + status := C.DeleteSegment(p.PartitionPtr) + + if status != 0 { + return errors.New("delete segment failed") + } + + return nil +} + func (s *Segment) GetRowCount() int64 { // TODO: C type to go type return C.GetRowCount(s) @@ -37,3 +64,19 @@ func (s *Segment) Close() { // TODO: C type to go type C.CloseSegment(s) } + +//////////////////////////////////////////////////////////////////////////// +func SegmentInsert(segment *Segment, collectionName string, partitionTag string, entityIds *[]int64, timestamps *[]uint64, dataChunk [][]*schema.FieldValue) ResultEntityIds { + // TODO: wrap cgo + return ResultEntityIds{} +} + +func SegmentDelete(segment *Segment, collectionName string, entityIds *[]int64, timestamps *[]uint64) ResultEntityIds { + // TODO: wrap cgo + return ResultEntityIds{} +} + +func SegmentSearch(segment *Segment, collectionName string, queryString string, timestamps *[]int64, vectorRecord *[]schema.VectorRecord) ResultEntityIds { + // TODO: wrap cgo + return ResultEntityIds{} +}
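
Note (not part of the patch): the sketch below shows one way the new cwrap.h C API (SegmentBaseInit / Insert) introduced above can be driven from Go. It assumes the row layout used by core/src/dog_segment/main.cpp and main.go, i.e. DIM float32 values followed by one int32 field, so sizeof_per_row = DIM*4 + 4. The #cgo LDFLAGS needed to locate libmilvus_dog_segment are machine-specific and omitted here; the include path is an assumption.

package main

/*
#cgo CFLAGS: -I./
#include "cwrap.h"
*/
import "C"
import (
	"bytes"
	"encoding/binary"
	"fmt"
	"unsafe"
)

func main() {
	const DIM = 16 // matches the "fakevec" field created by CreateSegment()
	const N = 2

	ids := [N]uint64{1, 2}
	timestamps := [N]uint64{0, 0}

	// Pack each row as raw bytes: DIM little-endian float32s, then an int32 "age" field.
	// Zero vectors are enough for a smoke test of the insert path.
	var buf bytes.Buffer
	for i := 0; i < N; i++ {
		vec := [DIM]float32{}
		binary.Write(&buf, binary.LittleEndian, vec[:])
		binary.Write(&buf, binary.LittleEndian, int32(i))
	}
	raw := buf.Bytes()

	segment := C.SegmentBaseInit()
	res := C.Insert(segment, N,
		(*C.ulong)(&ids[0]), (*C.ulong)(&timestamps[0]),
		unsafe.Pointer(&raw[0]), C.int(DIM*4+4), C.long(N))
	fmt.Println(res) // 0 on success, per Status::code()
}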