remove code as reported by the unused tool
parent
b0a317a34c
commit
d44b583c4d
|
@ -74,14 +74,6 @@ type Config struct {
|
||||||
Engine string `toml:"-"`
|
Engine string `toml:"-"`
|
||||||
Index string `toml:"index-version"`
|
Index string `toml:"index-version"`
|
||||||
|
|
||||||
// General WAL configuration options
|
|
||||||
WALDir string `toml:"wal-dir"`
|
|
||||||
|
|
||||||
// WALFsyncDelay is the amount of time that a write will wait before fsyncing. A duration
|
|
||||||
// greater than 0 can be used to batch up multiple fsync calls. This is useful for slower
|
|
||||||
// disks or when WAL write contention is seen. A value of 0 fsyncs every write to the WAL.
|
|
||||||
WALFsyncDelay toml.Duration `toml:"wal-fsync-delay"`
|
|
||||||
|
|
||||||
// Enables unicode validation on series keys on write.
|
// Enables unicode validation on series keys on write.
|
||||||
ValidateKeys bool `toml:"validate-keys"`
|
ValidateKeys bool `toml:"validate-keys"`
|
||||||
|
|
||||||
|
@ -159,8 +151,6 @@ func NewConfig() Config {
|
||||||
func (c *Config) Validate() error {
|
func (c *Config) Validate() error {
|
||||||
if c.Dir == "" {
|
if c.Dir == "" {
|
||||||
return errors.New("Data.Dir must be specified")
|
return errors.New("Data.Dir must be specified")
|
||||||
} else if c.WALDir == "" {
|
|
||||||
return errors.New("Data.WALDir must be specified")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.MaxConcurrentCompactions < 0 {
|
if c.MaxConcurrentCompactions < 0 {
|
||||||
|
@ -196,8 +186,6 @@ func (c *Config) Validate() error {
|
||||||
func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
|
func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
|
||||||
return diagnostics.RowFromMap(map[string]interface{}{
|
return diagnostics.RowFromMap(map[string]interface{}{
|
||||||
"dir": c.Dir,
|
"dir": c.Dir,
|
||||||
"wal-dir": c.WALDir,
|
|
||||||
"wal-fsync-delay": c.WALFsyncDelay,
|
|
||||||
"cache-max-memory-size": c.CacheMaxMemorySize,
|
"cache-max-memory-size": c.CacheMaxMemorySize,
|
||||||
"cache-snapshot-memory-size": c.CacheSnapshotMemorySize,
|
"cache-snapshot-memory-size": c.CacheSnapshotMemorySize,
|
||||||
"cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration,
|
"cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration,
|
||||||
|
|
|
@ -55,17 +55,3 @@ type CursorIterator interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type CursorIterators []CursorIterator
|
type CursorIterators []CursorIterator
|
||||||
|
|
||||||
func CreateCursorIterators(ctx context.Context, shards []*Shard) (CursorIterators, error) {
|
|
||||||
q := make(CursorIterators, 0, len(shards))
|
|
||||||
for _, s := range shards {
|
|
||||||
// possible errors are ErrEngineClosed or ErrShardDisabled, so we can safely skip those shards
|
|
||||||
if cq, err := s.CreateCursorIterator(ctx); cq != nil && err == nil {
|
|
||||||
q = append(q, cq)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(q) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return q, nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -20,9 +20,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// ErrFormatNotFound is returned when no format can be determined from a path.
|
|
||||||
ErrFormatNotFound = errors.New("format not found")
|
|
||||||
|
|
||||||
// ErrUnknownEngineFormat is returned when the engine format is
|
// ErrUnknownEngineFormat is returned when the engine format is
|
||||||
// unknown. ErrUnknownEngineFormat is currently returned if a format
|
// unknown. ErrUnknownEngineFormat is currently returned if a format
|
||||||
// other than tsm1 is encountered.
|
// other than tsm1 is encountered.
|
||||||
|
@ -89,14 +86,6 @@ type SeriesIDSets interface {
|
||||||
ForEach(f func(ids *SeriesIDSet)) error
|
ForEach(f func(ids *SeriesIDSet)) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// EngineFormat represents the format for an engine.
|
|
||||||
type EngineFormat int
|
|
||||||
|
|
||||||
const (
|
|
||||||
// TSM1Format is the format used by the tsm1 engine.
|
|
||||||
TSM1Format EngineFormat = 2
|
|
||||||
)
|
|
||||||
|
|
||||||
// NewEngineFunc creates a new engine.
|
// NewEngineFunc creates a new engine.
|
||||||
type NewEngineFunc func(id uint64, i Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) Engine
|
type NewEngineFunc func(id uint64, i Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) Engine
|
||||||
|
|
||||||
|
|
|
@ -6,19 +6,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// ErrFieldOverflow is returned when too many fields are created on a measurement.
|
|
||||||
ErrFieldOverflow = errors.New("field overflow")
|
|
||||||
|
|
||||||
// ErrFieldTypeConflict is returned when a new field already exists with a different type.
|
// ErrFieldTypeConflict is returned when a new field already exists with a different type.
|
||||||
ErrFieldTypeConflict = errors.New("field type conflict")
|
ErrFieldTypeConflict = errors.New("field type conflict")
|
||||||
|
|
||||||
// ErrFieldNotFound is returned when a field cannot be found.
|
|
||||||
ErrFieldNotFound = errors.New("field not found")
|
|
||||||
|
|
||||||
// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
|
|
||||||
// there is no mapping for.
|
|
||||||
ErrFieldUnmappedID = errors.New("field ID not mapped")
|
|
||||||
|
|
||||||
// ErrEngineClosed is returned when a caller attempts indirectly to
|
// ErrEngineClosed is returned when a caller attempts indirectly to
|
||||||
// access the shard's underlying engine.
|
// access the shard's underlying engine.
|
||||||
ErrEngineClosed = errors.New("engine is closed")
|
ErrEngineClosed = errors.New("engine is closed")
|
||||||
|
@ -27,16 +17,8 @@ var (
|
||||||
// queries or writes.
|
// queries or writes.
|
||||||
ErrShardDisabled = errors.New("shard is disabled")
|
ErrShardDisabled = errors.New("shard is disabled")
|
||||||
|
|
||||||
// ErrUnknownFieldsFormat is returned when the fields index file is not identifiable by
|
|
||||||
// the file's magic number.
|
|
||||||
ErrUnknownFieldsFormat = errors.New("unknown field index format")
|
|
||||||
|
|
||||||
// ErrUnknownFieldType is returned when the type of a field cannot be determined.
|
// ErrUnknownFieldType is returned when the type of a field cannot be determined.
|
||||||
ErrUnknownFieldType = errors.New("unknown field type")
|
ErrUnknownFieldType = errors.New("unknown field type")
|
||||||
|
|
||||||
// ErrShardNotIdle is returned when an operation requring the shard to be idle/cold is
|
|
||||||
// attempted on a hot shard.
|
|
||||||
ErrShardNotIdle = errors.New("shard not idle")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// A ShardError implements the error interface, and contains extra
|
// A ShardError implements the error interface, and contains extra
|
||||||
|
|
|
@ -220,13 +220,6 @@ func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.Dat
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MeasurementFields) FieldN() int {
|
|
||||||
m.mu.RLock()
|
|
||||||
n := len(m.fields)
|
|
||||||
m.mu.RUnlock()
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
// Field returns the field for name, or nil if there is no field for name.
|
// Field returns the field for name, or nil if there is no field for name.
|
||||||
func (m *MeasurementFields) Field(name string) *Field {
|
func (m *MeasurementFields) Field(name string) *Field {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
|
@ -256,28 +249,6 @@ func (m *MeasurementFields) FieldBytes(name []byte) *Field {
|
||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
|
|
||||||
// FieldSet returns the set of fields and their types for the measurement.
|
|
||||||
func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
|
|
||||||
m.mu.RLock()
|
|
||||||
defer m.mu.RUnlock()
|
|
||||||
|
|
||||||
fields := make(map[string]influxql.DataType)
|
|
||||||
for name, f := range m.fields {
|
|
||||||
fields[name] = f.Type
|
|
||||||
}
|
|
||||||
return fields
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataType) bool) {
|
|
||||||
m.mu.RLock()
|
|
||||||
defer m.mu.RUnlock()
|
|
||||||
for name, f := range m.fields {
|
|
||||||
if !fn(name, f.Type) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clone returns copy of the MeasurementFields
|
// Clone returns copy of the MeasurementFields
|
||||||
func (m *MeasurementFields) Clone() *MeasurementFields {
|
func (m *MeasurementFields) Clone() *MeasurementFields {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
|
@ -364,13 +335,6 @@ func (fs *MeasurementFieldSet) CreateFieldsIfNotExists(name []byte) *Measurement
|
||||||
return mf
|
return mf
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete removes a field set for a measurement.
|
|
||||||
func (fs *MeasurementFieldSet) Delete(name string) {
|
|
||||||
fs.mu.Lock()
|
|
||||||
delete(fs.fields, name)
|
|
||||||
fs.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteWithLock executes fn and removes a field set from a measurement under lock.
|
// DeleteWithLock executes fn and removes a field set from a measurement under lock.
|
||||||
func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) error {
|
func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) error {
|
||||||
fs.mu.Lock()
|
fs.mu.Lock()
|
||||||
|
|
1053
tsdb/index.go
1053
tsdb/index.go
File diff suppressed because it is too large
Load Diff
|
@ -316,7 +316,6 @@ func (i *SeriesCollectionIterator) Next() bool {
|
||||||
// Helpers that return the current state of the iterator.
|
// Helpers that return the current state of the iterator.
|
||||||
|
|
||||||
func (i SeriesCollectionIterator) Index() int { return i.index }
|
func (i SeriesCollectionIterator) Index() int { return i.index }
|
||||||
func (i SeriesCollectionIterator) Length() int { return i.length }
|
|
||||||
func (i SeriesCollectionIterator) Point() models.Point { return i.s.Points[i.index] }
|
func (i SeriesCollectionIterator) Point() models.Point { return i.s.Points[i.index] }
|
||||||
func (i SeriesCollectionIterator) Key() []byte { return i.s.Keys[i.index] }
|
func (i SeriesCollectionIterator) Key() []byte { return i.s.Keys[i.index] }
|
||||||
func (i SeriesCollectionIterator) SeriesKey() []byte { return i.s.SeriesKeys[i.index] }
|
func (i SeriesCollectionIterator) SeriesKey() []byte { return i.s.SeriesKeys[i.index] }
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package tsdb
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"errors"
|
"errors"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -35,21 +34,6 @@ type SeriesCursorRow struct {
|
||||||
Tags models.Tags
|
Tags models.Tags
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SeriesCursorRow) Compare(other *SeriesCursorRow) int {
|
|
||||||
if r == other {
|
|
||||||
return 0
|
|
||||||
} else if r == nil {
|
|
||||||
return -1
|
|
||||||
} else if other == nil {
|
|
||||||
return 1
|
|
||||||
}
|
|
||||||
cmp := bytes.Compare(r.Name, other.Name)
|
|
||||||
if cmp != 0 {
|
|
||||||
return cmp
|
|
||||||
}
|
|
||||||
return models.CompareTags(r.Tags, other.Tags)
|
|
||||||
}
|
|
||||||
|
|
||||||
// newSeriesCursor returns a new instance of SeriesCursor.
|
// newSeriesCursor returns a new instance of SeriesCursor.
|
||||||
func newSeriesCursor(req SeriesCursorRequest, indexSet IndexSet, cond influxql.Expr) (_ SeriesCursor, err error) {
|
func newSeriesCursor(req SeriesCursorRequest, indexSet IndexSet, cond influxql.Expr) (_ SeriesCursor, err error) {
|
||||||
// Only equality operators are allowed.
|
// Only equality operators are allowed.
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/cespare/xxhash"
|
"github.com/cespare/xxhash"
|
||||||
|
@ -20,13 +19,9 @@ import (
|
||||||
const SeriesFileDirectory = "_series"
|
const SeriesFileDirectory = "_series"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrSeriesFileClosed = errors.New("tsdb: series file closed")
|
|
||||||
ErrInvalidSeriesPartitionID = errors.New("tsdb: invalid series partition id")
|
ErrInvalidSeriesPartitionID = errors.New("tsdb: invalid series partition id")
|
||||||
)
|
)
|
||||||
|
|
||||||
// SeriesIDSize is the size in bytes of a series key ID.
|
|
||||||
const SeriesIDSize = 8
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// SeriesFilePartitionN is the number of partitions a series file is split into.
|
// SeriesFilePartitionN is the number of partitions a series file is split into.
|
||||||
SeriesFilePartitionN = 8
|
SeriesFilePartitionN = 8
|
||||||
|
@ -99,9 +94,6 @@ func (f *SeriesFile) SeriesPartitionPath(i int) string {
|
||||||
return filepath.Join(f.path, fmt.Sprintf("%02x", i))
|
return filepath.Join(f.path, fmt.Sprintf("%02x", i))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Partitions returns all partitions.
|
|
||||||
func (f *SeriesFile) Partitions() []*SeriesPartition { return f.partitions }
|
|
||||||
|
|
||||||
// Retain adds a reference count to the file. It returns a release func.
|
// Retain adds a reference count to the file. It returns a release func.
|
||||||
func (f *SeriesFile) Retain() func() {
|
func (f *SeriesFile) Retain() func() {
|
||||||
if f != nil {
|
if f != nil {
|
||||||
|
@ -188,15 +180,6 @@ func (f *SeriesFile) SeriesKey(id SeriesID) []byte {
|
||||||
return p.SeriesKey(id)
|
return p.SeriesKey(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeriesKeys returns a list of series keys from a list of ids.
|
|
||||||
func (f *SeriesFile) SeriesKeys(ids []SeriesID) [][]byte {
|
|
||||||
keys := make([][]byte, len(ids))
|
|
||||||
for i := range ids {
|
|
||||||
keys[i] = f.SeriesKey(ids[i])
|
|
||||||
}
|
|
||||||
return keys
|
|
||||||
}
|
|
||||||
|
|
||||||
// Series returns the parsed series name and tags for an offset.
|
// Series returns the parsed series name and tags for an offset.
|
||||||
func (f *SeriesFile) Series(id SeriesID) ([]byte, models.Tags) {
|
func (f *SeriesFile) Series(id SeriesID) ([]byte, models.Tags) {
|
||||||
key := f.SeriesKey(id)
|
key := f.SeriesKey(id)
|
||||||
|
@ -221,25 +204,6 @@ func (f *SeriesFile) HasSeries(name []byte, tags models.Tags, buf []byte) bool {
|
||||||
return !f.SeriesID(name, tags, buf).IsZero()
|
return !f.SeriesID(name, tags, buf).IsZero()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeriesCount returns the number of series.
|
|
||||||
func (f *SeriesFile) SeriesCount() uint64 {
|
|
||||||
var n uint64
|
|
||||||
for _, p := range f.partitions {
|
|
||||||
n += p.SeriesCount()
|
|
||||||
}
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
// SeriesIterator returns an iterator over all the series.
|
|
||||||
func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator {
|
|
||||||
var ids []SeriesID
|
|
||||||
for _, p := range f.partitions {
|
|
||||||
ids = p.AppendSeriesIDs(ids)
|
|
||||||
}
|
|
||||||
sort.Slice(ids, func(i, j int) bool { return ids[i].Less(ids[j]) })
|
|
||||||
return NewSeriesIDSliceIterator(ids)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *SeriesFile) SeriesIDPartitionID(id SeriesID) int {
|
func (f *SeriesFile) SeriesIDPartitionID(id SeriesID) int {
|
||||||
return int((id.RawID() - 1) % SeriesFilePartitionN)
|
return int((id.RawID() - 1) % SeriesFilePartitionN)
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,11 +12,9 @@ const (
|
||||||
seriesIDValueMask = 0xFFFFFFFF // series ids numerically are 32 bits
|
seriesIDValueMask = 0xFFFFFFFF // series ids numerically are 32 bits
|
||||||
seriesIDTypeShift = 32 // we put the type right after the value info
|
seriesIDTypeShift = 32 // we put the type right after the value info
|
||||||
seriesIDTypeMask = 0xFF << seriesIDTypeShift // a mask for the type byte
|
seriesIDTypeMask = 0xFF << seriesIDTypeShift // a mask for the type byte
|
||||||
|
seriesIDSize = 8
|
||||||
)
|
)
|
||||||
|
|
||||||
// SeriesIDHasType returns if the raw id contains type information.
|
|
||||||
func SeriesIDHasType(id uint64) bool { return id&seriesIDTypeFlag > 0 }
|
|
||||||
|
|
||||||
// SeriesID is the type of a series id. It is logically a uint64, but encoded as a struct so
|
// SeriesID is the type of a series id. It is logically a uint64, but encoded as a struct so
|
||||||
// that we gain more type checking when changing operations on it. The field is exported only
|
// that we gain more type checking when changing operations on it. The field is exported only
|
||||||
// so that tests that use reflection based comparisons still work; no one should use the field
|
// so that tests that use reflection based comparisons still work; no one should use the field
|
||||||
|
@ -72,9 +70,9 @@ func (s SeriesIDTyped) Type() models.FieldType {
|
||||||
type (
|
type (
|
||||||
// some static assertions that the SeriesIDSize matches the structs we defined.
|
// some static assertions that the SeriesIDSize matches the structs we defined.
|
||||||
// if the values are not the same, at least one will be negative causing a compilation failure
|
// if the values are not the same, at least one will be negative causing a compilation failure
|
||||||
_ [SeriesIDSize - unsafe.Sizeof(SeriesID{})]byte
|
_ [seriesIDSize - unsafe.Sizeof(SeriesID{})]byte
|
||||||
_ [unsafe.Sizeof(SeriesID{}) - SeriesIDSize]byte
|
_ [unsafe.Sizeof(SeriesID{}) - seriesIDSize]byte
|
||||||
|
|
||||||
_ [SeriesIDSize - unsafe.Sizeof(SeriesIDTyped{})]byte
|
_ [seriesIDSize - unsafe.Sizeof(SeriesIDTyped{})]byte
|
||||||
_ [unsafe.Sizeof(SeriesIDTyped{}) - SeriesIDSize]byte
|
_ [unsafe.Sizeof(SeriesIDTyped{}) - seriesIDSize]byte
|
||||||
)
|
)
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/models"
|
|
||||||
"github.com/influxdata/influxdb/pkg/mmap"
|
"github.com/influxdata/influxdb/pkg/mmap"
|
||||||
"github.com/influxdata/influxdb/pkg/rhh"
|
"github.com/influxdata/influxdb/pkg/rhh"
|
||||||
)
|
)
|
||||||
|
@ -215,27 +214,6 @@ func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (idx *SeriesIndex) FindIDByNameTags(segments []*SeriesSegment, name []byte, tags models.Tags, buf []byte) SeriesIDTyped {
|
|
||||||
id := idx.FindIDBySeriesKey(segments, AppendSeriesKey(buf[:0], name, tags))
|
|
||||||
if _, ok := idx.tombstones[id.SeriesID()]; ok {
|
|
||||||
return SeriesIDTyped{}
|
|
||||||
}
|
|
||||||
return id
|
|
||||||
}
|
|
||||||
|
|
||||||
func (idx *SeriesIndex) FindIDListByNameTags(segments []*SeriesSegment, names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []SeriesIDTyped, ok bool) {
|
|
||||||
ids, ok = make([]SeriesIDTyped, len(names)), true
|
|
||||||
for i := range names {
|
|
||||||
id := idx.FindIDByNameTags(segments, names[i], tagsSlice[i], buf)
|
|
||||||
if id.IsZero() {
|
|
||||||
ok = false
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
ids[i] = id
|
|
||||||
}
|
|
||||||
return ids, ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (idx *SeriesIndex) FindOffsetByID(id SeriesID) int64 {
|
func (idx *SeriesIndex) FindOffsetByID(id SeriesID) int64 {
|
||||||
if offset := idx.idOffsetMap[id]; offset != 0 {
|
if offset := idx.idOffsetMap[id]; offset != 0 {
|
||||||
return offset
|
return offset
|
||||||
|
|
|
@ -311,13 +311,6 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(collection *SeriesCollecti
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compacting returns if the SeriesPartition is currently compacting.
|
|
||||||
func (p *SeriesPartition) Compacting() bool {
|
|
||||||
p.mu.RLock()
|
|
||||||
defer p.mu.RUnlock()
|
|
||||||
return p.compacting
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteSeriesID flags a series as permanently deleted.
|
// DeleteSeriesID flags a series as permanently deleted.
|
||||||
// If the series is reintroduced later then it must create a new id.
|
// If the series is reintroduced later then it must create a new id.
|
||||||
func (p *SeriesPartition) DeleteSeriesID(id SeriesID) error {
|
func (p *SeriesPartition) DeleteSeriesID(id SeriesID) error {
|
||||||
|
@ -372,15 +365,6 @@ func (p *SeriesPartition) SeriesKey(id SeriesID) []byte {
|
||||||
return key
|
return key
|
||||||
}
|
}
|
||||||
|
|
||||||
// Series returns the parsed series name and tags for an offset.
|
|
||||||
func (p *SeriesPartition) Series(id SeriesID) ([]byte, models.Tags) {
|
|
||||||
key := p.SeriesKey(id)
|
|
||||||
if key == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return ParseSeriesKey(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// FindIDBySeriesKey return the series id for the series key.
|
// FindIDBySeriesKey return the series id for the series key.
|
||||||
func (p *SeriesPartition) FindIDBySeriesKey(key []byte) SeriesID {
|
func (p *SeriesPartition) FindIDBySeriesKey(key []byte) SeriesID {
|
||||||
p.mu.RLock()
|
p.mu.RLock()
|
||||||
|
@ -393,18 +377,6 @@ func (p *SeriesPartition) FindIDBySeriesKey(key []byte) SeriesID {
|
||||||
return id.SeriesID()
|
return id.SeriesID()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeriesCount returns the number of series.
|
|
||||||
func (p *SeriesPartition) SeriesCount() uint64 {
|
|
||||||
p.mu.RLock()
|
|
||||||
if p.closed {
|
|
||||||
p.mu.RUnlock()
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
n := p.index.Count()
|
|
||||||
p.mu.RUnlock()
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *SeriesPartition) DisableCompactions() {
|
func (p *SeriesPartition) DisableCompactions() {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
|
@ -425,14 +397,6 @@ func (p *SeriesPartition) compactionsEnabled() bool {
|
||||||
return p.compactionsDisabled == 0
|
return p.compactionsDisabled == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppendSeriesIDs returns a list of all series ids.
|
|
||||||
func (p *SeriesPartition) AppendSeriesIDs(a []SeriesID) []SeriesID {
|
|
||||||
for _, segment := range p.segments {
|
|
||||||
a = segment.AppendSeriesIDs(a)
|
|
||||||
}
|
|
||||||
return a
|
|
||||||
}
|
|
||||||
|
|
||||||
// activeSegment returns the last segment.
|
// activeSegment returns the last segment.
|
||||||
func (p *SeriesPartition) activeSegment() *SeriesSegment {
|
func (p *SeriesPartition) activeSegment() *SeriesSegment {
|
||||||
if len(p.segments) == 0 {
|
if len(p.segments) == 0 {
|
||||||
|
|
|
@ -8,7 +8,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/pkg/mmap"
|
"github.com/influxdata/influxdb/pkg/mmap"
|
||||||
|
@ -168,16 +167,9 @@ func (s *SeriesSegment) CloseForWrite() (err error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Data returns the raw data.
|
|
||||||
func (s *SeriesSegment) Data() []byte { return s.data }
|
|
||||||
|
|
||||||
// ID returns the id the segment was initialized with.
|
// ID returns the id the segment was initialized with.
|
||||||
func (s *SeriesSegment) ID() uint16 { return s.id }
|
func (s *SeriesSegment) ID() uint16 { return s.id }
|
||||||
|
|
||||||
// Size returns the size of the data in the segment.
|
|
||||||
// This is only populated once InitForWrite() is called.
|
|
||||||
func (s *SeriesSegment) Size() int64 { return int64(s.size) }
|
|
||||||
|
|
||||||
// Slice returns a byte slice starting at pos.
|
// Slice returns a byte slice starting at pos.
|
||||||
func (s *SeriesSegment) Slice(pos uint32) []byte { return s.data[pos:] }
|
func (s *SeriesSegment) Slice(pos uint32) []byte { return s.data[pos:] }
|
||||||
|
|
||||||
|
@ -210,17 +202,6 @@ func (s *SeriesSegment) Flush() error {
|
||||||
return s.w.Flush()
|
return s.w.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppendSeriesIDs appends all the segments ids to a slice. Returns the new slice.
|
|
||||||
func (s *SeriesSegment) AppendSeriesIDs(a []SeriesID) []SeriesID {
|
|
||||||
s.ForEachEntry(func(flag uint8, id SeriesIDTyped, _ int64, _ []byte) error {
|
|
||||||
if flag == SeriesEntryInsertFlag {
|
|
||||||
a = append(a, id.SeriesID())
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
return a
|
|
||||||
}
|
|
||||||
|
|
||||||
// MaxSeriesID returns the highest series id in the segment.
|
// MaxSeriesID returns the highest series id in the segment.
|
||||||
func (s *SeriesSegment) MaxSeriesID() SeriesID {
|
func (s *SeriesSegment) MaxSeriesID() SeriesID {
|
||||||
var max SeriesID
|
var max SeriesID
|
||||||
|
@ -302,19 +283,12 @@ func SplitSeriesOffset(offset int64) (segmentID uint16, pos uint32) {
|
||||||
return uint16((offset >> 32) & 0xFFFF), uint32(offset & 0xFFFFFFFF)
|
return uint16((offset >> 32) & 0xFFFF), uint32(offset & 0xFFFFFFFF)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsValidSeriesSegmentFilename returns true if filename is a 4-character lowercase hexidecimal number.
|
|
||||||
func IsValidSeriesSegmentFilename(filename string) bool {
|
|
||||||
return seriesSegmentFilenameRegex.MatchString(filename)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ParseSeriesSegmentFilename returns the id represented by the hexidecimal filename.
|
// ParseSeriesSegmentFilename returns the id represented by the hexidecimal filename.
|
||||||
func ParseSeriesSegmentFilename(filename string) (uint16, error) {
|
func ParseSeriesSegmentFilename(filename string) (uint16, error) {
|
||||||
i, err := strconv.ParseUint(filename, 16, 32)
|
i, err := strconv.ParseUint(filename, 16, 32)
|
||||||
return uint16(i), err
|
return uint16(i), err
|
||||||
}
|
}
|
||||||
|
|
||||||
var seriesSegmentFilenameRegex = regexp.MustCompile(`^[0-9a-f]{4}$`)
|
|
||||||
|
|
||||||
// SeriesSegmentSize returns the maximum size of the segment.
|
// SeriesSegmentSize returns the maximum size of the segment.
|
||||||
// The size goes up by powers of 2 starting from 4MB and reaching 256MB.
|
// The size goes up by powers of 2 starting from 4MB and reaching 256MB.
|
||||||
func SeriesSegmentSize(id uint16) uint32 {
|
func SeriesSegmentSize(id uint16) uint32 {
|
||||||
|
|
|
@ -108,19 +108,6 @@ func (s *SeriesIDSet) Merge(others ...*SeriesIDSet) {
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Equals returns true if other and s are the same set of ids.
|
|
||||||
func (s *SeriesIDSet) Equals(other *SeriesIDSet) bool {
|
|
||||||
if s == other {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
s.RLock()
|
|
||||||
defer s.RUnlock()
|
|
||||||
other.RLock()
|
|
||||||
defer other.RUnlock()
|
|
||||||
return s.bitmap.Equals(other.bitmap)
|
|
||||||
}
|
|
||||||
|
|
||||||
// And returns a new SeriesIDSet containing elements that were present in s and other.
|
// And returns a new SeriesIDSet containing elements that were present in s and other.
|
||||||
func (s *SeriesIDSet) And(other *SeriesIDSet) *SeriesIDSet {
|
func (s *SeriesIDSet) And(other *SeriesIDSet) *SeriesIDSet {
|
||||||
s.RLock()
|
s.RLock()
|
||||||
|
@ -176,13 +163,6 @@ func (s *SeriesIDSet) Diff(other *SeriesIDSet) {
|
||||||
s.bitmap = roaring.AndNot(s.bitmap, other.bitmap)
|
s.bitmap = roaring.AndNot(s.bitmap, other.bitmap)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clone returns a new SeriesIDSet with a deep copy of the underlying bitmap.
|
|
||||||
func (s *SeriesIDSet) Clone() *SeriesIDSet {
|
|
||||||
s.RLock()
|
|
||||||
defer s.RUnlock()
|
|
||||||
return s.CloneNoLock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// CloneNoLock calls Clone without taking a lock.
|
// CloneNoLock calls Clone without taking a lock.
|
||||||
func (s *SeriesIDSet) CloneNoLock() *SeriesIDSet {
|
func (s *SeriesIDSet) CloneNoLock() *SeriesIDSet {
|
||||||
new := NewSeriesIDSet()
|
new := NewSeriesIDSet()
|
||||||
|
|
|
@ -86,15 +86,6 @@ func (s *Shard) SetEnabled(enabled bool) {
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ScheduleFullCompaction forces a full compaction to be schedule on the shard.
|
|
||||||
func (s *Shard) ScheduleFullCompaction() error {
|
|
||||||
engine, err := s.Engine()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return engine.ScheduleFullCompaction()
|
|
||||||
}
|
|
||||||
|
|
||||||
// ID returns the shards ID.
|
// ID returns the shards ID.
|
||||||
func (s *Shard) ID() uint64 {
|
func (s *Shard) ID() uint64 {
|
||||||
return s.id
|
return s.id
|
||||||
|
|
Loading…
Reference in New Issue