Add tsi FileSet.

pull/7913/head
Ben Johnson 2016-12-28 12:59:09 -07:00
parent dcd2a771b0
commit 31e74d809b
No known key found for this signature in database
GPG Key ID: 81741CD251883081
5 changed files with 869 additions and 722 deletions

View File

@ -15,7 +15,7 @@ type Index interface {
Open() error
Close() error
Measurement(name []byte) (*Measurement, error)
MeasurementExists(name []byte) (bool, error)
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
DropMeasurement(name []byte) error

File diff suppressed because it is too large Load Diff

View File

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"sync"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/mmap"
@ -41,6 +42,7 @@ var (
// IndexFile represents a collection of measurement, tag, and series data.
type IndexFile struct {
wg sync.WaitGroup // ref count
data []byte
// Components
@ -87,6 +89,12 @@ func (f *IndexFile) Close() error {
return mmap.Unmap(f.data)
}
// Retain adds a reference count to the file.
func (f *IndexFile) Retain() { f.wg.Add(1) }
// Release removes a reference count from the file.
func (f *IndexFile) Release() { f.wg.Done() }
// UnmarshalBinary opens an index from data.
// The byte slice is retained so it must be kept open.
func (f *IndexFile) UnmarshalBinary(data []byte) error {

View File

@ -26,7 +26,10 @@ func TestIndex_SeriesIterator(t *testing.T) {
// Verify initial set of series.
if err := idx.MultiInvoke(func(state string) {
itr := idx.SeriesIterator()
fs := idx.RetainFileSet()
defer fs.Release()
itr := fs.SeriesIterator()
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
@ -55,7 +58,10 @@ func TestIndex_SeriesIterator(t *testing.T) {
// Verify additional series.
if err := idx.MultiInvoke(func(state string) {
itr := idx.SeriesIterator()
fs := idx.RetainFileSet()
defer fs.Release()
itr := fs.SeriesIterator()
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
@ -151,7 +157,10 @@ func TestIndex_MeasurementSeriesIterator(t *testing.T) {
// Verify initial set of series.
if err := idx.MultiInvoke(func(state string) {
itr := idx.MeasurementSeriesIterator([]byte("cpu"))
fs := idx.RetainFileSet()
defer fs.Release()
itr := fs.MeasurementSeriesIterator([]byte("cpu"))
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
@ -177,7 +186,10 @@ func TestIndex_MeasurementSeriesIterator(t *testing.T) {
// Verify additional series.
if err := idx.MultiInvoke(func(state string) {
itr := idx.MeasurementSeriesIterator([]byte("cpu"))
fs := idx.RetainFileSet()
defer fs.Release()
itr := fs.MeasurementSeriesIterator([]byte("cpu"))
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
@ -211,7 +223,10 @@ func TestIndex_MeasurementIterator(t *testing.T) {
// Verify initial set of series.
if err := idx.MultiInvoke(func(state string) {
itr := idx.MeasurementIterator()
fs := idx.RetainFileSet()
defer fs.Release()
itr := fs.MeasurementIterator()
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
@ -237,7 +252,10 @@ func TestIndex_MeasurementIterator(t *testing.T) {
// Verify additional series.
if err := idx.MultiInvoke(func(state string) {
itr := idx.MeasurementIterator()
fs := idx.RetainFileSet()
defer fs.Release()
itr := fs.MeasurementIterator()
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
@ -272,7 +290,10 @@ func TestIndex_TagKeyIterator(t *testing.T) {
// Verify initial set of series.
if err := idx.MultiInvoke(func(state string) {
itr := idx.TagKeyIterator([]byte("cpu"))
fs := idx.RetainFileSet()
defer fs.Release()
itr := fs.TagKeyIterator([]byte("cpu"))
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
@ -298,7 +319,10 @@ func TestIndex_TagKeyIterator(t *testing.T) {
// Verify additional series.
if err := idx.MultiInvoke(func(state string) {
itr := idx.TagKeyIterator([]byte("cpu"))
fs := idx.RetainFileSet()
defer fs.Release()
itr := fs.TagKeyIterator([]byte("cpu"))
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}

View File

@ -36,11 +36,12 @@ const (
// LogFile represents an on-disk write-ahead log file.
type LogFile struct {
mu sync.RWMutex
data []byte // mmap
file *os.File // writer
w *bufio.Writer // buffered writer
buf []byte // marshaling buffer
size int64 // tracks current file size
wg sync.WaitGroup // ref count
data []byte // mmap
file *os.File // writer
w *bufio.Writer // buffered writer
buf []byte // marshaling buffer
size int64 // tracks current file size
mSketch, mTSketch estimator.Sketch // Measurement sketches
sSketch, sTSketch estimator.Sketch // Series sketche
@ -136,6 +137,12 @@ func (f *LogFile) Close() error {
return nil
}
// Retain adds a reference count to the file.
func (f *LogFile) Retain() { f.wg.Add(1) }
// Release removes a reference count from the file.
func (f *LogFile) Release() { f.wg.Done() }
// Size returns the tracked in-memory file size of the log file.
func (f *LogFile) Size() int64 {
f.mu.Lock()