Add tsi1.Log.

pull/7913/head
Ben Johnson 2016-10-21 09:31:40 -06:00
parent 2a81351992
commit 992e651588
No known key found for this signature in database
GPG Key ID: 81741CD251883081
7 changed files with 548 additions and 24 deletions

View File

@ -191,5 +191,37 @@ Measurements
WAL
WAL
Entry
Flag <uint8>
len(Name) <varint>
Name <byte...>
len(Tags) <varint>
len(Key0) <varint>
Key0 <byte...>
len(Value0) <varint>
Value0 <byte...>
...
Checksum <uint32>
...
*/
package tsi1

View File

@ -271,11 +271,13 @@ func (i *Index) DropSeries(keys []string) error {
}
func (i *Index) SeriesN() (n uint64, err error) {
itr := i.file.MeasurementIterator()
for e := itr.Next(); e != nil; e = itr.Next() {
n += uint64(e.Series.N)
}
return n, nil
panic("TODO: Use sketches")
// itr := i.file.MeasurementIterator()
// for e := itr.Next(); e != nil; e = itr.Next() {
// n += uint64(e.SeriesN)
// }
// return n, nil
}
func (i *Index) TagsForSeries(key string) (models.Tags, error) {

View File

@ -111,7 +111,7 @@ func (i *IndexFile) TagValueElem(name, key, value []byte) (TagValueElem, error)
}
// tagSetBlock returns a tag set block for a measurement.
func (i *IndexFile) tagSetBlock(e *MeasurementElem) (TagSet, error) {
func (i *IndexFile) tagSetBlock(e *MeasurementBlockElem) (TagSet, error) {
// Slice tag set data.
buf := i.data[e.TagSet.Offset:]
buf = buf[:e.TagSet.Size]

View File

@ -1,4 +1,358 @@
package tsi1
type Log struct{}
type LogEntry struct{}
import (
"bytes"
"encoding/binary"
"hash/crc32"
"os"
"sort"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/mmap"
)
// Log entry flag constants.
const (
LogEntrySeriesTombstoneFlag = 0x01
LogEntryMeasurementTombstoneFlag = 0x02
LogEntryTagKeyTombstoneFlag = 0x04
LogEntryTagValueTombstoneFlag = 0x08
)
// LogFile represents an on-disk write-ahead log file.
type LogFile struct {
data []byte // mmap
file *os.File // writer
buf []byte // marshaling buffer
entries []LogEntry // parsed entries
// In-memory index.
mms map[string]logMeasurement
// Filepath to the log file.
Path string
}
// NewLogFile returns a new instance of LogFile.
func NewLogFile() *LogFile {
return &LogFile{
mms: make(map[string]logMeasurement),
}
}
// Open reads the log from a file and validates all the checksums.
func (f *LogFile) Open() error {
if err := f.open(); err != nil {
f.Close()
return err
}
return nil
}
func (f *LogFile) open() error {
// Open file for appending.
file, err := os.OpenFile(f.Path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
return err
}
f.file = file
// Open a read-only memory map of the existing data.
data, err := mmap.Map(f.Path)
if err != nil {
return err
}
f.data = data
// Read log entries from mmap.
f.entries = nil
for buf := f.data; len(buf) > 0; {
// Read next entry.
var e LogEntry
if err := e.UnmarshalBinary(buf); err != nil {
return err
}
f.entries = append(f.entries, e)
// Move buffer forward.
buf = buf[e.Size:]
}
return nil
}
// Close shuts down the file handle and mmap.
func (f *LogFile) Close() error {
if f.file != nil {
f.file.Close()
}
if f.data != nil {
mmap.Unmap(f.data)
}
f.entries = nil
return nil
}
// DeleteMeasurement adds a tombstone for a measurement to the log file.
func (f *LogFile) DeleteMeasurement(name []byte) error {
// Append log entry.
if err := f.append(LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name}); err != nil {
return err
}
// Delete measurement from index.
mm := f.measurement(name)
mm.deleted = true
f.mms[string(name)] = mm
return nil
}
// DeleteTagKey adds a tombstone for a tag key to the log file.
func (f *LogFile) DeleteTagKey(name, key []byte) error {
return f.append(LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Tags: models.Tags{{Key: key}}})
}
// DeleteTagValue adds a tombstone for a tag value to the log file.
func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
return f.append(LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Tags: models.Tags{{Key: key, Value: value}}})
}
// AddSeries adds a series to the log file.
func (f *LogFile) AddSeries(name []byte, tags models.Tags) error {
return f.insertSeries(LogEntry{Name: name, Tags: tags})
}
// DeleteSeries adds a tombstone for a series to the log file.
func (f *LogFile) DeleteSeries(name []byte, tags models.Tags) error {
return f.insertSeries(LogEntry{Flag: LogEntrySeriesTombstoneFlag, Name: name, Tags: tags})
}
// insertSeries inserts a series entry.
func (f *LogFile) insertSeries(e LogEntry) error {
// Append log entry.
if err := f.append(e); err != nil {
return err
}
// Check if series is deleted.
deleted := (e.Flag & LogEntrySeriesTombstoneFlag) != 0
// Fetch measurement.
mm := f.measurement(e.Name)
if !deleted {
mm.deleted = false
}
// Save tags.
for _, t := range e.Tags {
// Fetch key.
ts, ok := mm.tagSet[string(t.Key)]
if !ok {
ts = logTagSet{name: t.Key, tagValues: make(map[string]logTagValue)}
}
if !deleted {
ts.deleted = false
}
// Fetch value.
tv, ok := ts.tagValues[string(t.Value)]
if !ok {
tv.name = t.Value
}
if !deleted {
tv.deleted = false
}
tv.insertEntry(e)
ts.tagValues[string(t.Value)] = tv
// Save key.
mm.tagSet[string(t.Key)] = ts
}
// Save measurement.
f.mms[string(e.Name)] = mm
return nil
}
// append adds a generic entry to the end of the file.
func (f *LogFile) append(e LogEntry) error {
// Marshal entry to the local buffer.
f.buf = appendLogEntry(f.buf[0:], &e)
// Append checksum.
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], e.Checksum)
f.buf = append(f.buf, buf[:]...)
// Save the size of the record.
e.Size = len(f.buf)
// Write record to file.
if n, err := f.file.Write(f.buf); err != nil {
// Move position backwards over partial entry.
// Log should be reopened if seeking cannot be completed.
if _, err := f.file.Seek(int64(-n), os.SEEK_CUR); err != nil {
f.Close()
}
return err
}
// Save entry to in-memory list.
f.entries = append(f.entries, e)
return nil
}
// measurement returns a measurement by name.
func (f *LogFile) measurement(name []byte) logMeasurement {
mm, ok := f.mms[string(name)]
if !ok {
mm = logMeasurement{name: name, tagSet: make(map[string]logTagSet)}
}
return mm
}
// MeasurementIterator returns an iterator over all the measurements in the file.
func (f *LogFile) MeasurementIterator() MeasurementIterator {
var itr measurementIterator
for _, mm := range f.mms {
itr.mms = append(itr.mms, MeasurementElem{
Name: mm.name,
Deleted: mm.deleted,
})
}
sort.Sort(MeasurementElems(itr.mms))
return &itr
}
// LogEntry represents a single log entry in the write-ahead log.
type LogEntry struct {
Flag byte // flag
Name []byte // measurement name
Tags models.Tags // tagset
Checksum uint32 // checksum of flag/name/tags.
Size int // total size of record, in bytes.
}
// UnmarshalBinary unmarshals data into e.
func (e *LogEntry) UnmarshalBinary(data []byte) error {
start := len(data)
// Parse flag data.
e.Flag, data = data[0], data[1:]
// Parse name.
sz, n := binary.Uvarint(data)
e.Name, data = data[n:n+int(sz)], data[n+int(sz):]
// Parse tag count.
tagN, n := binary.Uvarint(data)
data = data[n:]
// Parse tags.
tags := make(models.Tags, tagN)
for i := range tags {
tag := &tags[i]
// Parse key.
sz, n := binary.Uvarint(data)
tag.Key, data = data[n:n+int(sz)], data[n+int(sz):]
// Parse value.
sz, n = binary.Uvarint(data)
tag.Value, data = data[n:n+int(sz)], data[n+int(sz):]
}
// Parse checksum.
e.Checksum, data = binary.BigEndian.Uint32(data[:4]), data[4:]
// Save length of elem.
e.Size = start - len(data)
return nil
}
// appendLogEntry appends to dst and returns the new buffer.
// This updates the checksum on the entry.
func appendLogEntry(dst []byte, e *LogEntry) []byte {
var buf [binary.MaxVarintLen64]byte
start := len(dst)
// Append flag.
dst = append(dst, e.Flag)
// Append name.
n := binary.PutUvarint(buf[:], uint64(len(e.Name)))
dst = append(dst, buf[:n]...)
dst = append(dst, e.Name...)
// Append tag count.
n = binary.PutUvarint(buf[:], uint64(len(e.Tags)))
dst = append(dst, buf[:n]...)
// Append key/value pairs.
for i := range e.Tags {
t := &e.Tags[i]
// Append key.
n := binary.PutUvarint(buf[:], uint64(len(t.Key)))
dst = append(dst, buf[:n]...)
dst = append(dst, t.Key...)
// Append value.
n = binary.PutUvarint(buf[:], uint64(len(t.Value)))
dst = append(dst, buf[:n]...)
dst = append(dst, t.Value...)
}
// Calculate checksum.
e.Checksum = crc32.ChecksumIEEE(dst[start:])
// Append checksum.
binary.BigEndian.PutUint32(buf[:4], e.Checksum)
dst = append(dst, buf[:4]...)
return dst
}
type logMeasurement struct {
name []byte
tagSet map[string]logTagSet
deleted bool
}
type logTagSet struct {
name []byte
tagValues map[string]logTagValue
deleted bool
}
type logTagValue struct {
name []byte
deleted bool
entries []LogEntry
}
// insertEntry inserts an entry into the tag value in sorted order.
// If another entry matches the name/tags then it is overrwritten.
func (tv *logTagValue) insertEntry(e LogEntry) {
i := sort.Search(len(tv.entries), func(i int) bool {
if cmp := bytes.Compare(tv.entries[i].Name, e.Name); cmp != 0 {
return cmp != -1
}
return models.CompareTags(tv.entries[i].Tags, e.Tags) != -1
})
// Update entry if it already exists.
if i < len(tv.entries) && bytes.Equal(tv.entries[i].Name, e.Name) && tv.entries[i].Tags.Equal(e.Tags) {
tv.entries[i] = e
return
}
// Otherwise insert new entry.
tv.entries = append(tv.entries, LogEntry{})
copy(tv.entries[i+1:], tv.entries[i:])
tv.entries[i] = e
}

View File

@ -0,0 +1,100 @@
package tsi1_test
import (
"io/ioutil"
"os"
"reflect"
"testing"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/engine/tsi1"
)
// Ensure log file can append series.
func TestLogFile_AddSeries(t *testing.T) {
f := MustOpenLogFile()
defer f.Close()
// Add test data.
if err := f.AddSeries([]byte("mem"), models.Tags{{Key: []byte("host"), Value: []byte("serverA")}}); err != nil {
t.Fatal(err)
} else if err := f.AddSeries([]byte("cpu"), models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}); err != nil {
t.Fatal(err)
} else if err := f.AddSeries([]byte("cpu"), models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}); err != nil {
t.Fatal(err)
}
// Verify data.
itr := f.MeasurementIterator()
if e := itr.Next(); e == nil || string(e.Name) != "cpu" {
t.Fatalf("unexpected measurement: %#v", e)
} else if e := itr.Next(); e == nil || string(e.Name) != "mem" {
t.Fatalf("unexpected measurement: %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected eof, got: %#v", e)
}
}
// Ensure log file can delete an existing measurement.
func TestLogFile_DeleteMeasurement(t *testing.T) {
f := MustOpenLogFile()
defer f.Close()
// Add test data.
if err := f.AddSeries([]byte("mem"), models.Tags{{Key: []byte("host"), Value: []byte("serverA")}}); err != nil {
t.Fatal(err)
} else if err := f.AddSeries([]byte("cpu"), models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}); err != nil {
t.Fatal(err)
} else if err := f.AddSeries([]byte("cpu"), models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}); err != nil {
t.Fatal(err)
}
// Remove measurement.
if err := f.DeleteMeasurement([]byte("cpu")); err != nil {
t.Fatal(err)
}
// Verify data.
itr := f.MeasurementIterator()
if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("cpu"), Deleted: true}) {
t.Fatalf("unexpected measurement: %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("mem")}) {
t.Fatalf("unexpected measurement: %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected eof, got: %#v", e)
}
}
// LogFile is a test wrapper for tsi1.LogFile.
type LogFile struct {
*tsi1.LogFile
}
// NewLogFile returns a new instance of LogFile with a temporary file path.
func NewLogFile() *LogFile {
file, err := ioutil.TempFile("", "tsi1-log-file-")
if err != nil {
panic(err)
}
file.Close()
f := &LogFile{LogFile: tsi1.NewLogFile()}
f.Path = file.Name()
return f
}
// MustOpenLogFile returns a new, open instance of LogFile. Panic on error.
func MustOpenLogFile() *LogFile {
f := NewLogFile()
if err := f.Open(); err != nil {
panic(err)
}
return f
}
// Close closes the log file and removes it from disk.
func (f *LogFile) Close() error {
defer os.Remove(f.Path)
return f.LogFile.Close()
}

View File

@ -53,7 +53,7 @@ type MeasurementBlock struct {
func (blk *MeasurementBlock) Version() int { return blk.version }
// Elem returns an element for a measurement.
func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementElem, ok bool) {
func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool) {
n := binary.BigEndian.Uint32(blk.hashData[:MeasurementNSize])
hash := hashKey(name)
pos := int(hash % n)
@ -68,7 +68,7 @@ func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementElem, ok bool) {
// Evaluate name if offset is not empty.
if offset > 0 {
// Parse into element.
var e MeasurementElem
var e MeasurementBlockElem
e.UnmarshalBinary(blk.data[offset:])
// Return if name match.
@ -78,7 +78,7 @@ func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementElem, ok bool) {
// Check if we've exceeded the probe distance.
if d > dist(hashKey(e.Name), pos, int(n)) {
return MeasurementElem{}, false
return MeasurementBlockElem{}, false
}
}
@ -115,27 +115,34 @@ func (blk *MeasurementBlock) UnmarshalBinary(data []byte) error {
// Iterator returns an iterator over all measurements.
func (blk *MeasurementBlock) Iterator() MeasurementIterator {
return &measurementIterator{data: blk.data[MeasurementFillSize:]}
return &blockMeasurementIterator{data: blk.data[MeasurementFillSize:]}
}
// measurementIterator iterates over a list measurements in a block.
type measurementIterator struct {
// blockMeasurementIterator iterates over a list measurements in a block.
type blockMeasurementIterator struct {
elem MeasurementElem
data []byte
}
// Next returns the next measurement. Returns false when iterator is complete.
func (itr *measurementIterator) Next() *MeasurementElem {
// Next returns the next measurement. Returns nil when iterator is complete.
func (itr *blockMeasurementIterator) Next() *MeasurementElem {
// Return nil when we run out of data.
if len(itr.data) == 0 {
return nil
}
// Unmarshal the element at the current position.
itr.elem.UnmarshalBinary(itr.data)
var elem MeasurementBlockElem
elem.UnmarshalBinary(itr.data)
// Copy to a generic measurement element.
itr.elem = MeasurementElem{
Name: elem.Name,
Deleted: elem.Deleted(),
}
// Move the data forward past the record.
itr.data = itr.data[itr.elem.Size:]
itr.data = itr.data[elem.Size:]
return &itr.elem
}
@ -187,8 +194,8 @@ type MeasurementBlockTrailer struct {
}
}
// MeasurementElem represents an internal measurement element.
type MeasurementElem struct {
// MeasurementBlockElem represents an internal measurement element.
type MeasurementBlockElem struct {
Flag byte // flag
Name []byte // measurement name
@ -207,17 +214,17 @@ type MeasurementElem struct {
}
// Deleted returns true if the tombstone flag is set.
func (e *MeasurementElem) Deleted() bool {
func (e *MeasurementBlockElem) Deleted() bool {
return (e.Flag & MeasurementTombstoneFlag) != 0
}
// SeriesID returns series ID at an index.
func (e *MeasurementElem) SeriesID(i int) uint32 {
func (e *MeasurementBlockElem) SeriesID(i int) uint32 {
return binary.BigEndian.Uint32(e.Series.Data[i*SeriesIDSize:])
}
// SeriesIDs returns a list of decoded series ids.
func (e *MeasurementElem) SeriesIDs() []uint32 {
func (e *MeasurementBlockElem) SeriesIDs() []uint32 {
a := make([]uint32, e.Series.N)
for i := 0; i < int(e.Series.N); i++ {
a[i] = e.SeriesID(i)
@ -226,7 +233,7 @@ func (e *MeasurementElem) SeriesIDs() []uint32 {
}
// UnmarshalBinary unmarshals data into e.
func (e *MeasurementElem) UnmarshalBinary(data []byte) error {
func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error {
start := len(data)
// Parse flag data.

View File

@ -1,6 +1,7 @@
package tsi1
import (
"bytes"
"encoding/binary"
"fmt"
"io"
@ -10,11 +11,39 @@ import (
"github.com/influxdata/influxdb/models"
)
// MeasurementElem represents a generic measurement element.
type MeasurementElem struct {
Deleted bool
Name []byte
}
// MeasurementElems represents a list of MeasurementElem.
type MeasurementElems []MeasurementElem
func (a MeasurementElems) Len() int { return len(a) }
func (a MeasurementElems) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a MeasurementElems) Less(i, j int) bool { return bytes.Compare(a[i].Name, a[j].Name) == -1 }
// MeasurementIterator represents a iterator over a list of measurements.
type MeasurementIterator interface {
Next() *MeasurementElem
}
// measurementIterator represents an iterator over a slice of measurements.
type measurementIterator struct {
mms []MeasurementElem
}
// Next shifts the next element off the list.
func (itr *measurementIterator) Next() *MeasurementElem {
if len(itr.mms) == 0 {
return nil
}
mm := itr.mms[0]
itr.mms = itr.mms[1:]
return &mm
}
// SeriesIterator represents a iterator over a list of series.
type SeriesIterator interface {
Next(name *[]byte, tags *models.Tags, deleted *bool)