Change Database to DatabaseIndex, remove leftover warn statement
parent
c3ab88a715
commit
6c80108f63
20
tsdb/meta.go
20
tsdb/meta.go
|
@ -15,8 +15,8 @@ const (
|
|||
maxStringLength = 64 * 1024
|
||||
)
|
||||
|
||||
// Database is the in memory index of a collection of measurements, time series, and their tags.
|
||||
type Database struct {
|
||||
// DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags.
|
||||
type DatabaseIndex struct {
|
||||
// in memory metadata index, built on load and updated when new series come in
|
||||
mu sync.RWMutex
|
||||
measurements map[string]*Measurement // measurement name to object and index
|
||||
|
@ -25,8 +25,8 @@ type Database struct {
|
|||
lastID uint64 // last used series ID. They're in memory only for this shard
|
||||
}
|
||||
|
||||
func NewDatabase() *Database {
|
||||
return &Database{
|
||||
func NewDatabaseIndex() *DatabaseIndex {
|
||||
return &DatabaseIndex{
|
||||
measurements: make(map[string]*Measurement),
|
||||
series: make(map[string]*Series),
|
||||
names: make([]string, 0),
|
||||
|
@ -34,7 +34,7 @@ func NewDatabase() *Database {
|
|||
}
|
||||
|
||||
// createSeriesIndexIfNotExists adds the series for the given measurement to the index and sets its ID or returns the existing series object
|
||||
func (s *Database) createSeriesIndexIfNotExists(measurementName string, series *Series) *Series {
|
||||
func (s *DatabaseIndex) createSeriesIndexIfNotExists(measurementName string, series *Series) *Series {
|
||||
// if there is a measurement for this id, it's already been added
|
||||
ss := s.series[series.Key]
|
||||
if ss != nil {
|
||||
|
@ -57,7 +57,7 @@ func (s *Database) createSeriesIndexIfNotExists(measurementName string, series *
|
|||
}
|
||||
|
||||
// addMeasurementToIndexIfNotExists creates or retrieves an in memory index object for the measurement
|
||||
func (s *Database) createMeasurementIndexIfNotExists(name string) *Measurement {
|
||||
func (s *DatabaseIndex) createMeasurementIndexIfNotExists(name string) *Measurement {
|
||||
m := s.measurements[name]
|
||||
if m == nil {
|
||||
m = NewMeasurement(name)
|
||||
|
@ -70,7 +70,7 @@ func (s *Database) createMeasurementIndexIfNotExists(name string) *Measurement {
|
|||
|
||||
// measurementsByExpr takes and expression containing only tags and returns
|
||||
// a list of matching *Measurement.
|
||||
func (db *Database) measurementsByExpr(expr influxql.Expr) (Measurements, error) {
|
||||
func (db *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, error) {
|
||||
switch e := expr.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
|
@ -125,7 +125,7 @@ func (db *Database) measurementsByExpr(expr influxql.Expr) (Measurements, error)
|
|||
return nil, fmt.Errorf("%#v", expr)
|
||||
}
|
||||
|
||||
func (db *Database) measurementsByTagFilters(filters []*TagFilter) Measurements {
|
||||
func (db *DatabaseIndex) measurementsByTagFilters(filters []*TagFilter) Measurements {
|
||||
// If no filters, then return all measurements.
|
||||
if len(filters) == 0 {
|
||||
measurements := make(Measurements, 0, len(db.measurements))
|
||||
|
@ -186,7 +186,7 @@ func (db *Database) measurementsByTagFilters(filters []*TagFilter) Measurements
|
|||
}
|
||||
|
||||
// measurementsByRegex returns the measurements that match the regex.
|
||||
func (db *Database) measurementsByRegex(re *regexp.Regexp) Measurements {
|
||||
func (db *DatabaseIndex) measurementsByRegex(re *regexp.Regexp) Measurements {
|
||||
var matches Measurements
|
||||
for _, m := range db.measurements {
|
||||
if re.MatchString(m.Name) {
|
||||
|
@ -197,7 +197,7 @@ func (db *Database) measurementsByRegex(re *regexp.Regexp) Measurements {
|
|||
}
|
||||
|
||||
// Measurements returns a list of all measurements.
|
||||
func (db *Database) Measurements() Measurements {
|
||||
func (db *DatabaseIndex) Measurements() Measurements {
|
||||
measurements := make(Measurements, 0, len(db.measurements))
|
||||
for _, m := range db.measurements {
|
||||
measurements = append(measurements, m)
|
||||
|
|
|
@ -20,16 +20,16 @@ import (
|
|||
// is responsible for combining the output of many shards into a single query result.
|
||||
type Shard struct {
|
||||
db *bolt.DB // underlying data store
|
||||
index *Database
|
||||
index *DatabaseIndex
|
||||
|
||||
mu sync.RWMutex
|
||||
measurementFields map[string]*measurementFields // measurement name to their fields
|
||||
}
|
||||
|
||||
// NewShard returns a new initialized Shard
|
||||
func NewShard(database *Database) *Shard {
|
||||
func NewShard(index *DatabaseIndex) *Shard {
|
||||
return &Shard{
|
||||
index: database,
|
||||
index: index,
|
||||
measurementFields: make(map[string]*measurementFields),
|
||||
}
|
||||
}
|
||||
|
@ -113,7 +113,6 @@ func (s *Shard) WritePoints(points []Point) error {
|
|||
// marshal the raw data if it hasn't been marshaled already
|
||||
if p.Data() == nil {
|
||||
// this was populated earlier, don't need to validate that it's there.
|
||||
warn(p.Name(), s.measurementFields)
|
||||
data, err := s.measurementFields[p.Name()].codec.EncodeFields(p.Fields())
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -13,7 +13,7 @@ func TestShardWriteAndIndex(t *testing.T) {
|
|||
defer os.RemoveAll(path)
|
||||
path += "/shard"
|
||||
|
||||
index := NewDatabase()
|
||||
index := NewDatabaseIndex()
|
||||
sh := NewShard(index)
|
||||
if err := sh.Open(path); err != nil {
|
||||
t.Fatalf("error openeing shard: %s", err.Error())
|
||||
|
@ -58,7 +58,7 @@ func TestShardWriteAndIndex(t *testing.T) {
|
|||
// ensure the index gets loaded after closing and opening the shard
|
||||
sh.Close()
|
||||
|
||||
index = NewDatabase()
|
||||
index = NewDatabaseIndex()
|
||||
sh = NewShard(index)
|
||||
if err := sh.Open(path); err != nil {
|
||||
t.Fatalf("error openeing shard: %s", err.Error())
|
||||
|
|
Loading…
Reference in New Issue