pull/9150/head
Ben Johnson 2017-12-20 15:13:34 -07:00
parent 8b2dbf4d83
commit d8b1d208c0
12 changed files with 103 additions and 200 deletions

View File

@@ -1020,8 +1020,7 @@ func (c *CommandLine) help() {
show field keys show field key information
A full list of influxql commands can be found at:
https://docs.influxdata.com/influxdb/latest/query_language/spec/
`)
https://docs.influxdata.com/influxdb/latest/query_language/spec/`)
}
func (c *CommandLine) history() {
@@ -1091,9 +1090,7 @@ func (c *CommandLine) gopher() {
o: -h///++////-.
/: .o/
//+ 'y
./sooy.
`)
./sooy.`)
}
// Version prints the CLI version.

View File

@@ -104,8 +104,7 @@ Examples:
$ influx -database 'metrics' -execute 'select * from cpu' -format 'json' -pretty
# Connect to a specific database on startup and set database context:
$ influx -database 'metrics' -host 'localhost' -port '8086'
`)
$ influx -database 'metrics' -host 'localhost' -port '8086'`)
}
fs.Parse(os.Args[1:])

View File

@@ -19,7 +19,6 @@ import (
"github.com/influxdata/influxdb/cmd/influxd/backup_util"
"github.com/influxdata/influxdb/services/snapshotter"
"github.com/influxdata/influxdb/tcp"
"github.com/influxdata/influxdb/tsdb"
)
const (
@@ -301,21 +300,6 @@ func (cmd *Command) backupShard(db, rp, sid string) error {
}
// backupSeriesFile will write a tar archive of the series file for the database.
func (cmd *Command) backupSeriesFile() error {
seriesFileArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, tsdb.SeriesFileName))
if err != nil {
return err
}
cmd.StdoutLogger.Printf("backing up series file to %s", seriesFileArchivePath)
req := &snapshotter.Request{
Type: snapshotter.RequestSeriesFileBackup,
BackupDatabase: cmd.database,
}
return cmd.downloadAndVerify(req, seriesFileArchivePath, nil)
}
// backupDatabase will request the database information from the server and then backup
// every shard in every retention policy in the database. Each shard will be written to a separate file.
func (cmd *Command) backupDatabase() error {
@@ -370,10 +354,6 @@ func (cmd *Command) backupResponsePaths(response *snapshotter.Response) error {
}
}
if err := cmd.backupSeriesFile(); err != nil {
return err
}
return nil
}

View File

@@ -17,7 +17,6 @@ import (
"github.com/influxdata/influxdb/cmd/influxd/backup_util"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/services/snapshotter"
"github.com/influxdata/influxdb/tsdb"
)
// Command represents the program execution for "influxd restore".
@@ -57,10 +56,6 @@ func (cmd *Command) Run(args ...string) error {
}
}
if err := cmd.unpackSeriesFile(); err != nil {
return err
}
if cmd.shard != "" {
return cmd.unpackShard(cmd.shard)
} else if cmd.retention != "" {
@@ -117,42 +112,6 @@ func (cmd *Command) parseFlags(args []string) error {
return nil
}
// unpackSeriesFile reads the series file and restores it.
func (cmd *Command) unpackSeriesFile() error {
files, err := filepath.Glob(filepath.Join(cmd.backupFilesPath, tsdb.SeriesFileName+".*"))
if err != nil {
return err
} else if len(files) == 0 {
return fmt.Errorf("no series file backups in %s", cmd.backupFilesPath)
}
latest := files[len(files)-1]
dstname := filepath.Join(cmd.datadir, cmd.database, tsdb.SeriesFileName)
if err := os.MkdirAll(filepath.Dir(dstname), 0777); err != nil {
return fmt.Errorf("error making restore dir: %s", err.Error())
}
// Open backup file.
src, err := os.Open(latest)
if err != nil {
return err
}
defer src.Close()
// Open destination file.
dst, err := os.Create(dstname)
if err != nil {
return err
}
defer dst.Close()
if _, err := io.Copy(dst, src); err != nil {
return fmt.Errorf("copy series file: %s", err)
}
return nil
}
// unpackMeta reads the metadata from the backup directory and initializes a raft
// cluster and replaces the root metadata.
func (cmd *Command) unpackMeta() error {

View File

@@ -41,7 +41,6 @@ type Service struct {
TSDBStore interface {
BackupShard(id uint64, since time.Time, w io.Writer) error
BackupSeriesFile(database string, w io.Writer) error
ExportShard(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
Shard(id uint64) *tsdb.Shard
ShardRelativePath(id uint64) (string, error)
@@ -138,10 +137,6 @@ func (s *Service) handleConn(conn net.Conn) error {
if err := s.writeMetaStore(conn); err != nil {
return err
}
case RequestSeriesFileBackup:
if err := s.TSDBStore.BackupSeriesFile(r.BackupDatabase, conn); err != nil {
return err
}
case RequestDatabaseInfo:
return s.writeDatabaseInfo(conn, r.BackupDatabase)
case RequestRetentionPolicyInfo:

View File

@@ -805,9 +805,6 @@ func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.
// Remove the temporary snapshot dir
defer os.RemoveAll(path)
if err := e.index.SnapshotTo(path); err != nil {
return err
}
tw := tar.NewWriter(w)
defer tw.Close()
@@ -1585,7 +1582,7 @@ func (e *Engine) CreateSnapshot() (string, error) {
}
// Generate a snapshot of the index.
return path, e.index.SnapshotTo(path)
return path, nil
}
// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments.

View File

@@ -9,6 +9,7 @@ import (
"sort"
"sync"
"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/bytesutil"
"github.com/influxdata/influxdb/pkg/estimator"
@@ -55,9 +56,6 @@ type Index interface {
FieldSet() *MeasurementFieldSet
SetFieldSet(fs *MeasurementFieldSet)
// Creates hard links inside path for snapshotting.
SnapshotTo(path string) error
// Size of the index on disk, if applicable.
DiskSizeBytes() int64
@@ -2315,3 +2313,6 @@ type byTagKey []*query.TagSet
func (t byTagKey) Len() int { return len(t) }
func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 }
func (t byTagKey) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
// TEMP
func dump(v interface{}) { spew.Dump(v) }

View File

@@ -944,9 +944,6 @@ func (i *Index) SeriesIDIterator(opt query.IteratorOptions) (tsdb.SeriesIDIterat
}, nil
}
// SnapshotTo is a no-op since this is an in-memory index.
func (i *Index) SnapshotTo(path string) error { return nil }
// DiskSizeBytes always returns zero bytes, since this is an in-memory index.
func (i *Index) DiskSizeBytes() int64 { return 0 }

View File

@@ -85,10 +85,10 @@ func TestFileSet_SeriesIDIterator(t *testing.T) {
tagset string
}{
{`cpu`, `[{region east}]`},
{`cpu`, `[{region west}]`},
{`mem`, `[{region east}]`},
{`disk`, `[]`},
{`cpu`, `[{region north}]`},
{`cpu`, `[{region west}]`},
{`disk`, `[]`},
{`mem`, `[{region east}]`},
}
for _, expected := range allexpected {

View File

@@ -768,30 +768,6 @@ func (i *Index) TagKeyCardinality(name, key []byte) int {
return 0
}
// SnapshotTo creates hard links to the file set into path.
func (i *Index) SnapshotTo(path string) error {
newRoot := filepath.Join(path, "index")
if err := os.Mkdir(newRoot, 0777); err != nil {
return err
}
// Store results.
errC := make(chan error, len(i.partitions))
for _, p := range i.partitions {
go func(p *Partition) {
errC <- p.SnapshotTo(path)
}(p)
}
// Check for error
for i := 0; i < cap(errC); i++ {
if err := <-errC; err != nil {
return err
}
}
return nil
}
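The removed Index.SnapshotTo used a common fan-out shape: one goroutine per partition, results funneled through an error channel buffered to the partition count, so an early return never strands a sender. A minimal, self-contained sketch of that pattern (the work function and item names are stand-ins, not InfluxDB API):

```go
package main

import (
	"errors"
	"fmt"
)

// snapshotAll runs one goroutine per item and funnels every result through a
// channel buffered to len(items): workers never block on send, so returning
// on the first error leaks nothing.
func snapshotAll(items []string, work func(string) error) error {
	errC := make(chan error, len(items))
	for _, it := range items {
		go func(it string) { errC <- work(it) }(it)
	}
	// Receive up to cap(errC) results; an early return is safe because the
	// buffer absorbs the remaining sends.
	for i := 0; i < cap(errC); i++ {
		if err := <-errC; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	err := snapshotAll([]string{"0", "1", "2"}, func(p string) error {
		if p == "1" {
			return errors.New("partition 1 failed")
		}
		return nil
	})
	fmt.Println(err) // partition 1 failed
}
```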
// RetainFileSet returns the set of all files across all partitions.
// This is only needed when all files need to be retained for an operation.
func (i *Index) RetainFileSet() (*FileSet, error) {

View File

@@ -724,39 +724,6 @@ func (i *Partition) TagKeyCardinality(name, key []byte) int {
return 0
}
// SnapshotTo creates hard links to the file set into path.
func (i *Partition) SnapshotTo(path string) error {
i.mu.Lock()
defer i.mu.Unlock()
fs := i.retainFileSet()
defer fs.Release()
// Flush active log file, if any.
if err := i.activeLogFile.Flush(); err != nil {
return err
}
newRoot := filepath.Join(path, "index", i.id)
if err := os.Mkdir(newRoot, 0777); err != nil {
return err
}
// Link manifest.
if err := os.Link(i.ManifestPath(), filepath.Join(newRoot, filepath.Base(i.ManifestPath()))); err != nil {
return fmt.Errorf("error creating tsi manifest hard link: %q", err)
}
// Link files in directory.
for _, f := range fs.files {
if err := os.Link(f.Path(), filepath.Join(newRoot, filepath.Base(f.Path()))); err != nil {
return fmt.Errorf("error creating tsi hard link: %q", err)
}
}
return nil
}
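The removed partition snapshot relied on hard links: each linked file shares the original's data blocks, so a point-in-time view of immutable index files costs only directory entries. A small sketch of the same idea, assuming illustrative directory names rather than real InfluxDB paths:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// linkSnapshot hard-links every regular file in srcDir into dstDir, giving a
// point-in-time view of the file set without copying any data blocks.
func linkSnapshot(srcDir, dstDir string) error {
	if err := os.Mkdir(dstDir, 0777); err != nil {
		return err
	}
	entries, err := os.ReadDir(srcDir)
	if err != nil {
		return err
	}
	for _, e := range entries {
		if !e.Type().IsRegular() {
			continue // skip subdirectories, symlinks, etc.
		}
		src := filepath.Join(srcDir, e.Name())
		dst := filepath.Join(dstDir, e.Name())
		if err := os.Link(src, dst); err != nil {
			return fmt.Errorf("error creating hard link: %q", err)
		}
	}
	return nil
}

func main() {
	if err := linkSnapshot("index", "index-snapshot"); err != nil {
		fmt.Println(err)
	}
}
```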
func (i *Partition) SetFieldName(measurement []byte, name string) {}
func (i *Partition) RemoveShard(shardID uint64) {}
func (i *Partition) AssignShard(k string, shardID uint64) {}

View File

@@ -9,6 +9,7 @@ import (
"io"
"os"
"path/filepath"
"sort"
"sync"
"github.com/cespare/xxhash"
@@ -28,22 +29,14 @@ const SeriesFileVersion = 1
// Series flag constants.
const (
SeriesFileFlagSize = 1
SeriesFileInsertFlag = 0x00
SeriesFileTombstoneFlag = 0x01
)
// MaxSeriesFileHashSize is the maximum number of series in a single hash map.
const MaxSeriesFileHashSize = (1 << 20 * SeriesMapLoadFactor) / 100 // (1MB * 90) / 100 == ~943K
// SeriesMapThreshold is the number of series IDs to hold in the in-memory
// series map before compacting and rebuilding the on-disk representation.
const SeriesMapThreshold = 1 << 25 // ~33M ids * 8 bytes per id == 256MB
const (
// DefaultMaxSeriesFileSize is the maximum series file size. Assuming that each
// series key takes, for example, 150 bytes, the limit would support ~229M series.
DefaultMaxSeriesFileSize = 32 * (1 << 30) // 32GB
)
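The capacity comments above are straightforward arithmetic; a quick check, assuming the load factor of 90 and the 150-byte average key that those comments use:

```go
package main

import "fmt"

func main() {
	const loadFactor = 90 // assumes SeriesMapLoadFactor == 90 (percent)

	// MaxSeriesFileHashSize: (1 << 20) entries scaled by the load factor.
	fmt.Println(1 << 20 * loadFactor / 100) // 943718, i.e. ~943K

	// SeriesMapThreshold: 2^25 ids * 8 bytes per id = 2^28 bytes.
	fmt.Println((1 << 25) * 8 / (1 << 20)) // 256 (MB)

	// DefaultMaxSeriesFileSize at ~150 bytes per series key.
	var maxSize int64 = 32 * (1 << 30) // 32GB
	fmt.Println(maxSize / 150) // 229064922, i.e. ~229M series
}
```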
// SeriesFile represents the section of the index that holds series data.
type SeriesFile struct {
mu sync.RWMutex
@@ -58,6 +51,7 @@ type SeriesFile struct {
log []byte
keyIDMap *seriesKeyIDMap
idOffsetMap *seriesIDOffsetMap
walOffset int64
tombstones map[uint64]struct{}
// MaxSize is the maximum size of the file.
@@ -118,6 +112,26 @@ func (f *SeriesFile) Open() error {
f.log = f.data[hdr.Log.Offset : hdr.Log.Offset+hdr.Log.Size]
f.keyIDMap = newSeriesKeyIDMap(f.data, f.data[hdr.KeyIDMap.Offset:hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size])
f.idOffsetMap = newSeriesIDOffsetMap(f.data, f.data[hdr.IDOffsetMap.Offset:hdr.IDOffsetMap.Offset+hdr.IDOffsetMap.Size])
f.walOffset = hdr.WAL.Offset
// Replay post-compaction log.
for off := f.walOffset; off < f.size; {
flag, id, key, sz := ReadSeriesFileLogEntry(f.data[off:])
switch flag {
case SeriesFileInsertFlag:
f.keyIDMap.insert(key, id)
f.idOffsetMap.insert(id, off+SeriesFileLogInsertEntryHeader)
case SeriesFileTombstoneFlag:
f.tombstones[id] = struct{}{}
default:
return fmt.Errorf("tsdb.SeriesFile.Open(): unknown log entry flag: %d", flag)
}
off += sz
}
return nil
}(); err != nil {
@@ -196,6 +210,7 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []mod
// Append new key to be added to hash map after flush.
ids[i] = id
newIDs[string(buf)] = id
newKeyRanges = append(newKeyRanges, keyRange{id, offset, f.size - offset})
}
@@ -206,7 +221,8 @@
// Add keys to hash map(s).
for _, keyRange := range newKeyRanges {
f.keyIDMap.insert(f.data[keyRange.offset:keyRange.offset+keyRange.size], keyRange.id)
key := f.data[keyRange.offset : keyRange.offset+keyRange.size]
f.keyIDMap.insert(key, keyRange.id)
f.idOffsetMap.insert(keyRange.id, keyRange.offset)
}
@@ -318,16 +334,25 @@ func (f *SeriesFile) SeriesCount() uint64 {
// SeriesIDIterator returns an iterator over all the series.
func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator {
return &seriesFileIterator{
offset: 1,
data: f.data[1:f.size],
}
var ids []uint64
ids = append(ids, ReadSeriesFileLogIDs(f.log)...)
ids = append(ids, ReadSeriesFileLogIDs(f.data[f.walOffset:f.size])...)
sort.Slice(ids, func(i, j int) bool {
keyi := f.SeriesKey(ids[i])
keyj := f.SeriesKey(ids[j])
return CompareSeriesKeys(keyi, keyj) == -1
})
return NewSeriesIDSliceIterator(ids)
}
func (f *SeriesFile) insert(key []byte) (id uint64, offset int64, err error) {
id = f.seq + 1
var buf bytes.Buffer
buf.WriteByte(0) // flag
binary.Write(&buf, binary.BigEndian, f.seq+1) // new id
buf.WriteByte(SeriesFileInsertFlag) // flag
binary.Write(&buf, binary.BigEndian, id) // new id
// Save offset position.
offset = f.size + int64(buf.Len())
@@ -349,15 +374,45 @@ func (f *SeriesFile) seriesKeyByOffset(offset int64) []byte {
if offset == 0 || f.data == nil {
return nil
}
buf := f.data[offset:]
sz, _ := ReadSeriesKeyLen(buf)
return buf[:sz]
key, _ := ReadSeriesKey(f.data[offset:])
return key
}
const SeriesFileLogInsertEntryHeader = 1 + 8 // flag + id
func ReadSeriesFileLogEntry(data []byte) (flag uint8, id uint64, key []byte, sz int64) {
flag, data = data[0], data[1:]
id, data = binary.BigEndian.Uint64(data), data[8:]
switch flag {
case SeriesFileInsertFlag:
key, _ = ReadSeriesKey(data)
}
return flag, id, key, int64(SeriesFileLogInsertEntryHeader + len(key))
}
func ReadSeriesFileLogIDs(data []byte) []uint64 {
var ids []uint64
for len(data) > 0 {
flag, id, _, sz := ReadSeriesFileLogEntry(data)
if flag == SeriesFileInsertFlag {
ids = append(ids, id)
}
data = data[sz:]
}
return ids
}
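Both readers above walk the same entry layout: a one-byte flag, an 8-byte big-endian series ID, and, for inserts only, the series key (AppendSeriesKey, below, prepends the total key length as a uvarint). A self-contained round-trip sketch of that layout, using a simplified plain-length prefix in place of the real series-key encoding:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	insertFlag    = 0x00  // mirrors SeriesFileInsertFlag
	tombstoneFlag = 0x01  // mirrors SeriesFileTombstoneFlag
	entryHeader   = 1 + 8 // flag + big-endian id, as in SeriesFileLogInsertEntryHeader
)

// appendInsertEntry encodes one insert entry: flag byte, 8-byte id, then the
// key prefixed by its length as a uvarint. (The real AppendSeriesKey prepends
// the *total* length; a plain length prefix keeps this sketch short.)
func appendInsertEntry(dst []byte, id uint64, key []byte) []byte {
	dst = append(dst, insertFlag)
	dst = binary.BigEndian.AppendUint64(dst, id)
	dst = binary.AppendUvarint(dst, uint64(len(key)))
	return append(dst, key...)
}

// readEntry decodes one entry and returns its total size, so a caller can
// replay a log region front to back the way SeriesFile.Open does.
func readEntry(data []byte) (flag uint8, id uint64, key []byte, sz int64) {
	flag, data = data[0], data[1:]
	id, data = binary.BigEndian.Uint64(data), data[8:]
	if flag != insertFlag {
		return flag, id, nil, entryHeader // tombstones carry no key
	}
	n, w := binary.Uvarint(data)
	key = data[w : w+int(n)]
	return flag, id, key, int64(entryHeader + w + int(n))
}

func main() {
	// One insert followed by a tombstone for the same id.
	log := appendInsertEntry(nil, 1, []byte("cpu,region=east"))
	log = append(log, tombstoneFlag)
	log = binary.BigEndian.AppendUint64(log, 1)

	for off := int64(0); off < int64(len(log)); {
		flag, id, key, sz := readEntry(log[off:])
		fmt.Printf("flag=%d id=%d key=%q\n", flag, id, key)
		off += sz
	}
}
```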
const SeriesFileMagic = uint32(0x49465346) // "IFSF"
var ErrInvalidSeriesFile = errors.New("invalid series file")
const SeriesFileHeaderSize = 0 +
4 + 1 + // magic + version
8 + 8 + // log
8 + 8 + // key/id map
8 + 8 + // id/offset map
8 // wal offset
// SeriesFileHeader represents the version & position information of a series file.
type SeriesFileHeader struct {
Version uint8
@@ -376,18 +431,27 @@ type SeriesFileHeader struct {
Offset int64
Size int64
}
WAL struct {
Offset int64
}
}
// NewSeriesFileHeader returns a new instance of SeriesFileHeader.
func NewSeriesFileHeader() *SeriesFileHeader {
return &SeriesFileHeader{Version: SeriesFileVersion}
func NewSeriesFileHeader() SeriesFileHeader {
hdr := SeriesFileHeader{Version: SeriesFileVersion}
hdr.Log.Offset = SeriesFileHeaderSize
hdr.KeyIDMap.Offset = SeriesFileHeaderSize
hdr.IDOffsetMap.Offset = SeriesFileHeaderSize
hdr.WAL.Offset = SeriesFileHeaderSize
return hdr
}
// ReadSeriesFileHeader returns the header from data.
func ReadSeriesFileHeader(data []byte) (hdr SeriesFileHeader, err error) {
r := bytes.NewReader(data)
if len(data) == 0 {
return SeriesFileHeader{Version: SeriesFileVersion}, nil
return NewSeriesFileHeader(), nil
}
// Read magic number & version.
@@ -422,6 +486,11 @@ func ReadSeriesFileHeader(data []byte) (hdr SeriesFileHeader, err error) {
return hdr, err
}
// Read WAL offset.
if err := binary.Read(r, binary.BigEndian, &hdr.WAL.Offset); err != nil {
return hdr, err
}
return hdr, nil
}
@@ -436,44 +505,10 @@ func (hdr *SeriesFileHeader) WriteTo(w io.Writer) (n int64, err error) {
binary.Write(&buf, binary.BigEndian, hdr.KeyIDMap.Size)
binary.Write(&buf, binary.BigEndian, hdr.IDOffsetMap.Offset)
binary.Write(&buf, binary.BigEndian, hdr.IDOffsetMap.Size)
binary.Write(&buf, binary.BigEndian, hdr.WAL.Offset)
return buf.WriteTo(w)
}
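Per SeriesFileHeaderSize above, the header is a fixed 61-byte prefix: magic and version, then int64 offset/size pairs for the log and both hash maps, and a final int64 WAL offset (no size, since the WAL runs to the end of the file). A round-trip sketch of that layout, with names mirroring the struct above and error handling trimmed:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

const (
	magic      = uint32(0x49465346)                // "IFSF"
	version    = uint8(1)                          // SeriesFileVersion
	headerSize = 4 + 1 + 8 + 8 + 8 + 8 + 8 + 8 + 8 // 61 bytes
)

// header mirrors SeriesFileHeader: offset/size for the log and both hash
// maps, plus the WAL offset.
type header struct {
	logOff, logSize     int64
	keyIDOff, keyIDSize int64
	idOffOff, idOffSize int64
	walOff              int64
}

func (h header) encode() []byte {
	var buf bytes.Buffer
	binary.Write(&buf, binary.BigEndian, magic)
	binary.Write(&buf, binary.BigEndian, version)
	for _, v := range []int64{h.logOff, h.logSize, h.keyIDOff, h.keyIDSize,
		h.idOffOff, h.idOffSize, h.walOff} {
		binary.Write(&buf, binary.BigEndian, v)
	}
	return buf.Bytes()
}

func decode(data []byte) (h header, err error) {
	r := bytes.NewReader(data)
	var m uint32
	var v uint8
	binary.Read(r, binary.BigEndian, &m)
	binary.Read(r, binary.BigEndian, &v)
	if m != magic {
		return h, fmt.Errorf("invalid series file")
	}
	for _, p := range []*int64{&h.logOff, &h.logSize, &h.keyIDOff, &h.keyIDSize,
		&h.idOffOff, &h.idOffSize, &h.walOff} {
		if err := binary.Read(r, binary.BigEndian, p); err != nil {
			return h, err
		}
	}
	return h, nil
}

func main() {
	in := header{logOff: headerSize, keyIDOff: headerSize,
		idOffOff: headerSize, walOff: headerSize}
	b := in.encode()
	out, err := decode(b)
	fmt.Println(len(b) == headerSize, out == in, err) // true true <nil>
}
```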
// seriesFileIterator is an iterator over series IDs in a series list.
type seriesFileIterator struct {
data []byte
offset uint64
}
// Next returns the next series element.
func (itr *seriesFileIterator) Next() (SeriesIDElem, error) {
for {
if len(itr.data) == 0 {
return SeriesIDElem{}, nil
}
// Read flag.
flag := itr.data[0]
itr.data = itr.data[1:]
itr.offset++
switch flag {
case SeriesFileTombstoneFlag:
itr.data = itr.data[8:] // skip
itr.offset += 8
default:
var key []byte
key, itr.data = ReadSeriesKey(itr.data)
elem := SeriesIDElem{SeriesID: itr.offset}
itr.offset += uint64(len(key))
return elem, nil
}
}
}
func (itr *seriesFileIterator) Close() error { return nil }
// AppendSeriesKey serializes name and tags to a byte slice.
// The total length is prepended as a uvarint.
func AppendSeriesKey(dst []byte, name []byte, tags models.Tags) []byte {