mirror of https://github.com/milvus-io/milvus.git
81 lines
2.5 KiB
Go
81 lines
2.5 KiB
Go
package adaptor
|
|
|
|
import (
|
|
"context"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
|
|
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
|
|
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
|
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
|
)
|
|
|
|
// Compile-time assertion that *openerAdaptorImpl satisfies wal.Opener.
var _ wal.Opener = (*openerAdaptorImpl)(nil)
|
|
|
|
// adaptImplsToOpener creates a new wal opener with opener impls.
|
|
func adaptImplsToOpener(opener walimpls.OpenerImpls, builders []interceptors.InterceptorBuilder) wal.Opener {
|
|
return &openerAdaptorImpl{
|
|
lifetime: typeutil.NewLifetime(),
|
|
opener: opener,
|
|
idAllocator: typeutil.NewIDAllocator(),
|
|
walInstances: typeutil.NewConcurrentMap[int64, wal.WAL](),
|
|
interceptorBuilders: builders,
|
|
}
|
|
}
|
|
|
|
// openerAdaptorImpl is the wrapper of OpenerImpls to Opener.
|
|
type openerAdaptorImpl struct {
|
|
lifetime *typeutil.Lifetime
|
|
opener walimpls.OpenerImpls
|
|
idAllocator *typeutil.IDAllocator
|
|
walInstances *typeutil.ConcurrentMap[int64, wal.WAL] // store all wal instances allocated by these allocator.
|
|
interceptorBuilders []interceptors.InterceptorBuilder
|
|
}
|
|
|
|
// Open opens a wal instance for the channel.
|
|
func (o *openerAdaptorImpl) Open(ctx context.Context, opt *wal.OpenOption) (wal.WAL, error) {
|
|
if !o.lifetime.Add(typeutil.LifetimeStateWorking) {
|
|
return nil, status.NewOnShutdownError("wal opener is on shutdown")
|
|
}
|
|
defer o.lifetime.Done()
|
|
|
|
id := o.idAllocator.Allocate()
|
|
log := log.With(zap.Any("channel", opt.Channel), zap.Int64("id", id))
|
|
|
|
l, err := o.opener.Open(ctx, &walimpls.OpenOption{
|
|
Channel: opt.Channel,
|
|
})
|
|
if err != nil {
|
|
log.Warn("open wal failed", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
// wrap the wal into walExtend with cleanup function and interceptors.
|
|
wal := adaptImplsToWAL(l, o.interceptorBuilders, func() {
|
|
o.walInstances.Remove(id)
|
|
log.Info("wal deleted from opener")
|
|
})
|
|
|
|
o.walInstances.Insert(id, wal)
|
|
log.Info("new wal created")
|
|
return wal, nil
|
|
}
|
|
|
|
// Close the wal opener, release the underlying resources.
|
|
func (o *openerAdaptorImpl) Close() {
|
|
o.lifetime.SetState(typeutil.LifetimeStateStopped)
|
|
o.lifetime.Wait()
|
|
|
|
// close all wal instances.
|
|
o.walInstances.Range(func(id int64, l wal.WAL) bool {
|
|
l.Close()
|
|
log.Info("close wal by opener", zap.Int64("id", id), zap.Any("channel", l.Channel()))
|
|
return true
|
|
})
|
|
// close the opener
|
|
o.opener.Close()
|
|
}
|