remove and document some things

pull/10616/head
Jeff Wendling 2018-09-24 21:27:38 -06:00 committed by Edd Robinson
parent 8a21a3568a
commit b0a317a34c
4 changed files with 37 additions and 149 deletions

View File

@ -37,9 +37,6 @@ var (
// ErrShardNotIdle is returned when an operation requring the shard to be idle/cold is
// attempted on a hot shard.
ErrShardNotIdle = errors.New("shard not idle")
// fieldsIndexMagicNumber is the file magic number for the fields index file.
fieldsIndexMagicNumber = []byte{0, 6, 1, 3}
)
// A ShardError implements the error interface, and contains extra

View File

@ -2,18 +2,13 @@ package tsdb
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"sync"
"unsafe"
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/pkg/file"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/tsdb/internal"
)
//
@ -403,106 +398,10 @@ func (fs *MeasurementFieldSet) Save() error {
}
func (fs *MeasurementFieldSet) saveNoLock() error {
// No fields left, remove the fields index file
if len(fs.fields) == 0 {
return os.RemoveAll(fs.path)
}
// Write the new index to a temp file and rename when it's sync'd
path := fs.path + ".tmp"
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return err
}
defer os.RemoveAll(path)
if _, err := fd.Write(fieldsIndexMagicNumber); err != nil {
return err
}
pb := internal.MeasurementFieldSet{
Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)),
}
for name, mf := range fs.fields {
fs := &internal.MeasurementFields{
Name: name,
Fields: make([]*internal.Field, 0, mf.FieldN()),
}
mf.ForEachField(func(field string, typ influxql.DataType) bool {
fs.Fields = append(fs.Fields, &internal.Field{Name: field, Type: int32(typ)})
return true
})
pb.Measurements = append(pb.Measurements, fs)
}
b, err := proto.Marshal(&pb)
if err != nil {
return err
}
if _, err := fd.Write(b); err != nil {
return err
}
if err = fd.Sync(); err != nil {
return err
}
//close file handle before renaming to support Windows
if err = fd.Close(); err != nil {
return err
}
if err := file.RenameFile(path, fs.path); err != nil {
return err
}
return file.SyncDir(filepath.Dir(fs.path))
return errors.New("save removed")
}
func (fs *MeasurementFieldSet) load() error {
fs.mu.Lock()
defer fs.mu.Unlock()
fd, err := os.Open(fs.path)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
defer fd.Close()
var magic [4]byte
if _, err := fd.Read(magic[:]); err != nil {
return err
}
if !bytes.Equal(magic[:], fieldsIndexMagicNumber) {
return ErrUnknownFieldsFormat
}
var pb internal.MeasurementFieldSet
b, err := ioutil.ReadAll(fd)
if err != nil {
return err
}
if err := proto.Unmarshal(b, &pb); err != nil {
return err
}
fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements()))
for _, measurement := range pb.GetMeasurements() {
set := &MeasurementFields{
fields: make(map[string]*Field, len(measurement.GetFields())),
}
for _, field := range measurement.GetFields() {
set.fields[field.GetName()] = &Field{Name: field.GetName(), Type: influxql.DataType(field.GetType())}
}
fs.fields[measurement.GetName()] = set
}
return nil
}

View File

@ -25,64 +25,54 @@ const (
)
type Index interface {
Open() error
Close() error
WithLogger(*zap.Logger)
Open() error // used by this package
Close() error // used by this package
WithLogger(*zap.Logger) // used by this package
Database() string
MeasurementExists(name []byte) (bool, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
DropMeasurement(name []byte) error
ForEachMeasurementName(fn func(name []byte) error) error
Database() string // used by this package
MeasurementExists(name []byte) (bool, error) // used by engine
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) // used by engine
ForEachMeasurementName(fn func(name []byte) error) error // used by engine
InitializeSeries(*SeriesCollection) error
CreateSeriesIfNotExists(key, name []byte, tags models.Tags, typ models.FieldType) error
CreateSeriesListIfNotExists(*SeriesCollection) error
DropSeries(seriesID SeriesID, key []byte, cascade bool) error
DropMeasurementIfSeriesNotExist(name []byte) error
InitializeSeries(*SeriesCollection) error // used by engine
CreateSeriesIfNotExists(key, name []byte, tags models.Tags, typ models.FieldType) error // used by engine
CreateSeriesListIfNotExists(*SeriesCollection) error // used by engine
DropSeries(seriesID SeriesID, key []byte, cascade bool) error // used by engine
DropMeasurementIfSeriesNotExist(name []byte) error // used by engine
// Used to clean up series in inmem index that were dropped with a shard.
DropSeriesGlobal(key []byte) error
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) // used by engine
SeriesN() int64 // used by engine
SeriesSketches() (estimator.Sketch, estimator.Sketch, error) // used by engine
SeriesIDSet() *SeriesIDSet // used by idpe
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesIDSet() *SeriesIDSet
HasTagKey(name, key []byte) (bool, error) // used by this package
HasTagValue(name, key, value []byte) (bool, error) // used by this package
HasTagKey(name, key []byte) (bool, error)
HasTagValue(name, key, value []byte) (bool, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) // used by this package
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
TagKeyCardinality(name, key []byte) int
TagKeyCardinality(name, key []byte) int // used by engine
// InfluxQL system iterators
MeasurementIterator() (MeasurementIterator, error)
TagKeyIterator(name []byte) (TagKeyIterator, error)
TagValueIterator(name, key []byte) (TagValueIterator, error)
MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error)
TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error)
TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error)
MeasurementIterator() (MeasurementIterator, error) // used by this package
TagKeyIterator(name []byte) (TagKeyIterator, error) // used by this package
TagValueIterator(name, key []byte) (TagValueIterator, error) // used by this package
MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) // used by this package
TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) // used by this package
TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) // used by this package
// Sets a shared fieldset from the engine.
FieldSet() *MeasurementFieldSet
SetFieldSet(fs *MeasurementFieldSet)
// Size of the index on disk, if applicable.
DiskSizeBytes() int64
// Bytes estimates the memory footprint of this Index, in bytes.
Bytes() int
// // Sets a shared fieldset from the engine.
FieldSet() *MeasurementFieldSet // used by this package
SetFieldSet(fs *MeasurementFieldSet) // used by engine
// To be removed w/ tsi1.
SetFieldName(measurement []byte, name string)
SetFieldName(measurement []byte, name string) // used by this package
Type() string
Type() string // used by this package
// Returns a unique reference ID to the index instance.
// For inmem, returns a reference to the backing Index, not ShardIndex.
UniqueReferenceID() uintptr
UniqueReferenceID() uintptr // used by this package
Rebuild()
Rebuild() // used by engine
}
// SeriesElem represents a generic series element.

View File

@ -17,6 +17,8 @@ import (
"golang.org/x/sync/errgroup"
)
const SeriesFileDirectory = "_series"
var (
ErrSeriesFileClosed = errors.New("tsdb: series file closed")
ErrInvalidSeriesPartitionID = errors.New("tsdb: invalid series partition id")