Replace log in dataservice

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-02-23 09:58:06 +08:00 committed by yefu.chen
parent 68518fec38
commit ea1d9ea99a
29 changed files with 1147 additions and 344 deletions


@ -2,16 +2,37 @@ package components
import (
"context"
"fmt"
"io"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
grpcindexnode "github.com/zilliztech/milvus-distributed/internal/distributed/indexnode"
)
type IndexNode struct {
svr *grpcindexnode.Server
tracer opentracing.Tracer
closer io.Closer
}
func NewIndexNode(ctx context.Context) (*IndexNode, error) {
var err error
n := &IndexNode{}
cfg := &config.Configuration{
ServiceName: "indexnode",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
n.tracer, n.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(n.tracer)
svr, err := grpcindexnode.NewServer(ctx)
if err != nil {
return nil, err


@ -2,17 +2,39 @@ package components
import (
"context"
"fmt"
"io"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
grpcindexserver "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice"
)
type IndexService struct {
svr *grpcindexserver.Server
tracer opentracing.Tracer
closer io.Closer
}
func NewIndexService(ctx context.Context) (*IndexService, error) {
var err error
s := &IndexService{}
cfg := &config.Configuration{
ServiceName: "indexservice",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
s.tracer, s.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(s.tracer)
svr, err := grpcindexserver.NewServer(ctx)
if err != nil {
return nil, err
}
@ -26,6 +48,7 @@ func (s *IndexService) Run() error {
return nil
}
func (s *IndexService) Stop() error {
s.closer.Close()
if err := s.svr.Stop(); err != nil {
return err
}
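
The Jaeger bootstrap above (a service name plus a "const" sampler with Param 1, which reports every span) is repeated verbatim in each server constructor touched by this commit. A hypothetical helper, not part of the commit, that factors out the shared setup:

package components // placement is illustrative

import (
	"io"

	"github.com/opentracing/opentracing-go"
	"github.com/uber/jaeger-client-go/config"
)

// initTracer builds a Jaeger tracer that samples every span and registers it
// as the opentracing global tracer. The returned io.Closer must be closed on
// shutdown, as the Stop() methods in this commit do with closer.Close().
func initTracer(serviceName string) (io.Closer, error) {
	cfg := &config.Configuration{
		ServiceName: serviceName,
		Sampler: &config.SamplerConfig{
			Type:  "const", // sample everything; convenient for development
			Param: 1,
		},
	}
	tracer, closer, err := cfg.NewTracer()
	if err != nil {
		return nil, err
	}
	opentracing.SetGlobalTracer(tracer)
	return closer, nil
}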


@ -90,6 +90,7 @@ const {
}
type TsMsg interface {
ID() UniqueID
SetTs(ts Timestamp)
BeginTs() Timestamp
EndTs() Timestamp

go.mod

@ -3,6 +3,7 @@ module github.com/zilliztech/milvus-distributed
go 1.15
require (
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/apache/pulsar-client-go v0.1.1
github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
@ -21,6 +22,7 @@ require (
github.com/mitchellh/mapstructure v1.1.2
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/onsi/gomega v1.10.5 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/spaolacci/murmur3 v1.1.0
@ -28,6 +30,8 @@ require (
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.4.0+incompatible // indirect
github.com/yahoo/athenz v1.9.16 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/zap v1.15.0

go.sum

@ -15,6 +15,8 @@ github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpH
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/HdrHistogram/hdrhistogram-go v1.0.1 h1:GX8GAYDuhlFQnI2fRDHQhTlkHMz8bEn0jTI6LJU0mpw=
github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
@ -295,6 +297,7 @@ github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ=
github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible h1:CAG0PUvo1fen+ZEfxKJjFIc8GuuN5RuaBuCAuaP2Hno=
github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible/go.mod h1:iIubILNIN6Jq9h8uiSLrN9L1tuj3iSSFwz3R61skm/A=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
@ -386,6 +389,12 @@ github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c/go.mod h1:ahpPrc7
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/uber/jaeger-client-go v1.6.0 h1:3+zLlq+4npI5fg8IsgAje3YsP7TcEdNzJScyqFIzxEQ=
github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U=
github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v1.5.0 h1:OHbgr8l656Ub3Fw5k9SWnBfIEwvoHQ+W2y+Aa9D1Uyo=
github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ=
github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/valyala/gozstd v1.7.0 h1:Ljh5c9zboqLhwTI33al32R72iCZfn0mCbVGcFWbGwRQ=
github.com/valyala/gozstd v1.7.0/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ=


@ -30,7 +30,7 @@ func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) {
},
Count: 1,
})
if err != nil {
if err = VerifyResponse(resp, err); err != nil {
return 0, err
}
return resp.Timestamp, nil
@ -46,8 +46,9 @@ func (allocator *allocatorImpl) allocID() (UniqueID, error) {
},
Count: 1,
})
if err != nil {
if err = VerifyResponse(resp, err); err != nil {
return 0, err
}
return resp.ID, nil
}


@ -1,9 +1,12 @@
package dataservice
import (
"log"
"fmt"
"sync"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
@ -28,6 +31,10 @@ type (
}
)
func (node *dataNode) String() string {
return fmt.Sprintf("id: %d, address: %s:%d", node.id, node.address.ip, node.address.port)
}
func newDataNodeCluster(finishCh chan struct{}) *dataNodeCluster {
return &dataNodeCluster{
finishCh: finishCh,
@ -92,12 +99,8 @@ func (c *dataNodeCluster) WatchInsertChannels(channels []string) {
},
ChannelNames: group,
})
if err != nil {
log.Println(err.Error())
continue
}
if resp.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Println(resp.Reason)
if err = VerifyResponse(resp, err); err != nil {
log.Error("watch dm channels error", zap.Stringer("dataNode", c.nodes[i]), zap.Error(err))
continue
}
c.nodes[i].channelNum += len(group)
@ -111,7 +114,7 @@ func (c *dataNodeCluster) GetDataNodeStates() ([]*internalpb2.ComponentInfo, err
for _, node := range c.nodes {
states, err := node.client.GetComponentStates(&commonpb.Empty{})
if err != nil {
log.Println(err.Error())
log.Error("get component states error", zap.Stringer("dataNode", node), zap.Error(err))
continue
}
ret = append(ret, states.State)
@ -124,7 +127,7 @@ func (c *dataNodeCluster) FlushSegment(request *datapb.FlushSegRequest) {
defer c.mu.RUnlock()
for _, node := range c.nodes {
if _, err := node.client.FlushSegments(request); err != nil {
log.Println(err.Error())
log.Error("flush segment err", zap.Stringer("dataNode", node), zap.Error(err))
continue
}
}
@ -133,7 +136,7 @@ func (c *dataNodeCluster) FlushSegment(request *datapb.FlushSegRequest) {
func (c *dataNodeCluster) ShutDownClients() {
for _, node := range c.nodes {
if err := node.client.Stop(); err != nil {
log.Println(err.Error())
log.Error("stop client error", zap.Stringer("dataNode", node), zap.Error(err))
continue
}
}
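
The String() method added to dataNode at the top of this file is what makes the zap.Stringer fields above work: zap only calls String() when the entry is actually encoded and written, so formatting the node costs nothing when the entry is filtered out. A standalone sketch of the same mechanism with an illustrative type (not milvus code):

package main

import (
	"fmt"

	"go.uber.org/zap"
)

// endpoint stands in for dataNode; any type with a String() method satisfies
// fmt.Stringer and can be passed to zap.Stringer.
type endpoint struct {
	ip   string
	port int64
}

func (e *endpoint) String() string {
	return fmt.Sprintf("%s:%d", e.ip, e.port)
}

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()
	// String() is evaluated lazily, only when this entry is emitted.
	logger.Error("watch dm channels error", zap.Stringer("dataNode", &endpoint{"127.0.0.1", 21124}))
}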


@ -2,7 +2,6 @@ package dataservice
import (
"context"
"errors"
"fmt"
"path"
"strconv"
@ -213,7 +212,8 @@ func (s *Server) initMsgProducer() error {
func (s *Server) loadMetaFromMaster() error {
log.Debug("loading collection meta from master")
if err := s.checkMasterIsHealthy(); err != nil {
var err error
if err = s.checkMasterIsHealthy(); err != nil {
return err
}
if s.ddChannelName == "" {
@ -232,7 +232,7 @@ func (s *Server) loadMetaFromMaster() error {
},
DbName: "",
})
if err != nil {
if err = VerifyResponse(collections, err); err != nil {
return err
}
for _, collectionName := range collections.CollectionNames {
@ -246,8 +246,8 @@ func (s *Server) loadMetaFromMaster() error {
DbName: "",
CollectionName: collectionName,
})
if err != nil {
log.Error("describe collection error", zap.Error(err))
if err = VerifyResponse(collection, err); err != nil {
log.Error("describe collection error", zap.String("collectionName", collectionName), zap.Error(err))
continue
}
partitions, err := s.masterClient.ShowPartitions(&milvuspb.ShowPartitionRequest{
@ -261,8 +261,8 @@ func (s *Server) loadMetaFromMaster() error {
CollectionName: collectionName,
CollectionID: collection.CollectionID,
})
if err != nil {
log.Error("show partitions error", zap.Error(err))
if err = VerifyResponse(partitions, err); err != nil {
log.Error("show partitions error", zap.String("collectionName", collectionName), zap.Int64("collectionID", collection.CollectionID), zap.Error(err))
continue
}
err = s.meta.AddCollection(&collectionInfo{
@ -271,7 +271,7 @@ func (s *Server) loadMetaFromMaster() error {
Partitions: partitions.PartitionIDs,
})
if err != nil {
log.Error("add collection error", zap.Error(err))
log.Error("add collection to meta error", zap.Int64("collectionID", collection.CollectionID), zap.Error(err))
continue
}
}
@ -294,12 +294,9 @@ func (s *Server) checkMasterIsHealthy() error {
return fmt.Errorf("master is not healthy")
case <-ticker.C:
resp, err = s.masterClient.GetComponentStates()
if err != nil {
if err = VerifyResponse(resp, err); err != nil {
return err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(resp.Status.Reason)
}
}
if resp.State.StateCode == internalpb2.StateCode_HEALTHY {
break
@ -330,10 +327,13 @@ func (s *Server) startStatsChannel(ctx context.Context) {
}
msgPack := statsStream.Consume()
for _, msg := range msgPack.Msgs {
statistics := msg.(*msgstream.SegmentStatisticsMsg)
statistics, ok := msg.(*msgstream.SegmentStatisticsMsg)
if !ok {
log.Error("receive unknown type msg from stats channel", zap.Stringer("msgType", msg.Type()))
}
for _, stat := range statistics.SegStats {
if err := s.statsHandler.HandleSegmentStat(stat); err != nil {
log.Error("handle segment stat error", zap.Error(err))
log.Error("handle segment stat error", zap.Int64("segmentID", stat.SegmentID), zap.Error(err))
continue
}
}
@ -363,7 +363,7 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
segmentInfo, err := s.meta.GetSegment(realMsg.SegmentID)
if err != nil {
log.Error("get segment error", zap.Error(err))
log.Error("get segment from meta error", zap.Int64("segmentID", realMsg.SegmentID), zap.Error(err))
continue
}
segmentInfo.FlushedTime = realMsg.BeginTimestamp
@ -473,7 +473,7 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register
s.cluster.Register(node)
if s.ddChannelName == "" {
resp, err := s.masterClient.GetDdChannel()
if err != nil {
if err = VerifyResponse(resp, err); err != nil {
ret.Status.Reason = err.Error()
return ret, err
}


@ -0,0 +1,33 @@
package dataservice
import (
"errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
type Response interface {
GetStatus() *commonpb.Status
}
func VerifyResponse(response interface{}, err error) error {
if err != nil {
return err
}
if response == nil {
return errors.New("response is nil")
}
switch resp := response.(type) {
case Response:
if resp.GetStatus().ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(resp.GetStatus().Reason)
}
case *commonpb.Status:
if resp.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(resp.Reason)
}
default:
return errors.New("unknown response type")
}
return nil
}
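
VerifyResponse is what lets call sites collapse the old two-step check (RPC error, then Status.ErrorCode) into a single branch, as the allocator and server changes above show. A minimal usage sketch; the ShowCollections call mirrors loadMetaFromMaster, and the other surrounding names are illustrative:

// Any response with GetStatus() *commonpb.Status, or a bare *commonpb.Status,
// is accepted; a nil response or an unknown type is reported as an error too.
collections, err := s.masterClient.ShowCollections(req) // hypothetical call site
if err = VerifyResponse(collections, err); err != nil {
	log.Error("show collections failed", zap.Error(err))
	return err
}
// Safe to read collections here: both the RPC error and the status were checked.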


@ -1,10 +1,10 @@
package dataservice
import (
"log"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"go.uber.org/zap"
"golang.org/x/net/context"
@ -35,11 +35,11 @@ func (watcher *proxyTimeTickWatcher) StartBackgroundLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Println("proxy time tick watcher closed")
log.Debug("proxy time tick watcher closed")
return
case msg := <-watcher.msgQueue:
if err := watcher.allocator.ExpireAllocations(msg.Base.Timestamp); err != nil {
log.Printf("expire allocations error : %s", err.Error())
log.Error("expire allocations error", zap.Error(err))
}
}
}
@ -66,11 +66,11 @@ func (watcher *dataNodeTimeTickWatcher) StartBackgroundLoop(ctx context.Context)
for {
select {
case <-ctx.Done():
log.Println("data node time tick watcher closed")
log.Debug("data node time tick watcher closed")
return
case msg := <-watcher.msgQueue:
if err := watcher.handleTimeTickMsg(msg); err != nil {
log.Println(err.Error())
log.Error("handle time tick error", zap.Error(err))
continue
}
}
@ -85,17 +85,17 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic
for _, id := range segments {
expired, err := watcher.allocator.IsAllocationsExpired(id, msg.Base.Timestamp)
if err != nil {
log.Printf("check allocations expired error %s", err.Error())
log.Error("check allocations expired error", zap.Int64("segmentID", id), zap.Error(err))
continue
}
if expired {
segmentInfo, err := watcher.meta.GetSegment(id)
if err != nil {
log.Println(err.Error())
log.Error("get segment from meta error", zap.Int64("segmentID", id), zap.Error(err))
continue
}
if err = watcher.meta.SetSegmentState(id, commonpb.SegmentState_SegmentSealed); err != nil {
log.Println(err.Error())
log.Error("set segment state error", zap.Int64("segmentID", id), zap.Error(err))
continue
}
watcher.cluster.FlushSegment(&datapb.FlushSegRequest{


@ -2,6 +2,8 @@ package grpcproxynode
import (
"context"
"fmt"
"io"
"log"
"net"
"os"
@ -9,22 +11,21 @@ import (
"sync"
"time"
grpcproxyserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"google.golang.org/grpc"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
grpcdataservice "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
grcpmasterservice "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
grpcproxyserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proxynode"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"google.golang.org/grpc"
)
type Server struct {
@ -46,16 +47,32 @@ type Server struct {
dataServiceClient *grpcdataservice.Client
queryServiceClient *grpcqueryserviceclient.Client
indexServiceClient *grpcindexserviceclient.Client
tracer opentracing.Tracer
closer io.Closer
}
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
var err error
server := &Server{
ctx: ctx,
grpcErrChan: make(chan error),
}
var err error
cfg := &config.Configuration{
ServiceName: "proxynode",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
server.tracer, server.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(server.tracer)
server.impl, err = proxynode.NewProxyNodeImpl(server.ctx, factory)
if err != nil {
return nil, err
@ -220,6 +237,7 @@ func (s *Server) start() error {
func (s *Server) Stop() error {
var err error
s.closer.Close()
if s.grpcServer != nil {
s.grpcServer.GracefulStop()


@ -2,11 +2,15 @@ package grpcproxyservice
import (
"context"
"fmt"
"io"
"log"
"net"
"strconv"
"sync"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -26,10 +30,14 @@ type Server struct {
grpcErrChan chan error
impl *proxyservice.ServiceImpl
tracer opentracing.Tracer
closer io.Closer
}
func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error) {
ctx, cancel := context.WithCancel(ctx1)
var err error
server := &Server{
ctx: ctx,
@ -37,7 +45,19 @@ func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error)
grpcErrChan: make(chan error),
}
var err error
cfg := &config.Configuration{
ServiceName: "proxyservice",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
server.tracer, server.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(server.tracer)
server.impl, err = proxyservice.NewServiceImpl(server.ctx, factory)
if err != nil {
return nil, err
@ -113,6 +133,7 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
s.closer.Close()
err := s.impl.Stop()
if err != nil {
return err


@ -146,11 +146,17 @@ func (i *NodeImpl) SetIndexServiceClient(serviceClient typeutil.IndexServiceInte
}
func (i *NodeImpl) BuildIndex(request *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
t := newIndexBuildTask()
t.cmd = request
t.kv = i.kv
t.serviceClient = i.serviceClient
t.nodeID = Params.NodeID
ctx := context.Background()
t := &IndexBuildTask{
BaseTask: BaseTask{
ctx: ctx,
done: make(chan error), // intend to do this
},
cmd: request,
kv: i.kv,
serviceClient: i.serviceClient,
nodeID: Params.NodeID,
}
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()


@ -18,15 +18,18 @@ import (
)
const (
paramsKeyToParse = "params"
paramsKeyToParse = "params"
IndexBuildTaskName = "IndexBuildTask"
)
type task interface {
ID() UniqueID // return ReqID
Ctx() context.Context
ID() UniqueID // return ReqID
Name() string
SetID(uid UniqueID) // set ReqID
PreExecute() error
Execute() error
PostExecute() error
PreExecute(ctx context.Context) error
Execute(ctx context.Context) error
PostExecute(ctx context.Context) error
WaitToFinish() error
Notify(err error)
OnEnqueue() error
@ -69,32 +72,34 @@ type IndexBuildTask struct {
nodeID UniqueID
}
func newIndexBuildTask() *IndexBuildTask {
ctx := context.Background()
return &IndexBuildTask{
BaseTask: BaseTask{
ctx: ctx,
done: make(chan error), // intend to do this
},
}
func (it *IndexBuildTask) Ctx() context.Context {
return it.ctx
}
func (it *IndexBuildTask) ID() UniqueID {
return it.id
}
func (it *IndexBuildTask) SetID(ID UniqueID) {
it.BaseTask.setID(ID)
}
func (bt *BaseTask) Name() string {
return IndexBuildTaskName
}
func (it *IndexBuildTask) OnEnqueue() error {
it.SetID(it.cmd.IndexBuildID)
log.Printf("[IndexBuilderTask] Enqueue TaskID: %v", it.ID())
return nil
}
func (it *IndexBuildTask) PreExecute() error {
func (it *IndexBuildTask) PreExecute(ctx context.Context) error {
log.Println("preExecute...")
return nil
}
func (it *IndexBuildTask) PostExecute() error {
func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
log.Println("PostExecute...")
var err error
defer func() {
@ -129,21 +134,7 @@ func (it *IndexBuildTask) PostExecute() error {
return err
}
func (it *IndexBuildTask) Rollback() error {
if it.savePaths == nil {
return nil
}
err := it.kv.MultiRemove(it.savePaths)
if err != nil {
log.Println("IndexBuildTask Rollback Failed:", err.Error())
return err
}
return nil
}
func (it *IndexBuildTask) Execute() error {
func (it *IndexBuildTask) Execute(ctx context.Context) error {
log.Println("start build index ...")
var err error
@ -313,3 +304,16 @@ func (it *IndexBuildTask) Execute() error {
// }
return nil
}
func (it *IndexBuildTask) Rollback() error {
if it.savePaths == nil {
return nil
}
err := it.kv.MultiRemove(it.savePaths)
if err != nil {
log.Println("IndexBuildTask Rollback Failed:", err.Error())
return err
}
return nil
}


@ -7,7 +7,10 @@ import (
"log"
"sync"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
)
type TaskQueue interface {
@ -182,31 +185,42 @@ func (sched *TaskScheduler) scheduleIndexBuildTask() []task {
}
func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
err := t.PreExecute()
span, ctx := trace.StartSpanFromContext(t.Ctx(),
opentracing.Tags{
"Type": t.Name(),
"ID": t.ID(),
})
defer span.Finish()
span.LogFields(oplog.Int64("scheduler process PreExecute", t.ID()))
err := t.PreExecute(ctx)
defer func() {
t.Notify(err)
// log.Printf("notify with error: %v", err)
}()
if err != nil {
trace.LogError(span, err)
return
}
span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID()))
q.AddActiveTask(t)
// log.Printf("task add to active list ...")
defer func() {
span.LogFields(oplog.Int64("scheduler process PopActiveTask", t.ID()))
q.PopActiveTask(t.ID())
// log.Printf("pop from active list ...")
}()
err = t.Execute()
span.LogFields(oplog.Int64("scheduler process Execute", t.ID()))
err = t.Execute(ctx)
if err != nil {
log.Printf("execute definition task failed, error = %v", err)
return
}
// log.Printf("task execution done ...")
err = t.PostExecute()
span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID()))
err = t.PostExecute(ctx)
// log.Printf("post execute task done ...")
}
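
Since processTask now opens a span from t.Ctx() and threads the span-carrying ctx through PreExecute, Execute and PostExecute, a task implementation can attach child spans to the scheduler's span. A sketch with a hypothetical task type (not part of this commit):

func (t *myBuildTask) Execute(ctx context.Context) error {
	// Child of the span started in processTask; the operation name is taken
	// from the calling function by trace.StartSpanFromContext.
	sp, ctx := trace.StartSpanFromContext(ctx)
	defer sp.Finish()
	if err := t.doBuild(ctx); err != nil { // doBuild is a hypothetical worker
		return trace.LogError(sp, err) // records caller file/line on the span, returns err
	}
	return nil
}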


@ -187,11 +187,17 @@ func (i *ServiceImpl) BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.Build
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
t := NewIndexAddTask()
t.req = req
t.idAllocator = i.idAllocator
t.table = i.metaTable
t.kv = i.kv
ctx := context.Background()
t := &IndexAddTask{
BaseTask: BaseTask{
ctx: ctx,
done: make(chan error),
table: i.metaTable,
},
req: req,
idAllocator: i.idAllocator,
kv: i.kv,
}
if i.nodeClients == nil || i.nodeClients.Len() <= 0 {
ret.Status.Reason = "IndexBuilding Service not available"
@ -200,7 +206,6 @@ func (i *ServiceImpl) BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.Build
t.nodeClients = i.nodeClients
var cancel func()
ctx := context.Background()
t.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()


@ -12,12 +12,18 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
)
const (
IndexAddTaskName = "IndexAddTask"
)
type task interface {
Ctx() context.Context
ID() UniqueID // return ReqID
SetID(uid UniqueID) // set ReqID
PreExecute() error
Execute() error
PostExecute() error
Name() string
PreExecute(ctx context.Context) error
Execute(ctx context.Context) error
PostExecute(ctx context.Context) error
WaitToFinish() error
Notify(err error)
OnEnqueue() error
@ -63,10 +69,22 @@ type IndexAddTask struct {
buildClientNodeID UniqueID
}
func (it *IndexAddTask) Ctx() context.Context {
return it.ctx
}
func (it *IndexAddTask) ID() UniqueID {
return it.id
}
func (it *IndexAddTask) SetID(ID UniqueID) {
it.BaseTask.setID(ID)
}
func (it *IndexAddTask) Name() string {
return IndexAddTaskName
}
func (it *IndexAddTask) OnEnqueue() error {
var err error
it.indexBuildID, err = it.idAllocator.AllocOne()
@ -76,7 +94,7 @@ func (it *IndexAddTask) OnEnqueue() error {
return nil
}
func (it *IndexAddTask) PreExecute() error {
func (it *IndexAddTask) PreExecute(ctx context.Context) error {
log.Println("pretend to check Index Req")
nodeID, builderClient := it.nodeClients.PeekClient()
if builderClient == nil {
@ -91,7 +109,7 @@ func (it *IndexAddTask) PreExecute() error {
return nil
}
func (it *IndexAddTask) Execute() error {
func (it *IndexAddTask) Execute(ctx context.Context) error {
cmd := &indexpb.BuildIndexCmd{
IndexBuildID: it.indexBuildID,
Req: it.req,
@ -109,14 +127,6 @@ func (it *IndexAddTask) Execute() error {
return nil
}
func (it *IndexAddTask) PostExecute() error {
func (it *IndexAddTask) PostExecute(ctx context.Context) error {
return nil
}
func NewIndexAddTask() *IndexAddTask {
return &IndexAddTask{
BaseTask: BaseTask{
done: make(chan error),
},
}
}


@ -7,7 +7,10 @@ import (
"log"
"sync"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
)
type TaskQueue interface {
@ -184,32 +187,38 @@ func (sched *TaskScheduler) scheduleIndexAddTask() task {
//}
func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
err := t.PreExecute()
span, ctx := trace.StartSpanFromContext(t.Ctx(),
opentracing.Tags{
"Type": t.Name(),
})
defer span.Finish()
span.LogFields(oplog.String("scheduler process PreExecute", t.Name()))
err := t.PreExecute(ctx)
defer func() {
t.Notify(err)
log.Printf("notify with error: %v", err)
}()
if err != nil {
trace.LogError(span, err)
return
}
span.LogFields(oplog.String("scheduler process AddActiveTask", t.Name()))
q.AddActiveTask(t)
log.Printf("task add to active list ...")
defer func() {
span.LogFields(oplog.String("scheduler process PopActiveTask", t.Name()))
q.PopActiveTask(t.ID())
log.Printf("pop from active list ...")
}()
err = t.Execute()
span.LogFields(oplog.String("scheduler process Execute", t.Name()))
err = t.Execute(ctx)
if err != nil {
log.Printf("execute definition task failed, error = %v", err)
trace.LogError(span, err)
return
}
log.Printf("task execution done ...")
err = t.PostExecute()
log.Printf("post execute task done ...")
span.LogFields(oplog.String("scheduler process PostExecute", t.Name()))
err = t.PostExecute(ctx)
}
func (sched *TaskScheduler) indexAddLoop() {


@ -473,7 +473,6 @@ func (c *Core) setMsgStreams() error {
c.SendTimeTick = func(t typeutil.Timestamp) error {
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
MsgCtx: nil,
BeginTimestamp: t,
EndTimestamp: t,
HashValues: []uint32{0},


@ -235,7 +235,6 @@ func TestMasterService(t *testing.T) {
var timeTick typeutil.Timestamp = 100
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
MsgCtx: nil,
BeginTimestamp: timeTick,
EndTimestamp: timeTick,
HashValues: []uint32{0},


@ -1,7 +1,6 @@
package msgstream
import (
"context"
"errors"
"github.com/golang/protobuf/proto"
@ -14,6 +13,7 @@ type MsgType = commonpb.MsgType
type MarshalType = interface{}
type TsMsg interface {
ID() UniqueID
BeginTs() Timestamp
EndTs() Timestamp
Type() MsgType
@ -25,7 +25,6 @@ type TsMsg interface {
}
type BaseMsg struct {
MsgCtx context.Context
BeginTimestamp Timestamp
EndTimestamp Timestamp
HashValues []uint32
@ -67,6 +66,10 @@ type InsertMsg struct {
internalpb2.InsertRequest
}
func (it *InsertMsg) ID() UniqueID {
return it.Base.MsgID
}
func (it *InsertMsg) Type() MsgType {
return it.Base.MsgType
}
@ -115,6 +118,10 @@ type FlushCompletedMsg struct {
internalpb2.SegmentFlushCompletedMsg
}
func (fl *FlushCompletedMsg) ID() UniqueID {
return fl.Base.MsgID
}
func (fl *FlushCompletedMsg) Type() MsgType {
return fl.Base.MsgType
}
@ -153,6 +160,10 @@ type FlushMsg struct {
internalpb2.FlushMsg
}
func (fl *FlushMsg) ID() UniqueID {
return fl.Base.MsgID
}
func (fl *FlushMsg) Type() MsgType {
return fl.Base.MsgType
}
@ -190,6 +201,10 @@ type DeleteMsg struct {
internalpb2.DeleteRequest
}
func (dt *DeleteMsg) ID() UniqueID {
return dt.Base.MsgID
}
func (dt *DeleteMsg) Type() MsgType {
return dt.Base.MsgType
}
@ -239,6 +254,10 @@ type SearchMsg struct {
internalpb2.SearchRequest
}
func (st *SearchMsg) ID() UniqueID {
return st.Base.MsgID
}
func (st *SearchMsg) Type() MsgType {
return st.Base.MsgType
}
@ -276,6 +295,10 @@ type SearchResultMsg struct {
internalpb2.SearchResults
}
func (srt *SearchResultMsg) ID() UniqueID {
return srt.Base.MsgID
}
func (srt *SearchResultMsg) Type() MsgType {
return srt.Base.MsgType
}
@ -313,6 +336,10 @@ type TimeTickMsg struct {
internalpb2.TimeTickMsg
}
func (tst *TimeTickMsg) ID() UniqueID {
return tst.Base.MsgID
}
func (tst *TimeTickMsg) Type() MsgType {
return tst.Base.MsgType
}
@ -351,6 +378,10 @@ type QueryNodeStatsMsg struct {
internalpb2.QueryNodeStats
}
func (qs *QueryNodeStatsMsg) ID() UniqueID {
return qs.Base.MsgID
}
func (qs *QueryNodeStatsMsg) Type() MsgType {
return qs.Base.MsgType
}
@ -386,6 +417,10 @@ type SegmentStatisticsMsg struct {
internalpb2.SegmentStatistics
}
func (ss *SegmentStatisticsMsg) ID() UniqueID {
return ss.Base.MsgID
}
func (ss *SegmentStatisticsMsg) Type() MsgType {
return ss.Base.MsgType
}
@ -431,6 +466,10 @@ type CreateCollectionMsg struct {
internalpb2.CreateCollectionRequest
}
func (cc *CreateCollectionMsg) ID() UniqueID {
return cc.Base.MsgID
}
func (cc *CreateCollectionMsg) Type() MsgType {
return cc.Base.MsgType
}
@ -468,6 +507,10 @@ type DropCollectionMsg struct {
internalpb2.DropCollectionRequest
}
func (dc *DropCollectionMsg) ID() UniqueID {
return dc.Base.MsgID
}
func (dc *DropCollectionMsg) Type() MsgType {
return dc.Base.MsgType
}
@ -505,6 +548,10 @@ type CreatePartitionMsg struct {
internalpb2.CreatePartitionRequest
}
func (cc *CreatePartitionMsg) ID() UniqueID {
return cc.Base.MsgID
}
func (cc *CreatePartitionMsg) Type() MsgType {
return cc.Base.MsgType
}
@ -542,6 +589,10 @@ type DropPartitionMsg struct {
internalpb2.DropPartitionRequest
}
func (dc *DropPartitionMsg) ID() UniqueID {
return dc.Base.MsgID
}
func (dc *DropPartitionMsg) Type() MsgType {
return dc.Base.MsgType
}
@ -579,18 +630,14 @@ type LoadIndexMsg struct {
internalpb2.LoadIndex
}
func (lim *LoadIndexMsg) ID() UniqueID {
return lim.Base.MsgID
}
func (lim *LoadIndexMsg) Type() MsgType {
return lim.Base.MsgType
}
func (lim *LoadIndexMsg) GetMsgContext() context.Context {
return lim.MsgCtx
}
func (lim *LoadIndexMsg) SetMsgContext(ctx context.Context) {
lim.MsgCtx = ctx
}
func (lim *LoadIndexMsg) Marshal(input TsMsg) (MarshalType, error) {
loadIndexMsg := input.(*LoadIndexMsg)
loadIndexRequest := &loadIndexMsg.LoadIndex
@ -622,6 +669,10 @@ type SegmentInfoMsg struct {
datapb.SegmentMsg
}
func (sim *SegmentInfoMsg) ID() UniqueID {
return sim.Base.MsgID
}
func (sim *SegmentInfoMsg) Type() MsgType {
return sim.Base.MsgType
}


@ -39,6 +39,7 @@ func (node *NodeImpl) CreateCollection(request *milvuspb.CreateCollectionRequest
defer cancel()
cct := &CreateCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
CreateCollectionRequest: request,
masterClient: node.masterClient,
@ -70,6 +71,7 @@ func (node *NodeImpl) DropCollection(request *milvuspb.DropCollectionRequest) (*
defer cancel()
dct := &DropCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DropCollectionRequest: request,
masterClient: node.masterClient,
@ -100,6 +102,7 @@ func (node *NodeImpl) HasCollection(request *milvuspb.HasCollectionRequest) (*mi
defer cancel()
hct := &HasCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
HasCollectionRequest: request,
masterClient: node.masterClient,
@ -134,6 +137,7 @@ func (node *NodeImpl) LoadCollection(request *milvuspb.LoadCollectionRequest) (*
defer cancel()
lct := &LoadCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
LoadCollectionRequest: request,
queryserviceClient: node.queryServiceClient,
@ -164,6 +168,7 @@ func (node *NodeImpl) ReleaseCollection(request *milvuspb.ReleaseCollectionReque
defer cancel()
rct := &ReleaseCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
ReleaseCollectionRequest: request,
queryserviceClient: node.queryServiceClient,
@ -194,6 +199,7 @@ func (node *NodeImpl) DescribeCollection(request *milvuspb.DescribeCollectionReq
defer cancel()
dct := &DescribeCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DescribeCollectionRequest: request,
masterClient: node.masterClient,
@ -227,6 +233,7 @@ func (node *NodeImpl) GetCollectionStatistics(request *milvuspb.CollectionStatsR
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
g := &GetCollectionsStatisticsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
CollectionStatsRequest: request,
dataServiceClient: node.dataServiceClient,
@ -260,6 +267,7 @@ func (node *NodeImpl) ShowCollections(request *milvuspb.ShowCollectionRequest) (
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
sct := &ShowCollectionsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
ShowCollectionRequest: request,
masterClient: node.masterClient,
@ -293,6 +301,7 @@ func (node *NodeImpl) CreatePartition(request *milvuspb.CreatePartitionRequest)
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
cpt := &CreatePartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
CreatePartitionRequest: request,
masterClient: node.masterClient,
@ -321,6 +330,7 @@ func (node *NodeImpl) DropPartition(request *milvuspb.DropPartitionRequest) (*co
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
dpt := &DropPartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DropPartitionRequest: request,
masterClient: node.masterClient,
@ -350,6 +360,7 @@ func (node *NodeImpl) HasPartition(request *milvuspb.HasPartitionRequest) (*milv
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
hpt := &HasPartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
HasPartitionRequest: request,
masterClient: node.masterClient,
@ -386,6 +397,7 @@ func (node *NodeImpl) LoadPartitions(request *milvuspb.LoadPartitonRequest) (*co
defer cancel()
lpt := &LoadPartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
LoadPartitonRequest: request,
queryserviceClient: node.queryServiceClient,
@ -416,6 +428,7 @@ func (node *NodeImpl) ReleasePartitions(request *milvuspb.ReleasePartitionReques
defer cancel()
rpt := &ReleasePartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
ReleasePartitionRequest: request,
queryserviceClient: node.queryServiceClient,
@ -449,6 +462,7 @@ func (node *NodeImpl) ShowPartitions(request *milvuspb.ShowPartitionRequest) (*m
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
spt := &ShowPartitionsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
ShowPartitionRequest: request,
masterClient: node.masterClient,
@ -483,6 +497,7 @@ func (node *NodeImpl) CreateIndex(request *milvuspb.CreateIndexRequest) (*common
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
cit := &CreateIndexTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
CreateIndexRequest: request,
masterClient: node.masterClient,
@ -512,6 +527,7 @@ func (node *NodeImpl) DescribeIndex(request *milvuspb.DescribeIndexRequest) (*mi
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
dit := &DescribeIndexTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DescribeIndexRequest: request,
masterClient: node.masterClient,
@ -545,6 +561,7 @@ func (node *NodeImpl) DropIndex(request *milvuspb.DropIndexRequest) (*commonpb.S
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
dit := &DropIndexTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DropIndexRequest: request,
masterClient: node.masterClient,
@ -571,6 +588,7 @@ func (node *NodeImpl) GetIndexState(request *milvuspb.IndexStateRequest) (*milvu
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
dipt := &GetIndexStateTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
IndexStateRequest: request,
indexServiceClient: node.indexServiceClient,
@ -605,6 +623,7 @@ func (node *NodeImpl) Insert(request *milvuspb.InsertRequest) (*milvuspb.InsertR
defer cancel()
it := &InsertTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
dataServiceClient: node.dataServiceClient,
BaseInsertTask: BaseInsertTask{
@ -656,8 +675,9 @@ func (node *NodeImpl) Search(request *milvuspb.SearchRequest) (*milvuspb.SearchR
defer cancel()
qt := &SearchTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
SearchRequest: internalpb2.SearchRequest{
SearchRequest: &internalpb2.SearchRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kSearch,
SourceID: Params.ProxyID,
@ -697,6 +717,7 @@ func (node *NodeImpl) Flush(request *milvuspb.FlushRequest) (*commonpb.Status, e
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
ft := &FlushTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
FlushRequest: request,
dataServiceClient: node.dataServiceClient,

File diff suppressed because it is too large


@ -9,10 +9,12 @@ import (
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
)
type TaskQueue interface {
@ -295,31 +297,42 @@ func (sched *TaskScheduler) getTaskByReqID(collMeta UniqueID) task {
}
func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
err := t.PreExecute()
span, ctx := trace.StartSpanFromContext(t.Ctx(),
opentracing.Tags{
"Type": t.Name(),
"ID": t.ID(),
})
defer span.Finish()
span.LogFields(oplog.Int64("scheduler process PreExecute", t.ID()))
err := t.PreExecute(ctx)
defer func() {
t.Notify(err)
// log.Printf("notify with error: %v", err)
}()
if err != nil {
trace.LogError(span, err)
return
}
span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID()))
q.AddActiveTask(t)
// log.Printf("task add to active list ...")
defer func() {
span.LogFields(oplog.Int64("scheduler process PopActiveTask", t.ID()))
q.PopActiveTask(t.EndTs())
// log.Printf("pop from active list ...")
}()
err = t.Execute()
span.LogFields(oplog.Int64("scheduler process Execute", t.ID()))
err = t.Execute(ctx)
if err != nil {
log.Printf("execute definition task failed, error = %v", err)
trace.LogError(span, err)
return
}
// log.Printf("task execution done ...")
err = t.PostExecute()
span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID()))
err = t.PostExecute(ctx)
// log.Printf("post execute task done ...")
}


@ -199,6 +199,7 @@ func (s *ServiceImpl) RegisterLink() (*milvuspb.RegisterLinkResponse, error) {
defer cancel()
t := &RegisterLinkTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
nodeInfos: s.nodeInfos,
}
@ -236,6 +237,7 @@ func (s *ServiceImpl) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxy
defer cancel()
t := &RegisterNodeTask{
ctx: ctx,
request: request,
startParams: s.nodeStartParams,
Condition: NewTaskCondition(ctx),
@ -276,6 +278,7 @@ func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateC
defer cancel()
t := &InvalidateCollectionMetaCacheTask{
ctx: ctx,
request: request,
Condition: NewTaskCondition(ctx),
nodeInfos: s.nodeInfos,


@ -18,10 +18,19 @@ const (
FromNode TaskEnum = 2
)
const (
RegisterLinkTaskName = "RegisLinkTask"
RegisterNodeTaskName = "RegisNodeTask"
InvalidateCollectionMetaCacheTaskName = "InvalidateCollectionMetaCacheTask"
)
type task interface {
PreExecute() error
Execute() error
PostExecute() error
Ctx() context.Context
ID() UniqueID // return ReqID
Name() string
PreExecute(ctx context.Context) error
Execute(ctx context.Context) error
PostExecute(ctx context.Context) error
WaitToFinish() error
Notify(err error)
}
@ -58,15 +67,28 @@ func NewTaskCondition(ctx context.Context) Condition {
type RegisterLinkTask struct {
Condition
ctx context.Context
response *milvuspb.RegisterLinkResponse
nodeInfos *GlobalNodeInfoTable
}
func (t *RegisterLinkTask) PreExecute() error {
func (t *RegisterLinkTask) Ctx() context.Context {
return t.ctx
}
func (t *RegisterLinkTask) ID() UniqueID {
return 0
}
func (t *RegisterLinkTask) Name() string {
return RegisterLinkTaskName
}
func (t *RegisterLinkTask) PreExecute(ctx context.Context) error {
return nil
}
func (t *RegisterLinkTask) Execute() error {
func (t *RegisterLinkTask) Execute(ctx context.Context) error {
info, err := t.nodeInfos.Pick()
if err != nil {
return err
@ -84,12 +106,13 @@ func (t *RegisterLinkTask) Execute() error {
return nil
}
func (t *RegisterLinkTask) PostExecute() error {
func (t *RegisterLinkTask) PostExecute(ctx context.Context) error {
return nil
}
type RegisterNodeTask struct {
Condition
ctx context.Context
request *proxypb.RegisterNodeRequest
response *proxypb.RegisterNodeResponse
startParams []*commonpb.KeyValuePair
@ -97,11 +120,23 @@ type RegisterNodeTask struct {
nodeInfos *GlobalNodeInfoTable
}
func (t *RegisterNodeTask) PreExecute() error {
func (t *RegisterNodeTask) Ctx() context.Context {
return t.ctx
}
func (t *RegisterNodeTask) ID() UniqueID {
return t.request.Base.MsgID
}
func (t *RegisterNodeTask) Name() string {
return RegisterNodeTaskName
}
func (t *RegisterNodeTask) PreExecute(ctx context.Context) error {
return nil
}
func (t *RegisterNodeTask) Execute() error {
func (t *RegisterNodeTask) Execute(ctx context.Context) error {
nodeID := t.allocator.AllocOne()
info := NodeInfo{
ip: t.request.Address.Ip,
@ -122,22 +157,35 @@ func (t *RegisterNodeTask) Execute() error {
return err
}
func (t *RegisterNodeTask) PostExecute() error {
func (t *RegisterNodeTask) PostExecute(ctx context.Context) error {
return nil
}
type InvalidateCollectionMetaCacheTask struct {
Condition
ctx context.Context
request *proxypb.InvalidateCollMetaCacheRequest
response *commonpb.Status
nodeInfos *GlobalNodeInfoTable
}
func (t *InvalidateCollectionMetaCacheTask) PreExecute() error {
func (t *InvalidateCollectionMetaCacheTask) Ctx() context.Context {
return t.ctx
}
func (t *InvalidateCollectionMetaCacheTask) ID() UniqueID {
return t.request.Base.MsgID
}
func (t *InvalidateCollectionMetaCacheTask) Name() string {
return InvalidateCollectionMetaCacheTaskName
}
func (t *InvalidateCollectionMetaCacheTask) PreExecute(ctx context.Context) error {
return nil
}
func (t *InvalidateCollectionMetaCacheTask) Execute() error {
func (t *InvalidateCollectionMetaCacheTask) Execute(ctx context.Context) error {
var err error
clients, err := t.nodeInfos.ObtainAllClients()
if err != nil {
@ -158,6 +206,6 @@ func (t *InvalidateCollectionMetaCacheTask) Execute() error {
return nil
}
func (t *InvalidateCollectionMetaCacheTask) PostExecute() error {
func (t *InvalidateCollectionMetaCacheTask) PostExecute(ctx context.Context) error {
return nil
}


@ -3,6 +3,10 @@ package proxyservice
import (
"context"
"sync"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
)
type TaskScheduler struct {
@ -40,21 +44,30 @@ func (sched *TaskScheduler) scheduleInvalidateCollectionMetaCacheTask() task {
}
func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
var err error
err = t.PreExecute()
span, ctx := trace.StartSpanFromContext(t.Ctx(),
opentracing.Tags{
"Type": t.Name(),
})
defer span.Finish()
span.LogFields(oplog.String("scheduler process PreExecute", t.Name()))
err := t.PreExecute(ctx)
defer func() {
trace.LogError(span, err)
t.Notify(err)
}()
if err != nil {
return
}
err = t.Execute()
span.LogFields(oplog.String("scheduler process Execute", t.Name()))
err = t.Execute(ctx)
if err != nil {
trace.LogError(span, err)
return
}
err = t.PostExecute()
span.LogFields(oplog.String("scheduler process PostExecute", t.Name()))
err = t.PostExecute(ctx)
}
func (sched *TaskScheduler) registerLinkLoop() {

internal/util/trace/util.go

@ -0,0 +1,178 @@
package trace
import (
"context"
"errors"
"runtime"
"strings"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"github.com/uber/jaeger-client-go"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
func StartSpanFromContext(ctx context.Context, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
if ctx == nil {
panic("StartSpanFromContext called with nil context")
}
var pcs [1]uintptr
n := runtime.Callers(2, pcs[:])
if n < 1 {
span, ctx := opentracing.StartSpanFromContext(ctx, "unknown", opts...)
span.LogFields(log.Error(errors.New("runtime.Callers failed")))
return span, ctx
}
fn := runtime.FuncForPC(pcs[0])
name := fn.Name()
if lastSlash := strings.LastIndexByte(name, '/'); lastSlash > 0 {
name = name[lastSlash+1:]
}
if parent := opentracing.SpanFromContext(ctx); parent != nil {
opts = append(opts, opentracing.ChildOf(parent.Context()))
}
span := opentracing.StartSpan(name, opts...)
file, line := fn.FileLine(pcs[0])
span.LogFields(log.String("filename", file), log.Int("line", line))
return span, opentracing.ContextWithSpan(ctx, span)
}
func StartSpanFromContextWithOperationName(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
if ctx == nil {
panic("StartSpanFromContextWithOperationName called with nil context")
}
var pcs [1]uintptr
n := runtime.Callers(2, pcs[:])
if n < 1 {
span, ctx := opentracing.StartSpanFromContext(ctx, operationName, opts...)
span.LogFields(log.Error(errors.New("runtime.Callers failed")))
return span, ctx
}
file, line := runtime.FuncForPC(pcs[0]).FileLine(pcs[0])
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
opts = append(opts, opentracing.ChildOf(parentSpan.Context()))
}
span := opentracing.StartSpan(operationName, opts...)
ctx = opentracing.ContextWithSpan(ctx, span)
span.LogFields(log.String("filename", file), log.Int("line", line))
return span, ctx
}
func LogError(span opentracing.Span, err error) error {
if err == nil {
return nil
}
// Get caller frame.
var pcs [1]uintptr
n := runtime.Callers(2, pcs[:])
if n < 1 {
span.LogFields(log.Error(err))
span.LogFields(log.Error(errors.New("runtime.Callers failed")))
return err
}
file, line := runtime.FuncForPC(pcs[0]).FileLine(pcs[0])
span.LogFields(log.String("filename", file), log.Int("line", line), log.Error(err))
return err
}
func InfoFromSpan(span opentracing.Span) (traceID string, sampled bool, found bool) {
if spanContext, ok := span.Context().(jaeger.SpanContext); ok {
traceID = spanContext.TraceID().String()
sampled = spanContext.IsSampled()
return traceID, sampled, true
}
return "", false, false
}
func InfoFromContext(ctx context.Context) (traceID string, sampled bool, found bool) {
if span := opentracing.SpanFromContext(ctx); span != nil {
return InfoFromSpan(span)
}
return "", false, false
}
func InjectContextToPulsarMsgProperties(sc opentracing.SpanContext, properties map[string]string) {
tracer := opentracing.GlobalTracer()
tracer.Inject(sc, opentracing.TextMap, propertiesReaderWriter{properties})
}
func ExtractFromPulsarMsgProperties(msg msgstream.TsMsg, properties map[string]string) opentracing.Span {
if !allowTrace(msg) {
return noopSpan()
}
tracer := opentracing.GlobalTracer()
sc, _ := tracer.Extract(opentracing.TextMap, propertiesReaderWriter{properties})
name := "receive pulsar msg"
opts := []opentracing.StartSpanOption{
ext.RPCServerOption(sc),
opentracing.Tags{
"ID": msg.ID(),
"Type": msg.Type(),
"HashKeys": msg.HashKeys(),
"Position": msg.Position(),
}}
return opentracing.StartSpan(name, opts...)
}
func MsgSpanFromCtx(ctx context.Context, msg msgstream.TsMsg, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
if !allowTrace(msg) {
return noopSpan(), ctx
}
name := "send pulsar msg"
opts = append(opts, opentracing.Tags{
"ID": msg.ID(),
"Type": msg.Type(),
"HashKeys": msg.HashKeys(),
"Position": msg.Position(),
})
return StartSpanFromContextWithOperationName(ctx, name, opts...)
}
type propertiesReaderWriter struct {
ppMap map[string]string
}
func (ppRW propertiesReaderWriter) Set(key, val string) {
key = strings.ToLower(key)
ppRW.ppMap[key] = val
}
func (ppRW propertiesReaderWriter) ForeachKey(handler func(key, val string) error) error {
for k, val := range ppRW.ppMap {
if err := handler(k, val); err != nil {
return err
}
}
return nil
}
func allowTrace(in interface{}) bool {
if in == nil {
return false
}
switch res := in.(type) {
case msgstream.TsMsg:
return !(res.Type() == commonpb.MsgType_kTimeTick ||
res.Type() == commonpb.MsgType_kQueryNodeStats ||
res.Type() == commonpb.MsgType_kLoadIndex)
default:
return false
}
}
func noopSpan() opentracing.Span {
return opentracing.NoopTracer{}.StartSpan("Default-span")
}
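
The Inject/Extract pair above carries a span context inside pulsar message properties so the consumer side can continue the producer's trace; allowTrace skips time-tick, query-node-stats and load-index messages, for which a no-op span is returned. A sketch of how the two sides fit together; the properties plumbing around the pulsar client is illustrative:

// Producer side: start a span for the outgoing message, then inject its
// context into the string map that travels as pulsar message properties.
sp, ctx := MsgSpanFromCtx(ctx, tsMsg)
defer sp.Finish()
// pass ctx (now carrying the span) to downstream calls
properties := map[string]string{}
InjectContextToPulsarMsgProperties(sp.Context(), properties)
// ... set properties on the outgoing pulsar ProducerMessage before sending ...

// Consumer side: rebuild a server-side span from the received properties.
recvSp := ExtractFromPulsarMsgProperties(tsMsg, properties)
defer recvSp.Finish()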


@ -0,0 +1,86 @@
package trace
import (
"context"
"errors"
"fmt"
"io"
"testing"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/uber/jaeger-client-go/config"
)
func InitTracing() io.Closer {
cfg := &config.Configuration{
ServiceName: "test",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
return closer
}
type simpleStruct struct {
name string
value string
}
func TestTracing(t *testing.T) {
//Already Init in each framework, this can be ignored in debug
closer := InitTracing()
defer closer.Close()
// context normally can be propagated through func params
ctx := context.Background()
//start span
//default use function name for operation name
sp, ctx := StartSpanFromContext(ctx)
sp.SetTag("tag1", "tag1")
// use self-defined operation name for span
// sp, ctx := StartSpanFromContextWithOperationName(ctx, "self-defined name")
defer sp.Finish()
ss := &simpleStruct{
name: "name",
value: "value",
}
sp.LogFields(oplog.String("key", "value"), oplog.Object("key", ss))
err := caller(ctx)
if err != nil {
LogError(sp, err) //LogError do something error log in trace and returns origin error.
}
}
func caller(ctx context.Context) error {
for i := 0; i < 2; i++ {
// if span starts in a loop, defer is not allowed.
// manually call span.Finish() if error occurs or one loop ends
sp, _ := StartSpanFromContextWithOperationName(ctx, fmt.Sprintf("test:%d", i))
sp.SetTag(fmt.Sprintf("tags:%d", i), fmt.Sprintf("tags:%d", i))
var err error
if i == 1 {
err = errors.New("test")
}
if err != nil {
sp.Finish()
return LogError(sp, err)
}
sp.Finish()
}
return nil
}