diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ea576545f..fcbe925d31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,11 +32,15 @@ - [#7382](https://github.com/influxdata/influxdb/issues/7382): Shard stats include wal path tag so disk bytes make more sense. - [#7385](https://github.com/influxdata/influxdb/pull/7385): Reduce query planning allocations -## v1.0.2 [unreleased] +## v1.0.2 [2016-10-05] ### Bugfixes +- [#7150](https://github.com/influxdata/influxdb/issues/7150): Do not automatically reset the shard duration when using ALTER RETENTION POLICY +- [#5878](https://github.com/influxdata/influxdb/issues/5878): Ensure correct shard groups created when retention policy has been altered. - [#7391](https://github.com/influxdata/influxdb/issues/7391): Fix RLE integer decoding producing negative numbers +- [#7335](https://github.com/influxdata/influxdb/pull/7335): Avoid stat syscall when planning compactions +- [#7330](https://github.com/influxdata/influxdb/issues/7330): Subscription data loss under high write load ## v1.0.1 [2016-09-26] diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 785d9c9db6..2dd65ccf95 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -5,6 +5,7 @@ import ( "io" "log" "os" + "sort" "sync" "sync/atomic" "time" @@ -190,10 +191,6 @@ func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic { // maps to a shard group or shard that does not currently exist, it will be // created before returning the mapping. func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) { - - // holds the start time ranges for required shard groups - timeRanges := map[time.Time]*meta.ShardGroupInfo{} - rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy) if err != nil { return nil, err @@ -201,46 +198,83 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) return nil, influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy) } - // Find the minimum time for a point if the retention policy has a shard - // group duration. We will automatically drop any points before this time. - // There is a chance of a time on the edge of the shard group duration to - // sneak through even after it has been removed, but the circumstances are - // rare enough and don't matter enough that we don't account for this - // edge case. + // Holds all the shard groups and shards that are required for writes. + list := make(sgList, 0, 8) min := time.Unix(0, models.MinNanoTime) if rp.Duration > 0 { min = time.Now().Add(-rp.Duration) } for _, p := range wp.Points { - if p.Time().Before(min) { + // Either the point is outside the scope of the RP, or we already have + // a suitable shard group for the point. + if p.Time().Before(min) || list.Covers(p.Time()) { continue } - timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil - } - // holds all the shard groups and shards that are required for writes - for t := range timeRanges { - sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t) + // No shard groups overlap with the point's time, so we will create + // a new shard group for this point. 
+ sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, p.Time()) if err != nil { return nil, err } - timeRanges[t] = sg + + if sg == nil { + return nil, errors.New("nil shard group") + } + list = list.Append(*sg) } mapping := NewShardMapping() for _, p := range wp.Points { - sg, ok := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] - if !ok { + sg := list.ShardGroupAt(p.Time()) + if sg == nil { + // We didn't create a shard group because the point was outside the + // scope of the RP. atomic.AddInt64(&w.stats.WriteDropped, 1) continue } + sh := sg.ShardFor(p.HashID()) mapping.MapPoint(&sh, p) } return mapping, nil } +// sgList is a wrapper around a meta.ShardGroupInfos where we can also check +// if a given time is covered by any of the shard groups in the list. +type sgList meta.ShardGroupInfos + +func (l sgList) Covers(t time.Time) bool { + if len(l) == 0 { + return false + } + return l.ShardGroupAt(t) != nil +} + +func (l sgList) ShardGroupAt(t time.Time) *meta.ShardGroupInfo { + // Attempt to find a shard group that could contain this point. + // Shard groups are sorted first according to end time, and then according + // to start time. Therefore, if there are multiple shard groups that match + // this point's time they will be preferred in this order: + // + // - a shard group with the earliest end time; + // - (assuming identical end times) the shard group with the earliest start + // time. + idx := sort.Search(len(l), func(i int) bool { return l[i].EndTime.After(t) }) + if idx == len(l) { + return nil + } + return &l[idx] +} + +// Append appends a shard group to the list, and returns a sorted list. +func (l sgList) Append(sgi meta.ShardGroupInfo) sgList { + next := append(l, sgi) + sort.Sort(meta.ShardGroupInfos(next)) + return next +} + // WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of // a cluster structure for information. This is to avoid a circular dependency func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error { diff --git a/coordinator/points_writer_test.go b/coordinator/points_writer_test.go index 810bf238d1..86752e9d6e 100644 --- a/coordinator/points_writer_test.go +++ b/coordinator/points_writer_test.go @@ -50,10 +50,89 @@ func TestPointsWriter_MapShards_One(t *testing.T) { } } +// Ensures the points writer maps to a new shard group when the shard duration +// is changed. 
+func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) {
+ ms := PointsWriterMetaClient{}
+ rp := NewRetentionPolicy("myp", time.Hour, 3)
+
+ ms.NodeIDFn = func() uint64 { return 1 }
+ ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
+ return rp, nil
+ }
+
+ var (
+ i int
+ now = time.Now()
+ )
+
+ ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
+ sg := []meta.ShardGroupInfo{
+ meta.ShardGroupInfo{
+ Shards: make([]meta.ShardInfo, 1),
+ StartTime: now, EndTime: now.Add(rp.Duration).Add(-1),
+ },
+ meta.ShardGroupInfo{
+ Shards: make([]meta.ShardInfo, 1),
+ StartTime: now.Add(time.Hour), EndTime: now.Add(3 * time.Hour).Add(rp.Duration).Add(-1),
+ },
+ }[i]
+ i++
+ return &sg, nil
+ }
+
+ c := coordinator.NewPointsWriter()
+ c.MetaClient = ms
+
+ pr := &coordinator.WritePointsRequest{
+ Database: "mydb",
+ RetentionPolicy: "myrp",
+ }
+ pr.AddPoint("cpu", 1.0, now, nil)
+ pr.AddPoint("cpu", 2.0, now.Add(2*time.Second), nil)
+
+ var (
+ shardMappings *coordinator.ShardMapping
+ err error
+ )
+ if shardMappings, err = c.MapShards(pr); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if got, exp := len(shardMappings.Points[0]), 2; got != exp {
+ t.Fatalf("got %d point(s), expected %d", got, exp)
+ }
+
+ if got, exp := len(shardMappings.Shards), 1; got != exp {
+ t.Errorf("got %d shard(s), expected %d", got, exp)
+ }
+
+ // Now we alter the retention policy's shard group duration.
+ rp.ShardGroupDuration = 3 * time.Hour
+
+ pr = &coordinator.WritePointsRequest{
+ Database: "mydb",
+ RetentionPolicy: "myrp",
+ }
+ pr.AddPoint("cpu", 1.0, now.Add(2*time.Hour), nil)
+
+ // The point is beyond the previous shard group, so a new shard group
+ // should be created.
+ if shardMappings, err = c.MapShards(pr); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // We can check the value of i since it's only incremented when a shard
+ // group is created.
+ if got, exp := i, 2; got != exp {
+ t.Fatalf("got %d shard group creations, expected %d", got, exp)
+ }
+}
+
 // Ensures the points writer maps a multiple points across shard group boundaries.
 func TestPointsWriter_MapShards_Multiple(t *testing.T) {
 ms := PointsWriterMetaClient{}
- rp := NewRetentionPolicy("myp", 0, 3)
+ rp := NewRetentionPolicy("myp", time.Hour, 3)
 rp.ShardGroupDuration = time.Hour
 AttachShardGroupInfo(rp, []meta.ShardOwner{
 {NodeID: 1},
@@ -90,9 +169,9 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
 
 // Three points that range over the shardGroup duration (1h) and should map to two
 // distinct shards
- pr.AddPoint("cpu", 1.0, time.Unix(0, 0), nil)
- pr.AddPoint("cpu", 2.0, time.Unix(0, 0).Add(time.Hour), nil)
- pr.AddPoint("cpu", 3.0, time.Unix(0, 0).Add(time.Hour+time.Second), nil)
+ pr.AddPoint("cpu", 1.0, time.Now(), nil)
+ pr.AddPoint("cpu", 2.0, time.Now().Add(time.Hour), nil)
+ pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil)
 
 var (
 shardMappings *coordinator.ShardMapping
@@ -107,12 +186,12 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
 }
 
 for _, points := range shardMappings.Points {
- // First shard shoud have 1 point w/ first point added
+ // First shard should have 1 point w/ first point added
 if len(points) == 1 && points[0].Time() != pr.Points[0].Time() {
 t.Fatalf("MapShards() value mismatch. 
got %v, exp %v", points[0].Time(), pr.Points[0].Time()) } - // Second shard shoud have the last two points added + // Second shard should have the last two points added if len(points) == 2 && points[0].Time() != pr.Points[1].Time() { t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[0].Time(), pr.Points[1].Time()) } @@ -195,11 +274,15 @@ func TestPointsWriter_WritePoints(t *testing.T) { RetentionPolicy: test.retentionPolicy, } + // Ensure that the test shard groups are created before the points + // are created. + ms := NewPointsWriterMetaClient() + // Three points that range over the shardGroup duration (1h) and should map to two // distinct shards - pr.AddPoint("cpu", 1.0, time.Unix(0, 0), nil) - pr.AddPoint("cpu", 2.0, time.Unix(0, 0).Add(time.Hour), nil) - pr.AddPoint("cpu", 3.0, time.Unix(0, 0).Add(time.Hour+time.Second), nil) + pr.AddPoint("cpu", 1.0, time.Now(), nil) + pr.AddPoint("cpu", 2.0, time.Now().Add(time.Hour), nil) + pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil) // copy to prevent data race theTest := test @@ -245,7 +328,6 @@ func TestPointsWriter_WritePoints(t *testing.T) { }, } - ms := NewPointsWriterMetaClient() ms.DatabaseFn = func(database string) *meta.DatabaseInfo { return nil } @@ -357,7 +439,7 @@ func TestBufferedPointsWriter(t *testing.T) { numPoints := int(float64(capacity) * 5.5) for i := 0; i < numPoints; i++ { - req.AddPoint("cpu", float64(i), time.Unix(0, 0).Add(time.Duration(i)*time.Second), nil) + req.AddPoint("cpu", float64(i), time.Now().Add(time.Duration(i)*time.Second), nil) } r := coordinator.IntoWriteRequest(req) @@ -508,6 +590,7 @@ func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *met Owners: owners, }) + start := time.Now() rp := &meta.RetentionPolicyInfo{ Name: "myrp", ReplicaN: nodeCount, @@ -516,8 +599,8 @@ func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *met ShardGroups: []meta.ShardGroupInfo{ meta.ShardGroupInfo{ ID: nextShardID(), - StartTime: time.Unix(0, 0), - EndTime: time.Unix(0, 0).Add(duration).Add(-1), + StartTime: start, + EndTime: start.Add(duration).Add(-1), Shards: shards, }, }, @@ -528,7 +611,7 @@ func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *met func AttachShardGroupInfo(rp *meta.RetentionPolicyInfo, owners []meta.ShardOwner) { var startTime, endTime time.Time if len(rp.ShardGroups) == 0 { - startTime = time.Unix(0, 0) + startTime = time.Now() } else { startTime = rp.ShardGroups[len(rp.ShardGroups)-1].StartTime.Add(rp.ShardGroupDuration) } diff --git a/etc/config.sample.toml b/etc/config.sample.toml index fde202d06a..f586284450 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -49,8 +49,8 @@ reporting-disabled = false # These are the WAL settings for the storage engine >= 0.9.3 wal-dir = "/var/lib/influxdb/wal" wal-logging-enabled = true - - # Trace logging provides more verbose output around the tsm engine. Turning + + # Trace logging provides more verbose output around the tsm engine. Turning # this on can provide more useful output for debugging tsm engine issues. 
# trace-logging-enabled = false @@ -185,7 +185,9 @@ reporting-disabled = false [subscriber] enabled = true - http-timeout = "30s" + # http-timeout = "30s" + # write-concurrency = 40 + # write-buffer-size = 1000 ### diff --git a/services/meta/client_test.go b/services/meta/client_test.go index 216143f9f7..3f3f059738 100644 --- a/services/meta/client_test.go +++ b/services/meta/client_test.go @@ -84,7 +84,7 @@ func TestMetaClient_CreateDatabaseWithRetentionPolicy(t *testing.T) { Name: "rp0", Duration: &duration, ReplicaN: &replicaN, - ShardGroupDuration: 2 * time.Hour, + ShardGroupDuration: 30 * time.Minute, } if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec); err != nil { t.Fatal(err) @@ -104,7 +104,7 @@ func TestMetaClient_CreateDatabaseWithRetentionPolicy(t *testing.T) { t.Fatalf("rp duration wrong: %v", rp.Duration) } else if rp.ReplicaN != 1 { t.Fatalf("rp replication wrong: %d", rp.ReplicaN) - } else if rp.ShardGroupDuration != 2*time.Hour { + } else if rp.ShardGroupDuration != 30*time.Minute { t.Fatalf("rp shard duration wrong: %v", rp.ShardGroupDuration) } @@ -306,7 +306,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) { // Creating the same policy, but with a different shard group // duration should also result in an error. rp1 = rp0 - rp1.ShardGroupDuration = 2 * rp0.ShardGroupDuration + rp1.ShardGroupDuration = rp0.ShardGroupDuration / 2 _, got = c.CreateRetentionPolicy("db0", &meta.RetentionPolicySpec{ Name: rp1.Name, @@ -317,6 +317,22 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) { if exp := meta.ErrRetentionPolicyExists; got != exp { t.Fatalf("got error %v, expected error %v", got, exp) } + + // Creating a policy with the shard duration being greater than the + // duration should also be an error. + rp1 = rp0 + rp1.Duration = 1 * time.Hour + rp1.ShardGroupDuration = 2 * time.Hour + + _, got = c.CreateRetentionPolicy("db0", &meta.RetentionPolicySpec{ + Name: rp1.Name, + ReplicaN: &rp1.ReplicaN, + Duration: &rp1.Duration, + ShardGroupDuration: rp1.ShardGroupDuration, + }) + if exp := meta.ErrIncompatibleDurations; got != exp { + t.Fatalf("got error %v, expected error %v", got, exp) + } } func TestMetaClient_SetDefaultRetentionPolicy(t *testing.T) { @@ -360,6 +376,87 @@ func TestMetaClient_SetDefaultRetentionPolicy(t *testing.T) { } } +func TestMetaClient_UpdateRetentionPolicy(t *testing.T) { + t.Parallel() + + d, c := newClient() + defer os.RemoveAll(d) + defer c.Close() + + if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &meta.RetentionPolicySpec{ + Name: "rp0", + ShardGroupDuration: 4 * time.Hour, + }); err != nil { + t.Fatal(err) + } + + rpi, err := c.RetentionPolicy("db0", "rp0") + if err != nil { + t.Fatal(err) + } + + // Set the duration to another value and ensure that the shard group duration + // doesn't change. + duration := 2 * rpi.ShardGroupDuration + replicaN := 1 + if err := c.UpdateRetentionPolicy("db0", "rp0", &meta.RetentionPolicyUpdate{ + Duration: &duration, + ReplicaN: &replicaN, + }); err != nil { + t.Fatal(err) + } + + rpi, err = c.RetentionPolicy("db0", "rp0") + if err != nil { + t.Fatal(err) + } + if exp, got := 4*time.Hour, rpi.ShardGroupDuration; exp != got { + t.Fatalf("shard group duration wrong: \n\texp: %s\n\tgot: %s", exp, got) + } + + // Set the duration to below the shard group duration. This should return an error. 
+ duration = rpi.ShardGroupDuration / 2
+ if err := c.UpdateRetentionPolicy("db0", "rp0", &meta.RetentionPolicyUpdate{
+ Duration: &duration,
+ }); err == nil {
+ t.Fatal("expected error")
+ } else if err != meta.ErrIncompatibleDurations {
+ t.Fatalf("expected error '%s', got '%s'", meta.ErrIncompatibleDurations, err)
+ }
+
+ // Set the shard duration longer than the overall duration. This should also return an error.
+ sgDuration := rpi.Duration * 2
+ if err := c.UpdateRetentionPolicy("db0", "rp0", &meta.RetentionPolicyUpdate{
+ ShardGroupDuration: &sgDuration,
+ }); err == nil {
+ t.Fatal("expected error")
+ } else if err != meta.ErrIncompatibleDurations {
+ t.Fatalf("expected error '%s', got '%s'", meta.ErrIncompatibleDurations, err)
+ }
+
+ // Set both values to incompatible values and ensure an error is returned.
+ duration = rpi.ShardGroupDuration
+ sgDuration = rpi.Duration
+ if err := c.UpdateRetentionPolicy("db0", "rp0", &meta.RetentionPolicyUpdate{
+ Duration: &duration,
+ ShardGroupDuration: &sgDuration,
+ }); err == nil {
+ t.Fatal("expected error")
+ } else if err != meta.ErrIncompatibleDurations {
+ t.Fatalf("expected error '%s', got '%s'", meta.ErrIncompatibleDurations, err)
+ }
+
+ // Allow any shard duration if the duration is set to zero.
+ duration = time.Duration(0)
+ sgDuration = 168 * time.Hour
+ if err := c.UpdateRetentionPolicy("db0", "rp0", &meta.RetentionPolicyUpdate{
+ Duration: &duration,
+ ShardGroupDuration: &sgDuration,
+ }); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+}
+
 func TestMetaClient_DropRetentionPolicy(t *testing.T) {
 t.Parallel()
 
diff --git a/services/meta/data.go b/services/meta/data.go
index d80aaaa3d5..5a39b59bc0 100644
--- a/services/meta/data.go
+++ b/services/meta/data.go
@@ -150,6 +150,10 @@ func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInf
 // do it again to verify input.
 rpi.ShardGroupDuration = normalisedShardDuration(rpi.ShardGroupDuration, rpi.Duration)
 
+ if rpi.Duration > 0 && rpi.Duration < rpi.ShardGroupDuration {
+ return ErrIncompatibleDurations
+ }
+
 // Find database.
 di := data.Database(database)
 if di == nil {
@@ -231,6 +235,15 @@
 return ErrRetentionPolicyDurationTooLow
 }
 
+ // Enforce duration is at least the shard duration
+ if (rpu.Duration != nil && *rpu.Duration > 0 &&
+ ((rpu.ShardGroupDuration != nil && *rpu.Duration < *rpu.ShardGroupDuration) ||
+ (rpu.ShardGroupDuration == nil && *rpu.Duration < rpi.ShardGroupDuration))) ||
+ (rpu.Duration == nil && rpi.Duration > 0 &&
+ rpu.ShardGroupDuration != nil && rpi.Duration < *rpu.ShardGroupDuration) {
+ return ErrIncompatibleDurations
+ }
+
 // Update fields.
 if rpu.Name != nil {
 rpi.Name = *rpu.Name
@@ -241,11 +254,9 @@ func (data *Data) UpdateRetentionPol
 if rpu.ReplicaN != nil {
 rpi.ReplicaN = *rpu.ReplicaN
 }
 if rpu.ShardGroupDuration != nil {
- rpi.ShardGroupDuration = *rpu.ShardGroupDuration
- } else {
- rpi.ShardGroupDuration = shardGroupDuration(rpi.Duration)
+ rpi.ShardGroupDuration = normalisedShardDuration(*rpu.ShardGroupDuration, rpi.Duration)
 }
 
 return nil
 }
diff --git a/services/meta/errors.go b/services/meta/errors.go
index a028cd4fad..802ea2f0b6 100644
--- a/services/meta/errors.go
+++ b/services/meta/errors.go
@@ -55,6 +55,11 @@ var (
 // with an existing policy. 
ErrRetentionPolicyConflict = errors.New("retention policy conflicts with an existing policy") + // ErrIncompatibleDurations is returned when creating or updating a + // retention policy that has a duration lower than the current shard + // duration. + ErrIncompatibleDurations = errors.New("retention policy duration must be greater than the shard duration") + // ErrReplicationFactorTooLow is returned when the replication factor is not in an // acceptable range. ErrReplicationFactorTooLow = errors.New("replication factor must be greater than 0") diff --git a/services/subscriber/config.go b/services/subscriber/config.go index 372e5440c4..d9b026f9fe 100644 --- a/services/subscriber/config.go +++ b/services/subscriber/config.go @@ -11,7 +11,9 @@ import ( ) const ( - DefaultHTTPTimeout = 30 * time.Second + DefaultHTTPTimeout = 30 * time.Second + DefaultWriteConcurrency = 40 + DefaultWriteBufferSize = 1000 ) // Config represents a configuration of the subscriber service. @@ -28,6 +30,12 @@ type Config struct { // configure the path to the PEM encoded CA certs file. If the // empty string, the default system certs will be used CaCerts string `toml:"ca-certs"` + + // The number of writer goroutines processing the write channel. + WriteConcurrency int `toml:"write-concurrency"` + + // The number of in-flight writes buffered in the write channel. + WriteBufferSize int `toml:"write-buffer-size"` } // NewConfig returns a new instance of a subscriber config. @@ -37,6 +45,8 @@ func NewConfig() Config { HTTPTimeout: toml.Duration(DefaultHTTPTimeout), InsecureSkipVerify: false, CaCerts: "", + WriteConcurrency: DefaultWriteConcurrency, + WriteBufferSize: DefaultWriteBufferSize, } } @@ -44,6 +54,7 @@ func (c Config) Validate() error { if c.HTTPTimeout <= 0 { return errors.New("http-timeout must be greater than 0") } + if c.CaCerts != "" && !fileExists(c.CaCerts) { abspath, err := filepath.Abs(c.CaCerts) if err != nil { @@ -51,6 +62,15 @@ func (c Config) Validate() error { } return fmt.Errorf("ca-certs file %s does not exist", abspath) } + + if c.WriteBufferSize <= 0 { + return errors.New("write-buffer-size must be greater than 0") + } + + if c.WriteConcurrency <= 0 { + return errors.New("write-concurrency must be greater than 0") + } + return nil } diff --git a/services/subscriber/config_test.go b/services/subscriber/config_test.go index bbb38f488b..3915d3f15a 100644 --- a/services/subscriber/config_test.go +++ b/services/subscriber/config_test.go @@ -42,6 +42,8 @@ http-timeout = "60s" enabled = true ca-certs = '%s' insecure-skip-verify = true +write-buffer-size = 1000 +write-concurrency = 10 `, abspath), &c); err != nil { t.Fatal(err) } @@ -87,6 +89,8 @@ http-timeout = "60s" enabled = true ca-certs = '%s' insecure-skip-verify = false +write-buffer-size = 1000 +write-concurrency = 10 `, tmpfile.Name()), &c); err != nil { t.Fatal(err) } diff --git a/services/subscriber/service.go b/services/subscriber/service.go index d2557db4e5..c8939663e3 100644 --- a/services/subscriber/service.go +++ b/services/subscriber/service.go @@ -24,7 +24,8 @@ const ( ) // PointsWriter is an interface for writing points to a subscription destination. -// Only WritePoints() needs to be satisfied. +// Only WritePoints() needs to be satisfied. PointsWriter implementations +// must be goroutine safe. 
type PointsWriter interface { WritePoints(p *coordinator.WritePointsRequest) error } @@ -287,17 +288,19 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) error { return err } cw := chanWriter{ - writeRequests: make(chan *coordinator.WritePointsRequest, 100), + writeRequests: make(chan *coordinator.WritePointsRequest, s.conf.WriteBufferSize), pw: sub, pointsWritten: &s.stats.PointsWritten, failures: &s.stats.WriteFailures, logger: s.Logger, } - wg.Add(1) - go func() { - defer wg.Done() - cw.Run() - }() + for i := 0; i < s.conf.WriteConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + cw.Run() + }() + } s.subs[se] = cw s.Logger.Println("added new subscription for", se.db, se.rp) }
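
A note on the lookup the new sgList type in coordinator/points_writer.go performs (illustrative sketch only; the bucket/buckets names below are made-up stand-ins, not types from this patch): the list is kept sorted by end time and probed with sort.Search, so MapShards only calls CreateShardGroup when no existing group covers a point's timestamp.

    package main

    import (
        "fmt"
        "sort"
        "time"
    )

    // bucket is a stand-in for meta.ShardGroupInfo: just a start and end time.
    type bucket struct{ start, end time.Time }

    // buckets plays the role of sgList: it is kept sorted by end time.
    type buckets []bucket

    // at mirrors sgList.ShardGroupAt: because the slice is sorted by end time,
    // sort.Search returns the earliest-ending bucket whose end time is after t.
    func (b buckets) at(t time.Time) *bucket {
        idx := sort.Search(len(b), func(i int) bool { return b[i].end.After(t) })
        if idx == len(b) {
            return nil // t is at or after every bucket's end time
        }
        return &b[idx]
    }

    func main() {
        now := time.Now().Truncate(time.Hour)
        b := buckets{
            {start: now, end: now.Add(time.Hour)},
            // A wider bucket, e.g. one created after ALTER RETENTION POLICY
            // changed the shard group duration.
            {start: now.Add(time.Hour), end: now.Add(4 * time.Hour)},
        }
        fmt.Println(b.at(now.Add(30 * time.Minute))) // falls in the first bucket
        fmt.Println(b.at(now.Add(2 * time.Hour)))    // falls in the second bucket
        fmt.Println(b.at(now.Add(5 * time.Hour)))    // <nil>: nothing covers t
    }

Searching existing groups first, rather than truncating the point time by the current shard group duration, is what keeps an ALTER RETENTION POLICY from implicitly resetting the shard groups that already exist.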