mirror of https://github.com/milvus-io/milvus.git
Add zap log in query service

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

Branch: pull/4973/head^2
parent 7893ebdb40
commit d3db2b0d82
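The change is mechanical but worth spelling out: unstructured calls to the standard library's log.Printf/log.Println and to fmt.Println are replaced with leveled, structured calls (log.Debug, log.Warn, log.Error) on the project's zap-backed internal/log wrapper, which attach typed key-value fields instead of interpolating values into the message string. Below is a minimal, self-contained sketch of the same pattern using go.uber.org/zap directly; the wrapper's SetupLogger/Sync names appear in the diff itself, everything else here is illustrative:

    package main

    import (
        "os"
        "os/signal"
        "syscall"

        "go.uber.org/zap"
    )

    func main() {
        // Structured, leveled logger; zap.NewDevelopment enables Debug-level output.
        logger, err := zap.NewDevelopment()
        if err != nil {
            panic(err)
        }
        // Flush buffered entries on exit, as the diff does via deferred log.Sync().
        defer logger.Sync()

        // Block until SIGTERM/SIGQUIT, as in the main() functions below.
        sc := make(chan os.Signal, 1)
        signal.Notify(sc, syscall.SIGTERM, syscall.SIGQUIT)
        sig := <-sc

        // Instead of: log.Printf("Got %s signal to exit", sig.String())
        logger.Debug("Get signal to exit", zap.String("signal", sig.String()))
    }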
@@ -41,7 +41,7 @@ func main() {
 		syscall.SIGTERM,
 		syscall.SIGQUIT)
 	sig := <-sc
-	log.Info("Get signal to exit", zap.String("signal", sig.String()))
+	log.Debug("Get signal to exit", zap.String("signal", sig.String()))
 	err = ms.Stop()
 	if err != nil {
 		panic(err)

@@ -2,19 +2,30 @@ package main

 import (
 	"context"
-	"log"
 	"os"
 	"os/signal"
 	"syscall"

+	"go.uber.org/zap"
+
 	distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components"
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/queryservice"
 )

 func main() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

 	queryservice.Params.Init()
+	log.SetupLogger(&queryservice.Params.Log)
+	defer func() {
+		if err := log.Sync(); err != nil {
+			panic(err)
+		}
+	}()
+
 	msFactory := pulsarms.NewFactory()

 	svr, err := distributed.NewQueryService(ctx, msFactory)

@@ -33,7 +44,7 @@ func main() {
 		syscall.SIGTERM,
 		syscall.SIGQUIT)
 	sig := <-sc
-	log.Printf("Got %s signal to exit", sig.String())
+	log.Debug("Get signal to exit", zap.String("signal", sig.String()))

 	if err := svr.Stop(); err != nil {
 		panic(err)

@@ -0,0 +1,13 @@
+# Copyright (C) 2019-2020 Zilliz. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under the License.
+
+queryService:
+  nodeID: 200

@@ -469,7 +469,7 @@ func (s *Server) RegisterNode(ctx context.Context, req *datapb.RegisterNodeReque
 				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
 			},
 		}
-	log.Info("DataService: RegisterNode:", zap.String("IP", req.Address.Ip), zap.Int64("Port", req.Address.Port))
+	log.Debug("DataService: RegisterNode:", zap.String("IP", req.Address.Ip), zap.Int64("Port", req.Address.Port))
 	node, err := s.newDataNode(req.Address.Ip, req.Address.Port, req.Base.SourceID)
 	if err != nil {
 		return nil, err

@@ -4,15 +4,17 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"strconv"
-	"time"

 	"net"
+	"strconv"
 	"sync"
+	"time"

 	otgrpc "github.com/opentracing-contrib/go-grpc"
 	"github.com/opentracing/opentracing-go"
 	"github.com/uber/jaeger-client-go/config"
+	"go.uber.org/zap"
+	"google.golang.org/grpc"

 	dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client"
 	isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
 	psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"

@@ -25,8 +27,6 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
 	"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
-	"go.uber.org/zap"
-	"google.golang.org/grpc"
 )

 // grpc wrapper

@@ -104,7 +104,7 @@ func (s *Server) init() error {
 	Params.Init()
 	ctx := context.Background()

-	log.Info("init params done")
+	log.Debug("init params done")

 	err := s.startGrpc()
 	if err != nil {

@@ -114,7 +114,7 @@ func (s *Server) init() error {
 	s.core.UpdateStateCode(internalpb2.StateCode_INITIALIZING)

 	if s.connectProxyService {
-		log.Info("proxy service", zap.String("address", Params.ProxyServiceAddress))
+		log.Debug("proxy service", zap.String("address", Params.ProxyServiceAddress))
 		proxyService := psc.NewClient(Params.ProxyServiceAddress)
 		if err := proxyService.Init(); err != nil {
 			panic(err)

@@ -130,7 +130,7 @@ func (s *Server) init() error {
 		}
 	}
 	if s.connectDataService {
-		log.Info("data service", zap.String("address", Params.DataServiceAddress))
+		log.Debug("data service", zap.String("address", Params.DataServiceAddress))
 		dataService := dsc.NewClient(Params.DataServiceAddress)
 		if err := dataService.Init(); err != nil {
 			panic(err)

@@ -148,7 +148,7 @@ func (s *Server) init() error {
 		}
 	}
 	if s.connectIndexService {
-		log.Info("index service", zap.String("address", Params.IndexServiceAddress))
+		log.Debug("index service", zap.String("address", Params.IndexServiceAddress))
 		indexService := isc.NewClient(Params.IndexServiceAddress)
 		if err := indexService.Init(); err != nil {
 			panic(err)

@@ -175,7 +175,7 @@ func (s *Server) init() error {
 		}
 	}
 	cms.Params.Init()
-	log.Info("grpc init done ...")
+	log.Debug("grpc init done ...")

 	if err := s.core.Init(); err != nil {
 		return err

@@ -195,10 +195,10 @@ func (s *Server) startGrpcLoop(grpcPort int) {

 	defer s.wg.Done()

-	log.Info("start grpc ", zap.Int("port", grpcPort))
+	log.Debug("start grpc ", zap.Int("port", grpcPort))
 	lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
 	if err != nil {
-		log.Warn("GrpcServer:failed to listen", zap.String("error", err.Error()))
+		log.Error("GrpcServer:failed to listen", zap.String("error", err.Error()))
 		s.grpcErrChan <- err
 		return
 	}

@@ -221,7 +221,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
 }

 func (s *Server) start() error {
-	log.Info("Master Core start ...")
+	log.Debug("Master Core start ...")
 	if err := s.core.Start(); err != nil {
 		return err
 	}

@@ -2,13 +2,14 @@ package grpcqueryserviceclient

 import (
 	"context"
-	"log"
 	"time"

-	"google.golang.org/grpc"
-
 	otgrpc "github.com/opentracing-contrib/go-grpc"
 	"github.com/opentracing/opentracing-go"
+	"go.uber.org/zap"
+	"google.golang.org/grpc"

+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"

@@ -55,7 +56,7 @@ func (c *Client) Init() error {
 	}

 	c.grpcClient = querypb.NewQueryServiceClient(c.conn)
-	log.Printf("connected to queryService, queryService=%s", c.addr)
+	log.Debug("connected to queryService", zap.String("queryService", c.addr))
 	return nil
 }

@@ -2,7 +2,6 @@ package grpcqueryservice

 import (
 	"context"
-	"log"
 	"net"
 	"strconv"
 	"sync"

@@ -10,18 +9,19 @@ import (

 	otgrpc "github.com/opentracing-contrib/go-grpc"
 	"github.com/opentracing/opentracing-go"
-	dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client"
-	msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
-	"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
-
 	"go.uber.org/zap"
 	"google.golang.org/grpc"

+	dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client"
+	msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
 	qs "github.com/zilliztech/milvus-distributed/internal/queryservice"
+	"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
 )

 type Server struct {

@@ -62,7 +62,7 @@ func (s *Server) Run() error {
 	if err := s.init(); err != nil {
 		return err
 	}
-	log.Println("queryservice init done ...")
+	log.Debug("queryservice init done ...")

 	if err := s.start(); err != nil {
 		return err

@@ -82,8 +82,8 @@ func (s *Server) init() error {
 	}

 	// --- Master Server Client ---
-	log.Println("Master service address:", Params.MasterAddress)
-	log.Println("Init master service client ...")
+	log.Debug("Master service", zap.String("address", Params.MasterAddress))
+	log.Debug("Init master service client ...")

 	masterService, err := msc.NewClient(Params.MasterAddress, 20*time.Second)

@@ -109,8 +109,8 @@ func (s *Server) init() error {
 	}

 	// --- Data service client ---
-	log.Println("DataService Address:", Params.DataServiceAddress)
-	log.Println("QueryService Init data service client ...")
+	log.Debug("DataService", zap.String("Address", Params.DataServiceAddress))
+	log.Debug("QueryService Init data service client ...")

 	dataService := dsc.NewClient(Params.DataServiceAddress)
 	if err = dataService.Init(); err != nil {

@@ -140,10 +140,10 @@ func (s *Server) startGrpcLoop(grpcPort int) {

 	defer s.wg.Done()

-	log.Println("network port: ", grpcPort)
+	log.Debug("network", zap.String("port", strconv.Itoa(grpcPort)))
 	lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
 	if err != nil {
-		log.Printf("GrpcServer:failed to listen: %v", err)
+		log.Debug("GrpcServer:failed to listen:", zap.String("error", err.Error()))
 		s.grpcErrChan <- err
 		return
 	}

@@ -8,6 +8,9 @@ import (
 	"sync/atomic"
 	"time"

+	"go.etcd.io/etcd/clientv3"
+	"go.uber.org/zap"
+
 	"github.com/zilliztech/milvus-distributed/internal/allocator"
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"

@@ -25,8 +28,6 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/util/retry"
 	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
-	"go.etcd.io/etcd/clientv3"
-	"go.uber.org/zap"
 )

 // internalpb2 -> internalpb

@@ -266,9 +267,9 @@ func (c *Core) checkInit() error {
 		return errors.Errorf("ReleaseCollection is nil")
 	}

-	log.Info("master", zap.Int64("node id", int64(Params.NodeID)))
-	log.Info("master", zap.String("dd channel name", Params.DdChannel))
-	log.Info("master", zap.String("time tick channel name", Params.TimeTickChannel))
+	log.Debug("master", zap.Int64("node id", int64(Params.NodeID)))
+	log.Debug("master", zap.String("dd channel name", Params.DdChannel))
+	log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel))
 	return nil
 }

@@ -276,11 +277,11 @@ func (c *Core) startDdScheduler() {
 	for {
 		select {
 		case <-c.ctx.Done():
-			log.Info("close dd scheduler, exit task execution loop")
+			log.Debug("close dd scheduler, exit task execution loop")
 			return
 		case task, ok := <-c.ddReqQueue:
 			if !ok {
-				log.Info("dd chan is closed, exit task execution loop")
+				log.Debug("dd chan is closed, exit task execution loop")
 				return
 			}
 			ts, err := task.Ts()

@@ -305,11 +306,11 @@ func (c *Core) startTimeTickLoop() {
 	for {
 		select {
 		case <-c.ctx.Done():
-			log.Info("close master time tick loop")
+			log.Debug("close master time tick loop")
 			return
 		case tt, ok := <-c.ProxyTimeTickChan:
 			if !ok {
-				log.Info("proxyTimeTickStream is closed, exit time tick loop")
+				log.Warn("proxyTimeTickStream is closed, exit time tick loop")
 				return
 			}
 			if tt <= c.lastTimeTick {

@@ -328,11 +329,11 @@ func (c *Core) startDataServiceSegmentLoop() {
 	for {
 		select {
 		case <-c.ctx.Done():
-			log.Info("close data service segment loop")
+			log.Debug("close data service segment loop")
 			return
 		case seg, ok := <-c.DataServiceSegmentChan:
 			if !ok {
-				log.Info("data service segment is closed, exit loop")
+				log.Debug("data service segment is closed, exit loop")
 				return
 			}
 			if seg == nil {

@@ -352,11 +353,11 @@ func (c *Core) startCreateIndexLoop() {
 	for {
 		select {
 		case <-c.ctx.Done():
-			log.Info("close create index loop")
+			log.Debug("close create index loop")
 			return
 		case t, ok := <-c.indexTaskQueue:
 			if !ok {
-				log.Info("index task chan has closed, exit loop")
+				log.Debug("index task chan has closed, exit loop")
 				return
 			}
 			if err := t.BuildIndex(); err != nil {

@@ -372,11 +373,11 @@ func (c *Core) startSegmentFlushCompletedLoop() {
 	for {
 		select {
 		case <-c.ctx.Done():
-			log.Info("close segment flush completed loop")
+			log.Debug("close segment flush completed loop")
 			return
 		case seg, ok := <-c.DataNodeSegmentFlushCompletedChan:
 			if !ok {
-				log.Info("data node segment flush completed chan has colsed, exit loop")
+				log.Debug("data node segment flush completed chan has colsed, exit loop")
 			}
 			coll, err := c.MetaTable.GetCollectionBySegmentID(seg)
 			if err != nil {

@@ -425,7 +426,7 @@ func (c *Core) tsLoop() {
 			}
 		case <-ctx.Done():
 			// Server is closed and it should return nil.
-			log.Info("tsLoop is closed")
+			log.Debug("tsLoop is closed")
 			return
 		}
 	}

@@ -645,7 +646,7 @@ func (c *Core) SetProxyService(ctx context.Context, s ProxyServiceInterface) err
 		return err
 	}
 	Params.ProxyTimeTickChannel = rsp.Value
-	log.Info("proxy time tick", zap.String("channel name", Params.ProxyTimeTickChannel))
+	log.Debug("proxy time tick", zap.String("channel name", Params.ProxyTimeTickChannel))

 	c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error {
 		status, _ := s.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{

@@ -675,7 +676,7 @@ func (c *Core) SetDataService(ctx context.Context, s DataServiceInterface) error
 		return err
 	}
 	Params.DataServiceSegmentChannel = rsp.Value
-	log.Info("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel))
+	log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel))

 	c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
 		ts, err := c.tsoAllocator.Alloc(1)

@@ -797,7 +798,7 @@ func (c *Core) Init() error {
 		initError = c.setMsgStreams()
 	})
 	if initError == nil {
-		log.Info("Master service", zap.String("State Code", internalpb2.StateCode_name[int32(internalpb2.StateCode_INITIALIZING)]))
+		log.Debug("Master service", zap.String("State Code", internalpb2.StateCode_name[int32(internalpb2.StateCode_INITIALIZING)]))
 	}
 	return initError
 }

@@ -815,7 +816,7 @@ func (c *Core) Start() error {
 		go c.tsLoop()
 		c.stateCode.Store(internalpb2.StateCode_HEALTHY)
 	})
-	log.Info("Master service", zap.String("State Code", internalpb2.StateCode_name[int32(internalpb2.StateCode_HEALTHY)]))
+	log.Debug("Master service", zap.String("State Code", internalpb2.StateCode_name[int32(internalpb2.StateCode_HEALTHY)]))
 	return nil
 }

@@ -827,7 +828,7 @@ func (c *Core) Stop() error {

 func (c *Core) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
 	code := c.stateCode.Load().(internalpb2.StateCode)
-	log.Info("GetComponentStates", zap.String("State Code", internalpb2.StateCode_name[int32(code)]))
+	log.Debug("GetComponentStates", zap.String("State Code", internalpb2.StateCode_name[int32(code)]))

 	return &internalpb2.ComponentStates{
 		State: &internalpb2.ComponentInfo{

@@ -6,6 +6,8 @@ import (
 	"sync"

+	"github.com/golang/protobuf/proto"
+	"go.uber.org/zap"

 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/kv"
 	"github.com/zilliztech/milvus-distributed/internal/log"

@@ -14,7 +16,6 @@ import (
 	pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
-	"go.uber.org/zap"
 )

 const (

@@ -4,6 +4,8 @@ import (
 	"fmt"

+	"github.com/golang/protobuf/proto"
+	"go.uber.org/zap"

 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"

@@ -12,7 +14,6 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
-	"go.uber.org/zap"
 )

 type reqTask interface {

@@ -1,8 +1,12 @@
 package queryservice

 import (
+	"fmt"
+	"path"
+	"strconv"
 	"sync"

+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 )

@@ -12,6 +16,8 @@ type UniqueID = typeutil.UniqueID
 type ParamTable struct {
 	paramtable.BaseTable

+	NodeID uint64
+
 	Address        string
 	QueryServiceID UniqueID

@@ -20,6 +26,8 @@ type ParamTable struct {

 	// timetick
 	TimeTickChannelName string
+
+	Log log.Config
 }

 var Params ParamTable

@@ -33,17 +41,60 @@ func (p *ParamTable) Init() {
 		panic(err)
 	}

+	err = p.LoadYaml("advanced/query_service.yaml")
+	if err != nil {
+		panic(err)
+	}
+
 	err = p.LoadYaml("milvus.yaml")
 	if err != nil {
 		panic(err)
 	}

+	p.initNodeID()
+	p.initLogCfg()
+
 	p.initStatsChannelName()
 	p.initTimeTickChannelName()
 	p.initQueryServiceAddress()
 	})
 }

+func (p *ParamTable) initNodeID() {
+	p.NodeID = uint64(p.ParseInt64("queryService.nodeID"))
+}
+
+func (p *ParamTable) initLogCfg() {
+	p.Log = log.Config{}
+	format, err := p.Load("log.format")
+	if err != nil {
+		panic(err)
+	}
+	p.Log.Format = format
+	level, err := p.Load("log.level")
+	if err != nil {
+		panic(err)
+	}
+	p.Log.Level = level
+	devStr, err := p.Load("log.dev")
+	if err != nil {
+		panic(err)
+	}
+	dev, err := strconv.ParseBool(devStr)
+	if err != nil {
+		panic(err)
+	}
+	p.Log.Development = dev
+	p.Log.File.MaxSize = p.ParseInt("log.file.maxSize")
+	p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups")
+	p.Log.File.MaxDays = p.ParseInt("log.file.maxAge")
+	rootPath, err := p.Load("log.file.rootPath")
+	if err != nil {
+		panic(err)
+	}
+	p.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("masterservice-%d.log", p.NodeID))
+}
+
 func (p *ParamTable) initStatsChannelName() {
 	channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats")
 	if err != nil {

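For reference, the new initLogCfg above expects log.format, log.level, log.dev, log.file.maxSize, log.file.maxBackups, log.file.maxAge, and log.file.rootPath to be resolvable from the loaded yaml. A plausible milvus.yaml fragment satisfying those keys might look like the sketch below; the exact values and layout are assumptions for illustration, not part of this commit:

    log:
      format: text
      level: debug
      dev: true
      file:
        maxSize: 300     # MB per log file before rotation
        maxBackups: 20   # rotated files to retain
        maxAge: 10       # days to retain
        rootPath: /var/lib/milvus/logs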
@@ -12,8 +12,11 @@ import (

 	"github.com/opentracing/opentracing-go"
 	"github.com/uber/jaeger-client-go/config"
+	"go.uber.org/zap"

 	nodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/querynode/client"
 	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"

@@ -142,7 +145,7 @@ func (qs *QueryService) GetStatisticsChannel(ctx context.Context) (*milvuspb.Str
 }

 func (qs *QueryService) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
-	fmt.Println("register query node =", req.Address)
+	log.Debug("register query node", zap.String("address", req.Address.String()))
 	// TODO:: add mutex
 	nodeID := req.Base.SourceID
 	if _, ok := qs.queryNodes[nodeID]; ok {

@@ -201,7 +204,7 @@ func (qs *QueryService) RegisterNode(ctx context.Context, req *querypb.RegisterN

 func (qs *QueryService) ShowCollections(ctx context.Context, req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error) {
 	dbID := req.DbID
-	fmt.Println("show collection start, dbID = ", dbID)
+	log.Debug("show collection start, dbID = ", zap.String("dbID", strconv.FormatInt(dbID, 10)))
 	collections, err := qs.replica.getCollections(dbID)
 	collectionIDs := make([]UniqueID, 0)
 	for _, collection := range collections {

@@ -215,7 +218,7 @@ func (qs *QueryService) ShowCollections(ctx context.Context, req *querypb.ShowCo
 			},
 		}, err
 	}
-	fmt.Println("show collection end")
+	log.Debug("show collection end")
 	return &querypb.ShowCollectionResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_SUCCESS,

@@ -240,10 +243,10 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
 		}
 	}

-	fmt.Println("load collection start, collectionID = ", collectionID)
+	log.Debug("load collection start", zap.String("collectionID", fmt.Sprintln(collectionID)))
 	_, err := qs.replica.getCollectionByID(dbID, collectionID)
 	if err == nil {
-		fmt.Println("load collection end, collection already exist, collectionID = ", collectionID)
+		log.Error("load collection end, collection already exist", zap.String("collectionID", fmt.Sprintln(collectionID)))
 		return fn(nil), nil
 	}

@@ -284,17 +287,17 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol

 	status, err := qs.LoadPartitions(ctx, loadPartitionsRequest)

-	fmt.Println("load collection end, collectionID = ", collectionID)
+	log.Debug("load collection end", zap.String("collectionID", fmt.Sprintln(collectionID)))
 	return status, err
 }

 func (qs *QueryService) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
 	dbID := req.DbID
 	collectionID := req.CollectionID
-	fmt.Println("release collection start, collectionID = ", collectionID)
+	log.Debug("release collection start", zap.String("collectionID", fmt.Sprintln(collectionID)))
 	_, err := qs.replica.getCollectionByID(dbID, collectionID)
 	if err != nil {
-		fmt.Println("release collection end, query service don't have the log of collection ", collectionID)
+		log.Error("release collection end, query service don't have the log of", zap.String("collectionID", fmt.Sprintln(collectionID)))
 		return &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_SUCCESS,
 		}, nil

@@ -303,7 +306,7 @@ func (qs *QueryService) ReleaseCollection(ctx context.Context, req *querypb.Rele
 	for nodeID, node := range qs.queryNodes {
 		status, err := node.ReleaseCollection(ctx, req)
 		if err != nil {
-			fmt.Println("release collection end, node ", nodeID, " occur error")
+			log.Error("release collection end, node occur error", zap.String("nodeID", fmt.Sprintln(nodeID)))
 			return status, err
 		}
 	}

@@ -316,7 +319,7 @@ func (qs *QueryService) ReleaseCollection(ctx context.Context, req *querypb.Rele
 		}, err
 	}

-	fmt.Println("release collection end")
+	log.Debug("release collection end")
 	//TODO:: queryNode cancel subscribe dmChannels
 	return &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_SUCCESS,

@@ -365,7 +368,7 @@ func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPar
 			ErrorCode: commonpb.ErrorCode_SUCCESS,
 		}
 	}
-	fmt.Println("load partitions start, partitionIDs = ", partitionIDs)
+	log.Debug("load partitions start", zap.String("partitionIDs", fmt.Sprintln(partitionIDs)))

 	if len(partitionIDs) == 0 {
 		err := errors.New("partitionIDs are empty")

@@ -430,7 +433,7 @@ func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPar
 		return fn(err), err
 	}
 	for _, state := range resp.States {
-		fmt.Println("segment ", state.SegmentID, " 's state is ", state.StartPosition)
+		log.Error("segment ", zap.String("state.SegmentID", fmt.Sprintln(state.SegmentID)), zap.String("state", fmt.Sprintln(state.StartPosition)))
 		segmentID := state.SegmentID
 		segmentStates[segmentID] = state
 		channelName := state.StartPosition.ChannelName

@@ -475,7 +478,7 @@ func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPar
 		qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory)
 	}

-	fmt.Println("load partitions end, partitionIDs = ", partitionIDs)
+	log.Debug("load partitions end", zap.String("partitionIDs", fmt.Sprintln(partitionIDs)))
 	return &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_SUCCESS,
 	}, nil

@@ -485,7 +488,7 @@ func (qs *QueryService) ReleasePartitions(ctx context.Context, req *querypb.Rele
 	dbID := req.DbID
 	collectionID := req.CollectionID
 	partitionIDs := req.PartitionIDs
-	fmt.Println("start release partitions start, partitionIDs = ", partitionIDs)
+	log.Debug("start release partitions start", zap.String("partitionIDs", fmt.Sprintln(partitionIDs)))
 	toReleasedPartitionID := make([]UniqueID, 0)
 	for _, partitionID := range partitionIDs {
 		_, err := qs.replica.getPartitionByID(dbID, collectionID, partitionID)

@@ -513,7 +516,7 @@ func (qs *QueryService) ReleasePartitions(ctx context.Context, req *querypb.Rele
 		}
 	}

-	fmt.Println("start release partitions end")
+	log.Debug("start release partitions end")
 	//TODO:: queryNode cancel subscribe dmChannels
 	return &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_SUCCESS,

@@ -535,9 +538,9 @@ func (qs *QueryService) CreateQueryChannel(ctx context.Context) (*querypb.Create
 		RequestChannelID: allocatedQueryChannel,
 		ResultChannelID:  allocatedQueryResultChannel,
 	}
-	fmt.Println("query service create query channel, queryChannelName = ", allocatedQueryChannel)
+	log.Debug("query service create query channel", zap.String("queryChannelName", allocatedQueryChannel))
 	for nodeID, node := range qs.queryNodes {
-		fmt.Println("node ", nodeID, " watch query channel")
+		log.Debug("node watch query channel", zap.String("nodeID", fmt.Sprintln(nodeID)))
 		fn := func() error {
 			_, err := node.AddQueryChannel(ctx, addQueryChannelsRequest)
 			return err

@@ -713,7 +716,7 @@ func (qs *QueryService) watchDmChannels(dbID UniqueID, collectionID UniqueID) er
 		if err != nil {
 			return err
 		}
-		fmt.Println("query node ", nodeID, "watch channels = ", channels)
+		log.Debug("query node ", zap.String("nodeID", strconv.FormatInt(nodeID, 10)), zap.String("watch channels", fmt.Sprintln(channels)))
 		node.AddDmChannels(channels)
 	}

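One pattern in this file is worth noting: several new call sites stringify numeric IDs with zap.String(..., fmt.Sprintln(id)), which allocates a string and appends a trailing newline to the field value. zap ships typed field constructors that carry the value directly; assuming these IDs are int64 (the diff itself uses strconv.FormatInt on them), a more idiomatic form would be the sketch below, illustrative rather than part of this commit:

    // Instead of:
    //     log.Debug("load collection start", zap.String("collectionID", fmt.Sprintln(collectionID)))
    // a typed field avoids the formatting round-trip and the stray "\n":
    log.Debug("load collection start", zap.Int64("collectionID", collectionID))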