mirror of https://github.com/milvus-io/milvus.git
Remove log.Error(err.error())-style log (#26783)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/26551/head
parent
5602b22531
commit
e8f1b1736e
|
@ -68,7 +68,7 @@ func TestMain(m *testing.M) {
|
|||
// init embed etcd
|
||||
embedetcdServer, tempDir, err := etcd.StartTestEmbedEtcdServer()
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
log.Fatal("failed to start embed etcd server", zap.Error(err))
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
defer embedetcdServer.Close()
|
||||
|
|
|
@ -53,7 +53,7 @@ func TestMain(t *testing.M) {
|
|||
// init embed etcd
|
||||
embedetcdServer, tempDir, err := etcd.StartTestEmbedEtcdServer()
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
log.Fatal("failed to start embed etcd server", zap.Error(err))
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
defer embedetcdServer.Close()
|
||||
|
|
|
@ -100,9 +100,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
|
|||
tmpSegIDs, err := dn.bufferDeleteMsg(msg, fgMsg.timeRange, fgMsg.startPositions[0], fgMsg.endPositions[0])
|
||||
if err != nil {
|
||||
// error occurs only when deleteMsg is misaligned, should not happen
|
||||
err = fmt.Errorf("buffer delete msg failed, err = %s", err)
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
log.Fatal("failed to buffer delete msg", zap.String("traceID", traceID), zap.Error(err))
|
||||
}
|
||||
segIDs.Insert(tmpSegIDs...)
|
||||
}
|
||||
|
@ -129,9 +127,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
|
|||
return dn.flushManager.flushDelData(buf, segmentToFlush, fgMsg.endPositions[0])
|
||||
}, getFlowGraphRetryOpt())
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to flush delete data, err = %s", err)
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
log.Fatal("failed to flush delete data", zap.Int64("segmentID", segmentToFlush), zap.Error(err))
|
||||
}
|
||||
// remove delete buf
|
||||
dn.delBufferManager.Delete(segmentToFlush)
|
||||
|
|
|
@ -22,7 +22,6 @@ import (
|
|||
"math"
|
||||
"reflect"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/atomic"
|
||||
|
@ -174,10 +173,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
|
|||
|
||||
if startPositions[0].Timestamp < ibNode.lastTimestamp {
|
||||
// message stream should guarantee that this should not happen
|
||||
err := fmt.Errorf("insert buffer node consumed old messages, channel = %s, timestamp = %d, lastTimestamp = %d",
|
||||
ibNode.channelName, startPositions[0].Timestamp, ibNode.lastTimestamp)
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
log.Fatal("insert buffer node consumed old messages", zap.String("channel", ibNode.channelName), zap.Uint64("timestamp", startPositions[0].Timestamp), zap.Uint64("lastTimestamp", ibNode.lastTimestamp))
|
||||
}
|
||||
|
||||
ibNode.lastTimestamp = endPositions[0].Timestamp
|
||||
|
@ -186,9 +182,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
|
|||
seg2Upload, err := ibNode.addSegmentAndUpdateRowNum(fgMsg.insertMessages, startPositions[0], endPositions[0])
|
||||
if err != nil {
|
||||
// Occurs only if the collectionID is mismatch, should not happen
|
||||
err = errors.Wrap(err, "update segment states in channel meta wrong")
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
log.Fatal("failed to update segment states in channel meta", zap.String("channelName", ibNode.channelName), zap.Error(err))
|
||||
}
|
||||
|
||||
// insert messages -> buffer
|
||||
|
@ -196,9 +190,12 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
|
|||
err := ibNode.bufferInsertMsg(msg, startPositions[0], endPositions[0])
|
||||
if err != nil {
|
||||
// error occurs when missing schema info or data is misaligned, should not happen
|
||||
err = errors.Wrap(err, "insertBufferNode msg to buffer failed")
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
log.Fatal("insertBufferNode failed to buffer insert msg",
|
||||
zap.String("channelName", ibNode.channelName),
|
||||
zap.Int64("segmentID", msg.SegmentID),
|
||||
zap.Int64("msgID", msg.GetBase().GetMsgID()),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -498,9 +495,10 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
|
|||
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc()
|
||||
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.TotalLabel).Inc()
|
||||
}
|
||||
err = fmt.Errorf("insertBufferNode flushBufferData failed, err = %s", err)
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
log.Fatal("insertBufferNode failed to flushBufferData",
|
||||
zap.Int64("segmentID", task.segmentID),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
segmentsToSync = append(segmentsToSync, task.segmentID)
|
||||
ibNode.channel.rollInsertBuffer(task.segmentID)
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/milvus-io/milvus/internal/util/mock"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proxy"
|
||||
|
@ -40,7 +41,7 @@ func TestMain(m *testing.M) {
|
|||
// init embed etcd
|
||||
embedetcdServer, tempDir, err := etcd.StartTestEmbedEtcdServer()
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
log.Fatal("failed to start embed etcd server", zap.Error(err))
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
defer embedetcdServer.Close()
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/milvus-io/milvus/internal/util/mock"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
|
@ -41,7 +42,7 @@ func TestMain(m *testing.M) {
|
|||
// init embed etcd
|
||||
embedetcdServer, tempDir, err := etcd.StartTestEmbedEtcdServer()
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
log.Fatal("failed to start embed etcd server", zap.Error(err))
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
defer embedetcdServer.Close()
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/milvus-io/milvus/internal/util/mock"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
|
@ -41,7 +42,7 @@ func TestMain(m *testing.M) {
|
|||
// init embed etcd
|
||||
embedetcdServer, tempDir, err := etcd.StartTestEmbedEtcdServer()
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
log.Fatal("failed to start embed etcd server", zap.Error(err))
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
defer embedetcdServer.Close()
|
||||
|
|
|
@ -73,28 +73,20 @@ func (suite *HTTPServerTestSuite) TestDefaultLogHandler() {
|
|||
|
||||
// change log level through http
|
||||
payload, err := json.Marshal(map[string]any{"level": "error"})
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
suite.Require().NoError(err)
|
||||
|
||||
url := suite.server.URL + "/log/level"
|
||||
req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(payload))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
suite.Require().NoError(err)
|
||||
|
||||
client := suite.server.Client()
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
suite.Require().NoError(err)
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
suite.Require().NoError(err)
|
||||
suite.Equal("{\"level\":\"error\"}\n", string(body))
|
||||
suite.Equal(zap.ErrorLevel, log.GetLevel())
|
||||
}
|
||||
|
|
|
@ -196,7 +196,7 @@ func (i *IndexNode) Init() error {
|
|||
log.Info("IndexNode init", zap.String("state", i.lifetime.GetState().String()))
|
||||
err := i.initSession()
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
log.Error("failed to init session", zap.Error(err))
|
||||
initErr = err
|
||||
return
|
||||
}
|
||||
|
|
|
@ -71,9 +71,8 @@ func removeDuplicate(ss []string) []string {
|
|||
|
||||
func newChannels(vchans []vChan, pchans []pChan) (channelInfos, error) {
|
||||
if len(vchans) != len(pchans) {
|
||||
err := fmt.Errorf("physical channels mismatch virtual channels, len(VirtualChannelNames): %v, len(PhysicalChannelNames): %v", len(vchans), len(pchans))
|
||||
log.Error(err.Error())
|
||||
return channelInfos{}, err
|
||||
log.Error("physical channels mismatch virtual channels", zap.Int("len(VirtualChannelNames)", len(vchans)), zap.Int("len(PhysicalChannelNames)", len(pchans)))
|
||||
return channelInfos{}, fmt.Errorf("physical channels mismatch virtual channels, len(VirtualChannelNames): %v, len(PhysicalChannelNames): %v", len(vchans), len(pchans))
|
||||
}
|
||||
/*
|
||||
// remove duplicate physical channels.
|
||||
|
|
|
@ -38,7 +38,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/crypto"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
|
@ -481,24 +480,20 @@ func TestMetaCache_GetPartitionError(t *testing.T) {
|
|||
// Test the case where ShowPartitionsResponse is not aligned
|
||||
id, err := globalMetaCache.GetPartitionID(ctx, dbName, "errorCollection", "par1")
|
||||
assert.Error(t, err)
|
||||
log.Debug(err.Error())
|
||||
assert.Equal(t, id, typeutil.UniqueID(0))
|
||||
|
||||
partitions, err2 := globalMetaCache.GetPartitions(ctx, dbName, "errorCollection")
|
||||
assert.NotNil(t, err2)
|
||||
log.Debug(err.Error())
|
||||
assert.Equal(t, len(partitions), 0)
|
||||
|
||||
// Test non existed tables
|
||||
id, err = globalMetaCache.GetPartitionID(ctx, dbName, "nonExisted", "par1")
|
||||
assert.Error(t, err)
|
||||
log.Debug(err.Error())
|
||||
assert.Equal(t, id, typeutil.UniqueID(0))
|
||||
|
||||
// Test non existed partition
|
||||
id, err = globalMetaCache.GetPartitionID(ctx, dbName, "collection1", "par3")
|
||||
assert.Error(t, err)
|
||||
log.Debug(err.Error())
|
||||
assert.Equal(t, id, typeutil.UniqueID(0))
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
|
@ -52,7 +53,7 @@ func TestMain(m *testing.M) {
|
|||
// init embed etcd
|
||||
embedetcdServer, tempDir, err := etcd.StartTestEmbedEtcdServer()
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
log.Fatal("failed to start embed etcd server", zap.Error(err))
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
defer embedetcdServer.Close()
|
||||
|
|
|
@ -17,11 +17,10 @@
|
|||
package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var Average *averageCollector
|
||||
|
@ -61,9 +60,7 @@ func init() {
|
|||
var err error
|
||||
Rate, err = ratelimitutil.NewRateCollector(ratelimitutil.DefaultWindow, ratelimitutil.DefaultGranularity)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("querynode collector init failed, err = %s", err)
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
log.Fatal("failed to initialize querynode rate collector", zap.Error(err))
|
||||
}
|
||||
Average = newAverageCollector()
|
||||
Counter = newCounter()
|
||||
|
|
|
@ -72,9 +72,7 @@ func (fNode *filterNode) Operate(in Msg) Msg {
|
|||
//Get collection from collection manager
|
||||
collection := fNode.manager.Collection.Get(fNode.collectionID)
|
||||
if collection == nil {
|
||||
err := merr.WrapErrCollectionNotFound(fNode.collectionID)
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
log.Fatal("collection not found in meta", zap.Int64("collectionID", fNode.collectionID))
|
||||
}
|
||||
|
||||
out := &insertNodeMsg{
|
||||
|
|
|
@ -257,7 +257,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
|
|||
// to avoid concurrent watch/unwatch
|
||||
if node.unsubscribingChannels.Contain(channel.GetChannelName()) {
|
||||
err := merr.WrapErrChannelUnsubscribing(channel.GetChannelName())
|
||||
log.Warn(err.Error())
|
||||
log.Warn("abort watch unsubscribing channel", zap.Error(err))
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
|
|
|
@ -466,8 +466,8 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque
|
|||
|
||||
// task queue size has a limit, return error if import request contains too many data files, and skip entire job
|
||||
if capacity-length < taskCount {
|
||||
log.Error("failed to execute import job, task queue capability insufficient", zap.Int("capacity", capacity), zap.Int("length", length), zap.Int("taskCount", taskCount))
|
||||
err := fmt.Errorf("import task queue max size is %v, currently there are %v tasks is pending. Not able to execute this request with %v tasks", capacity, length, taskCount)
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -154,7 +154,7 @@ func initFileLog(cfg *FileLogConfig) (*lumberjack.Logger, error) {
|
|||
|
||||
func newStdLogger() (*zap.Logger, *ZapProperties) {
|
||||
conf := &Config{Level: "debug", Stdout: true, DisableErrorVerbose: true}
|
||||
lg, r, _ := InitLogger(conf)
|
||||
lg, r, _ := InitLogger(conf, zap.OnFatal(zapcore.WriteThenPanic))
|
||||
return lg, r
|
||||
}
|
||||
|
||||
|
|
|
@ -94,7 +94,7 @@ func GetMemoryCount() uint64 {
|
|||
// get container memory by `cgroups`
|
||||
limit, err := getContainerMemLimit()
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
log.Warn("failed to get container memory limit", zap.Error(err))
|
||||
return 0
|
||||
}
|
||||
// in container, return min(hostMem, containerMem)
|
||||
|
@ -117,7 +117,7 @@ func GetUsedMemoryCount() uint64 {
|
|||
// in container, calculate by `cgroups`
|
||||
used, err := getContainerMemUsed()
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
log.Warn("failed to get container memory used", zap.Error(err))
|
||||
return 0
|
||||
}
|
||||
return used
|
||||
|
|
Loading…
Reference in New Issue