2015-05-26 19:56:54 +00:00
|
|
|
package tsdb
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2015-05-28 22:02:12 +00:00
|
|
|
"io/ioutil"
|
2015-05-26 19:56:54 +00:00
|
|
|
"log"
|
|
|
|
"os"
|
|
|
|
"path/filepath"
|
2015-05-28 22:02:12 +00:00
|
|
|
"strconv"
|
2015-06-10 18:19:50 +00:00
|
|
|
"strings"
|
2015-05-28 22:02:12 +00:00
|
|
|
"sync"
|
|
|
|
|
|
|
|
"github.com/influxdb/influxdb/influxql"
|
2015-05-26 19:56:54 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
func NewStore(path string) *Store {
|
2015-08-18 20:59:54 +00:00
|
|
|
opts := NewEngineOptions()
|
|
|
|
opts.Config = NewConfig()
|
|
|
|
|
2015-05-26 19:56:54 +00:00
|
|
|
return &Store{
|
2015-07-22 14:53:20 +00:00
|
|
|
path: path,
|
2015-08-18 20:59:54 +00:00
|
|
|
EngineOptions: opts,
|
2015-07-22 14:53:20 +00:00
|
|
|
Logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
|
2015-05-26 19:56:54 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
ErrShardNotFound = fmt.Errorf("shard not found")
|
|
|
|
)
|
|
|
|
|
|
|
|
type Store struct {
|
2015-05-30 16:11:23 +00:00
|
|
|
mu sync.RWMutex
|
2015-05-26 19:56:54 +00:00
|
|
|
path string
|
|
|
|
|
|
|
|
databaseIndexes map[string]*DatabaseIndex
|
|
|
|
shards map[uint64]*Shard
|
|
|
|
|
2015-07-22 14:53:20 +00:00
|
|
|
EngineOptions EngineOptions
|
|
|
|
Logger *log.Logger
|
2015-09-04 22:32:08 +00:00
|
|
|
closing chan struct{}
|
2015-05-26 19:56:54 +00:00
|
|
|
}
|
|
|
|
|
2015-06-08 19:07:05 +00:00
|
|
|
// Path returns the store's root path.
|
|
|
|
func (s *Store) Path() string { return s.path }
|
|
|
|
|
2015-07-20 19:59:46 +00:00
|
|
|
// DatabaseIndexN returns the number of databases indicies in the store.
|
|
|
|
func (s *Store) DatabaseIndexN() int {
|
|
|
|
s.mu.RLock()
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
return len(s.databaseIndexes)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Shard returns a shard by id.
|
|
|
|
func (s *Store) Shard(id uint64) *Shard {
|
|
|
|
s.mu.RLock()
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
return s.shards[id]
|
|
|
|
}
|
|
|
|
|
|
|
|
// ShardN returns the number of shard in the store.
|
|
|
|
func (s *Store) ShardN() int {
|
|
|
|
s.mu.RLock()
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
return len(s.shards)
|
|
|
|
}
|
|
|
|
|
2015-05-26 22:35:16 +00:00
|
|
|
func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) error {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
2015-09-04 22:32:08 +00:00
|
|
|
select {
|
|
|
|
case <-s.closing:
|
|
|
|
return fmt.Errorf("closing")
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
2015-05-26 22:35:16 +00:00
|
|
|
// shard already exists
|
|
|
|
if _, ok := s.shards[shardID]; ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// created the db and retention policy dirs if they don't exist
|
|
|
|
if err := os.MkdirAll(filepath.Join(s.path, database, retentionPolicy), 0700); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-08-21 15:22:04 +00:00
|
|
|
// create the WAL directory
|
|
|
|
walPath := filepath.Join(s.EngineOptions.Config.WALDir, database, retentionPolicy, fmt.Sprintf("%d", shardID))
|
|
|
|
if err := os.MkdirAll(walPath, 0700); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-05-26 22:35:16 +00:00
|
|
|
// create the database index if it does not exist
|
|
|
|
db, ok := s.databaseIndexes[database]
|
|
|
|
if !ok {
|
|
|
|
db = NewDatabaseIndex()
|
|
|
|
s.databaseIndexes[database] = db
|
|
|
|
}
|
|
|
|
|
|
|
|
shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
|
2015-08-21 15:22:04 +00:00
|
|
|
shard := NewShard(shardID, db, shardPath, walPath, s.EngineOptions)
|
2015-05-29 21:15:05 +00:00
|
|
|
if err := shard.Open(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-05-26 22:35:16 +00:00
|
|
|
|
|
|
|
s.shards[shardID] = shard
|
|
|
|
|
2015-05-26 19:56:54 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-06-04 01:02:49 +00:00
|
|
|
// DeleteShard removes a shard from disk.
|
|
|
|
func (s *Store) DeleteShard(shardID uint64) error {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
// ensure shard exists
|
|
|
|
sh, ok := s.shards[shardID]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := sh.Close(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := os.Remove(sh.path); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-08-21 15:22:04 +00:00
|
|
|
if err := os.RemoveAll(sh.walPath); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-06-04 01:02:49 +00:00
|
|
|
delete(s.shards, shardID)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-06-05 16:31:04 +00:00
|
|
|
// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
|
|
|
|
func (s *Store) DeleteDatabase(name string, shardIDs []uint64) error {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
for _, id := range shardIDs {
|
|
|
|
shard := s.shards[id]
|
|
|
|
if shard != nil {
|
|
|
|
shard.Close()
|
|
|
|
}
|
|
|
|
}
|
2015-07-15 18:11:02 +00:00
|
|
|
if err := os.RemoveAll(filepath.Join(s.path, name)); err != nil {
|
2015-06-22 18:44:46 +00:00
|
|
|
return err
|
|
|
|
}
|
2015-08-21 15:22:04 +00:00
|
|
|
if err := os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, name)); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-06-22 18:44:46 +00:00
|
|
|
delete(s.databaseIndexes, name)
|
|
|
|
return nil
|
2015-06-05 16:31:04 +00:00
|
|
|
}
|
|
|
|
|
2015-06-04 20:37:51 +00:00
|
|
|
// ShardIDs returns a slice of all ShardIDs under management.
|
|
|
|
func (s *Store) ShardIDs() []uint64 {
|
|
|
|
ids := make([]uint64, 0, len(s.shards))
|
|
|
|
for i, _ := range s.shards {
|
|
|
|
ids = append(ids, i)
|
|
|
|
}
|
|
|
|
return ids
|
|
|
|
}
|
|
|
|
|
2015-05-28 22:02:12 +00:00
|
|
|
func (s *Store) ValidateAggregateFieldsInStatement(shardID uint64, measurementName string, stmt *influxql.SelectStatement) error {
|
|
|
|
s.mu.RLock()
|
|
|
|
shard := s.shards[shardID]
|
|
|
|
s.mu.RUnlock()
|
|
|
|
if shard == nil {
|
|
|
|
return ErrShardNotFound
|
|
|
|
}
|
|
|
|
return shard.ValidateAggregateFieldsInStatement(measurementName, stmt)
|
|
|
|
}
|
|
|
|
|
2015-05-29 14:31:03 +00:00
|
|
|
func (s *Store) DatabaseIndex(name string) *DatabaseIndex {
|
|
|
|
s.mu.RLock()
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
return s.databaseIndexes[name]
|
|
|
|
}
|
|
|
|
|
2015-08-25 21:44:42 +00:00
|
|
|
// Databases returns all the databases in the indexes
|
|
|
|
func (s *Store) Databases() []string {
|
|
|
|
s.mu.RLock()
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
databases := []string{}
|
|
|
|
for db := range s.databaseIndexes {
|
|
|
|
databases = append(databases, db)
|
|
|
|
}
|
|
|
|
return databases
|
|
|
|
}
|
|
|
|
|
2015-05-28 22:02:12 +00:00
|
|
|
func (s *Store) Measurement(database, name string) *Measurement {
|
2015-06-02 15:20:20 +00:00
|
|
|
s.mu.RLock()
|
2015-05-28 22:02:12 +00:00
|
|
|
db := s.databaseIndexes[database]
|
2015-06-02 15:20:20 +00:00
|
|
|
s.mu.RUnlock()
|
2015-05-28 22:02:12 +00:00
|
|
|
if db == nil {
|
|
|
|
return nil
|
|
|
|
}
|
2015-06-04 18:50:32 +00:00
|
|
|
return db.Measurement(name)
|
2015-05-28 22:02:12 +00:00
|
|
|
}
|
|
|
|
|
2015-08-25 21:44:42 +00:00
|
|
|
// DiskSize returns the size of all the shard files in bytes. This size does not include the WAL size.
|
|
|
|
func (s *Store) DiskSize() (int64, error) {
|
|
|
|
s.mu.RLock()
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
var size int64
|
|
|
|
for _, shardID := range s.ShardIDs() {
|
|
|
|
shard := s.Shard(shardID)
|
|
|
|
sz, err := shard.DiskSize()
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
size += sz
|
|
|
|
}
|
|
|
|
return size, nil
|
|
|
|
}
|
|
|
|
|
2015-06-09 21:35:38 +00:00
|
|
|
// deleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys
|
2015-06-02 15:20:20 +00:00
|
|
|
func (s *Store) deleteSeries(keys []string) error {
|
|
|
|
s.mu.RLock()
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
for _, sh := range s.shards {
|
2015-07-22 14:53:20 +00:00
|
|
|
if err := sh.DeleteSeries(keys); err != nil {
|
2015-06-02 15:20:20 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-06-03 15:32:50 +00:00
|
|
|
// deleteMeasurement loops through the local shards and removes the measurement field encodings from each shard
|
|
|
|
func (s *Store) deleteMeasurement(name string, seriesKeys []string) error {
|
|
|
|
s.mu.RLock()
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
for _, sh := range s.shards {
|
2015-07-22 14:53:20 +00:00
|
|
|
if err := sh.DeleteMeasurement(name, seriesKeys); err != nil {
|
2015-06-03 15:32:50 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-05-26 21:33:19 +00:00
|
|
|
func (s *Store) loadIndexes() error {
|
|
|
|
dbs, err := ioutil.ReadDir(s.path)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
for _, db := range dbs {
|
|
|
|
if !db.IsDir() {
|
|
|
|
s.Logger.Printf("Skipping database dir: %s. Not a directory", db.Name())
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
s.databaseIndexes[db.Name()] = NewDatabaseIndex()
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Store) loadShards() error {
|
|
|
|
// loop through the current database indexes
|
|
|
|
for db := range s.databaseIndexes {
|
|
|
|
rps, err := ioutil.ReadDir(filepath.Join(s.path, db))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, rp := range rps {
|
|
|
|
// retention policies should be directories. Skip anything that is not a dir.
|
|
|
|
if !rp.IsDir() {
|
|
|
|
s.Logger.Printf("Skipping retention policy dir: %s. Not a directory", rp.Name())
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
shards, err := ioutil.ReadDir(filepath.Join(s.path, db, rp.Name()))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
for _, sh := range shards {
|
|
|
|
path := filepath.Join(s.path, db, rp.Name(), sh.Name())
|
2015-08-21 15:22:04 +00:00
|
|
|
walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp.Name(), sh.Name())
|
2015-05-26 21:33:19 +00:00
|
|
|
|
|
|
|
// Shard file names are numeric shardIDs
|
|
|
|
shardID, err := strconv.ParseUint(sh.Name(), 10, 64)
|
|
|
|
if err != nil {
|
|
|
|
s.Logger.Printf("Skipping shard: %s. Not a valid path", rp.Name())
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2015-08-21 15:22:04 +00:00
|
|
|
shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions)
|
2015-07-30 15:15:06 +00:00
|
|
|
err = shard.Open()
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to open shard %d: %s", shardID, err)
|
|
|
|
}
|
2015-05-26 21:33:19 +00:00
|
|
|
s.shards[shardID] = shard
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2015-05-26 19:56:54 +00:00
|
|
|
func (s *Store) Open() error {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
2015-09-04 22:32:08 +00:00
|
|
|
s.closing = make(chan struct{})
|
|
|
|
|
2015-05-26 19:56:54 +00:00
|
|
|
s.shards = map[uint64]*Shard{}
|
|
|
|
s.databaseIndexes = map[string]*DatabaseIndex{}
|
|
|
|
|
2015-08-12 22:07:12 +00:00
|
|
|
s.Logger.Printf("Using data dir: %v", s.Path())
|
|
|
|
|
2015-05-28 04:06:09 +00:00
|
|
|
// Create directory.
|
|
|
|
if err := os.MkdirAll(s.path, 0777); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-05-26 21:33:19 +00:00
|
|
|
// TODO: Start AE for Node
|
|
|
|
if err := s.loadIndexes(); err != nil {
|
2015-05-26 19:56:54 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-05-26 21:33:19 +00:00
|
|
|
if err := s.loadShards(); err != nil {
|
2015-05-26 19:56:54 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Store) WriteToShard(shardID uint64, points []Point) error {
|
2015-06-03 17:46:18 +00:00
|
|
|
s.mu.RLock()
|
|
|
|
defer s.mu.RUnlock()
|
2015-05-26 19:56:54 +00:00
|
|
|
sh, ok := s.shards[shardID]
|
|
|
|
if !ok {
|
|
|
|
return ErrShardNotFound
|
|
|
|
}
|
|
|
|
|
2015-05-29 21:15:05 +00:00
|
|
|
return sh.WritePoints(points)
|
2015-05-26 19:56:54 +00:00
|
|
|
}
|
|
|
|
|
2015-08-24 02:55:48 +00:00
|
|
|
func (s *Store) CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (Mapper, error) {
|
2015-07-06 12:31:52 +00:00
|
|
|
shard := s.Shard(shardID)
|
|
|
|
|
2015-08-24 02:55:48 +00:00
|
|
|
switch st := stmt.(type) {
|
|
|
|
case *influxql.SelectStatement:
|
|
|
|
return NewSelectMapper(shard, st, chunkSize), nil
|
|
|
|
case *influxql.ShowMeasurementsStatement:
|
|
|
|
return NewShowMeasurementsMapper(shard, st, chunkSize), nil
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("can't create mapper for statement type: %v", st)
|
|
|
|
}
|
2015-07-06 12:31:52 +00:00
|
|
|
}
|
|
|
|
|
2015-05-26 19:56:54 +00:00
|
|
|
func (s *Store) Close() error {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
for _, sh := range s.shards {
|
|
|
|
if err := sh.Close(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2015-09-04 22:32:08 +00:00
|
|
|
close(s.closing)
|
|
|
|
s.closing = nil
|
2015-05-26 19:56:54 +00:00
|
|
|
s.shards = nil
|
|
|
|
s.databaseIndexes = nil
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2015-06-10 18:19:50 +00:00
|
|
|
|
|
|
|
// IsRetryable returns true if this error is temporary and could be retried
|
|
|
|
func IsRetryable(err error) bool {
|
|
|
|
if err == nil {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
if strings.Contains(err.Error(), "field type conflict") {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|