mirror of https://github.com/milvus-io/milvus.git
parent
d66d48c6b6
commit
1cfc6c7f86
|
@ -79,6 +79,8 @@ For better throughput, Milvus allows asynchronous state synchronization between
|
|||
``` go
|
||||
type CollectionSchema struct {
|
||||
Name string
|
||||
Description string
|
||||
AutoId bool
|
||||
Fields []FieldSchema
|
||||
}
|
||||
```
|
||||
|
@ -87,7 +89,6 @@ type CollectionSchema struct {
|
|||
|
||||
``` go
|
||||
type FieldSchema struct {
|
||||
Id uint64
|
||||
Name string
|
||||
Description string
|
||||
DataType DataType
|
||||
|
@ -96,33 +97,36 @@ type FieldSchema struct {
|
|||
}
|
||||
```
|
||||
|
||||
###### 2.2.1 Data Types
|
||||
|
||||
###### 2.2.2 Type Params
|
||||
|
||||
#### 2.3 Type Params
|
||||
|
||||
#### 2.4 Index Params
|
||||
###### 2.2.3 Index Params
|
||||
|
||||
|
||||
|
||||
## 3. Request
|
||||
|
||||
In this section, we introduce the RPCs of milvus service. A brief description of the RPCs is listed as follows.
|
||||
|
||||
|
||||
#### 3.1 Base Request
|
||||
|
||||
``` go
|
||||
type BaseRequest interface {
|
||||
Type() ReqType
|
||||
PreExecute() Status
|
||||
Execute() Status
|
||||
PostExecute() Status
|
||||
WaitToFinish() Status
|
||||
}
|
||||
```
|
||||
| RPC | description |
|
||||
| :----------------- | ------------------------------------------------------------ |
|
||||
| CreateCollection | create a collection base on schema statement |
|
||||
| DropCollection | drop a collection |
|
||||
| HasCollection | whether or not a collection exists |
|
||||
| DescribeCollection | show a collection's schema and its descriptive statistics |
|
||||
| ShowCollections | list all collections |
|
||||
| CreatePartition | create a partition |
|
||||
| DropPartition | drop a partition |
|
||||
| HasPartition | whether or not a partition exists |
|
||||
| DescribePartition | show a partition's name and its descriptive statistics |
|
||||
| ShowPartitions | list a collection's all partitions |
|
||||
| Insert | insert a batch of rows into a collection or a partition |
|
||||
| Search | query the columns of a collection or a partition with ANNS statements and boolean expressions |
|
||||
|
||||
|
||||
|
||||
#### 3.2 Definition Requests
|
||||
#### 3.1 Definition Requests
|
||||
|
||||
###### 3.2.1 Collection
|
||||
|
||||
|
@ -140,91 +144,21 @@ type BaseRequest interface {
|
|||
* DescribePartition
|
||||
* ShowPartitions
|
||||
|
||||
###### 3.2.3 Index
|
||||
|
||||
* CreateIndex
|
||||
* DropIndex
|
||||
* DescribeIndex
|
||||
|
||||
###### 3.2.4 Definition Request & Task
|
||||
|
||||
```go
|
||||
type DDRequest struct {
|
||||
CollectionName string
|
||||
PartitionName string
|
||||
SegmentId uint64
|
||||
ChannelId uint64
|
||||
|
||||
PrimaryKeys []uint64
|
||||
RowData []*RowDataBlob
|
||||
|
||||
reqType ReqType
|
||||
ts Timestamp
|
||||
}
|
||||
|
||||
type DDTask struct {
|
||||
DDRequest
|
||||
}
|
||||
|
||||
// TsMsg interfaces
|
||||
func (req *DDTask) Ts() Timestamp
|
||||
func (req *DDTask) SetTs(ts Timestamp)
|
||||
|
||||
// BaseRequest interfaces
|
||||
func (req *DDTask) Type() ReqType
|
||||
func (req *DDTask) PreExecute() Status
|
||||
func (req *DDTask) Execute() Status
|
||||
func (req *DDTask) PostExecute() Status
|
||||
func (req *DDTask) WaitToFinish() Status
|
||||
```
|
||||
|
||||
|
||||
#### 3.2 Manipulation Requsts
|
||||
|
||||
#### 3.3 Manipulation Requsts
|
||||
|
||||
###### 3.3.1 Insert
|
||||
###### 3.2.1 Insert
|
||||
|
||||
* Insert
|
||||
|
||||
###### 3.3.2 Delete
|
||||
###### 3.2.2 Delete
|
||||
|
||||
* DeleteByID
|
||||
|
||||
###### 3.3.3 Manipulation Requst
|
||||
|
||||
```go
|
||||
type DMRequest struct {
|
||||
CollectionName string
|
||||
PartitionTag string
|
||||
SegmentId uint64
|
||||
ChannelId uint64
|
||||
|
||||
PrimaryKeys []uint64
|
||||
RowData []*RowDataBlob
|
||||
|
||||
reqType ReqType
|
||||
ts Timestamp
|
||||
}
|
||||
|
||||
type DMTask struct {
|
||||
DMRequest
|
||||
}
|
||||
|
||||
// TsMsg interfaces
|
||||
func (req *DMTask) Ts() Timestamp
|
||||
func (req *DMTask) SetTs(ts Timestamp)
|
||||
|
||||
// BaseRequest interfaces
|
||||
func (req *DMTask) Type() ReqType
|
||||
func (req *DMTask) PreExecute() Status
|
||||
func (req *DMTask) Execute() Status
|
||||
func (req *DMTask) PostExecute() Status
|
||||
func (req *DMTask) WaitToFinish() Status
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### 3.5 Query
|
||||
#### 3.3 Query
|
||||
|
||||
|
||||
|
||||
|
@ -287,11 +221,11 @@ func (tso *timestampOracle) loadTimestamp() Status
|
|||
|
||||
|
||||
|
||||
#### 4.3 Batch Allocation of Timestamps
|
||||
#### 4.2 Timestamp Allocator
|
||||
|
||||
###### 4.2.1 Batch Allocation of Timestamps
|
||||
|
||||
|
||||
#### 4.4 Expiration of Timestamps
|
||||
###### 4.2.2 Expiration of Timestamps
|
||||
|
||||
|
||||
|
||||
|
@ -351,56 +285,167 @@ func (gparams *GlobalParamsTable) Remove(key string) Status
|
|||
|
||||
|
||||
|
||||
#### 5.3 Message Stream
|
||||
|
||||
``` go
|
||||
type MsgType uint32
|
||||
const {
|
||||
USER_REQUEST MsgType = 1
|
||||
TIME_TICK = 2
|
||||
}
|
||||
|
||||
type TsMsg interface {
|
||||
SetTs(ts Timestamp)
|
||||
Ts() Timestamp
|
||||
Type() MsgType
|
||||
}
|
||||
|
||||
type TsMsgMarshaler interface {
|
||||
Marshal(input *TsMsg) ([]byte, Status)
|
||||
Unmarshal(input []byte) (*TsMsg, Status)
|
||||
}
|
||||
|
||||
type MsgPack struct {
|
||||
BeginTs Timestamp
|
||||
EndTs Timestamp
|
||||
Msgs []*TsMsg
|
||||
}
|
||||
|
||||
type MsgStream interface {
|
||||
SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler)
|
||||
Produce(*MsgPack) Status
|
||||
Consume() *MsgPack // message can be consumed exactly once
|
||||
}
|
||||
|
||||
type PulsarMsgStream struct {
|
||||
client *pulsar.Client
|
||||
produceChannels []string
|
||||
consumeChannels []string
|
||||
|
||||
msgMarshaler *TsMsgMarshaler
|
||||
msgUnmarshaler *TsMsgMarshaler
|
||||
}
|
||||
|
||||
func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler)
|
||||
func (ms *PulsarMsgStream) Produce(*MsgPack) Status
|
||||
func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick
|
||||
|
||||
type PulsarTtMsgStream struct {
|
||||
client *pulsar.Client
|
||||
produceChannels []string
|
||||
consumeChannels []string
|
||||
|
||||
msgMarshaler *TsMsgMarshaler
|
||||
msgUnmarshaler *TsMsgMarshaler
|
||||
inputBuf []*TsMsg
|
||||
unsolvedBuf []*TsMsg
|
||||
msgPacks []*MsgPack
|
||||
}
|
||||
|
||||
func (ms *PulsarTtMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler)
|
||||
func (ms *PulsarTtMsgStream) Produce(*MsgPack) Status
|
||||
func (ms *PulsarTtMsgStream) Consume() *MsgPack //return messages in one time tick
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
## 6. Proxy
|
||||
|
||||
#### 6.1 Overview
|
||||
|
||||
#### 6.2 Request Scheduler
|
||||
#### 3.1 Task
|
||||
|
||||
``` go
|
||||
type requestScheduler struct {
|
||||
definitions requestQueue
|
||||
manipulations requestQueue
|
||||
queries requestQueue
|
||||
}
|
||||
|
||||
func (rs *requestScheduler) ExecuteRequest(req *Request) Status
|
||||
|
||||
func (rs *requestScheduler) staticValidityCheck(req *Request) Status
|
||||
func (rs *requestScheduler) setTimestamp(req *Request)
|
||||
func (rs *requestScheduler) setPrimaryKey(req *Request)
|
||||
func (rs *requestScheduler) setSegmentId(req *Request)
|
||||
func (rs *requestScheduler) setProxyId(req *Request)
|
||||
|
||||
// @param selection
|
||||
// bit_0 = 1: select definition queue
|
||||
// bit_1 = 1: select manipulation queue
|
||||
// bit_2 = 1: select query queue
|
||||
// example: if mode = 3, then both definition and manipulation queues are selected
|
||||
func (rs *requestScheduler) AreRequestsDelivered(ts Timestamp, selection uint32) bool
|
||||
|
||||
// ActiveComponent interfaces
|
||||
func (rs *requestScheduler) Id() String
|
||||
func (rs *requestScheduler) Status() Status
|
||||
func (rs *requestScheduler) Clean() Status
|
||||
func (rs *requestScheduler) Restart() Status
|
||||
func (rs *requestScheduler) heartbeat()
|
||||
|
||||
// protobuf
|
||||
message ReqSchedulerHeartbeat {
|
||||
string id
|
||||
uint64 definition_queue_length
|
||||
uint64 manipulation_queue_length
|
||||
uint64 query_queue_length
|
||||
uint64 num_delivered_definitions
|
||||
uint64 num_delivered_manipulations
|
||||
uint64 num_delivered_queries
|
||||
type task interface {
|
||||
PreExecute() Status
|
||||
Execute() Status
|
||||
PostExecute() Status
|
||||
WaitToFinish() Status
|
||||
Notify() Status
|
||||
}
|
||||
```
|
||||
|
||||
* Base Task
|
||||
|
||||
```go
|
||||
type baseTask struct {
|
||||
Type ReqType
|
||||
ReqId int64
|
||||
Ts Timestamp
|
||||
ProxyId int64
|
||||
}
|
||||
|
||||
func (task *baseTask) PreExecute() Status
|
||||
func (task *baseTask) Execute() Status
|
||||
func (task *baseTask) PostExecute() Status
|
||||
func (task *baseTask) WaitToFinish() Status
|
||||
func (task *baseTask) Notify() Status
|
||||
```
|
||||
|
||||
* Insert Task
|
||||
|
||||
Take insertTask as an example:
|
||||
|
||||
```go
|
||||
type insertTask struct {
|
||||
baseTask
|
||||
SegIdAssigner *segIdAssigner
|
||||
RowIdAllocator *IdAllocator
|
||||
rowBatch *RowBatch
|
||||
}
|
||||
|
||||
func (task *InsertTask) Execute() Status
|
||||
func (task *InsertTask) WaitToFinish() Status
|
||||
func (task *InsertTask) Notify() Status
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### 6.2 Task Scheduler
|
||||
|
||||
``` go
|
||||
type taskScheduler struct {
|
||||
// definition tasks
|
||||
ddTasks *task chan
|
||||
// manipulation tasks
|
||||
dmTasks *task chan
|
||||
// query tasks
|
||||
dqTasks *task chan
|
||||
|
||||
tsAllocator *TimestampAllocator
|
||||
ReqIdAllocator *IdAllocator
|
||||
}
|
||||
|
||||
func (sched *taskScheduler) EnqueueDDTask(task *task) Status
|
||||
func (sched *taskScheduler) EnqueueDMTask(task *task) Status
|
||||
func (sched *taskScheduler) EnqueueDQTask(task *task) Status
|
||||
|
||||
func (sched *taskScheduler) TaskDoneTest(ts Timestamp) bool
|
||||
|
||||
// ActiveComponent interfaces
|
||||
func (sched *taskScheduler) Id() String
|
||||
func (sched *taskScheduler) Status() Status
|
||||
func (sched *taskScheduler) Clean() Status
|
||||
func (sched *taskScheduler) Restart() Status
|
||||
func (sched *taskScheduler) heartbeat()
|
||||
|
||||
// protobuf
|
||||
message taskSchedulerHeartbeat {
|
||||
string id
|
||||
uint64 dd_queue_length
|
||||
uint64 dm_queue_length
|
||||
uint64 dq_queue_length
|
||||
uint64 num_dd_done
|
||||
uint64 num_dm_done
|
||||
uint64 num_dq_done
|
||||
}
|
||||
```
|
||||
|
||||
* EnqueueDMTask
|
||||
|
||||
If a insertTask is enqueued, *EnqueueDDTask(task \*task)* will set *Ts*, *ReqId*, *ProxyId*, *SegIdAssigner*, *RowIdAllocator*, then push it into queue *dmTasks*. The *SegIdAssigner* and *RowIdAllocator* will later be used in the task's execution phase.
|
||||
|
||||
#### 6.3 Time Tick
|
||||
|
||||
|
@ -459,9 +504,11 @@ type MsgStream interface {
|
|||
Consume() *MsgPack // message can be consumed exactly once
|
||||
}
|
||||
|
||||
type HashFunc func(*MsgPack) map[int32]*MsgPack
|
||||
|
||||
type PulsarMsgStream struct {
|
||||
client *pulsar.Client
|
||||
msgHashFunc (*MsgPack) map[int32]*MsgPack // return a map from produceChannel idx to *MsgPack
|
||||
msgHashFunc HashFunc // return a map from produceChannel idx to *MsgPack
|
||||
producers []*pulsar.Producer
|
||||
consumers []*pulsar.Consumer
|
||||
msgMarshaler *TsMsgMarshaler
|
||||
|
@ -471,15 +518,15 @@ type PulsarMsgStream struct {
|
|||
func (ms *PulsarMsgStream) SetProducerChannels(channels []string)
|
||||
func (ms *PulsarMsgStream) SetConsumerChannels(channels []string)
|
||||
func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler)
|
||||
func (ms *PulsarMsgStream) SetMsgHashFunc(XXX)
|
||||
func (ms *PulsarMsgStream) Produce(*MsgPack) Status
|
||||
func (ms *PulsarMsgStream) SetMsgHashFunc(hashFunc *HashFunc)
|
||||
func (ms *PulsarMsgStream) Produce(msgs *MsgPack) Status
|
||||
func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick
|
||||
|
||||
type PulsarTtMsgStream struct {
|
||||
client *pulsar.Client
|
||||
produceChannels []string
|
||||
consumeChannels []string
|
||||
|
||||
msgHashFunc (*MsgPack) map[int32]*MsgPack // return a map from produceChannel idx to *MsgPack
|
||||
producers []*pulsar.Producer
|
||||
consumers []*pulsar.Consumer
|
||||
msgMarshaler *TsMsgMarshaler
|
||||
msgUnmarshaler *TsMsgMarshaler
|
||||
inputBuf []*TsMsg
|
||||
|
@ -487,9 +534,12 @@ type PulsarTtMsgStream struct {
|
|||
msgPacks []*MsgPack
|
||||
}
|
||||
|
||||
func (ms *PulsarTtMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler)
|
||||
func (ms *PulsarTtMsgStream) Produce(*MsgPack) Status
|
||||
func (ms *PulsarTtMsgStream) Consume() *MsgPack //return messages in one time tick
|
||||
func (ms *PulsarMsgStream) SetProducerChannels(channels []string)
|
||||
func (ms *PulsarMsgStream) SetConsumerChannels(channels []string)
|
||||
func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler)
|
||||
func (ms *PulsarMsgStream) SetMsgHashFunc(hashFunc *HashFunc)
|
||||
func (ms *PulsarMsgStream) Produce(msgs *MsgPack) Status
|
||||
func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick
|
||||
```
|
||||
|
||||
|
||||
|
@ -587,9 +637,113 @@ func (unmarshaler *QueryReqUnmarshaler) Unmarshal(input *pulsar.Message) (*TsMsg
|
|||
|
||||
|
||||
|
||||
#### 5.X Interfaces
|
||||
#### 5.1 Interfaces (RPC)
|
||||
|
||||
| RPC | description |
|
||||
| :----------------- | ------------------------------------------------------------ |
|
||||
| CreateCollection | create a collection base on schema statement |
|
||||
| DropCollection | drop a collection |
|
||||
| HasCollection | whether or not a collection exists |
|
||||
| DescribeCollection | show a collection's schema and its descriptive statistics |
|
||||
| ShowCollections | list all collections |
|
||||
| CreatePartition | create a partition |
|
||||
| DropPartition | drop a partition |
|
||||
| HasPartition | whether or not a partition exists |
|
||||
| DescribePartition | show a partition's name and its descriptive statistics |
|
||||
| ShowPartitions | list a collection's all partitions |
|
||||
| AllocTimestamp | allocate a batch of consecutive timestamps |
|
||||
| AllocId | allocate a batch of consecutive IDs |
|
||||
| AssignSegmentId | assign segment id to insert rows (master determines which segment these rows belong to) |
|
||||
| | |
|
||||
| | |
|
||||
|
||||
|
||||
|
||||
#### 5.2 Master Instance
|
||||
|
||||
```go
|
||||
type Master interface {
|
||||
tso timestampOracle // timestamp oracle
|
||||
ddScheduler ddRequestScheduler // data definition request scheduler
|
||||
metaTable metaTable // in-memory system meta
|
||||
collManager collectionManager // collection & partition manager
|
||||
segManager segmentManager // segment manager
|
||||
}
|
||||
```
|
||||
|
||||
* Timestamp allocation
|
||||
|
||||
Master serves as a centrol clock of the whole system. Other components (i.e. Proxy) allocates timestamps from master via RPC *AllocTimestamp*. All the timestamp allocation requests will be handled by the timestampOracle singleton. See section 4.2 for the details about timestampOracle.
|
||||
|
||||
* Request Scheduling
|
||||
|
||||
* System Meta
|
||||
|
||||
* Collection Management
|
||||
|
||||
* Segment Management
|
||||
|
||||
|
||||
|
||||
#### 5.3 Data definition Request Scheduler
|
||||
|
||||
###### 5.2.1 Task
|
||||
|
||||
Master receives data definition requests via grpc. Each request (described by a proto) will be wrapped as a task for further scheduling. The task interface is
|
||||
|
||||
```go
|
||||
type task interface {
|
||||
Type() ReqType
|
||||
Ts() Timestamp
|
||||
Execute() Status
|
||||
WaitToFinish() Status
|
||||
Notify() Status
|
||||
}
|
||||
```
|
||||
|
||||
A task example is as follows. In this example, we wrap a CreateCollectionRequest (a proto) as a createCollectionTask. The wrapper need to contain task interfaces.
|
||||
|
||||
``` go
|
||||
type createCollectionTask struct {
|
||||
req *CreateCollectionRequest
|
||||
cv int chan
|
||||
}
|
||||
|
||||
// Task interfaces
|
||||
func (task *createCollectionTask) Type() ReqType
|
||||
func (task *createCollectionTask) Ts() Timestamp
|
||||
func (task *createCollectionTask) Execute() Status
|
||||
func (task *createCollectionTask) Notify() Status
|
||||
func (task *createCollectionTask) WaitToFinish() Status
|
||||
```
|
||||
|
||||
|
||||
|
||||
###### 5.2.2 Scheduler
|
||||
|
||||
```go
|
||||
type ddRequestScheduler struct {
|
||||
reqQueue *task chan
|
||||
}
|
||||
|
||||
func (rs *ddRequestScheduler) Enqueue(task *task) Status
|
||||
func (rs *ddRequestScheduler) schedule() *task // implement scheduling policy
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### 5.4 Meta Table
|
||||
|
||||
```go
|
||||
type metaTable struct {
|
||||
client *etcd.Client // client of a reliable kv service, i.e. etcd client
|
||||
rootPath string // this metaTable's working root path on the reliable kv service
|
||||
tenantMeta map[int64]TenantMeta // tenant id to tenant meta
|
||||
proxyMeta map[int64]ProxyMeta // proxy id to proxy meta
|
||||
collMeta map[int64]CollectionMeta // collection id to collection meta
|
||||
segMeta map[int64]SegmentMeta // segment id to segment meta
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ message TenantMeta {
|
|||
string query_channel_id = 4;
|
||||
}
|
||||
|
||||
|
||||
message ProxyMeta {
|
||||
uint64 id = 1;
|
||||
common.Address address = 2;
|
||||
|
@ -37,4 +38,4 @@ message SegmentMeta {
|
|||
uint64 open_time=6;
|
||||
uint64 close_time=7;
|
||||
int64 num_rows=8;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,4 +44,4 @@ message CollectionSchema {
|
|||
string description = 2;
|
||||
bool auto_id = 3;
|
||||
repeated FieldSchema fields = 4;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ import (
|
|||
)
|
||||
|
||||
type manipulationReq struct {
|
||||
commonpb.Status
|
||||
stats []commonpb.Status
|
||||
msgs []*pb.ManipulationReqMsg
|
||||
wg sync.WaitGroup
|
||||
proxy *proxyServer
|
||||
|
@ -26,14 +26,13 @@ func (req *manipulationReq) Ts() (Timestamp, error) {
|
|||
return Timestamp(req.msgs[0].Timestamp), nil
|
||||
}
|
||||
func (req *manipulationReq) SetTs(ts Timestamp) {
|
||||
for _, mreq := range req.msgs {
|
||||
mreq.Timestamp = uint64(ts)
|
||||
for _, msg := range req.msgs {
|
||||
msg.Timestamp = uint64(ts)
|
||||
}
|
||||
}
|
||||
|
||||
// BaseRequest interfaces
|
||||
func (req *manipulationReq) Type() pb.ReqType {
|
||||
// TODO: return a invalid ReqType?
|
||||
if req.msgs == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -59,11 +58,17 @@ func (req *manipulationReq) Execute() commonpb.Status {
|
|||
|
||||
func (req *manipulationReq) PostExecute() commonpb.Status { // send into pulsar
|
||||
req.wg.Add(1)
|
||||
return req.Status
|
||||
return commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}
|
||||
}
|
||||
|
||||
func (req *manipulationReq) WaitToFinish() commonpb.Status { // wait until send into pulsar
|
||||
req.wg.Wait()
|
||||
|
||||
for _, stat := range req.stats{
|
||||
if stat.ErrorCode != commonpb.ErrorCode_SUCCESS{
|
||||
return stat
|
||||
}
|
||||
}
|
||||
// update timestamp if necessary
|
||||
ts, _ := req.Ts()
|
||||
req.proxy.reqSch.mTimestampMux.Lock()
|
||||
|
@ -73,7 +78,7 @@ func (req *manipulationReq) WaitToFinish() commonpb.Status { // wait until send
|
|||
} else {
|
||||
log.Printf("there is some wrong with m_timestamp, it goes back, current = %d, previous = %d", ts, req.proxy.reqSch.mTimestamp)
|
||||
}
|
||||
return req.Status
|
||||
return req.stats[0]
|
||||
}
|
||||
|
||||
func (s *proxyServer) restartManipulationRoutine(bufSize int) error {
|
||||
|
@ -109,22 +114,23 @@ func (s *proxyServer) restartManipulationRoutine(bufSize int) error {
|
|||
ts, st := s.getTimestamp(1)
|
||||
if st.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||
log.Printf("get time stamp failed, error code = %d, msg = %s", st.ErrorCode, st.Reason)
|
||||
ip.Status = st
|
||||
ip.stats[0] = st
|
||||
ip.wg.Done()
|
||||
break
|
||||
}
|
||||
ip.SetTs(ts[0])
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
for _, mq := range ip.msgs {
|
||||
for i, mq := range ip.msgs {
|
||||
mq := mq
|
||||
i := i
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
wg.Add(1)
|
||||
defer wg.Done()
|
||||
mb, err := proto.Marshal(mq)
|
||||
if err != nil {
|
||||
log.Printf("Marshal ManipulationReqMsg failed, error = %v", err)
|
||||
ip.Status = commonpb.Status{
|
||||
ip.stats[i] = commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("Marshal ManipulationReqMsg failed, error=%v", err),
|
||||
}
|
||||
|
@ -135,7 +141,7 @@ func (s *proxyServer) restartManipulationRoutine(bufSize int) error {
|
|||
case pb.ReqType_kInsert:
|
||||
if _, err := readers[mq.ChannelId].Send(s.ctx, &pulsar.ProducerMessage{Payload: mb}); err != nil {
|
||||
log.Printf("post into puslar failed, error = %v", err)
|
||||
ip.Status = commonpb.Status{
|
||||
ip.stats[i] = commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("Post into puslar failed, error=%v", err.Error()),
|
||||
}
|
||||
|
@ -144,6 +150,10 @@ func (s *proxyServer) restartManipulationRoutine(bufSize int) error {
|
|||
case pb.ReqType_kDeleteEntityByID:
|
||||
if _, err = deleter.Send(s.ctx, &pulsar.ProducerMessage{Payload: mb}); err != nil {
|
||||
log.Printf("post into pulsar filed, error = %v", err)
|
||||
ip.stats[i] = commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("Post into puslar failed, error=%v", err.Error()),
|
||||
}
|
||||
return
|
||||
}
|
||||
default:
|
||||
|
|
|
@ -156,6 +156,7 @@ func (s *proxyServer) DeleteByID(ctx context.Context, req *pb.DeleteByIDParam) (
|
|||
}
|
||||
if len(mReqMsg.PrimaryKeys) > 1 {
|
||||
mReq := &manipulationReq{
|
||||
stats: make([]commonpb.Status, 1),
|
||||
msgs: append([]*pb.ManipulationReqMsg{}, &mReqMsg),
|
||||
proxy: s,
|
||||
}
|
||||
|
@ -222,10 +223,10 @@ func (s *proxyServer) Insert(ctx context.Context, req *servicepb.RowBatch) (*ser
|
|||
|
||||
// TODO: alloc manipulation request id
|
||||
mReq := manipulationReq{
|
||||
Status: commonpb.Status{},
|
||||
msgs: make([]*pb.ManipulationReqMsg, len(msgMap)),
|
||||
wg: sync.WaitGroup{},
|
||||
proxy: s,
|
||||
stats: make([]commonpb.Status, len(msgMap)),
|
||||
msgs: make([]*pb.ManipulationReqMsg, len(msgMap)),
|
||||
wg: sync.WaitGroup{},
|
||||
proxy: s,
|
||||
}
|
||||
for _, v := range msgMap {
|
||||
mReq.msgs = append(mReq.msgs, v)
|
||||
|
|
Loading…
Reference in New Issue