Merge pull request #1644 from influxdb/batched_series_create

Batched write support
pull/1637/merge
Philip O'Toole 2015-02-20 16:43:30 -08:00
commit 1522842532
6 changed files with 478 additions and 300 deletions

View File

@ -663,7 +663,7 @@ func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
if field == nil {
panic(fmt.Sprintf("field does not exist for %s", k))
} else if influxql.InspectDataType(v) != field.Type {
return nil, fmt.Errorf("field %s is not of type %s", k, field.Type)
return nil, fmt.Errorf("field \"%s\" is type %T, mapped as type %s", k, k, field.Type)
}
var buf []byte

View File

@ -1421,6 +1421,89 @@ func TestHandler_serveWriteSeriesBoolValues(t *testing.T) {
}
}
func TestHandler_serveWriteSeriesBatch(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
srvr.SetDefaultRetentionPolicy("foo", "bar")
s := NewHTTPServer(srvr)
defer s.Close()
batch := `
{
"database": "foo",
"retentionPolicy": "bar",
"points": [
{
"name": "disk",
"timestamp": "2009-11-10T23:00:00Z",
"tags": {
"host": "server01"
},
"values": {
"full": false
}
},
{
"name": "disk",
"timestamp": "2009-11-10T23:00:01Z",
"tags": {
"host": "server01"
},
"values": {
"full": true
}
},
{
"name": "disk",
"timestamp": "2009-11-10T23:00:02Z",
"tags": {
"host": "server02"
},
"values": {
"full_pct": 64
}
}
]
}
`
status, body := MustHTTP("POST", s.URL+`/write`, nil, nil, batch)
if status != http.StatusOK {
t.Log(body)
t.Fatalf("unexpected status: %d", status)
}
time.Sleep(200 * time.Millisecond) // Ensure data node picks up write.
query := map[string]string{"db": "foo", "q": "select * from disk"}
status, body = MustHTTP("GET", s.URL+`/query`, query, nil, "")
if status != http.StatusOK {
t.Logf("query %s\n", query)
t.Log(body)
t.Errorf("unexpected status: %d", status)
}
r := &influxdb.Results{}
if err := json.Unmarshal([]byte(body), r); err != nil {
t.Logf("query : %s\n", query)
t.Log(body)
t.Error(err)
}
if len(r.Results) != 1 {
t.Fatalf("unexpected results count")
}
result := r.Results[0]
if len(result.Rows) != 1 {
t.Fatalf("unexpected row count, expected: %d, actual: %d", 1, len(result.Rows))
}
if len(result.Rows[0].Columns) != 3 {
t.Fatalf("unexpected column count, expected: %d, actual: %d", 3, len(result.Rows[0].Columns))
}
if len(result.Rows[0].Values) != 3 {
t.Fatalf("unexpected values count, expected: %d, actual: %d", 3, len(result.Rows[0].Values))
}
}
func TestHandler_serveWriteSeriesInvalidQueryField(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
srvr.CreateDatabase("foo")

View File

@ -81,6 +81,9 @@ var (
// ErrShardNotFound is returned writing to a non-existent shard.
ErrShardNotFound = errors.New("shard not found")
// ErrInvalidPointBuffer is returned when a buffer containing data for writing is invalid
ErrInvalidPointBuffer = errors.New("invalid point buffer")
// ErrReadAccessDenied is returned when a user attempts to read
// data that he or she does not have permission to read.
ErrReadAccessDenied = errors.New("read access denied")

View File

@ -90,6 +90,93 @@ func TestMeasurement_expandExpr(t *testing.T) {
}
}
// Ensure the createMeasurementsIfNotExistsCommand operates correctly.
func TestCreateMeasurementsCommand(t *testing.T) {
var err error
var n int
c := newCreateMeasurementsIfNotExistsCommand("foo")
if c == nil {
t.Fatal("createMeasurementsIfNotExistsCommand is nil")
}
// Add Measurement twice, to make sure nothing blows up.
c.addMeasurementIfNotExists("bar")
c.addMeasurementIfNotExists("bar")
n = len(c.Measurements)
if n != 1 {
t.Fatalf("wrong number of measurements, expected 1, got %d", n)
}
// Add Series, no tags.
c.addSeriesIfNotExists("bar", nil)
// Add Series, some tags.
tags := map[string]string{"host": "server01"}
c.addSeriesIfNotExists("bar", tags)
// Add Series, same tags again.
c.addSeriesIfNotExists("bar", tags)
n = len(c.Measurements["bar"].Tags)
if n != 2 {
t.Fatalf("measurement has wrong number of tags, expected 2, got %d", n)
}
// Add a field.
err = c.addFieldIfNotExists("bar", "value", influxql.Number)
if err != nil {
t.Fatal("error adding field \"value\"")
}
// Add same field again.
err = c.addFieldIfNotExists("bar", "value", influxql.Number)
if err != nil {
t.Fatal("error re-adding field \"value\"")
}
// Add another field.
err = c.addFieldIfNotExists("bar", "value2", influxql.String)
if err != nil {
t.Fatal("error re-adding field \"value2\"")
}
n = len(c.Measurements["bar"].Fields)
if n != 2 {
t.Fatalf("wrong number of fields, expected 2, got %d", n)
}
}
// Ensure the createMeasurementsIfNotExistsCommand returns expected errors.
func TestCreateMeasurementsCommand_Errors(t *testing.T) {
var err error
c := newCreateMeasurementsIfNotExistsCommand("foo")
if c == nil {
t.Fatal("createMeasurementsIfNotExistsCommand is nil")
}
// Ensure fields can be added to non-existent Measurements. The
// Measurements should be created automatically.
c.addSeriesIfNotExists("bar", nil)
err = c.addFieldIfNotExists("bar", "value", influxql.Number)
if err != nil {
t.Fatalf("unexpected error got %s", err.Error())
}
// Add Measurement. Adding it now should be OK.
c.addMeasurementIfNotExists("bar")
// Test type conflicts
err = c.addFieldIfNotExists("bar", "value", influxql.Number)
if err != nil {
t.Fatal("error adding field \"value\"")
}
err = c.addFieldIfNotExists("bar", "value", influxql.String)
if err != ErrFieldTypeConflict {
t.Fatalf("expected ErrFieldTypeConflict got %s", err.Error())
}
}
// MustParseExpr parses an expression string and returns its AST representation.
func MustParseExpr(s string) influxql.Expr {
expr, err := influxql.ParseExpr(s)

548
server.go
View File

@ -68,20 +68,17 @@ const (
createShardGroupIfNotExistsMessageType = messaging.MessageType(0x40)
deleteShardGroupMessageType = messaging.MessageType(0x41)
// Series messages
createSeriesIfNotExistsMessageType = messaging.MessageType(0x50)
// Measurement messages
createFieldsIfNotExistsMessageType = messaging.MessageType(0x60)
createMeasurementsIfNotExistsMessageType = messaging.MessageType(0x50)
// Continuous Query messages
createContinuousQueryMessageType = messaging.MessageType(0x70)
createContinuousQueryMessageType = messaging.MessageType(0x60)
// Write series data messages (per-topic)
writeRawSeriesMessageType = messaging.MessageType(0x80)
writeRawSeriesMessageType = messaging.MessageType(0x70)
// Privilege messages
setPrivilegeMessageType = messaging.MessageType(0x90)
setPrivilegeMessageType = messaging.MessageType(0x80)
)
// Server represents a collection of metadata and raw metric data.
@ -102,8 +99,7 @@ type Server struct {
databases map[string]*database // databases by name
users map[string]*User // user by name
shards map[uint64]*Shard // shards by shard id
shardsBySeriesID map[uint32][]*Shard // shards by series id
shards map[uint64]*Shard // shards by shard id
Logger *log.Logger
@ -132,9 +128,8 @@ func NewServer() *Server {
databases: make(map[string]*database),
users: make(map[string]*User),
shards: make(map[uint64]*Shard),
shardsBySeriesID: make(map[uint32][]*Shard),
Logger: log.New(os.Stderr, "[server] ", log.LstdFlags),
shards: make(map[uint64]*Shard),
Logger: log.New(os.Stderr, "[server] ", log.LstdFlags),
}
// Server will always return with authentication enabled.
// This ensures that disabling authentication must be an explicit decision.
@ -808,26 +803,6 @@ func (s *Server) CreateShardGroupIfNotExists(database, policy string, timestamp
return err
}
// createShardIfNotExists returns the shard group for a database, policy, and timestamp.
// If the group doesn't exist then one will be created automatically.
func (s *Server) createShardGroupIfNotExists(database, policy string, timestamp time.Time) (*ShardGroup, error) {
// Check if shard group exists first.
g, err := s.shardGroupByTimestamp(database, policy, timestamp)
if err != nil {
return nil, err
} else if g != nil {
return g, nil
}
// If the shard doesn't exist then create it.
if err := s.CreateShardGroupIfNotExists(database, policy, timestamp); err != nil {
return nil, err
}
// Lookup the shard again.
return s.shardGroupByTimestamp(database, policy, timestamp)
}
func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err error) {
var c createShardGroupIfNotExistsCommand
mustUnmarshalJSON(m.Data, &c)
@ -1494,97 +1469,78 @@ type setDefaultRetentionPolicyCommand struct {
Name string `json:"name"`
}
type createFieldsIfNotExistCommand struct {
Database string `json:"database"`
Measurement string `json:"measurement"`
Fields map[string]influxql.DataType `json:"fields"`
type createMeasurementSubcommand struct {
Name string `json:"name"`
Tags []map[string]string `json:"tags"`
Fields []*Field `json:"fields"`
}
func (s *Server) applyCreateFieldsIfNotExist(m *messaging.Message) error {
var c createFieldsIfNotExistCommand
mustUnmarshalJSON(m.Data, &c)
type createMeasurementsIfNotExistsCommand struct {
Database string `json:"database"`
Measurements map[string]*createMeasurementSubcommand `json:"measurements"`
}
s.mu.Lock()
defer s.mu.Unlock()
// Validate command.
db := s.databases[c.Database]
if db == nil {
return ErrDatabaseNotFound
}
mm := db.measurements[c.Measurement]
if mm == nil {
return ErrMeasurementNotFound
func newCreateMeasurementsIfNotExistsCommand(database string) *createMeasurementsIfNotExistsCommand {
return &createMeasurementsIfNotExistsCommand{
Database: database,
Measurements: make(map[string]*createMeasurementSubcommand),
}
}
// Create fields in Metastore.
nCurrFields := len(mm.Fields)
for k, v := range c.Fields {
if err := mm.createFieldIfNotExists(k, v); err != nil {
if err == ErrFieldOverflow {
log.Printf("no more fields allowed: %s::%s", mm.Name, k)
continue
} else if err == ErrFieldTypeConflict {
log.Printf("field type conflict: %s::%s", mm.Name, k)
continue
// addMeasurementIfNotExists adds the Measurement to the command, but only if not already present
// in the command.
func (c *createMeasurementsIfNotExistsCommand) addMeasurementIfNotExists(name string) {
_, ok := c.Measurements[name]
if !ok {
c.Measurements[name] = &createMeasurementSubcommand{
Name: name,
Tags: make([]map[string]string, 0),
Fields: make([]*Field, 0),
}
}
}
// addSeriesIfNotExists adds the Series, identified by Measurement name and tag set, to
// the command, but only if not already present in the command.
func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement string, tags map[string]string) {
c.addMeasurementIfNotExists(measurement)
m := c.Measurements[measurement]
tagset := string(marshalTags(tags))
for _, t := range m.Tags {
if string(marshalTags(t)) == tagset {
// Series already present in subcommand, nothing to do.
return
}
}
// Tag-set needs to added to subcommand.
m.Tags = append(m.Tags, tags)
return
}
// addFieldIfNotExists adds the field to the command for the Measurement, but only if it is not already
// present. It will return an error if the field is present in the command, but is of a different type.
func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, name string, typ influxql.DataType) error {
c.addMeasurementIfNotExists(measurement)
m := c.Measurements[measurement]
for _, f := range m.Fields {
if f.Name == name {
if f.Type != typ {
return ErrFieldTypeConflict
}
return err
}
}
// Update Metastore only if the Measurement's fields were actually changed.
if len(mm.Fields) > nCurrFields {
if err := s.meta.mustUpdate(func(tx *metatx) error {
if err := tx.saveMeasurement(db.name, mm); err != nil {
return fmt.Errorf("save measurement: %s", err)
}
return tx.saveDatabase(db)
}); err != nil {
return err
// Field already present in subcommand with same type, nothing to do.
return nil
}
}
// New field for this measurement so add it to the subcommand.
newField := &Field{Name: name, Type: typ}
m.Fields = append(m.Fields, newField)
return nil
}
func (s *Server) applyCreateSeriesIfNotExists(m *messaging.Message) error {
var c createSeriesIfNotExistsCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate command.
db := s.databases[c.Database]
if db == nil {
return ErrDatabaseNotFound
}
if _, series := db.MeasurementAndSeries(c.Name, c.Tags); series != nil {
return nil
}
// save to the metastore and add it to the in memory index
var series *Series
if err := s.meta.mustUpdate(func(tx *metatx) error {
var err error
series, err = tx.createSeries(db.name, c.Name, c.Tags)
return err
}); err != nil {
return err
}
db.addSeriesToIndex(c.Name, series)
return nil
}
type createSeriesIfNotExistsCommand struct {
Database string `json:"database"`
Name string `json:"name"`
Tags map[string]string `json:"tags"`
}
// Point defines the values that will be written to the database
type Point struct {
Name string
@ -1607,104 +1563,89 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
retentionPolicy = rp.Name
}
// Collect responses for each channel.
type resp struct {
index uint64
err error
// Ensure all required Series and Measurement Fields are created cluster-wide.
if err := s.createMeasurementsIfNotExists(database, retentionPolicy, points); err != nil {
return 0, err
}
ch := make(chan resp, len(points))
// Write each point in parallel.
var wg sync.WaitGroup
for i := range points {
wg.Add(1)
go func(p *Point) {
index, err := s.writePoint(database, retentionPolicy, p)
ch <- resp{index, err}
wg.Done()
}(&points[i])
// Ensure all the required shard groups exist. TODO: this should be done async.
if err := s.createShardGroupsIfNotExists(database, retentionPolicy, points); err != nil {
return 0, err
}
wg.Wait()
close(ch)
// Calculate max index and check for errors.
var index uint64
// Build writeRawSeriesMessageType publish commands.
shardData := make(map[uint64][]byte, 0)
codecs := make(map[string]*FieldCodec, 0)
if err := func() error {
// Local function makes lock management foolproof.
s.mu.RLock()
defer s.mu.RUnlock()
db := s.databases[database]
if db == nil {
return ErrDatabaseNotFound
}
for _, p := range points {
measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
if series == nil {
return ErrSeriesNotFound
}
// Retrieve shard group.
g, err := s.shardGroupByTimestamp(database, retentionPolicy, p.Timestamp)
if err != nil {
return err
}
// Find appropriate shard within the shard group.
sh := g.ShardBySeriesID(series.ID)
// Many points are likely to have the same Measurement name. Re-use codecs if possible.
var codec *FieldCodec
codec, ok := codecs[measurement.Name]
if !ok {
codec = NewFieldCodec(measurement)
codecs[measurement.Name] = codec
}
// Convert string-key/values to encoded fields.
encodedFields, err := codec.EncodeFields(p.Values)
if err != nil {
return err
}
// Encode point header, followed by point data, and add to shard's batch.
data := marshalPointHeader(series.ID, uint32(len(encodedFields)), p.Timestamp.UnixNano())
data = append(data, encodedFields...)
if shardData[sh.ID] == nil {
shardData[sh.ID] = make([]byte, 0)
}
shardData[sh.ID] = append(shardData[sh.ID], data...)
}
return nil
}(); err != nil {
return 0, err
}
// Write data for each shard to the Broker.
var err error
for resp := range ch {
if resp.index > index {
index = resp.index
var maxIndex uint64
for i, d := range shardData {
index, err := s.client.Publish(&messaging.Message{
Type: writeRawSeriesMessageType,
TopicID: i,
Data: d,
})
if err != nil {
return maxIndex, err
}
if err == nil && resp.err != nil {
err = resp.err
if index > maxIndex {
maxIndex = index
}
}
return index, err
}
func (s *Server) writePoint(database, retentionPolicy string, point *Point) (uint64, error) {
measurement, tags, timestamp, values := point.Name, point.Tags, point.Timestamp, point.Values
// Sanity-check the data point.
if measurement == "" {
return 0, ErrMeasurementNameRequired
}
if len(values) == 0 {
return 0, ErrValuesRequired
}
// Find the id for the series and tagset
seriesID, err := s.createSeriesIfNotExists(database, measurement, tags)
if err != nil {
return 0, err
}
// Retrieve measurement.
m, err := s.measurement(database, measurement)
if err != nil {
return 0, err
} else if m == nil {
return 0, ErrMeasurementNotFound
}
// Retrieve shard group.
g, err := s.createShardGroupIfNotExists(database, retentionPolicy, timestamp)
if err != nil {
return 0, fmt.Errorf("create shard(%s/%s): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err)
}
// Find appropriate shard within the shard group.
sh := g.ShardBySeriesID(seriesID)
// Ensure fields are created as necessary.
err = s.createFieldsIfNotExists(database, measurement, values)
if err != nil {
return 0, err
}
// Get a field codec.
s.mu.RLock()
codec := NewFieldCodec(m)
s.mu.RUnlock()
if codec == nil {
panic("field codec is nil")
}
// Convert string-key/values to encoded fields.
encodedFields, err := codec.EncodeFields(values)
if err != nil {
return 0, err
}
// Encode point header.
data := marshalPointHeader(seriesID, timestamp.UnixNano())
data = append(data, encodedFields...)
// Publish "raw write series" message on shard's topic to broker.
return s.client.Publish(&messaging.Message{
Type: writeRawSeriesMessageType,
TopicID: sh.ID,
Data: data,
})
return maxIndex, err
}
// applyWriteRawSeries writes raw series data to the database.
@ -1717,106 +1658,151 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error {
return ErrShardNotFound
}
// Extract the series id and timestamp from the header.
// Everything after the header is the marshalled value.
seriesID, timestamp := unmarshalPointHeader(m.Data[:pointHeaderSize])
data := m.Data[pointHeaderSize:]
if err := sh.writeSeries(m.Data); err != nil {
return err
}
// Add to lookup.
s.addShardBySeriesID(sh, seriesID)
// TODO: Enable some way to specify if the data should be overwritten
overwrite := true
// Write to shard.
return sh.writeSeries(seriesID, timestamp, data, overwrite)
return nil
}
func (s *Server) addShardBySeriesID(sh *Shard, seriesID uint32) {
for _, other := range s.shardsBySeriesID[seriesID] {
if other.ID == sh.ID {
return
}
}
s.shardsBySeriesID[seriesID] = append(s.shardsBySeriesID[seriesID], sh)
}
// createMeasurementsIfNotExists walks the "points" and ensures that all new Series are created, and all
// new Measurement fields have been created, across the cluster.
func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string, points []Point) error {
c := newCreateMeasurementsIfNotExistsCommand(database)
func (s *Server) createSeriesIfNotExists(database, name string, tags map[string]string) (uint32, error) {
// Try to find series locally first.
s.mu.RLock()
db := s.databases[database]
if db == nil {
s.mu.RUnlock()
return 0, fmt.Errorf("database not found %q", database)
}
if _, series := db.MeasurementAndSeries(name, tags); series != nil {
s.mu.RUnlock()
return series.ID, nil
}
// release the read lock so the broadcast can actually go through and acquire the write lock
s.mu.RUnlock()
// If it doesn't exist then create a message and broadcast.
c := &createSeriesIfNotExistsCommand{Database: database, Name: name, Tags: tags}
_, err := s.broadcast(createSeriesIfNotExistsMessageType, c)
if err != nil {
return 0, err
}
// Lookup series again.
_, series := db.MeasurementAndSeries(name, tags)
if series == nil {
return 0, ErrSeriesNotFound
}
return series.ID, nil
}
func (s *Server) createFieldsIfNotExists(database string, measurement string, values map[string]interface{}) error {
// Local function keeps locking foolproof.
f := func(database string, measurement string, values map[string]interface{}) (map[string]influxql.DataType, error) {
// Local function keeps lock management foolproof.
func() error {
s.mu.RLock()
defer s.mu.RUnlock()
// Check to see if the fields already exist.
m, err := s.measurement(database, measurement)
if err != nil {
return nil, err
} else if m == nil {
return nil, ErrMeasurementNotFound
db := s.databases[database]
if db == nil {
return fmt.Errorf("database not found %q", database)
}
newFields := make(map[string]influxql.DataType)
for k, v := range values {
f := m.FieldByName(k)
if f == nil {
newFields[k] = influxql.InspectDataType(v)
} else {
if f.Type != influxql.InspectDataType(v) {
return nil, fmt.Errorf(fmt.Sprintf("field \"%s\" is type %T, mapped as type %s", k, v, f.Type))
for _, p := range points {
measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
if series == nil {
// Series does not exist in Metastore, add it so it's created cluster-wide.
c.addSeriesIfNotExists(p.Name, p.Tags)
}
for k, v := range p.Values {
if measurement != nil {
if f := measurement.FieldByName(k); f != nil {
// Field present in Metastore, make sure there is no type conflict.
if f.Type != influxql.InspectDataType(v) {
return fmt.Errorf(fmt.Sprintf("field \"%s\" is type %T, mapped as type %s", k, v, f.Type))
}
continue // Field is present, and it's of the same type. Nothing more to do.
}
}
// Field isn't in Metastore. Add it to command so it's created cluster-wide.
if err := c.addFieldIfNotExists(p.Name, k, influxql.InspectDataType(v)); err != nil {
return err
}
}
}
return newFields, nil
}
newFields, err := f(database, measurement, values)
if err != nil {
return err
}
if len(newFields) == 0 {
return nil
}()
// Any broadcast actually required?
if len(c.Measurements) > 0 {
_, err := s.broadcast(createMeasurementsIfNotExistsMessageType, c)
if err != nil {
return err
}
}
// There are some new fields, so create field types mappings on cluster.
c := &createFieldsIfNotExistCommand{Database: database, Measurement: measurement, Fields: newFields}
_, err = s.broadcast(createFieldsIfNotExistsMessageType, c)
if err != nil {
return nil
}
// applyCreateMeasurementsIfNotExists creates the Measurements, Series, and Fields in the Metastore.
func (s *Server) applyCreateMeasurementsIfNotExists(m *messaging.Message) error {
var c createMeasurementsIfNotExistsCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate command.
db := s.databases[c.Database]
if db == nil {
return ErrDatabaseNotFound
}
// Process command within a transaction.
if err := s.meta.mustUpdate(func(tx *metatx) error {
for _, cm := range c.Measurements {
// Create each series
for _, t := range cm.Tags {
_, ss := db.MeasurementAndSeries(cm.Name, t)
// Ensure creation of Series is idempotent.
if ss != nil {
continue
}
series, err := tx.createSeries(db.name, cm.Name, t)
if err != nil {
return err
}
db.addSeriesToIndex(cm.Name, series)
}
// Create each new field.
mm := db.measurements[cm.Name]
if mm == nil {
panic(fmt.Sprintf("Measurement %s does not exist", cm.Name))
}
for _, f := range cm.Fields {
if err := mm.createFieldIfNotExists(f.Name, f.Type); err != nil {
if err == ErrFieldOverflow {
log.Printf("no more fields allowed: %s::%s", mm.Name, f.Name)
continue
} else if err == ErrFieldTypeConflict {
log.Printf("field type conflict: %s::%s", mm.Name, f.Name)
continue
}
return err
}
if err := tx.saveMeasurement(db.name, mm); err != nil {
return fmt.Errorf("save measurement: %s", err)
}
}
if err := tx.saveDatabase(db); err != nil {
return fmt.Errorf("save database: %s", err)
}
}
return nil
}); err != nil {
return err
}
return nil
}
// createShardGroupsIfNotExist walks the "points" and ensures that all required shards exist on the cluster.
func (s *Server) createShardGroupsIfNotExists(database, retentionPolicy string, points []Point) error {
for _, p := range points {
// Check if shard group exists first.
g, err := s.shardGroupByTimestamp(database, retentionPolicy, p.Timestamp)
if err != nil {
return err
} else if g != nil {
continue
}
err = s.CreateShardGroupIfNotExists(database, retentionPolicy, p.Timestamp)
if err != nil {
return fmt.Errorf("create shard(%s/%s): %s", retentionPolicy, p.Timestamp.Format(time.RFC3339Nano), err)
}
}
return nil
}
// ReadSeries reads a single point from a series in the database. It is used for debug and test only.
func (s *Server) ReadSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time) (map[string]interface{}, error) {
s.mu.RLock()
@ -2719,10 +2705,8 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
err = s.applyDeleteShardGroup(m)
case setDefaultRetentionPolicyMessageType:
err = s.applySetDefaultRetentionPolicy(m)
case createFieldsIfNotExistsMessageType:
err = s.applyCreateFieldsIfNotExist(m)
case createSeriesIfNotExistsMessageType:
err = s.applyCreateSeriesIfNotExists(m)
case createMeasurementsIfNotExistsMessageType:
err = s.applyCreateMeasurementsIfNotExists(m)
case setPrivilegeMessageType:
err = s.applySetPrivilege(m)
case createContinuousQueryMessageType:

View File

@ -108,18 +108,37 @@ func (s *Shard) readSeries(seriesID uint32, timestamp int64) (values []byte, err
return
}
// writeSeries writes series data to a shard.
func (s *Shard) writeSeries(seriesID uint32, timestamp int64, values []byte, overwrite bool) error {
// writeSeries writes series batch to a shard.
func (s *Shard) writeSeries(batch []byte) error {
return s.store.Update(func(tx *bolt.Tx) error {
// Create a bucket for the series.
b, err := tx.CreateBucketIfNotExists(u32tob(seriesID))
if err != nil {
return err
}
for {
if pointHeaderSize > len(batch) {
return ErrInvalidPointBuffer
}
seriesID, payloadLength, timestamp := unmarshalPointHeader(batch[:pointHeaderSize])
batch = batch[pointHeaderSize:]
// Insert the values by timestamp.
if err := b.Put(u64tob(uint64(timestamp)), values); err != nil {
return err
if payloadLength > uint32(len(batch)) {
return ErrInvalidPointBuffer
}
data := batch[:payloadLength]
// Create a bucket for the series.
b, err := tx.CreateBucketIfNotExists(u32tob(seriesID))
if err != nil {
return err
}
// Insert the values by timestamp.
if err := b.Put(u64tob(uint64(timestamp)), data); err != nil {
return err
}
// Push the buffer forward and check if we're done.
batch = batch[payloadLength:]
if len(batch) == 0 {
break
}
}
return nil
@ -134,20 +153,22 @@ func (s *Shard) deleteSeries(name string) error {
type Shards []*Shard
// pointHeaderSize represents the size of a point header, in bytes.
const pointHeaderSize = 4 + 8 // seriesID + timestamp
const pointHeaderSize = 4 + 4 + 8 // seriesID + payload length + timestamp
// marshalPointHeader encodes a series id, timestamp, & flagset into a byte slice.
func marshalPointHeader(seriesID uint32, timestamp int64) []byte {
b := make([]byte, 12)
// marshalPointHeader encodes a series id, payload length, timestamp, & flagset into a byte slice.
func marshalPointHeader(seriesID uint32, payloadLength uint32, timestamp int64) []byte {
b := make([]byte, pointHeaderSize)
binary.BigEndian.PutUint32(b[0:4], seriesID)
binary.BigEndian.PutUint64(b[4:12], uint64(timestamp))
binary.BigEndian.PutUint32(b[4:8], payloadLength)
binary.BigEndian.PutUint64(b[8:16], uint64(timestamp))
return b
}
// unmarshalPointHeader decodes a byte slice into a series id, timestamp & flagset.
func unmarshalPointHeader(b []byte) (seriesID uint32, timestamp int64) {
func unmarshalPointHeader(b []byte) (seriesID uint32, payloadLength uint32, timestamp int64) {
seriesID = binary.BigEndian.Uint32(b[0:4])
timestamp = int64(binary.BigEndian.Uint64(b[4:12]))
payloadLength = binary.BigEndian.Uint32(b[4:8])
timestamp = int64(binary.BigEndian.Uint64(b[8:16]))
return
}