fix: Ensure TemporaryEngine returns new inner TSDB store
parent
f38f19787f
commit
b3138d2ead
|
@ -13,7 +13,6 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/kit/prom"
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/storage"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -31,7 +30,7 @@ type Engine interface {
|
|||
|
||||
SeriesCardinality(orgID, bucketID influxdb.ID) int64
|
||||
|
||||
TSDBStore() *tsdb.Store
|
||||
TSDBStore() storage.TSDBStore
|
||||
MetaClient() storage.MetaClient
|
||||
|
||||
WithLogger(log *zap.Logger)
|
||||
|
@ -52,7 +51,8 @@ type TemporaryEngine struct {
|
|||
mu sync.Mutex
|
||||
opened bool
|
||||
|
||||
engine *storage.Engine
|
||||
engine *storage.Engine
|
||||
tsdbStore temporaryTSDBStore
|
||||
|
||||
log *zap.Logger
|
||||
}
|
||||
|
@ -90,6 +90,8 @@ func (t *TemporaryEngine) Open(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
t.tsdbStore.TSDBStore = t.engine.TSDBStore()
|
||||
|
||||
t.opened = true
|
||||
return nil
|
||||
}
|
||||
|
@ -168,10 +170,14 @@ func (t *TemporaryEngine) InternalBackupPath(backupID int) string {
|
|||
return t.engine.InternalBackupPath(backupID)
|
||||
}
|
||||
|
||||
func (t *TemporaryEngine) TSDBStore() *tsdb.Store {
|
||||
return t.engine.TSDBStore()
|
||||
func (t *TemporaryEngine) TSDBStore() storage.TSDBStore {
|
||||
return &t.tsdbStore
|
||||
}
|
||||
|
||||
func (t *TemporaryEngine) MetaClient() storage.MetaClient {
|
||||
return t.engine.MetaClient()
|
||||
}
|
||||
|
||||
type temporaryTSDBStore struct {
|
||||
storage.TSDBStore
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/influxql/query"
|
||||
"github.com/influxdata/influxdb/v2/kit/tracing"
|
||||
"github.com/influxdata/influxdb/v2/logger"
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
|
@ -17,6 +18,7 @@ import (
|
|||
_ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
|
||||
"github.com/influxdata/influxdb/v2/v1/coordinator"
|
||||
"github.com/influxdata/influxdb/v2/v1/services/meta"
|
||||
"github.com/influxdata/influxql"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
|
@ -114,6 +116,14 @@ type MetaClient interface {
|
|||
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
|
||||
}
|
||||
|
||||
type TSDBStore interface {
|
||||
MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
|
||||
ShardGroup(ids []uint64) tsdb.ShardGroup
|
||||
Shards(ids []uint64) []*tsdb.Shard
|
||||
TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
|
||||
TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
|
||||
}
|
||||
|
||||
// NewEngine initialises a new storage engine, including a series file, index and
|
||||
// TSM engine.
|
||||
func NewEngine(path string, c Config, options ...Option) *Engine {
|
||||
|
@ -459,10 +469,10 @@ func (e *Engine) Path() string {
|
|||
return e.path
|
||||
}
|
||||
|
||||
func (t *Engine) TSDBStore() *tsdb.Store {
|
||||
return t.tsdbStore
|
||||
func (e *Engine) TSDBStore() TSDBStore {
|
||||
return e.tsdbStore
|
||||
}
|
||||
|
||||
func (t *Engine) MetaClient() MetaClient {
|
||||
return t.metaClient
|
||||
func (e *Engine) MetaClient() MetaClient {
|
||||
return e.metaClient
|
||||
}
|
||||
|
|
|
@ -25,6 +25,19 @@ var (
|
|||
ErrMissingReadSource = errors.New("missing ReadSource")
|
||||
)
|
||||
|
||||
type TSDBStore interface {
|
||||
MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
|
||||
ShardGroup(ids []uint64) tsdb.ShardGroup
|
||||
Shards(ids []uint64) []*tsdb.Shard
|
||||
TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
|
||||
TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
|
||||
}
|
||||
|
||||
type MetaClient interface {
|
||||
Database(name string) *meta.DatabaseInfo
|
||||
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
|
||||
}
|
||||
|
||||
// getReadSource will attempt to unmarshal a ReadSource from the ReadRequest or
|
||||
// return an error if no valid resource is present.
|
||||
func GetReadSource(any types.Any) (*ReadSource, error) {
|
||||
|
@ -35,18 +48,13 @@ func GetReadSource(any types.Any) (*ReadSource, error) {
|
|||
return &source, nil
|
||||
}
|
||||
|
||||
type MetaClient interface {
|
||||
Database(name string) *meta.DatabaseInfo
|
||||
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
|
||||
}
|
||||
|
||||
type Store struct {
|
||||
TSDBStore *tsdb.Store
|
||||
TSDBStore TSDBStore
|
||||
MetaClient MetaClient
|
||||
Logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewStore(store *tsdb.Store, metaClient MetaClient) *Store {
|
||||
func NewStore(store TSDBStore, metaClient MetaClient) *Store {
|
||||
return &Store{
|
||||
TSDBStore: store,
|
||||
MetaClient: metaClient,
|
||||
|
|
Loading…
Reference in New Issue