Update cgo interfaces and add query node id

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2020-09-02 10:38:08 +08:00 committed by yefu.chen
parent 8e323494a5
commit 39addf0a55
9 changed files with 108 additions and 39 deletions

View File

@ -0,0 +1,13 @@
#ifdef __cplusplus
extern "C" {
#endif
typedef void* CCollection;
CCollection NewCollection(const char* collection_name, const char* schema_conf);
void DeleteCollection(CCollection collection);
#ifdef __cplusplus
}
#endif

View File

@ -0,0 +1,15 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "collection_c.h"
typedef void* CPartition;
CPartition NewPartition(CCollection collection, const char* partition_name);
void DeletePartition(CPartition partition);
#ifdef __cplusplus
}
#endif

View File

@ -2,17 +2,13 @@
extern "C" {
#endif
//struct DogDataChunk {
// void* raw_data; // schema
// int sizeof_per_row; // alignment
// signed long int count;
//};
#include "partition_c.h"
typedef void* CSegmentBase;
CSegmentBase SegmentBaseInit(unsigned long segment_id);
CSegmentBase NewSegment(CPartition partition, unsigned long segment_id);
//int32_t Insert(CSegmentBase c_segment, signed long int size, const unsigned long* primary_keys, const unsigned long int* timestamps, DogDataChunk values);
void DeleteSegment(CSegmentBase segment);
int Insert(CSegmentBase c_segment,
signed long int size,

View File

@ -6,16 +6,15 @@ package reader
#cgo LDFLAGS: -L../core/lib -lmilvus_dog_segment -Wl,-rpath=../core/lib
#include "collection_c.h"
#include "partition_c.h"
#include "segment_c.h"
*/
import "C"
import (
"errors"
)
type Collection struct {
CollectionPtr *C.Collection
CollectionPtr C.CCollection
CollectionName string
Partitions []*Partition
}
@ -37,11 +36,13 @@ func (c *Collection) DeletePartition(partition *Partition) {
}
func (c *Collection) GetSegments() ([]*Segment, error) {
segments, status := C.GetSegments(c.CollectionPtr)
if status != 0 {
return nil, errors.New("get segments failed")
}
return segments, nil
// TODO: add get segments
//segments, status := C.GetSegments(c.CollectionPtr)
//
//if status != 0 {
// return nil, errors.New("get segments failed")
//}
//
//return segments, nil
return nil, nil
}

View File

@ -1,15 +1,26 @@
package reader
/*
#cgo CFLAGS: -I../core/include
#cgo LDFLAGS: -L../core/lib -lmilvus_dog_segment -Wl,-rpath=../core/lib
#include "collection_c.h"
#include "partition_c.h"
#include "segment_c.h"
*/
import "C"
type Partition struct {
PartitionPtr *C.CPartition
PartitionPtr C.CPartition
PartitionName string
Segments []*Segment
}
func (p *Partition) NewSegment(segmentId uint64) *Segment {
segmentPtr := C.NewSegment(p.PartitionPtr, segmentId)
segmentPtr := C.NewSegment(p.PartitionPtr, C.ulong(segmentId))
var newSegment = &Segment{SegmentPtr: segmentPtr, SegmentId: segmentId}
p.Segments = append(p.Segments, newSegment)

View File

@ -1,6 +1,18 @@
package reader
/*
#cgo CFLAGS: -I../core/include
#cgo LDFLAGS: -L../core/lib -lmilvus_dog_segment -Wl,-rpath=../core/lib
#include "collection_c.h"
#include "partition_c.h"
#include "segment_c.h"
*/
import "C"
import (
"errors"
"fmt"
@ -26,13 +38,14 @@ type QueryNodeTimeSync struct {
}
type QueryNode struct {
QueryNodeId uint64
Collections []*Collection
messageClient pulsar.MessageClient
queryNodeTimeSync *QueryNodeTimeSync
buffer QueryNodeDataBuffer
}
func NewQueryNode(timeSync uint64) *QueryNode {
func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
mc := pulsar.MessageClient{}
queryNodeTimeSync := &QueryNodeTimeSync {
@ -42,18 +55,16 @@ func NewQueryNode(timeSync uint64) *QueryNode {
}
return &QueryNode{
QueryNodeId: queryNodeId,
Collections: nil,
messageClient: mc,
queryNodeTimeSync: queryNodeTimeSync,
}
}
// TODO: Schema
type CollectionSchema string
func (node *QueryNode) NewCollection(collectionName string, schema CollectionSchema) *Collection {
func (node *QueryNode) NewCollection(collectionName string, schemaConfig string) *Collection {
cName := C.CString(collectionName)
cSchema := C.CString(schema)
cSchema := C.CString(schemaConfig)
collection := C.NewCollection(cName, cSchema)
var newCollection = &Collection{CollectionPtr: collection, CollectionName: collectionName}
@ -212,7 +223,7 @@ func (node *QueryNode) Insert(insertMessages []*schema.InsertMsg, wg *sync.WaitG
return schema.Status{}
}
var result = SegmentInsert(targetSegment, collectionName, partitionTag, &entityIds, &timestamps, vectorRecords)
var result = SegmentInsert(targetSegment, &entityIds, &timestamps, vectorRecords)
wg.Done()
return publishResult(&result, clientId)

View File

@ -7,7 +7,7 @@ import (
)
func startQueryNode() {
qn := NewQueryNode(0)
qn := NewQueryNode(0, 0)
qn.InitQueryNodeCollection()
go qn.SegmentService()
qn.StartMessageClient()

View File

@ -6,6 +6,8 @@ package reader
#cgo LDFLAGS: -L../core/lib -lmilvus_dog_segment -Wl,-rpath=../core/lib
#include "collection_c.h"
#include "partition_c.h"
#include "segment_c.h"
*/
@ -17,44 +19,52 @@ import (
const SegmentLifetime = 20000
type Segment struct {
SegmentPtr *C.SegmentBase
SegmentPtr C.CSegmentBase
SegmentId uint64
SegmentCloseTime uint64
}
func (s *Segment) GetRowCount() int64 {
// TODO: C type to go type
return C.GetRowCount(s)
//return C.GetRowCount(s)
return 0
}
func (s *Segment) GetStatus() int {
// TODO: C type to go type
return C.GetStatus(s)
//return C.GetStatus(s)
return 0
}
func (s *Segment) GetMaxTimestamp() uint64 {
// TODO: C type to go type
return C.GetMaxTimestamp(s)
//return C.GetMaxTimestamp(s)
return 0
}
func (s *Segment) GetMinTimestamp() uint64 {
// TODO: C type to go type
return C.GetMinTimestamp(s)
//return C.GetMinTimestamp(s)
return 0
}
func (s *Segment) GetDeletedCount() uint64 {
// TODO: C type to go type
return C.GetDeletedCount(s)
//return C.GetDeletedCount(s)
return 0
}
func (s *Segment) Close() {
// TODO: C type to go type
C.CloseSegment(s)
//C.CloseSegment(s)
}
////////////////////////////////////////////////////////////////////////////
func SegmentInsert(segment *Segment, collectionName string, partitionTag string, entityIds *[]int64, timestamps *[]uint64, dataChunk [][]*schema.FieldValue) ResultEntityIds {
// TODO: wrap cgo
func SegmentInsert(segment *Segment, entityIds *[]int64, timestamps *[]uint64, dataChunk [][]*schema.FieldValue) ResultEntityIds {
// void* raw_data,
// int sizeof_per_row,
// signed long int count
return ResultEntityIds{}
}

View File

@ -1,16 +1,28 @@
package reader
import (
//"github.com/realistschuckle/testify/assert"
"testing"
)
func TestConstructorAndDestructor(t *testing.T) {
node := NewQueryNode(0)
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}
func TestSegmentInsert(t *testing.T) {
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)