mirror of https://github.com/milvus-io/milvus.git
Fix DropCollection not processed by datanode (#7151)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/7160/merge
parent
171088409b
commit
ec9ccd8bd1
|
@ -20,37 +20,28 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type dmlStream struct {
|
||||
msgStream msgstream.MsgStream
|
||||
valid bool
|
||||
}
|
||||
|
||||
type dmlChannels struct {
|
||||
core *Core
|
||||
lock sync.RWMutex
|
||||
dml map[string]*dmlStream
|
||||
dml map[string]msgstream.MsgStream
|
||||
}
|
||||
|
||||
func newDMLChannels(c *Core) *dmlChannels {
|
||||
return &dmlChannels{
|
||||
core: c,
|
||||
lock: sync.RWMutex{},
|
||||
dml: make(map[string]*dmlStream),
|
||||
dml: make(map[string]msgstream.MsgStream),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *dmlChannels) GetNumChannles() int {
|
||||
// GetNumChannels get current dml channel count
|
||||
func (d *dmlChannels) GetNumChannels() int {
|
||||
d.lock.RLock()
|
||||
defer d.lock.RUnlock()
|
||||
count := 0
|
||||
for _, ds := range d.dml {
|
||||
if ds.valid {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
return len(d.dml)
|
||||
}
|
||||
|
||||
// ListChannels lists all dml channel names
|
||||
func (d *dmlChannels) ListChannels() []string {
|
||||
d.lock.RLock()
|
||||
defer d.lock.RUnlock()
|
||||
|
@ -63,6 +54,7 @@ func (d *dmlChannels) ListChannels() []string {
|
|||
|
||||
}
|
||||
|
||||
// Produce produces msg pack into specified channel
|
||||
func (d *dmlChannels) Produce(name string, pack *msgstream.MsgPack) error {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
|
@ -71,16 +63,13 @@ func (d *dmlChannels) Produce(name string, pack *msgstream.MsgPack) error {
|
|||
if !ok {
|
||||
return fmt.Errorf("channel %s not exist", name)
|
||||
}
|
||||
if err := ds.msgStream.Produce(pack); err != nil {
|
||||
if err := ds.Produce(pack); err != nil {
|
||||
return err
|
||||
}
|
||||
if !ds.valid {
|
||||
ds.msgStream.Close()
|
||||
delete(d.dml, name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Broadcast broadcasts msg pack into specified channel
|
||||
func (d *dmlChannels) Broadcast(name string, pack *msgstream.MsgPack) error {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
|
@ -89,16 +78,13 @@ func (d *dmlChannels) Broadcast(name string, pack *msgstream.MsgPack) error {
|
|||
if !ok {
|
||||
return fmt.Errorf("channel %s not exist", name)
|
||||
}
|
||||
if err := ds.msgStream.Broadcast(pack); err != nil {
|
||||
if err := ds.Broadcast(pack); err != nil {
|
||||
return err
|
||||
}
|
||||
if !ds.valid {
|
||||
ds.msgStream.Close()
|
||||
delete(d.dml, name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BroadcastAll invoke broadcast with provided msg pack in all channels specified
|
||||
func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) error {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
|
@ -108,17 +94,14 @@ func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) e
|
|||
if !ok {
|
||||
return fmt.Errorf("channel %s not exist", ch)
|
||||
}
|
||||
if err := ds.msgStream.Broadcast(pack); err != nil {
|
||||
if err := ds.Broadcast(pack); err != nil {
|
||||
return err
|
||||
}
|
||||
if !ds.valid {
|
||||
ds.msgStream.Close()
|
||||
delete(d.dml, ch)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddProducerChannels add named channels as producer
|
||||
func (d *dmlChannels) AddProducerChannels(names ...string) {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
|
@ -133,22 +116,20 @@ func (d *dmlChannels) AddProducerChannels(names ...string) {
|
|||
continue
|
||||
}
|
||||
ms.AsProducer([]string{name})
|
||||
d.dml[name] = &dmlStream{
|
||||
msgStream: ms,
|
||||
valid: true,
|
||||
}
|
||||
d.dml[name] = ms
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveProducerChannels removes specified channels
|
||||
func (d *dmlChannels) RemoveProducerChannels(names ...string) {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
|
||||
for _, name := range names {
|
||||
//log.Debug("delete dml channel", zap.String("channel name", name))
|
||||
if ds, ok := d.dml[name]; ok {
|
||||
ds.valid = false
|
||||
ds.Close()
|
||||
delete(d.dml, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -286,6 +286,11 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
|
|||
t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
|
||||
t.core.SendTimeTick(ts, reason)
|
||||
|
||||
for _, chanName := range collMeta.PhysicalChannelNames {
|
||||
// send tt into deleted channels to tell data_node to clear flowgragh
|
||||
t.core.chanTimeTick.SendChannelTimeTick(chanName, ts)
|
||||
}
|
||||
|
||||
// remove dml channel after send dd msg
|
||||
t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...)
|
||||
return nil
|
||||
|
|
|
@ -292,7 +292,7 @@ func (t *timetickSync) GetProxyNum() int {
|
|||
|
||||
// GetChanNum return the num of channel
|
||||
func (t *timetickSync) GetChanNum() int {
|
||||
return t.core.dmlChannels.GetNumChannles()
|
||||
return t.core.dmlChannels.GetNumChannels()
|
||||
}
|
||||
|
||||
func minTimeTick(tt ...typeutil.Timestamp) typeutil.Timestamp {
|
||||
|
|
Loading…
Reference in New Issue