mirror of https://github.com/milvus-io/milvus.git

enhance: add intent ctx to rootcoord init (#38439)

issue: #35917
Add an intent context to the RootCoord initialization process, so that every log entry and RPC emitted during init carries the same trace ID.

Signed-off-by: tinswzy <zhenyuan.wei@zilliz.com>

branch: pull/38896/head
parent 95c1ccc20d
commit 11f8f4a378
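In outline, the commit gives RootCoord's init path its own short-lived "intent" context: an OpenTelemetry span named after the intent, with role/intent/traceID attached as log fields, passed to every init step and ended when init returns. Below is a minimal, self-contained sketch of the same idea written directly against the OTel API; newIntentContext here is illustrative, not the commit's code (the real helper is log.NewIntentContext in the diff further down).

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

// newIntentContext mirrors what log.NewIntentContext does: start a root span
// named after the intent and hand back a context that carries it.
func newIntentContext(role, intent string) (context.Context, trace.Span) {
	ctx, span := otel.Tracer(role).Start(context.Background(), intent)
	// Note: with no SDK-installed TracerProvider this tracer is a no-op
	// and the printed trace ID is all zeros.
	fmt.Printf("role=%s intent=%s traceID=%s\n",
		role, intent, span.SpanContext().TraceID().String())
	return ctx, span
}

func main() {
	initCtx, span := newIntentContext("rootcoord", "initInternal")
	defer span.End()
	_ = initCtx // pass initCtx to each init step so they share one trace
}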
@@ -385,6 +385,9 @@ func (mr *MilvusRoles) Run() {
 	paramtable.SetRole(mr.ServerType)
 	}
 
+	// init tracer before run any component
+	tracer.Init()
+
 	// Initialize streaming service if enabled.
 	if streamingutil.IsStreamingServiceEnabled() {
 		streaming.Init()
@@ -518,7 +521,6 @@ func (mr *MilvusRoles) Run() {
 		return nil
 	})
 
-	tracer.Init()
 	paramtable.Get().WatchKeyPrefix("trace", config.NewHandler("tracing handler", func(e *config.Event) {
 		params := paramtable.Get()
@@ -147,7 +147,10 @@ type dmlChannels struct {
 	channelsHeap channelsHeap
 }
 
-func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePrefixDefault string, chanNumDefault int64) *dmlChannels {
+func newDmlChannels(initCtx context.Context, factory msgstream.Factory, chanNamePrefixDefault string, chanNumDefault int64) *dmlChannels {
+	log.Ctx(initCtx).Info("new DmlChannels",
+		zap.String("chanNamePrefixDefault", chanNamePrefixDefault),
+		zap.Int64("chanNumDefault", chanNumDefault))
 	params := &paramtable.Get().CommonCfg
 	var (
 		chanNamePrefix string
@@ -167,7 +170,7 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
 	}
 
 	d := &dmlChannels{
-		ctx:        ctx,
+		ctx:        context.TODO(),
 		factory:    factory,
 		namePrefix: chanNamePrefix,
 		capacity:   chanNum,
@@ -176,19 +179,19 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
 	}
 
 	for i, name := range names {
-		ms, err := factory.NewMsgStream(ctx)
+		ms, err := factory.NewMsgStream(initCtx)
 		if err != nil {
-			log.Ctx(ctx).Error("Failed to add msgstream",
+			log.Ctx(initCtx).Error("Failed to add msgstream",
 				zap.String("name", name),
 				zap.Error(err))
 			panic("Failed to add msgstream")
 		}
 
 		if params.PreCreatedTopicEnabled.GetAsBool() {
-			d.checkPreCreatedTopic(ctx, factory, name)
+			d.checkPreCreatedTopic(initCtx, factory, name)
 		}
 
-		ms.AsProducer(ctx, []string{name})
+		ms.AsProducer(initCtx, []string{name})
 		dms := &dmlMsgStream{
 			ms:     ms,
 			refcnt: 0,
@@ -202,7 +205,7 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
 
 	heap.Init(&d.channelsHeap)
 
-	log.Ctx(ctx).Info("init dml channels", zap.String("prefix", chanNamePrefix), zap.Int64("num", chanNum))
+	log.Ctx(initCtx).Info("init dml channels", zap.String("prefix", chanNamePrefix), zap.Int64("num", chanNum))
 
 	metrics.RootCoordNumOfDMLChannel.Add(float64(chanNum))
 	metrics.RootCoordNumOfMsgStream.Add(float64(chanNum))
@@ -712,7 +712,7 @@ func newRocksMqTtSynchronizer() *timetickSync {
 	ctx := context.Background()
 	factory := dependency.NewDefaultFactory(true)
 	chans := map[UniqueID][]string{}
-	ticker := newTimeTickSync(ctx, TestRootCoordID, factory, chans)
+	ticker := newTimeTickSync(context.TODO(), ctx, TestRootCoordID, factory, chans)
 	return ticker
 }
 
@@ -1054,7 +1054,7 @@ func newTickerWithFactory(factory msgstream.Factory) *timetickSync {
 	paramtable.Get().Save(Params.RootCoordCfg.DmlChannelNum.Key, "4")
 	ctx := context.Background()
 	chans := map[UniqueID][]string{}
-	ticker := newTimeTickSync(ctx, TestRootCoordID, factory, chans)
+	ticker := newTimeTickSync(context.TODO(), ctx, TestRootCoordID, factory, chans)
 	return ticker
 }
@@ -341,15 +341,14 @@ func (c *Core) initKVCreator() {
 	}
 }
 
-func (c *Core) initMetaTable() error {
-	log := log.Ctx(c.ctx)
+func (c *Core) initMetaTable(initCtx context.Context) error {
 	fn := func() error {
 		var catalog metastore.RootCoordCatalog
 		var err error
 
 		switch Params.MetaStoreCfg.MetaStoreType.GetValue() {
 		case util.MetaStoreTypeEtcd:
-			log.Info("Using etcd as meta storage.")
+			log.Ctx(initCtx).Info("Using etcd as meta storage.")
 			var metaKV kv.MetaKv
 			var ss *kvmetestore.SuffixSnapshot
 			var err error
@@ -363,7 +362,7 @@ func (c *Core) initMetaTable() error {
 			}
 			catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss}
 		case util.MetaStoreTypeTiKV:
-			log.Info("Using tikv as meta storage.")
+			log.Ctx(initCtx).Info("Using tikv as meta storage.")
 			var metaKV kv.MetaKv
 			var ss *kvmetestore.SuffixSnapshot
 			var err error
@@ -387,10 +386,10 @@ func (c *Core) initMetaTable() error {
 		return nil
 	}
 
-	return retry.Do(c.ctx, fn, retry.Attempts(10))
+	return retry.Do(initCtx, fn, retry.Attempts(10))
 }
 
-func (c *Core) initIDAllocator() error {
+func (c *Core) initIDAllocator(initCtx context.Context) error {
 	var tsoKV kv.TxnKV
 	var kvPath string
 	if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
@@ -406,7 +405,7 @@ func (c *Core) initIDAllocator() error {
 	}
 	c.idAllocator = idAllocator
 
-	log.Ctx(c.ctx).Info("id allocator initialized",
+	log.Ctx(initCtx).Info("id allocator initialized",
 		zap.String("root_path", kvPath),
 		zap.String("sub_path", globalIDAllocatorSubPath),
 		zap.String("key", globalIDAllocatorKey))
@@ -414,7 +413,7 @@ func (c *Core) initIDAllocator() error {
 	return nil
 }
 
-func (c *Core) initTSOAllocator() error {
+func (c *Core) initTSOAllocator(initCtx context.Context) error {
 	var tsoKV kv.TxnKV
 	var kvPath string
 	if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
@@ -430,7 +429,7 @@ func (c *Core) initTSOAllocator() error {
 	}
 	c.tsoAllocator = tsoAllocator
 
-	log.Ctx(c.ctx).Info("tso allocator initialized",
+	log.Ctx(initCtx).Info("tso allocator initialized",
 		zap.String("root_path", kvPath),
 		zap.String("sub_path", globalIDAllocatorSubPath),
 		zap.String("key", globalIDAllocatorKey))
@@ -439,19 +438,22 @@ func (c *Core) initTSOAllocator() error {
 }
 
 func (c *Core) initInternal() error {
-	log := log.Ctx(c.ctx)
+	initCtx, initSpan := log.NewIntentContext(typeutil.RootCoordRole, "initInternal")
+	defer initSpan.End()
+	log := log.Ctx(initCtx)
+
 	c.UpdateStateCode(commonpb.StateCode_Initializing)
 	c.initKVCreator()
 
-	if err := c.initIDAllocator(); err != nil {
+	if err := c.initIDAllocator(initCtx); err != nil {
 		return err
 	}
 
-	if err := c.initTSOAllocator(); err != nil {
+	if err := c.initTSOAllocator(initCtx); err != nil {
 		return err
 	}
 
-	if err := c.initMetaTable(); err != nil {
+	if err := c.initMetaTable(initCtx); err != nil {
 		return err
 	}
@@ -459,7 +461,7 @@ func (c *Core) initInternal() error {
 
 	c.factory.Init(Params)
 	chanMap := c.meta.ListCollectionPhysicalChannels(c.ctx)
-	c.chanTimeTick = newTimeTickSync(c.ctx, c.session.ServerID, c.factory, chanMap)
+	c.chanTimeTick = newTimeTickSync(initCtx, c.ctx, c.session.ServerID, c.factory, chanMap)
 	log.Info("create TimeTick sync done")
 
 	c.proxyClientManager = proxyutil.NewProxyClientManager(c.proxyCreator)
@@ -492,12 +494,12 @@ func (c *Core) initInternal() error {
 	c.quotaCenter = NewQuotaCenter(c.proxyClientManager, c.queryCoord, c.dataCoord, c.tsoAllocator, c.meta)
 	log.Debug("RootCoord init QuotaCenter done")
 
-	if err := c.initCredentials(); err != nil {
+	if err := c.initCredentials(initCtx); err != nil {
 		return err
 	}
 	log.Info("init credentials done")
 
-	if err := c.initRbac(); err != nil {
+	if err := c.initRbac(initCtx); err != nil {
 		return err
 	}
@@ -555,34 +557,33 @@ func (c *Core) Init() error {
 	return initError
 }
 
-func (c *Core) initCredentials() error {
-	log := log.Ctx(c.ctx)
-	credInfo, _ := c.meta.GetCredential(c.ctx, util.UserRoot)
+func (c *Core) initCredentials(initCtx context.Context) error {
+	credInfo, _ := c.meta.GetCredential(initCtx, util.UserRoot)
 	if credInfo == nil {
 		encryptedRootPassword, err := crypto.PasswordEncrypt(Params.CommonCfg.DefaultRootPassword.GetValue())
 		if err != nil {
-			log.Warn("RootCoord init user root failed", zap.Error(err))
+			log.Ctx(initCtx).Warn("RootCoord init user root failed", zap.Error(err))
 			return err
 		}
-		log.Info("RootCoord init user root")
-		err = c.meta.AddCredential(c.ctx, &internalpb.CredentialInfo{Username: util.UserRoot, EncryptedPassword: encryptedRootPassword})
+		log.Ctx(initCtx).Info("RootCoord init user root")
+		err = c.meta.AddCredential(initCtx, &internalpb.CredentialInfo{Username: util.UserRoot, EncryptedPassword: encryptedRootPassword})
 		return err
 	}
 	return nil
 }
 
-func (c *Core) initRbac() error {
+func (c *Core) initRbac(initCtx context.Context) error {
 	var err error
 	// create default roles, including admin, public
 	for _, role := range util.DefaultRoles {
-		err = c.meta.CreateRole(c.ctx, util.DefaultTenant, &milvuspb.RoleEntity{Name: role})
+		err = c.meta.CreateRole(initCtx, util.DefaultTenant, &milvuspb.RoleEntity{Name: role})
 		if err != nil && !common.IsIgnorableError(err) {
 			return errors.Wrap(err, "failed to create role")
 		}
 	}
 
 	if Params.ProxyCfg.EnablePublicPrivilege.GetAsBool() {
-		err = c.initPublicRolePrivilege()
+		err = c.initPublicRolePrivilege(initCtx)
 		if err != nil {
 			return err
 		}
@@ -594,7 +595,7 @@ func (c *Core) initRbac() error {
 	return nil
 }
 
-func (c *Core) initPublicRolePrivilege() error {
+func (c *Core) initPublicRolePrivilege(initCtx context.Context) error {
 	// grant privileges for the public role
 	globalPrivileges := []string{
 		commonpb.ObjectPrivilege_PrivilegeDescribeCollection.String(),
@@ -606,7 +607,7 @@ func (c *Core) initPublicRolePrivilege() error {
 
 	var err error
 	for _, globalPrivilege := range globalPrivileges {
-		err = c.meta.OperatePrivilege(c.ctx, util.DefaultTenant, &milvuspb.GrantEntity{
+		err = c.meta.OperatePrivilege(initCtx, util.DefaultTenant, &milvuspb.GrantEntity{
 			Role:       &milvuspb.RoleEntity{Name: util.RolePublic},
 			Object:     &milvuspb.ObjectEntity{Name: commonpb.ObjectType_Global.String()},
 			ObjectName: util.AnyWord,
@@ -621,7 +622,7 @@ func (c *Core) initPublicRolePrivilege() error {
 		}
 	}
 	for _, collectionPrivilege := range collectionPrivileges {
-		err = c.meta.OperatePrivilege(c.ctx, util.DefaultTenant, &milvuspb.GrantEntity{
+		err = c.meta.OperatePrivilege(initCtx, util.DefaultTenant, &milvuspb.GrantEntity{
 			Role:       &milvuspb.RoleEntity{Name: util.RolePublic},
 			Object:     &milvuspb.ObjectEntity{Name: commonpb.ObjectType_Collection.String()},
 			ObjectName: util.AnyWord,
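The net effect on initInternal is easiest to see in isolation. A compressed, runnable sketch of its new shape (core and the step methods are stand-ins; the real code creates the context with log.NewIntentContext and ends the span on return):

package main

import (
	"context"
	"fmt"
)

type core struct{}

// Stand-ins for the real init steps; each now accepts the init context.
func (c *core) initIDAllocator(ctx context.Context) error  { return nil }
func (c *core) initTSOAllocator(ctx context.Context) error { return nil }
func (c *core) initMetaTable(ctx context.Context) error    { return nil }

func (c *core) initInternal() error {
	// Stand-in for log.NewIntentContext(typeutil.RootCoordRole, "initInternal");
	// cancel() stands in for initSpan.End().
	initCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for _, step := range []func(context.Context) error{
		c.initIDAllocator,
		c.initTSOAllocator,
		c.initMetaTable,
	} {
		if err := step(initCtx); err != nil {
			return err // a failing step's logs carry the init trace ID
		}
	}
	return nil
}

func main() {
	fmt.Println("init error:", (&core{}).initInternal())
}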
@@ -2071,7 +2071,7 @@ func TestCore_InitRBAC(t *testing.T) {
 			Params.Reset(Params.ProxyCfg.EnablePublicPrivilege.Key)
 		}()
 
-		err := c.initRbac()
+		err := c.initRbac(context.TODO())
 		assert.NoError(t, err)
 	})
 
@@ -2092,7 +2092,7 @@ func TestCore_InitRBAC(t *testing.T) {
 			Params.Reset(Params.ProxyCfg.EnablePublicPrivilege.Key)
 		}()
 
-		err := c.initRbac()
+		err := c.initRbac(context.TODO())
 		assert.NoError(t, err)
 	})
 }
@@ -113,22 +113,22 @@ func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp {
 	return c.defaultTs
 }
 
-func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync {
+func newTimeTickSync(initCtx context.Context, parentLoopCtx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync {
 	// if the old channels number used by the user is greater than the set default value currently
 	// keep the old channels
 	chanNum := getNeedChanNum(Params.RootCoordCfg.DmlChannelNum.GetAsInt(), chanMap)
 
 	// initialize dml channels used for insert
-	dmlChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDml.GetValue(), int64(chanNum))
+	dmlChannels := newDmlChannels(initCtx, factory, Params.CommonCfg.RootCoordDml.GetValue(), int64(chanNum))
 
 	// recover physical channels for all collections
 	for collID, chanNames := range chanMap {
 		dmlChannels.addChannels(chanNames...)
-		log.Ctx(ctx).Info("recover physical channels", zap.Int64("collectionID", collID), zap.Strings("physical channels", chanNames))
+		log.Ctx(initCtx).Info("recover physical channels", zap.Int64("collectionID", collID), zap.Strings("physical channels", chanNames))
 	}
 
 	return &timetickSync{
-		ctx:      ctx,
+		ctx:      parentLoopCtx,
 		sourceID: sourceID,
 
 		dmlChannels: dmlChannels,
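newTimeTickSync now takes two contexts because the two lifetimes differ: initCtx scopes the one-time setup (its span ends when init returns), while parentLoopCtx owns the background loop and is cancelled on shutdown. A runnable sketch of that split, with illustrative names:

package main

import (
	"context"
	"fmt"
	"time"
)

type ticker struct{ ctx context.Context }

// newTicker takes a short-lived init context for setup-time work and a
// long-lived loop context that bounds the background loop, mirroring the
// initCtx/parentLoopCtx split above.
func newTicker(initCtx, loopCtx context.Context) *ticker {
	fmt.Println("setup traced under init context:", initCtx.Err() == nil)
	return &ticker{ctx: loopCtx} // the loop must not inherit initCtx
}

func (t *ticker) run() {
	for {
		select {
		case <-t.ctx.Done():
			fmt.Println("loop stopped:", t.ctx.Err())
			return
		case <-time.After(10 * time.Millisecond):
		}
	}
}

func main() {
	loopCtx, cancel := context.WithCancel(context.Background())
	tk := newTicker(context.TODO(), loopCtx)
	go tk.run()
	time.Sleep(25 * time.Millisecond)
	cancel() // shutdown: cancelling the parent loop context stops the ticker
	time.Sleep(20 * time.Millisecond)
}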
@@ -42,7 +42,7 @@ func TestTimetickSync(t *testing.T) {
 
 	paramtable.Get().Save(Params.RootCoordCfg.DmlChannelNum.Key, "2")
 	paramtable.Get().Save(Params.CommonCfg.RootCoordDml.Key, "rootcoord-dml")
-	ttSync := newTimeTickSync(ctx, sourceID, factory, nil)
+	ttSync := newTimeTickSync(context.TODO(), ctx, sourceID, factory, nil)
 
 	var wg sync.WaitGroup
 	wg.Add(1)
@@ -120,7 +120,7 @@ func TestMultiTimetickSync(t *testing.T) {
 
 	paramtable.Get().Save(Params.RootCoordCfg.DmlChannelNum.Key, "1")
 	paramtable.Get().Save(Params.CommonCfg.RootCoordDml.Key, "rootcoord-dml")
-	ttSync := newTimeTickSync(ctx, UniqueID(0), factory, nil)
+	ttSync := newTimeTickSync(context.TODO(), ctx, UniqueID(0), factory, nil)
 
 	var wg sync.WaitGroup
 
@@ -190,7 +190,7 @@ func TestTimetickSyncWithExistChannels(t *testing.T) {
 
 	chans[UniqueID(100)] = []string{"by-dev-rootcoord-dml_4", "by-dev-rootcoord-dml_8"}
 	chans[UniqueID(102)] = []string{"by-dev-rootcoord-dml_2", "by-dev-rootcoord-dml_9"}
-	ttSync := newTimeTickSync(ctx, sourceID, factory, chans)
+	ttSync := newTimeTickSync(context.TODO(), ctx, sourceID, factory, chans)
 
 	var wg sync.WaitGroup
 	wg.Add(1)
@@ -237,12 +237,12 @@ func TestTimetickSyncInvalidName(t *testing.T) {
 	chans := map[UniqueID][]string{}
 	chans[UniqueID(100)] = []string{"rootcoord-dml4"}
 	assert.Panics(t, func() {
-		newTimeTickSync(ctx, sourceID, factory, chans)
+		newTimeTickSync(context.TODO(), ctx, sourceID, factory, chans)
 	})
 
 	chans = map[UniqueID][]string{}
 	chans[UniqueID(102)] = []string{"rootcoord-dml_a"}
 	assert.Panics(t, func() {
-		newTimeTickSync(ctx, sourceID, factory, chans)
+		newTimeTickSync(context.TODO(), ctx, sourceID, factory, chans)
 	})
 }
@@ -16,6 +16,8 @@ package log
 import (
 	"context"
 
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 )
@@ -159,6 +161,16 @@ func WithFields(ctx context.Context, fields ...zap.Field) context.Context {
 	return context.WithValue(ctx, CtxLogKey, mLogger)
 }
 
+// NewIntentContext creates a new context with intent information and returns it along with a span.
+func NewIntentContext(name string, intent string) (context.Context, trace.Span) {
+	intentCtx, initSpan := otel.Tracer(name).Start(context.Background(), intent)
+	intentCtx = WithFields(intentCtx,
+		zap.String("role", name),
+		zap.String("intent", intent),
+		zap.String("traceID", initSpan.SpanContext().TraceID().String()))
+	return intentCtx, initSpan
+}
+
 // Ctx returns a logger which will log contextual messages attached in ctx
 func Ctx(ctx context.Context) *MLogger {
 	if ctx == nil {
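NewIntentContext leans on the package's existing context-logger convention: WithFields stores a field-enriched logger under CtxLogKey, and Ctx retrieves it, so every downstream log line carries the intent fields for free. A stripped-down sketch of that convention, using plain strings instead of zap and hypothetical names:

package main

import (
	"context"
	"fmt"
)

// ctxKey plays the role of CtxLogKey; a string slice stands in for a logger.
type ctxKey struct{}

func withFields(ctx context.Context, fields ...string) context.Context {
	return context.WithValue(ctx, ctxKey{}, fields)
}

// logFrom plays the role of log.Ctx(ctx).Info: it recovers the enriched
// logger from the context so every line carries the intent fields.
func logFrom(ctx context.Context, msg string) {
	fields, _ := ctx.Value(ctxKey{}).([]string)
	fmt.Println(msg, fields)
}

func main() {
	ctx := withFields(context.Background(), "role=rootcoord", "intent=initInternal")
	logFrom(ctx, "id allocator initialized")
	logFrom(ctx, "tso allocator initialized")
}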
@@ -2,6 +2,7 @@ package log
 
 import (
 	"context"
+	"fmt"
 	"testing"
 	"time"
 
@@ -101,3 +102,32 @@ func TestMLoggerRatedLog(t *testing.T) {
 	assert.True(t, success)
 	Ctx(ctx).Sync()
 }
+
+func TestNewIntentContext(t *testing.T) {
+	ts := newTestLogSpy(t)
+	conf := &Config{Level: "debug", DisableTimestamp: true}
+	logger, p, _ := InitTestLogger(ts, conf)
+	ReplaceGlobals(logger, p)
+
+	replaceLeveledLoggers(logger)
+	testName := "testRole"
+	testIntent := "testIntent"
+	ctx, span := NewIntentContext(testName, testIntent)
+	traceID := span.SpanContext().TraceID().String()
+	assert.NotNil(t, ctx)
+	assert.NotNil(t, span)
+	assert.NotNil(t, ctx.Value(CtxLogKey))
+	mLogger, ok := ctx.Value(CtxLogKey).(*MLogger)
+	assert.True(t, ok)
+	assert.NotNil(t, mLogger)
+
+	Ctx(ctx).Info("Info Test")
+	Ctx(ctx).Debug("Debug Test")
+	Ctx(ctx).Warn("Warn Test")
+	Ctx(ctx).Error("Error Test")
+	Ctx(ctx).Sync()
+
+	ts.assertLastMessageContains(fmt.Sprintf("role=%s", testName))
+	ts.assertLastMessageContains(fmt.Sprintf("intent=%s", testIntent))
+	ts.assertLastMessageContains(fmt.Sprintf("traceID=%s", traceID))
+}
@@ -23,13 +23,13 @@ type CommonFactory struct {
 }
 
 // NewMsgStream is used to generate a new Msgstream object
-func (f *CommonFactory) NewMsgStream(ctx context.Context) (ms MsgStream, err error) {
+func (f *CommonFactory) NewMsgStream(initCtx context.Context) (ms MsgStream, err error) {
 	defer wrapError(&err, "NewMsgStream")
-	cli, err := f.Newer(ctx)
+	cli, err := f.Newer(context.TODO())
 	if err != nil {
 		return nil, err
 	}
-	return NewMqMsgStream(context.Background(), f.ReceiveBufSize, f.MQBufSize, cli, f.DispatcherFactory.NewUnmarshalDispatcher())
+	return NewMqMsgStream(initCtx, f.ReceiveBufSize, f.MQBufSize, cli, f.DispatcherFactory.NewUnmarshalDispatcher())
 }
 
 // NewTtMsgStream is used to generate a new TtMsgstream object
@@ -78,13 +78,13 @@ type mqMsgStream struct {
 }
 
 // NewMqMsgStream is used to generate a new mqMsgStream object
-func NewMqMsgStream(ctx context.Context,
+func NewMqMsgStream(initCtx context.Context,
 	receiveBufSize int64,
 	bufSize int64,
 	client mqwrapper.Client,
 	unmarshal UnmarshalDispatcher,
 ) (*mqMsgStream, error) {
-	streamCtx, streamCancel := context.WithCancel(ctx)
+	streamCtx, streamCancel := context.WithCancel(context.Background())
 	producers := make(map[string]mqwrapper.Producer)
 	consumers := make(map[string]mqwrapper.Consumer)
 	producerChannels := make([]string, 0)
@@ -108,7 +108,7 @@ func NewMqMsgStream(ctx context.Context,
 		closeRWMutex: &sync.RWMutex{},
 		closed:       0,
 	}
-	ctxLog := log.Ctx(ctx)
+	ctxLog := log.Ctx(initCtx)
 	stream.forceEnableProduce.Store(false)
 	stream.ttMsgEnable.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
 	stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) {
@@ -130,7 +130,7 @@ func NewMqMsgStream(ctx context.Context,
 func (ms *mqMsgStream) AsProducer(ctx context.Context, channels []string) {
 	for _, channel := range channels {
 		if len(channel) == 0 {
-			log.Ctx(ms.ctx).Error("MsgStream asProducer's channel is an empty string")
+			log.Ctx(ctx).Error("MsgStream asProducer's channel is an empty string")
 			break
 		}
@@ -149,7 +149,7 @@ func (ms *mqMsgStream) AsProducer(ctx context.Context, channels []string) {
 			ms.producerChannels = append(ms.producerChannels, channel)
 			return nil
 		}
-		err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
+		err := retry.Do(ctx, fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
 		if err != nil {
 			errMsg := "Failed to create producer " + channel + ", error = " + err.Error()
 			panic(errMsg)
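The AsProducer change swaps context.TODO() for the caller's ctx in retry.Do, so cancelling the init context aborts producer-creation retries instead of letting them run all 20 attempts. A self-contained sketch of a ctx-aware retry loop of that shape (retryDo is a stand-in, not the milvus retry package):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryDo is a stand-in for retry.Do: retry fn with a fixed sleep, but give up
// as soon as the caller's context is cancelled.
func retryDo(ctx context.Context, fn func() error, attempts int, sleep time.Duration) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // caller gave up: stop retrying
		case <-time.After(sleep):
		}
	}
	return err
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	err := retryDo(ctx, func() error { return errors.New("broker unavailable") }, 20, 20*time.Millisecond)
	fmt.Println(err) // context deadline exceeded, not 20 sleeps later
}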