diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 5ecee30c93..d845df1a50 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -1684,19 +1684,12 @@ func NewEngine(index string) (*Engine, error) { } // Setup series file. - f, err := ioutil.TempFile(dbPath, "series") + seriesPath, err := ioutil.TempDir(dbPath, "series") if err != nil { return nil, err } - f.Close() - sfile := tsdb.NewSeriesFile(f.Name()) - - // If we're running on a 32-bit system then reduce the SeriesFile size, so we - // can address is in memory. - if runtime.GOARCH == "386" { - sfile.MaxSize = 1 << 27 // 128MB - } + sfile := tsdb.NewSeriesFile(seriesPath) if err = sfile.Open(); err != nil { return nil, err } @@ -1729,19 +1722,11 @@ type SeriesFile struct { // NewSeriesFile returns a new instance of SeriesFile with a temporary file path. func NewSeriesFile() *SeriesFile { - file, err := ioutil.TempFile("", "tsdb-series-file-") + dir, err := ioutil.TempDir("", "tsdb-series-file-") if err != nil { panic(err) } - file.Close() - - s := &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())} - // If we're running on a 32-bit system then reduce the SeriesFile size, so we - // can address is in memory. - if runtime.GOARCH == "386" { - s.SeriesFile.MaxSize = 1 << 27 // 128MB - } - return s + return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(dir)} } // MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error. diff --git a/tsdb/index/tsi1/file_set_test.go b/tsdb/index/tsi1/file_set_test.go index 0232f7498c..be396ebb42 100644 --- a/tsdb/index/tsi1/file_set_test.go +++ b/tsdb/index/tsi1/file_set_test.go @@ -85,10 +85,10 @@ func TestFileSet_SeriesIDIterator(t *testing.T) { tagset string }{ {`cpu`, `[{region east}]`}, - {`cpu`, `[{region north}]`}, {`cpu`, `[{region west}]`}, - {`disk`, `[]`}, {`mem`, `[{region east}]`}, + {`disk`, `[]`}, + {`cpu`, `[{region north}]`}, } for _, expected := range allexpected { diff --git a/tsdb/index/tsi1/tsi1_test.go b/tsdb/index/tsi1/tsi1_test.go index 3c25abb288..a96b4fd2f8 100644 --- a/tsdb/index/tsi1/tsi1_test.go +++ b/tsdb/index/tsi1/tsi1_test.go @@ -6,7 +6,6 @@ import ( "os" "path/filepath" "reflect" - "runtime" "testing" "github.com/influxdata/influxdb/models" @@ -290,19 +289,11 @@ type SeriesFile struct { // NewSeriesFile returns a new instance of SeriesFile with a temporary file path. func NewSeriesFile() *SeriesFile { - file, err := ioutil.TempFile("", "tsdb-series-file-") + dir, err := ioutil.TempDir("", "tsdb-series-file-") if err != nil { panic(err) } - file.Close() - - s := &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())} - // If we're running on a 32-bit system then reduce the SeriesFile size, so we - // can address is in memory. - if runtime.GOARCH == "386" { - s.SeriesFile.MaxSize = 1 << 27 // 128MB - } - return s + return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(dir)} } // MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error. @@ -316,6 +307,6 @@ func MustOpenSeriesFile() *SeriesFile { // Close closes the log file and removes it from disk. 
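// The series file now occupies a directory of segment files rather than a single file, so the helper removes it recursively.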
func (f *SeriesFile) Close() error { - defer os.Remove(f.Path()) + defer os.RemoveAll(f.Path()) return f.SeriesFile.Close() } diff --git a/tsdb/index_test.go b/tsdb/index_test.go index fcb2625dec..d8668adfd5 100644 --- a/tsdb/index_test.go +++ b/tsdb/index_test.go @@ -6,7 +6,6 @@ import ( "os" "path/filepath" "reflect" - "runtime" "testing" "github.com/influxdata/influxdb/internal" @@ -147,19 +146,12 @@ func MustNewIndex(index string) *Index { panic(err) } - file, err := ioutil.TempFile(rootPath, "series") + seriesPath, err := ioutil.TempDir(rootPath, "series") if err != nil { panic(err) } - file.Close() - - sfile := tsdb.NewSeriesFile(file.Name()) - // If we're running on a 32-bit system then reduce the SeriesFile size, so we - // can address is in memory. - if runtime.GOARCH == "386" { - sfile.MaxSize = 1 << 27 // 128MB - } + sfile := tsdb.NewSeriesFile(seriesPath) if err := sfile.Open(); err != nil { panic(err) } diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 1e4fc33387..2cdda8115c 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -1,39 +1,24 @@ package tsdb import ( - "bufio" "bytes" "encoding/binary" "encoding/hex" - "errors" "fmt" "io" + "io/ioutil" "os" "path/filepath" - "sort" "sync" "github.com/cespare/xxhash" "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/pkg/mmap" "github.com/influxdata/influxdb/pkg/rhh" ) -// ErrSeriesOverflow is returned when too many series are added to a series writer. -var ErrSeriesOverflow = errors.New("series overflow") - // SeriesIDSize is the size in bytes of a series key ID. const SeriesIDSize = 8 -const SeriesFileVersion = 1 - -// Series flag constants. -const ( - SeriesFileFlagSize = 1 - SeriesFileInsertFlag = 0x00 - SeriesFileTombstoneFlag = 0x01 -) - // SeriesMapThreshold is the number of series IDs to hold in the in-memory // series map before compacting and rebuilding the on-disk representation. const SeriesMapThreshold = 1 << 25 // ~33M ids * 8 bytes per id == 256MB @@ -43,99 +28,43 @@ type SeriesFile struct { mu sync.RWMutex path string - mmap []byte // entire mmapped file - data []byte // active part of mmap file - file *os.File // write file handle - w *bufio.Writer // bufferred file handle - size int64 // current file size - seq uint64 // series id sequence - - log []byte - keyIDMap *seriesKeyIDMap - idOffsetMap *seriesIDOffsetMap - walOffset int64 - tombstones map[uint64]struct{} - - // MaxSize is the maximum size of the file. - MaxSize int64 + segments []*SeriesSegment + index *SeriesIndex + seq uint64 // series id sequence } // NewSeriesFile returns a new instance of SeriesFile. func NewSeriesFile(path string) *SeriesFile { return &SeriesFile{ - path: path, - tombstones: make(map[uint64]struct{}), - - MaxSize: DefaultMaxSeriesFileSize, + path: path, } } // Open memory maps the data file at the file's path. func (f *SeriesFile) Open() error { - // Create the parent directories if they don't exist. - if err := os.MkdirAll(filepath.Join(filepath.Dir(f.path)), 0777); err != nil { + // Create path if it doesn't exist. + if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil { return err } // Open components. if err := func() (err error) { - // Open file handler for appending. - if f.file, err = os.OpenFile(f.path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666); err != nil { + if err := f.openSegments(); err != nil { return err } - // Read file size. - // If file is empty then write an empty header. 
- fi, err := f.file.Stat() - if err != nil { - return err - } else if fi.Size() > 0 { - f.size = fi.Size() - } else { - hdr := NewSeriesFileHeader() - if f.size, err = hdr.WriteTo(f.file); err != nil { - return err - } - } - f.w = bufio.NewWriter(f.file) - - // Memory map file data. - if f.mmap, err = mmap.Map(f.path, f.MaxSize); err != nil { - return err - } - f.data = f.mmap[:f.size] - - // Read header. - hdr, err := ReadSeriesFileHeader(f.data) - if err != nil { + // Init last segment for writes. + if err := f.activeSegment().InitForWrite(); err != nil { return err } - // Subslice log & maps. - f.log = f.data[hdr.Log.Offset : hdr.Log.Offset+hdr.Log.Size] - f.keyIDMap = newSeriesKeyIDMap(f.data, f.data[hdr.KeyIDMap.Offset:hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size]) - f.idOffsetMap = newSeriesIDOffsetMap(f.data, f.data[hdr.IDOffsetMap.Offset:hdr.IDOffsetMap.Offset+hdr.IDOffsetMap.Size]) - f.walOffset = hdr.WAL.Offset - - // Replay post-compaction log. - for off := f.walOffset; off < f.size; { - flag, id, key, sz := ReadSeriesFileLogEntry(f.data[off:]) - - switch flag { - case SeriesFileInsertFlag: - f.keyIDMap.insert(key, id) - f.idOffsetMap.insert(id, off+SeriesFileLogInsertEntryHeader) - - case SeriesFileTombstoneFlag: - f.tombstones[id] = struct{}{} - - default: - return fmt.Errorf("tsdb.SeriesFile.Open(): unknown log entry flag: %d", flag) - } - - off += sz + f.index = NewSeriesIndex(f.IndexPath()) + if err := f.index.Open(); err != nil { + return err } + // TODO: Replay new entries since index was built. + return nil }(); err != nil { f.Close() @@ -145,44 +74,75 @@ func (f *SeriesFile) Open() error { return nil } +func (f *SeriesFile) openSegments() error { + fis, err := ioutil.ReadDir(f.path) + if err != nil { + return err + } + + for _, fi := range fis { + if !IsValidSeriesSegmentFilename(fi.Name()) { + continue + } + + segment := NewSeriesSegment(ParseSeriesSegmentFilename(fi.Name()), filepath.Join(f.path, fi.Name())) + if err := segment.Open(); err != nil { + return err + } + f.segments = append(f.segments, segment) + } + + // Find max series id by searching segments in reverse order. + for i := len(f.segments) - 1; i >= 0; i-- { + if f.seq = f.segments[i].MaxSeriesID(); f.seq > 0 { + break + } + } + + // Create initial segment if none exist. + if len(f.segments) == 0 { + segment, err := CreateSeriesSegment(0, filepath.Join(f.path, "0000")) + if err != nil { + return err + } + f.segments = append(f.segments, segment) + } + + return nil +} + // Close unmaps the data file. -func (f *SeriesFile) Close() error { +func (f *SeriesFile) Close() (err error) { f.mu.Lock() defer f.mu.Unlock() - if f.mmap != nil { - mmap.Unmap(f.mmap) - f.mmap = nil - f.data = nil + for _, s := range f.segments { + if e := s.Close(); e != nil && err == nil { + err = e + } } - if f.file != nil { - f.file.Close() - f.file = nil - } - f.w = nil + f.segments = nil - if f.keyIDMap != nil { - f.keyIDMap.Close() - f.keyIDMap = nil - } - if f.idOffsetMap != nil { - f.idOffsetMap.Close() - f.idOffsetMap = nil + if f.index != nil { + if e := f.index.Close(); e != nil && err == nil { + err = e + } } + f.index = nil - f.log = nil - f.tombstones = nil - - return nil + return err } // Path returns the path to the file. func (f *SeriesFile) Path() string { return f.path } +// IndexPath returns the path to the series index. +func (f *SeriesFile) IndexPath() string { return filepath.Join(f.path, "index") } + // CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist. Returns the offset of the series.
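// Lookups are first attempted under a read lock against the index; on a miss the write lock is taken, the lookup is re-attempted, and any keys still missing are appended to the active segment and then inserted into the index.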
func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []uint64, err error) { f.mu.RLock() - ids, ok := f.findIDListByNameTags(names, tagsSlice, buf) + ids, ok := f.index.FindIDListByNameTags(f.segments, names, tagsSlice, buf) if ok { f.mu.RUnlock() return ids, nil @@ -192,7 +152,6 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []mod type keyRange struct { id uint64 offset int64 - size int64 } newKeyRanges := make([]keyRange, 0, len(names)) @@ -215,7 +174,7 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []mod // Re-attempt lookup under write lock. if ids[i] = newIDs[string(buf)]; ids[i] != 0 { continue - } else if ids[i] = f.findIDByNameTags(names[i], tagsSlice[i], buf); ids[i] != 0 { + } else if ids[i] = f.index.FindIDByNameTags(f.segments, names[i], tagsSlice[i], buf); ids[i] != 0 { continue } @@ -228,45 +187,24 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []mod // Append new key to be added to hash map after flush. ids[i] = id newIDs[string(buf)] = id - newKeyRanges = append(newKeyRanges, keyRange{id, offset, f.size - offset}) + newKeyRanges = append(newKeyRanges, keyRange{id, offset}) } - // Flush log writes so we can access data in mmap. - if err := f.w.Flush(); err != nil { - return nil, err + // Flush active segment writes so we can access data in mmap. + if segment := f.activeSegment(); segment != nil { + if err := segment.Flush(); err != nil { + return nil, err + } } // Add keys to hash map(s). for _, keyRange := range newKeyRanges { - key := f.data[keyRange.offset : keyRange.offset+keyRange.size] - f.keyIDMap.insert(key, keyRange.id) - f.idOffsetMap.insert(keyRange.id, keyRange.offset) + f.index.Insert(f.seriesKeyByOffset(keyRange.offset), keyRange.id, keyRange.offset) } return ids, nil } -func (f *SeriesFile) findIDByNameTags(name []byte, tags models.Tags, buf []byte) uint64 { - id := f.keyIDMap.get(AppendSeriesKey(buf[:0], name, tags)) - if _, ok := f.tombstones[id]; ok { - return 0 - } - return id -} - -func (f *SeriesFile) findIDListByNameTags(names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []uint64, ok bool) { - ids, ok = make([]uint64, len(names)), true - for i := range names { - id := f.findIDByNameTags(names[i], tagsSlice[i], buf) - if id == 0 { - ok = false - continue - } - ids[i] = id - } - return ids, ok -} - // DeleteSeriesID flags a series as permanently deleted. // If the series is reintroduced later then it must create a new id. func (f *SeriesFile) DeleteSeriesID(id uint64) error { @@ -274,22 +212,18 @@ func (f *SeriesFile) DeleteSeriesID(id uint64) error { defer f.mu.Unlock() // Already tombstoned, ignore. - if _, ok := f.tombstones[id]; ok { + if f.index.IsDeleted(id) { return nil } // Write tombstone entry. - buf := AppendSeriesFileLogEntry(nil, SeriesFileTombstoneFlag, id, nil) - if _, err := f.w.Write(buf); err != nil { - return err - } else if err := f.w.Flush(); err != nil { + _, err := f.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryTombstoneFlag, id, nil)) + if err != nil { return err } - f.size += int64(len(buf)) - f.data = f.data[:f.size] // Mark tombstone in memory. - f.tombstones[id] = struct{}{} + f.index.Delete(id) return nil } @@ -297,7 +231,7 @@ func (f *SeriesFile) DeleteSeriesID(id uint64) error { // IsDeleted returns true if the ID has been deleted before. 
func (f *SeriesFile) IsDeleted(id uint64) bool { f.mu.RLock() - _, v := f.tombstones[id] + v := f.index.IsDeleted(id) f.mu.RUnlock() return v } @@ -308,7 +242,7 @@ func (f *SeriesFile) SeriesKey(id uint64) []byte { return nil } f.mu.RLock() - key := f.seriesKeyByOffset(f.idOffsetMap.get(id)) + key := f.seriesKeyByOffset(f.index.FindOffsetByID(id)) f.mu.RUnlock() return key } @@ -325,214 +259,112 @@ func (f *SeriesFile) Series(id uint64) ([]byte, models.Tags) { // SeriesID return the series id for the series. func (f *SeriesFile) SeriesID(name []byte, tags models.Tags, buf []byte) uint64 { f.mu.RLock() - id := f.keyIDMap.get(AppendSeriesKey(buf[:0], name, tags)) + id := f.index.FindIDBySeriesKey(f.segments, AppendSeriesKey(buf[:0], name, tags)) f.mu.RUnlock() return id } // HasSeries return true if the series exists. func (f *SeriesFile) HasSeries(name []byte, tags models.Tags, buf []byte) bool { - f.mu.RLock() - v := f.keyIDMap.get(AppendSeriesKey(buf[:0], name, tags)) > 0 - f.mu.RUnlock() - return v + return f.SeriesID(name, tags, buf) > 0 } // SeriesCount returns the number of series. func (f *SeriesFile) SeriesCount() uint64 { f.mu.RLock() - n := f.seriesCount() + n := f.index.Count() f.mu.RUnlock() return n } -func (f *SeriesFile) seriesCount() uint64 { - return f.idOffsetMap.count() -} - // SeriesIterator returns an iterator over all the series. func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator { var ids []uint64 - ids = append(ids, ReadSeriesFileLogIDs(f.log)...) - ids = append(ids, ReadSeriesFileLogIDs(f.data[f.walOffset:])...) - - sort.Slice(ids, func(i, j int) bool { - keyi := f.SeriesKey(ids[i]) - keyj := f.SeriesKey(ids[j]) - return CompareSeriesKeys(keyi, keyj) == -1 - }) - + for _, segment := range f.segments { + ids = segment.AppendSeriesIDs(ids) + } return NewSeriesIDSliceIterator(ids) } +// activeSegment returns the last segment. +func (f *SeriesFile) activeSegment() *SeriesSegment { + if len(f.segments) == 0 { + return nil + } + return f.segments[len(f.segments)-1] +} + func (f *SeriesFile) insert(key []byte) (id uint64, offset int64, err error) { id = f.seq + 1 - offset = f.size + SeriesFileLogInsertEntryHeader - buf := AppendSeriesFileLogEntry(nil, SeriesFileInsertFlag, id, key) - if _, err := f.w.Write(buf); err != nil { + offset, err = f.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryInsertFlag, id, key)) + if err != nil { return 0, 0, err } + f.seq++ - f.size += int64(len(buf)) - f.data = f.data[:f.size] return id, offset, nil } +// writeLogEntry appends an entry to the end of the active segment. +// If there is no more room in the segment then a new segment is added. +func (f *SeriesFile) writeLogEntry(data []byte) (offset int64, err error) { + segment := f.activeSegment() + if segment == nil || !segment.CanWrite(data) { + if segment, err = f.createSegment(); err != nil { + return 0, err + } + } + return segment.WriteLogEntry(data) +} + +// createSegment appends a new segment +func (f *SeriesFile) createSegment() (*SeriesSegment, error) { + // Close writer for active segment, if one exists. + if segment := f.activeSegment(); segment != nil { + if err := segment.CloseForWrite(); err != nil { + return nil, err + } + } + + // Generate a new sequential segment identifier. + var id uint16 + if len(f.segments) > 0 { + id = f.segments[len(f.segments)-1].ID() + 1 + } + filename := fmt.Sprintf("%04x", id) + + // Generate new empty segment. 
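+ // CreateSeriesSegment pre-sizes the segment file via SeriesSegmentSize(id), so each successive segment doubles in capacity up to the 256MB cap.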
+ segment, err := CreateSeriesSegment(id, filepath.Join(f.path, filename)) + if err != nil { + return nil, err + } + f.segments = append(f.segments, segment) + + // Allow segment to write. + if err := segment.InitForWrite(); err != nil { + return nil, err + } + + return segment, nil +} + func (f *SeriesFile) seriesKeyByOffset(offset int64) []byte { - if offset == 0 || f.data == nil { + if offset == 0 { return nil } - key, _ := ReadSeriesKey(f.data[offset:]) - return key -} -const SeriesFileLogInsertEntryHeader = 1 + 8 // flag + id - -func ReadSeriesFileLogEntry(data []byte) (flag uint8, id uint64, key []byte, sz int64) { - flag, data = uint8(data[1]), data[1:] - id, data = binary.BigEndian.Uint64(data), data[8:] - switch flag { - case SeriesFileInsertFlag: - key, _ = ReadSeriesKey(data) - } - return flag, id, key, int64(SeriesFileLogInsertEntryHeader + len(key)) -} - -func AppendSeriesFileLogEntry(dst []byte, flag uint8, id uint64, key []byte) []byte { - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, id) - - dst = append(dst, flag) - dst = append(dst, buf...) - - switch flag { - case SeriesFileInsertFlag: - dst = append(dst, key...) - case SeriesFileTombstoneFlag: - default: - panic(fmt.Sprintf("unreachable: invalid flag: %d", flag)) - } - return dst -} - -func ReadSeriesFileLogIDs(data []byte) []uint64 { - var ids []uint64 - for len(data) > 0 { - flag, id, _, sz := ReadSeriesFileLogEntry(data) - if flag == SeriesFileInsertFlag { - ids = append(ids, id) + segmentID, pos := SplitSeriesOffset(offset) + for _, segment := range f.segments { + if segment.ID() != segmentID { + continue } - data = data[sz:] - } - return ids -} -const SeriesFileMagic = uint32(0x49465346) // "IFSF" - -var ErrInvalidSeriesFile = errors.New("invalid series file") - -const SeriesFileHeaderSize = 0 + - 4 + 1 + // magic + version - 8 + 8 + // log - 8 + 8 + // key/id map - 8 + 8 + // id/offset map - 8 // wall offset - -// SeriesFileHeader represents the version & position information of a series file. -type SeriesFileHeader struct { - Version uint8 - - Log struct { - Offset int64 - Size int64 + key, _ := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize)) + return key } - KeyIDMap struct { - Offset int64 - Size int64 - } - - IDOffsetMap struct { - Offset int64 - Size int64 - } - - WAL struct { - Offset int64 - } -} - -// NewSeriesFileHeader returns a new instance of SeriesFileHeader. -func NewSeriesFileHeader() SeriesFileHeader { - hdr := SeriesFileHeader{Version: SeriesFileVersion} - hdr.Log.Offset = SeriesFileHeaderSize - hdr.KeyIDMap.Offset = SeriesFileHeaderSize - hdr.IDOffsetMap.Offset = SeriesFileHeaderSize - hdr.WAL.Offset = SeriesFileHeaderSize - return hdr -} - -// ReadSeriesFileHeader returns the header from data. -func ReadSeriesFileHeader(data []byte) (hdr SeriesFileHeader, err error) { - r := bytes.NewReader(data) - if len(data) == 0 { - return NewSeriesFileHeader(), nil - } - - // Read magic number & version. - var magic uint32 - if err := binary.Read(r, binary.BigEndian, &magic); err != nil { - return hdr, err - } else if magic != SeriesFileMagic { - return hdr, ErrInvalidSeriesFile - } - if err := binary.Read(r, binary.BigEndian, &hdr.Version); err != nil { - return hdr, err - } - - // Read log position. - if err := binary.Read(r, binary.BigEndian, &hdr.Log.Offset); err != nil { - return hdr, err - } else if err := binary.Read(r, binary.BigEndian, &hdr.Log.Size); err != nil { - return hdr, err - } - - // Read key/id map position. 
- if err := binary.Read(r, binary.BigEndian, &hdr.KeyIDMap.Offset); err != nil { - return hdr, err - } else if err := binary.Read(r, binary.BigEndian, &hdr.KeyIDMap.Size); err != nil { - return hdr, err - } - - // Read offset/id map position. - if err := binary.Read(r, binary.BigEndian, &hdr.IDOffsetMap.Offset); err != nil { - return hdr, err - } else if err := binary.Read(r, binary.BigEndian, &hdr.IDOffsetMap.Size); err != nil { - return hdr, err - } - - // Read WAL offset. - if err := binary.Read(r, binary.BigEndian, &hdr.WAL.Offset); err != nil { - return hdr, err - } - - return hdr, nil -} - -// WriteTo writes the trailer to w. -func (hdr *SeriesFileHeader) WriteTo(w io.Writer) (n int64, err error) { - var buf bytes.Buffer - binary.Write(&buf, binary.BigEndian, SeriesFileMagic) - binary.Write(&buf, binary.BigEndian, hdr.Version) - binary.Write(&buf, binary.BigEndian, hdr.Log.Offset) - binary.Write(&buf, binary.BigEndian, hdr.Log.Size) - binary.Write(&buf, binary.BigEndian, hdr.KeyIDMap.Offset) - binary.Write(&buf, binary.BigEndian, hdr.KeyIDMap.Size) - binary.Write(&buf, binary.BigEndian, hdr.IDOffsetMap.Offset) - binary.Write(&buf, binary.BigEndian, hdr.IDOffsetMap.Size) - binary.Write(&buf, binary.BigEndian, hdr.WAL.Offset) - return buf.WriteTo(w) + return nil } // AppendSeriesKey serializes name and tags to a byte slice. @@ -698,168 +530,92 @@ func (a seriesKeys) Less(i, j int) bool { return CompareSeriesKeys(a[i], a[j]) == -1 } -const ( - seriesKeyIDMapHeaderSize = 16 // count + capacity - seriesKeyIDMapElemSize = 16 // offset + id - seriesKeyIDMapLoadFactor = 90 -) - -// seriesKeyIDMap represents a fixed hash map of key-to-id. -type seriesKeyIDMap struct { - src []byte // series key data - data []byte // rhh map data - inmem *rhh.HashMap // offset-to-id -} - -func (m *seriesKeyIDMap) Close() error { - m.inmem = nil - return nil -} - -func newSeriesKeyIDMap(src, data []byte) *seriesKeyIDMap { - return &seriesKeyIDMap{ - src: src, - data: data, - inmem: rhh.NewHashMap(rhh.DefaultOptions), - } -} - -func (m *seriesKeyIDMap) count() uint64 { - n := uint64(m.inmem.Len()) - if len(m.data) > 0 { - n += binary.BigEndian.Uint64(m.data[:8]) - } - return n -} - -func (m *seriesKeyIDMap) insert(key []byte, id uint64) { - m.inmem.Put(key, id) -} - -func (m *seriesKeyIDMap) get(key []byte) uint64 { - if v := m.inmem.Get(key); v != nil { - if id, _ := v.(uint64); id != 0 { - return id - } - } - if len(m.data) == 0 { - return 0 - } - - capacity := int64(binary.BigEndian.Uint64(m.data[8:])) - mask := capacity - 1 - - hash := rhh.HashKey(key) - for d, pos := int64(0), hash&mask; ; d, pos = d+1, (pos+1)&mask { - elem := m.data[seriesKeyIDMapHeaderSize+(pos*seriesKeyIDMapElemSize):] - elemOffset := binary.BigEndian.Uint64(elem[:8]) - - if elemOffset == 0 { - return 0 - } - - elemKey, _ := ReadSeriesKey(m.src[elemOffset:]) - elemHash := rhh.HashKey(elemKey) - if d > rhh.Dist(elemHash, pos, capacity) { - return 0 - } else if elemHash == hash && bytes.Equal(elemKey, key) { - return binary.BigEndian.Uint64(elem[8:]) - } - } -} - -const ( - seriesIDOffsetMapHeaderSize = 16 // count + capacity - seriesIDOffsetMapElemSize = 16 // id + offset - seriesIDOffsetMapLoadFactor = 90 -) - -// seriesIDOffsetMap represents a fixed hash map of id-to-offset. 
-type seriesIDOffsetMap struct { - src []byte // series key data - data []byte // rhh map data - inmem map[uint64]int64 // id-to-offset -} - -func newSeriesIDOffsetMap(src, data []byte) *seriesIDOffsetMap { - return &seriesIDOffsetMap{ - src: src, - data: data, - inmem: make(map[uint64]int64), - } -} - -func (m *seriesIDOffsetMap) Close() error { - m.inmem = nil - return nil -} - -func (m *seriesIDOffsetMap) count() uint64 { - n := uint64(len(m.inmem)) - if len(m.data) > 0 { - n += binary.BigEndian.Uint64(m.data[:8]) - } - return n -} - -func (m *seriesIDOffsetMap) insert(id uint64, offset int64) { - m.inmem[id] = offset -} - -func (m *seriesIDOffsetMap) get(id uint64) int64 { - if offset := m.inmem[id]; offset != 0 { - return offset - } else if len(m.data) == 0 { - return 0 - } - - capacity := int64(binary.BigEndian.Uint64(m.data[8:])) - mask := capacity - 1 - - hash := rhh.HashUint64(id) - for d, pos := int64(0), hash&mask; ; d, pos = d+1, (pos+1)&mask { - elem := m.data[seriesIDOffsetMapHeaderSize+(pos*seriesIDOffsetMapElemSize):] - elemID := binary.BigEndian.Uint64(elem[:8]) - - if elemID == id { - return int64(binary.BigEndian.Uint64(elem[8:])) - } else if elemID == 0 || d > rhh.Dist(rhh.HashUint64(elemID), pos, capacity) { - return 0 - } - } -} - -// SeriesFileCompactor represents an object that compacts and reindexes a series file. -type SeriesFileCompactor struct { - src *SeriesFile - seriesN uint64 - wal []byte - tombstones map[uint64]struct{} -} +// SeriesFileCompactor represents an object that reindexes a series file and optionally compacts segments. +type SeriesFileCompactor struct{} // NewSeriesFileCompactor returns a new instance of SeriesFileCompactor. -func NewSeriesFileCompactor(src *SeriesFile) *SeriesFileCompactor { - src.mu.RLock() - defer src.mu.RUnlock() - - // Snapshot tombstones. - tombstones := make(map[uint64]struct{}, len(src.tombstones)) - for id := range src.tombstones { - tombstones[id] = struct{}{} - } - - c := &SeriesFileCompactor{ - src: src, - seriesN: src.seriesCount(), - wal: src.data[src.walOffset:src.size], - tombstones: tombstones, - } - - return c +func NewSeriesFileCompactor() *SeriesFileCompactor { + return &SeriesFileCompactor{} } -// Compact rewrites src to path as a reindexed series file. -func (c *SeriesFileCompactor) CompactTo(path string) error { +// Compact rebuilds the series file index. +func (c *SeriesFileCompactor) Compact(f *SeriesFile) error { + // Snapshot the segments and index so we can check tombstones and replay at the end under lock. + f.mu.RLock() + segments := CloneSeriesSegments(f.segments) + index := f.index.Clone() + seriesN := f.index.Count() + f.mu.RUnlock() + + // Compact index to a temporary location. + indexPath := index.path + ".compacting" + if err := c.compactIndexTo(index, seriesN, segments, indexPath); err != nil { + return err + } + + // Swap compacted index under lock & replay since compaction. + if err := func() error { + f.mu.Lock() + defer f.mu.Unlock() + + // Reopen index with new file. + if err := index.Close(); err != nil { + return err + } else if err := os.Rename(indexPath, index.path); err != nil { + return err + } else if err := index.Open(); err != nil { + return err + } + + // Replay new entries.
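+ // Recover scans only segments at or past the index's on-disk max offset, so this replays just the entries written while the compaction ran.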
+ if err := index.Recover(f.segments); err != nil { + return err + } + return nil + }(); err != nil { + return err + } + + return nil +} + +func (c *SeriesFileCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error { + hdr := NewSeriesIndexHeader() + hdr.Count = seriesN + hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor) + + // Allocate space for maps. + keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) + idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) + + // Reindex all segments. + for _, segment := range segments { + if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error { + // Only process insert entries. + switch flag { + case SeriesEntryInsertFlag: // fallthrough + case SeriesEntryTombstoneFlag: + return nil + default: + return fmt.Errorf("unexpected series file log entry flag: %d", flag) + } + + // Ignore entry if tombstoned. + if index.IsDeleted(id) { + return nil + } + + // Save highest id/offset to header. + hdr.MaxSeriesID, hdr.MaxOffset = id, offset + + // Insert into maps. + c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset) + return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id) + }); err != nil { + return err + } + } + // Open file handler. f, err := os.Create(path) if err != nil {
if err := f.Sync(); err != nil { return err } else if err := f.Close(); err != nil { @@ -928,71 +649,13 @@ func (c *SeriesFileCompactor) CompactTo(path string) error { return nil } -func (c *SeriesFileCompactor) compactLogEntries(w io.Writer, r *os.File, data []byte, keyIDMap, idOffsetmap []byte, n *int64) error { - for len(data) > 0 { - flag, id, key, sz := ReadSeriesFileLogEntry(data) - data = data[sz:] - - // Only process insert entries. - switch flag { - case SeriesFileInsertFlag: - // fallthrough - case SeriesFileTombstoneFlag: - continue - default: - return fmt.Errorf("unexpected series file log entry flag: %d", flag) - } - - // Ignore entry if tombstoned. - if _, ok := c.tombstones[id]; ok { - continue - } - - // Write entry. - offset := *n + SeriesFileLogInsertEntryHeader - buf := AppendSeriesFileLogEntry(nil, SeriesFileInsertFlag, id, key) - if _, err := w.Write(buf); err != nil { - return err - } - *n += int64(len(buf)) - - // Insert into maps. - if err := c.insertKeyIDMap(keyIDMap, r, key, offset, id); err != nil { - return err - } - c.insertIDOffsetMap(idOffsetmap, id, offset) - } - return nil -} - -func (c *SeriesFileCompactor) allocKeyIDMap() []byte { - capacity := (int64(c.seriesN) * 100) / seriesKeyIDMapLoadFactor - capacity = pow2(capacity) - - data := make([]byte, seriesKeyIDMapHeaderSize+(capacity*seriesKeyIDMapElemSize)) - binary.BigEndian.PutUint64(data[0:8], c.seriesN) - binary.BigEndian.PutUint64(data[8:16], uint64(capacity)) - return data -} - -func (c *SeriesFileCompactor) allocIDOffsetMap() []byte { - capacity := (int64(c.seriesN) * 100) / seriesIDOffsetMapLoadFactor - capacity = pow2(capacity) - - data := make([]byte, seriesIDOffsetMapHeaderSize+(capacity*seriesIDOffsetMapElemSize)) - binary.BigEndian.PutUint64(data[0:8], c.seriesN) - binary.BigEndian.PutUint64(data[8:16], uint64(capacity)) - return data -} - -func (c *SeriesFileCompactor) insertKeyIDMap(dst []byte, r *os.File, key []byte, offset int64, id uint64) error { - capacity := int64(binary.BigEndian.Uint64(dst[8:16])) +func (c *SeriesFileCompactor) insertKeyIDMap(dst []byte, capacity int64, segments []*SeriesSegment, key []byte, offset int64, id uint64) error { mask := capacity - 1 hash := rhh.HashKey(key) // Continue searching until we find an empty slot or lower probe distance. for dist, pos := int64(0), hash&mask; ; dist, pos = dist+1, (pos+1)&mask { - elem := dst[seriesKeyIDMapHeaderSize+(pos*seriesKeyIDMapElemSize):] + elem := dst[(pos * SeriesIndexElemSize):] // If empty slot found or matching offset, insert and exit. elemOffset := int64(binary.BigEndian.Uint64(elem[:8])) @@ -1003,24 +666,8 @@ func (c *SeriesFileCompactor) insertKeyIDMap(dst []byte, r *os.File, key []byte, return nil } - // Read key at position. - _, err := r.Seek(elemOffset, os.SEEK_SET) - if err != nil { - return err - } - br := bufio.NewReader(r) - elemKeyLen, err := binary.ReadUvarint(br) - if err != nil { - return err - } - elemKey := make([]byte, binary.MaxVarintLen64+elemKeyLen) - sz := binary.PutUvarint(elemKey, elemKeyLen) - elemKey = elemKey[:uint64(sz)+elemKeyLen] - if _, err := io.ReadFull(br, elemKey[sz:]); err != nil { - return err - } - - // Hash element key. + // Read key at position & hash. 
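+ // The existing element's key is resolved directly from the mmapped segments, replacing the seek-and-read against the output file that the old implementation required.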
+ elemKey := ReadSeriesKeyFromSegments(segments, elemOffset) elemHash := rhh.HashKey(elemKey) // If the existing elem has probed less than us, then swap places with @@ -1040,14 +687,13 @@ func (c *SeriesFileCompactor) insertKeyIDMap(dst []byte, r *os.File, key []byte, } } -func (c *SeriesFileCompactor) insertIDOffsetMap(dst []byte, id uint64, offset int64) { - capacity := int64(binary.BigEndian.Uint64(dst[8:16])) +func (c *SeriesFileCompactor) insertIDOffsetMap(dst []byte, capacity int64, id uint64, offset int64) { mask := capacity - 1 hash := rhh.HashUint64(id) // Continue searching until we find an empty slot or lower probe distance. - for dist, pos := int64(0), hash&mask; ; dist, pos = dist+1, (pos+1)&mask { - elem := dst[seriesIDOffsetMapHeaderSize+(pos*seriesIDOffsetMapElemSize):] + for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask { + elem := dst[(pos * SeriesIndexElemSize):] // If empty slot found or matching id, insert and exit. elemID := binary.BigEndian.Uint64(elem[:8]) @@ -1074,6 +720,10 @@ func (c *SeriesFileCompactor) insertIDOffsetMap(dst []byte, id uint64, offset in // Update current distance. dist = d } + + if i > capacity { + panic("rhh map full") + } } } diff --git a/tsdb/series_file_386.go b/tsdb/series_file_386.go deleted file mode 100644 index 99ca829695..0000000000 --- a/tsdb/series_file_386.go +++ /dev/null @@ -1,5 +0,0 @@ -package tsdb - -// DefaultMaxSeriesFileSize is the maximum series file size. Assuming that each -// series key takes, for example, 150 bytes, the limit would support ~900K series. -const DefaultMaxSeriesFileSize = 128 * (1 << 20) // 128MB diff --git a/tsdb/series_file_amd64.go b/tsdb/series_file_amd64.go deleted file mode 100644 index 5c4770e496..0000000000 --- a/tsdb/series_file_amd64.go +++ /dev/null @@ -1,5 +0,0 @@ -package tsdb - -// DefaultMaxSeriesFileSize is the maximum series file size. Assuming that each -// series key takes, for example, 150 bytes, the limit would support ~229M series. -const DefaultMaxSeriesFileSize = 32 * (1 << 30) // 32GB diff --git a/tsdb/series_file_test.go b/tsdb/series_file_test.go index ed8077b5e1..b4386645f7 100644 --- a/tsdb/series_file_test.go +++ b/tsdb/series_file_test.go @@ -4,7 +4,6 @@ import ( "fmt" "io/ioutil" "os" - "runtime" "testing" "github.com/influxdata/influxdb/models" @@ -65,25 +64,15 @@ func TestSeriesFileCompactor(t *testing.T) { t.Fatalf("unexpected series count: %d", n) } - // Compact to new file. - compactionPath := sfile.Path() + ".compacting" - defer os.Remove(compactionPath) - - compactor := tsdb.NewSeriesFileCompactor(sfile.SeriesFile) - if err := compactor.CompactTo(compactionPath); err != nil { + // Compact in-place. + compactor := tsdb.NewSeriesFileCompactor() + if err := compactor.Compact(sfile.SeriesFile); err != nil { t.Fatal(err) } - // Open new series file. - other := tsdb.NewSeriesFile(compactionPath) - if err := other.Open(); err != nil { - t.Fatal(err) - } - defer other.Close() - // Verify all series exist. for i := range names { - if seriesID := other.SeriesID(names[i], tagsSlice[i], nil); seriesID == 0 { + if seriesID := sfile.SeriesID(names[i], tagsSlice[i], nil); seriesID == 0 { t.Fatalf("series does not exist: %s,%s", names[i], tagsSlice[i].String()) } } @@ -103,19 +92,11 @@ type SeriesFile struct { // NewSeriesFile returns a new instance of SeriesFile with a temporary file path. 
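// The temporary path is now a directory; segment files and the index live beneath it.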
func NewSeriesFile() *SeriesFile { - file, err := ioutil.TempFile("", "tsdb-series-file-") + dir, err := ioutil.TempDir("", "tsdb-series-file-") if err != nil { panic(err) } - file.Close() - - s := &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())} - // If we're running on a 32-bit system then reduce the SeriesFile size, so we - // can address is in memory. - if runtime.GOARCH == "386" { - s.SeriesFile.MaxSize = 1 << 27 // 128MB - } - return s + return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(dir)} } // MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error. @@ -129,6 +110,6 @@ func MustOpenSeriesFile() *SeriesFile { // Close closes the log file and removes it from disk. func (f *SeriesFile) Close() error { - defer os.Remove(f.Path()) + defer os.RemoveAll(f.Path()) return f.SeriesFile.Close() } diff --git a/tsdb/series_index.go b/tsdb/series_index.go new file mode 100644 index 0000000000..c803cf0db0 --- /dev/null +++ b/tsdb/series_index.go @@ -0,0 +1,346 @@ +package tsdb + +import ( + "bytes" + "encoding/binary" + "errors" + "io" + "os" + + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/mmap" + "github.com/influxdata/influxdb/pkg/rhh" +) + +const ( + SeriesIndexVersion = 1 + SeriesIndexMagic = "SIDX" +) + +const ( + SeriesIndexElemSize = 16 // offset + id + SeriesIndexLoadFactor = 90 // rhh load factor + + SeriesIndexHeaderSize = 0 + + 4 + 1 + // magic + version + 8 + 8 + // max series + max offset + 8 + 8 + // count + capacity + 8 + 8 + // key/id map offset & size + 8 + 8 + // id/offset map offset & size + 0 +) + +var ErrInvalidSeriesIndex = errors.New("invalid series index") + +// SeriesIndex represents an index of key-to-id & id-to-offset mappings. +type SeriesIndex struct { + path string + + count uint64 + capacity int64 + mask int64 + + maxSeriesID uint64 + maxOffset int64 + + data []byte // mmap data + keyIDData []byte // key/id mmap data + idOffsetData []byte // id/offset mmap data + + // In-memory data since rebuild. + keyIDMap *rhh.HashMap + idOffsetMap map[uint64]int64 + tombstones map[uint64]struct{} +} + +func NewSeriesIndex(path string) *SeriesIndex { + return &SeriesIndex{ + path: path, + } +} + +// Open memory-maps the index file. +func (idx *SeriesIndex) Open() (err error) { + // Map data file, if it exists. + if err := func() error { + if _, err := os.Stat(idx.path); err != nil && !os.IsNotExist(err) { + return err + } else if err == nil { + if idx.data, err = mmap.Map(idx.path, 0); err != nil { + return err + } + + hdr, err := ReadSeriesIndexHeader(idx.data) + if err != nil { + return err + } + idx.count, idx.capacity, idx.mask = hdr.Count, hdr.Capacity, hdr.Capacity-1 + idx.maxSeriesID, idx.maxOffset = hdr.MaxSeriesID, hdr.MaxOffset + + idx.keyIDData = idx.data[hdr.KeyIDMap.Offset : hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size] + idx.idOffsetData = idx.data[hdr.IDOffsetMap.Offset : hdr.IDOffsetMap.Offset+hdr.IDOffsetMap.Size] + } + return nil + }(); err != nil { + idx.Close() + return err + } + + idx.keyIDMap = rhh.NewHashMap(rhh.DefaultOptions) + idx.idOffsetMap = make(map[uint64]int64) + idx.tombstones = make(map[uint64]struct{}) + return nil +} + +// Close unmaps the index file. +func (idx *SeriesIndex) Close() (err error) { + if idx.data != nil { + err = mmap.Unmap(idx.data) + } + idx.keyIDData = nil + idx.idOffsetData = nil + + idx.keyIDMap = nil + idx.idOffsetMap = nil + idx.tombstones = nil + return err +} + +// Recover rebuilds the in-memory index for all new entries. 
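+// Only entries with offsets past the on-disk index's max offset are replayed; segments wholly below it are skipped.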
+func (idx *SeriesIndex) Recover(segments []*SeriesSegment) error { + // Allocate new in-memory maps. + idx.keyIDMap = rhh.NewHashMap(rhh.DefaultOptions) + idx.idOffsetMap = make(map[uint64]int64) + idx.tombstones = make(map[uint64]struct{}) + + // Process all entries since the maximum offset in the on-disk index. + minSegmentID, _ := SplitSeriesOffset(idx.maxOffset) + for _, segment := range segments { + if segment.ID() < minSegmentID { + continue + } + + if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error { + if offset <= idx.maxOffset { + return nil + } + idx.execEntry(flag, id, offset, key) + return nil + }); err != nil { + return err + } + } + return nil +} + +// Count returns the number of series in the index. +func (idx *SeriesIndex) Count() uint64 { + return idx.count + uint64(len(idx.idOffsetMap)) +} + +func (idx *SeriesIndex) Insert(key []byte, id uint64, offset int64) { + idx.execEntry(SeriesEntryInsertFlag, id, offset, key) +} + +// Delete marks the series id as deleted. +func (idx *SeriesIndex) Delete(id uint64) { + idx.execEntry(SeriesEntryTombstoneFlag, id, 0, nil) +} + +// IsDeleted returns true if series id has been deleted. +func (idx *SeriesIndex) IsDeleted(id uint64) bool { + _, ok := idx.tombstones[id] + return ok +} + +func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byte) { + switch flag { + case SeriesEntryInsertFlag: + idx.keyIDMap.Put(key, id) + idx.idOffsetMap[id] = offset + + case SeriesEntryTombstoneFlag: + idx.tombstones[id] = struct{}{} + + default: + panic("unreachable") + } +} + +func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) uint64 { + if v := idx.keyIDMap.Get(key); v != nil { + if id, _ := v.(uint64); id != 0 { + return id + } + } + if len(idx.data) == 0 { + return 0 + } + + hash := rhh.HashKey(key) + for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask { + elem := idx.keyIDData[(pos * SeriesIndexElemSize):] + elemOffset := int64(binary.BigEndian.Uint64(elem[:8])) + + if elemOffset == 0 { + return 0 + } + + elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize) + elemHash := rhh.HashKey(elemKey) + if d > rhh.Dist(elemHash, pos, idx.capacity) { + return 0 + } else if elemHash == hash && bytes.Equal(elemKey, key) { + return binary.BigEndian.Uint64(elem[8:]) + } + } +} + +func (idx *SeriesIndex) FindIDByNameTags(segments []*SeriesSegment, name []byte, tags models.Tags, buf []byte) uint64 { + id := idx.FindIDBySeriesKey(segments, AppendSeriesKey(buf[:0], name, tags)) + if _, ok := idx.tombstones[id]; ok { + return 0 + } + return id +} + +func (idx *SeriesIndex) FindIDListByNameTags(segments []*SeriesSegment, names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []uint64, ok bool) { + ids, ok = make([]uint64, len(names)), true + for i := range names { + id := idx.FindIDByNameTags(segments, names[i], tagsSlice[i], buf) + if id == 0 { + ok = false + continue + } + ids[i] = id + } + return ids, ok +} + +func (idx *SeriesIndex) FindOffsetByID(id uint64) int64 { + if offset := idx.idOffsetMap[id]; offset != 0 { + return offset + } else if len(idx.data) == 0 { + return 0 + } + + hash := rhh.HashUint64(id) + for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask { + elem := idx.idOffsetData[(pos * SeriesIndexElemSize):] + elemID := binary.BigEndian.Uint64(elem[:8]) + + if elemID == id { + return int64(binary.BigEndian.Uint64(elem[8:])) + } else if elemID == 0 || d > rhh.Dist(rhh.HashUint64(elemID), pos, 
idx.capacity) { + return 0 + } + } +} + +// Clone returns a copy of idx for use during compaction. In-memory maps are not cloned. +func (idx *SeriesIndex) Clone() *SeriesIndex { + tombstones := make(map[uint64]struct{}, len(idx.tombstones)) + for id := range idx.tombstones { + tombstones[id] = struct{}{} + } + + return &SeriesIndex{ + path: idx.path, + count: idx.count, + capacity: idx.capacity, + mask: idx.mask, + data: idx.data, + keyIDData: idx.keyIDData, + idOffsetData: idx.idOffsetData, + tombstones: tombstones, + } +} + +// SeriesIndexHeader represents the header of a series index. +type SeriesIndexHeader struct { + Version uint8 + + MaxSeriesID uint64 + MaxOffset int64 + + Count uint64 + Capacity int64 + + KeyIDMap struct { + Offset int64 + Size int64 + } + + IDOffsetMap struct { + Offset int64 + Size int64 + } +} + +// NewSeriesIndexHeader returns a new instance of SeriesIndexHeader. +func NewSeriesIndexHeader() SeriesIndexHeader { + return SeriesIndexHeader{Version: SeriesIndexVersion} +} + +// ReadSeriesIndexHeader returns the header from data. +func ReadSeriesIndexHeader(data []byte) (hdr SeriesIndexHeader, err error) { + r := bytes.NewReader(data) + + // Read magic number. + magic := make([]byte, len(SeriesIndexMagic)) + if _, err := io.ReadFull(r, magic); err != nil { + return hdr, err + } else if !bytes.Equal([]byte(SeriesIndexMagic), magic) { + return hdr, ErrInvalidSeriesIndex + } + + // Read version. + if err := binary.Read(r, binary.BigEndian, &hdr.Version); err != nil { + return hdr, err + } + + // Read max offset. + if err := binary.Read(r, binary.BigEndian, &hdr.MaxSeriesID); err != nil { + return hdr, err + } else if err := binary.Read(r, binary.BigEndian, &hdr.MaxOffset); err != nil { + return hdr, err + } + + // Read count & capacity. + if err := binary.Read(r, binary.BigEndian, &hdr.Count); err != nil { + return hdr, err + } else if err := binary.Read(r, binary.BigEndian, &hdr.Capacity); err != nil { + return hdr, err + } + + // Read key/id map position. + if err := binary.Read(r, binary.BigEndian, &hdr.KeyIDMap.Offset); err != nil { + return hdr, err + } else if err := binary.Read(r, binary.BigEndian, &hdr.KeyIDMap.Size); err != nil { + return hdr, err + } + + // Read offset/id map position. + if err := binary.Read(r, binary.BigEndian, &hdr.IDOffsetMap.Offset); err != nil { + return hdr, err + } else if err := binary.Read(r, binary.BigEndian, &hdr.IDOffsetMap.Size); err != nil { + return hdr, err + } + return hdr, nil +} + +// WriteTo writes the header to w. 
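+// Fields are written in the order ReadSeriesIndexHeader reads them back, for a fixed SeriesIndexHeaderSize (69 bytes) in total.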
+func (hdr *SeriesIndexHeader) WriteTo(w io.Writer) (n int64, err error) { + var buf bytes.Buffer + buf.WriteString(SeriesIndexMagic) + binary.Write(&buf, binary.BigEndian, hdr.Version) + binary.Write(&buf, binary.BigEndian, hdr.MaxSeriesID) + binary.Write(&buf, binary.BigEndian, hdr.MaxOffset) + binary.Write(&buf, binary.BigEndian, hdr.Count) + binary.Write(&buf, binary.BigEndian, hdr.Capacity) + binary.Write(&buf, binary.BigEndian, hdr.KeyIDMap.Offset) + binary.Write(&buf, binary.BigEndian, hdr.KeyIDMap.Size) + binary.Write(&buf, binary.BigEndian, hdr.IDOffsetMap.Offset) + binary.Write(&buf, binary.BigEndian, hdr.IDOffsetMap.Size) + return buf.WriteTo(w) +} diff --git a/tsdb/series_index_test.go b/tsdb/series_index_test.go new file mode 100644 index 0000000000..fa24dcc65b --- /dev/null +++ b/tsdb/series_index_test.go @@ -0,0 +1,132 @@ +package tsdb_test + +import ( + "bytes" + "path/filepath" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/influxdb/tsdb" +) + +func TestSeriesIndex_Count(t *testing.T) { + dir, cleanup := MustTempDir() + defer cleanup() + + idx := tsdb.NewSeriesIndex(filepath.Join(dir, "index")) + if err := idx.Open(); err != nil { + t.Fatal(err) + } + defer idx.Close() + + key0 := tsdb.AppendSeriesKey(nil, []byte("m0"), nil) + idx.Insert(key0, 1, 10) + key1 := tsdb.AppendSeriesKey(nil, []byte("m1"), nil) + idx.Insert(key1, 2, 20) + + if n := idx.Count(); n != 2 { + t.Fatalf("unexpected count: %d", n) + } +} + +func TestSeriesIndex_Delete(t *testing.T) { + dir, cleanup := MustTempDir() + defer cleanup() + + idx := tsdb.NewSeriesIndex(filepath.Join(dir, "index")) + if err := idx.Open(); err != nil { + t.Fatal(err) + } + defer idx.Close() + + key0 := tsdb.AppendSeriesKey(nil, []byte("m0"), nil) + idx.Insert(key0, 1, 10) + key1 := tsdb.AppendSeriesKey(nil, []byte("m1"), nil) + idx.Insert(key1, 2, 20) + idx.Delete(1) + + if !idx.IsDeleted(1) { + t.Fatal("expected deletion") + } else if idx.IsDeleted(2) { + t.Fatal("expected series to exist") + } +} + +func TestSeriesIndex_FindIDBySeriesKey(t *testing.T) { + dir, cleanup := MustTempDir() + defer cleanup() + + idx := tsdb.NewSeriesIndex(filepath.Join(dir, "index")) + if err := idx.Open(); err != nil { + t.Fatal(err) + } + defer idx.Close() + + key0 := tsdb.AppendSeriesKey(nil, []byte("m0"), nil) + idx.Insert(key0, 1, 10) + key1 := tsdb.AppendSeriesKey(nil, []byte("m1"), nil) + idx.Insert(key1, 2, 20) + badKey := tsdb.AppendSeriesKey(nil, []byte("not_found"), nil) + + if id := idx.FindIDBySeriesKey(nil, key0); id != 1 { + t.Fatalf("unexpected id(0): %d", id) + } else if id := idx.FindIDBySeriesKey(nil, key1); id != 2 { + t.Fatalf("unexpected id(1): %d", id) + } else if id := idx.FindIDBySeriesKey(nil, badKey); id != 0 { + t.Fatalf("unexpected id(2): %d", id) + } + + if id := idx.FindIDByNameTags(nil, []byte("m0"), nil, nil); id != 1 { + t.Fatalf("unexpected id(0): %d", id) + } else if id := idx.FindIDByNameTags(nil, []byte("m1"), nil, nil); id != 2 { + t.Fatalf("unexpected id(1): %d", id) + } else if id := idx.FindIDByNameTags(nil, []byte("not_found"), nil, nil); id != 0 { + t.Fatalf("unexpected id(2): %d", id) + } +} + +func TestSeriesIndex_FindOffsetByID(t *testing.T) { + dir, cleanup := MustTempDir() + defer cleanup() + + idx := tsdb.NewSeriesIndex(filepath.Join(dir, "index")) + if err := idx.Open(); err != nil { + t.Fatal(err) + } + defer idx.Close() + + idx.Insert(tsdb.AppendSeriesKey(nil, []byte("m0"), nil), 1, 10) + idx.Insert(tsdb.AppendSeriesKey(nil, []byte("m1"), nil), 2, 20) + + if offset := 
idx.FindOffsetByID(1); offset != 10 { + t.Fatalf("unexpected offset(0): %d", offset) + } else if offset := idx.FindOffsetByID(2); offset != 20 { + t.Fatalf("unexpected offset(1): %d", offset) + } else if offset := idx.FindOffsetByID(3); offset != 0 { + t.Fatalf("unexpected offset(2): %d", offset) + } +} + +func TestSeriesIndexHeader(t *testing.T) { + // Verify header initializes correctly. + hdr := tsdb.NewSeriesIndexHeader() + if hdr.Version != tsdb.SeriesIndexVersion { + t.Fatalf("unexpected version: %d", hdr.Version) + } + hdr.MaxSeriesID = 10 + hdr.MaxOffset = 20 + hdr.Count = 30 + hdr.Capacity = 40 + hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = 50, 60 + hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = 70, 80 + + // Marshal/unmarshal. + var buf bytes.Buffer + if _, err := hdr.WriteTo(&buf); err != nil { + t.Fatal(err) + } else if other, err := tsdb.ReadSeriesIndexHeader(buf.Bytes()); err != nil { + t.Fatal(err) + } else if diff := cmp.Diff(hdr, other); diff != "" { + t.Fatal(diff) + } +} diff --git a/tsdb/series_segment.go b/tsdb/series_segment.go new file mode 100644 index 0000000000..a9600a26d1 --- /dev/null +++ b/tsdb/series_segment.go @@ -0,0 +1,396 @@ +package tsdb + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "regexp" + "strconv" + + "github.com/influxdata/influxdb/pkg/mmap" +) + +const ( + SeriesSegmentVersion = 1 + SeriesSegmentMagic = "SSEG" + + SeriesSegmentHeaderSize = 4 + 1 // magic + version +) + +// Series entry constants. +const ( + SeriesEntryFlagSize = 1 + SeriesEntryHeaderSize = 1 + 8 // flag + id + + SeriesEntryInsertFlag = 0x01 + SeriesEntryTombstoneFlag = 0x02 +) + +var ( + ErrInvalidSeriesSegment = errors.New("invalid series segment") + ErrInvalidSeriesSegmentVersion = errors.New("invalid series segment version") + ErrSeriesSegmentNotWritable = errors.New("series segment not writable") +) + +// SeriesSegment represents a log of series entries. +type SeriesSegment struct { + id uint16 + path string + + data []byte // mmap file + file *os.File // write file handle + w *bufio.Writer // buffered file handle + size uint32 // current file size +} + +// NewSeriesSegment returns a new instance of SeriesSegment. +func NewSeriesSegment(id uint16, path string) *SeriesSegment { + return &SeriesSegment{ + id: id, + path: path, + } +} + +// CreateSeriesSegment generates an empty segment at path. +func CreateSeriesSegment(id uint16, path string) (*SeriesSegment, error) { + // Generate segment in temp location. + f, err := ioutil.TempFile("", "series-segment-") + if err != nil { + return nil, err + } + defer f.Close() + + // Write header to file and close. + hdr := NewSeriesSegmentHeader() + if _, err := hdr.WriteTo(f); err != nil { + return nil, err + } else if err := f.Truncate(int64(SeriesSegmentSize(id))); err != nil { + return nil, err + } else if err := f.Close(); err != nil { + return nil, err + } + + // Swap with target path. + if err := os.Rename(f.Name(), path); err != nil { + return nil, err + } + + // Open segment at new location. + segment := NewSeriesSegment(id, path) + if err := segment.Open(); err != nil { + return nil, err + } + return segment, nil +} + +// Open memory maps the data file at the file's path. +func (s *SeriesSegment) Open() error { + if err := func() (err error) { + // Memory map file data. + if s.data, err = mmap.Map(s.path, int64(SeriesSegmentSize(s.id))); err != nil { + return err + } + + // Read header.
+ hdr, err := ReadSeriesSegmentHeader(s.data) + if err != nil { + return err + } else if hdr.Version != SeriesSegmentVersion { + return ErrInvalidSeriesSegmentVersion + } + + return nil + }(); err != nil { + s.Close() + return err + } + + return nil +} + +// InitForWrite initializes a write handle for the segment. +// This is only used for the last segment in the series file. +func (s *SeriesSegment) InitForWrite() (err error) { + // Only calculate segment data size if writing. + for s.size = uint32(SeriesSegmentHeaderSize); s.size < uint32(len(s.data)); { + flag, _, _, sz := ReadSeriesEntry(s.data[s.size:]) + if flag == 0 { + break + } + s.size += uint32(sz) + } + + // Open file handler for writing & seek to end of data. + if s.file, err = os.OpenFile(s.path, os.O_WRONLY|os.O_CREATE, 0666); err != nil { + return err + } else if _, err := s.file.Seek(int64(s.size), os.SEEK_SET); err != nil { + return err + } + s.w = bufio.NewWriter(s.file) + + return nil +} + +// Close unmaps the segment. +func (s *SeriesSegment) Close() (err error) { + if e := s.CloseForWrite(); e != nil && err == nil { + err = e + } + + if s.data != nil { + if e := mmap.Unmap(s.data); e != nil && err == nil { + err = e + } + s.data = nil + } + + return err +} + +func (s *SeriesSegment) CloseForWrite() (err error) { + if s.w != nil { + if e := s.w.Flush(); e != nil && err == nil { + err = e + } + s.w = nil + } + + if s.file != nil { + if e := s.file.Close(); e != nil && err == nil { + err = e + } + s.file = nil + } + return err +} + +// ID returns the id the segment was initialized with. +func (s *SeriesSegment) ID() uint16 { return s.id } + +// Size returns the size of the data in the segment. +// This is only populated once InitForWrite() is called. +func (s *SeriesSegment) Size() int64 { return int64(s.size) } + +// Slice returns a byte slice starting at pos. +func (s *SeriesSegment) Slice(pos uint32) []byte { return s.data[pos:] } + +// WriteLogEntry writes entry data into the segment. +// Returns the offset of the beginning of the entry. +func (s *SeriesSegment) WriteLogEntry(data []byte) (offset int64, err error) { + if !s.CanWrite(data) { + return 0, ErrSeriesSegmentNotWritable + } + + offset = JoinSeriesOffset(s.id, s.size) + if _, err := s.w.Write(data); err != nil { + return 0, err + } + s.size += uint32(len(data)) + + return offset, nil +} + +// CanWrite returns true if segment has space to write entry data. +func (s *SeriesSegment) CanWrite(data []byte) bool { + return s.w != nil && s.size+uint32(len(data)) <= SeriesSegmentSize(s.id) +} + +// Flush flushes the buffer to disk. +func (s *SeriesSegment) Flush() error { + if s.w == nil { + return nil + } + return s.w.Flush() +} + +// AppendSeriesIDs appends all of the segment's series ids to a slice. Returns the new slice. +func (s *SeriesSegment) AppendSeriesIDs(a []uint64) []uint64 { + s.ForEachEntry(func(flag uint8, id uint64, _ int64, _ []byte) error { + if flag == SeriesEntryInsertFlag { + a = append(a, id) + } + return nil + }) + return a +} + +// MaxSeriesID returns the highest series id in the segment. +func (s *SeriesSegment) MaxSeriesID() uint64 { + var max uint64 + s.ForEachEntry(func(flag uint8, id uint64, _ int64, _ []byte) error { + if flag == SeriesEntryInsertFlag && id > max { + max = id + } + return nil + }) + return max +} + +// ForEachEntry executes fn for every entry in the segment.
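+// Iteration begins just past the segment header and stops at the first zero flag byte, which marks the end of the written data in the pre-sized segment.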
+
+// ForEachEntry executes fn for every entry in the segment.
+func (s *SeriesSegment) ForEachEntry(fn func(flag uint8, id uint64, offset int64, key []byte) error) error {
+	for pos := uint32(SeriesSegmentHeaderSize); pos < s.size; {
+		flag, id, key, sz := ReadSeriesEntry(s.data[pos:])
+		if flag == 0 {
+			break
+		}
+
+		offset := JoinSeriesOffset(s.id, pos)
+		if err := fn(flag, id, offset, key); err != nil {
+			return err
+		}
+		pos += uint32(sz)
+	}
+	return nil
+}
+
+// Clone returns a copy of the segment. Excludes the write handle, if set.
+func (s *SeriesSegment) Clone() *SeriesSegment {
+	return &SeriesSegment{
+		id:   s.id,
+		path: s.path,
+		data: s.data,
+		size: s.size,
+	}
+}
+
+// CloneSeriesSegments returns a copy of a slice of segments.
+func CloneSeriesSegments(a []*SeriesSegment) []*SeriesSegment {
+	other := make([]*SeriesSegment, len(a))
+	for i := range a {
+		other[i] = a[i].Clone()
+	}
+	return other
+}
+
+// FindSegment returns a segment by id.
+func FindSegment(a []*SeriesSegment, id uint16) *SeriesSegment {
+	for _, segment := range a {
+		if segment.id == id {
+			return segment
+		}
+	}
+	return nil
+}
+
+// ReadSeriesKeyFromSegments returns a series key from an offset within a set of segments.
+func ReadSeriesKeyFromSegments(a []*SeriesSegment, offset int64) []byte {
+	segmentID, pos := SplitSeriesOffset(offset)
+	segment := FindSegment(a, segmentID)
+	if segment == nil {
+		return nil
+	}
+	buf := segment.Slice(pos)
+	key, _ := ReadSeriesKey(buf)
+	return key
+}
+
+// JoinSeriesOffset returns an offset that combines the 2-byte segmentID and 4-byte pos.
+func JoinSeriesOffset(segmentID uint16, pos uint32) int64 {
+	return (int64(segmentID) << 32) | int64(pos)
+}
+
+// SplitSeriesOffset splits an offset into its 2-byte segmentID and 4-byte pos parts.
+func SplitSeriesOffset(offset int64) (segmentID uint16, pos uint32) {
+	return uint16((offset >> 32) & 0xFFFF), uint32(offset & 0xFFFFFFFF)
+}
+
+// IsValidSeriesSegmentFilename returns true if filename is a 4-character lowercase hexadecimal number.
+func IsValidSeriesSegmentFilename(filename string) bool {
+	return seriesSegmentFilenameRegex.MatchString(filename)
+}
+
+// ParseSeriesSegmentFilename returns the id represented by the hexadecimal filename.
+func ParseSeriesSegmentFilename(filename string) uint16 {
+	i, _ := strconv.ParseUint(filename, 16, 32)
+	return uint16(i)
+}
+
+var seriesSegmentFilenameRegex = regexp.MustCompile(`^[0-9a-f]{4}$`)
+
+// SeriesSegmentSize returns the maximum size of the segment.
+// The size goes up by powers of 2 starting from 4MB and reaching 256MB.
+func SeriesSegmentSize(id uint16) uint32 {
+	const min = 22 // 4MB
+	const max = 28 // 256MB
+
+	shift := id + min
+	if shift >= max {
+		shift = max
+	}
+	return 1 << shift
+}
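JoinSeriesOffset packs a location into a single int64: the segment id occupies bits 32–47 and the intra-segment position the low 32 bits, which is why SeriesSegmentSize is capped at 1<<28 — any position fits a uint32 with room to spare. The schedule works out to 1<<(22+id) bytes: 4MB for segment 0000, doubling per segment up to 256MB at segment 0006, with every later segment staying at 256MB. A round-trip sketch:

	offset := tsdb.JoinSeriesOffset(0x0003, 0x20) // segment "0003", 32 bytes in
	segmentID, pos := tsdb.SplitSeriesOffset(offset)
	fmt.Println(segmentID, pos) // 3 32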
+
+// SeriesSegmentHeader represents the header of a series segment.
+type SeriesSegmentHeader struct {
+	Version uint8
+}
+
+// NewSeriesSegmentHeader returns a new instance of SeriesSegmentHeader.
+func NewSeriesSegmentHeader() SeriesSegmentHeader {
+	return SeriesSegmentHeader{Version: SeriesSegmentVersion}
+}
+
+// ReadSeriesSegmentHeader returns the header from data.
+func ReadSeriesSegmentHeader(data []byte) (hdr SeriesSegmentHeader, err error) {
+	r := bytes.NewReader(data)
+
+	// Read magic number.
+	magic := make([]byte, len(SeriesSegmentMagic))
+	if _, err := io.ReadFull(r, magic); err != nil {
+		return hdr, err
+	} else if !bytes.Equal([]byte(SeriesSegmentMagic), magic) {
+		return hdr, ErrInvalidSeriesSegment
+	}
+
+	// Read version.
+	if err := binary.Read(r, binary.BigEndian, &hdr.Version); err != nil {
+		return hdr, err
+	}
+
+	return hdr, nil
+}
+
+// WriteTo writes the header to w.
+func (hdr *SeriesSegmentHeader) WriteTo(w io.Writer) (n int64, err error) {
+	var buf bytes.Buffer
+	buf.WriteString(SeriesSegmentMagic)
+	binary.Write(&buf, binary.BigEndian, hdr.Version)
+	return buf.WriteTo(w)
+}
+
+// ReadSeriesEntry parses a single entry from data; a zero flag marks the end of entries.
+func ReadSeriesEntry(data []byte) (flag uint8, id uint64, key []byte, sz int64) {
+	// If flag byte is zero then no more entries exist.
+	flag, data = uint8(data[0]), data[1:]
+	if flag == 0 {
+		return 0, 0, nil, 1
+	}
+
+	id, data = binary.BigEndian.Uint64(data), data[8:]
+	switch flag {
+	case SeriesEntryInsertFlag:
+		key, _ = ReadSeriesKey(data)
+	}
+	return flag, id, key, int64(SeriesEntryHeaderSize + len(key))
+}
+
+// AppendSeriesEntry appends an encoded entry to dst and returns the extended buffer.
+func AppendSeriesEntry(dst []byte, flag uint8, id uint64, key []byte) []byte {
+	buf := make([]byte, 8)
+	binary.BigEndian.PutUint64(buf, id)
+
+	dst = append(dst, flag)
+	dst = append(dst, buf...)
+
+	switch flag {
+	case SeriesEntryInsertFlag:
+		dst = append(dst, key...)
+	case SeriesEntryTombstoneFlag:
+	default:
+		panic(fmt.Sprintf("unreachable: invalid flag: %d", flag))
+	}
+	return dst
+}
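On disk an entry is [flag:1][id:8, big-endian][key:n] for inserts and just [flag][id] for tombstones; the zero flag byte acts as the end-of-log sentinel, which is what lets InitForWrite and ForEachEntry stop scanning once they hit the zero-filled, pre-allocated tail of the file. A round-trip sketch of the encoding (AppendSeriesKey is the existing key encoder in the tsdb package):

	key := tsdb.AppendSeriesKey(nil, []byte("cpu"), nil)
	entry := tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 7, key)

	flag, id, key2, sz := tsdb.ReadSeriesEntry(entry)
	fmt.Println(flag == tsdb.SeriesEntryInsertFlag, id)           // true 7
	fmt.Println(bytes.Equal(key, key2))                           // true
	fmt.Println(sz == int64(tsdb.SeriesEntryHeaderSize+len(key))) // true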
diff --git a/tsdb/series_segment_test.go b/tsdb/series_segment_test.go
new file mode 100644
index 0000000000..b938f4d0aa
--- /dev/null
+++ b/tsdb/series_segment_test.go
@@ -0,0 +1,208 @@
+package tsdb_test
+
+import (
+	"bytes"
+	"path/filepath"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/influxdata/influxdb/tsdb"
+)
+
+func TestSeriesSegment(t *testing.T) {
+	dir, cleanup := MustTempDir()
+	defer cleanup()
+
+	// Create a new initial segment (4MB) and initialize for writing.
+	segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
+	if err != nil {
+		t.Fatal(err)
+	} else if err := segment.InitForWrite(); err != nil {
+		t.Fatal(err)
+	}
+	defer segment.Close()
+
+	// Write initial entry.
+	key1 := tsdb.AppendSeriesKey(nil, []byte("m0"), nil)
+	offset, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, key1))
+	if err != nil {
+		t.Fatal(err)
+	} else if offset != tsdb.SeriesSegmentHeaderSize {
+		t.Fatalf("unexpected offset: %d", offset)
+	}
+
+	// Write a large entry (3MB) and verify it lands directly after the first entry.
+	key2 := tsdb.AppendSeriesKey(nil, bytes.Repeat([]byte("m"), 3*(1<<20)), nil)
+	if offset2, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, key2)); err != nil {
+		t.Fatal(err)
+	} else if exp := offset + int64(tsdb.SeriesEntryHeaderSize+len(key1)); offset2 != exp {
+		t.Fatalf("unexpected offset: %d", offset2)
+	}
+
+	// Write another entry that is too large for the remaining segment space.
+	if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 3, tsdb.AppendSeriesKey(nil, bytes.Repeat([]byte("n"), 3*(1<<20)), nil))); err != tsdb.ErrSeriesSegmentNotWritable {
+		t.Fatalf("unexpected error: %s", err)
+	}
+
+	// Verify two entries exist.
+	var n int
+	segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
+		switch n {
+		case 0:
+			if flag != tsdb.SeriesEntryInsertFlag || id != 1 || !bytes.Equal(key1, key) {
+				t.Fatalf("unexpected entry(0): %d, %d, %q", flag, id, key)
+			}
+		case 1:
+			if flag != tsdb.SeriesEntryInsertFlag || id != 2 || !bytes.Equal(key2, key) {
+				t.Fatalf("unexpected entry(1): %d, %d, %q", flag, id, key)
+			}
+		default:
+			t.Fatalf("too many entries")
+		}
+		n++
+		return nil
+	})
+	if n != 2 {
+		t.Fatalf("unexpected entry count: %d", n)
+	}
+}
+
+func TestSeriesSegment_AppendSeriesIDs(t *testing.T) {
+	dir, cleanup := MustTempDir()
+	defer cleanup()
+
+	segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
+	if err != nil {
+		t.Fatal(err)
+	} else if err := segment.InitForWrite(); err != nil {
+		t.Fatal(err)
+	}
+	defer segment.Close()
+
+	// Write entries.
+	if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 10, tsdb.AppendSeriesKey(nil, []byte("m0"), nil))); err != nil {
+		t.Fatal(err)
+	} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 11, tsdb.AppendSeriesKey(nil, []byte("m1"), nil))); err != nil {
+		t.Fatal(err)
+	} else if err := segment.Flush(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Collect series ids with existing set.
+	a := segment.AppendSeriesIDs([]uint64{1, 2})
+	if diff := cmp.Diff(a, []uint64{1, 2, 10, 11}); diff != "" {
+		t.Fatal(diff)
+	}
+}
+
+func TestSeriesSegment_MaxSeriesID(t *testing.T) {
+	dir, cleanup := MustTempDir()
+	defer cleanup()
+
+	segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
+	if err != nil {
+		t.Fatal(err)
+	} else if err := segment.InitForWrite(); err != nil {
+		t.Fatal(err)
+	}
+	defer segment.Close()
+
+	// Write entries.
+	if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 10, tsdb.AppendSeriesKey(nil, []byte("m0"), nil))); err != nil {
+		t.Fatal(err)
+	} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 11, tsdb.AppendSeriesKey(nil, []byte("m1"), nil))); err != nil {
+		t.Fatal(err)
+	} else if err := segment.Flush(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify maximum.
+	if max := segment.MaxSeriesID(); max != 11 {
+		t.Fatalf("unexpected max: %d", max)
+	}
+}
+
+func TestSeriesSegmentHeader(t *testing.T) {
+	// Verify header initializes correctly.
+	hdr := tsdb.NewSeriesSegmentHeader()
+	if hdr.Version != tsdb.SeriesSegmentVersion {
+		t.Fatalf("unexpected version: %d", hdr.Version)
+	}
+
+	// Marshal/unmarshal.
+	var buf bytes.Buffer
+	if _, err := hdr.WriteTo(&buf); err != nil {
+		t.Fatal(err)
+	} else if other, err := tsdb.ReadSeriesSegmentHeader(buf.Bytes()); err != nil {
+		t.Fatal(err)
+	} else if diff := cmp.Diff(hdr, other); diff != "" {
+		t.Fatal(diff)
+	}
+}
+
+func TestJoinSeriesOffset(t *testing.T) {
+	if offset := tsdb.JoinSeriesOffset(0x1234, 0x56789ABC); offset != 0x123456789ABC {
+		t.Fatalf("unexpected offset: %x", offset)
+	}
+}
+
+func TestSplitSeriesOffset(t *testing.T) {
+	if segmentID, pos := tsdb.SplitSeriesOffset(0x123456789ABC); segmentID != 0x1234 || pos != 0x56789ABC {
+		t.Fatalf("unexpected segmentID/pos: %x/%x", segmentID, pos)
+	}
+}
+
+func TestIsValidSeriesSegmentFilename(t *testing.T) {
+	if tsdb.IsValidSeriesSegmentFilename("") {
+		t.Fatal("expected invalid")
+	} else if tsdb.IsValidSeriesSegmentFilename("0ab") {
+		t.Fatal("expected invalid")
+	} else if !tsdb.IsValidSeriesSegmentFilename("192a") {
+		t.Fatal("expected valid")
+	}
+}
+
+func TestParseSeriesSegmentFilename(t *testing.T) {
+	if v := tsdb.ParseSeriesSegmentFilename("a90b"); v != 0xA90B {
+		t.Fatalf("unexpected value: %x", v)
+	} else if v := tsdb.ParseSeriesSegmentFilename("0001"); v != 1 {
+		t.Fatalf("unexpected value: %x", v)
+	} else if v := tsdb.ParseSeriesSegmentFilename("invalid"); v != 0 {
+		t.Fatalf("unexpected value: %x", v)
+	}
+}
+
+func TestSeriesSegmentSize(t *testing.T) {
+	const mb = (1 << 20)
+	if sz := tsdb.SeriesSegmentSize(0); sz != 4*mb {
+		t.Fatalf("unexpected size: %d", sz)
+	} else if sz := tsdb.SeriesSegmentSize(1); sz != 8*mb {
+		t.Fatalf("unexpected size: %d", sz)
+	} else if sz := tsdb.SeriesSegmentSize(2); sz != 16*mb {
+		t.Fatalf("unexpected size: %d", sz)
+	} else if sz := tsdb.SeriesSegmentSize(3); sz != 32*mb {
+		t.Fatalf("unexpected size: %d", sz)
+	} else if sz := tsdb.SeriesSegmentSize(4); sz != 64*mb {
+		t.Fatalf("unexpected size: %d", sz)
+	} else if sz := tsdb.SeriesSegmentSize(5); sz != 128*mb {
+		t.Fatalf("unexpected size: %d", sz)
+	} else if sz := tsdb.SeriesSegmentSize(6); sz != 256*mb {
+		t.Fatalf("unexpected size: %d", sz)
+	} else if sz := tsdb.SeriesSegmentSize(7); sz != 256*mb {
+		t.Fatalf("unexpected size: %d", sz)
+	}
+}
+
+func TestSeriesEntry(t *testing.T) {
+	seriesKey := tsdb.AppendSeriesKey(nil, []byte("m0"), nil)
+	buf := tsdb.AppendSeriesEntry(nil, 1, 2, seriesKey)
+	if flag, id, key, sz := tsdb.ReadSeriesEntry(buf); flag != 1 {
+		t.Fatalf("unexpected flag: %d", flag)
+	} else if id != 2 {
+		t.Fatalf("unexpected id: %d", id)
+	} else if !bytes.Equal(seriesKey, key) {
+		t.Fatalf("unexpected key: %q", key)
+	} else if sz != int64(tsdb.SeriesEntryHeaderSize+len(key)) {
+		t.Fatalf("unexpected size: %d", sz)
+	}
+}
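The deletions below follow from the new layout: a series file is now a set of fixed-size segments, each mmapped on its own and never larger than 256MB, so there is no single monolithic mapping whose MaxSize has to be tuned down on 32-bit systems — presumably why the GOARCH == "386" workaround and the Store-level SeriesFileMaxSize override are removed outright rather than ported. On a 32-bit process with roughly 2–3GB of usable address space, a handful of 256MB maps fit comfortably where one large mapping of the whole series file might not.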
diff --git a/tsdb/shard_internal_test.go b/tsdb/shard_internal_test.go
index 085f68cd6d..1da93603e4 100644
--- a/tsdb/shard_internal_test.go
+++ b/tsdb/shard_internal_test.go
@@ -7,7 +7,6 @@ import (
 	"path"
 	"path/filepath"
 	"regexp"
-	"runtime"
 	"sort"
 	"strings"
 	"testing"
@@ -222,12 +221,6 @@ func NewTempShard(index string) *TempShard {
 	// Create series file.
 	sfile := NewSeriesFile(filepath.Join(dir, "db0", SeriesFileName))
 
-	// If we're running on a 32-bit system then reduce the SeriesFile size, so we
-	// can address is in memory.
-	if runtime.GOARCH == "386" {
-		sfile.MaxSize = 1 << 27 // 128MB
-	}
-
 	if err := sfile.Open(); err != nil {
 		panic(err)
 	}
diff --git a/tsdb/store.go b/tsdb/store.go
index 59fd169187..fad24f5e77 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -354,12 +354,6 @@ func (s *Store) openSeriesFile(database string) (*SeriesFile, error) {
 	}
 	sfile := NewSeriesFile(filepath.Join(s.path, database, SeriesFileName))
 
-	// Set a custom mmap size if one has been specified, otherwise the default
-	// will be used.
-	if s.SeriesFileMaxSize > 0 {
-		sfile.MaxSize = s.SeriesFileMaxSize
-	}
-
 	if err := sfile.Open(); err != nil {
 		return nil, err
 	}