diff --git a/internal/proxy/grpc_service.go b/internal/proxy/grpc_service.go
index ace97098be..ba0f92dc39 100644
--- a/internal/proxy/grpc_service.go
+++ b/internal/proxy/grpc_service.go
@@ -39,7 +39,6 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb.
 	var cancel func()
 	it.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
-	// TODO: req_id, segment_id, channel_id, proxy_id, timestamps, row_ids
 
 	defer cancel()
 
@@ -81,7 +80,6 @@ func (p *Proxy) CreateCollection(ctx context.Context, req *schemapb.CollectionSc
 		CreateCollectionRequest: internalpb.CreateCollectionRequest{
 			MsgType: internalpb.MsgType_kCreateCollection,
 			Schema:  &commonpb.Blob{},
-			// TODO: req_id, timestamp, proxy_id
 		},
 		masterClient: p.masterClient,
 	}
@@ -124,7 +122,6 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu
 		SearchRequest: internalpb.SearchRequest{
 			MsgType: internalpb.MsgType_kSearch,
 			Query:   &commonpb.Blob{},
-			// TODO: req_id, proxy_id, timestamp, result_channel_id
 		},
 		queryMsgStream: p.queryMsgStream,
 		resultBuf:      make(chan []*internalpb.SearchResult),
@@ -174,8 +171,7 @@ func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionNam
 	dct := &DropCollectionTask{
 		Condition: NewTaskCondition(ctx),
 		DropCollectionRequest: internalpb.DropCollectionRequest{
-			MsgType: internalpb.MsgType_kDropCollection,
-			// TODO: req_id, timestamp, proxy_id
+			MsgType:        internalpb.MsgType_kDropCollection,
 			CollectionName: req,
 		},
 		masterClient: p.masterClient,
@@ -215,8 +211,7 @@ func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName
 	hct := &HasCollectionTask{
 		Condition: NewTaskCondition(ctx),
 		HasCollectionRequest: internalpb.HasCollectionRequest{
-			MsgType: internalpb.MsgType_kHasCollection,
-			// TODO: req_id, timestamp, proxy_id
+			MsgType:        internalpb.MsgType_kHasCollection,
 			CollectionName: req,
 		},
 		masterClient: p.masterClient,
@@ -260,8 +255,7 @@ func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.Collectio
 	dct := &DescribeCollectionTask{
 		Condition: NewTaskCondition(ctx),
 		DescribeCollectionRequest: internalpb.DescribeCollectionRequest{
-			MsgType: internalpb.MsgType_kDescribeCollection,
-			// TODO: req_id, timestamp, proxy_id
+			MsgType:        internalpb.MsgType_kDescribeCollection,
 			CollectionName: req,
 		},
 		masterClient: p.masterClient,
@@ -306,7 +300,6 @@ func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*serv
 		Condition: NewTaskCondition(ctx),
 		ShowCollectionRequest: internalpb.ShowCollectionRequest{
 			MsgType: internalpb.MsgType_kDescribeCollection,
-			// TODO: req_id, timestamp, proxy_id
 		},
 		masterClient: p.masterClient,
 	}
diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go
index f8193670c4..3b6b3ba223 100644
--- a/internal/proxy/proxy.go
+++ b/internal/proxy/proxy.go
@@ -58,10 +58,6 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
 	// TODO: use config instead
 	pulsarAddress := Params.PulsarAddress()
 
-	p.manipulationMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamInsertBufSize())
-	p.manipulationMsgStream.SetPulsarClient(pulsarAddress)
-	p.manipulationMsgStream.CreatePulsarProducers(Params.InsertChannelNames())
-
 	p.queryMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamSearchBufSize())
 	p.queryMsgStream.SetPulsarClient(pulsarAddress)
 	p.queryMsgStream.CreatePulsarProducers(Params.SearchChannelNames())
@@ -86,6 +82,14 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
 	}
 	p.segAssigner = segAssigner
 
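+	// create the insert (manipulation) stream only after the segment assigner exists,
+	// since the repack function registered below uses it to assign segment IDs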
+	p.manipulationMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamInsertBufSize())
+	p.manipulationMsgStream.SetPulsarClient(pulsarAddress)
+	p.manipulationMsgStream.CreatePulsarProducers(Params.InsertChannelNames())
+	repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
+		return insertRepackFunc(tsMsgs, hashKeys, p.segAssigner, false)
+	}
+	p.manipulationMsgStream.SetRepackFunc(repackFuncImpl)
+
 	p.sched, err = NewTaskScheduler(p.proxyLoopCtx, p.idAllocator, p.tsoAllocator)
 	if err != nil {
 		return nil, err
diff --git a/internal/proxy/repack_func.go b/internal/proxy/repack_func.go
new file mode 100644
index 0000000000..ec12f475e2
--- /dev/null
+++ b/internal/proxy/repack_func.go
@@ -0,0 +1,97 @@
+package proxy
+
+import (
+	"github.com/zilliztech/milvus-distributed/internal/allocator"
+	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
+)
+
+func insertRepackFunc(tsMsgs []msgstream.TsMsg,
+	hashKeys [][]int32,
+	segIDAssigner *allocator.SegIDAssigner,
+	together bool) (map[int32]*msgstream.MsgPack, error) {
+
+	result := make(map[int32]*msgstream.MsgPack)
+
+	for i, request := range tsMsgs {
+		if request.Type() != internalpb.MsgType_kInsert {
+			return nil, errors.New(string("msg's must be Insert"))
+		}
+		insertRequest, ok := request.(*msgstream.InsertMsg)
+		if !ok {
+			return nil, errors.New(string("msg's must be Insert"))
+		}
+		keys := hashKeys[i]
+
+		timestampLen := len(insertRequest.Timestamps)
+		rowIDLen := len(insertRequest.RowIDs)
+		rowDataLen := len(insertRequest.RowData)
+		keysLen := len(keys)
+
+		if keysLen != timestampLen || keysLen != rowIDLen || keysLen != rowDataLen {
+			return nil, errors.New(string("the length of hashValue, timestamps, rowIDs, RowData are not equal"))
+		}
+
+		reqID := insertRequest.ReqID
+		collectionName := insertRequest.CollectionName
+		partitionTag := insertRequest.PartitionTag
+		channelID := insertRequest.ChannelID
+		proxyID := insertRequest.ProxyID
+		for index, key := range keys {
+			ts := insertRequest.Timestamps[index]
+			rowID := insertRequest.RowIDs[index]
+			row := insertRequest.RowData[index]
+			_, ok := result[key]
+			if !ok {
+				msgPack := msgstream.MsgPack{}
+				result[key] = &msgPack
+			}
+			sliceRequest := internalpb.InsertRequest{
+				MsgType:        internalpb.MsgType_kInsert,
+				ReqID:          reqID,
+				CollectionName: collectionName,
+				PartitionTag:   partitionTag,
+				SegmentID:      0, // will be assigned later if together
+				ChannelID:      channelID,
+				ProxyID:        proxyID,
+				Timestamps:     []uint64{ts},
+				RowIDs:         []int64{rowID},
+				RowData:        []*commonpb.Blob{row},
+			}
+			insertMsg := &msgstream.InsertMsg{
+				InsertRequest: sliceRequest,
+			}
+			if together { // all rows with same hash value are accumulated to only one message
+				if len(result[key].Msgs) <= 0 {
+					result[key].Msgs = append(result[key].Msgs, insertMsg)
+				} else {
+					accMsgs, _ := result[key].Msgs[0].(*msgstream.InsertMsg)
+					accMsgs.Timestamps = append(accMsgs.Timestamps, ts)
+					accMsgs.RowIDs = append(accMsgs.RowIDs, rowID)
+					accMsgs.RowData = append(accMsgs.RowData, row)
+				}
+			} else { // every row is a message
+				segID, _ := segIDAssigner.GetSegmentID(collectionName, partitionTag, int32(channelID), 1)
+				insertMsg.SegmentID = segID
+				result[key].Msgs = append(result[key].Msgs, insertMsg)
+			}
+		}
+	}
+
+	if together {
+		for key := range result {
+			insertMsg, _ := result[key].Msgs[0].(*msgstream.InsertMsg)
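+			// re-assign the segment ID once the total row count for this hash key is known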
+			rowNums := len(insertMsg.RowIDs)
+			collectionName := insertMsg.CollectionName
+			partitionTag := insertMsg.PartitionTag
+			channelID := insertMsg.ChannelID
+			segID, _ := segIDAssigner.GetSegmentID(collectionName, partitionTag, int32(channelID), uint32(rowNums))
+			insertMsg.SegmentID = segID
+			result[key].Msgs[0] = insertMsg
+		}
+	}
+
+	return result, nil
+}
diff --git a/internal/proxy/task.go b/internal/proxy/task.go
index 12dc1551ac..538fd05326 100644
--- a/internal/proxy/task.go
+++ b/internal/proxy/task.go
@@ -14,7 +14,8 @@ import (
 )
 
 type task interface {
-	ID() UniqueID // return ReqID
+	ID() UniqueID       // return ReqID
+	SetID(uid UniqueID) // set ReqID
 	Type() internalpb.MsgType
 	BeginTs() Timestamp
 	EndTs() Timestamp
@@ -38,6 +39,10 @@ type InsertTask struct {
 	rowIDAllocator *allocator.IDAllocator
 }
 
+func (it *InsertTask) SetID(uid UniqueID) {
+	it.ReqID = uid
+}
+
 func (it *InsertTask) SetTs(ts Timestamp) {
 	it.ts = ts
 }
@@ -121,6 +126,10 @@ func (cct *CreateCollectionTask) ID() UniqueID {
 	return cct.ReqID
 }
 
+func (cct *CreateCollectionTask) SetID(uid UniqueID) {
+	cct.ReqID = uid
+}
+
 func (cct *CreateCollectionTask) Type() internalpb.MsgType {
 	return cct.MsgType
 }
@@ -171,6 +180,10 @@ func (dct *DropCollectionTask) ID() UniqueID {
 	return dct.ReqID
 }
 
+func (dct *DropCollectionTask) SetID(uid UniqueID) {
+	dct.ReqID = uid
+}
+
 func (dct *DropCollectionTask) Type() internalpb.MsgType {
 	return dct.MsgType
 }
@@ -222,6 +235,10 @@ func (qt *QueryTask) ID() UniqueID {
 	return qt.ReqID
 }
 
+func (qt *QueryTask) SetID(uid UniqueID) {
+	qt.ReqID = uid
+}
+
 func (qt *QueryTask) Type() internalpb.MsgType {
 	return qt.MsgType
 }
@@ -329,6 +346,10 @@ func (hct *HasCollectionTask) ID() UniqueID {
 	return hct.ReqID
 }
 
+func (hct *HasCollectionTask) SetID(uid UniqueID) {
+	hct.ReqID = uid
+}
+
 func (hct *HasCollectionTask) Type() internalpb.MsgType {
 	return hct.MsgType
 }
@@ -382,6 +403,10 @@ func (dct *DescribeCollectionTask) ID() UniqueID {
 	return dct.ReqID
 }
 
+func (dct *DescribeCollectionTask) SetID(uid UniqueID) {
+	dct.ReqID = uid
+}
+
 func (dct *DescribeCollectionTask) Type() internalpb.MsgType {
 	return dct.MsgType
 }
@@ -430,6 +455,10 @@ func (sct *ShowCollectionsTask) ID() UniqueID {
 	return sct.ReqID
 }
 
+func (sct *ShowCollectionsTask) SetID(uid UniqueID) {
+	sct.ReqID = uid
+}
+
 func (sct *ShowCollectionsTask) Type() internalpb.MsgType {
 	return sct.MsgType
 }
@@ -482,6 +511,10 @@ func (cpt *CreatePartitionTask) ID() UniqueID {
 	return cpt.ReqID
 }
 
+func (cpt *CreatePartitionTask) SetID(uid UniqueID) {
+	cpt.ReqID = uid
+}
+
 func (cpt *CreatePartitionTask) Type() internalpb.MsgType {
 	return cpt.MsgType
 }
@@ -523,6 +556,10 @@ func (dpt *DropPartitionTask) ID() UniqueID {
 	return dpt.ReqID
 }
 
+func (dpt *DropPartitionTask) SetID(uid UniqueID) {
+	dpt.ReqID = uid
+}
+
 func (dpt *DropPartitionTask) Type() internalpb.MsgType {
 	return dpt.MsgType
 }
@@ -564,6 +601,10 @@ func (hpt *HasPartitionTask) ID() UniqueID {
 	return hpt.ReqID
 }
 
+func (hpt *HasPartitionTask) SetID(uid UniqueID) {
+	hpt.ReqID = uid
+}
+
 func (hpt *HasPartitionTask) Type() internalpb.MsgType {
 	return hpt.MsgType
 }
@@ -605,6 +646,10 @@ func (dpt *DescribePartitionTask) ID() UniqueID {
 	return dpt.ReqID
 }
 
+func (dpt *DescribePartitionTask) SetID(uid UniqueID) {
+	dpt.ReqID = uid
+}
+
 func (dpt *DescribePartitionTask) Type() internalpb.MsgType {
 	return dpt.MsgType
 }
@@ -646,6 +691,10 @@ func (spt *ShowPartitionsTask) ID() UniqueID {
 	return spt.ReqID
 }
 
+func (spt *ShowPartitionsTask) SetID(uid UniqueID) {
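+	// ReqID is filled in by the task scheduler when the task is enqueued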
+	spt.ReqID = uid
+}
+
 func (spt *ShowPartitionsTask) Type() internalpb.MsgType {
 	return spt.MsgType
 }
diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go
index e001584165..4eccf249e1 100644
--- a/internal/proxy/task_scheduler.go
+++ b/internal/proxy/task_scheduler.go
@@ -159,10 +159,14 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool {
 }
 
 func (queue *BaseTaskQueue) Enqueue(t task) error {
-	// TODO: set Ts, ReqId, ProxyId
 	ts, _ := queue.sched.tsoAllocator.AllocOne()
-	log.Printf("allocate timestamp: %v", ts)
+	log.Printf("[Proxy] allocate timestamp: %v", ts)
 	t.SetTs(ts)
+
+	reqID, _ := queue.sched.idAllocator.AllocOne()
+	log.Printf("[Proxy] allocate reqID: %v", reqID)
+	t.SetID(reqID)
+
 	return queue.addUnissuedTask(t)
 }
 
@@ -180,15 +184,18 @@ type DqTaskQueue struct {
 }
 
 func (queue *DdTaskQueue) Enqueue(t task) error {
-	queue.lock.Lock()
 	defer queue.lock.Unlock()
-	// TODO: set Ts, ReqId, ProxyId
 	ts, _ := queue.sched.tsoAllocator.AllocOne()
+	log.Printf("[Proxy] allocate timestamp: %v", ts)
 	t.SetTs(ts)
-	return queue.addUnissuedTask(t)
+	reqID, _ := queue.sched.idAllocator.AllocOne()
+	log.Printf("[Proxy] allocate reqID: %v", reqID)
+	t.SetID(reqID)
+
+	return queue.addUnissuedTask(t)
 }
 
 func NewDdTaskQueue(sched *TaskScheduler) *DdTaskQueue {
diff --git a/scripts/README.md b/scripts/README.md
index 6f1dbcb0a1..c3475d6e15 100644
--- a/scripts/README.md
+++ b/scripts/README.md
@@ -34,7 +34,11 @@
 #### Generate the go files from proto file
 
 ```shell script
-    make check-proto-product
+    cd milvus-distributed
+    pwd_dir=`pwd`
+    export PATH=$PATH:$(go env GOPATH)/bin
+    export protoc=${pwd_dir}/cmake_build/thirdparty/protobuf/protobuf-build/protoc
+    ./scripts/proto_gen_go.sh
 ```
 
 #### Check code specifications
@@ -49,16 +53,6 @@
 make all
 ```
 
-#### Install docker-compose
-
-refer: https://docs.docker.com/compose/install/
-```shell script
-    sudo curl -L "https://github.com/docker/compose/releases/download/1.27.4/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
-    sudo chmod +x /usr/local/bin/docker-compose
-    sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
-    docker-compose --version
-```
-
 #### Start service
 
 ```shell script