milvus/internal/streamingnode/server/wal/adaptor/opener.go

81 lines
2.5 KiB
Go

package adaptor
import (
"context"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var _ wal.Opener = (*openerAdaptorImpl)(nil)
// adaptImplsToOpener creates a new wal opener with opener impls.
func adaptImplsToOpener(opener walimpls.OpenerImpls, builders []interceptors.InterceptorBuilder) wal.Opener {
return &openerAdaptorImpl{
lifetime: typeutil.NewLifetime(),
opener: opener,
idAllocator: typeutil.NewIDAllocator(),
walInstances: typeutil.NewConcurrentMap[int64, wal.WAL](),
interceptorBuilders: builders,
}
}
// openerAdaptorImpl is the wrapper of OpenerImpls to Opener.
type openerAdaptorImpl struct {
lifetime *typeutil.Lifetime
opener walimpls.OpenerImpls
idAllocator *typeutil.IDAllocator
walInstances *typeutil.ConcurrentMap[int64, wal.WAL] // store all wal instances allocated by these allocator.
interceptorBuilders []interceptors.InterceptorBuilder
}
// Open opens a wal instance for the channel.
func (o *openerAdaptorImpl) Open(ctx context.Context, opt *wal.OpenOption) (wal.WAL, error) {
if !o.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, status.NewOnShutdownError("wal opener is on shutdown")
}
defer o.lifetime.Done()
id := o.idAllocator.Allocate()
log := log.With(zap.Any("channel", opt.Channel), zap.Int64("id", id))
l, err := o.opener.Open(ctx, &walimpls.OpenOption{
Channel: opt.Channel,
})
if err != nil {
log.Warn("open wal failed", zap.Error(err))
return nil, err
}
// wrap the wal into walExtend with cleanup function and interceptors.
wal := adaptImplsToWAL(l, o.interceptorBuilders, func() {
o.walInstances.Remove(id)
log.Info("wal deleted from opener")
})
o.walInstances.Insert(id, wal)
log.Info("new wal created")
return wal, nil
}
// Close the wal opener, release the underlying resources.
func (o *openerAdaptorImpl) Close() {
o.lifetime.SetState(typeutil.LifetimeStateStopped)
o.lifetime.Wait()
// close all wal instances.
o.walInstances.Range(func(id int64, l wal.WAL) bool {
l.Close()
log.Info("close wal by opener", zap.Int64("id", id), zap.Any("channel", l.Channel()))
return true
})
// close the opener
o.opener.Close()
}