Integrate WAL into engine

pull/10616/head
Edd Robinson 2018-10-05 12:43:56 +01:00
parent e85999ed45
commit 81e0fbabeb
10 changed files with 103 additions and 127 deletions

View File

@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"io"
nethttp "net/http"
_ "net/http/pprof"
"os"
@ -57,7 +56,7 @@ var (
httpBindAddress string
authorizationPath string
boltPath string
walPath string
natsPath string
developerMode bool
enginePath string
)
@ -115,10 +114,11 @@ func init() {
os.Exit(1)
}
platformCmd.Flags().StringVar(&walPath, "wal-path", filepath.Join(dir, "wal"), "path to persistent WAL files")
viper.BindEnv("WAL_PATH")
if h := viper.GetString("WAL_PATH"); h != "" {
walPath = h
// TODO(edd): do we need NATS for anything?
platformCmd.Flags().StringVar(&natsPath, "nats-path", filepath.Join(dir, "nats"), "path to persistent NATS files")
viper.BindEnv("NATS_PATH")
if h := viper.GetString("NATS_PATH"); h != "" {
natsPath = h
}
platformCmd.Flags().StringVar(&enginePath, "engine-path", filepath.Join(dir, "engine"), "path to persistent engine files")
@ -211,14 +211,11 @@ func platformF(cmd *cobra.Command, args []string) {
// TODO(jeff): this block is hacky support for a storage engine. it is not intended to
// be a long term solution.
var (
natsHandler nats.Handler
storageQueryService query.ProxyQueryService
)
var storageQueryService query.ProxyQueryService
var pointsWriter storage.PointsWriter
{
config := storage.NewConfig()
config.CacheSnapshotMemorySize = 0
config.TraceLoggingEnabled = true
config.EngineOptions.WALEnabled = true // Enable a disk-based WAL.
config.EngineOptions.Config = config.Config
engine := storage.NewEngine(enginePath, config)
@ -229,12 +226,7 @@ func platformF(cmd *cobra.Command, args []string) {
os.Exit(1)
}
engineHandler := nats.NewEngineHandler()
engineHandler.Logger = logger.With(zap.String("handler", "engine"))
engineHandler.Engine = engine
natsHandler = engineHandler
pointsWriter = engine
storageQueryService = query.ProxyQueryServiceBridge{
QueryService: query.QueryServiceBridge{
AsyncQueryService: &queryAdapter{
@ -297,7 +289,7 @@ func platformF(cmd *cobra.Command, args []string) {
signal.Notify(sigs, syscall.SIGTERM, os.Interrupt)
// NATS streaming server
natsServer := nats.NewServer(nats.Config{FilestoreDir: walPath})
natsServer := nats.NewServer(nats.Config{FilestoreDir: natsPath})
if err := natsServer.Open(); err != nil {
logger.Error("failed to start nats streaming server", zap.Error(err))
os.Exit(1)
@ -316,11 +308,6 @@ func platformF(cmd *cobra.Command, args []string) {
os.Exit(1)
}
if err := subscriber.Subscribe(IngressSubject, IngressGroup, natsHandler); err != nil {
logger.Error("failed to create nats subscriber", zap.Error(err))
os.Exit(1)
}
scraperScheduler, err := gather.NewScheduler(10, logger, scraperTargetSvc, publisher, subscriber, 0, 0)
if err != nil {
logger.Error("failed to create scraper subscriber", zap.Error(err))
@ -338,9 +325,7 @@ func platformF(cmd *cobra.Command, args []string) {
Logger: logger,
NewBucketService: source.NewBucketService,
NewQueryService: source.NewQueryService,
PublisherFn: func(r io.Reader) error {
return publisher.Publish(IngressSubject, r)
},
PointsWriter: pointsWriter,
AuthorizationService: authSvc,
BucketService: bucketSvc,
SessionService: sessionSvc,

View File

@ -1,13 +1,13 @@
package http
import (
"io"
http "net/http"
"strings"
"github.com/influxdata/platform"
"github.com/influxdata/platform/chronograf/server"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/storage"
"go.uber.org/zap"
)
@ -39,8 +39,8 @@ type APIBackend struct {
NewBucketService func(*platform.Source) (platform.BucketService, error)
NewQueryService func(*platform.Source) (query.ProxyQueryService, error)
PublisherFn func(r io.Reader) error
PointsWriter storage.PointsWriter
AuthorizationService platform.AuthorizationService
BucketService platform.BucketService
SessionService platform.SessionService
@ -105,7 +105,7 @@ func NewAPIHandler(b *APIBackend) *APIHandler {
h.TaskHandler = NewTaskHandler(b.Logger)
h.TaskHandler.TaskService = b.TaskService
h.WriteHandler = NewWriteHandler(b.PublisherFn)
h.WriteHandler = NewWriteHandler(b.PointsWriter)
h.WriteHandler.AuthorizationService = b.AuthorizationService
h.WriteHandler.OrganizationService = b.OrganizationService
h.WriteHandler.BucketService = b.BucketService

View File

@ -1,11 +1,9 @@
package http
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
@ -13,6 +11,7 @@ import (
pcontext "github.com/influxdata/platform/context"
"github.com/influxdata/platform/kit/errors"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/storage"
"github.com/influxdata/platform/tsdb"
"github.com/julienschmidt/httprouter"
"go.uber.org/zap"
@ -28,15 +27,15 @@ type WriteHandler struct {
BucketService platform.BucketService
OrganizationService platform.OrganizationService
Publish func(io.Reader) error
PointsWriter storage.PointsWriter
}
// NewWriteHandler creates a new handler at /api/v2/write to receive line protocol.
func NewWriteHandler(publishFn func(io.Reader) error) *WriteHandler {
func NewWriteHandler(writer storage.PointsWriter) *WriteHandler {
h := &WriteHandler{
Router: httprouter.New(),
Logger: zap.NewNop(),
Publish: publishFn,
Router: httprouter.New(),
Logger: zap.NewNop(),
PointsWriter: writer,
}
h.HandlerFunc("POST", "/api/v2/write", h.handleWrite)
@ -137,7 +136,6 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
// TODO(jeff): we should be publishing with the org and bucket instead of
// parsing, rewriting, and publishing, but the interface isn't quite there yet.
// be sure to remove this when it is there!
{
data, err := ioutil.ReadAll(in)
if err != nil {
logger.Info("Error reading body", zap.Error(err))
@ -159,19 +157,11 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
return
}
var buf []byte
for _, pt := range exploded {
buf = pt.AppendString(buf)
buf = append(buf, '\n')
if err := h.PointsWriter.WritePoints(exploded); err != nil {
EncodeError(ctx, errors.BadRequestError(err.Error()), w)
return
}
in = ioutil.NopCloser(bytes.NewReader(buf))
}
if err := h.Publish(in); err != nil {
EncodeError(ctx, errors.BadRequestError(err.Error()), w)
return
}
w.WriteHeader(http.StatusNoContent)
}

View File

@ -1,45 +0,0 @@
package nats
import (
"github.com/influxdata/platform/models"
"go.uber.org/zap"
)
// Engine is the storage dependency of EngineHandler: anything that can accept
// a batch of already-parsed points.
type Engine interface {
// WritePoints writes the given points and reports failure through the
// returned error.
WritePoints(points []models.Point) error
}
// EngineHandler handles messages by parsing their payload into points and
// writing those points to Engine (see Process).
type EngineHandler struct {
// Logger receives the parse and write failures reported by Process.
Logger *zap.Logger
// Engine is the destination for the points parsed by Process.
Engine Engine
}
// NewEngineHandler returns an EngineHandler whose Logger is a no-op logger, so
// Process can log safely even when the caller never assigns a Logger. Callers
// may still overwrite Logger afterwards.
//
// Engine is left nil and must be set before the handler processes any message,
// because Process calls Engine.WritePoints unconditionally on parsed input.
func NewEngineHandler() *EngineHandler {
	return &EngineHandler{
		Logger: zap.NewNop(),
	}
}
// Process parses the message payload into points and writes them to the
// engine. The message is acknowledged exactly once on every path: after a
// parse failure, after a write failure, and after a successful write. Failures
// are logged at Info level and are not retried.
func (lh *EngineHandler) Process(s Subscription, m Message) {
	points, parseErr := models.ParsePoints(m.Data())
	if parseErr != nil {
		// Unparseable payload: log it and fall through to the ack below.
		lh.Logger.Info("error parsing points", zap.Error(parseErr))
	} else if writeErr := lh.Engine.WritePoints(points); writeErr != nil {
		// TODO(jeff): we need some idea of if this is a permanent or temporary error.
		// For example, some sorts of PartialWriteErrors should not be retried. For
		// now, just Ack.
		lh.Logger.Info("error writing points", zap.Error(writeErr))
	}

	// TODO(jeff): This is super problematic. We need to only ack after the engine has flushed
	// the cache. There's no real good way to either force that or to wait for it. It's also
	// unclear what the semantics of Ack are with respect to previous messages. Oh well, for
	// now just ack and if the process dies, you lose data!
	m.Ack()
}

View File

@ -1,7 +1,9 @@
package storage
import (
"bytes"
"context"
"errors"
"fmt"
"path/filepath"
"sync"
@ -17,11 +19,16 @@ import (
// Static objects to prevent small allocs.
//
// timeBytes is the byte form of "time". WritePoints uses it to detect points
// whose field key or tag key is "time", which it reports as invalid.
var timeBytes = []byte("time")
// ErrEngineClosed is returned when a caller attempts to use the engine while
// it's closed.
var ErrEngineClosed = errors.New("engine is closed")
type Engine struct {
config Config
path string
mu sync.RWMutex
open bool
index *tsi1.Index
sfile *tsdb.SeriesFile
engine *tsm1.Engine
@ -56,7 +63,9 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
e.index = index
// Initialise Engine
engine := tsm1.NewEngine(0, tsdb.Index(e.index), filepath.Join(path, "data"), "remove-me-wal", e.sfile, c.EngineOptions)
// TODO(edd): should just be able to use the config values for data/wal.
engine := tsm1.NewEngine(0, tsdb.Index(e.index), filepath.Join(path, "data"), filepath.Join(path, "wal"), e.sfile, c.EngineOptions)
// TODO(edd): Once the tsdb.Engine abstraction is gone, this won't be needed.
e.engine = engine.(*tsm1.Engine)
@ -81,6 +90,10 @@ func (e *Engine) Open() error {
e.mu.Lock()
defer e.mu.Unlock()
if e.open {
return nil // no-op
}
if err := e.sfile.Open(); err != nil {
return err
}
@ -93,6 +106,7 @@ func (e *Engine) Open() error {
return err
}
e.engine.SetCompactionsEnabled(true) // TODO(edd):is this needed?
e.open = true
return nil
}
@ -102,6 +116,10 @@ func (e *Engine) Close() error {
e.mu.Lock()
defer e.mu.Unlock()
if !e.open {
return nil // no-op
}
e.open = false
if err := e.sfile.Close(); err != nil {
return err
}
@ -116,6 +134,9 @@ func (e *Engine) Close() error {
// CreateSeriesCursor builds a SeriesCursor for req and cond over this engine's
// index and series file. It returns ErrEngineClosed when the engine is not
// open. The engine's read lock is held for the duration of the call. ctx is
// accepted but not used here.
func (e *Engine) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	if !e.open {
		return nil, ErrEngineClosed
	}

	// TODO(edd): remove IndexSet
	indexSet := tsdb.IndexSet{
		Indexes:    []tsdb.Index{e.index},
		SeriesFile: e.sfile,
	}
	return newSeriesCursor(req, indexSet, cond)
}
@ -123,6 +144,9 @@ func (e *Engine) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest
// CreateCursorIterator returns a cursor iterator from the underlying tsm1
// engine, passing ctx through unchanged. It returns ErrEngineClosed when the
// engine is not open.
func (e *Engine) CreateCursorIterator(ctx context.Context) (tsdb.CursorIterator, error) {
// Hold the read lock so the open flag cannot change while the call is made.
e.mu.RLock()
defer e.mu.RUnlock()
if !e.open {
return nil, ErrEngineClosed
}
return e.engine.CreateCursorIterator(ctx)
}
@ -138,12 +162,19 @@ func (e *Engine) WritePoints(points []models.Point) error {
for iter := collection.Iterator(); iter.Next(); {
tags := iter.Tags()
if tags.Len() > 0 && bytes.Equal(tags[0].Key, tsdb.FieldKeyTagKeyBytes) && bytes.Equal(tags[0].Value, timeBytes) {
// Field key "time" is invalid
if collection.Reason == "" {
collection.Reason = fmt.Sprintf("invalid field key: input field %q is invalid", timeBytes)
}
collection.Dropped++
collection.DroppedKeys = append(collection.DroppedKeys, iter.Key())
}
// Filter out any tags with key equal to "time": they are invalid.
if tags.Get(timeBytes) != nil {
if collection.Reason == "" {
collection.Reason = fmt.Sprintf(
"invalid tag key: input tag %q on measurement %q is invalid",
timeBytes, iter.Name())
collection.Reason = fmt.Sprintf("invalid tag key: input tag %q on measurement %q is invalid", timeBytes, iter.Name())
}
collection.Dropped++
collection.DroppedKeys = append(collection.DroppedKeys, iter.Key())
@ -168,6 +199,10 @@ func (e *Engine) WritePoints(points []models.Point) error {
e.mu.RLock()
defer e.mu.RUnlock()
if !e.open {
return ErrEngineClosed
}
// Add new series to the index and series file. Check for partial writes.
if err := e.index.CreateSeriesListIfNotExists(collection); err != nil {
// ignore PartialWriteErrors. The collection captures it.
@ -189,6 +224,9 @@ func (e *Engine) WritePoints(points []models.Point) error {
// DeleteSeriesRangeWithPredicate forwards itr and fn unchanged to the
// underlying tsm1 engine's method of the same name. It returns ErrEngineClosed
// when the engine is not open.
//
// NOTE(review): fn's results (two int64s and a bool) are interpreted by the
// underlying engine; presumably a min/max time range and a delete flag —
// confirm against the tsm1 implementation.
func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, fn func([]byte, models.Tags) (int64, int64, bool)) error {
// Hold the read lock so the open flag cannot change while the call is made.
e.mu.RLock()
defer e.mu.RUnlock()
if !e.open {
return ErrEngineClosed
}
return e.engine.DeleteSeriesRangeWithPredicate(itr, fn)
}
@ -196,6 +234,9 @@ func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, fn func
// SeriesCardinality reports the index's series count (index.SeriesN). A closed
// engine reports 0 rather than an error.
func (e *Engine) SeriesCardinality() int64 {
	e.mu.RLock()
	defer e.mu.RUnlock()

	var n int64
	if e.open {
		n = e.index.SeriesN()
	}
	return n
}
@ -209,5 +250,8 @@ func (e *Engine) Path() string {
// ApplyFnToSeriesIDSet invokes fn with the index's series ID set while holding
// the engine's read lock. It does nothing, and fn is never called, when the
// engine is not open.
func (e *Engine) ApplyFnToSeriesIDSet(fn func(*tsdb.SeriesIDSet)) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	if e.open {
		fn(e.index.SeriesIDSet())
	}
}

8
storage/points_writer.go Normal file
View File

@ -0,0 +1,8 @@
package storage
import "github.com/influxdata/platform/models"
// PointsWriter describes the ability to write points into a storage engine.
// *Engine in this package satisfies it.
type PointsWriter interface {
// WritePoints writes the supplied points and reports any failure through
// the returned error.
WritePoints([]models.Point) error
}

View File

@ -9,10 +9,6 @@ var (
// ErrFieldTypeConflict is returned when a new field already exists with a different type.
ErrFieldTypeConflict = errors.New("field type conflict")
// ErrEngineClosed is returned when a caller attempts indirectly to
// access the shard's underlying engine.
ErrEngineClosed = errors.New("engine is closed")
// ErrShardDisabled is returned when the shard is not available for
// queries or writes.
ErrShardDisabled = errors.New("shard is disabled")

View File

@ -825,12 +825,14 @@ func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
return i.sSketch, i.sTSketch, nil
}
// Since indexes are not shared across shards, the count returned by SeriesN
// cannot be combined with other shards' results. If you need to count series
// across indexes then use either the database-wide series file, or merge the
// index-level bitsets or sketches.
// SeriesN returns the series cardinality in the index. It is the sum of all
// partition cardinalities.
func (i *Index) SeriesN() int64 {
	// Add up each partition's series-ID-set cardinality.
	var total int64
	for _, p := range i.partitions {
		total += int64(p.seriesIDSet.Cardinality())
	}
	return total
}
// HasTagKey returns true if tag key exists. It returns the first error

View File

@ -186,13 +186,14 @@ func (s *Shard) close() error {
// ready determines if the Shard is ready for queries or writes.
// It returns nil if ready, otherwise ErrEngineClosed (no engine loaded) or
// ErrShardDisabled (shard not enabled).
func (s *Shard) ready() error {
var err error
if s._engine == nil {
err = ErrEngineClosed
} else if !s.enabled {
err = ErrShardDisabled
}
return err
// NOTE(review): everything from here to the closing brace is unreachable as
// listed, because the statement above already returns. The stub below and the
// commented-out copy of the check look like a temporary bypass (see the TODO).
// Confirm which variant is meant to be live before relying on ready().
return nil // TODO(edd)remove
// var err error
// if s._engine == nil {
// err = ErrEngineClosed
// } else if !s.enabled {
// err = ErrShardDisabled
// }
// return err
}
// Engine returns a reference to the currently loaded engine.

View File

@ -225,7 +225,7 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
traceLogger: logger,
traceLogging: opt.Config.TraceLoggingEnabled,
WAL: NopWAL{},
WAL: NopWAL{},
Cache: cache,
FileStore: fs,
@ -243,7 +243,7 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
}
if opt.WALEnabled {
wal := NewWAL(walPath)
wal := NewWAL(walPath)
wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
e.WAL = wal
}
@ -749,7 +749,7 @@ func (e *Engine) Close() error {
if err := e.FileStore.Close(); err != nil {
return err
}
return e.WAL.Close()
return e.WAL.Close()
}
// WithLogger sets the logger for the engine.
@ -1260,11 +1260,6 @@ func (e *Engine) WritePoints(points []models.Point) error {
iter := p.FieldIterator()
t := p.Time().UnixNano()
for iter.Next() {
// Skip fields named "time"; they are illegal
if bytes.Equal(iter.FieldKey(), timeBytes) {
continue
}
keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
if e.seriesTypeMap != nil {
@ -1553,9 +1548,9 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
e.Cache.DeleteRange(deleteKeys, min, max)
// delete from the WAL
if _, err := e.WAL.DeleteRange(deleteKeys, min, max); err != nil {
return err
}
if _, err := e.WAL.DeleteRange(deleteKeys, min, max); err != nil {
return err
}
// The series are deleted on disk, but the index may still say they exist.
// Depending on the min,max time passed in, the series may or may not actually
@ -2203,7 +2198,7 @@ func (e *Engine) reloadCache() error {
return err
}
e.traceLogger.Info("Reloaded WAL cache",zap.String("path", e.WAL.Path()), zap.Duration("duration", time.Since(now)))
e.traceLogger.Info("Reloaded WAL cache", zap.String("path", e.WAL.Path()), zap.Duration("duration", time.Since(now)))
return nil
}