mirror of https://github.com/milvus-io/milvus.git
Add init log logic for index and proxy
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>pull/4973/head^2
parent
8a5c039137
commit
012f5f360e
|
@ -13,21 +13,25 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
|
||||
"github.com/zilliztech/milvus-distributed/internal/indexnode"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/logutil"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
indexnode.Params.Init()
|
||||
logutil.SetupLogger(&indexnode.Params.Log)
|
||||
n, err := components.NewIndexNode(ctx)
|
||||
if err != nil {
|
||||
log.Print("create server failed", zap.Error(err))
|
||||
log.Error("create server failed", zap.Error(err))
|
||||
}
|
||||
|
||||
sc := make(chan os.Signal, 1)
|
||||
|
@ -44,11 +48,11 @@ func main() {
|
|||
}()
|
||||
|
||||
if err := n.Run(); err != nil {
|
||||
log.Fatal("run builder server failed", zap.Error(err))
|
||||
log.Error("run builder server failed", zap.Error(err))
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
log.Print("Got signal to exit", zap.String("signal", sig.String()))
|
||||
log.Debug("Got signal to exit", zap.String("signal", sig.String()))
|
||||
|
||||
if err := n.Stop(); err != nil {
|
||||
log.Fatal("stop builder server failed", zap.Error(err))
|
||||
|
|
|
@ -13,21 +13,25 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
|
||||
"github.com/zilliztech/milvus-distributed/internal/indexservice"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/logutil"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
indexservice.Params.Init()
|
||||
logutil.SetupLogger(&indexservice.Params.Log)
|
||||
s, err := components.NewIndexService(ctx)
|
||||
if err != nil {
|
||||
log.Print("create server failed", zap.Error(err))
|
||||
log.Debug("create server failed", zap.Error(err))
|
||||
}
|
||||
|
||||
sc := make(chan os.Signal, 1)
|
||||
|
@ -48,7 +52,7 @@ func main() {
|
|||
}
|
||||
|
||||
<-ctx.Done()
|
||||
log.Print("Got signal to exit", zap.String("signal", sig.String()))
|
||||
log.Debug("Got signal to exit", zap.String("signal", sig.String()))
|
||||
|
||||
if err := s.Stop(); err != nil {
|
||||
log.Fatal("stop server failed", zap.Error(err))
|
||||
|
|
|
@ -2,23 +2,27 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/logutil"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proxynode"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
msFactory := pulsarms.NewFactory()
|
||||
proxynode.Params.Init()
|
||||
logutil.SetupLogger(&proxynode.Params.Log)
|
||||
n, err := components.NewProxyNode(ctx, msFactory)
|
||||
if err != nil {
|
||||
log.Print("create server failed", zap.Error(err))
|
||||
log.Error("create server failed", zap.Error(err))
|
||||
}
|
||||
|
||||
sc := make(chan os.Signal, 1)
|
||||
|
@ -31,7 +35,7 @@ func main() {
|
|||
var sig os.Signal
|
||||
go func() {
|
||||
sig = <-sc
|
||||
log.Println("receive stop signal ...")
|
||||
log.Debug("receive stop signal ...")
|
||||
cancel()
|
||||
}()
|
||||
|
||||
|
@ -40,7 +44,7 @@ func main() {
|
|||
}
|
||||
|
||||
<-ctx.Done()
|
||||
log.Print("Got signal to exit", zap.String("signal", sig.String()))
|
||||
log.Debug("Got signal to exit", zap.String("signal", sig.String()))
|
||||
|
||||
if err := n.Stop(); err != nil {
|
||||
log.Fatal("stop server failed", zap.Error(err))
|
||||
|
|
|
@ -2,24 +2,29 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/logutil"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proxyservice"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
msFactory := pulsarms.NewFactory()
|
||||
proxyservice.Params.Init()
|
||||
logutil.SetupLogger(&proxyservice.Params.Log)
|
||||
s, err := components.NewProxyService(ctx, msFactory)
|
||||
if err != nil {
|
||||
log.Fatal("create proxy service error: " + err.Error())
|
||||
}
|
||||
|
||||
sc := make(chan os.Signal, 1)
|
||||
signal.Notify(sc,
|
||||
syscall.SIGHUP,
|
||||
|
@ -30,7 +35,7 @@ func main() {
|
|||
var sig os.Signal
|
||||
go func() {
|
||||
sig = <-sc
|
||||
log.Println("receive stop signal")
|
||||
log.Debug("receive stop signal")
|
||||
cancel()
|
||||
}()
|
||||
|
||||
|
@ -39,7 +44,7 @@ func main() {
|
|||
}
|
||||
|
||||
<-ctx.Done()
|
||||
log.Print("Got signal to exit", zap.String("signal", sig.String()))
|
||||
log.Debug("Got signal to exit", zap.String("signal", sig.String()))
|
||||
|
||||
if err := s.Stop(); err != nil {
|
||||
log.Fatal("stop server failed", zap.Error(err))
|
||||
|
|
|
@ -7,14 +7,13 @@ import (
|
|||
otgrpc "github.com/opentracing-contrib/go-grpc"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/retry"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/retry"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
otgrpc "github.com/opentracing-contrib/go-grpc"
|
||||
|
|
|
@ -7,14 +7,13 @@ import (
|
|||
otgrpc "github.com/opentracing-contrib/go-grpc"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/retry"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/retry"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
|
|
|
@ -18,7 +18,10 @@ import (
|
|||
"fmt"
|
||||
"unsafe"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexcgopb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/storage"
|
||||
|
@ -188,13 +191,12 @@ func (index *CIndex) BuildFloatVecIndexWithoutIds(vectors []float32) error {
|
|||
CStatus
|
||||
BuildFloatVecIndexWithoutIds(CIndex index, int64_t float_value_num, const float* vectors);
|
||||
*/
|
||||
fmt.Println("before BuildFloatVecIndexWithoutIds")
|
||||
log.Debug("before BuildFloatVecIndexWithoutIds")
|
||||
status := C.BuildFloatVecIndexWithoutIds(index.indexPtr, (C.int64_t)(len(vectors)), (*C.float)(&vectors[0]))
|
||||
errorCode := status.error_code
|
||||
fmt.Println("BuildFloatVecIndexWithoutIds error code: ", errorCode)
|
||||
if errorCode != 0 {
|
||||
errorMsg := C.GoString(status.error_msg)
|
||||
fmt.Println("BuildFloatVecIndexWithoutIds error msg: ", errorMsg)
|
||||
log.Debug("indexnode", zap.String("BuildFloatVecIndexWithoutIds error msg: ", errorMsg))
|
||||
defer C.free(unsafe.Pointer(status.error_msg))
|
||||
return fmt.Errorf("BuildFloatVecIndexWithoutIds failed, C runtime error detected, error code = %d, err msg = %s", errorCode, errorMsg)
|
||||
}
|
||||
|
@ -256,14 +258,13 @@ func NewCIndex(typeParams, indexParams map[string]string) (Index, error) {
|
|||
CIndex* res_index);
|
||||
*/
|
||||
var indexPtr C.CIndex
|
||||
fmt.Println("before create index ........................................")
|
||||
log.Debug("before create index ...")
|
||||
status := C.CreateIndex(typeParamsPointer, indexParamsPointer, &indexPtr)
|
||||
fmt.Println("after create index ........................................")
|
||||
log.Debug("after create index ...")
|
||||
errorCode := status.error_code
|
||||
fmt.Println("EEEEEEEEEEEEEEEEEEEEEEEEEE error code: ", errorCode)
|
||||
if errorCode != 0 {
|
||||
errorMsg := C.GoString(status.error_msg)
|
||||
fmt.Println("EEEEEEEEEEEEEEEEEEEEEEEEEE error msg: ", errorMsg)
|
||||
log.Debug("indexnode", zap.String("create index error msg", errorMsg))
|
||||
defer C.free(unsafe.Pointer(status.error_msg))
|
||||
return nil, fmt.Errorf(" failed, C runtime error detected, error code = %d, err msg = %s", errorCode, errorMsg)
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ var once sync.Once
|
|||
func (pt *ParamTable) Init() {
|
||||
once.Do(func() {
|
||||
pt.BaseTable.Init()
|
||||
pt.initLogCfg()
|
||||
pt.initParams()
|
||||
})
|
||||
}
|
||||
|
@ -56,7 +57,6 @@ func (pt *ParamTable) initParams() {
|
|||
pt.initMinIOSecretAccessKey()
|
||||
pt.initMinIOUseSSL()
|
||||
pt.initMinioBucketName()
|
||||
pt.initLogCfg()
|
||||
}
|
||||
|
||||
func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb.InitParams) error {
|
||||
|
@ -179,5 +179,9 @@ func (pt *ParamTable) initLogCfg() {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("indexnode-%d.log", pt.NodeID))
|
||||
if len(rootPath) != 0 {
|
||||
pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("indexnode-%d.log", pt.NodeID))
|
||||
} else {
|
||||
pt.Log.File.Filename = ""
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ package indexnode
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -141,8 +140,8 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
|
|||
|
||||
if resp.ErrorCode != commonpb.ErrorCode_Success {
|
||||
err = errors.New(resp.Reason)
|
||||
log.Debug("indexnode", zap.String("[IndexBuildTask][PostExecute] err", err.Error()))
|
||||
}
|
||||
log.Debug("indexnode", zap.String("[IndexBuildTask][PostExecute] err", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -192,7 +191,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
|
||||
it.index, err = NewCIndex(typeParams, indexParams)
|
||||
if err != nil {
|
||||
fmt.Println("NewCIndex err:", err.Error())
|
||||
log.Error("indexnode", zap.String("NewCIndex err:", err.Error()))
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
|
@ -244,7 +243,6 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
var insertCodec storage.InsertCodec
|
||||
defer insertCodec.Close()
|
||||
partitionID, segmentID, insertData, err2 := insertCodec.Deserialize(storageBlobs)
|
||||
//fmt.Println("IndexBuilder for segmentID,", segmentID)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
|
@ -258,7 +256,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
if fOk {
|
||||
err = it.index.BuildFloatVecIndexWithoutIds(floatVectorFieldData.Data)
|
||||
if err != nil {
|
||||
fmt.Println("BuildFloatVecIndexWithoutIds, error:", err.Error())
|
||||
log.Error("indexnode", zap.String("BuildFloatVecIndexWithoutIds error", err.Error()))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -267,7 +265,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
if bOk {
|
||||
err = it.index.BuildBinaryVecIndexWithoutIds(binaryVectorFieldData.Data)
|
||||
if err != nil {
|
||||
fmt.Println("BuildBinaryVecIndexWithoutIds, err:", err.Error())
|
||||
log.Error("indexnode", zap.String("BuildBinaryVecIndexWithoutIds err", err.Error()))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -278,7 +276,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
|
||||
indexBlobs, err := it.index.Serialize()
|
||||
if err != nil {
|
||||
fmt.Println("serialize ... err:", err.Error())
|
||||
log.Error("indexnode", zap.String("serialize err", err.Error()))
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ package indexservice
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -191,7 +190,7 @@ func (i *IndexService) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri
|
|||
}
|
||||
|
||||
func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
|
||||
fmt.Println("builder building index ..., indexName = ", req.IndexName, "indexID = ", req.IndexID, "dataPath = ", req.DataPaths)
|
||||
log.Debug("builder building index", zap.String("indexName = ", req.IndexName), zap.Int64("indexID = ", req.IndexID), zap.Strings("dataPath = ", req.DataPaths))
|
||||
ret := &indexpb.BuildIndexResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
|
|
@ -200,7 +200,7 @@ func (mt *metaTable) DeleteIndex(indexBuildID UniqueID) error {
|
|||
if !ok {
|
||||
return fmt.Errorf("can't find index. id = %d", indexBuildID)
|
||||
}
|
||||
fmt.Print(indexMeta)
|
||||
log.Debug("indexservice", zap.Stringer("indexMeta", &indexMeta))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ var once sync.Once
|
|||
func (pt *ParamTable) Init() {
|
||||
once.Do(func() {
|
||||
pt.BaseTable.Init()
|
||||
pt.initLogCfg()
|
||||
pt.initAddress()
|
||||
pt.initPort()
|
||||
pt.initEtcdAddress()
|
||||
|
@ -48,7 +49,6 @@ func (pt *ParamTable) Init() {
|
|||
pt.initMinIOSecretAccessKey()
|
||||
pt.initMinIOUseSSL()
|
||||
pt.initMinioBucketName()
|
||||
pt.initLogCfg()
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -192,5 +192,9 @@ func (pt *ParamTable) initLogCfg() {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.File.Filename = path.Join(rootPath, "indexservice-%d.log")
|
||||
if len(rootPath) != 0 {
|
||||
pt.Log.File.Filename = path.Join(rootPath, "indexservice.log")
|
||||
} else {
|
||||
pt.Log.File.Filename = ""
|
||||
}
|
||||
}
|
||||
|
|
|
@ -116,9 +116,9 @@ func (it *IndexAddTask) Execute(ctx context.Context) error {
|
|||
log.Debug("before index ...")
|
||||
resp, err := it.builderClient.BuildIndex(ctx, it.req)
|
||||
if err != nil {
|
||||
log.Debug("indexservice", zap.String("build index finish err", err.Error()))
|
||||
return err
|
||||
}
|
||||
log.Debug("indexservice", zap.String("build index finish err", err.Error()))
|
||||
if resp.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return errors.New(resp.Reason)
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ func (queue *BaseTaskQueue) FrontUnissuedTask() task {
|
|||
defer queue.utLock.Unlock()
|
||||
|
||||
if queue.unissuedTasks.Len() <= 0 {
|
||||
log.Panic("sorry, but the unissued task list is empty!")
|
||||
log.Warn("sorry, but the unissued task list is empty!")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -219,7 +219,6 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
|
|||
|
||||
defer func() {
|
||||
t.Notify(err)
|
||||
log.Debug("indexservice", zap.String("notify with error", err.Error()))
|
||||
}()
|
||||
if err != nil {
|
||||
trace.LogError(span, err)
|
||||
|
|
|
@ -80,7 +80,7 @@ func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb.InitParams
|
|||
for _, v := range val {
|
||||
ss, err := cast.ToStringE(v)
|
||||
if err != nil {
|
||||
log.Panic("proxynode", zap.String("error", err.Error()))
|
||||
log.Warn("proxynode", zap.String("error", err.Error()))
|
||||
}
|
||||
if len(str) == 0 {
|
||||
str = ss
|
||||
|
@ -90,7 +90,7 @@ func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb.InitParams
|
|||
}
|
||||
|
||||
default:
|
||||
log.Panic("proxynode", zap.String("error", "Undefined config type, key="+key))
|
||||
log.Debug("proxynode", zap.String("error", "Undefined config type, key="+key))
|
||||
}
|
||||
}
|
||||
log.Debug("proxynode", zap.String(key, str))
|
||||
|
@ -123,6 +123,7 @@ func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb.InitParams
|
|||
func (pt *ParamTable) Init() {
|
||||
once.Do(func() {
|
||||
pt.BaseTable.Init()
|
||||
pt.initLogCfg()
|
||||
// err := pt.LoadYaml("advanced/proxy_node.yaml")
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
|
@ -155,7 +156,6 @@ func (pt *ParamTable) initParams() {
|
|||
pt.initMaxDimension()
|
||||
pt.initDefaultPartitionTag()
|
||||
pt.initDefaultIndexName()
|
||||
pt.initLogCfg()
|
||||
|
||||
pt.initPulsarMaxMessageSize()
|
||||
}
|
||||
|
@ -182,7 +182,7 @@ func (pt *ParamTable) initQueryNodeIDList() []UniqueID {
|
|||
for _, i := range queryNodeIDs {
|
||||
v, err := strconv.Atoi(i)
|
||||
if err != nil {
|
||||
log.Panic("proxynode", zap.String("load proxynode id list error", err.Error()))
|
||||
log.Error("proxynode", zap.String("load proxynode id list error", err.Error()))
|
||||
}
|
||||
ret = append(ret, UniqueID(v))
|
||||
}
|
||||
|
@ -472,5 +472,9 @@ func (pt *ParamTable) initLogCfg() {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("proxynode-%d.log", pt.ProxyID))
|
||||
if len(rootPath) != 0 {
|
||||
pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("proxynode-%d.log", pt.ProxyID))
|
||||
} else {
|
||||
pt.Log.File.Filename = ""
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,10 +2,12 @@ package proxynode
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"sort"
|
||||
"unsafe"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
|
@ -97,7 +99,7 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
|
|||
ts, ok := channelMaxTSMap[reqID][channelID]
|
||||
if !ok {
|
||||
ts = typeutil.ZeroTimestamp
|
||||
log.Println("Warning: did not get max Timstamp!")
|
||||
log.Debug("Warning: did not get max Timstamp!")
|
||||
}
|
||||
channelName := getChannelName(collID, channelID)
|
||||
if channelName == "" {
|
||||
|
@ -109,7 +111,7 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
|
|||
}
|
||||
reqSegCountMap[reqID][channelID] = make(map[UniqueID]uint32)
|
||||
reqSegCountMap[reqID][channelID] = mapInfo
|
||||
log.Println("ProxyNode: repackFunc, reqSegCountMap, reqID:", reqID, " mapInfo: ", mapInfo)
|
||||
log.Debug("proxynode", zap.Int64("repackFunc, reqSegCountMap, reqID", reqID), zap.Any("mapinfo", mapInfo))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -167,13 +169,13 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
|
|||
return segIDSlice[index]
|
||||
}
|
||||
}
|
||||
log.Panic("Can't Found SegmentID")
|
||||
log.Warn("Can't Found SegmentID")
|
||||
return 0
|
||||
}
|
||||
|
||||
factor := 10
|
||||
threshold := Params.PulsarMaxMessageSize / factor
|
||||
log.Println("threshold of message size: ", threshold)
|
||||
log.Debug("proxynode", zap.Int("threshold of message size: ", threshold))
|
||||
// not accurate
|
||||
getSizeOfInsertMsg := func(msg *msgstream.InsertMsg) int {
|
||||
// if real struct, call unsafe.Sizeof directly,
|
||||
|
@ -202,7 +204,8 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
|
|||
size += int(unsafe.Sizeof(blob.Value))
|
||||
size += len(blob.Value)
|
||||
}
|
||||
// log.Println("size of insert message: ", size)
|
||||
|
||||
//log.Debug("proxynode", zap.Int("insert message size", size))
|
||||
return size
|
||||
}
|
||||
// not accurate
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"container/list"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
|
@ -423,7 +422,7 @@ func (sched *TaskScheduler) queryResultLoop() {
|
|||
//t := sched.getTaskByReqID(reqID)
|
||||
{
|
||||
colName := t.(*SearchTask).query.CollectionName
|
||||
fmt.Println("ljq getCollection: ", colName, " reqID: ", reqIDStr, " answer cnt:", len(queryResultBuf[reqID]))
|
||||
log.Debug("Getcollection", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBuf[reqID])))
|
||||
}
|
||||
if len(queryResultBuf[reqID]) == queryNodeNum {
|
||||
t := sched.getTaskByReqID(reqID)
|
||||
|
|
|
@ -90,7 +90,7 @@ func (tt *timeTick) tick() error {
|
|||
if err != nil {
|
||||
log.Warn("proxynode", zap.String("error", err.Error()))
|
||||
} else {
|
||||
log.Warn("proxynode send time tick message")
|
||||
log.Debug("proxynode send time tick message")
|
||||
}
|
||||
tt.tickLock.Lock()
|
||||
defer tt.tickLock.Unlock()
|
||||
|
|
|
@ -3,10 +3,12 @@ package proxynode
|
|||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/retry"
|
||||
)
|
||||
|
||||
|
@ -15,7 +17,7 @@ func GetPulsarConfig(protocol, ip, port, url string) (map[string]interface{}, er
|
|||
var err error
|
||||
|
||||
getResp := func() error {
|
||||
log.Println("GET: ", protocol+"://"+ip+":"+port+url)
|
||||
log.Debug("proxynode util", zap.String("url", protocol+"://"+ip+":"+port+url))
|
||||
resp, err = http.Get(protocol + "://" + ip + ":" + port + url)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/allocator"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
)
|
||||
|
||||
|
|
|
@ -66,7 +66,7 @@ func (pt *ParamTable) initMasterAddress() {
|
|||
func (pt *ParamTable) initNodeTimeTickChannel() {
|
||||
prefix, err := pt.Load("msgChannel.chanNamePrefix.proxyTimeTick")
|
||||
if err != nil {
|
||||
log.Panic("proxyservice", zap.Error(err))
|
||||
log.Error("proxyservice", zap.Error(err))
|
||||
}
|
||||
prefix += "-0"
|
||||
pt.NodeTimeTickChannel = []string{prefix}
|
||||
|
@ -75,7 +75,7 @@ func (pt *ParamTable) initNodeTimeTickChannel() {
|
|||
func (pt *ParamTable) initServiceTimeTickChannel() {
|
||||
ch, err := pt.Load("msgChannel.chanNamePrefix.proxyServiceTimeTick")
|
||||
if err != nil {
|
||||
log.Panic("proxyservice", zap.Error(err))
|
||||
log.Error("proxyservice", zap.Error(err))
|
||||
}
|
||||
pt.ServiceTimeTickChannel = ch
|
||||
}
|
||||
|
@ -125,5 +125,9 @@ func (pt *ParamTable) initLogCfg() {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.File.Filename = path.Join(rootPath, "proxyservice-%d.log")
|
||||
if len(rootPath) != 0 {
|
||||
pt.Log.File.Filename = path.Join(rootPath, "proxyservice.log")
|
||||
} else {
|
||||
pt.Log.File.Filename = ""
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package proxyservice
|
|||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"errors"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
|
|
|
@ -57,7 +57,7 @@ func (queue *BaseTaskQueue) FrontTask() task {
|
|||
defer queue.mtx.Unlock()
|
||||
|
||||
if queue.tasks.Len() <= 0 {
|
||||
log.Panic("sorry, but the task list is empty!")
|
||||
log.Warn("sorry, but the task list is empty!")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -69,7 +69,7 @@ func (queue *BaseTaskQueue) PopTask() task {
|
|||
defer queue.mtx.Unlock()
|
||||
|
||||
if queue.tasks.Len() <= 0 {
|
||||
log.Panic("sorry, but the task list is empty!")
|
||||
log.Warn("sorry, but the task list is empty!")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue