mirror of https://github.com/milvus-io/milvus.git
Replace log of indexnode and indexservice
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>pull/4973/head^2
parent
01f0a49a39
commit
60b97f9b65
|
@ -2,24 +2,25 @@ package grpcindexnode
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
otgrpc "github.com/opentracing-contrib/go-grpc"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
|
||||
"github.com/zilliztech/milvus-distributed/internal/indexnode"
|
||||
"github.com/zilliztech/milvus-distributed/internal/types"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/types"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
|
@ -50,10 +51,10 @@ func (s *Server) startGrpcLoop(grpcPort int) {
|
|||
|
||||
defer s.loopWg.Done()
|
||||
|
||||
log.Println("network port: ", grpcPort)
|
||||
log.Debug("indexnode", zap.Int("network port: ", grpcPort))
|
||||
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
|
||||
if err != nil {
|
||||
log.Printf("GrpcServer:failed to listen: %v", err)
|
||||
log.Warn("indexnode", zap.String("GrpcServer:failed to listen", err.Error()))
|
||||
s.grpcErrChan <- err
|
||||
return
|
||||
}
|
||||
|
@ -92,7 +93,7 @@ func (s *Server) init() error {
|
|||
if err != nil {
|
||||
err = s.Stop()
|
||||
if err != nil {
|
||||
log.Println("Init failed, and Stop failed")
|
||||
log.Debug("Init failed, and Stop failed")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
|
@ -2,11 +2,8 @@ package grpcindexserviceclient
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
otgrpc "github.com/opentracing-contrib/go-grpc"
|
||||
|
@ -14,6 +11,7 @@ import (
|
|||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/retry"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
)
|
||||
|
@ -99,7 +97,6 @@ func (c *Client) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNo
|
|||
|
||||
func NewClient(address string) *Client {
|
||||
|
||||
log.Println("new index service, address = ", address)
|
||||
return &Client{
|
||||
address: address,
|
||||
ctx: context.Background(),
|
||||
|
|
|
@ -4,16 +4,18 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
otgrpc "github.com/opentracing-contrib/go-grpc"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/uber/jaeger-client-go/config"
|
||||
"github.com/zilliztech/milvus-distributed/internal/indexservice"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
|
@ -73,7 +75,7 @@ func (s *Server) start() error {
|
|||
if err := s.indexservice.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Println("indexService started")
|
||||
log.Debug("indexService started")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -122,10 +124,10 @@ func (s *Server) startGrpcLoop(grpcPort int) {
|
|||
|
||||
defer s.loopWg.Done()
|
||||
|
||||
log.Println("network port: ", grpcPort)
|
||||
log.Debug("indexservice", zap.Int("network port", grpcPort))
|
||||
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
|
||||
if err != nil {
|
||||
log.Printf("GrpcServer:failed to listen: %v", err)
|
||||
log.Warn("indexservice", zap.String("GrpcServer:failed to listen", err.Error()))
|
||||
s.grpcErrChan <- err
|
||||
return
|
||||
}
|
||||
|
|
|
@ -4,13 +4,14 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
otgrpc "github.com/opentracing-contrib/go-grpc"
|
||||
|
@ -22,6 +23,7 @@ import (
|
|||
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/uber/jaeger-client-go/config"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
|
@ -72,10 +74,10 @@ func (s *Server) startGrpcLoop(grpcPort int) {
|
|||
|
||||
defer s.wg.Done()
|
||||
|
||||
log.Println("network port: ", grpcPort)
|
||||
log.Debug("proxynode", zap.Int("network port", grpcPort))
|
||||
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
|
||||
if err != nil {
|
||||
log.Printf("Server:failed to listen: %v", err)
|
||||
log.Warn("proxynode", zap.String("Server:failed to listen:", err.Error()))
|
||||
s.grpcErrChan <- err
|
||||
return
|
||||
}
|
||||
|
@ -107,12 +109,12 @@ func (s *Server) Run() error {
|
|||
if err := s.init(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Println("proxy node init done ...")
|
||||
log.Debug("proxy node init done ...")
|
||||
|
||||
if err := s.start(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Println("proxy node start done ...")
|
||||
log.Debug("proxy node start done ...")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -128,9 +130,9 @@ func (s *Server) init() error {
|
|||
|
||||
Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
|
||||
|
||||
log.Println("proxy host: ", Params.IP)
|
||||
log.Println("proxy port: ", Params.Port)
|
||||
log.Println("proxy address: ", Params.Address)
|
||||
log.Debug("proxynode", zap.String("proxy host", Params.IP))
|
||||
log.Debug("proxynode", zap.Int("proxy port", Params.Port))
|
||||
log.Debug("proxynode", zap.String("proxy address", Params.Address))
|
||||
|
||||
// TODO
|
||||
cfg := &config.Configuration{
|
||||
|
@ -151,7 +153,7 @@ func (s *Server) init() error {
|
|||
if err != nil {
|
||||
err2 := s.Stop()
|
||||
if err2 != nil {
|
||||
log.Println("Init failed, and Stop failed")
|
||||
log.Debug("Init failed, and Stop failed")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -160,7 +162,7 @@ func (s *Server) init() error {
|
|||
go s.startGrpcLoop(Params.Port)
|
||||
// wait for grpc server loop start
|
||||
err = <-s.grpcErrChan
|
||||
log.Println("create grpc server ...")
|
||||
log.Debug("create grpc server ...")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -171,10 +173,10 @@ func (s *Server) init() error {
|
|||
return err
|
||||
}
|
||||
s.impl.SetProxyServiceClient(s.proxyServiceClient)
|
||||
log.Println("set proxy service client ...")
|
||||
log.Debug("set proxy service client ...")
|
||||
|
||||
masterServiceAddr := Params.MasterAddress
|
||||
log.Println("master address: ", masterServiceAddr)
|
||||
log.Debug("proxynode", zap.String("master address", masterServiceAddr))
|
||||
timeout := 3 * time.Second
|
||||
s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, timeout)
|
||||
if err != nil {
|
||||
|
@ -190,30 +192,30 @@ func (s *Server) init() error {
|
|||
panic(err)
|
||||
}
|
||||
s.impl.SetMasterClient(s.masterServiceClient)
|
||||
log.Println("set master client ...")
|
||||
log.Debug("set master client ...")
|
||||
|
||||
dataServiceAddr := Params.DataServiceAddress
|
||||
log.Println("data service address ...", dataServiceAddr)
|
||||
log.Debug("proxynode", zap.String("data service address", dataServiceAddr))
|
||||
s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr)
|
||||
err = s.dataServiceClient.Init()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.impl.SetDataServiceClient(s.dataServiceClient)
|
||||
log.Println("set data service address ...")
|
||||
log.Debug("set data service address ...")
|
||||
|
||||
indexServiceAddr := Params.IndexServerAddress
|
||||
log.Println("index server address: ", indexServiceAddr)
|
||||
log.Debug("proxynode", zap.String("index server address", indexServiceAddr))
|
||||
s.indexServiceClient = grpcindexserviceclient.NewClient(indexServiceAddr)
|
||||
err = s.indexServiceClient.Init()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.impl.SetIndexServiceClient(s.indexServiceClient)
|
||||
log.Println("set index service client ...")
|
||||
log.Debug("set index service client ...")
|
||||
|
||||
queryServiceAddr := Params.QueryServiceAddress
|
||||
log.Println("query server address: ", queryServiceAddr)
|
||||
log.Debug("proxynode", zap.String("query server address", queryServiceAddr))
|
||||
s.queryServiceClient, err = grpcqueryserviceclient.NewClient(queryServiceAddr, timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -223,10 +225,10 @@ func (s *Server) init() error {
|
|||
return err
|
||||
}
|
||||
s.impl.SetQueryServiceClient(s.queryServiceClient)
|
||||
log.Println("set query service client ...")
|
||||
log.Debug("set query service client ...")
|
||||
|
||||
proxynode.Params.Init()
|
||||
log.Println("init params done ...")
|
||||
log.Debug("init params done ...")
|
||||
proxynode.Params.NetworkPort = Params.Port
|
||||
proxynode.Params.IP = Params.IP
|
||||
proxynode.Params.NetworkAddress = Params.Address
|
||||
|
@ -236,7 +238,7 @@ func (s *Server) init() error {
|
|||
s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
|
||||
|
||||
if err := s.impl.Init(); err != nil {
|
||||
log.Println("impl init error: ", err)
|
||||
log.Debug("proxynode", zap.String("impl init error", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -4,15 +4,17 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
otgrpc "github.com/opentracing-contrib/go-grpc"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/uber/jaeger-client-go/config"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
|
@ -73,7 +75,7 @@ func (s *Server) Run() error {
|
|||
if err := s.init(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Println("proxy service init done ...")
|
||||
log.Debug("proxy service init done ...")
|
||||
|
||||
if err := s.start(); err != nil {
|
||||
return err
|
||||
|
@ -84,7 +86,7 @@ func (s *Server) Run() error {
|
|||
func (s *Server) init() error {
|
||||
Params.Init()
|
||||
proxyservice.Params.Init()
|
||||
log.Println("init params done")
|
||||
log.Debug("init params done")
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.startGrpcLoop(Params.ServicePort)
|
||||
|
@ -93,7 +95,7 @@ func (s *Server) init() error {
|
|||
return err
|
||||
}
|
||||
s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
|
||||
log.Println("grpc init done ...")
|
||||
log.Debug("grpc init done ...")
|
||||
|
||||
if err := s.impl.Init(); err != nil {
|
||||
return err
|
||||
|
@ -105,10 +107,10 @@ func (s *Server) startGrpcLoop(grpcPort int) {
|
|||
|
||||
defer s.wg.Done()
|
||||
|
||||
log.Println("network port: ", grpcPort)
|
||||
log.Debug("proxyservice", zap.Int("network port", grpcPort))
|
||||
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
|
||||
if err != nil {
|
||||
log.Printf("GrpcServer:failed to listen: %v", err)
|
||||
log.Warn("proxyservice", zap.String("GrpcServer:failed to listen", err.Error()))
|
||||
s.grpcErrChan <- err
|
||||
return
|
||||
}
|
||||
|
@ -135,7 +137,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
|
|||
}
|
||||
|
||||
func (s *Server) start() error {
|
||||
log.Println("proxy ProxyService start ...")
|
||||
log.Debug("proxy ProxyService start ...")
|
||||
if err := s.impl.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -14,11 +14,10 @@ package indexnode
|
|||
import "C"
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"unsafe"
|
||||
|
||||
"errors"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexcgopb"
|
||||
|
|
|
@ -5,22 +5,23 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/uber/jaeger-client-go/config"
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
|
||||
"github.com/zilliztech/milvus-distributed/internal/types"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/types"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -81,7 +82,7 @@ func (i *IndexNode) Init() error {
|
|||
|
||||
resp, err2 := i.serviceClient.RegisterNode(ctx, request)
|
||||
if err2 != nil {
|
||||
log.Printf("Index NodeImpl connect to IndexService failed, error= %v", err)
|
||||
log.Debug("indexnode", zap.String("Index NodeImpl connect to IndexService failed", err.Error()))
|
||||
return err2
|
||||
}
|
||||
|
||||
|
@ -149,7 +150,7 @@ func (i *IndexNode) Stop() error {
|
|||
for _, cb := range i.closeCallbacks {
|
||||
cb()
|
||||
}
|
||||
log.Print("NodeImpl closed.")
|
||||
log.Debug("NodeImpl closed.")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -183,7 +184,7 @@ func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexC
|
|||
ret.Reason = err.Error()
|
||||
return ret, nil
|
||||
}
|
||||
log.Println("indexnode successfully schedule with indexBuildID = ", request.IndexBuildID)
|
||||
log.Debug("indexnode", zap.Int64("indexnode successfully schedule with indexBuildID", request.IndexBuildID))
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -2,12 +2,16 @@ package indexnode
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"log"
|
||||
"fmt"
|
||||
"path"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/spf13/cast"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
|
||||
)
|
||||
|
@ -32,6 +36,8 @@ type ParamTable struct {
|
|||
MinIOSecretAccessKey string
|
||||
MinIOUseSSL bool
|
||||
MinioBucketName string
|
||||
|
||||
Log log.Config
|
||||
}
|
||||
|
||||
var Params ParamTable
|
||||
|
@ -50,6 +56,7 @@ func (pt *ParamTable) initParams() {
|
|||
pt.initMinIOSecretAccessKey()
|
||||
pt.initMinIOUseSSL()
|
||||
pt.initMinioBucketName()
|
||||
pt.initLogCfg()
|
||||
}
|
||||
|
||||
func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb2.InitParams) error {
|
||||
|
@ -77,7 +84,7 @@ func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb2.InitParam
|
|||
for _, v := range val {
|
||||
ss, err := cast.ToStringE(v)
|
||||
if err != nil {
|
||||
log.Panic(err)
|
||||
log.Debug("indexnode", zap.String("error", err.Error()))
|
||||
}
|
||||
if len(str) == 0 {
|
||||
str = ss
|
||||
|
@ -87,7 +94,7 @@ func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb2.InitParam
|
|||
}
|
||||
|
||||
default:
|
||||
log.Panicf("undefine config type, key=%s", key)
|
||||
log.Debug("indexnode", zap.String("undefine config type, key=", key))
|
||||
}
|
||||
}
|
||||
err = pt.Save(key, str)
|
||||
|
@ -143,3 +150,34 @@ func (pt *ParamTable) initMinioBucketName() {
|
|||
}
|
||||
pt.MinioBucketName = bucketName
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initLogCfg() {
|
||||
pt.Log = log.Config{}
|
||||
format, err := pt.Load("log.format")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.Format = format
|
||||
level, err := pt.Load("log.level")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.Level = level
|
||||
devStr, err := pt.Load("log.dev")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dev, err := strconv.ParseBool(devStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.Development = dev
|
||||
pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize")
|
||||
pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups")
|
||||
pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge")
|
||||
rootPath, err := pt.Load("log.file.rootPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("indexnode-%d.log", pt.NodeID))
|
||||
}
|
||||
|
|
|
@ -4,16 +4,17 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/storage"
|
||||
"github.com/zilliztech/milvus-distributed/internal/types"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -95,17 +96,17 @@ func (bt *BaseTask) Name() string {
|
|||
|
||||
func (it *IndexBuildTask) OnEnqueue() error {
|
||||
it.SetID(it.cmd.IndexBuildID)
|
||||
log.Printf("[IndexBuilderTask] Enqueue TaskID: %v", it.ID())
|
||||
log.Debug("indexnode", zap.Int64("[IndexBuilderTask] Enqueue TaskID", it.ID()))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (it *IndexBuildTask) PreExecute(ctx context.Context) error {
|
||||
log.Println("preExecute...")
|
||||
log.Debug("preExecute...")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
|
||||
log.Println("PostExecute...")
|
||||
log.Debug("PostExecute...")
|
||||
|
||||
defer func() {
|
||||
if it.err != nil {
|
||||
|
@ -115,7 +116,7 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
|
|||
|
||||
if it.serviceClient == nil {
|
||||
err := errors.New("IndexBuildTask, serviceClient is nil")
|
||||
log.Println("[IndexBuildTask][PostExecute] serviceClient is nil")
|
||||
log.Debug("[IndexBuildTask][PostExecute] serviceClient is nil")
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -134,24 +135,21 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
|
|||
ctx = context.TODO()
|
||||
resp, err := it.serviceClient.NotifyBuildIndex(ctx, nty)
|
||||
if err != nil {
|
||||
log.Println("IndexBuildTask notify err:", err.Error())
|
||||
log.Warn("indexnode", zap.String("error", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.ErrorCode != commonpb.ErrorCode_ERROR_CODE_SUCCESS {
|
||||
err = errors.New(resp.Reason)
|
||||
}
|
||||
log.Println("[IndexBuildTask][PostExecute] err", err)
|
||||
log.Debug("indexnode", zap.String("[IndexBuildTask][PostExecute] err", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
||||
log.Println("start build index ...")
|
||||
log.Debug("start build index ...")
|
||||
var err error
|
||||
|
||||
log.Println("type params: ", it.cmd.Req.GetTypeParams())
|
||||
log.Println("index params: ", it.cmd.Req.GetIndexParams())
|
||||
|
||||
typeParams := make(map[string]string)
|
||||
for _, kvPair := range it.cmd.Req.GetTypeParams() {
|
||||
key, value := kvPair.GetKey(), kvPair.GetValue()
|
||||
|
@ -200,7 +198,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
defer func() {
|
||||
err = it.index.Delete()
|
||||
if err != nil {
|
||||
log.Print("CIndexDelete Failed")
|
||||
log.Warn("CIndexDelete Failed")
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -324,7 +322,7 @@ func (it *IndexBuildTask) Rollback() error {
|
|||
|
||||
err := it.kv.MultiRemove(it.savePaths)
|
||||
if err != nil {
|
||||
log.Println("IndexBuildTask Rollback Failed:", err.Error())
|
||||
log.Warn("indexnode", zap.String("IndexBuildTask Rollback Failed", err.Error()))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -4,12 +4,14 @@ import (
|
|||
"container/list"
|
||||
"context"
|
||||
"errors"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/opentracing/opentracing-go"
|
||||
oplog "github.com/opentracing/opentracing-go/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/trace"
|
||||
)
|
||||
|
||||
|
@ -69,7 +71,7 @@ func (queue *BaseTaskQueue) FrontUnissuedTask() task {
|
|||
defer queue.utLock.Unlock()
|
||||
|
||||
if queue.unissuedTasks.Len() <= 0 {
|
||||
log.Println("FrontUnissuedTask sorry, but the unissued task list is empty!")
|
||||
log.Debug("FrontUnissuedTask sorry, but the unissued task list is empty!")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -81,7 +83,7 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task {
|
|||
defer queue.utLock.Unlock()
|
||||
|
||||
if queue.unissuedTasks.Len() <= 0 {
|
||||
log.Println("PopUnissued task sorry, but the unissued task list is empty!")
|
||||
log.Debug("PopUnissued task sorry, but the unissued task list is empty!")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -98,7 +100,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t task) {
|
|||
tID := t.ID()
|
||||
_, ok := queue.activeTasks[tID]
|
||||
if ok {
|
||||
log.Fatalf("task with ID %v already in active task list!", tID)
|
||||
log.Debug("indexnode", zap.Int64("task with ID %v already in active task list!", tID))
|
||||
}
|
||||
|
||||
queue.activeTasks[tID] = t
|
||||
|
@ -113,7 +115,7 @@ func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task {
|
|||
delete(queue.activeTasks, tID)
|
||||
return t
|
||||
}
|
||||
log.Fatalf("sorry, but the ID %d was not found in the active task list!", tID)
|
||||
log.Debug("indexnode", zap.Int64("sorry, but the ID was not found in the active task list!", tID))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -184,7 +186,7 @@ func NewTaskScheduler(ctx context.Context,
|
|||
|
||||
func (sched *TaskScheduler) setParallelism(parallel int) {
|
||||
if parallel <= 0 {
|
||||
log.Println("can not set parallelism to less than zero!")
|
||||
log.Debug("can not set parallelism to less than zero!")
|
||||
return
|
||||
}
|
||||
sched.buildParallel = parallel
|
||||
|
@ -241,7 +243,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
|
|||
}
|
||||
|
||||
func (sched *TaskScheduler) indexBuildLoop() {
|
||||
log.Println("index build loop ...")
|
||||
log.Debug("index build loop ...")
|
||||
defer sched.wg.Done()
|
||||
for {
|
||||
select {
|
||||
|
|
|
@ -4,25 +4,26 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/allocator"
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
|
||||
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/tso"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/retry"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -72,7 +73,7 @@ func NewIndexService(ctx context.Context) (*IndexService, error) {
|
|||
|
||||
func (i *IndexService) Init() error {
|
||||
etcdAddress := Params.EtcdAddress
|
||||
log.Println("etcd address = ", etcdAddress)
|
||||
log.Debug("indexservice", zap.String("etcd address", etcdAddress))
|
||||
connectEtcdFn := func() error {
|
||||
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
|
||||
if err != nil {
|
||||
|
@ -134,7 +135,7 @@ func (i *IndexService) Start() error {
|
|||
for _, cb := range i.startCallbacks {
|
||||
cb()
|
||||
}
|
||||
log.Print("IndexService start")
|
||||
log.Debug("IndexService start")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -316,12 +317,11 @@ func (i *IndexService) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildI
|
|||
ret := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_ERROR_CODE_SUCCESS,
|
||||
}
|
||||
log.Println("[IndexService][NotifyBuildIndex]", nty.String())
|
||||
log.Debug("indexservice", zap.String("[IndexService][NotifyBuildIndex]", nty.String()))
|
||||
if err := i.metaTable.NotifyBuildIndex(nty); err != nil {
|
||||
ret.ErrorCode = commonpb.ErrorCode_ERROR_CODE_BUILD_INDEX_ERROR
|
||||
ret.Reason = err.Error()
|
||||
log.Println("[IndexService][NotifyBuildIndex][metaTable][NotifyBuildIndex]", err)
|
||||
|
||||
log.Debug("indexservice", zap.String("[IndexService][NotifyBuildIndex][metaTable][NotifyBuildIndex]", err.Error()))
|
||||
}
|
||||
i.nodeClients.IncPriority(nty.NodeID, -1)
|
||||
return ret, nil
|
||||
|
@ -337,12 +337,12 @@ func (i *IndexService) tsLoop() {
|
|||
select {
|
||||
case <-tsoTicker.C:
|
||||
if err := i.idAllocator.UpdateID(); err != nil {
|
||||
log.Println("failed to update id", err)
|
||||
log.Debug("indexservice", zap.String("failed to update id", err.Error()))
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
// Server is closed and it should return nil.
|
||||
log.Println("tsLoop is closed")
|
||||
log.Debug("tsLoop is closed")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,13 +13,14 @@ package indexservice
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
)
|
||||
|
@ -170,7 +171,7 @@ func (mt *metaTable) removeIndexFile(indexID UniqueID) {
|
|||
if meta.Req.IndexID == indexID {
|
||||
err := mt.client.MultiRemove(meta.IndexFilePaths)
|
||||
if err != nil {
|
||||
log.Println("remove index file err: ", err)
|
||||
log.Warn("indexservice", zap.String("remove index file err", err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,12 +2,10 @@ package indexservice
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
|
||||
"errors"
|
||||
|
||||
grpcindexnodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexnode/client"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
|
|
|
@ -2,9 +2,11 @@ package indexservice
|
|||
|
||||
import (
|
||||
"net"
|
||||
"path"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
|
||||
)
|
||||
|
||||
|
@ -25,6 +27,8 @@ type ParamTable struct {
|
|||
MinIOSecretAccessKey string
|
||||
MinIOUseSSL bool
|
||||
MinioBucketName string
|
||||
|
||||
Log log.Config
|
||||
}
|
||||
|
||||
var Params ParamTable
|
||||
|
@ -44,6 +48,7 @@ func (pt *ParamTable) Init() {
|
|||
pt.initMinIOSecretAccessKey()
|
||||
pt.initMinIOUseSSL()
|
||||
pt.initMinioBucketName()
|
||||
pt.initLogCfg()
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -158,3 +163,34 @@ func (pt *ParamTable) initMinioBucketName() {
|
|||
}
|
||||
pt.MinioBucketName = bucketName
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initLogCfg() {
|
||||
pt.Log = log.Config{}
|
||||
format, err := pt.Load("log.format")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.Format = format
|
||||
level, err := pt.Load("log.level")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.Level = level
|
||||
devStr, err := pt.Load("log.dev")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dev, err := strconv.ParseBool(devStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.Development = dev
|
||||
pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize")
|
||||
pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups")
|
||||
pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge")
|
||||
rootPath, err := pt.Load("log.file.rootPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.File.Filename = path.Join(rootPath, "indexservice-%d.log")
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/types"
|
||||
)
|
||||
|
||||
|
|
|
@ -3,14 +3,15 @@ package indexservice
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/allocator"
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
"github.com/zilliztech/milvus-distributed/internal/types"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/types"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -96,7 +97,7 @@ func (it *IndexAddTask) OnEnqueue() error {
|
|||
}
|
||||
|
||||
func (it *IndexAddTask) PreExecute(ctx context.Context) error {
|
||||
log.Println("pretend to check Index Req")
|
||||
log.Debug("pretend to check Index Req")
|
||||
nodeID, builderClient := it.nodeClients.PeekClient()
|
||||
if builderClient == nil {
|
||||
return errors.New("IndexAddTask Service not available")
|
||||
|
@ -115,12 +116,12 @@ func (it *IndexAddTask) Execute(ctx context.Context) error {
|
|||
IndexBuildID: it.indexBuildID,
|
||||
Req: it.req,
|
||||
}
|
||||
log.Println("before index ...")
|
||||
log.Debug("before index ...")
|
||||
resp, err := it.builderClient.BuildIndex(ctx, cmd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Println("build index finish, err = ", err)
|
||||
log.Debug("indexservice", zap.String("build index finish err", err.Error()))
|
||||
if resp.ErrorCode != commonpb.ErrorCode_ERROR_CODE_SUCCESS {
|
||||
return errors.New(resp.Reason)
|
||||
}
|
||||
|
|
|
@ -3,16 +3,16 @@ package indexservice
|
|||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"log"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"errors"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/allocator"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/opentracing/opentracing-go"
|
||||
oplog "github.com/opentracing/opentracing-go/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/allocator"
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/trace"
|
||||
)
|
||||
|
||||
|
@ -101,7 +101,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t task) {
|
|||
tID := t.ID()
|
||||
_, ok := queue.activeTasks[tID]
|
||||
if ok {
|
||||
log.Fatalf("task with ID %v already in active task list!", tID)
|
||||
log.Warn("indexservice", zap.Int64("task with ID already in active task list!", tID))
|
||||
}
|
||||
|
||||
queue.activeTasks[tID] = t
|
||||
|
@ -116,13 +116,13 @@ func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task {
|
|||
delete(queue.activeTasks, tID)
|
||||
return t
|
||||
}
|
||||
log.Fatalf("sorry, but the ID %d was not found in the active task list!", tID)
|
||||
log.Debug("indexservice", zap.Int64("sorry, but the ID was not found in the active task list!", tID))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (queue *BaseTaskQueue) Enqueue(t task) error {
|
||||
tID, _ := queue.sched.idAllocator.AllocOne()
|
||||
log.Printf("[Builder] allocate reqID: %v", tID)
|
||||
log.Debug("indexservice", zap.Int64("[Builder] allocate reqID", tID))
|
||||
t.SetID(tID)
|
||||
err := t.OnEnqueue()
|
||||
if err != nil {
|
||||
|
@ -219,7 +219,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
|
|||
|
||||
defer func() {
|
||||
t.Notify(err)
|
||||
log.Printf("notify with error: %v", err)
|
||||
log.Debug("indexservice", zap.String("notify with error", err.Error()))
|
||||
}()
|
||||
if err != nil {
|
||||
trace.LogError(span, err)
|
||||
|
|
Loading…
Reference in New Issue