Refactor logs in DataCoord & DataNode ()

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
pull/25630/head
Enwei Jiao 2023-07-14 15:56:31 +08:00 committed by GitHub
parent d216f9abda
commit 66fdc71479
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 318 additions and 1055 deletions

View File

@ -95,7 +95,7 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
log.Info("zero timeoutTs, skip starting timer",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.String("channelName", channelName),
)
return
}
@ -110,7 +110,7 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
log.Info("timer started",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.String("channelName", channelName),
zap.Duration("check interval", timeout))
defer ticker.Stop()
@ -121,7 +121,7 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
log.Warn("timeout and stop timer: wait for channel ACK timeout",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.String("channelName", channelName),
zap.Duration("timeout interval", timeout),
zap.Int32("runningTimerCount", c.runningTimerCount.Load()))
ackType := getAckType(watchState)
@ -131,7 +131,7 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
log.Info("stop timer before timeout",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.String("channelName", channelName),
zap.Duration("timeout interval", timeout),
zap.Int32("runningTimerCount", c.runningTimerCount.Load()))
return

View File

@ -216,7 +216,7 @@ func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
log.Info("processing watch info",
zap.String("watch state", info.GetState().String()),
zap.String("channel name", channelName))
zap.String("channelName", channelName))
switch info.GetState() {
case datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_Uncomplete:
@ -378,8 +378,8 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
c.unsubAttempt(nodeChannelInfo)
updates := c.deregisterPolicy(c.store, nodeID)
log.Warn("deregister node",
zap.Int64("unregistered node", nodeID),
log.Info("deregister node",
zap.Int64("nodeID", nodeID),
zap.Array("updates", updates))
if len(updates) <= 0 {
return nil
@ -397,7 +397,7 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
chNames = append(chNames, ch.Name)
}
log.Info("remove timers for channel of the deregistered node",
zap.Any("channels", chNames), zap.Int64("nodeID", nodeID))
zap.Strings("channels", chNames), zap.Int64("nodeID", nodeID))
c.stateTimer.removeTimers(chNames)
if err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch); err != nil {
@ -445,7 +445,7 @@ func (c *ChannelManager) Watch(ch *channel) error {
err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
if err != nil {
log.Warn("fail to update channel watch info with ToWatch state",
zap.Any("channel", ch), zap.Array("updates", updates), zap.Error(err))
zap.String("channel", ch.String()), zap.Array("updates", updates), zap.Error(err))
}
return err
}
@ -627,17 +627,17 @@ func (c *ChannelManager) processAck(e *ackEvent) {
switch e.ackType {
case invalidAck:
log.Warn("detected invalid Ack", zap.String("channel name", e.channelName))
log.Warn("detected invalid Ack", zap.String("channelName", e.channelName))
case watchSuccessAck:
log.Info("datanode successfully watched channel", zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
log.Info("datanode successfully watched channel", zap.Int64("nodeID", e.nodeID), zap.String("channelName", e.channelName))
case watchFailAck, watchTimeoutAck: // failure acks from toWatch
log.Warn("datanode watch channel failed or timeout, will release", zap.Int64("nodeID", e.nodeID),
zap.String("channel", e.channelName))
err := c.Release(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to set channels to release for watch failure ACKs",
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err))
zap.Int64("nodeID", e.nodeID), zap.String("channelName", e.channelName), zap.Error(err))
}
case releaseFailAck, releaseTimeoutAck: // failure acks from toRelease
// Cleanup, Delete and Reassign
@ -646,7 +646,7 @@ func (c *ChannelManager) processAck(e *ackEvent) {
err := c.CleanupAndReassign(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to clean and reassign channels for release failure ACKs",
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err))
zap.Int64("nodeID", e.nodeID), zap.String("channelName", e.channelName), zap.Error(err))
}
case releaseSuccessAck:
@ -656,7 +656,7 @@ func (c *ChannelManager) processAck(e *ackEvent) {
err := c.Reassign(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to response to release success ACK",
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err))
zap.Int64("nodeID", e.nodeID), zap.String("channelName", e.channelName), zap.Error(err))
}
}
}
@ -685,7 +685,7 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context, revision in
case ackEvent := <-timeoutWatcher:
log.Info("receive timeout acks from state watcher",
zap.Any("state", ackEvent.ackType),
zap.Int64("nodeID", ackEvent.nodeID), zap.String("channel name", ackEvent.channelName))
zap.Int64("nodeID", ackEvent.nodeID), zap.String("channelName", ackEvent.channelName))
c.processAck(ackEvent)
case event, ok := <-etcdWatcher:
if !ok {
@ -781,7 +781,7 @@ func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) err
if err := c.h.FinishDropChannel(channelName); err != nil {
return fmt.Errorf("FinishDropChannel failed, err=%w", err)
}
log.Info("removed channel assignment", zap.String("channel name", channelName))
log.Info("removed channel assignment", zap.String("channelName", channelName))
return nil
}
@ -791,7 +791,7 @@ func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) err
// Skip the remove if reassign to the original node.
log.Warn("failed to reassign channel to other nodes, assigning to the original DataNode",
zap.Int64("nodeID", originNodeID),
zap.String("channel name", channelName))
zap.String("channelName", channelName))
updates.Add(originNodeID, []*channel{ch})
}
@ -826,7 +826,7 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
}
log.Info("try to cleanup removal flag ", zap.String("channel name", channelName))
log.Info("try to cleanup removal flag ", zap.String("channelName", channelName))
if err := c.h.FinishDropChannel(channelName); err != nil {
return fmt.Errorf("FinishDropChannel failed, err=%w", err)
}
@ -841,7 +841,7 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
// Skip the remove if reassign to the original node.
log.Warn("failed to reassign channel to other nodes, add channel to the original node",
zap.Int64("node ID", nodeID),
zap.String("channel name", channelName))
zap.String("channelName", channelName))
updates.Add(nodeID, []*channel{chToCleanUp})
}

View File

@ -325,7 +325,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
if state == commonpb.CompactionState_Executing && !c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) {
continue
}
log.Info("compaction timeout",
log.Warn("compaction timeout",
zap.Int64("planID", task.plan.PlanID),
zap.Int64("nodeID", task.dataNodeID),
zap.Uint64("startTime", task.plan.GetStartTime()),

View File

@ -406,14 +406,14 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
if !signal.isForce && t.compactionHandler.isFull() {
log.Warn("compaction plan skipped due to handler full",
zap.Int64("collectionID", signal.collectionID),
zap.Int64s("segment IDs", segIDs))
zap.Int64s("segmentIDs", segIDs))
break
}
start := time.Now()
if err := t.fillOriginPlan(plan); err != nil {
log.Warn("failed to fill plan",
zap.Int64("collectionID", signal.collectionID),
zap.Int64s("segment IDs", segIDs),
zap.Int64s("segmentIDs", segIDs),
zap.Error(err))
continue
}
@ -422,7 +422,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
log.Warn("failed to execute compaction plan",
zap.Int64("collectionID", signal.collectionID),
zap.Int64("planID", plan.PlanID),
zap.Int64s("segment IDs", segIDs),
zap.Int64s("segmentIDs", segIDs),
zap.Error(err))
continue
}
@ -435,11 +435,11 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
log.Info("time cost of generating global compaction",
zap.Any("segID2DeltaLogs", segIDMap),
zap.Int64("planID", plan.PlanID),
zap.Any("time cost", time.Since(start).Milliseconds()),
zap.Int64("time cost", time.Since(start).Milliseconds()),
zap.Int64("collectionID", signal.collectionID),
zap.String("channel", group.channelName),
zap.Int64("partitionID", group.partitionID),
zap.Int64s("segment IDs", segIDs))
zap.Int64s("segmentIDs", segIDs))
}
}
}
@ -529,9 +529,9 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
log.Info("time cost of generating compaction",
zap.Int64("plan ID", plan.PlanID),
zap.Any("time cost", time.Since(start).Milliseconds()),
zap.Int64("collection ID", signal.collectionID),
zap.Int64("collectionID", signal.collectionID),
zap.String("channel", channel),
zap.Int64("partition ID", partitionID),
zap.Int64("partitionID", partitionID),
zap.Int64s("segment IDs", fetchSegIDs(plan.GetSegmentBinlogs())))
}
}
@ -655,7 +655,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, i
targetRow > int64(float64(segment.GetMaxRowNum())*Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat()) {
plan := segmentsToPlan(bucket, compactTime)
log.Info("generate a plan for small candidates",
zap.Int64s("plan segment IDs", lo.Map(bucket, getSegmentIDs)),
zap.Int64s("plan segmentIDs", lo.Map(bucket, getSegmentIDs)),
zap.Int64("target segment row", targetRow),
zap.Int64("target segment size", size))
plans = append(plans, plan)
@ -681,9 +681,9 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, i
plan.TotalRows += s.GetNumOfRows()
plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinLogs)
log.Info("small segment appended on existing plan",
zap.Int64("segment ID", s.GetID()),
zap.Int64("segmentID", s.GetID()),
zap.Int64("target rows", plan.GetTotalRows()),
zap.Int64s("plan segment ID", getSegIDsFromPlan(plan)),
zap.Int64s("plan segmentID", getSegIDsFromPlan(plan)),
)
remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...)
@ -708,7 +708,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, i
plan := segmentsToPlan(bucket, compactTime)
plans = append(plans, plan)
log.Info("generate a plan for to squeeze small candidates into non-planned segment",
zap.Int64s("plan segment IDs", lo.Map(bucket, getSegmentIDs)),
zap.Int64s("plan segmentIDs", lo.Map(bucket, getSegmentIDs)),
zap.Int64("target segment row", targetRow),
)
}
@ -835,7 +835,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis
// TODO maybe we want to compact to single statslog to reduce watch dml channel cost
// TODO avoid rebuild index twice.
if statsLog > maxSize*2.0 {
log.Info("stats number is too much, trigger compaction", zap.Int64("segment", segment.ID), zap.Int("Bin logs", binLog), zap.Int("Stat logs", statsLog))
log.Info("stats number is too much, trigger compaction", zap.Int64("segmentID", segment.ID), zap.Int("Bin logs", binLog), zap.Int("Stat logs", statsLog))
return true
}
}
@ -846,7 +846,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis
}
if deltaLog > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt() {
log.Info("total delta number is too much, trigger compaction", zap.Int64("segment", segment.ID), zap.Int("Bin logs", binLog), zap.Int("Delta logs", deltaLog))
log.Info("total delta number is too much, trigger compaction", zap.Int64("segmentID", segment.ID), zap.Int("Bin logs", binLog), zap.Int("Delta logs", deltaLog))
return true
}
@ -864,7 +864,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis
}
if float64(totalExpiredRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() || totalExpiredSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize.GetAsInt64() {
log.Info("total expired entities is too much, trigger compaction", zap.Int64("segment", segment.ID),
log.Info("total expired entities is too much, trigger compaction", zap.Int64("segmentID", segment.ID),
zap.Int("expired rows", totalExpiredRows), zap.Int64("expired log size", totalExpiredSize))
return true
}
@ -890,7 +890,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis
// currently delta log size and delete ratio policy is applied
if float64(totalDeletedRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() || totalDeleteLogSize > Params.DataCoordCfg.SingleCompactionDeltaLogMaxSize.GetAsInt64() {
log.Info("total delete entities is too much, trigger compaction", zap.Int64("segment", segment.ID),
log.Info("total delete entities is too much, trigger compaction", zap.Int64("segmentID", segment.ID),
zap.Int("deleted rows", totalDeletedRows), zap.Int64("delete log size", totalDeleteLogSize))
return true
}

View File

@ -63,7 +63,7 @@ func (b *CoordinatorBroker) DescribeCollectionInternal(ctx context.Context, coll
CollectionID: collectionID,
})
if err = VerifyResponse(resp, err); err != nil {
log.Error("DescribeCollectionInternal failed",
log.Warn("DescribeCollectionInternal failed",
zap.Int64("collectionID", collectionID),
zap.Error(err))
return nil, err
@ -85,7 +85,7 @@ func (b *CoordinatorBroker) ShowPartitionsInternal(ctx context.Context, collecti
CollectionID: collectionID,
})
if err = VerifyResponse(resp, err); err != nil {
log.Error("ShowPartitionsInternal failed",
log.Warn("ShowPartitionsInternal failed",
zap.Int64("collectionID", collectionID),
zap.Error(err))
return nil, err

View File

@ -268,7 +268,7 @@ func (gc *garbageCollector) clearEtcd() {
log.Info("GC segment", zap.Int64("segmentID", segment.GetID()))
if gc.removeLogs(logs) {
err := gc.meta.DropSegment(segment.GetID())
log.Warn("failed to drop segment", zap.Int64("segment id", segment.GetID()), zap.Error(err))
log.Warn("failed to drop segment", zap.Int64("segmentID", segment.GetID()), zap.Error(err))
}
if segList := gc.meta.GetSegmentsByChannel(segInsertChannel); len(segList) == 0 &&
!gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) {
@ -329,7 +329,7 @@ func (gc *garbageCollector) recycleUnusedIndexes() {
deletedIndexes := gc.meta.GetDeletedIndexes()
for _, index := range deletedIndexes {
if err := gc.meta.RemoveIndex(index.CollectionID, index.IndexID); err != nil {
log.Warn("remove index on collection fail", zap.Int64("collID", index.CollectionID),
log.Warn("remove index on collection fail", zap.Int64("collectionID", index.CollectionID),
zap.Int64("indexID", index.IndexID), zap.Error(err))
continue
}
@ -342,11 +342,11 @@ func (gc *garbageCollector) recycleUnusedSegIndexes() {
if gc.meta.GetSegment(segIdx.SegmentID) == nil || !gc.meta.IsIndexExist(segIdx.CollectionID, segIdx.IndexID) {
if err := gc.meta.RemoveSegmentIndex(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.IndexID, segIdx.BuildID); err != nil {
log.Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", segIdx.BuildID),
zap.Int64("segID", segIdx.SegmentID), zap.Int64("nodeID", segIdx.NodeID), zap.Error(err))
zap.Int64("segmentID", segIdx.SegmentID), zap.Int64("nodeID", segIdx.NodeID), zap.Error(err))
continue
}
log.Info("index meta recycle success", zap.Int64("buildID", segIdx.BuildID),
zap.Int64("segID", segIdx.SegmentID))
zap.Int64("segmentID", segIdx.SegmentID))
}
}
}

View File

@ -259,7 +259,7 @@ func (h *ServerHandler) getEarliestSegmentDMLPos(channel *channel, partitionIDs
}
if minPos != nil {
log.Info("getEarliestSegmentDMLPos done",
zap.Int64("segment ID", minPosSegID),
zap.Int64("segmentID", minPosSegID),
zap.Uint64("posTs", minPosTs),
zap.Time("posTime", tsoutil.PhysicalTime(minPosTs)))
}

View File

@ -229,7 +229,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
indexParams := ib.meta.GetIndexParams(meta.CollectionID, meta.IndexID)
if isFlatIndex(getIndexType(indexParams)) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() {
log.Ctx(ib.ctx).Debug("segment does not need index really", zap.Int64("buildID", buildID),
zap.Int64("segID", meta.SegmentID), zap.Int64("num rows", meta.NumRows))
zap.Int64("segmentID", meta.SegmentID), zap.Int64("num rows", meta.NumRows))
if err := ib.meta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Finished,
@ -307,7 +307,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
return false
}
log.Ctx(ib.ctx).Info("index task assigned successfully", zap.Int64("buildID", buildID),
zap.Int64("segID", meta.SegmentID), zap.Int64("nodeID", nodeID))
zap.Int64("segmentID", meta.SegmentID), zap.Int64("nodeID", nodeID))
// update index meta state to InProgress
if err := ib.meta.BuildIndex(buildID); err != nil {
// need to release lock then reassign, so set task state to retry

View File

@ -217,20 +217,20 @@ func (m *meta) AddSegmentIndex(segIndex *model.SegmentIndex) error {
defer m.Unlock()
buildID := segIndex.BuildID
log.Info("meta update: adding segment index", zap.Int64("collID", segIndex.CollectionID),
zap.Int64("segID", segIndex.SegmentID), zap.Int64("indexID", segIndex.IndexID),
log.Info("meta update: adding segment index", zap.Int64("collectionID", segIndex.CollectionID),
zap.Int64("segmentID", segIndex.SegmentID), zap.Int64("indexID", segIndex.IndexID),
zap.Int64("buildID", buildID))
segIndex.IndexState = commonpb.IndexState_Unissued
if err := m.catalog.CreateSegmentIndex(m.ctx, segIndex); err != nil {
log.Warn("meta update: adding segment index failed",
zap.Int64("segment ID", segIndex.SegmentID), zap.Int64("indexID", segIndex.IndexID),
zap.Int64("segmentID", segIndex.SegmentID), zap.Int64("indexID", segIndex.IndexID),
zap.Int64("buildID", segIndex.BuildID), zap.Error(err))
return err
}
m.updateSegmentIndex(segIndex)
log.Info("meta update: adding segment index success", zap.Int64("collID", segIndex.CollectionID),
zap.Int64("segID", segIndex.SegmentID), zap.Int64("indexID", segIndex.IndexID),
log.Info("meta update: adding segment index success", zap.Int64("collectionID", segIndex.CollectionID),
zap.Int64("segmentID", segIndex.SegmentID), zap.Int64("indexID", segIndex.IndexID),
zap.Int64("buildID", buildID))
m.updateIndexTasksMetrics()
return nil
@ -347,7 +347,7 @@ func (m *meta) GetIndexesForCollection(collID UniqueID, indexName string) []*mod
// MarkIndexAsDeleted will mark the corresponding index as deleted, and recycleUnusedIndexFiles will recycle these tasks.
func (m *meta) MarkIndexAsDeleted(collID UniqueID, indexIDs []UniqueID) error {
log.Info("IndexCoord metaTable MarkIndexAsDeleted", zap.Int64("collID", collID),
log.Info("IndexCoord metaTable MarkIndexAsDeleted", zap.Int64("collectionID", collID),
zap.Int64s("indexIDs", indexIDs))
m.Lock()
@ -379,7 +379,7 @@ func (m *meta) MarkIndexAsDeleted(collID UniqueID, indexIDs []UniqueID) error {
m.indexes[index.CollectionID][index.IndexID] = index
}
log.Info("IndexCoord metaTable MarkIndexAsDeleted success", zap.Int64("collID", collID), zap.Int64s("indexIDs", indexIDs))
log.Info("IndexCoord metaTable MarkIndexAsDeleted success", zap.Int64("collectionID", collID), zap.Int64s("indexIDs", indexIDs))
return nil
}
@ -569,7 +569,7 @@ func (m *meta) BuildIndex(buildID UniqueID) error {
return err
}
log.Info("meta update: segment index in progress success", zap.Int64("buildID", segIdx.BuildID),
zap.Int64("segID", segIdx.SegmentID))
zap.Int64("segmentID", segIdx.SegmentID))
m.updateIndexTasksMetrics()
return nil
@ -619,10 +619,10 @@ func (m *meta) GetDeletedIndexes() []*model.Index {
func (m *meta) RemoveIndex(collID, indexID UniqueID) error {
m.Lock()
defer m.Unlock()
log.Info("IndexCoord meta table remove index", zap.Int64("collID", collID), zap.Int64("indexID", indexID))
log.Info("IndexCoord meta table remove index", zap.Int64("collectionID", collID), zap.Int64("indexID", indexID))
err := m.catalog.DropIndex(m.ctx, collID, indexID)
if err != nil {
log.Info("IndexCoord meta table remove index fail", zap.Int64("collID", collID),
log.Info("IndexCoord meta table remove index fail", zap.Int64("collectionID", collID),
zap.Int64("indexID", indexID), zap.Error(err))
return err
}
@ -635,7 +635,7 @@ func (m *meta) RemoveIndex(collID, indexID UniqueID) error {
metrics.IndexTaskNum.Delete(prometheus.Labels{"collection_id": strconv.FormatInt(collID, 10), "index_task_status": metrics.FinishedIndexTaskLabel})
metrics.IndexTaskNum.Delete(prometheus.Labels{"collection_id": strconv.FormatInt(collID, 10), "index_task_status": metrics.FailedIndexTaskLabel})
}
log.Info("IndexCoord meta table remove index success", zap.Int64("collID", collID), zap.Int64("indexID", indexID))
log.Info("IndexCoord meta table remove index success", zap.Int64("collectionID", collID), zap.Int64("indexID", indexID))
return nil
}

View File

@ -51,7 +51,7 @@ func (s *Server) startIndexService(ctx context.Context) {
}
func (s *Server) createIndexForSegment(segment *SegmentInfo, indexID UniqueID) error {
log.Info("create index for segment", zap.Int64("segID", segment.ID), zap.Int64("indexID", indexID))
log.Info("create index for segment", zap.Int64("segmentID", segment.ID), zap.Int64("indexID", indexID))
buildID, err := s.allocator.allocID(context.Background())
if err != nil {
return err
@ -78,7 +78,7 @@ func (s *Server) createIndexesForSegment(segment *SegmentInfo) error {
for _, index := range indexes {
if _, ok := segment.segmentIndexes[index.IndexID]; !ok {
if err := s.createIndexForSegment(segment, index.IndexID); err != nil {
log.Warn("create index for segment fail", zap.Int64("segID", segment.ID),
log.Warn("create index for segment fail", zap.Int64("segmentID", segment.ID),
zap.Int64("indexID", index.IndexID))
return err
}
@ -102,30 +102,30 @@ func (s *Server) createIndexForSegmentLoop(ctx context.Context) {
segments := s.meta.GetHasUnindexTaskSegments()
for _, segment := range segments {
if err := s.createIndexesForSegment(segment); err != nil {
log.Warn("create index for segment fail, wait for retry", zap.Int64("segID", segment.ID))
log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
continue
}
}
case collID := <-s.notifyIndexChan:
log.Info("receive create index notify", zap.Int64("collID", collID))
case collectionID := <-s.notifyIndexChan:
log.Info("receive create index notify", zap.Int64("collectionID", collectionID))
segments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && collID == info.CollectionID
return isFlush(info) && collectionID == info.CollectionID
})
for _, segment := range segments {
if err := s.createIndexesForSegment(segment); err != nil {
log.Warn("create index for segment fail, wait for retry", zap.Int64("segID", segment.ID))
log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
continue
}
}
case segID := <-s.buildIndexCh:
log.Info("receive new flushed segment", zap.Int64("segID", segID))
log.Info("receive new flushed segment", zap.Int64("segmentID", segID))
segment := s.meta.GetSegment(segID)
if segment == nil {
log.Warn("segment is not exist, no need to build index", zap.Int64("segID", segID))
log.Warn("segment is not exist, no need to build index", zap.Int64("segmentID", segID))
continue
}
if err := s.createIndexesForSegment(segment); err != nil {
log.Warn("create index for segment fail, wait for retry", zap.Int64("segID", segment.ID))
log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
continue
}
}
@ -137,8 +137,10 @@ func (s *Server) createIndexForSegmentLoop(ctx context.Context) {
// will get all flushed segments from DataCoord and record tasks with these segments. The background process
// indexBuilder will find this task and assign it to IndexNode for execution.
func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx)
log.Info("receive CreateIndex request", zap.Int64("CollectionID", req.GetCollectionID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.CollectionID),
)
log.Info("receive CreateIndex request",
zap.String("IndexName", req.GetIndexName()), zap.Int64("fieldID", req.GetFieldID()),
zap.Any("TypeParams", req.GetTypeParams()),
zap.Any("IndexParams", req.GetIndexParams()))
@ -195,7 +197,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
err = s.meta.CreateIndex(index)
if err != nil {
log.Error("CreateIndex fail", zap.Int64("collectionID", req.GetCollectionID()),
log.Error("CreateIndex fail",
zap.Int64("fieldID", req.GetFieldID()), zap.String("indexName", req.GetIndexName()), zap.Error(err))
errResp.Reason = err.Error()
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
@ -207,7 +209,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
default:
}
log.Info("CreateIndex successfully", zap.Int64("collectionID", req.GetCollectionID()),
log.Info("CreateIndex successfully",
zap.String("IndexName", req.GetIndexName()), zap.Int64("fieldID", req.GetFieldID()),
zap.Int64("IndexID", indexID))
errResp.ErrorCode = commonpb.ErrorCode_Success
@ -218,8 +220,10 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
// GetIndexState gets the index state of the index name in the request from Proxy.
// Deprecated
func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) {
log := log.Ctx(ctx)
log.Info("receive GetIndexState request", zap.Int64("collectionID", req.CollectionID),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.CollectionID),
)
log.Info("receive GetIndexState request",
zap.String("indexName", req.IndexName))
errResp := &commonpb.Status{
@ -239,7 +243,7 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
if len(indexes) == 0 {
errResp.ErrorCode = commonpb.ErrorCode_IndexNotExist
errResp.Reason = fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
log.Error("GetIndexState fail", zap.Int64("collectionID", req.CollectionID),
log.Error("GetIndexState fail",
zap.String("indexName", req.IndexName), zap.String("fail reason", errResp.Reason))
return &indexpb.GetIndexStateResponse{
Status: errResp,
@ -272,14 +276,16 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
ret.State = indexInfo.State
ret.FailReason = indexInfo.IndexStateFailReason
log.Info("GetIndexState success", zap.Int64("collectionID", req.GetCollectionID()),
log.Info("GetIndexState success",
zap.String("IndexName", req.GetIndexName()), zap.String("state", ret.GetState().String()))
return ret, nil
}
func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest) (*indexpb.GetSegmentIndexStateResponse, error) {
log := log.Ctx(ctx)
log.Info("receive GetSegmentIndexState", zap.Int64("CollectionID", req.GetCollectionID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.CollectionID),
)
log.Info("receive GetSegmentIndexState",
zap.String("IndexName", req.GetIndexName()), zap.Int64s("fieldID", req.GetSegmentIDs()))
errResp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -303,8 +309,7 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
indexID2CreateTs := s.meta.GetIndexIDByName(req.GetCollectionID(), req.GetIndexName())
if len(indexID2CreateTs) == 0 {
errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.GetIndexName())
log.Error("GetSegmentIndexState fail", zap.Int64("collectionID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()), zap.String("fail reason", errMsg))
log.Warn("GetSegmentIndexState fail", zap.String("indexName", req.GetIndexName()), zap.String("fail reason", errMsg))
return &indexpb.GetSegmentIndexStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_IndexNotExist,
@ -320,8 +325,7 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
FailReason: state.failReason,
})
}
log.Info("GetSegmentIndexState successfully", zap.Int64("collectionID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()))
log.Info("GetSegmentIndexState successfully", zap.String("indexName", req.GetIndexName()))
return ret, nil
}
@ -446,7 +450,7 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In
indexInfo.State = commonpb.IndexState_Finished
}
log.Info("completeIndexInfo success", zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID),
log.Info("completeIndexInfo success", zap.Int64("collectionID", index.CollectionID), zap.Int64("indexID", index.IndexID),
zap.Int64("totalRows", indexInfo.TotalRows), zap.Int64("indexRows", indexInfo.IndexedRows),
zap.Int64("pendingIndexRows", indexInfo.PendingIndexRows),
zap.String("state", indexInfo.State.String()), zap.String("failReason", indexInfo.IndexStateFailReason))
@ -455,9 +459,10 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In
// GetIndexBuildProgress get the index building progress by num rows.
// Deprecated
func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest) (*indexpb.GetIndexBuildProgressResponse, error) {
log := log.Ctx(ctx)
log.Info("receive GetIndexBuildProgress request", zap.Int64("collID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()))
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.CollectionID),
)
log.Info("receive GetIndexBuildProgress request", zap.String("indexName", req.GetIndexName()))
errResp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "",
@ -474,8 +479,7 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
if len(indexes) == 0 {
errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
log.Error("GetIndexBuildProgress fail", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
log.Warn("GetIndexBuildProgress fail", zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
return &indexpb.GetIndexBuildProgressResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_IndexNotExist,
@ -503,8 +507,7 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
}), false)
log.Info("GetIndexBuildProgress success", zap.Int64("collectionID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()))
log.Info("GetIndexBuildProgress success", zap.String("indexName", req.GetIndexName()))
return &indexpb.GetIndexBuildProgressResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -517,9 +520,10 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
// DescribeIndex describe the index info of the collection.
func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
log := log.Ctx(ctx)
log.Info("receive DescribeIndex request", zap.Int64("collID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()))
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.CollectionID),
)
log.Info("receive DescribeIndex request", zap.String("indexName", req.GetIndexName()))
errResp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "",
@ -536,8 +540,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
if len(indexes) == 0 {
errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
log.Error("DescribeIndex fail", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
log.Warn("DescribeIndex fail", zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_IndexNotExist,
@ -569,8 +572,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
s.completeIndexInfo(indexInfo, index, segments, false)
indexInfos = append(indexInfos, indexInfo)
}
log.Info("DescribeIndex success", zap.Int64("collectionID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()))
log.Info("DescribeIndex success", zap.String("indexName", req.GetIndexName()))
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -581,10 +583,10 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
// GetIndexStatistics get the statistics of the index. DescribeIndex doesn't contain statistics.
func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexStatisticsRequest) (*indexpb.GetIndexStatisticsResponse, error) {
log := log.Ctx(ctx)
log.Info("receive GetIndexStatistics request",
zap.Int64("collID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()))
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.CollectionID),
)
log.Info("receive GetIndexStatistics request", zap.String("indexName", req.GetIndexName()))
if s.isClosed() {
log.Warn(msgDataCoordIsUnhealthy(s.serverID()))
return &indexpb.GetIndexStatisticsResponse{
@ -595,8 +597,7 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
if len(indexes) == 0 {
errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
log.Error("GetIndexStatistics fail",
zap.Int64("collectionID", req.CollectionID),
log.Warn("GetIndexStatistics fail",
zap.String("indexName", req.IndexName),
zap.String("fail reason", errMsg))
return &indexpb.GetIndexStatisticsResponse{
@ -630,8 +631,7 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
s.completeIndexInfo(indexInfo, index, segments, true)
indexInfos = append(indexInfos, indexInfo)
}
log.Info("GetIndexStatisticsResponse success",
zap.Int64("collectionID", req.GetCollectionID()),
log.Debug("GetIndexStatisticsResponse success",
zap.String("indexName", req.GetIndexName()))
return &indexpb.GetIndexStatisticsResponse{
Status: &commonpb.Status{
@ -645,8 +645,10 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
// divided into many segments, and each segment corresponds to an IndexBuildID. DataCoord uses IndexBuildID to record
// index tasks.
func (s *Server) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx)
log.Info("receive DropIndex request", zap.Int64("collectionID", req.GetCollectionID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.CollectionID),
)
log.Info("receive DropIndex request",
zap.Int64s("partitionIDs", req.GetPartitionIDs()), zap.String("indexName", req.GetIndexName()),
zap.Bool("drop all indexes", req.GetDropAll()))
errResp := &commonpb.Status{
@ -684,25 +686,24 @@ func (s *Server) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (
// drop collection index
err := s.meta.MarkIndexAsDeleted(req.CollectionID, indexIDs)
if err != nil {
log.Error("DropIndex fail", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.IndexName), zap.Error(err))
log.Warn("DropIndex fail", zap.String("indexName", req.IndexName), zap.Error(err))
ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
ret.Reason = err.Error()
return ret, nil
}
}
log.Info("DropIndex success", zap.Int64("collID", req.CollectionID),
zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName),
log.Debug("DropIndex success", zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName),
zap.Int64s("indexIDs", indexIDs))
return ret, nil
}
// GetIndexInfos gets the index file paths for segment from DataCoord.
func (s *Server) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest) (*indexpb.GetIndexInfoResponse, error) {
log := log.Ctx(ctx)
log.Info("receive GetIndexInfos request", zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.String("indexName", req.GetIndexName()))
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.CollectionID),
)
log.Info("receive GetIndexInfos request", zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.String("indexName", req.GetIndexName()))
errResp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "",
@ -756,8 +757,7 @@ func (s *Server) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoReq
}
}
log.Info("GetIndexInfos successfully", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.GetIndexName()))
log.Debug("GetIndexInfos successfully", zap.String("indexName", req.GetIndexName()))
return ret, nil
}

View File

@ -71,7 +71,7 @@ func (nm *IndexNodeManager) RemoveNode(nodeID UniqueID) {
}
func (nm *IndexNodeManager) StoppingNode(nodeID UniqueID) {
log.Info("IndexCoord", zap.Any("Stopping node with ID", nodeID))
log.Debug("IndexCoord", zap.Int64("Stopping node with ID", nodeID))
nm.lock.Lock()
defer nm.lock.Unlock()
nm.stoppingNodes[nodeID] = struct{}{}
@ -79,7 +79,7 @@ func (nm *IndexNodeManager) StoppingNode(nodeID UniqueID) {
// AddNode adds the client of IndexNode.
func (nm *IndexNodeManager) AddNode(nodeID UniqueID, address string) error {
log.Debug("add IndexNode", zap.Any("nodeID", nodeID), zap.Any("node address", address))
log.Debug("add IndexNode", zap.Int64("nodeID", nodeID), zap.String("node address", address))
var (
nodeClient types.IndexNode
err error
@ -153,7 +153,7 @@ func (nm *IndexNodeManager) PeekClient(meta *model.SegmentIndex) (UniqueID, type
}
func (nm *IndexNodeManager) ClientSupportDisk() bool {
log.Info("check if client support disk index")
log.Debug("check if client support disk index")
allClients := nm.GetAllClients()
if len(allClients) == 0 {
log.Warn("there is no IndexNode online")

View File

@ -176,14 +176,12 @@ func (m *meta) reloadFromKV() error {
// AddCollection adds a collection into meta
// Note that collection info is just for caching and will not be set into etcd from datacoord
func (m *meta) AddCollection(collection *collectionInfo) {
log.Info("meta update: add collection",
zap.Int64("collection ID", collection.ID))
log.Debug("meta update: add collection", zap.Int64("collectionID", collection.ID))
m.Lock()
defer m.Unlock()
m.collections[collection.ID] = collection
metrics.DataCoordNumCollections.WithLabelValues().Set(float64(len(m.collections)))
log.Info("meta update: add collection - complete",
zap.Int64("collection ID", collection.ID))
log.Info("meta update: add collection - complete", zap.Int64("collectionID", collection.ID))
}
// GetCollection returns collection info with provided collection id from local cache
@ -292,45 +290,42 @@ func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64) {
// AddSegment records segment info, persisting info into kv store
func (m *meta) AddSegment(segment *SegmentInfo) error {
log.Info("meta update: adding segment",
zap.Int64("segment ID", segment.GetID()))
log.Debug("meta update: adding segment", zap.Int64("segmentID", segment.GetID()))
m.Lock()
defer m.Unlock()
if err := m.catalog.AddSegment(m.ctx, segment.SegmentInfo); err != nil {
log.Error("meta update: adding segment failed",
zap.Int64("segment ID", segment.GetID()),
zap.Int64("segmentID", segment.GetID()),
zap.Error(err))
return err
}
m.segments.SetSegment(segment.GetID(), segment)
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Inc()
log.Info("meta update: adding segment - complete",
zap.Int64("segment ID", segment.GetID()))
log.Info("meta update: adding segment - complete", zap.Int64("segmentID", segment.GetID()))
return nil
}
// DropSegment remove segment with provided id, etcd persistence also removed
func (m *meta) DropSegment(segmentID UniqueID) error {
log.Info("meta update: dropping segment",
zap.Int64("segment ID", segmentID))
log.Debug("meta update: dropping segment", zap.Int64("segmentID", segmentID))
m.Lock()
defer m.Unlock()
segment := m.segments.GetSegment(segmentID)
if segment == nil {
log.Warn("meta update: dropping segment failed - segment not found",
zap.Int64("segment ID", segmentID))
zap.Int64("segmentID", segmentID))
return nil
}
if err := m.catalog.DropSegment(m.ctx, segment.SegmentInfo); err != nil {
log.Warn("meta update: dropping segment failed",
zap.Int64("segment ID", segmentID),
zap.Int64("segmentID", segmentID),
zap.Error(err))
return err
}
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Dec()
m.segments.DropSegment(segmentID)
log.Info("meta update: dropping segment - complete",
zap.Int64("segment ID", segmentID))
zap.Int64("segmentID", segmentID))
return nil
}
@ -364,15 +359,15 @@ func (m *meta) GetAllSegmentsUnsafe() []*SegmentInfo {
// SetState setting segment with provided ID state
func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) error {
log.Info("meta update: setting segment state",
zap.Int64("segment ID", segmentID),
log.Debug("meta update: setting segment state",
zap.Int64("segmentID", segmentID),
zap.Any("target state", targetState))
m.Lock()
defer m.Unlock()
curSegInfo := m.segments.GetSegment(segmentID)
if curSegInfo == nil {
log.Warn("meta update: setting segment state - segment not found",
zap.Int64("segment ID", segmentID),
zap.Int64("segmentID", segmentID),
zap.Any("target state", targetState))
// idempotent drop
if targetState == commonpb.SegmentState_Dropped {
@ -389,8 +384,8 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
updateSegStateAndPrepareMetrics(clonedSegment, targetState, metricMutation)
if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil {
log.Error("meta update: setting segment state - failed to alter segments",
zap.Int64("segment ID", segmentID),
log.Warn("meta update: setting segment state - failed to alter segments",
zap.Int64("segmentID", segmentID),
zap.String("target state", targetState.String()),
zap.Error(err))
return err
@ -401,15 +396,15 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
// Update in-memory meta.
m.segments.SetState(segmentID, targetState)
log.Info("meta update: setting segment state - complete",
zap.Int64("segment ID", segmentID),
zap.Int64("segmentID", segmentID),
zap.String("target state", targetState.String()))
return nil
}
// UnsetIsImporting removes the `isImporting` flag of a segment.
func (m *meta) UnsetIsImporting(segmentID UniqueID) error {
log.Info("meta update: unsetting isImport state of segment",
zap.Int64("segment ID", segmentID))
log.Debug("meta update: unsetting isImport state of segment",
zap.Int64("segmentID", segmentID))
m.Lock()
defer m.Unlock()
curSegInfo := m.segments.GetSegment(segmentID)
@ -421,8 +416,8 @@ func (m *meta) UnsetIsImporting(segmentID UniqueID) error {
clonedSegment.IsImporting = false
if isSegmentHealthy(clonedSegment) {
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil {
log.Error("meta update: unsetting isImport state of segment - failed to unset segment isImporting state",
zap.Int64("segment ID", segmentID),
log.Warn("meta update: unsetting isImport state of segment - failed to unset segment isImporting state",
zap.Int64("segmentID", segmentID),
zap.Error(err))
return err
}
@ -430,7 +425,7 @@ func (m *meta) UnsetIsImporting(segmentID UniqueID) error {
// Update in-memory meta.
m.segments.SetIsImporting(segmentID, false)
log.Info("meta update: unsetting isImport state of segment - complete",
zap.Int64("segment ID", segmentID))
zap.Int64("segmentID", segmentID))
return nil
}
@ -446,7 +441,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
checkpoints []*datapb.CheckPoint,
startPositions []*datapb.SegmentStartPosition,
) error {
log.Info("meta update: update flush segments info",
log.Debug("meta update: update flush segments info",
zap.Int64("segmentId", segmentID),
zap.Int("binlog", len(binlogs)),
zap.Int("stats log", len(statslogs)),
@ -462,7 +457,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
segment := m.segments.GetSegment(segmentID)
if segment == nil || !isSegmentHealthy(segment) {
log.Warn("meta update: update flush segments info - segment not found",
zap.Int64("segment ID", segmentID),
zap.Int64("segmentID", segmentID),
zap.Bool("segment nil", segment == nil),
zap.Bool("segment unhealthy", !isSegmentHealthy(segment)))
return nil
@ -554,7 +549,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
count := segmentutil.CalcRowCountFromBinLog(s.SegmentInfo)
if count != segment.currRows {
log.Info("check point reported inconsistent with bin log row count",
zap.Int64("segment ID", segment.GetID()),
zap.Int64("segmentID", segment.GetID()),
zap.Int64("current rows (wrong)", segment.currRows),
zap.Int64("segment bin log row count (correct)", count))
}
@ -579,7 +574,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
// count should smaller than or equal to cp reported
if count != cp.NumOfRows {
log.Info("check point reported inconsistent with bin log row count",
zap.Int64("segment ID", segment.GetID()),
zap.Int64("segmentID", segment.GetID()),
zap.Int64("check point (wrong)", cp.NumOfRows),
zap.Int64("segment bin log row count (correct)", count))
}
@ -611,14 +606,14 @@ func (m *meta) UpdateFlushSegmentsInfo(
m.segments.SetSegment(id, s)
}
log.Info("meta update: update flush segments info - update flush segments info successfully",
zap.Int64("segment ID", segmentID))
zap.Int64("segmentID", segmentID))
return nil
}
// UpdateDropChannelSegmentInfo updates segment checkpoints and binlogs before drop
// reusing segment info to pass segment id, binlogs, statslog, deltalog, start position and checkpoint
func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentInfo) error {
log.Info("meta update: update drop channel segment info",
log.Debug("meta update: update drop channel segment info",
zap.String("channel", channel))
m.Lock()
defer m.Unlock()
@ -651,7 +646,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
}
err := m.batchSaveDropSegments(channel, modSegments)
if err != nil {
log.Error("meta update: update drop channel segment info failed",
log.Warn("meta update: update drop channel segment info failed",
zap.String("channel", channel),
zap.Error(err))
} else {
@ -913,7 +908,7 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
// AddAllocation add allocation in segment
func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
log.Info("meta update: add allocation",
log.Debug("meta update: add allocation",
zap.Int64("segmentID", segmentID),
zap.Any("allocation", allocation))
m.Lock()
@ -930,7 +925,7 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil {
log.Error("meta update: add allocation failed",
zap.Int64("segment ID", segmentID),
zap.Int64("segmentID", segmentID),
zap.Error(err))
return err
}
@ -1059,8 +1054,8 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac
segment := NewSegmentInfo(segmentInfo)
metricMutation.addNewSeg(segment.GetState(), segment.GetNumOfRows())
log.Info("meta update: prepare for complete compaction mutation - complete",
zap.Int64("collection ID", segment.GetCollectionID()),
zap.Int64("partition ID", segment.GetPartitionID()),
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("partitionID", segment.GetPartitionID()),
zap.Int64("new segment ID", segment.GetID()),
zap.Int64("new segment num of rows", segment.GetNumOfRows()),
zap.Any("compacted from", segment.GetCompactionFrom()))
@ -1103,9 +1098,9 @@ func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segm
newSegment.State = commonpb.SegmentState_Dropped
}
log.Info("meta update: alter meta store for compaction updates",
log.Debug("meta update: alter meta store for compaction updates",
zap.Int64s("compact from segments (segments to be updated as dropped)", modSegIDs),
zap.Int64("new segmentId", newSegment.GetID()),
zap.Int64("new segmentID", newSegment.GetID()),
zap.Int("binlog", len(newSegment.GetBinlogs())),
zap.Int("stats log", len(newSegment.GetStatslogs())),
zap.Int("delta logs", len(newSegment.GetDeltalogs())),
@ -1126,9 +1121,6 @@ func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segm
for _, v := range segmentsCompactFrom {
compactFromIDs = append(compactFromIDs, v.GetID())
}
log.Info("meta update: alter in memory meta after compaction",
zap.Int64("compact to segment ID", segmentCompactTo.GetID()),
zap.Int64s("compact from segment IDs", compactFromIDs))
m.Lock()
defer m.Unlock()
for _, s := range segmentsCompactFrom {

View File

@ -436,7 +436,7 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu
for _, id := range segCandidates {
info := s.meta.GetHealthySegment(id)
if info == nil {
log.Warn("failed to get seg info from meta", zap.Int64("segment ID", id))
log.Warn("failed to get seg info from meta", zap.Int64("segmentID", id))
continue
}
if info.CollectionID != collectionID {

View File

@ -493,7 +493,7 @@ func (s *Server) initServiceDiscovery() error {
inSessions, inRevision, err := s.session.GetSessions(typeutil.IndexNodeRole)
if err != nil {
log.Error("DataCoord get QueryCoord session failed", zap.Error(err))
log.Warn("DataCoord get QueryCoord session failed", zap.Error(err))
return err
}
if Params.DataCoordCfg.BindIndexNodeMode.GetAsBool() {
@ -630,7 +630,7 @@ func (s *Server) handleDataNodeTimetickMsgstream(ctx context.Context, ttMsgStrea
}
if err := s.handleTimetickMessage(ctx, ttMsg); err != nil {
log.Error("failed to handle timetick message", zap.Error(err))
log.Warn("failed to handle timetick message", zap.Error(err))
continue
}
}
@ -700,14 +700,14 @@ func (s *Server) updateSegmentStatistics(stats []*commonpb.SegmentStats) {
segment := s.meta.GetSegment(stat.GetSegmentID())
if segment == nil {
log.Warn("skip updating row number for not exist segment",
zap.Int64("segment ID", stat.GetSegmentID()),
zap.Int64("segmentID", stat.GetSegmentID()),
zap.Int64("new value", stat.GetNumRows()))
continue
}
if isFlushState(segment.GetState()) {
log.Warn("skip updating row number for flushed segment",
zap.Int64("segment ID", stat.GetSegmentID()),
zap.Int64("segmentID", stat.GetSegmentID()),
zap.Int64("new value", stat.GetNumRows()))
continue
}
@ -715,7 +715,7 @@ func (s *Server) updateSegmentStatistics(stats []*commonpb.SegmentStats) {
// Log if # of rows is updated.
if segment.currRows < stat.GetNumRows() {
log.Debug("Updating segment number of rows",
zap.Int64("segment ID", stat.GetSegmentID()),
zap.Int64("segmentID", stat.GetSegmentID()),
zap.Int64("old value", s.meta.GetSegment(stat.GetSegmentID()).GetNumOfRows()),
zap.Int64("new value", stat.GetNumRows()),
)

View File

@ -465,7 +465,7 @@ func TestGetInsertBinlogPaths(t *testing.T) {
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("with invalid segment id", func(t *testing.T) {
t.Run("with invalid segmentID", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
@ -600,7 +600,7 @@ func TestGetSegmentInfo(t *testing.T) {
assert.NoError(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("with wrong segment id", func(t *testing.T) {
t.Run("with wrong segmentID", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
@ -3837,7 +3837,7 @@ func TestDataCoord_SaveImportSegment(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())
})
t.Run("test add segment w/ bad channel name", func(t *testing.T) {
t.Run("test add segment w/ bad channelName", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

View File

@ -311,6 +311,7 @@ func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCol
// for now only row count is returned
func (s *Server) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64s("partitionIDs", req.GetPartitionIDs()),
)
resp := &datapb.GetPartitionStatisticsResponse{
@ -515,7 +516,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
channel := req.GetChannelName()
log.Info("receive DropVirtualChannel request",
zap.String("channel name", channel))
zap.String("channelName", channel))
// validate
nodeID := req.GetBase().GetSourceID()
@ -576,7 +577,7 @@ func (s *Server) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStat
err := s.meta.SetState(req.GetSegmentId(), req.GetNewState())
if err != nil {
log.Error("failed to updated segment state in dataCoord meta",
zap.Int64("segment ID", req.SegmentId),
zap.Int64("segmentID", req.SegmentId),
zap.String("to state", req.GetNewState().String()))
return &datapb.SetSegmentStateResponse{
Status: &commonpb.Status{
@ -706,7 +707,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
if newCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo); newCount != segment.NumOfRows {
log.Warn("segment row number meta inconsistent with bin log row count and will be corrected",
zap.Int64("segment ID", segment.GetID()),
zap.Int64("segmentID", segment.GetID()),
zap.Int64("segment meta row count (wrong)", segment.GetNumOfRows()),
zap.Int64("segment bin log row count (correct)", newCount))
segmentsNumOfRows[id] = newCount
@ -818,7 +819,7 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
rowCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo)
if rowCount != segment.NumOfRows && rowCount > 0 {
log.Warn("segment row number meta inconsistent with bin log row count and will be corrected",
zap.Int64("segment ID", segment.GetID()),
zap.Int64("segmentID", segment.GetID()),
zap.Int64("segment meta row count (wrong)", segment.GetNumOfRows()),
zap.Int64("segment bin log row count (correct)", rowCount))
} else {
@ -936,7 +937,6 @@ func (s *Server) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegment
// ShowConfigurations returns the configurations of DataCoord matching req.Pattern
func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
log := log.Ctx(ctx)
log.Debug("DataCoord.ShowConfigurations", zap.String("pattern", req.Pattern))
if s.isClosed() {
log.Warn("DataCoord.ShowConfigurations failed",
zap.Int64("nodeId", paramtable.GetNodeID()),
@ -1046,8 +1046,10 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
// ManualCompaction triggers a compaction for a collection
func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
log := log.Ctx(ctx)
log.Info("received manual compaction", zap.Int64("collectionID", req.GetCollectionID()))
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
log.Info("received manual compaction")
resp := &milvuspb.ManualCompactionResponse{
Status: &commonpb.Status{
@ -1056,8 +1058,7 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
}
if s.isClosed() {
log.Warn("failed to execute manual compaction", zap.Int64("collectionID", req.GetCollectionID()),
zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))
log.Warn("failed to execute manual compaction", zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))
resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
return resp, nil
}
@ -1069,12 +1070,12 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
id, err := s.compactionTrigger.forceTriggerCompaction(req.CollectionID)
if err != nil {
log.Error("failed to trigger manual compaction", zap.Int64("collectionID", req.GetCollectionID()), zap.Error(err))
log.Error("failed to trigger manual compaction", zap.Error(err))
resp.Status.Reason = err.Error()
return resp, nil
}
log.Info("success to trigger manual compaction", zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("compactionID", id))
log.Info("success to trigger manual compaction", zap.Int64("compactionID", id))
resp.Status.ErrorCode = commonpb.ErrorCode_Success
resp.CompactionID = id
return resp, nil
@ -1082,8 +1083,10 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
// GetCompactionState gets the state of a compaction
func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
log := log.Ctx(ctx)
log.Info("received get compaction state request", zap.Int64("compactionID", req.GetCompactionID()))
log := log.Ctx(ctx).With(
zap.Int64("compactionID", req.GetCompactionID()),
)
log.Info("received get compaction state request")
resp := &milvuspb.GetCompactionStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -1091,8 +1094,7 @@ func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompac
}
if s.isClosed() {
log.Warn("failed to get compaction state", zap.Int64("compactionID", req.GetCompactionID()),
zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))
log.Warn("failed to get compaction state", zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))
resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
return resp, nil
}
@ -1124,15 +1126,17 @@ func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompac
// GetCompactionStateWithPlans returns the compaction state of given plan
func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
log := log.Ctx(ctx)
log.Info("received the request to get compaction state with plans", zap.Int64("compactionID", req.GetCompactionID()))
log := log.Ctx(ctx).With(
zap.Int64("compactionID", req.GetCompactionID()),
)
log.Info("received the request to get compaction state with plans")
resp := &milvuspb.GetCompactionPlansResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError},
}
if s.isClosed() {
log.Warn("failed to get compaction state with plans", zap.Int64("compactionID", req.GetCompactionID()), zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))
log.Warn("failed to get compaction state with plans", zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))
resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
return resp, nil
}
@ -1204,8 +1208,11 @@ func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState
// WatchChannels notifies DataCoord to watch vchannels of a collection.
func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
log := log.Ctx(ctx)
log.Info("receive watch channels request", zap.Any("channels", req.GetChannelNames()))
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
zap.Strings("channels", req.GetChannelNames()),
)
log.Info("receive watch channels request")
resp := &datapb.WatchChannelsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -1213,8 +1220,7 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
}
if s.isClosed() {
log.Warn("failed to watch channels request", zap.Any("channels", req.GetChannelNames()),
zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))
log.Warn("failed to watch channels request", zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))
resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
return resp, nil
}
@ -1227,13 +1233,13 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
}
err := s.channelManager.Watch(ch)
if err != nil {
log.Warn("fail to watch channelName", zap.String("channelName", channelName), zap.Error(err))
log.Warn("fail to watch channelName", zap.Error(err))
resp.Status.Reason = err.Error()
return resp, nil
}
if err := s.meta.catalog.MarkChannelAdded(ctx, ch.Name); err != nil {
// TODO: add background task to periodically cleanup the orphaned channel add marks.
log.Error("failed to mark channel added", zap.String("channelName", channelName), zap.Error(err))
log.Error("failed to mark channel added", zap.Error(err))
resp.Status.Reason = err.Error()
return resp, nil
}
@ -1388,6 +1394,7 @@ func (s *Server) UpdateSegmentStatistics(ctx context.Context, req *datapb.Update
// UpdateChannelCheckpoint updates channel checkpoint in dataCoord.
func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.UpdateChannelCheckpointRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx)
resp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
@ -1411,6 +1418,7 @@ func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update
// ReportDataNodeTtMsgs send datenode timetick messages to dataCoord.
func (s *Server) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDataNodeTtMsgsRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx)
if s.isClosed() {
log.Warn("failed to report dataNode ttMsgs on closed server")
return merr.Status(merr.WrapErrServiceUnavailable(msgDataCoordIsUnhealthy(s.session.ServerID))), nil
@ -1435,13 +1443,14 @@ func (s *Server) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDat
}
func (s *Server) handleRPCTimetickMessage(ctx context.Context, ttMsg *msgpb.DataNodeTtMsg) error {
log := log.Ctx(ctx)
ch := ttMsg.GetChannelName()
ts := ttMsg.GetTimestamp()
// ignore to handle RPC Timetick message since it's no longer the leader
if !s.cluster.channelManager.Match(ttMsg.GetBase().GetSourceID(), ch) {
log.Warn("node is not matched with channel",
zap.String("channel", ch),
zap.String("channelName", ch),
zap.Int64("nodeID", ttMsg.GetBase().GetSourceID()),
)
return nil
@ -1499,12 +1508,13 @@ func getDiff(base, remove []int64) []int64 {
// SaveImportSegment saves the segment binlog paths and puts this segment to its belonging DataNode as a flushed segment.
func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSegmentRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx)
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionId()),
)
log.Info("DataCoord putting segment to the right DataNode and saving binlog path",
zap.Int64("segment ID", req.GetSegmentId()),
zap.Int64("collection ID", req.GetCollectionId()),
zap.Int64("partition ID", req.GetPartitionId()),
zap.String("channel name", req.GetChannelName()),
zap.Int64("segmentID", req.GetSegmentId()),
zap.Int64("partitionID", req.GetPartitionId()),
zap.String("channelName", req.GetChannelName()),
zap.Int64("# of rows", req.GetRowNum()))
errResp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -1519,7 +1529,7 @@ func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSe
// Look for the DataNode that watches the channel.
ok, nodeID := s.channelManager.getNodeIDByChannelName(req.GetChannelName())
if !ok {
log.Error("no DataNode found for channel", zap.String("channel name", req.GetChannelName()))
log.Error("no DataNode found for channel", zap.String("channelName", req.GetChannelName()))
errResp.Reason = fmt.Sprint("no DataNode found for channel ", req.GetChannelName())
return errResp, nil
}
@ -1582,7 +1592,7 @@ func (s *Server) UnsetIsImportingState(ctx context.Context, req *datapb.UnsetIsI
for _, segID := range req.GetSegmentIds() {
if err := s.meta.UnsetIsImporting(segID); err != nil {
// Fail-open.
log.Error("failed to unset segment is importing state", zap.Int64("segment ID", segID))
log.Error("failed to unset segment is importing state", zap.Int64("segmentID", segID))
reportErr = err
}
}
@ -1606,7 +1616,7 @@ func (s *Server) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmen
for _, segID := range req.GetSegmentIds() {
if err := s.meta.SetState(segID, commonpb.SegmentState_Dropped); err != nil {
// Fail-open.
log.Error("failed to set segment state as dropped", zap.Int64("segment ID", segID))
log.Error("failed to set segment state as dropped", zap.Int64("segmentID", segID))
failure = true
}
}

View File

@ -232,7 +232,7 @@ func (c *ChannelMeta) addSegment(req addSegmentReq) error {
err := c.InitPKstats(context.TODO(), seg, req.statsBinLogs, req.recoverTs)
if err != nil {
log.Error("failed to init bloom filter",
zap.Int64("segment ID", req.segID),
zap.Int64("segmentID", req.segID),
zap.Error(err))
return err
}
@ -483,7 +483,7 @@ func (c *ChannelMeta) RollPKstats(segID UniqueID, stat *storage.PrimaryKeyStats)
c.segMu.Lock()
defer c.segMu.Unlock()
seg, ok := c.segments[segID]
log.Info("roll pk stats", zap.Int64("segment id", segID))
log.Info("roll pk stats", zap.Int64("segmentID", segID))
if ok && seg.notFlushed() {
pkStat := &storage.PkStatistics{
PkFilter: stat.BF,
@ -591,14 +591,14 @@ func (c *ChannelMeta) updateSegmentRowNumber(segID UniqueID, numRows int64) {
c.segMu.Lock()
defer c.segMu.Unlock()
log.Info("updating segment num row", zap.Int64("Segment ID", segID), zap.Int64("numRows", numRows))
log.Info("updating segment num row", zap.Int64("segmentID", segID), zap.Int64("numRows", numRows))
seg, ok := c.segments[segID]
if ok && seg.notFlushed() {
seg.numRows += numRows
return
}
log.Warn("update segment num row not exist", zap.Int64("segID", segID))
log.Warn("update segment num row not exist", zap.Int64("segmentID", segID))
}
// updateStatistics updates the number of rows of a segment in channel.
@ -606,14 +606,14 @@ func (c *ChannelMeta) updateSegmentMemorySize(segID UniqueID, memorySize int64)
c.segMu.Lock()
defer c.segMu.Unlock()
log.Info("updating segment memorySize", zap.Int64("Segment ID", segID), zap.Int64("memorySize", memorySize))
log.Info("updating segment memorySize", zap.Int64("segmentID", segID), zap.Int64("memorySize", memorySize))
seg, ok := c.segments[segID]
if ok && seg.notFlushed() {
seg.memorySize = memorySize
return
}
log.Warn("update segment memorySize not exist", zap.Int64("segID", segID))
log.Warn("update segment memorySize not exist", zap.Int64("segmentID", segID))
}
// getSegmentStatisticsUpdates gives current segment's statistics updates.
@ -665,12 +665,12 @@ func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schem
func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error {
log := log.Ctx(ctx).With(
zap.Int64("segment ID", seg.segmentID),
zap.Int64("collection ID", seg.collectionID),
zap.Int64("partition ID", seg.partitionID),
zap.Int64("segmentID", seg.segmentID),
zap.Int64("collectionID", seg.collectionID),
zap.Int64("partitionID", seg.partitionID),
zap.Int64s("compacted from", compactedFrom),
zap.Int64("planID", planID),
zap.String("channel name", c.channelName))
zap.String("channelName", c.channelName))
if seg.collectionID != c.collectionID {
log.Warn("failed to mergeFlushedSegments, collection mismatch",
@ -730,10 +730,10 @@ func (c *ChannelMeta) addFlushedSegmentWithPKs(segID, collID, partID UniqueID, n
}
log.Info("Add Flushed segment",
zap.Int64("segment ID", segID),
zap.Int64("collection ID", collID),
zap.Int64("partition ID", partID),
zap.String("channel name", c.channelName),
zap.Int64("segmentID", segID),
zap.Int64("collectionID", collID),
zap.Int64("partitionID", partID),
zap.String("channelName", c.channelName),
)
seg := &Segment{

View File

@ -214,7 +214,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick
log.Warn("Collection ID or ChannelName not match",
zap.Int64("Wanted ID", dsService.collectionID),
zap.Int64("Actual ID", us.CollectionID),
zap.String("Wanted Channel Name", vchanInfo.ChannelName),
zap.String("Wanted channel Name", vchanInfo.ChannelName),
zap.String("Actual Channel Name", us.GetInsertChannel()),
)
continue

View File

@ -125,7 +125,7 @@ func (t *tickler) inc() {
func (t *tickler) watch() {
if t.interval == 0 {
log.Info("zero interval, close ticler watch",
zap.String("channel name", t.watchInfo.GetVchan().GetChannelName()),
zap.String("channelName", t.watchInfo.GetVchan().GetChannelName()),
)
return
}

View File

@ -117,7 +117,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
if load := ddn.dropMode.Load(); load != nil && load.(bool) {
log.Info("ddNode in dropMode",
zap.String("vChannelName", ddn.vChannelName),
zap.Int64("collection ID", ddn.collectionID))
zap.Int64("collectionID", ddn.collectionID))
return []Msg{}
}
@ -182,7 +182,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
if ddn.tryToFilterSegmentInsertMessages(imsg) {
log.Info("filter insert messages",
zap.Int64("filter segment ID", imsg.GetSegmentID()),
zap.Int64("filter segmentID", imsg.GetSegmentID()),
zap.Uint64("message timestamp", msg.EndTs()),
zap.String("segment's vChannel", imsg.GetShardName()),
zap.String("current vChannel", ddn.vChannelName))

View File

@ -344,7 +344,7 @@ func TestFlowGraph_DDNode_filterMessages(t *testing.T) {
getInsertMsg(111, 70000),
false},
// for pChannel reuse on same collection
{"test insert msg with different channel name",
{"test insert msg with different channelName",
[]UniqueID{100},
nil,
nil,

View File

@ -40,7 +40,7 @@ import (
// flowgraph ddNode.
func newDmInputNode(dispatcherClient msgdispatcher.Client, seekPos *msgpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) {
log := log.With(zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Int64("collection ID", dmNodeConfig.collectionID),
zap.Int64("collectionID", dmNodeConfig.collectionID),
zap.String("vchannel", dmNodeConfig.vChannelName))
var err error
var input <-chan *msgstream.MsgPack

View File

@ -549,8 +549,8 @@ func (ibNode *insertBufferNode) addSegmentAndUpdateRowNum(insertMsgs []*msgstrea
})
if err != nil {
log.Warn("add segment wrong",
zap.Int64("segID", currentSegID),
zap.Int64("collID", collID),
zap.Int64("segmentID", currentSegID),
zap.Int64("collectionID", collID),
zap.Int64("partID", partitionID),
zap.String("chanName", msg.GetShardName()),
zap.Error(err))

View File

@ -142,7 +142,7 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *msgpb.MsgPosition) *flushTaskR
q.tailCh = t.finishSignal
q.tailMut.Unlock()
log.Info("new flush task runner created and initialized",
zap.Int64("segment ID", q.segmentID),
zap.Int64("segmentID", q.segmentID),
zap.String("pos message ID", string(pos.GetMsgID())),
)
}
@ -290,7 +290,7 @@ func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQu
func (m *rendezvousFlushManager) handleInsertTask(segmentID UniqueID, task flushInsertTask, binlogs, statslogs map[UniqueID]*datapb.Binlog, flushed bool, dropped bool, pos *msgpb.MsgPosition) {
log.Info("handling insert task",
zap.Int64("segment ID", segmentID),
zap.Int64("segmentID", segmentID),
zap.Bool("flushed", flushed),
zap.Bool("dropped", dropped),
zap.Any("position", pos),
@ -316,7 +316,7 @@ func (m *rendezvousFlushManager) handleInsertTask(segmentID UniqueID, task flush
}
func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flushDeleteTask, deltaLogs *DelDataBuf, pos *msgpb.MsgPosition) {
log.Info("handling delete task", zap.Int64("segment ID", segmentID))
log.Info("handling delete task", zap.Int64("segmentID", segmentID))
// in dropping mode
if m.dropping.Load() {
// preventing separate delete, check position exists in queue first
@ -922,9 +922,9 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
// Stop retry and still proceed to the end, ignoring this error.
if !pack.flushed && rsp.GetErrorCode() == commonpb.ErrorCode_SegmentNotFound {
log.Warn("stale segment not found, could be compacted",
zap.Int64("segment ID", pack.segmentID))
zap.Int64("segmentID", pack.segmentID))
log.Warn("failed to SaveBinlogPaths",
zap.Int64("segment ID", pack.segmentID),
zap.Int64("segmentID", pack.segmentID),
zap.Error(errors.New(rsp.GetReason())))
return nil
}
@ -945,7 +945,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
}, opts...)
if err != nil {
log.Warn("failed to SaveBinlogPaths",
zap.Int64("segment ID", pack.segmentID),
zap.Int64("segmentID", pack.segmentID),
zap.Error(err))
// TODO change to graceful stop
panic(err)

View File

@ -133,7 +133,7 @@ func (t *flushTaskRunner) runFlushInsert(task flushInsertTask,
t.pos = pos
t.dropped = dropped
log.Info("running flush insert task",
zap.Int64("segment ID", t.segmentID),
zap.Int64("segmentID", t.segmentID),
zap.Bool("flushed", flushed),
zap.Bool("dropped", dropped),
zap.Any("position", pos),

View File

@ -273,12 +273,12 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannel())
if !ok {
log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channel name", req.GetChannel()))
log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channelName", req.GetChannel()))
return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "illegel compaction plan")), nil
}
if !node.compactionExecutor.channelValidateForCompaction(req.GetChannel()) {
log.Warn("channel of compaction is marked invalid in compaction executor", zap.String("channel name", req.GetChannel()))
log.Warn("channel of compaction is marked invalid in compaction executor", zap.String("channelName", req.GetChannel()))
return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "channel is dropping")), nil
}
@ -410,8 +410,8 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*commonpb.Status, error) {
logFields := []zap.Field{
zap.Int64("task ID", req.GetImportTask().GetTaskId()),
zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
zap.Int64("partition ID", req.GetImportTask().GetPartitionId()),
zap.Int64("collectionID", req.GetImportTask().GetCollectionId()),
zap.Int64("partitionID", req.GetImportTask().GetPartitionId()),
zap.String("database name", req.GetImportTask().GetDatabaseName()),
zap.Strings("channel names", req.GetImportTask().GetChannelNames()),
zap.Int64s("working dataNodes", req.WorkingNodes),
@ -542,7 +542,7 @@ func (node *DataNode) getPartitions(ctx context.Context, dbName string, collecti
logFields := []zap.Field{
zap.String("dbName", dbName),
zap.String("collection name", collectionName),
zap.String("collectionName", collectionName),
}
resp, err := node.rootCoord.ShowPartitions(ctx, req)
if err != nil {
@ -575,10 +575,10 @@ func (node *DataNode) getPartitions(ctx context.Context, dbName string, collecti
// AddImportSegment adds the import segment to the current DataNode.
func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) {
logFields := []zap.Field{
zap.Int64("segment ID", req.GetSegmentId()),
zap.Int64("collection ID", req.GetCollectionId()),
zap.Int64("partition ID", req.GetPartitionId()),
zap.String("channel name", req.GetChannelName()),
zap.Int64("segmentID", req.GetSegmentId()),
zap.Int64("collectionID", req.GetCollectionId()),
zap.Int64("partitionID", req.GetPartitionId()),
zap.String("channelName", req.GetChannelName()),
zap.Int64("# of rows", req.GetRowNum()),
}
log.Info("adding segment to DataNode flow graph", logFields...)
@ -674,7 +674,7 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil
logFields := []zap.Field{
zap.Int64("task ID", importTaskID),
zap.Int("shard ID", shardID),
zap.Int64("partition ID", partID),
zap.Int64("partitionID", partID),
zap.Int("# of channels", len(chNames)),
zap.Strings("channel names", chNames),
}
@ -703,7 +703,7 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil
}
segmentID := resp.SegIDAssignments[0].SegID
logFields = append(logFields, zap.Int64("segment ID", segmentID))
logFields = append(logFields, zap.Int64("segmentID", segmentID))
log.Info("new segment assigned", logFields...)
// call report to notify the rootcoord update the segment id list for this task
@ -739,8 +739,8 @@ func createBinLogsFunc(node *DataNode, req *datapb.ImportTaskRequest, schema *sc
importTaskID := req.GetImportTask().GetTaskId()
logFields := []zap.Field{
zap.Int64("task ID", importTaskID),
zap.Int64("partition ID", partID),
zap.Int64("segment ID", segmentID),
zap.Int64("partitionID", partID),
zap.Int64("segmentID", segmentID),
zap.Int("# of channels", len(chNames)),
zap.Strings("channel names", chNames),
}
@ -771,8 +771,8 @@ func saveSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest, res *rootcoo
targetChName string, rowCount int64, partID int64) error {
logFields := []zap.Field{
zap.Int64("task ID", importTaskID),
zap.Int64("partition ID", partID),
zap.Int64("segment ID", segmentID),
zap.Int64("partitionID", partID),
zap.Int64("segmentID", segmentID),
zap.String("target channel name", targetChName),
zap.Int64("row count", rowCount),
zap.Uint64("ts", ts),
@ -999,6 +999,6 @@ func reportImportFunc(node *DataNode) importutil.ReportFunc {
func logDupFlush(cID, segID int64) {
log.Info("segment is already being flushed, ignoring flush request",
zap.Int64("collection ID", cID),
zap.Int64("segment ID", segID))
zap.Int64("collectionID", cID),
zap.Int64("segmentID", segID))
}

View File

@ -144,7 +144,7 @@ func (it *indexBuildTask) OnEnqueue(ctx context.Context) error {
it.tr.RecordSpan()
it.statistic.StartTime = time.Now().UnixMicro()
it.statistic.PodID = it.node.GetNodeID()
log.Ctx(ctx).Info("IndexNode IndexBuilderTask Enqueue", zap.Int64("buildID", it.BuildID), zap.Int64("segID", it.segmentID))
log.Ctx(ctx).Info("IndexNode IndexBuilderTask Enqueue", zap.Int64("buildID", it.BuildID), zap.Int64("segmentID", it.segmentID))
return nil
}

View File

@ -23,11 +23,11 @@ func (s *collectionDb) GetCollectionIDTs(tenantID string, collectionID typeutil.
err := s.db.Model(&dbmodel.Collection{}).Select("collection_id, ts").Where("tenant_id = ? AND collection_id = ? AND ts <= ?", tenantID, collectionID, ts).Order("ts desc").Take(&col).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
log.Warn("record not found", zap.Int64("collID", collectionID), zap.Uint64("ts", ts), zap.Error(err))
log.Warn("record not found", zap.Int64("collectionID", collectionID), zap.Uint64("ts", ts), zap.Error(err))
return nil, fmt.Errorf("record not found, collID=%d, ts=%d", collectionID, ts)
}
if err != nil {
log.Error("get collection ts failed", zap.String("tenant", tenantID), zap.Int64("collID", collectionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("get collection ts failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collectionID), zap.Uint64("ts", ts), zap.Error(err))
return nil, err
}
@ -55,7 +55,7 @@ func (s *collectionDb) Get(tenantID string, collectionID typeutil.UniqueID, ts t
return nil, fmt.Errorf("collection not found, collID=%d, ts=%d", collectionID, ts)
}
if err != nil {
log.Error("get collection by collection_id and ts failed", zap.String("tenant", tenantID), zap.Int64("collID", collectionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("get collection by collection_id and ts failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collectionID), zap.Uint64("ts", ts), zap.Error(err))
return nil, err
}
@ -86,7 +86,7 @@ func (s *collectionDb) Insert(in *dbmodel.Collection) error {
}).Create(&in).Error
if err != nil {
log.Error("insert collection failed", zap.String("tenant", in.TenantID), zap.Int64("collID", in.CollectionID), zap.Uint64("ts", in.Ts), zap.Error(err))
log.Error("insert collection failed", zap.String("tenant", in.TenantID), zap.Int64("collectionID", in.CollectionID), zap.Uint64("ts", in.Ts), zap.Error(err))
return err
}

View File

@ -18,7 +18,7 @@ func (s *collChannelDb) GetByCollectionID(tenantID string, collectionID typeutil
err := s.db.Model(&dbmodel.CollectionChannel{}).Where("tenant_id = ? AND collection_id = ? AND ts = ? AND is_deleted = false", tenantID, collectionID, ts).Find(&r).Error
if err != nil {
log.Error("get channels by collection_id and ts failed", zap.String("tenant", tenantID), zap.Int64("collID", collectionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("get channels by collection_id and ts failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collectionID), zap.Uint64("ts", ts), zap.Error(err))
return nil, err
}

View File

@ -18,7 +18,7 @@ func (s *fieldDb) GetByCollectionID(tenantID string, collectionID typeutil.Uniqu
err := s.db.Model(&dbmodel.Field{}).Where("tenant_id = ? AND collection_id = ? AND ts = ? AND is_deleted = false", tenantID, collectionID, ts).Find(&r).Error
if err != nil {
log.Error("get fields by collection_id and ts failed", zap.String("tenant", tenantID), zap.Int64("collID", collectionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("get fields by collection_id and ts failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collectionID), zap.Uint64("ts", ts), zap.Error(err))
return nil, err
}

View File

@ -18,7 +18,7 @@ func (s *indexDb) Get(tenantID string, collectionID typeutil.UniqueID) ([]*dbmod
err := s.db.Model(&dbmodel.Index{}).Where("tenant_id = ? AND collection_id = ?", tenantID, collectionID).Find(&r).Error
if err != nil {
log.Error("get indexes by collection_id failed", zap.String("tenant", tenantID), zap.Int64("collID", collectionID), zap.Error(err))
log.Error("get indexes by collection_id failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, err
}
@ -59,7 +59,7 @@ func (s *indexDb) Update(in *dbmodel.Index) error {
}).Error
if err != nil {
log.Error("update indexes failed", zap.String("tenant", in.TenantID), zap.Int64("collID", in.CollectionID), zap.Int64("indexID", in.IndexID), zap.Error(err))
log.Error("update indexes failed", zap.String("tenant", in.TenantID), zap.Int64("collectionID", in.CollectionID), zap.Int64("indexID", in.IndexID), zap.Error(err))
return err
}
@ -72,7 +72,7 @@ func (s *indexDb) MarkDeletedByCollectionID(tenantID string, collID typeutil.Uni
}).Error
if err != nil {
log.Error("update indexes is_deleted=true failed", zap.String("tenant", tenantID), zap.Int64("collID", collID), zap.Error(err))
log.Error("update indexes is_deleted=true failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collID), zap.Error(err))
return err
}

View File

@ -18,7 +18,7 @@ func (s *partitionDb) GetByCollectionID(tenantID string, collectionID typeutil.U
err := s.db.Model(&dbmodel.Partition{}).Where("tenant_id = ? AND collection_id = ? AND ts = ? AND is_deleted = false", tenantID, collectionID, ts).Find(&r).Error
if err != nil {
log.Error("get partitions by collection_id and ts failed", zap.String("tenant", tenantID), zap.Int64("collID", collectionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("get partitions by collection_id and ts failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collectionID), zap.Uint64("ts", ts), zap.Error(err))
return nil, err
}

View File

@ -19,7 +19,7 @@ func (s *segmentIndexDb) Get(tenantID string, collectionID, buildID typeutil.Uni
err := s.db.Model(&dbmodel.SegmentIndex{}).Where("tenant_id = ? AND collection_id = ? AND build_id = ?", tenantID, collectionID, buildID).Find(&r).Error
if err != nil {
log.Error("get indexes by collection_id failed", zap.String("tenant", tenantID), zap.Int64("collID", collectionID), zap.Error(err))
log.Error("get indexes by collection_id failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, err
}
@ -104,7 +104,7 @@ func (s *segmentIndexDb) MarkDeletedByCollectionID(tenantID string, collID typeu
}).Error
if err != nil {
log.Error("update segment_indexes deleted by collection id failed", zap.String("tenant", tenantID), zap.Int64("collID", collID), zap.Error(err))
log.Error("update segment_indexes deleted by collection id failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collID), zap.Error(err))
return err
}

View File

@ -53,14 +53,14 @@ func UnmarshalCollectionModel(coll *Collection) (*model.Collection, error) {
if coll.StartPosition != "" {
err := json.Unmarshal([]byte(coll.StartPosition), &startPositions)
if err != nil {
log.Error("unmarshal collection start positions error", zap.Int64("collID", coll.CollectionID), zap.Uint64("ts", coll.Ts), zap.Error(err))
log.Error("unmarshal collection start positions error", zap.Int64("collectionID", coll.CollectionID), zap.Uint64("ts", coll.Ts), zap.Error(err))
return nil, err
}
}
properties, err := UnmarshalProperties(coll.Properties)
if err != nil {
log.Error("unmarshal collection properties error", zap.Int64("collID", coll.CollectionID),
log.Error("unmarshal collection properties error", zap.Int64("collectionID", coll.CollectionID),
zap.String("properties", coll.Properties), zap.Error(err))
return nil, err
}

View File

@ -62,7 +62,7 @@ func ConvertFieldDBToModel(field *Field) (*model.Field, error) {
if field.TypeParams != "" {
err := json.Unmarshal([]byte(field.TypeParams), &typeParams)
if err != nil {
log.Error("unmarshal TypeParams of field failed", zap.Int64("collID", field.CollectionID),
log.Error("unmarshal TypeParams of field failed", zap.Int64("collectionID", field.CollectionID),
zap.Int64("fieldID", field.FieldID), zap.String("fieldName", field.FieldName), zap.Error(err))
return nil, err
}
@ -72,7 +72,7 @@ func ConvertFieldDBToModel(field *Field) (*model.Field, error) {
if field.IndexParams != "" {
err := json.Unmarshal([]byte(field.IndexParams), &indexParams)
if err != nil {
log.Error("unmarshal IndexParams of field failed", zap.Int64("collID", field.CollectionID),
log.Error("unmarshal IndexParams of field failed", zap.Int64("collectionID", field.CollectionID),
zap.Int64("fieldID", field.FieldID), zap.String("fieldName", field.FieldName), zap.Error(err))
return nil, err
}

View File

@ -69,7 +69,7 @@ func UnmarshalIndexModel(inputs []*IndexResult) ([]*model.Index, error) {
if ir.IndexParams != "" {
err := json.Unmarshal([]byte(ir.IndexParams), &indexParams)
if err != nil {
log.Error("unmarshal IndexParams of index failed", zap.Int64("collID", ir.CollectionID),
log.Error("unmarshal IndexParams of index failed", zap.Int64("collectionID", ir.CollectionID),
zap.Int64("indexID", ir.IndexID), zap.String("indexName", ir.IndexName), zap.Error(err))
return nil, err
}
@ -78,7 +78,7 @@ func UnmarshalIndexModel(inputs []*IndexResult) ([]*model.Index, error) {
if ir.UserIndexParams != "" {
err := json.Unmarshal([]byte(ir.UserIndexParams), &userIndexParams)
if err != nil {
log.Error("unmarshal UserIndexParams of index failed", zap.Int64("collID", ir.CollectionID),
log.Error("unmarshal UserIndexParams of index failed", zap.Int64("collectionID", ir.CollectionID),
zap.Int64("indexID", ir.IndexID), zap.String("indexName", ir.IndexName), zap.Error(err))
return nil, err
}
@ -88,7 +88,7 @@ func UnmarshalIndexModel(inputs []*IndexResult) ([]*model.Index, error) {
if ir.TypeParams != "" {
err := json.Unmarshal([]byte(ir.TypeParams), &typeParams)
if err != nil {
log.Error("unmarshal TypeParams of index failed", zap.Int64("collID", ir.CollectionID),
log.Error("unmarshal TypeParams of index failed", zap.Int64("collectionID", ir.CollectionID),
zap.Int64("indexID", ir.IndexID), zap.String("indexName", ir.IndexName), zap.Error(err))
return nil, err
}

View File

@ -69,7 +69,7 @@ type ISegmentIndexDb interface {
// if ir.IndexFileKeys != "" {
// err := json.Unmarshal([]byte(ir.IndexFileKeys), &IndexFileKeys)
// if err != nil {
// log.Error("unmarshal index file paths of segment index failed", zap.Int64("collID", ir.CollectionID),
// log.Error("unmarshal index file paths of segment index failed", zap.Int64("collectionID", ir.CollectionID),
// zap.Int64("indexID", ir.IndexID), zap.Int64("segmentID", ir.SegmentID),
// zap.Int64("buildID", ir.BuildID), zap.Error(err))
// return nil, err

View File

@ -1,346 +0,0 @@
package indexcoord
//
//import (
// "context"
// "encoding/json"
//
// "go.uber.org/zap"
//
// "github.com/milvus-io/milvus/pkg/log"
// "github.com/milvus-io/milvus/internal/metastore/db/dbmodel"
// "github.com/milvus-io/milvus/internal/metastore/model"
// "github.com/milvus-io/milvus/pkg/util/contextutil"
// "github.com/milvus-io/milvus/pkg/util/typeutil"
//)
//
//type Catalog struct {
// metaDomain dbmodel.IMetaDomain
// txImpl dbmodel.ITransaction
//}
//
//func NewTableCatalog(txImpl dbmodel.ITransaction, metaDomain dbmodel.IMetaDomain) *Catalog {
// return &Catalog{
// txImpl: txImpl,
// metaDomain: metaDomain,
// }
//}
//
//func (tc *Catalog) CreateIndex(ctx context.Context, index *model.Index) error {
// tenantID := contextutil.TenantID(ctx)
//
// indexParamsBytes, err := json.Marshal(index.IndexParams)
// if err != nil {
// log.Error("marshal IndexParams of index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID),
// zap.String("indexName", index.IndexName), zap.Error(err))
// return err
// }
//
// userIndexParamsBytes, err := json.Marshal(index.UserIndexParams)
// if err != nil {
// log.Error("marshal userIndexParams of index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID),
// zap.String("indexName", index.IndexName), zap.Error(err))
// return err
// }
//
// typeParamsBytes, err := json.Marshal(index.TypeParams)
// if err != nil {
// log.Error("marshal TypeParams of index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID),
// zap.String("indexName", index.IndexName), zap.Error(err))
// return err
// }
//
// idx := &dbmodel.Index{
// TenantID: tenantID,
// CollectionID: index.CollectionID,
// FieldID: index.FieldID,
// IndexID: index.IndexID,
// IndexName: index.IndexName,
// TypeParams: string(typeParamsBytes),
// IndexParams: string(indexParamsBytes),
// CreateTime: index.CreateTime,
// IsDeleted: index.IsDeleted,
// IsAutoIndex: index.IsAutoIndex,
// UserIndexParams: string(userIndexParamsBytes),
// }
//
// err = tc.metaDomain.IndexDb(ctx).Insert([]*dbmodel.Index{idx})
// if err != nil {
// log.Error("insert indexes failed", zap.String("tenant", tenantID), zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName), zap.Error(err))
// return err
// }
//
// return nil
//}
//
//func (tc *Catalog) ListIndexes(ctx context.Context) ([]*model.Index, error) {
// tenantID := contextutil.TenantID(ctx)
//
// rs, err := tc.metaDomain.IndexDb(ctx).List(tenantID)
// if err != nil {
// return nil, err
// }
//
// result, err := dbmodel.UnmarshalIndexModel(rs)
// if err != nil {
// return nil, err
// }
//
// return result, nil
//}
//
//func (tc *Catalog) AlterIndex(ctx context.Context, index *model.Index) error {
// tenantID := contextutil.TenantID(ctx)
//
// indexParamsBytes, err := json.Marshal(index.IndexParams)
// if err != nil {
// log.Error("marshal IndexParams of index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID),
// zap.String("indexName", index.IndexName), zap.Error(err))
// return err
// }
//
// userIndexParamsBytes, err := json.Marshal(index.UserIndexParams)
// if err != nil {
// log.Error("marshal userIndexParams of index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID),
// zap.String("indexName", index.IndexName), zap.Error(err))
// return err
// }
//
// typeParamsBytes, err := json.Marshal(index.TypeParams)
// if err != nil {
// log.Error("marshal TypeParams of index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID),
// zap.String("indexName", index.IndexName), zap.Error(err))
// return err
// }
//
// idx := &dbmodel.Index{
// TenantID: tenantID,
// CollectionID: index.CollectionID,
// FieldID: index.FieldID,
// IndexID: index.IndexID,
// IndexName: index.IndexName,
// TypeParams: string(typeParamsBytes),
// IndexParams: string(indexParamsBytes),
// CreateTime: index.CreateTime,
// IsDeleted: index.IsDeleted,
// IsAutoIndex: index.IsAutoIndex,
// UserIndexParams: string(userIndexParamsBytes),
// }
// err = tc.metaDomain.IndexDb(ctx).Update(idx)
// if err != nil {
// return err
// }
// return nil
//}
//
//func (tc *Catalog) AlterIndexes(ctx context.Context, indexes []*model.Index) error {
// tenantID := contextutil.TenantID(ctx)
//
// return tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
// for _, index := range indexes {
// indexParamsBytes, err := json.Marshal(index.IndexParams)
// if err != nil {
// log.Error("marshal IndexParams of index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID),
// zap.String("indexName", index.IndexName), zap.Error(err))
// return err
// }
//
// userIndexParamsBytes, err := json.Marshal(index.UserIndexParams)
// if err != nil {
// log.Error("marshal userIndexParams of index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID),
// zap.String("indexName", index.IndexName), zap.Error(err))
// return err
// }
// typeParamsBytes, err := json.Marshal(index.TypeParams)
// if err != nil {
// log.Error("marshal TypeParams of index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID),
// zap.String("indexName", index.IndexName), zap.Error(err))
// return err
// }
//
// idx := &dbmodel.Index{
// TenantID: tenantID,
// CollectionID: index.CollectionID,
// FieldID: index.FieldID,
// IndexID: index.IndexID,
// IndexName: index.IndexName,
// TypeParams: string(typeParamsBytes),
// IndexParams: string(indexParamsBytes),
// CreateTime: index.CreateTime,
// IsDeleted: index.IsDeleted,
// IsAutoIndex: index.IsAutoIndex,
// UserIndexParams: string(userIndexParamsBytes),
// }
// err = tc.metaDomain.IndexDb(ctx).Update(idx)
// if err != nil {
// return err
// }
// }
// return nil
// })
//}
//
//func (tc *Catalog) DropIndex(ctx context.Context, collID, dropIdxID typeutil.UniqueID) error {
// tenantID := contextutil.TenantID(ctx)
//
// // TODO: really delete.
// return tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
// // mark deleted for index
// err := tc.metaDomain.IndexDb(txCtx).MarkDeletedByIndexID(tenantID, dropIdxID)
// if err != nil {
// return err
// }
//
// return nil
// })
//}
//
//func (tc *Catalog) CreateSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error {
// tenantID := contextutil.TenantID(ctx)
//
// indexFileKeysBytes, err := json.Marshal(segIdx.IndexFileKeys)
// if err != nil {
// log.Error("marshal IndexFiles of segment index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", segIdx.CollectionID), zap.Int64("indexID", segIdx.IndexID),
// zap.Int64("segID", segIdx.SegmentID), zap.Int64("buildID", segIdx.BuildID), zap.Error(err))
// return err
// }
//
// idx := &dbmodel.SegmentIndex{
// TenantID: tenantID,
// CollectionID: segIdx.CollectionID,
// PartitionID: segIdx.PartitionID,
// SegmentID: segIdx.SegmentID,
// NumRows: segIdx.NumRows,
// IndexID: segIdx.IndexID,
// BuildID: segIdx.BuildID,
// NodeID: segIdx.NodeID,
// IndexVersion: segIdx.IndexVersion,
// IndexState: int32(segIdx.IndexState),
// FailReason: segIdx.FailReason,
// CreateTime: segIdx.CreateTime,
// IndexFileKeys: string(indexFileKeysBytes),
// IndexSize: segIdx.IndexSize,
// IsDeleted: segIdx.IsDeleted,
// }
//
// err = tc.metaDomain.SegmentIndexDb(ctx).Insert([]*dbmodel.SegmentIndex{idx})
// if err != nil {
// log.Error("insert segment index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", segIdx.CollectionID), zap.Int64("indexID", segIdx.IndexID),
// zap.Int64("segID", segIdx.SegmentID), zap.Int64("buildID", segIdx.BuildID), zap.Error(err))
// return err
// }
//
// return nil
//}
//
//func (tc *Catalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentIndex, error) {
// tenantID := contextutil.TenantID(ctx)
//
// rs, err := tc.metaDomain.SegmentIndexDb(ctx).List(tenantID)
// if err != nil {
// return nil, err
// }
//
// result, err := dbmodel.UnmarshalSegmentIndexModel(rs)
// if err != nil {
// return nil, err
// }
//
// return result, nil
//}
//
//func (tc *Catalog) AlterSegmentIndex(ctx context.Context, segIndex *model.SegmentIndex) error {
// tenantID := contextutil.TenantID(ctx)
//
// indexFileKeysBytes, err := json.Marshal(segIndex.IndexFileKeys)
// if err != nil {
// log.Error("marshal index files of segment index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", segIndex.CollectionID), zap.Int64("indexID", segIndex.IndexID),
// zap.Int64("segID", segIndex.SegmentID), zap.Int64("buildID", segIndex.BuildID), zap.Error(err))
// return err
// }
//
// idx := &dbmodel.SegmentIndex{
// TenantID: tenantID,
// CollectionID: segIndex.CollectionID,
// PartitionID: segIndex.PartitionID,
// SegmentID: segIndex.SegmentID,
// NumRows: segIndex.NumRows,
// IndexID: segIndex.IndexID,
// BuildID: segIndex.BuildID,
// NodeID: segIndex.NodeID,
// IndexVersion: segIndex.IndexVersion,
// IndexState: int32(segIndex.IndexState),
// FailReason: segIndex.FailReason,
// CreateTime: segIndex.CreateTime,
// IndexFileKeys: string(indexFileKeysBytes),
// IndexSize: segIndex.IndexSize,
// IsDeleted: segIndex.IsDeleted,
// }
// err = tc.metaDomain.SegmentIndexDb(ctx).Update(idx)
// if err != nil {
// return err
// }
// return nil
//}
//
//func (tc *Catalog) AlterSegmentIndexes(ctx context.Context, segIdxes []*model.SegmentIndex) error {
// tenantID := contextutil.TenantID(ctx)
//
// return tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
// for _, segIndex := range segIdxes {
// indexFileKeysBytes, err := json.Marshal(segIndex.IndexFileKeys)
// if err != nil {
// log.Error("marshal index files of segment index failed", zap.String("tenant", tenantID),
// zap.Int64("collID", segIndex.CollectionID), zap.Int64("indexID", segIndex.IndexID),
// zap.Int64("segID", segIndex.SegmentID), zap.Int64("buildID", segIndex.BuildID), zap.Error(err))
// return err
// }
//
// idx := &dbmodel.SegmentIndex{
// TenantID: tenantID,
// CollectionID: segIndex.CollectionID,
// PartitionID: segIndex.PartitionID,
// SegmentID: segIndex.SegmentID,
// NumRows: segIndex.NumRows,
// IndexID: segIndex.IndexID,
// BuildID: segIndex.BuildID,
// NodeID: segIndex.NodeID,
// IndexVersion: segIndex.IndexVersion,
// IndexState: int32(segIndex.IndexState),
// FailReason: segIndex.FailReason,
// CreateTime: segIndex.CreateTime,
// IndexFileKeys: string(indexFileKeysBytes),
// IndexSize: segIndex.IndexSize,
// IsDeleted: segIndex.IsDeleted,
// }
// err = tc.metaDomain.SegmentIndexDb(ctx).Update(idx)
// if err != nil {
// return err
// }
// }
// return nil
// })
//}
//
//func (tc *Catalog) DropSegmentIndex(ctx context.Context, collID, partID, segID, buildID typeutil.UniqueID) error {
// tenantID := contextutil.TenantID(ctx)
//
// err := tc.metaDomain.SegmentIndexDb(ctx).MarkDeletedByBuildID(tenantID, buildID)
// if err != nil {
// return err
// }
//
// return nil
//}

View File

@ -1,393 +0,0 @@
package indexcoord
//import (
// "context"
// "github.com/cockroachdb/errors"
// "github.com/milvus-io/milvus/internal/metastore/db/dbmodel"
// "os"
// "testing"
//
// "github.com/milvus-io/milvus/internal/metastore"
// "github.com/milvus-io/milvus/internal/metastore/db/dbmodel/mocks"
// "github.com/milvus-io/milvus/internal/metastore/model"
// "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
// "github.com/milvus-io/milvus/pkg/util/contextutil"
// "github.com/milvus-io/milvus/pkg/util/typeutil"
// "github.com/stretchr/testify/mock"
// "github.com/stretchr/testify/require"
//)
//
//const (
// tenantID = "test_tenant"
// noTs = typeutil.Timestamp(0)
// ts = typeutil.Timestamp(10)
// collID1 = typeutil.UniqueID(101)
// partitionID1 = typeutil.UniqueID(500)
// fieldID1 = typeutil.UniqueID(1000)
// indexID1 = typeutil.UniqueID(1500)
// segmentID1 = typeutil.UniqueID(2000)
// indexBuildID1 = typeutil.UniqueID(3000)
//
// collName1 = "test_collection_name_1"
// collAlias1 = "test_collection_alias_1"
// collAlias2 = "test_collection_alias_2"
//
// username = "test_username_1"
// password = "test_xxx"
//)
//
//var (
// ctx context.Context
// metaDomainMock *mocks.IMetaDomain
// collDbMock *mocks.ICollectionDb
// fieldDbMock *mocks.IFieldDb
// partitionDbMock *mocks.IPartitionDb
// collChannelDbMock *mocks.ICollChannelDb
// indexDbMock *mocks.IIndexDb
// aliasDbMock *mocks.ICollAliasDb
// segIndexDbMock *mocks.ISegmentIndexDb
// userDbMock *mocks.IUserDb
//
// mockCatalog *Catalog
//)
//
//// TestMain is the first function executed in current package, we will do some initial here
//func TestMain(m *testing.M) {
// ctx = contextutil.WithTenantID(context.Background(), tenantID)
//
// collDbMock = &mocks.ICollectionDb{}
// fieldDbMock = &mocks.IFieldDb{}
// partitionDbMock = &mocks.IPartitionDb{}
// collChannelDbMock = &mocks.ICollChannelDb{}
// indexDbMock = &mocks.IIndexDb{}
// aliasDbMock = &mocks.ICollAliasDb{}
// segIndexDbMock = &mocks.ISegmentIndexDb{}
// userDbMock = &mocks.IUserDb{}
//
// metaDomainMock = &mocks.IMetaDomain{}
// metaDomainMock.On("CollectionDb", ctx).Return(collDbMock)
// metaDomainMock.On("FieldDb", ctx).Return(fieldDbMock)
// metaDomainMock.On("PartitionDb", ctx).Return(partitionDbMock)
// metaDomainMock.On("CollChannelDb", ctx).Return(collChannelDbMock)
// metaDomainMock.On("IndexDb", ctx).Return(indexDbMock)
// metaDomainMock.On("CollAliasDb", ctx).Return(aliasDbMock)
// metaDomainMock.On("SegmentIndexDb", ctx).Return(segIndexDbMock)
// metaDomainMock.On("UserDb", ctx).Return(userDbMock)
//
// mockCatalog = mockMetaCatalog(metaDomainMock)
//
// // m.Run entry for executing tests
// os.Exit(m.Run())
//}
//
//type NoopTransaction struct{}
//
//func (*NoopTransaction) Transaction(ctx context.Context, fn func(txctx context.Context) error) error {
// return fn(ctx)
//}
//
//func mockMetaCatalog(petDomain dbmodel.IMetaDomain) *Catalog {
// return NewTableCatalog(&NoopTransaction{}, petDomain)
//}
//
//func TestTableCatalog_CreateIndex(t *testing.T) {
// index := &model.Index{
// CollectionID: collID1,
// FieldID: fieldID1,
// IndexID: indexID1,
// IndexName: "testColl_index_110",
// IndexParams: []*commonpb.KeyValuePair{
// {
// Key: "test_index_params_k1",
// Value: "test_index_params_v1",
// },
// },
// SegmentIndexes: map[int64]model.SegmentIndex{
// segmentID1: {
// Segment: model.Segment{
// SegmentID: segmentID1,
// PartitionID: partitionID1,
// },
// BuildID: indexBuildID1,
// CreateTime: 0,
// },
// },
// }
//
// // expectation
// indexDbMock.On("Insert", mock.Anything).Return(nil).Once()
// segIndexDbMock.On("Insert", mock.Anything).Return(nil).Once()
//
// // actual
// gotErr := mockCatalog.CreateIndex(ctx, index)
// require.NoError(t, gotErr)
//}
//
//func TestTableCatalog_CreateIndex_InsertIndexError(t *testing.T) {
// index := &model.Index{
// CollectionID: collID1,
// FieldID: fieldID1,
// IndexID: indexID1,
// IndexName: "testColl_index_110",
// IndexParams: []*commonpb.KeyValuePair{
// {
// Key: "test_index_params_k1",
// Value: "test_index_params_v1",
// },
// },
// }
//
// // expectation
// errTest := errors.New("test error")
// indexDbMock.On("Insert", mock.Anything).Return(errTest).Once()
//
// // actual
// gotErr := mockCatalog.CreateIndex(ctx, index)
// require.NoError(t, gotErr)
//}
//
//func TestTableCatalog_CreateIndex_InsertSegmentIndexError(t *testing.T) {
// index := &model.Index{
// CollectionID: collID1,
// FieldID: fieldID1,
// IndexID: indexID1,
// IndexName: "testColl_index_110",
// IndexParams: []*commonpb.KeyValuePair{
// {
// Key: "test_index_params_k1",
// Value: "test_index_params_v1",
// },
// },
// SegmentIndexes: map[int64]model.SegmentIndex{
// segmentID1: {
// Segment: model.Segment{
// SegmentID: segmentID1,
// PartitionID: partitionID1,
// },
// BuildID: indexBuildID1,
// CreateTime: 0,
// IndexFileKeys: []string{"a\xc5z"},
// },
// },
// }
//
// // expectation
// errTest := errors.New("test error")
// indexDbMock.On("Insert", mock.Anything).Return(nil).Once()
// segIndexDbMock.On("Insert", mock.Anything).Return(errTest).Once()
//
// // actual
// gotErr := mockCatalog.CreateIndex(ctx, index)
// require.NoError(t, gotErr)
//}
//
//func TestTableCatalog_AlterIndex_AddSegmentIndex(t *testing.T) {
// oldIndex := &model.Index{
// CollectionID: collID1,
// FieldID: fieldID1,
// IndexID: indexID1,
// IndexName: "testColl_index_110",
// CreateTime: uint64(0),
// IndexParams: []*commonpb.KeyValuePair{
// {
// Key: "test_index_params_k1",
// Value: "test_index_params_v1",
// },
// },
// SegmentIndexes: map[int64]model.SegmentIndex{
// segmentID1: {
// Segment: model.Segment{
// SegmentID: segmentID1,
// PartitionID: partitionID1,
// },
// CreateTime: uint64(0),
// },
// },
// }
//
// newIndex := &model.Index{
// CollectionID: collID1,
// FieldID: fieldID1,
// IndexID: indexID1,
// IndexName: "testColl_index_110",
// CreateTime: uint64(1011),
// IndexParams: []*commonpb.KeyValuePair{
// {
// Key: "test_index_params_k1",
// Value: "test_index_params_v1",
// },
// },
// SegmentIndexes: map[int64]model.SegmentIndex{
// segmentID1: {
// Segment: model.Segment{
// SegmentID: segmentID1,
// PartitionID: partitionID1,
// },
// BuildID: indexBuildID1,
// CreateTime: uint64(1011),
// },
// },
// }
//
// // expectation
// segIndexDbMock.On("Upsert", mock.Anything).Return(nil).Once()
// indexDbMock.On("Update", mock.Anything).Return(nil).Once()
//
// // actual
// gotErr := mockCatalog.AlterIndex(ctx, oldIndex, newIndex, metastore.ADD)
// require.NoError(t, gotErr)
//}
//
//func TestTableCatalog_AlterIndex_AddSegmentIndex_UpsertSegmentIndexError(t *testing.T) {
// oldIndex := &model.Index{
// CollectionID: collID1,
// FieldID: fieldID1,
// IndexID: indexID1,
// IndexName: "testColl_index_110",
// CreateTime: uint64(0),
// IndexParams: []*commonpb.KeyValuePair{
// {
// Key: "test_index_params_k1",
// Value: "test_index_params_v1",
// },
// },
// SegmentIndexes: map[int64]model.SegmentIndex{
// segmentID1: {
// Segment: model.Segment{
// SegmentID: segmentID1,
// PartitionID: partitionID1,
// },
// CreateTime: uint64(0),
// },
// },
// }
//
// newIndex := &model.Index{
// CollectionID: collID1,
// FieldID: fieldID1,
// IndexID: indexID1,
// IndexName: "testColl_index_110",
// CreateTime: uint64(1011),
// IndexParams: []*commonpb.KeyValuePair{
// {
// Key: "test_index_params_k1",
// Value: "test_index_params_v1",
// },
// },
// SegmentIndexes: map[int64]model.SegmentIndex{
// segmentID1: {
// Segment: model.Segment{
// SegmentID: segmentID1,
// PartitionID: partitionID1,
// },
// BuildID: indexBuildID1,
// CreateTime: uint64(1011),
// },
// },
// }
//
// // expectation
// errTest := errors.New("test error")
// segIndexDbMock.On("Upsert", mock.Anything).Return(errTest).Once()
//
// // actual
// gotErr := mockCatalog.AlterIndex(ctx, oldIndex, newIndex, metastore.ADD)
// require.NoError(t, gotErr)
//}
//
//func TestTableCatalog_DropIndex(t *testing.T) {
// // expectation
// indexDbMock.On("MarkDeletedByIndexID", tenantID, indexID1).Return(nil).Once()
// segIndexDbMock.On("MarkDeletedByIndexID", tenantID, indexID1).Return(nil).Once()
//
// // actual
// gotErr := mockCatalog.DropIndex(ctx, 0, indexID1)
// require.NoError(t, gotErr)
//}
//
//func TestTableCatalog_DropIndex_IndexMarkDeletedError(t *testing.T) {
// // expectation
// errTest := errors.New("test error")
// indexDbMock.On("MarkDeletedByIndexID", tenantID, indexID1).Return(errTest).Once()
//
// // actual
// gotErr := mockCatalog.DropIndex(ctx, 0, indexID1)
// require.NoError(t, gotErr)
//}
//
//func TestTableCatalog_DropIndex_SegmentIndexMarkDeletedError(t *testing.T) {
// // expectation
// errTest := errors.New("test error")
// indexDbMock.On("MarkDeletedByIndexID", tenantID, indexID1).Return(nil).Once()
// segIndexDbMock.On("MarkDeletedByIndexID", tenantID, indexID1).Return(errTest).Once()
//
// // actual
// gotErr := mockCatalog.DropIndex(ctx, 0, indexID1)
// require.NoError(t, gotErr)
//}
//
////func TestTableCatalog_ListIndexes(t *testing.T) {
//// indexResult := []*dbmodel.IndexResult{
//// {
//// FieldID: fieldID1,
//// CollectionID: collID1,
//// IndexID: indexID1,
//// IndexName: "test_index_name_1",
//// IndexParams: "[{\"Key\":\"test_index_params_k1\",\"Value\":\"test_index_params_v1\"}]",
//// SegmentID: segmentID1,
//// PartitionID: partitionID1,
//// EnableIndex: false,
//// IndexBuildID: indexBuildID1,
//// IndexSize: 0,
//// IndexFileKeys: "[\"test_index_file_path_1\"]",
//// },
//// }
//// out := []*model.Index{
//// {
//// CollectionID: collID1,
//// FieldID: fieldID1,
//// IndexID: indexID1,
//// IndexName: "test_index_name_1",
//// IsDeleted: false,
//// CreateTime: 0,
//// IndexParams: []*commonpb.KeyValuePair{
//// {
//// Key: "test_index_params_k1",
//// Value: "test_index_params_v1",
//// },
//// },
//// SegmentIndexes: map[int64]model.SegmentIndex{
//// segmentID1: {
//// Segment: model.Segment{
//// SegmentID: segmentID1,
//// PartitionID: partitionID1,
//// },
//// BuildID: indexBuildID1,
//// //EnableIndex: false,
//// CreateTime: 0,
//// IndexFileKeys: []string{"test_index_file_path_1"},
//// },
//// },
//// Extra: nil,
//// },
//// }
////
//// // expectation
//// indexDbMock.On("List", tenantID).Return(indexResult, nil).Once()
////
//// // actual
//// res, gotErr := mockCatalog.ListIndexes(ctx)
//// require.NoError(t, gotErr)
//// require.Equal(t, out, res)
////}
//
//func TestTableCatalog_ListIndexes_SelectIndexError(t *testing.T) {
// // expectation
// errTest := errors.New("test error")
// indexDbMock.On("List", tenantID).Return(nil, errTest).Once()
//
// // actual
// res, gotErr := mockCatalog.ListIndexes(ctx)
// require.Nil(t, res)
// require.Error(t, gotErr)
//}

View File

@ -59,7 +59,7 @@ func (tc *Catalog) CreateCollection(ctx context.Context, collection *model.Colle
if collection.StartPositions != nil {
startPositionsBytes, err := json.Marshal(collection.StartPositions)
if err != nil {
log.Error("marshal collection start positions error", zap.Int64("collID", collection.CollectionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("marshal collection start positions error", zap.Int64("collectionID", collection.CollectionID), zap.Uint64("ts", ts), zap.Error(err))
return err
}
startPositionsStr = string(startPositionsBytes)
@ -176,7 +176,7 @@ func (tc *Catalog) GetCollectionByID(ctx context.Context, dbID int64, ts typeuti
return nil, err
}
if cidTsPair.IsDeleted {
log.Error("not found collection", zap.Int64("collID", collectionID), zap.Uint64("ts", ts))
log.Error("not found collection", zap.Int64("collectionID", collectionID), zap.Uint64("ts", ts))
return nil, fmt.Errorf("not found collection, collID=%d, ts=%d", collectionID, ts)
}
@ -326,7 +326,7 @@ func (tc *Catalog) DropCollection(ctx context.Context, collection *model.Collect
}
err := tc.metaDomain.CollectionDb(txCtx).Insert(coll)
if err != nil {
log.Error("insert tombstone record for collections failed", zap.String("tenant", tenantID), zap.Int64("collID", collection.CollectionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("insert tombstone record for collections failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collection.CollectionID), zap.Uint64("ts", ts), zap.Error(err))
return err
}
@ -346,7 +346,7 @@ func (tc *Catalog) DropCollection(ctx context.Context, collection *model.Collect
err = tc.metaDomain.CollAliasDb(txCtx).Insert(collAliases)
if err != nil {
log.Error("insert tombstone record for collection_aliases failed", zap.String("tenant", tenantID), zap.Int64("collID", collection.CollectionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("insert tombstone record for collection_aliases failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collection.CollectionID), zap.Uint64("ts", ts), zap.Error(err))
return err
}
}
@ -360,7 +360,7 @@ func (tc *Catalog) DropCollection(ctx context.Context, collection *model.Collect
}
err = tc.metaDomain.CollChannelDb(txCtx).Insert([]*dbmodel.CollectionChannel{collChannel})
if err != nil {
log.Error("insert tombstone record for collection_channels failed", zap.String("tenant", tenantID), zap.Int64("collID", collection.CollectionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("insert tombstone record for collection_channels failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collection.CollectionID), zap.Uint64("ts", ts), zap.Error(err))
return err
}
@ -373,7 +373,7 @@ func (tc *Catalog) DropCollection(ctx context.Context, collection *model.Collect
}
err = tc.metaDomain.FieldDb(txCtx).Insert([]*dbmodel.Field{field})
if err != nil {
log.Error("insert tombstone record for field_schemas failed", zap.String("tenant", tenantID), zap.Int64("collID", collection.CollectionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("insert tombstone record for field_schemas failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collection.CollectionID), zap.Uint64("ts", ts), zap.Error(err))
return err
}
@ -386,7 +386,7 @@ func (tc *Catalog) DropCollection(ctx context.Context, collection *model.Collect
}
err = tc.metaDomain.PartitionDb(txCtx).Insert([]*dbmodel.Partition{partition})
if err != nil {
log.Error("insert tombstone record for partitions failed", zap.String("tenant", tenantID), zap.Int64("collID", collection.CollectionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("insert tombstone record for partitions failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collection.CollectionID), zap.Uint64("ts", ts), zap.Error(err))
return err
}
@ -455,7 +455,7 @@ func (tc *Catalog) CreatePartition(ctx context.Context, dbID int64, partition *m
}
err := tc.metaDomain.PartitionDb(ctx).Insert([]*dbmodel.Partition{p})
if err != nil {
log.Error("insert partitions failed", zap.String("tenant", tenantID), zap.Int64("collID", partition.CollectionID), zap.Int64("partitionID", partition.PartitionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("insert partitions failed", zap.String("tenant", tenantID), zap.Int64("collectionID", partition.CollectionID), zap.Int64("partitionID", partition.PartitionID), zap.Uint64("ts", ts), zap.Error(err))
return err
}
@ -474,7 +474,7 @@ func (tc *Catalog) DropPartition(ctx context.Context, dbID int64, collectionID t
}
err := tc.metaDomain.PartitionDb(ctx).Insert([]*dbmodel.Partition{p})
if err != nil {
log.Error("insert tombstone record for partition failed", zap.String("tenant", tenantID), zap.Int64("collID", collectionID), zap.Int64("partitionID", partitionID), zap.Uint64("ts", ts), zap.Error(err))
log.Error("insert tombstone record for partition failed", zap.String("tenant", tenantID), zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Uint64("ts", ts), zap.Error(err))
return err
}
@ -516,7 +516,7 @@ func (tc *Catalog) CreateAlias(ctx context.Context, alias *model.Alias, ts typeu
}
err := tc.metaDomain.CollAliasDb(ctx).Insert([]*dbmodel.CollectionAlias{collAlias})
if err != nil {
log.Error("insert collection_aliases failed", zap.Int64("collID", alias.CollectionID), zap.String("alias", alias.Name), zap.Uint64("ts", ts), zap.Error(err))
log.Error("insert collection_aliases failed", zap.Int64("collectionID", alias.CollectionID), zap.String("alias", alias.Name), zap.Uint64("ts", ts), zap.Error(err))
return err
}
@ -540,7 +540,7 @@ func (tc *Catalog) DropAlias(ctx context.Context, dbID int64, alias string, ts t
}
err = tc.metaDomain.CollAliasDb(ctx).Insert([]*dbmodel.CollectionAlias{collAlias})
if err != nil {
log.Error("insert tombstone record for collection_aliases failed", zap.Int64("collID", collectionID), zap.String("collAlias", alias), zap.Uint64("ts", ts), zap.Error(err))
log.Error("insert tombstone record for collection_aliases failed", zap.Int64("collectionID", collectionID), zap.String("collAlias", alias), zap.Uint64("ts", ts), zap.Error(err))
return err
}

View File

@ -861,7 +861,7 @@ func checkBinlogs(binlogType storage.BinlogType, segmentID typeutil.UniqueID, lo
for _, fieldBinlog := range logs {
for _, binlog := range fieldBinlog.Binlogs {
if segmentID != getSegmentID(binlog.LogPath) {
log.Panic("the segment path doesn't match the segment id", zap.Int64("segment_id", segmentID), zap.String("path", binlog.LogPath))
log.Panic("the segment path doesn't match the segmentID", zap.Int64("segmentID", segmentID), zap.String("path", binlog.LogPath))
}
}
}

View File

@ -3750,7 +3750,7 @@ func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceReq
collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName())
if err != nil {
log.Warn("failed to get collection id",
zap.String("collection name", req.GetCollectionName()),
zap.String("collectionName", req.GetCollectionName()),
zap.Error(err))
status.Reason = err.Error()
return status, nil
@ -3973,7 +3973,7 @@ func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*mi
log := log.Ctx(ctx)
log.Info("received import request",
zap.String("collection name", req.GetCollectionName()),
zap.String("collectionName", req.GetCollectionName()),
zap.String("partition name", req.GetPartitionName()),
zap.Strings("files", req.GetFiles()))
resp := &milvuspb.ImportResponse{

View File

@ -348,7 +348,7 @@ func (m *MetaCache) GetCollectionSchema(ctx context.Context, database, collectio
collInfo = m.collInfo[database][collectionName]
metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Debug("Reload collection from root coordinator ",
zap.String("collection name", collectionName),
zap.String("collectionName", collectionName),
zap.Any("time (milliseconds) take ", tr.ElapseSpan().Milliseconds()))
return collInfo.schema, nil
}

View File

@ -123,8 +123,8 @@ func repackInsertDataByPartition(ctx context.Context,
assignedSegmentInfos, err := segIDAssigner.GetSegmentID(insertMsg.CollectionID, partitionID, channelName, uint32(len(rowOffsets)), maxTs)
if err != nil {
log.Error("allocate segmentID for insert data failed",
zap.String("collection name", insertMsg.CollectionName),
zap.String("channel name", channelName),
zap.String("collectionName", insertMsg.CollectionName),
zap.String("channelName", channelName),
zap.Int("allocate count", len(rowOffsets)),
zap.Error(err))
return nil, err
@ -136,7 +136,7 @@ func repackInsertDataByPartition(ctx context.Context,
msgs, err := genInsertMsgsByPartition(ctx, segmentID, partitionID, partitionName, subRowOffsets, channelName, insertMsg)
if err != nil {
log.Warn("repack insert data to insert msgs failed",
zap.String("collection name", insertMsg.CollectionName),
zap.String("collectionName", insertMsg.CollectionName),
zap.Int64("partitionID", partitionID),
zap.Error(err))
return nil, err
@ -187,7 +187,7 @@ func repackInsertData(ctx context.Context,
msgs, err := repackInsertDataByPartition(ctx, partitionName, rowOffsets, channel, insertMsg, segIDAssigner)
if err != nil {
log.Warn("repack insert data to msg pack failed",
zap.String("collection name", insertMsg.CollectionName),
zap.String("collectionName", insertMsg.CollectionName),
zap.String("partition name", partitionName),
zap.Error(err))
return nil, err
@ -199,7 +199,7 @@ func repackInsertData(ctx context.Context,
err := setMsgID(ctx, msgPack.Msgs, idAllocator)
if err != nil {
log.Error("failed to set msgID when repack insert data",
zap.String("collection name", insertMsg.CollectionName),
zap.String("collectionName", insertMsg.CollectionName),
zap.String("partition name", insertMsg.PartitionName),
zap.Error(err))
return nil, err
@ -224,14 +224,14 @@ func repackInsertDataWithPartitionKey(ctx context.Context,
partitionNames, err := getDefaultPartitionNames(ctx, insertMsg.GetDbName(), insertMsg.CollectionName)
if err != nil {
log.Warn("get default partition names failed in partition key mode",
zap.String("collection name", insertMsg.CollectionName),
zap.String("collectionName", insertMsg.CollectionName),
zap.Error(err))
return nil, err
}
hashValues, err := typeutil.HashKey2Partitions(partitionKeys, partitionNames)
if err != nil {
log.Warn("has partition keys to partitions failed",
zap.String("collection name", insertMsg.CollectionName),
zap.String("collectionName", insertMsg.CollectionName),
zap.Error(err))
return nil, err
}
@ -265,8 +265,8 @@ func repackInsertDataWithPartitionKey(ctx context.Context,
err = errGroup.Wait()
if err != nil {
log.Warn("repack insert data into insert msg pack failed",
zap.String("collection name", insertMsg.CollectionName),
zap.String("channel name", channel),
zap.String("collectionName", insertMsg.CollectionName),
zap.String("channelName", channel),
zap.Error(err))
return nil, err
}
@ -281,7 +281,7 @@ func repackInsertDataWithPartitionKey(ctx context.Context,
err = setMsgID(ctx, msgPack.Msgs, idAllocator)
if err != nil {
log.Error("failed to set msgID when repack insert data",
zap.String("collection name", insertMsg.CollectionName),
zap.String("collectionName", insertMsg.CollectionName),
zap.Error(err))
return nil, err
}

View File

@ -378,7 +378,7 @@ func (cit *createIndexTask) PreExecute(ctx context.Context) error {
}
func (cit *createIndexTask) Execute(ctx context.Context) error {
log.Ctx(ctx).Info("proxy create index", zap.Int64("collID", cit.collectionID), zap.Int64("fieldID", cit.fieldSchema.GetFieldID()),
log.Ctx(ctx).Info("proxy create index", zap.Int64("collectionID", cit.collectionID), zap.Int64("fieldID", cit.fieldSchema.GetFieldID()),
zap.String("indexName", cit.req.GetIndexName()), zap.Any("typeParams", cit.fieldSchema.GetTypeParams()),
zap.Any("indexParams", cit.req.GetExtraParams()),
zap.Any("newExtraParams", cit.newExtraParams),

View File

@ -176,14 +176,14 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
partitionKeyMode, err := isPartitionKeyMode(ctx, it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Warn("check partition key mode failed", zap.String("collection name", collectionName), zap.Error(err))
log.Warn("check partition key mode failed", zap.String("collectionName", collectionName), zap.Error(err))
return err
}
if partitionKeyMode {
fieldSchema, _ := typeutil.GetPartitionKeyFieldSchema(it.schema)
it.partitionKeys, err = getPartitionKeyFieldData(fieldSchema, it.insertMsg)
if err != nil {
log.Warn("get partition keys from insert request failed", zap.String("collection name", collectionName), zap.Error(err))
log.Warn("get partition keys from insert request failed", zap.String("collectionName", collectionName), zap.Error(err))
return err
}
} else {

View File

@ -221,7 +221,7 @@ func (it *upsertTask) deletePreExecute(ctx context.Context) error {
zap.String("collectionName", collName))
if err := validateCollectionName(collName); err != nil {
log.Info("Invalid collection name", zap.Error(err))
log.Info("Invalid collectionName", zap.Error(err))
return err
}
collID, err := globalMetaCache.GetCollectionID(ctx, it.req.GetDbName(), collName)

View File

@ -52,7 +52,7 @@ func loadGrowingSegments(ctx context.Context, delegator delegator.ShardDelegator
// unFlushed segment may not have binLogs, skip loading
segmentInfo := req.GetSegmentInfos()[segmentID]
if segmentInfo == nil {
log.Warn("an unflushed segment is not found in segment infos", zap.Int64("segment ID", segmentID))
log.Warn("an unflushed segment is not found in segment infos", zap.Int64("segmentID", segmentID))
continue
}
if len(segmentInfo.GetBinlogs()) > 0 {

View File

@ -203,7 +203,7 @@ func (b *ServerBroker) Flush(ctx context.Context, cID int64, segIDs []int64) err
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(resp.Status.Reason)
}
log.Info("flush on collection succeed", zap.Int64("collection ID", cID))
log.Info("flush on collection succeed", zap.Int64("collectionID", cID))
return nil
}
@ -257,7 +257,7 @@ func (b *ServerBroker) GetSegmentIndexState(ctx context.Context, collID UniqueID
}
func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) error {
log.Info("broadcasting request to alter collection", zap.String("collection name", req.GetCollectionName()), zap.Int64("collection id", req.GetCollectionID()))
log.Info("broadcasting request to alter collection", zap.String("collectionName", req.GetCollectionName()), zap.Int64("collectionID", req.GetCollectionID()))
colMeta, err := b.s.meta.GetCollectionByID(ctx, req.GetDbName(), req.GetCollectionID(), typeutil.MaxTimestamp, false)
if err != nil {
@ -289,7 +289,7 @@ func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, req *milv
if resp.ErrorCode != commonpb.ErrorCode_Success {
return errors.New(resp.Reason)
}
log.Info("done to broadcast request to alter collection", zap.String("collection name", req.GetCollectionName()), zap.Int64("collection id", req.GetCollectionID()))
log.Info("done to broadcast request to alter collection", zap.String("collectionName", req.GetCollectionName()), zap.Int64("collectionID", req.GetCollectionID()))
return nil
}

View File

@ -264,7 +264,7 @@ func (d *dmlChannels) getChannelNum() int {
func (d *dmlChannels) getMsgStreamByName(chanName string) (*dmlMsgStream, error) {
v, ok := d.pool.Load(chanName)
if !ok {
log.Error("invalid channel name", zap.String("chanName", chanName))
log.Error("invalid channelName", zap.String("chanName", chanName))
return nil, errors.Newf("invalid channel name: %s", chanName)
}
return v.(*dmlMsgStream), nil
@ -366,12 +366,12 @@ func genChannelNames(prefix string, num int64) []string {
func parseChannelNameIndex(channelName string) int {
index := strings.LastIndex(channelName, "_")
if index < 0 {
log.Error("invalid channel name", zap.String("chanName", channelName))
log.Error("invalid channelName", zap.String("chanName", channelName))
panic("invalid channel name: " + channelName)
}
index, err := strconv.Atoi(channelName[index+1:])
if err != nil {
log.Error("invalid channel name", zap.String("chanName", channelName), zap.Error(err))
log.Error("invalid channelName", zap.String("chanName", channelName), zap.Error(err))
panic("invalid channel name: " + channelName)
}
return index

View File

@ -415,9 +415,9 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque
log.Info("receive import job",
zap.String("database name", req.GetDbName()),
zap.String("collection name", req.GetCollectionName()),
zap.Int64("collection ID", cID),
zap.Int64("partition ID", pID))
zap.String("collectionName", req.GetCollectionName()),
zap.Int64("collectionID", cID),
zap.Int64("partitionID", pID))
err := func() error {
m.pendingLock.Lock()
defer m.pendingLock.Unlock()
@ -698,8 +698,8 @@ func (m *importManager) setCollectionPartitionName(dbName string, colID, partID
return nil
}
log.Error("failed to setCollectionPartitionName",
zap.Int64("collection ID", colID),
zap.Int64("partition ID", partID),
zap.Int64("collectionID", colID),
zap.Int64("partitionID", partID),
zap.Error(err))
}
return errors.New("failed to setCollectionPartitionName for import task")

View File

@ -1000,7 +1000,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
tr := timerecord.NewTimeRecorder("HasCollection")
ts := getTravelTs(in)
log := log.Ctx(ctx).With(zap.String("collection name", in.GetCollectionName()),
log := log.Ctx(ctx).With(zap.String("collectionName", in.GetCollectionName()),
zap.Uint64("ts", ts))
log.Info("received request to has collection")
@ -1085,7 +1085,7 @@ func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.Describe
tr := timerecord.NewTimeRecorder("DescribeCollection")
ts := getTravelTs(in)
log := log.Ctx(ctx).With(zap.String("collection name", in.GetCollectionName()),
log := log.Ctx(ctx).With(zap.String("collectionName", in.GetCollectionName()),
zap.String("dbName", in.GetDbName()),
zap.Int64("id", in.GetCollectionID()),
zap.Uint64("ts", ts),
@ -1804,7 +1804,7 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
var err error
if colInfo, err = c.meta.GetCollectionByName(ctx, req.GetDbName(), req.GetCollectionName(), typeutil.MaxTimestamp); err != nil {
log.Error("failed to find collection ID from its name",
zap.String("collection name", req.GetCollectionName()),
zap.String("collectionName", req.GetCollectionName()),
zap.Error(err))
return nil, err
}
@ -1813,7 +1813,7 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
if isBackUp {
if len(req.GetPartitionName()) == 0 {
log.Info("partition name not specified when backup recovery",
zap.String("collection name", req.GetCollectionName()))
zap.String("collectionName", req.GetCollectionName()))
ret := &milvuspb.ImportResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError,
"partition name not specified when backup"),
@ -1846,11 +1846,11 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
}
log.Info("RootCoord receive import request",
zap.String("collection name", req.GetCollectionName()),
zap.Int64("collection ID", cID),
zap.String("collectionName", req.GetCollectionName()),
zap.Int64("collectionID", cID),
zap.String("partition name", req.GetPartitionName()),
zap.Strings("virtual channel names", req.GetChannelNames()),
zap.Int64("partition ID", pID),
zap.Int64("partitionID", pID),
zap.Int("# of files = ", len(req.GetFiles())),
)
importJobResp := c.importManager.importJob(ctx, req, cID, pID)

View File

@ -121,7 +121,7 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact
// recover physical channels for all collections
for collID, chanNames := range chanMap {
dmlChannels.addChannels(chanNames...)
log.Info("recover physical channels", zap.Int64("collID", collID), zap.Strings("physical channels", chanNames))
log.Info("recover physical channels", zap.Int64("collectionID", collID), zap.Strings("physical channels", chanNames))
}
return &timetickSync{

View File

@ -13,7 +13,7 @@ func ReCalcRowCount(seg, segCloned *datapb.SegmentInfo) {
// `segment` is not mutated but only cloned above and is safe to be referred here.
if newCount := CalcRowCountFromBinLog(seg); newCount != seg.GetNumOfRows() {
log.Warn("segment row number meta inconsistent with bin log row count and will be corrected",
zap.Int64("segment ID", seg.GetID()),
zap.Int64("segmentID", seg.GetID()),
zap.Int64("segment meta row count (wrong)", seg.GetNumOfRows()),
zap.Int64("segment bin log row count (correct)", newCount))
// Update the corrected row count.