fix: fail to init fg clears flushTs so that slows flush (#36740)

See also: #36709

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/36782/head
XuanYang-cn 2024-10-11 17:37:04 +08:00 committed by GitHub
parent 0751c508de
commit 794e3ab7e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 12 additions and 14 deletions

View File

@ -239,19 +239,6 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
serverID: serverID,
}
err := params.WriteBufferManager.Register(channelName, metacache,
writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)),
writebuffer.WithIDAllocator(params.Allocator))
if err != nil {
log.Warn("failed to register channel buffer", zap.Error(err))
return nil, err
}
defer func() {
if err != nil {
defer params.WriteBufferManager.RemoveChannel(channelName)
}
}()
ctx, cancel := context.WithCancel(params.Ctx)
ds := &DataSyncService{
ctx: ctx,
@ -324,6 +311,17 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
}
ds.fg = fg
// Register channel after channel pipeline is ready.
// This'll reject any FlushChannel and FlushSegments calls to prevent inconsistency between DN and DC over flushTs
// if fail to init flowgraph nodes.
err = params.WriteBufferManager.Register(channelName, metacache,
writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)),
writebuffer.WithIDAllocator(params.Allocator))
if err != nil {
log.Warn("failed to register channel buffer", zap.String("channel", channelName), zap.Error(err))
return nil, err
}
return ds, nil
}

View File

@ -178,7 +178,7 @@ func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushT
m.mut.RUnlock()
if !ok {
log.Ctx(ctx).Warn("write buffer not found when flush segments",
log.Ctx(ctx).Warn("write buffer not found when flush channel",
zap.String("channel", channel),
zap.Uint64("flushTs", flushTs))
return merr.WrapErrChannelNotFound(channel)