mirror of https://github.com/milvus-io/milvus.git
Add log for poxynode transform data and querynode segment insert (#5627)
* Add log for poxynode transform data and querynode segment insert Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com> * Decrease print times Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/5598/head
parent
5037646cbd
commit
7dac20c35c
|
@ -210,6 +210,17 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
collSchema := collection.schema
|
||||
// 1.2 Get Fields
|
||||
var pos int = 0 // Record position of blob
|
||||
log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("Fields", collSchema.Fields))
|
||||
var fieldIDs []int64
|
||||
var fieldTypes []schemapb.DataType
|
||||
for _, field := range collSchema.Fields {
|
||||
fieldIDs = append(fieldIDs, field.FieldID)
|
||||
fieldTypes = append(fieldTypes, field.DataType)
|
||||
}
|
||||
|
||||
log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("FieldIDs", fieldIDs))
|
||||
log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("fieldTypes", fieldTypes))
|
||||
|
||||
for _, field := range collSchema.Fields {
|
||||
switch field.DataType {
|
||||
case schemapb.DataType_FloatVector:
|
||||
|
|
|
@ -301,6 +301,7 @@ func (it *InsertTask) transferColumnBasedRequestToRowBasedData() error {
|
|||
l := len(dTypes)
|
||||
// TODO(dragondriver): big endian or little endian?
|
||||
endian := binary.LittleEndian
|
||||
printed := false
|
||||
for i := 0; i < rowNum; i++ {
|
||||
blob := &commonpb.Blob{
|
||||
Value: make([]byte, 0),
|
||||
|
@ -376,7 +377,10 @@ func (it *InsertTask) transferColumnBasedRequestToRowBasedData() error {
|
|||
log.Warn("unsupported data type")
|
||||
}
|
||||
}
|
||||
|
||||
if !printed {
|
||||
log.Debug("ProxyNode, transform", zap.Any("ID", it.ID()), zap.Any("BlobLen", len(blob.Value)), zap.Any("dTypes", dTypes))
|
||||
printed = true
|
||||
}
|
||||
it.RowData = append(it.RowData, blob)
|
||||
}
|
||||
|
||||
|
@ -618,6 +622,7 @@ func (it *InsertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre
|
|||
func (it *InsertTask) Execute(ctx context.Context) error {
|
||||
collectionName := it.BaseInsertTask.CollectionName
|
||||
collSchema, err := globalMetaCache.GetCollectionSchema(ctx, collectionName)
|
||||
log.Debug("ProxyNode Insert", zap.Any("collSchema", collSchema))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -136,7 +136,15 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
}
|
||||
|
||||
func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
|
||||
log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
|
||||
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
|
||||
log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID),
|
||||
zap.Any("targetSegment", targetSegment),
|
||||
zap.Error(err),
|
||||
zap.Any("SegmentType", targetSegment.segmentType),
|
||||
zap.Any("enableLoadBinLog", targetSegment.enableLoadBinLog),
|
||||
)
|
||||
|
||||
if targetSegment.segmentType != segmentTypeGrowing || targetSegment.enableLoadBinLog {
|
||||
wg.Done()
|
||||
return
|
||||
|
@ -160,7 +168,7 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
|
|||
|
||||
err = targetSegment.segmentInsert(offsets, &ids, ×tamps, &records)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
log.Debug("QueryNode: targetSegmentInsert failed", zap.Error(err))
|
||||
// TODO: add error handling
|
||||
wg.Done()
|
||||
return
|
||||
|
|
|
@ -295,7 +295,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas
|
|||
return status, nil
|
||||
}
|
||||
|
||||
// deprecated
|
||||
// ReleaseSegments deprecated
|
||||
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
|
@ -337,20 +337,27 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
|
|||
}
|
||||
// get info from historical
|
||||
for _, id := range in.SegmentIDs {
|
||||
log.Debug("QueryNode::Impl::GetSegmentInfo for historical", zap.Any("SegmentID", id))
|
||||
segment, err := node.historical.replica.getSegmentByID(id)
|
||||
if err != nil {
|
||||
log.Debug("QueryNode::Impl::GetSegmentInfo, for historical segmentID not exist", zap.Any("SegmentID", id))
|
||||
continue
|
||||
}
|
||||
info := getSegmentInfo(segment)
|
||||
log.Debug("QueryNode::Impl::GetSegmentInfo for historical", zap.Any("SegmentID", id), zap.Any("info", info))
|
||||
|
||||
infos = append(infos, info)
|
||||
}
|
||||
// get info from streaming
|
||||
for _, id := range in.SegmentIDs {
|
||||
log.Debug("QueryNode::Impl::GetSegmentInfo for streaming", zap.Any("SegmentID", id))
|
||||
segment, err := node.streaming.replica.getSegmentByID(id)
|
||||
if err != nil {
|
||||
log.Debug("QueryNode::Impl::GetSegmentInfo, for streaming segmentID not exist", zap.Any("SegmentID", id))
|
||||
continue
|
||||
}
|
||||
info := getSegmentInfo(segment)
|
||||
log.Debug("QueryNode::Impl::GetSegmentInfo for streaming", zap.Any("SegmentID", id), zap.Any("info", info))
|
||||
infos = append(infos, info)
|
||||
}
|
||||
return &queryPb.GetSegmentInfoResponse{
|
||||
|
|
|
@ -165,10 +165,13 @@ func (s *Segment) getRowCount() int64 {
|
|||
long int
|
||||
getRowCount(CSegmentInterface c_segment);
|
||||
*/
|
||||
segmentPtrIsNil := s.segmentPtr == nil
|
||||
log.Debug("QueryNode::Segment::getRowCount", zap.Any("segmentPtrIsNil", segmentPtrIsNil))
|
||||
if s.segmentPtr == nil {
|
||||
return -1
|
||||
}
|
||||
var rowCount = C.GetRowCount(s.segmentPtr)
|
||||
log.Debug("QueryNode::Segment::getRowCount", zap.Any("rowCount", rowCount))
|
||||
return int64(rowCount)
|
||||
}
|
||||
|
||||
|
@ -452,9 +455,12 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
|
|||
int sizeof_per_row,
|
||||
signed long int count);
|
||||
*/
|
||||
log.Debug("QueryNode::Segment::segmentInsert:", zap.Any("segmentType", s.segmentType))
|
||||
log.Debug("QueryNode::Segment::segmentInsert:", zap.Any("enableLoadBinLog", s.enableLoadBinLog))
|
||||
if s.segmentType != segmentTypeGrowing || s.enableLoadBinLog {
|
||||
return nil
|
||||
}
|
||||
log.Debug("QueryNode::Segment::segmentInsert:", zap.Any("s.sgmentPtr", s.segmentPtr))
|
||||
|
||||
if s.segmentPtr == nil {
|
||||
return errors.New("null seg core pointer")
|
||||
|
@ -478,7 +484,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
|
|||
var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
|
||||
var cSizeofPerRow = C.int(sizeofPerRow)
|
||||
var cRawDataVoidPtr = unsafe.Pointer(&rawData[0])
|
||||
|
||||
log.Debug("QueryNode::Segment::InsertBegin", zap.Any("cNumOfRows", cNumOfRows))
|
||||
var status = C.Insert(s.segmentPtr,
|
||||
cOffset,
|
||||
cNumOfRows,
|
||||
|
@ -489,11 +495,14 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
|
|||
cNumOfRows)
|
||||
|
||||
errorCode := status.error_code
|
||||
log.Debug("QueryNode::Segment::InsertEnd", zap.Any("errorCode", errorCode))
|
||||
|
||||
if errorCode != 0 {
|
||||
errorMsg := C.GoString(status.error_msg)
|
||||
defer C.free(unsafe.Pointer(status.error_msg))
|
||||
return errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
|
||||
err := errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
|
||||
log.Debug("QueryNode::Segment::InsertEnd failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
s.setRecentlyModified(true)
|
||||
|
|
Loading…
Reference in New Issue