From 55fbd7e42cb6ef7a35ee0b2417c546a62f5e7f62 Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip@influxdb.com>
Date: Wed, 18 Feb 2015 15:58:26 -0800
Subject: [PATCH 01/17] Batch Measurement creation

---
 server.go | 124 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 124 insertions(+)

diff --git a/server.go b/server.go
index 242a1ad6a5..4ca97dfc70 100644
--- a/server.go
+++ b/server.go
@@ -1585,6 +1585,68 @@ type createSeriesIfNotExistsCommand struct {
 	Tags     map[string]string `json:"tags"`
 }
 
+type createMeasurementSubcommand struct {
+	Name   string                       `json:"name"`
+	Tags   map[string]map[string]string `json:"tags"`
+	Fields map[string]influxql.DataType `json:"fields"`
+}
+
+type createMeasurementsIfNotExistsCommand struct {
+	Database     string                                 `json:"database"`
+	Measurements map[string]createMeasurementSubcommand `json:"measurements"`
+}
+
+// addMeasurementIfNotExists adds the Measurement to the command, but only if not already present
+// in the command.
+func (c *createMeasurementsIfNotExistsCommand) addMeasurementIfNotExists(name string) error {
+	_, ok := c.Measurements[name]
+	if !ok {
+		c.Measurements[name] = createMeasurementSubcommand{Name: name}
+	}
+	return nil
+}
+
+// addSeriesIfNotExists adds the Series, identified by Measurement name and tag set, to
+// the command, but only if not already present in the command.
+func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement string, tags map[string]string) error {
+	_, ok := c.Measurements[measurement]
+	if !ok {
+		return ErrMeasurementNotFound
+	}
+
+	tagset := string(marshalTags(tags))
+	_, ok = c.Measurements[measurement].Tags[tagset]
+	if ok {
+		// Series already present in in subcommand, nothing to do.
+		return nil
+	}
+	// Tag-set needs to added to subcommand.
+	c.Measurements[measurement].Tags[tagset] = tags
+
+	return nil
+}
+
+// addFieldIfNotExists adds the field to the command for the Measurement, but only if it is not already
+// present. It will return an error if the field is present in the command, but is of a different type.
+func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, name string, typ influxql.DataType) error {
+	_, ok := c.Measurements[measurement]
+	if !ok {
+		return ErrMeasurementNotFound
+	}
+
+	t, ok := c.Measurements[measurement].Fields[name]
+	if ok {
+		if typ != t {
+			return ErrFieldTypeConflict
+		}
+		// Field already present in subcommand with same type, nothing to do.
+		return nil
+	}
+	// New field for this measurement so add it to the subcommand.
+	c.Measurements[measurement].Fields[name] = typ
+	return nil
+}
+
 // Point defines the values that will be written to the database
 type Point struct {
 	Name      string
@@ -1607,6 +1669,10 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
 		retentionPolicy = rp.Name
 	}
 
+	if err := s.createMeasurementsIfNotExists(database, retentionPolicy, points); err != nil {
+		return 0, err
+	}
+
 	// Collect responses for each channel.
 	type resp struct {
 		index uint64
@@ -1771,6 +1837,64 @@ func (s *Server) createSeriesIfNotExists(database, name string, tags map[string]
 	return series.ID, nil
 }
 
+// createMeasurementsIfNotExists walks the "points" and ensures that all new Series are created, and all
+// new Measurement fields have been created, across the cluster.
+func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string, points []Point) error {
+	c := createMeasurementsIfNotExistsCommand{Database: database}
+
+	s.mu.RLock()
+	db := s.databases[database]
+	if db == nil {
+		s.mu.RUnlock()
+		return fmt.Errorf("database not found %q", database)
+	}
+
+	for _, p := range points {
+		measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
+
+		if measurement == nil {
+			// Measurement not in Metastore, add to command so it's created cluster-wide.
+			c.addMeasurementIfNotExists(p.Name)
+		}
+
+		if series == nil {
+			// Series does not exist in Metastore, add it so it's created cluster-wide.
+			c.addSeriesIfNotExists(p.Name, p.Tags)
+		}
+
+		for k, v := range p.Values {
+			if measurement != nil {
+				if f := measurement.FieldByName(k); f != nil {
+					// Field present in Metastore, make sure there is no type conflict.
+					if f.Type != influxql.InspectDataType(v) {
+						return fmt.Errorf(fmt.Sprintf("field \"%s\" is type %T, mapped as type %s", k, v, f.Type))
+					} else {
+						// Field does not exist in Metastore, add it so it's created cluster-wide.
+						if err := c.addFieldIfNotExists(p.Name, k, influxql.InspectDataType(v)); err != nil {
+							return err
+						}
+					}
+				}
+			} else {
+				// Measurement does not exist, so fields can't exist. Add each one unconditionally.
+				if err := c.addFieldIfNotExists(p.Name, k, influxql.InspectDataType(v)); err != nil {
+					return err
+				}
+			}
+		}
+	}
+
+	// Any broadcast actually required?
+	if len(c.Measurements) > 0 {
+		_, err := s.broadcast(createSeriesIfNotExistsMessageType, c)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func (s *Server) createFieldsIfNotExists(database string, measurement string, values map[string]interface{}) error {
 	// Local function keeps locking foolproof.
 	f := func(database string, measurement string, values map[string]interface{}) (map[string]influxql.DataType, error) {

From 3435da962616dcf547d3c74110670f527481477f Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip@influxdb.com>
Date: Wed, 18 Feb 2015 16:38:49 -0800
Subject: [PATCH 02/17] Correctly initialize createMeasurement maps

Full unit tests added for happy paths.
---
 internal_test.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++++
 server.go        | 13 ++++++++-
 2 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/internal_test.go b/internal_test.go
index ab31ae44b0..19be3797dd 100644
--- a/internal_test.go
+++ b/internal_test.go
@@ -90,6 +90,78 @@ func TestMeasurement_expandExpr(t *testing.T) {
 	}
 }
 
+// Ensure the createMeasurementsIfNotExistsCommand operates correctly.
+func TestCreateMeasurementsCommand(t *testing.T) {
+	var err error
+	var n int
+	c := newCreateMeasurementsIfNotExistsCommand("foo")
+	if c == nil {
+		t.Fatal("createMeasurementsIfNotExistsCommand is nil")
+	}
+
+	// Add Measurement.
+	err = c.addMeasurementIfNotExists("bar")
+	if err != nil {
+		t.Fatal("error adding measurement bar")
+	}
+	err = c.addMeasurementIfNotExists("bar")
+	if err != nil {
+		t.Fatal("error re-adding measurement bar")
+	}
+
+	n = len(c.Measurements)
+	if n != 1 {
+		t.Fatalf("wrong number of measurements, expected 1, got %d", n)
+	}
+
+	// Add Series, no tags.
+	err = c.addSeriesIfNotExists("bar", nil)
+	if err != nil {
+		t.Fatal("error adding series with nil tags")
+	}
+
+	// Add Series, some tags.
+	tags := map[string]string{"host": "server01"}
+	err = c.addSeriesIfNotExists("bar", tags)
+	if err != nil {
+		t.Fatal("error adding series with non-nil tags")
+	}
+
+	// Add Series, same tags again.
+	err = c.addSeriesIfNotExists("bar", tags)
+	if err != nil {
+		t.Fatal("error re-adding series with non-nil tags")
+	}
+
+	n = len(c.Measurements["bar"].Tags)
+	if n != 2 {
+		t.Fatalf("measurement has wrong number of tags, expected 2, got %d", n)
+	}
+
+	// Add a fields.
+	err = c.addFieldIfNotExists("bar", "value", influxql.Number)
+	if err != nil {
+		t.Fatal("error adding field \"value\"")
+	}
+
+	// Add same field again.
+	err = c.addFieldIfNotExists("bar", "value", influxql.Number)
+	if err != nil {
+		t.Fatal("error re-adding field \"value\"")
+	}
+
+	// Add another field.
+	err = c.addFieldIfNotExists("bar", "value2", influxql.String)
+	if err != nil {
+		t.Fatal("error re-adding field \"value2\"")
+	}
+
+	n = len(c.Measurements["bar"].Fields)
+	if n != 2 {
+		t.Fatalf("wrong number of fields, expected 2, got %d", n)
+	}
+}
+
 // MustParseExpr parses an expression string and returns its AST representation.
 func MustParseExpr(s string) influxql.Expr {
 	expr, err := influxql.ParseExpr(s)
diff --git a/server.go b/server.go
index 4ca97dfc70..6308a033a7 100644
--- a/server.go
+++ b/server.go
@@ -1596,12 +1596,23 @@ type createMeasurementsIfNotExistsCommand struct {
 	Measurements map[string]createMeasurementSubcommand `json:"measurements"`
 }
 
+func newCreateMeasurementsIfNotExistsCommand(database string) *createMeasurementsIfNotExistsCommand {
+	return &createMeasurementsIfNotExistsCommand{
+		Database:     database,
+		Measurements: make(map[string]createMeasurementSubcommand),
+	}
+}
+
 // addMeasurementIfNotExists adds the Measurement to the command, but only if not already present
 // in the command.
 func (c *createMeasurementsIfNotExistsCommand) addMeasurementIfNotExists(name string) error {
 	_, ok := c.Measurements[name]
 	if !ok {
-		c.Measurements[name] = createMeasurementSubcommand{Name: name}
+		c.Measurements[name] = createMeasurementSubcommand{
+			Name:   name,
+			Tags:   make(map[string]map[string]string),
+			Fields: make(map[string]influxql.DataType),
+		}
 	}
 	return nil
 }

From 93dea5d527f5129de274bdc2f838393dcecac66f Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip@influxdb.com>
Date: Wed, 18 Feb 2015 16:44:54 -0800
Subject: [PATCH 03/17] Test failure cases for measurement create command

---
 internal_test.go | 37 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/internal_test.go b/internal_test.go
index 19be3797dd..57ae52b18e 100644
--- a/internal_test.go
+++ b/internal_test.go
@@ -138,7 +138,7 @@ func TestCreateMeasurementsCommand(t *testing.T) {
 		t.Fatalf("measurement has wrong number of tags, expected 2, got %d", n)
 	}
 
-	// Add a fields.
+	// Add a field.
 	err = c.addFieldIfNotExists("bar", "value", influxql.Number)
 	if err != nil {
 		t.Fatal("error adding field \"value\"")
@@ -162,6 +162,41 @@ func TestCreateMeasurementsCommand(t *testing.T) {
 	}
 }
 
+// Ensure the createMeasurementsIfNotExistsCommand returns expected errors.
+func TestCreateMeasurementsCommand_Errors(t *testing.T) {
+	var err error
+	c := newCreateMeasurementsIfNotExistsCommand("foo")
+	if c == nil {
+		t.Fatal("createMeasurementsIfNotExistsCommand is nil")
+	}
+
+	err = c.addSeriesIfNotExists("bar", nil)
+	if err != ErrMeasurementNotFound {
+		t.Fatalf("expected ErrMeasurementNotFound got %s", err.Error())
+	}
+
+	err = c.addFieldIfNotExists("bar", "value", influxql.Number)
+	if err != ErrMeasurementNotFound {
+		t.Fatalf("expected ErrMeasurementNotFound got %s", err.Error())
+	}
+
+	// Add Measurement.
+	err = c.addMeasurementIfNotExists("bar")
+	if err != nil {
+		t.Fatal("error adding measurement bar")
+	}
+
+	// Test type conflicts
+	err = c.addFieldIfNotExists("bar", "value", influxql.Number)
+	if err != nil {
+		t.Fatal("error adding field \"value\"")
+	}
+	err = c.addFieldIfNotExists("bar", "value", influxql.String)
+	if err != ErrFieldTypeConflict {
+		t.Fatalf("expected ErrFieldTypeConflict got %s", err.Error())
+	}
+}
+
 // MustParseExpr parses an expression string and returns its AST representation.
 func MustParseExpr(s string) influxql.Expr {
 	expr, err := influxql.ParseExpr(s)

From 0d3ab9fa1d194fc3e0786b349fc4d6b7f7fa31c9 Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip@influxdb.com>
Date: Wed, 18 Feb 2015 17:14:45 -0800
Subject: [PATCH 04/17] Create-measurement command in local function

---
 server.go | 73 ++++++++++++++++++++++++++++++-------------------------
 1 file changed, 40 insertions(+), 33 deletions(-)

diff --git a/server.go b/server.go
index 6308a033a7..9790134bd9 100644
--- a/server.go
+++ b/server.go
@@ -1680,6 +1680,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
 		retentionPolicy = rp.Name
 	}
 
+	// Ensure all required Series and Measurement Fields are created cluster-wide.
 	if err := s.createMeasurementsIfNotExists(database, retentionPolicy, points); err != nil {
 		return 0, err
 	}
@@ -1851,49 +1852,55 @@ func (s *Server) createSeriesIfNotExists(database, name string, tags map[string]
 // createMeasurementsIfNotExists walks the "points" and ensures that all new Series are created, and all
 // new Measurement fields have been created, across the cluster.
 func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string, points []Point) error {
-	c := createMeasurementsIfNotExistsCommand{Database: database}
+	c := newCreateMeasurementsIfNotExistsCommand(database)
 
-	s.mu.RLock()
-	db := s.databases[database]
-	if db == nil {
-		s.mu.RUnlock()
-		return fmt.Errorf("database not found %q", database)
-	}
+	// Local function keeps lock management foolproof.
+	func() error {
+		s.mu.RLock()
+		defer s.mu.RUnlock()
 
-	for _, p := range points {
-		measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
-
-		if measurement == nil {
-			// Measurement not in Metastore, add to command so it's created cluster-wide.
-			c.addMeasurementIfNotExists(p.Name)
+		db := s.databases[database]
+		if db == nil {
+			return fmt.Errorf("database not found %q", database)
 		}
 
-		if series == nil {
-			// Series does not exist in Metastore, add it so it's created cluster-wide.
-			c.addSeriesIfNotExists(p.Name, p.Tags)
-		}
+		for _, p := range points {
+			measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
 
-		for k, v := range p.Values {
-			if measurement != nil {
-				if f := measurement.FieldByName(k); f != nil {
-					// Field present in Metastore, make sure there is no type conflict.
-					if f.Type != influxql.InspectDataType(v) {
-						return fmt.Errorf(fmt.Sprintf("field \"%s\" is type %T, mapped as type %s", k, v, f.Type))
-					} else {
-						// Field does not exist in Metastore, add it so it's created cluster-wide.
-						if err := c.addFieldIfNotExists(p.Name, k, influxql.InspectDataType(v)); err != nil {
-							return err
+			if measurement == nil {
+				// Measurement not in Metastore, add to command so it's created cluster-wide.
+				c.addMeasurementIfNotExists(p.Name)
+			}
+
+			if series == nil {
+				// Series does not exist in Metastore, add it so it's created cluster-wide.
+				c.addSeriesIfNotExists(p.Name, p.Tags)
+			}
+
+			for k, v := range p.Values {
+				if measurement != nil {
+					if f := measurement.FieldByName(k); f != nil {
+						// Field present in Metastore, make sure there is no type conflict.
+						if f.Type != influxql.InspectDataType(v) {
+							return fmt.Errorf(fmt.Sprintf("field \"%s\" is type %T, mapped as type %s", k, v, f.Type))
+						} else {
+							// Field does not exist in Metastore, add it so it's created cluster-wide.
+							if err := c.addFieldIfNotExists(p.Name, k, influxql.InspectDataType(v)); err != nil {
+								return err
+							}
 						}
 					}
-				}
-			} else {
-				// Measurement does not exist, so fields can't exist. Add each one unconditionally.
-				if err := c.addFieldIfNotExists(p.Name, k, influxql.InspectDataType(v)); err != nil {
-					return err
+				} else {
+					// Measurement does not exist in Metastore, so fields can't exist. Add each one unconditionally.
+					if err := c.addFieldIfNotExists(p.Name, k, influxql.InspectDataType(v)); err != nil {
+						return err
+					}
 				}
 			}
 		}
-	}
+
+		return nil
+	}()
 
 	// Any broadcast actually required?
 	if len(c.Measurements) > 0 {

From 25c3b104b777faa2dadddad7ae9e9751a87c5fbd Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip.otoole@yahoo.com>
Date: Wed, 18 Feb 2015 22:53:20 -0800
Subject: [PATCH 05/17] WriteSeries now uses batching

---
 server.go | 364 +++++++++++++-----------------------------------------
 1 file changed, 85 insertions(+), 279 deletions(-)

diff --git a/server.go b/server.go
index 9790134bd9..1474aa7f2d 100644
--- a/server.go
+++ b/server.go
@@ -68,20 +68,17 @@ const (
 	createShardGroupIfNotExistsMessageType = messaging.MessageType(0x40)
 	deleteShardGroupMessageType            = messaging.MessageType(0x41)
 
-	// Series messages
-	createSeriesIfNotExistsMessageType = messaging.MessageType(0x50)
-
 	// Measurement messages
-	createFieldsIfNotExistsMessageType = messaging.MessageType(0x60)
+	createMeasurementsIfNotExistsMessageType = messaging.MessageType(0x50)
 
 	// Continuous Query messages
-	createContinuousQueryMessageType = messaging.MessageType(0x70)
+	createContinuousQueryMessageType = messaging.MessageType(0x60)
 
 	// Write series data messages (per-topic)
-	writeRawSeriesMessageType = messaging.MessageType(0x80)
+	writeRawSeriesMessageType = messaging.MessageType(0x70)
 
 	// Privilege messages
-	setPrivilegeMessageType = messaging.MessageType(0x90)
+	setPrivilegeMessageType = messaging.MessageType(0x80)
 )
 
 // Server represents a collection of metadata and raw metric data.
@@ -808,26 +805,6 @@ func (s *Server) CreateShardGroupIfNotExists(database, policy string, timestamp
 	return err
 }
 
-// createShardIfNotExists returns the shard group for a database, policy, and timestamp.
-// If the group doesn't exist then one will be created automatically.
-func (s *Server) createShardGroupIfNotExists(database, policy string, timestamp time.Time) (*ShardGroup, error) {
-	// Check if shard group exists first.
-	g, err := s.shardGroupByTimestamp(database, policy, timestamp)
-	if err != nil {
-		return nil, err
-	} else if g != nil {
-		return g, nil
-	}
-
-	// If the shard doesn't exist then create it.
-	if err := s.CreateShardGroupIfNotExists(database, policy, timestamp); err != nil {
-		return nil, err
-	}
-
-	// Lookup the shard again.
-	return s.shardGroupByTimestamp(database, policy, timestamp)
-}
-
 func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err error) {
 	var c createShardGroupIfNotExistsCommand
 	mustUnmarshalJSON(m.Data, &c)
@@ -1494,97 +1471,6 @@ type setDefaultRetentionPolicyCommand struct {
 	Name     string `json:"name"`
 }
 
-type createFieldsIfNotExistCommand struct {
-	Database    string                       `json:"database"`
-	Measurement string                       `json:"measurement"`
-	Fields      map[string]influxql.DataType `json:"fields"`
-}
-
-func (s *Server) applyCreateFieldsIfNotExist(m *messaging.Message) error {
-	var c createFieldsIfNotExistCommand
-	mustUnmarshalJSON(m.Data, &c)
-
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	// Validate command.
-	db := s.databases[c.Database]
-	if db == nil {
-		return ErrDatabaseNotFound
-	}
-	mm := db.measurements[c.Measurement]
-	if mm == nil {
-		return ErrMeasurementNotFound
-	}
-
-	// Create fields in Metastore.
-	nCurrFields := len(mm.Fields)
-	for k, v := range c.Fields {
-		if err := mm.createFieldIfNotExists(k, v); err != nil {
-			if err == ErrFieldOverflow {
-				log.Printf("no more fields allowed: %s::%s", mm.Name, k)
-				continue
-			} else if err == ErrFieldTypeConflict {
-				log.Printf("field type conflict: %s::%s", mm.Name, k)
-				continue
-			}
-			return err
-		}
-	}
-
-	// Update Metastore only if the Measurement's fields were actually changed.
-	if len(mm.Fields) > nCurrFields {
-		if err := s.meta.mustUpdate(func(tx *metatx) error {
-			if err := tx.saveMeasurement(db.name, mm); err != nil {
-				return fmt.Errorf("save measurement: %s", err)
-			}
-			return tx.saveDatabase(db)
-		}); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (s *Server) applyCreateSeriesIfNotExists(m *messaging.Message) error {
-	var c createSeriesIfNotExistsCommand
-	mustUnmarshalJSON(m.Data, &c)
-
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	// Validate command.
-	db := s.databases[c.Database]
-	if db == nil {
-		return ErrDatabaseNotFound
-	}
-
-	if _, series := db.MeasurementAndSeries(c.Name, c.Tags); series != nil {
-		return nil
-	}
-
-	// save to the metastore and add it to the in memory index
-	var series *Series
-	if err := s.meta.mustUpdate(func(tx *metatx) error {
-		var err error
-		series, err = tx.createSeries(db.name, c.Name, c.Tags)
-		return err
-	}); err != nil {
-		return err
-	}
-
-	db.addSeriesToIndex(c.Name, series)
-
-	return nil
-}
-
-type createSeriesIfNotExistsCommand struct {
-	Database string            `json:"database"`
-	Name     string            `json:"name"`
-	Tags     map[string]string `json:"tags"`
-}
-
 type createMeasurementSubcommand struct {
 	Name   string                       `json:"name"`
 	Tags   map[string]map[string]string `json:"tags"`
@@ -1685,104 +1571,78 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
 		return 0, err
 	}
 
-	// Collect responses for each channel.
-	type resp struct {
-		index uint64
-		err   error
+	// Ensure all the required shard groups exist. TODO: this should be done async.
+	if err := s.createShardGroupsIfNotExists(database, retentionPolicy, points); err != nil {
+		return 0, err
 	}
-	ch := make(chan resp, len(points))
 
-	// Write each point in parallel.
-	var wg sync.WaitGroup
-	for i := range points {
-		wg.Add(1)
-		go func(p *Point) {
-			index, err := s.writePoint(database, retentionPolicy, p)
-			ch <- resp{index, err}
-			wg.Done()
-		}(&points[i])
+	// Build writeRawSeriesMessageType publish commands.
+	shardData := make(map[uint64][]byte)
+	for _, p := range points {
+		// Local function makes lock management foolproof.
+		measurement, series, err := func() (*Measurement, *Series, error) {
+			s.mu.RLock()
+			defer s.mu.RUnlock()
+			db := s.databases[database]
+			if db == nil {
+				return nil, nil, fmt.Errorf("database not found %q", database)
+			}
+			if measurement, series := db.MeasurementAndSeries(p.Name, p.Tags); series != nil {
+				return measurement, series, nil
+			}
+			panic("series not found")
+		}()
+		if err != nil {
+			return 0, err
+		}
+
+		// Retrieve shard group.
+		g, err := s.shardGroupByTimestamp(database, retentionPolicy, p.Timestamp)
+		if err != nil {
+			return 0, err
+		}
+
+		// Find appropriate shard within the shard group.
+		sh := g.ShardBySeriesID(series.ID)
+
+		// Get a field codec.
+		s.mu.RLock()
+		codec := NewFieldCodec(measurement)
+		s.mu.RUnlock()
+		if codec == nil {
+			panic("field codec is nil")
+		}
+
+		// Convert string-key/values to encoded fields.
+		encodedFields, err := codec.EncodeFields(p.Values)
+		if err != nil {
+			return 0, err
+		}
+
+		// Encode point header, followed by point data, and assign to shard.
+		data := marshalPointHeader(series.ID, p.Timestamp.UnixNano())
+		data = append(data, encodedFields...)
+		shardData[sh.ID] = append(shardData[sh.ID], data...)
 	}
-	wg.Wait()
-	close(ch)
 
-	// Calculate max index and check for errors.
-	var index uint64
+	// Write data for each shard to the Broker.
 	var err error
-	for resp := range ch {
-		if resp.index > index {
-			index = resp.index
+	var maxIndex uint64
+	for i, d := range shardData {
+		index, err := s.client.Publish(&messaging.Message{
+			Type:    writeRawSeriesMessageType,
+			TopicID: i,
+			Data:    d,
+		})
+		if err != nil {
+			return maxIndex, err
 		}
-		if err == nil && resp.err != nil {
-			err = resp.err
+		if index > maxIndex {
+			maxIndex = index
 		}
 	}
-	return index, err
-}
 
-func (s *Server) writePoint(database, retentionPolicy string, point *Point) (uint64, error) {
-	measurement, tags, timestamp, values := point.Name, point.Tags, point.Timestamp, point.Values
-
-	// Sanity-check the data point.
-	if measurement == "" {
-		return 0, ErrMeasurementNameRequired
-	}
-	if len(values) == 0 {
-		return 0, ErrValuesRequired
-	}
-
-	// Find the id for the series and tagset
-	seriesID, err := s.createSeriesIfNotExists(database, measurement, tags)
-	if err != nil {
-		return 0, err
-	}
-
-	// Retrieve measurement.
-	m, err := s.measurement(database, measurement)
-	if err != nil {
-		return 0, err
-	} else if m == nil {
-		return 0, ErrMeasurementNotFound
-	}
-
-	// Retrieve shard group.
-	g, err := s.createShardGroupIfNotExists(database, retentionPolicy, timestamp)
-	if err != nil {
-		return 0, fmt.Errorf("create shard(%s/%s): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err)
-	}
-
-	// Find appropriate shard within the shard group.
-	sh := g.ShardBySeriesID(seriesID)
-
-	// Ensure fields are created as necessary.
-	err = s.createFieldsIfNotExists(database, measurement, values)
-	if err != nil {
-		return 0, err
-	}
-
-	// Get a field codec.
-	s.mu.RLock()
-	codec := NewFieldCodec(m)
-	s.mu.RUnlock()
-	if codec == nil {
-		panic("field codec is nil")
-	}
-
-	// Convert string-key/values to encoded fields.
-	encodedFields, err := codec.EncodeFields(values)
-	if err != nil {
-		return 0, err
-	}
-
-	// Encode point header.
-	data := marshalPointHeader(seriesID, timestamp.UnixNano())
-	data = append(data, encodedFields...)
-
-	// Publish "raw write series" message on shard's topic to broker.
-	return s.client.Publish(&messaging.Message{
-		Type:    writeRawSeriesMessageType,
-		TopicID: sh.ID,
-		Data:    data,
-	})
+	return maxIndex, err
 }
 
 // applyWriteRawSeries writes raw series data to the database.
@@ -1819,36 +1679,6 @@ func (s *Server) addShardBySeriesID(sh *Shard, seriesID uint32) {
 	s.shardsBySeriesID[seriesID] = append(s.shardsBySeriesID[seriesID], sh)
 }
 
-func (s *Server) createSeriesIfNotExists(database, name string, tags map[string]string) (uint32, error) {
-	// Try to find series locally first.
-	s.mu.RLock()
-	db := s.databases[database]
-	if db == nil {
-		s.mu.RUnlock()
-		return 0, fmt.Errorf("database not found %q", database)
-	}
-	if _, series := db.MeasurementAndSeries(name, tags); series != nil {
-		s.mu.RUnlock()
-		return series.ID, nil
-	}
-	// release the read lock so the broadcast can actually go through and acquire the write lock
-	s.mu.RUnlock()
-
-	// If it doesn't exist then create a message and broadcast.
-	c := &createSeriesIfNotExistsCommand{Database: database, Name: name, Tags: tags}
-	_, err := s.broadcast(createSeriesIfNotExistsMessageType, c)
-	if err != nil {
-		return 0, err
-	}
-
-	// Lookup series again.
-	_, series := db.MeasurementAndSeries(name, tags)
-	if series == nil {
-		return 0, ErrSeriesNotFound
-	}
-	return series.ID, nil
-}
-
 // createMeasurementsIfNotExists walks the "points" and ensures that all new Series are created, and all
 // new Measurement fields have been created, across the cluster.
 func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string, points []Point) error {
@@ -1904,7 +1734,7 @@ func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string,
 
 	// Any broadcast actually required?
 	if len(c.Measurements) > 0 {
-		_, err := s.broadcast(createSeriesIfNotExistsMessageType, c)
+		_, err := s.broadcast(createMeasurementsIfNotExistsMessageType, c)
 		if err != nil {
 			return err
 		}
@@ -1913,47 +1743,25 @@ func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string,
 	return nil
 }
 
-func (s *Server) createFieldsIfNotExists(database string, measurement string, values map[string]interface{}) error {
-	// Local function keeps locking foolproof.
-	f := func(database string, measurement string, values map[string]interface{}) (map[string]influxql.DataType, error) {
-		s.mu.RLock()
-		defer s.mu.RUnlock()
+// applyCreateMeasurementsIfNotExists creates the Measurements, Series, and Fields in the Metastore.
+func (s *Server) applyCreateMeasurementsIfNotExists(m *messaging.Message) error {
+	return nil
+}
 
-		// Check to see if the fields already exist.
-		m, err := s.measurement(database, measurement)
+// createShardGroupsIfNotExist walks the "points" and ensures that all required shards exist on the cluster.
+func (s *Server) createShardGroupsIfNotExists(database, retentionPolicy string, points []Point) error {
+	for _, p := range points {
+		// Check if shard group exists first.
+		g, err := s.shardGroupByTimestamp(database, retentionPolicy, p.Timestamp)
 		if err != nil {
-			return nil, err
-		} else if m == nil {
-			return nil, ErrMeasurementNotFound
+			return err
+		} else if g != nil {
+			continue
 		}
-
-		newFields := make(map[string]influxql.DataType)
-		for k, v := range values {
-			f := m.FieldByName(k)
-			if f == nil {
-				newFields[k] = influxql.InspectDataType(v)
-			} else {
-				if f.Type != influxql.InspectDataType(v) {
-					return nil, fmt.Errorf(fmt.Sprintf("field \"%s\" is type %T, mapped as type %s", k, v, f.Type))
-				}
-			}
+		err = s.CreateShardGroupIfNotExists(database, retentionPolicy, p.Timestamp)
+		if err != nil {
+			return fmt.Errorf("create shard(%s/%s): %s", retentionPolicy, p.Timestamp.Format(time.RFC3339Nano), err)
 		}
-		return newFields, nil
-	}
-
-	newFields, err := f(database, measurement, values)
-	if err != nil {
-		return err
-	}
-	if len(newFields) == 0 {
-		return nil
-	}
-
-	// There are some new fields, so create field types mappings on cluster.
-	c := &createFieldsIfNotExistCommand{Database: database, Measurement: measurement, Fields: newFields}
-	_, err = s.broadcast(createFieldsIfNotExistsMessageType, c)
-	if err != nil {
-		return err
 	}
 
 	return nil
@@ -2861,10 +2669,8 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
 			err = s.applyDeleteShardGroup(m)
 		case setDefaultRetentionPolicyMessageType:
 			err = s.applySetDefaultRetentionPolicy(m)
-		case createFieldsIfNotExistsMessageType:
-			err = s.applyCreateFieldsIfNotExist(m)
-		case createSeriesIfNotExistsMessageType:
-			err = s.applyCreateSeriesIfNotExists(m)
+		case createMeasurementsIfNotExistsMessageType:
+			err = s.applyCreateMeasurementsIfNotExists(m)
 		case setPrivilegeMessageType:
 			err = s.applySetPrivilege(m)
 		case createContinuousQueryMessageType:

From ff765793e650c98ab8fb766fc909fda6c8ed894b Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip.otoole@yahoo.com>
Date: Thu, 19 Feb 2015 12:45:00 -0800
Subject: [PATCH 06/17] Implement applyCreateMeasurementsIfNotExists

---
 server.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git a/server.go b/server.go
index 1474aa7f2d..e55c61d566 100644
--- a/server.go
+++ b/server.go
@@ -1745,6 +1745,66 @@ func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string,
 
 // applyCreateMeasurementsIfNotExists creates the Measurements, Series, and Fields in the Metastore.
 func (s *Server) applyCreateMeasurementsIfNotExists(m *messaging.Message) error {
+	var c createMeasurementsIfNotExistsCommand
+	mustUnmarshalJSON(m.Data, &c)
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	// Validate command.
+	db := s.databases[c.Database]
+	if db == nil {
+		return ErrDatabaseNotFound
+	}
+
+	// Process command within a transaction.
+	if err := s.meta.mustUpdate(func(tx *metatx) error {
+		for _, cm := range c.Measurements {
+			// Create each series
+			for _, t := range cm.Tags {
+				_, ss := db.MeasurementAndSeries(cm.Name, t)
+
+				// Ensure creation of Series is idempotent.
+				if ss != nil {
+					continue
+				}
+
+				series, err := tx.createSeries(db.name, cm.Name, t)
+				if err != nil {
+					return err
+				}
+				db.addSeriesToIndex(cm.Name, series)
+			}
+
+			// Create each new field.
+			mm := db.measurements[cm.Name]
+			if mm == nil {
+				panic(fmt.Sprintf("Measurement %s does not exist", cm.Name))
+			}
+			for k, v := range cm.Fields {
+				if err := mm.createFieldIfNotExists(k, v); err != nil {
+					if err == ErrFieldOverflow {
+						log.Printf("no more fields allowed: %s::%s", mm.Name, k)
+						continue
+					} else if err == ErrFieldTypeConflict {
+						log.Printf("field type conflict: %s::%s", mm.Name, k)
+						continue
+					}
+					return err
+				}
+				if err := tx.saveMeasurement(db.name, mm); err != nil {
+					return fmt.Errorf("save measurement: %s", err)
+				}
+			}
+			if err := tx.saveDatabase(db); err != nil {
+				return fmt.Errorf("save database: %s", err)
+			}
+		}
+
+		return nil
+	}); err != nil {
+		return err
+	}
+
 	return nil
 }
 

From 3cb939834479388115e3971a4f94d18c15f89f0b Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip.otoole@yahoo.com>
Date: Thu, 19 Feb 2015 13:36:22 -0800
Subject: [PATCH 07/17] Add simple batch write test at handler level

This test passes, but only because it is checking for the wrong results.
Once batching is implemented this test will fail (as long as it is
unaltered).
---
 httpd/handler_test.go | 80 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 80 insertions(+)

diff --git a/httpd/handler_test.go b/httpd/handler_test.go
index b99427dcbf..db1e13b607 100644
--- a/httpd/handler_test.go
+++ b/httpd/handler_test.go
@@ -1421,6 +1421,86 @@ func TestHandler_serveWriteSeriesBoolValues(t *testing.T) {
 	}
 }
 
+func TestHandler_serveWriteSeriesBatch(t *testing.T) {
+	srvr := OpenAuthlessServer(NewMessagingClient())
+	srvr.CreateDatabase("foo")
+	srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
+	srvr.SetDefaultRetentionPolicy("foo", "bar")
+
+	s := NewHTTPServer(srvr)
+	defer s.Close()
+
+	batch := `
+{
+    "database": "foo",
+    "retentionPolicy": "bar",
+    "points": [
+        {
+            "name": "disk",
+            "timestamp": "2009-11-10T23:00:00Z",
+            "tags": {
+                "host": "server01"
+            },
+            "values": {
+                "full": false
+            }
+        },
+        {
+            "name": "disk",
+            "timestamp": "2009-11-10T23:00:01Z",
+            "tags": {
+                "host": "server01"
+            },
+            "values": {
+                "full": true
+            }
+        },
+        {
+            "name": "disk",
+            "timestamp": "2009-11-10T23:00:02Z",
+            "tags": {
+                "host": "server02"
+            },
+            "values": {
+                "full_pct": 64
+            }
+        }
+    ]
+}
+`
+	status, body := MustHTTP("POST", s.URL+`/write`, nil, nil, batch)
+	if status != http.StatusOK {
+		t.Log(body)
+		t.Fatalf("unexpected status: %d", status)
+	}
+	time.Sleep(200 * time.Millisecond) // Ensure data node picks up write.
+
+	query := map[string]string{"db": "foo", "q": "select * from disk"}
+	status, body = MustHTTP("GET", s.URL+`/query`, query, nil, "")
+	if status != http.StatusOK {
+		t.Logf("query %s\n", query)
+		t.Log(body)
+		t.Errorf("unexpected status: %d", status)
+	}
+
+	r := &influxdb.Results{}
+	if err := json.Unmarshal([]byte(body), r); err != nil {
+		t.Logf("query : %s\n", query)
+		t.Log(body)
+		t.Error(err)
+	}
+	if len(r.Results) != 1 {
+		t.Fatalf("unexpected results count")
+	}
+	result := r.Results[0]
+	if len(result.Rows) != 1 {
+		t.Fatalf("unexpected row count, expected: %d, actual: %d", 1, len(result.Rows))
+	}
+	if result.Rows[0].Values[0][1] != false {
+		t.Fatalf("unexpected string value, actual: %s", result.Rows[0].Values[0][1])
+	}
+}
+
 func TestHandler_serveWriteSeriesInvalidQueryField(t *testing.T) {
 	srvr := OpenAuthlessServer(NewMessagingClient())
 	srvr.CreateDatabase("foo")

From 2585a9ea4f1882b8a19088fa17c21c3fcd43e349 Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip.otoole@yahoo.com>
Date: Thu, 19 Feb 2015 15:21:51 -0800
Subject: [PATCH 08/17] Encode real batches for shards

---
 influxdb.go |  3 +++
 server.go   | 47 ++++++++++++++++++++++++++++++++++-------------
 shard.go    | 16 +++++++++-------
 3 files changed, 46 insertions(+), 20 deletions(-)

diff --git a/influxdb.go b/influxdb.go
index ab2b6933f1..f4d4119586 100644
--- a/influxdb.go
+++ b/influxdb.go
@@ -81,6 +81,9 @@ var (
 	// ErrShardNotFound is returned writing to a non-existent shard.
 	ErrShardNotFound = errors.New("shard not found")
 
+	// ErrInvalidPointBuffer is returned when a buffer containing data for writing is invalid
+	ErrInvalidPointBuffer = errors.New("invalid point buffer")
+
 	// ErrReadAccessDenied is returned when a user attempts to read
 	// data that he or she does not have permission to read.
 	ErrReadAccessDenied = errors.New("read access denied")
diff --git a/server.go b/server.go
index e55c61d566..d959218741 100644
--- a/server.go
+++ b/server.go
@@ -1577,7 +1577,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
 	}
 
 	// Build writeRawSeriesMessageType publish commands.
-	shardData := make(map[uint64][]byte)
+	shardData := make(map[uint64][]byte, 0)
 	for _, p := range points {
 		// Local function makes lock management foolproof.
 		measurement, series, err := func() (*Measurement, *Series, error) {
@@ -1619,9 +1619,12 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
 			return 0, err
 		}
 
-		// Encode point header, followed by point data, and assign to shard.
-		data := marshalPointHeader(series.ID, p.Timestamp.UnixNano())
+		// Encode point header, followed by point data, and add to shard's batch.
+		data := marshalPointHeader(series.ID, uint32(len(encodedFields)), p.Timestamp.UnixNano())
 		data = append(data, encodedFields...)
+		if shardData[sh.ID] == nil {
+			shardData[sh.ID] = make([]byte, 0)
+		}
 		shardData[sh.ID] = append(shardData[sh.ID], data...)
 	}
 
@@ -1655,19 +1658,37 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error {
 		return ErrShardNotFound
 	}
 
-	// Extract the series id and timestamp from the header.
-	// Everything after the header is the marshalled value.
-	seriesID, timestamp := unmarshalPointHeader(m.Data[:pointHeaderSize])
-	data := m.Data[pointHeaderSize:]
-
-	// Add to lookup.
-	s.addShardBySeriesID(sh, seriesID)
-
 	// TODO: Enable some way to specify if the data should be overwritten
 	overwrite := true
 
-	// Write to shard.
-	return sh.writeSeries(seriesID, timestamp, data, overwrite)
+	for {
+		if pointHeaderSize > len(m.Data) {
+			return ErrInvalidPointBuffer
+		}
+		seriesID, payloadLength, timestamp := unmarshalPointHeader(m.Data[:pointHeaderSize])
+		m.Data = m.Data[pointHeaderSize:]
+
+		if payloadLength > uint32(len(m.Data)) {
+			return ErrInvalidPointBuffer
+		}
+		data := m.Data[:payloadLength]
+
+		// Add to lookup.
+		s.addShardBySeriesID(sh, seriesID)
+
+		// Write to shard.
+		if err := sh.writeSeries(seriesID, timestamp, data, overwrite); err != nil {
+			return err
+		}
+
+		// Push the buffer forward and check if we're done.
+		m.Data = m.Data[payloadLength:]
+		if len(m.Data) == 0 {
+			break
+		}
+	}
+
+	return nil
 }
 
 func (s *Server) addShardBySeriesID(sh *Shard, seriesID uint32) {
diff --git a/shard.go b/shard.go
index 8cf6a80ae3..40791f13e3 100644
--- a/shard.go
+++ b/shard.go
@@ -134,20 +134,22 @@ func (s *Shard) deleteSeries(name string) error {
 type Shards []*Shard
 
 // pointHeaderSize represents the size of a point header, in bytes.
-const pointHeaderSize = 4 + 8 // seriesID + timestamp
+const pointHeaderSize = 4 + 4 + 8 // seriesID + payload length + timestamp
 
-// marshalPointHeader encodes a series id, timestamp, & flagset into a byte slice.
-func marshalPointHeader(seriesID uint32, timestamp int64) []byte {
-	b := make([]byte, 12)
+// marshalPointHeader encodes a series id, payload length, timestamp, & flagset into a byte slice.
+func marshalPointHeader(seriesID uint32, payloadLength uint32, timestamp int64) []byte {
+	b := make([]byte, pointHeaderSize)
 	binary.BigEndian.PutUint32(b[0:4], seriesID)
-	binary.BigEndian.PutUint64(b[4:12], uint64(timestamp))
+	binary.BigEndian.PutUint32(b[4:8], payloadLength)
+	binary.BigEndian.PutUint64(b[8:16], uint64(timestamp))
 	return b
 }
 
 // unmarshalPointHeader decodes a byte slice into a series id, timestamp & flagset.
-func unmarshalPointHeader(b []byte) (seriesID uint32, timestamp int64) {
+func unmarshalPointHeader(b []byte) (seriesID uint32, payloadLength uint32, timestamp int64) {
 	seriesID = binary.BigEndian.Uint32(b[0:4])
-	timestamp = int64(binary.BigEndian.Uint64(b[4:12]))
+	payloadLength = binary.BigEndian.Uint32(b[4:8])
+	timestamp = int64(binary.BigEndian.Uint64(b[8:16]))
 	return
 }
 

From 0e3e223e84c50f52132ee82292adccd4e4fe83da Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip.otoole@yahoo.com>
Date: Thu, 19 Feb 2015 21:56:57 -0800
Subject: [PATCH 09/17] Always ensure measurement exists in command

Unit tests need updating since some tests are no longer valid.
---
 internal_test.go | 10 ----------
 server.go        | 21 ++++++---------------
 2 files changed, 6 insertions(+), 25 deletions(-)

diff --git a/internal_test.go b/internal_test.go
index 57ae52b18e..c2e368c7bf 100644
--- a/internal_test.go
+++ b/internal_test.go
@@ -170,16 +170,6 @@ func TestCreateMeasurementsCommand_Errors(t *testing.T) {
 		t.Fatal("createMeasurementsIfNotExistsCommand is nil")
 	}
 
-	err = c.addSeriesIfNotExists("bar", nil)
-	if err != ErrMeasurementNotFound {
-		t.Fatalf("expected ErrMeasurementNotFound got %s", err.Error())
-	}
-
-	err = c.addFieldIfNotExists("bar", "value", influxql.Number)
-	if err != ErrMeasurementNotFound {
-		t.Fatalf("expected ErrMeasurementNotFound got %s", err.Error())
-	}
-
 	// Add Measurement.
 	err = c.addMeasurementIfNotExists("bar")
 	if err != nil {
diff --git a/server.go b/server.go
index d959218741..73ab769d7e 100644
--- a/server.go
+++ b/server.go
@@ -1506,13 +1506,10 @@ func (c *createMeasurementsIfNotExistsCommand) addMeasurementIfNotExists(name st
 // addSeriesIfNotExists adds the Series, identified by Measurement name and tag set, to
 // the command, but only if not already present in the command.
 func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement string, tags map[string]string) error {
-	_, ok := c.Measurements[measurement]
-	if !ok {
-		return ErrMeasurementNotFound
-	}
+	c.addMeasurementIfNotExists(measurement)
 
 	tagset := string(marshalTags(tags))
-	_, ok = c.Measurements[measurement].Tags[tagset]
+	_, ok := c.Measurements[measurement].Tags[tagset]
 	if ok {
 		// Series already present in in subcommand, nothing to do.
 		return nil
@@ -1526,10 +1523,7 @@ func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement
 // addFieldIfNotExists adds the field to the command for the Measurement, but only if it is not already
 // present. It will return an error if the field is present in the command, but is of a different type.
 func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, name string, typ influxql.DataType) error {
-	_, ok := c.Measurements[measurement]
-	if !ok {
-		return ErrMeasurementNotFound
-	}
+	c.addMeasurementIfNotExists(measurement)
 
 	t, ok := c.Measurements[measurement].Fields[name]
 	if ok {
@@ -1718,14 +1712,11 @@ func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string,
 		for _, p := range points {
 			measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
 
-			if measurement == nil {
-				// Measurement not in Metastore, add to command so it's created cluster-wide.
-				c.addMeasurementIfNotExists(p.Name)
-			}
-
 			if series == nil {
 				// Series does not exist in Metastore, add it so it's created cluster-wide.
-				c.addSeriesIfNotExists(p.Name, p.Tags)
+				if err := c.addSeriesIfNotExists(p.Name, p.Tags); err != nil {
+					return err
+				}
 			}
 
 			for k, v := range p.Values {

From 5c61b7d8b847b21feae846421470c9fdaf911b2b Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip.otoole@yahoo.com>
Date: Thu, 19 Feb 2015 22:08:46 -0800
Subject: [PATCH 10/17] Add new fields even when Measurement exists

---
 server.go | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/server.go b/server.go
index 73ab769d7e..0fe643ee18 100644
--- a/server.go
+++ b/server.go
@@ -1725,18 +1725,13 @@ func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string,
 						// Field present in Metastore, make sure there is no type conflict.
 						if f.Type != influxql.InspectDataType(v) {
 							return fmt.Errorf(fmt.Sprintf("field \"%s\" is type %T, mapped as type %s", k, v, f.Type))
-						} else {
-							// Field does not exist in Metastore, add it so it's created cluster-wide.
-							if err := c.addFieldIfNotExists(p.Name, k, influxql.InspectDataType(v)); err != nil {
-								return err
-							}
 						}
+						continue // Field is present, and it's of the same type. Nothing more to do.
 					}
-				} else {
-					// Measurement does not exist in Metastore, so fields can't exist. Add each one unconditionally.
-					if err := c.addFieldIfNotExists(p.Name, k, influxql.InspectDataType(v)); err != nil {
-						return err
-					}
+				}
+				// Field isn't in Metastore. Add it to command so it's created cluster-wide.
+				if err := c.addFieldIfNotExists(p.Name, k, influxql.InspectDataType(v)); err != nil {
+					return err
 				}
 			}
 		}

From 4c28e63a939c70751f0e8691a60a77d44c16e9c9 Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip.otoole@yahoo.com>
Date: Thu, 19 Feb 2015 22:19:16 -0800
Subject: [PATCH 11/17] Restore helpful field type conflict error message

---
 database.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/database.go b/database.go
index 6ce9414f09..58beb22bdd 100644
--- a/database.go
+++ b/database.go
@@ -663,7 +663,7 @@ func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
 		if field == nil {
 			panic(fmt.Sprintf("field does not exist for %s", k))
 		} else if influxql.InspectDataType(v) != field.Type {
-			return nil, fmt.Errorf("field %s is not of type %s", k, field.Type)
+			return nil, fmt.Errorf("field \"%s\" is type %T, mapped as type %s", k, k, field.Type)
 		}
 
 		var buf []byte

From f5b2962d420ecff19c5c53b57f62128821fc08ed Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip.otoole@yahoo.com>
Date: Thu, 19 Feb 2015 22:40:15 -0800
Subject: [PATCH 12/17] Incorporate inital code review feedback

---
 internal_test.go | 40 +++++++++++++++-------------------------
 server.go        | 20 +++++++-------------
 2 files changed, 22 insertions(+), 38 deletions(-)

diff --git a/internal_test.go b/internal_test.go
index c2e368c7bf..69548d2c4b 100644
--- a/internal_test.go
+++ b/internal_test.go
@@ -99,39 +99,23 @@ func TestCreateMeasurementsCommand(t *testing.T) {
 		t.Fatal("createMeasurementsIfNotExistsCommand is nil")
 	}
 
-	// Add Measurement.
-	err = c.addMeasurementIfNotExists("bar")
-	if err != nil {
-		t.Fatal("error adding measurement bar")
-	}
-	err = c.addMeasurementIfNotExists("bar")
-	if err != nil {
-		t.Fatal("error re-adding measurement bar")
-	}
-
+	// Add Measurement twice, to make sure nothing blows up.
+	c.addMeasurementIfNotExists("bar")
+	c.addMeasurementIfNotExists("bar")
 	n = len(c.Measurements)
 	if n != 1 {
 		t.Fatalf("wrong number of measurements, expected 1, got %d", n)
 	}
 
 	// Add Series, no tags.
-	err = c.addSeriesIfNotExists("bar", nil)
-	if err != nil {
-		t.Fatal("error adding series with nil tags")
-	}
+	c.addSeriesIfNotExists("bar", nil)
 
 	// Add Series, some tags.
 	tags := map[string]string{"host": "server01"}
-	err = c.addSeriesIfNotExists("bar", tags)
-	if err != nil {
-		t.Fatal("error adding series with non-nil tags")
-	}
+	c.addSeriesIfNotExists("bar", tags)
 
 	// Add Series, same tags again.
-	err = c.addSeriesIfNotExists("bar", tags)
-	if err != nil {
-		t.Fatal("error re-adding series with non-nil tags")
-	}
+	c.addSeriesIfNotExists("bar", tags)
 
 	n = len(c.Measurements["bar"].Tags)
 	if n != 2 {
@@ -170,12 +154,18 @@ func TestCreateMeasurementsCommand_Errors(t *testing.T) {
 		t.Fatal("createMeasurementsIfNotExistsCommand is nil")
 	}
 
-	// Add Measurement.
-	err = c.addMeasurementIfNotExists("bar")
+	// Ensure fields can be added to non-existent Measurements. The
+	// Measurements should be created automatically.
+	c.addSeriesIfNotExists("bar", nil)
+
+	err = c.addFieldIfNotExists("bar", "value", influxql.Number)
 	if err != nil {
-		t.Fatal("error adding measurement bar")
+		t.Fatalf("unexpected error got %s", err.Error())
 	}
 
+	// Add Measurement. Adding it now should be OK.
+	c.addMeasurementIfNotExists("bar")
+
 	// Test type conflicts
 	err = c.addFieldIfNotExists("bar", "value", influxql.Number)
 	if err != nil {
diff --git a/server.go b/server.go
index 0fe643ee18..1f66675cff 100644
--- a/server.go
+++ b/server.go
@@ -1491,7 +1491,7 @@ func newCreateMeasurementsIfNotExistsCommand(database string) *createMeasurement
 
 // addMeasurementIfNotExists adds the Measurement to the command, but only if not already present
 // in the command.
-func (c *createMeasurementsIfNotExistsCommand) addMeasurementIfNotExists(name string) error {
+func (c *createMeasurementsIfNotExistsCommand) addMeasurementIfNotExists(name string) {
 	_, ok := c.Measurements[name]
 	if !ok {
 		c.Measurements[name] = createMeasurementSubcommand{
@@ -1500,24 +1500,23 @@ func (c *createMeasurementsIfNotExistsCommand) addMeasurementIfNotExists(name st
 			Fields: make(map[string]influxql.DataType),
 		}
 	}
-	return nil
 }
 
 // addSeriesIfNotExists adds the Series, identified by Measurement name and tag set, to
 // the command, but only if not already present in the command.
-func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement string, tags map[string]string) error {
+func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement string, tags map[string]string) {
 	c.addMeasurementIfNotExists(measurement)
 
 	tagset := string(marshalTags(tags))
 	_, ok := c.Measurements[measurement].Tags[tagset]
 	if ok {
 		// Series already present in in subcommand, nothing to do.
-		return nil
+		return
 	}
 	// Tag-set needs to added to subcommand.
 	c.Measurements[measurement].Tags[tagset] = tags
 
-	return nil
+	return
 }
 
 // addFieldIfNotExists adds the field to the command for the Measurement, but only if it is not already
@@ -1579,12 +1578,12 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
 			defer s.mu.RUnlock()
 			db := s.databases[database]
 			if db == nil {
-				return nil, nil, fmt.Errorf("database not found %q", database)
+				return nil, nil, ErrDatabaseNotFound
 			}
 			if measurement, series := db.MeasurementAndSeries(p.Name, p.Tags); series != nil {
 				return measurement, series, nil
 			}
-			panic("series not found")
+			return nil, nil, ErrSeriesNotFound
 		}()
 		if err != nil {
 			return 0, err
@@ -1603,9 +1602,6 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
 		s.mu.RLock()
 		codec := NewFieldCodec(measurement)
 		s.mu.RUnlock()
-		if codec == nil {
-			panic("field codec is nil")
-		}
 
 		// Convert string-key/values to encoded fields.
 		encodedFields, err := codec.EncodeFields(p.Values)
@@ -1714,9 +1710,7 @@ func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string,
 
 			if series == nil {
 				// Series does not exist in Metastore, add it so it's created cluster-wide.
-				if err := c.addSeriesIfNotExists(p.Name, p.Tags); err != nil {
-					return err
-				}
+				c.addSeriesIfNotExists(p.Name, p.Tags)
 			}
 
 			for k, v := range p.Values {

From c3f4eb0de1cebbda887d552fb641d109c470f977 Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip.otoole@yahoo.com>
Date: Thu, 19 Feb 2015 23:32:59 -0800
Subject: [PATCH 13/17] Tighten batching unit test

---
 httpd/handler_test.go | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/httpd/handler_test.go b/httpd/handler_test.go
index db1e13b607..f17c4f6245 100644
--- a/httpd/handler_test.go
+++ b/httpd/handler_test.go
@@ -1496,8 +1496,11 @@ func TestHandler_serveWriteSeriesBatch(t *testing.T) {
 	if len(result.Rows) != 1 {
 		t.Fatalf("unexpected row count, expected: %d, actual: %d", 1, len(result.Rows))
 	}
-	if result.Rows[0].Values[0][1] != false {
-		t.Fatalf("unexpected string value, actual: %s", result.Rows[0].Values[0][1])
+	if len(result.Rows[0].Columns) != 3 {
+		t.Fatalf("unexpected column count, expected: %d, actual: %d", 3, len(result.Rows[0].Columns))
+	}
+	if len(result.Rows[0].Values) != 3 {
+		t.Fatalf("unexpected values count, expected: %d, actual: %d", 3, len(result.Rows[0].Values))
 	}
 }
 

From 169409ac03400f0815bc452899cae905ce7119d1 Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip.otoole@yahoo.com>
Date: Fri, 20 Feb 2015 00:24:02 -0800
Subject: [PATCH 14/17] Move to slices for creating measurements

Using maps was resulting in unpredicatable ordering of columns and tags.
---
 server.go | 56 ++++++++++++++++++++++++++++++-------------------------
 1 file changed, 31 insertions(+), 25 deletions(-)

diff --git a/server.go b/server.go
index 1f66675cff..5455203f65 100644
--- a/server.go
+++ b/server.go
@@ -1472,20 +1472,20 @@ type setDefaultRetentionPolicyCommand struct {
 }
 
 type createMeasurementSubcommand struct {
-	Name   string                       `json:"name"`
-	Tags   map[string]map[string]string `json:"tags"`
-	Fields map[string]influxql.DataType `json:"fields"`
+	Name   string              `json:"name"`
+	Tags   []map[string]string `json:"tags"`
+	Fields []*Field            `json:"fields"`
 }
 
 type createMeasurementsIfNotExistsCommand struct {
-	Database     string                                 `json:"database"`
-	Measurements map[string]createMeasurementSubcommand `json:"measurements"`
+	Database     string                                  `json:"database"`
+	Measurements map[string]*createMeasurementSubcommand `json:"measurements"`
 }
 
 func newCreateMeasurementsIfNotExistsCommand(database string) *createMeasurementsIfNotExistsCommand {
 	return &createMeasurementsIfNotExistsCommand{
 		Database:     database,
-		Measurements: make(map[string]createMeasurementSubcommand),
+		Measurements: make(map[string]*createMeasurementSubcommand),
 	}
 }
 
@@ -1494,10 +1494,10 @@ func newCreateMeasurementsIfNotExistsCommand(database string) *createMeasurement
 func (c *createMeasurementsIfNotExistsCommand) addMeasurementIfNotExists(name string) {
 	_, ok := c.Measurements[name]
 	if !ok {
-		c.Measurements[name] = createMeasurementSubcommand{
+		c.Measurements[name] = &createMeasurementSubcommand{
 			Name:   name,
-			Tags:   make(map[string]map[string]string),
-			Fields: make(map[string]influxql.DataType),
+			Tags:   make([]map[string]string, 0),
+			Fields: make([]*Field, 0),
 		}
 	}
 }
@@ -1507,14 +1507,16 @@ func (c *createMeasurementsIfNotExistsCommand) addMeasurementIfNotExists(name st
 func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement string, tags map[string]string) {
 	c.addMeasurementIfNotExists(measurement)
 
+	m := c.Measurements[measurement]
 	tagset := string(marshalTags(tags))
-	_, ok := c.Measurements[measurement].Tags[tagset]
-	if ok {
-		// Series already present in in subcommand, nothing to do.
-		return
+	for _, t := range m.Tags {
+		if string(marshalTags(t)) == tagset {
+			// Series already present in subcommand, nothing to do.
+			return
+		}
 	}
 	// Tag-set needs to added to subcommand.
-	c.Measurements[measurement].Tags[tagset] = tags
+	m.Tags = append(m.Tags, tags)
 
 	return
 }
@@ -1524,16 +1526,20 @@ func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement
 func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, name string, typ influxql.DataType) error {
 	c.addMeasurementIfNotExists(measurement)
 
-	t, ok := c.Measurements[measurement].Fields[name]
-	if ok {
-		if typ != t {
-			return ErrFieldTypeConflict
+	m := c.Measurements[measurement]
+	for _, f := range m.Fields {
+		if f.Name == name {
+			if f.Type != typ {
+				return ErrFieldTypeConflict
+			}
+			// Field already present in subcommand with same type, nothing to do.
+			return nil
 		}
-		// Field already present in subcommand with same type, nothing to do.
-		return nil
 	}
+
 	// New field for this measurement so add it to the subcommand.
-	c.Measurements[measurement].Fields[name] = typ
+	newField := &Field{Name: name, Type: typ}
+	m.Fields = append(m.Fields, newField)
 	return nil
 }
 
@@ -1781,13 +1787,13 @@ func (s *Server) applyCreateMeasurementsIfNotExists(m *messaging.Message) error
 			if mm == nil {
 				panic(fmt.Sprintf("Measurement %s does not exist", cm.Name))
 			}
-			for k, v := range cm.Fields {
-				if err := mm.createFieldIfNotExists(k, v); err != nil {
+			for _, f := range cm.Fields {
+				if err := mm.createFieldIfNotExists(f.Name, f.Type); err != nil {
 					if err == ErrFieldOverflow {
-						log.Printf("no more fields allowed: %s::%s", mm.Name, k)
+						log.Printf("no more fields allowed: %s::%s", mm.Name, f.Name)
 						continue
 					} else if err == ErrFieldTypeConflict {
-						log.Printf("field type conflict: %s::%s", mm.Name, k)
+						log.Printf("field type conflict: %s::%s", mm.Name, f.Name)
 						continue
 					}
 					return err

From bd4352c77dead7c6c37e1162edf9429ed64cd35a Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip.otoole@yahoo.com>
Date: Fri, 20 Feb 2015 00:39:34 -0800
Subject: [PATCH 15/17] Remove unused shardsBySeriesID

Its presence is making Bolt-level batching quite awkward, and since it's
not used, just remove it.
---
 server.go | 20 +++-----------------
 1 file changed, 3 insertions(+), 17 deletions(-)

diff --git a/server.go b/server.go
index 5455203f65..30d5388d36 100644
--- a/server.go
+++ b/server.go
@@ -99,8 +99,7 @@ type Server struct {
 	databases map[string]*database // databases by name
 	users     map[string]*User     // user by name
 
-	shards           map[uint64]*Shard   // shards by shard id
-	shardsBySeriesID map[uint32][]*Shard // shards by series id
+	shards map[uint64]*Shard // shards by shard id
 
 	Logger *log.Logger
 
@@ -129,9 +128,8 @@ func NewServer() *Server {
 		databases: make(map[string]*database),
 		users:     make(map[string]*User),
 
-		shards:           make(map[uint64]*Shard),
-		shardsBySeriesID: make(map[uint32][]*Shard),
-		Logger:           log.New(os.Stderr, "[server] ", log.LstdFlags),
+		shards: make(map[uint64]*Shard),
+		Logger: log.New(os.Stderr, "[server] ", log.LstdFlags),
 	}
 	// Server will always return with authentication enabled.
 	// This ensures that disabling authentication must be an explicit decision.
@@ -1669,9 +1667,6 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error {
 		}
 		data := m.Data[:payloadLength]
 
-		// Add to lookup.
-		s.addShardBySeriesID(sh, seriesID)
-
 		// Write to shard.
 		if err := sh.writeSeries(seriesID, timestamp, data, overwrite); err != nil {
 			return err
@@ -1687,15 +1682,6 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error {
 	return nil
 }
 
-func (s *Server) addShardBySeriesID(sh *Shard, seriesID uint32) {
-	for _, other := range s.shardsBySeriesID[seriesID] {
-		if other.ID == sh.ID {
-			return
-		}
-	}
-	s.shardsBySeriesID[seriesID] = append(s.shardsBySeriesID[seriesID], sh)
-}
-
 // createMeasurementsIfNotExists walks the "points" and ensures that all new Series are created, and all
 // new Measurement fields have been created, across the cluster.
 func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string, points []Point) error {

From 612ef1fa0851fae324618a16f224909a3c6f9eae Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip.otoole@yahoo.com>
Date: Fri, 20 Feb 2015 00:45:35 -0800
Subject: [PATCH 16/17] Write batch in a single BoltDB transaction

---
 server.go | 27 ++-------------------------
 shard.go  | 39 +++++++++++++++++++++++++++++----------
 2 files changed, 31 insertions(+), 35 deletions(-)

diff --git a/server.go b/server.go
index 30d5388d36..da26d70ac2 100644
--- a/server.go
+++ b/server.go
@@ -1652,31 +1652,8 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error {
 		return ErrShardNotFound
 	}
 
-	// TODO: Enable some way to specify if the data should be overwritten
-	overwrite := true
-
-	for {
-		if pointHeaderSize > len(m.Data) {
-			return ErrInvalidPointBuffer
-		}
-		seriesID, payloadLength, timestamp := unmarshalPointHeader(m.Data[:pointHeaderSize])
-		m.Data = m.Data[pointHeaderSize:]
-
-		if payloadLength > uint32(len(m.Data)) {
-			return ErrInvalidPointBuffer
-		}
-		data := m.Data[:payloadLength]
-
-		// Write to shard.
-		if err := sh.writeSeries(seriesID, timestamp, data, overwrite); err != nil {
-			return err
-		}
-
-		// Push the buffer forward and check if we're done.
-		m.Data = m.Data[payloadLength:]
-		if len(m.Data) == 0 {
-			break
-		}
+	if err := sh.writeSeries(m.Data); err != nil {
+		return err
 	}
 
 	return nil
diff --git a/shard.go b/shard.go
index 40791f13e3..abf376f525 100644
--- a/shard.go
+++ b/shard.go
@@ -108,18 +108,37 @@ func (s *Shard) readSeries(seriesID uint32, timestamp int64) (values []byte, err
 	return
 }
 
-// writeSeries writes series data to a shard.
-func (s *Shard) writeSeries(seriesID uint32, timestamp int64, values []byte, overwrite bool) error {
+// writeSeries writes series batch to a shard.
+func (s *Shard) writeSeries(batch []byte) error {
 	return s.store.Update(func(tx *bolt.Tx) error {
-		// Create a bucket for the series.
-		b, err := tx.CreateBucketIfNotExists(u32tob(seriesID))
-		if err != nil {
-			return err
-		}
+		for {
+			if pointHeaderSize > len(batch) {
+				return ErrInvalidPointBuffer
+			}
+			seriesID, payloadLength, timestamp := unmarshalPointHeader(batch[:pointHeaderSize])
+			batch = batch[pointHeaderSize:]
 
-		// Insert the values by timestamp.
-		if err := b.Put(u64tob(uint64(timestamp)), values); err != nil {
-			return err
+			if payloadLength > uint32(len(batch)) {
+				return ErrInvalidPointBuffer
+			}
+			data := batch[:payloadLength]
+
+			// Create a bucket for the series.
+			b, err := tx.CreateBucketIfNotExists(u32tob(seriesID))
+			if err != nil {
+				return err
+			}
+
+			// Insert the values by timestamp.
+			if err := b.Put(u64tob(uint64(timestamp)), data); err != nil {
+				return err
+			}
+
+			// Push the buffer forward and check if we're done.
+			batch = batch[payloadLength:]
+			if len(batch) == 0 {
+				break
+			}
 		}
 
 		return nil

From 9c4174a006be5b43463ba4b9f58b3c444c9a6c09 Mon Sep 17 00:00:00 2001
From: Philip O'Toole <philip@influxdb.com>
Date: Fri, 20 Feb 2015 14:26:12 -0800
Subject: [PATCH 17/17] Simplify locking in WriteSeries()

In addition, memomize the Field codecs.
---
 server.go | 86 +++++++++++++++++++++++++++++--------------------------
 1 file changed, 46 insertions(+), 40 deletions(-)

diff --git a/server.go b/server.go
index da26d70ac2..9427b81c8f 100644
--- a/server.go
+++ b/server.go
@@ -1575,51 +1575,57 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
 
 	// Build writeRawSeriesMessageType publish commands.
 	shardData := make(map[uint64][]byte, 0)
-	for _, p := range points {
+	codecs := make(map[string]*FieldCodec, 0)
+	if err := func() error {
 		// Local function makes lock management foolproof.
-		measurement, series, err := func() (*Measurement, *Series, error) {
-			s.mu.RLock()
-			defer s.mu.RUnlock()
-			db := s.databases[database]
-			if db == nil {
-				return nil, nil, ErrDatabaseNotFound
-			}
-			if measurement, series := db.MeasurementAndSeries(p.Name, p.Tags); series != nil {
-				return measurement, series, nil
-			}
-			return nil, nil, ErrSeriesNotFound
-		}()
-		if err != nil {
-			return 0, err
-		}
-
-		// Retrieve shard group.
-		g, err := s.shardGroupByTimestamp(database, retentionPolicy, p.Timestamp)
-		if err != nil {
-			return 0, err
-		}
-
-		// Find appropriate shard within the shard group.
-		sh := g.ShardBySeriesID(series.ID)
-
-		// Get a field codec.
 		s.mu.RLock()
-		codec := NewFieldCodec(measurement)
-		s.mu.RUnlock()
+		defer s.mu.RUnlock()
 
-		// Convert string-key/values to encoded fields.
-		encodedFields, err := codec.EncodeFields(p.Values)
-		if err != nil {
-			return 0, err
+		db := s.databases[database]
+		if db == nil {
+			return ErrDatabaseNotFound
+		}
+		for _, p := range points {
+			measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
+			if series == nil {
+				return ErrSeriesNotFound
+			}
+
+			// Retrieve shard group.
+			g, err := s.shardGroupByTimestamp(database, retentionPolicy, p.Timestamp)
+			if err != nil {
+				return err
+			}
+
+			// Find appropriate shard within the shard group.
+			sh := g.ShardBySeriesID(series.ID)
+
+			// Many points are likely to have the same Measurement name. Re-use codecs if possible.
+			var codec *FieldCodec
+			codec, ok := codecs[measurement.Name]
+			if !ok {
+				codec = NewFieldCodec(measurement)
+				codecs[measurement.Name] = codec
+			}
+
+			// Convert string-key/values to encoded fields.
+			encodedFields, err := codec.EncodeFields(p.Values)
+			if err != nil {
+				return err
+			}
+
+			// Encode point header, followed by point data, and add to shard's batch.
+			data := marshalPointHeader(series.ID, uint32(len(encodedFields)), p.Timestamp.UnixNano())
+			data = append(data, encodedFields...)
+			if shardData[sh.ID] == nil {
+				shardData[sh.ID] = make([]byte, 0)
+			}
+			shardData[sh.ID] = append(shardData[sh.ID], data...)
 		}
 
-		// Encode point header, followed by point data, and add to shard's batch.
-		data := marshalPointHeader(series.ID, uint32(len(encodedFields)), p.Timestamp.UnixNano())
-		data = append(data, encodedFields...)
-		if shardData[sh.ID] == nil {
-			shardData[sh.ID] = make([]byte, 0)
-		}
-		shardData[sh.ID] = append(shardData[sh.ID], data...)
+		return nil
+	}(); err != nil {
+		return 0, err
 	}
 
 	// Write data for each shard to the Broker.