mirror of https://github.com/milvus-io/milvus.git
181 lines
5.2 KiB
Go
181 lines
5.2 KiB
Go
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
package querynode
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/opentracing/opentracing-go"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
|
"github.com/milvus-io/milvus/internal/util/flowgraph"
|
|
"github.com/milvus-io/milvus/internal/util/trace"
|
|
)
|
|
|
|
type insertNode struct {
|
|
baseNode
|
|
replica ReplicaInterface
|
|
}
|
|
|
|
type InsertData struct {
|
|
insertIDs map[UniqueID][]int64
|
|
insertTimestamps map[UniqueID][]Timestamp
|
|
insertRecords map[UniqueID][]*commonpb.Blob
|
|
insertOffset map[UniqueID]int64
|
|
}
|
|
|
|
func (iNode *insertNode) Name() string {
|
|
return "iNode"
|
|
}
|
|
|
|
func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|
//log.Debug("Do insertNode operation")
|
|
|
|
if len(in) != 1 {
|
|
log.Error("Invalid operate message input in insertNode", zap.Int("input length", len(in)))
|
|
// TODO: add error handling
|
|
}
|
|
|
|
iMsg, ok := in[0].(*insertMsg)
|
|
if !ok {
|
|
log.Warn("type assertion failed for insertMsg")
|
|
// TODO: add error handling
|
|
}
|
|
|
|
insertData := InsertData{
|
|
insertIDs: make(map[UniqueID][]int64),
|
|
insertTimestamps: make(map[UniqueID][]Timestamp),
|
|
insertRecords: make(map[UniqueID][]*commonpb.Blob),
|
|
insertOffset: make(map[UniqueID]int64),
|
|
}
|
|
|
|
if iMsg == nil {
|
|
return []Msg{}
|
|
}
|
|
|
|
var spans []opentracing.Span
|
|
for _, msg := range iMsg.insertMessages {
|
|
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
|
|
spans = append(spans, sp)
|
|
msg.SetTraceCtx(ctx)
|
|
}
|
|
|
|
// 1. hash insertMessages to insertData
|
|
for _, task := range iMsg.insertMessages {
|
|
// check if partition exists, if not, create partition
|
|
if hasPartition := iNode.replica.hasPartition(task.PartitionID); !hasPartition {
|
|
err := iNode.replica.addPartition(task.CollectionID, task.PartitionID)
|
|
if err != nil {
|
|
log.Warn(err.Error())
|
|
continue
|
|
}
|
|
}
|
|
|
|
// check if segment exists, if not, create this segment
|
|
if !iNode.replica.hasSegment(task.SegmentID) {
|
|
err := iNode.replica.addSegment(task.SegmentID, task.PartitionID, task.CollectionID, task.ShardName, segmentTypeGrowing, true)
|
|
if err != nil {
|
|
log.Warn(err.Error())
|
|
continue
|
|
}
|
|
}
|
|
|
|
insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...)
|
|
insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...)
|
|
insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
|
|
}
|
|
|
|
// 2. do preInsert
|
|
for segmentID := range insertData.insertRecords {
|
|
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
|
|
if err != nil {
|
|
log.Warn(err.Error())
|
|
}
|
|
|
|
var numOfRecords = len(insertData.insertRecords[segmentID])
|
|
if targetSegment != nil {
|
|
offset, err := targetSegment.segmentPreInsert(numOfRecords)
|
|
if err != nil {
|
|
log.Warn(err.Error())
|
|
}
|
|
insertData.insertOffset[segmentID] = offset
|
|
log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segment id", segmentID))
|
|
}
|
|
}
|
|
|
|
// 3. do insert
|
|
wg := sync.WaitGroup{}
|
|
for segmentID := range insertData.insertRecords {
|
|
wg.Add(1)
|
|
go iNode.insert(&insertData, segmentID, &wg)
|
|
}
|
|
wg.Wait()
|
|
|
|
var res Msg = &serviceTimeMsg{
|
|
timeRange: iMsg.timeRange,
|
|
}
|
|
for _, sp := range spans {
|
|
sp.Finish()
|
|
}
|
|
|
|
return []Msg{res}
|
|
}
|
|
|
|
func (iNode *insertNode) insert(insertData *InsertData, segmentID UniqueID, wg *sync.WaitGroup) {
|
|
log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
|
|
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
|
|
if err != nil {
|
|
log.Warn("cannot find segment:", zap.Int64("segmentID", segmentID))
|
|
// TODO: add error handling
|
|
wg.Done()
|
|
return
|
|
}
|
|
|
|
if targetSegment.segmentType != segmentTypeGrowing {
|
|
wg.Done()
|
|
return
|
|
}
|
|
|
|
ids := insertData.insertIDs[segmentID]
|
|
timestamps := insertData.insertTimestamps[segmentID]
|
|
records := insertData.insertRecords[segmentID]
|
|
offsets := insertData.insertOffset[segmentID]
|
|
|
|
err = targetSegment.segmentInsert(offsets, &ids, ×tamps, &records)
|
|
if err != nil {
|
|
log.Debug("QueryNode: targetSegmentInsert failed", zap.Error(err))
|
|
// TODO: add error handling
|
|
wg.Done()
|
|
return
|
|
}
|
|
|
|
log.Debug("Do insert done", zap.Int("len", len(insertData.insertIDs[segmentID])),
|
|
zap.Int64("segmentID", segmentID))
|
|
wg.Done()
|
|
}
|
|
|
|
func newInsertNode(replica ReplicaInterface) *insertNode {
|
|
maxQueueLength := Params.FlowGraphMaxQueueLength
|
|
maxParallelism := Params.FlowGraphMaxParallelism
|
|
|
|
baseNode := baseNode{}
|
|
baseNode.SetMaxQueueLength(maxQueueLength)
|
|
baseNode.SetMaxParallelism(maxParallelism)
|
|
|
|
return &insertNode{
|
|
baseNode: baseNode,
|
|
replica: replica,
|
|
}
|
|
}
|