commit 5427ba2db7

@@ -32,11 +32,15 @@
- [#7382](https://github.com/influxdata/influxdb/issues/7382): Shard stats include wal path tag so disk bytes make more sense.
- [#7385](https://github.com/influxdata/influxdb/pull/7385): Reduce query planning allocations

## v1.0.2 [unreleased]
## v1.0.2 [2016-10-05]

### Bugfixes

- [#7150](https://github.com/influxdata/influxdb/issues/7150): Do not automatically reset the shard duration when using ALTER RETENTION POLICY
- [#5878](https://github.com/influxdata/influxdb/issues/5878): Ensure correct shard groups created when retention policy has been altered.
- [#7391](https://github.com/influxdata/influxdb/issues/7391): Fix RLE integer decoding producing negative numbers
- [#7335](https://github.com/influxdata/influxdb/pull/7335): Avoid stat syscall when planning compactions
- [#7330](https://github.com/influxdata/influxdb/issues/7330): Subscription data loss under high write load

## v1.0.1 [2016-09-26]

@@ -5,6 +5,7 @@ import (
	"io"
	"log"
	"os"
	"sort"
	"sync"
	"sync/atomic"
	"time"

@@ -190,10 +191,6 @@ func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic {
// maps to a shard group or shard that does not currently exist, it will be
// created before returning the mapping.
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {

	// holds the start time ranges for required shard groups
	timeRanges := map[time.Time]*meta.ShardGroupInfo{}

	rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)
	if err != nil {
		return nil, err

@@ -201,46 +198,83 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
		return nil, influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)
	}

	// Find the minimum time for a point if the retention policy has a shard
	// group duration. We will automatically drop any points before this time.
	// There is a chance of a time on the edge of the shard group duration to
	// sneak through even after it has been removed, but the circumstances are
	// rare enough and don't matter enough that we don't account for this
	// edge case.
	// Holds all the shard groups and shards that are required for writes.
	list := make(sgList, 0, 8)
	min := time.Unix(0, models.MinNanoTime)
	if rp.Duration > 0 {
		min = time.Now().Add(-rp.Duration)
	}

	for _, p := range wp.Points {
		if p.Time().Before(min) {
		// Either the point is outside the scope of the RP, or we already have
		// a suitable shard group for the point.
		if p.Time().Before(min) || list.Covers(p.Time()) {
			continue
		}
		timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil
	}

	// holds all the shard groups and shards that are required for writes
	for t := range timeRanges {
		sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t)
		// No shard groups overlap with the point's time, so we will create
		// a new shard group for this point.
		sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, p.Time())
		if err != nil {
			return nil, err
		}
		timeRanges[t] = sg

		if sg == nil {
			return nil, errors.New("nil shard group")
		}
		list = list.Append(*sg)
	}

	mapping := NewShardMapping()
	for _, p := range wp.Points {
		sg, ok := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]
		if !ok {
		sg := list.ShardGroupAt(p.Time())
		if sg == nil {
			// We didn't create a shard group because the point was outside the
			// scope of the RP.
			atomic.AddInt64(&w.stats.WriteDropped, 1)
			continue
		}

		sh := sg.ShardFor(p.HashID())
		mapping.MapPoint(&sh, p)
	}
	return mapping, nil
}

// sgList is a wrapper around a meta.ShardGroupInfos where we can also check
// if a given time is covered by any of the shard groups in the list.
type sgList meta.ShardGroupInfos

func (l sgList) Covers(t time.Time) bool {
	if len(l) == 0 {
		return false
	}
	return l.ShardGroupAt(t) != nil
}

func (l sgList) ShardGroupAt(t time.Time) *meta.ShardGroupInfo {
	// Attempt to find a shard group that could contain this point.
	// Shard groups are sorted first according to end time, and then according
	// to start time. Therefore, if there are multiple shard groups that match
	// this point's time they will be preferred in this order:
	//
	// - a shard group with the earliest end time;
	// - (assuming identical end times) the shard group with the earliest start
	//   time.
	idx := sort.Search(len(l), func(i int) bool { return l[i].EndTime.After(t) })
	if idx == len(l) {
		return nil
	}
	return &l[idx]
}

// Append appends a shard group to the list, and returns a sorted list.
func (l sgList) Append(sgi meta.ShardGroupInfo) sgList {
	next := append(l, sgi)
	sort.Sort(meta.ShardGroupInfos(next))
	return next
}

// WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of
// a cluster structure for information. This is to avoid a circular dependency
func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error {
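
For context on the new sgList type above: the lookup only works because the list is kept sorted by end time (Append re-sorts after every insertion), which lets ShardGroupAt binary-search with sort.Search for the first group whose EndTime falls after the point's timestamp. Below is a minimal, self-contained sketch of that behaviour; the shardGroup type and the times used are illustrative stand-ins for meta.ShardGroupInfo, not code from this commit.

package main

import (
	"fmt"
	"sort"
	"time"
)

// shardGroup is an illustrative stand-in for meta.ShardGroupInfo.
type shardGroup struct {
	StartTime, EndTime time.Time
}

// shardGroupAt mirrors the sgList.ShardGroupAt logic: groups must already be
// sorted by end time, and the first group ending after t is returned.
func shardGroupAt(groups []shardGroup, t time.Time) *shardGroup {
	idx := sort.Search(len(groups), func(i int) bool { return groups[i].EndTime.After(t) })
	if idx == len(groups) {
		return nil // nothing covers t; MapShards would create a new shard group
	}
	return &groups[idx]
}

func main() {
	base := time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC)
	groups := []shardGroup{
		{StartTime: base, EndTime: base.Add(1 * time.Hour)},
		{StartTime: base.Add(1 * time.Hour), EndTime: base.Add(4 * time.Hour)},
	}

	// A point 30 minutes in is covered by the first group.
	fmt.Println(shardGroupAt(groups, base.Add(30*time.Minute)) != nil) // true

	// A point five hours in is not covered, so a new group would be created.
	fmt.Println(shardGroupAt(groups, base.Add(5*time.Hour)) != nil) // false
}

This is also why altering a retention policy's shard group duration now behaves sensibly: instead of truncating point times by a single ShardGroupDuration, MapShards asks the (possibly differently sized) existing groups whether they cover each point, and only creates a new group when none does.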

@@ -50,10 +50,89 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
	}
}

// Ensures the points writer maps to a new shard group when the shard duration
// is changed.
func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) {
	ms := PointsWriterMetaClient{}
	rp := NewRetentionPolicy("myp", time.Hour, 3)

	ms.NodeIDFn = func() uint64 { return 1 }
	ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
		return rp, nil
	}

	var (
		i   int
		now = time.Now()
	)

	ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
		sg := []meta.ShardGroupInfo{
			meta.ShardGroupInfo{
				Shards: make([]meta.ShardInfo, 1),
				StartTime: now, EndTime: now.Add(rp.Duration).Add(-1),
			},
			meta.ShardGroupInfo{
				Shards: make([]meta.ShardInfo, 1),
				StartTime: now.Add(time.Hour), EndTime: now.Add(3 * time.Hour).Add(rp.Duration).Add(-1),
			},
		}[i]
		i++
		return &sg, nil
	}

	c := coordinator.NewPointsWriter()
	c.MetaClient = ms

	pr := &coordinator.WritePointsRequest{
		Database:        "mydb",
		RetentionPolicy: "myrp",
	}
	pr.AddPoint("cpu", 1.0, now, nil)
	pr.AddPoint("cpu", 2.0, now.Add(2*time.Second), nil)

	var (
		shardMappings *coordinator.ShardMapping
		err           error
	)
	if shardMappings, err = c.MapShards(pr); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got, exp := len(shardMappings.Points[0]), 2; got != exp {
		t.Fatalf("got %d point(s), expected %d", got, exp)
	}

	if got, exp := len(shardMappings.Shards), 1; got != exp {
		t.Errorf("got %d shard(s), expected %d", got, exp)
	}

	// Now we alter the retention policy duration.
	rp.ShardGroupDuration = 3 * time.Hour

	pr = &coordinator.WritePointsRequest{
		Database:        "mydb",
		RetentionPolicy: "myrp",
	}
	pr.AddPoint("cpu", 1.0, now.Add(2*time.Hour), nil)

	// Point is beyond previous shard group so a new shard group should be
	// created.
	if shardMappings, err = c.MapShards(pr); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// We can check the value of i since it's only incremented when a shard group
	// is created.
	if got, exp := i, 2; got != exp {
		t.Fatal("new shard group was not created, expected it to be")
	}
}

// Ensures the points writer maps multiple points across shard group boundaries.
func TestPointsWriter_MapShards_Multiple(t *testing.T) {
	ms := PointsWriterMetaClient{}
	rp := NewRetentionPolicy("myp", 0, 3)
	rp := NewRetentionPolicy("myp", time.Hour, 3)
	rp.ShardGroupDuration = time.Hour
	AttachShardGroupInfo(rp, []meta.ShardOwner{
		{NodeID: 1},

@@ -90,9 +169,9 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {

	// Three points that range over the shardGroup duration (1h) and should map to two
	// distinct shards
	pr.AddPoint("cpu", 1.0, time.Unix(0, 0), nil)
	pr.AddPoint("cpu", 2.0, time.Unix(0, 0).Add(time.Hour), nil)
	pr.AddPoint("cpu", 3.0, time.Unix(0, 0).Add(time.Hour+time.Second), nil)
	pr.AddPoint("cpu", 1.0, time.Now(), nil)
	pr.AddPoint("cpu", 2.0, time.Now().Add(time.Hour), nil)
	pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil)

	var (
		shardMappings *coordinator.ShardMapping

@@ -107,12 +186,12 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
	}

	for _, points := range shardMappings.Points {
		// First shard shoud have 1 point w/ first point added
		// First shard should have 1 point w/ first point added
		if len(points) == 1 && points[0].Time() != pr.Points[0].Time() {
			t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[0].Time(), pr.Points[0].Time())
		}

		// Second shard shoud have the last two points added
		// Second shard should have the last two points added
		if len(points) == 2 && points[0].Time() != pr.Points[1].Time() {
			t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[0].Time(), pr.Points[1].Time())
		}

@@ -195,11 +274,15 @@ func TestPointsWriter_WritePoints(t *testing.T) {
			RetentionPolicy: test.retentionPolicy,
		}

		// Ensure that the test shard groups are created before the points
		// are created.
		ms := NewPointsWriterMetaClient()

		// Three points that range over the shardGroup duration (1h) and should map to two
		// distinct shards
		pr.AddPoint("cpu", 1.0, time.Unix(0, 0), nil)
		pr.AddPoint("cpu", 2.0, time.Unix(0, 0).Add(time.Hour), nil)
		pr.AddPoint("cpu", 3.0, time.Unix(0, 0).Add(time.Hour+time.Second), nil)
		pr.AddPoint("cpu", 1.0, time.Now(), nil)
		pr.AddPoint("cpu", 2.0, time.Now().Add(time.Hour), nil)
		pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil)

		// copy to prevent data race
		theTest := test

@@ -245,7 +328,6 @@ func TestPointsWriter_WritePoints(t *testing.T) {
			},
		}

		ms := NewPointsWriterMetaClient()
		ms.DatabaseFn = func(database string) *meta.DatabaseInfo {
			return nil
		}

@@ -357,7 +439,7 @@ func TestBufferedPointsWriter(t *testing.T) {

	numPoints := int(float64(capacity) * 5.5)
	for i := 0; i < numPoints; i++ {
		req.AddPoint("cpu", float64(i), time.Unix(0, 0).Add(time.Duration(i)*time.Second), nil)
		req.AddPoint("cpu", float64(i), time.Now().Add(time.Duration(i)*time.Second), nil)
	}

	r := coordinator.IntoWriteRequest(req)

@@ -508,6 +590,7 @@ func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *met
		Owners: owners,
	})

	start := time.Now()
	rp := &meta.RetentionPolicyInfo{
		Name:     "myrp",
		ReplicaN: nodeCount,

@@ -516,8 +599,8 @@ func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *met
		ShardGroups: []meta.ShardGroupInfo{
			meta.ShardGroupInfo{
				ID:        nextShardID(),
				StartTime: time.Unix(0, 0),
				EndTime:   time.Unix(0, 0).Add(duration).Add(-1),
				StartTime: start,
				EndTime:   start.Add(duration).Add(-1),
				Shards:    shards,
			},
		},

@@ -528,7 +611,7 @@ func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *met
func AttachShardGroupInfo(rp *meta.RetentionPolicyInfo, owners []meta.ShardOwner) {
	var startTime, endTime time.Time
	if len(rp.ShardGroups) == 0 {
		startTime = time.Unix(0, 0)
		startTime = time.Now()
	} else {
		startTime = rp.ShardGroups[len(rp.ShardGroups)-1].StartTime.Add(rp.ShardGroupDuration)
	}

@@ -185,7 +185,9 @@ reporting-disabled = false

[subscriber]
enabled = true
http-timeout = "30s"
# http-timeout = "30s"
# write-concurrency = 40
# write-buffer-size = 1000


###

@@ -84,7 +84,7 @@ func TestMetaClient_CreateDatabaseWithRetentionPolicy(t *testing.T) {
		Name:               "rp0",
		Duration:           &duration,
		ReplicaN:           &replicaN,
		ShardGroupDuration: 2 * time.Hour,
		ShardGroupDuration: 30 * time.Minute,
	}
	if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec); err != nil {
		t.Fatal(err)

@@ -104,7 +104,7 @@ func TestMetaClient_CreateDatabaseWithRetentionPolicy(t *testing.T) {
		t.Fatalf("rp duration wrong: %v", rp.Duration)
	} else if rp.ReplicaN != 1 {
		t.Fatalf("rp replication wrong: %d", rp.ReplicaN)
	} else if rp.ShardGroupDuration != 2*time.Hour {
	} else if rp.ShardGroupDuration != 30*time.Minute {
		t.Fatalf("rp shard duration wrong: %v", rp.ShardGroupDuration)
	}

@@ -306,7 +306,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
	// Creating the same policy, but with a different shard group
	// duration should also result in an error.
	rp1 = rp0
	rp1.ShardGroupDuration = 2 * rp0.ShardGroupDuration
	rp1.ShardGroupDuration = rp0.ShardGroupDuration / 2

	_, got = c.CreateRetentionPolicy("db0", &meta.RetentionPolicySpec{
		Name: rp1.Name,

@@ -317,6 +317,22 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
	if exp := meta.ErrRetentionPolicyExists; got != exp {
		t.Fatalf("got error %v, expected error %v", got, exp)
	}

	// Creating a policy with the shard duration being greater than the
	// duration should also be an error.
	rp1 = rp0
	rp1.Duration = 1 * time.Hour
	rp1.ShardGroupDuration = 2 * time.Hour

	_, got = c.CreateRetentionPolicy("db0", &meta.RetentionPolicySpec{
		Name:               rp1.Name,
		ReplicaN:           &rp1.ReplicaN,
		Duration:           &rp1.Duration,
		ShardGroupDuration: rp1.ShardGroupDuration,
	})
	if exp := meta.ErrIncompatibleDurations; got != exp {
		t.Fatalf("got error %v, expected error %v", got, exp)
	}
}

func TestMetaClient_SetDefaultRetentionPolicy(t *testing.T) {

@@ -360,6 +376,87 @@ func TestMetaClient_SetDefaultRetentionPolicy(t *testing.T) {
	}
}

func TestMetaClient_UpdateRetentionPolicy(t *testing.T) {
	t.Parallel()

	d, c := newClient()
	defer os.RemoveAll(d)
	defer c.Close()

	if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &meta.RetentionPolicySpec{
		Name:               "rp0",
		ShardGroupDuration: 4 * time.Hour,
	}); err != nil {
		t.Fatal(err)
	}

	rpi, err := c.RetentionPolicy("db0", "rp0")
	if err != nil {
		t.Fatal(err)
	}

	// Set the duration to another value and ensure that the shard group duration
	// doesn't change.
	duration := 2 * rpi.ShardGroupDuration
	replicaN := 1
	if err := c.UpdateRetentionPolicy("db0", "rp0", &meta.RetentionPolicyUpdate{
		Duration: &duration,
		ReplicaN: &replicaN,
	}); err != nil {
		t.Fatal(err)
	}

	rpi, err = c.RetentionPolicy("db0", "rp0")
	if err != nil {
		t.Fatal(err)
	}
	if exp, got := 4*time.Hour, rpi.ShardGroupDuration; exp != got {
		t.Fatalf("shard group duration wrong: \n\texp: %s\n\tgot: %s", exp, got)
	}

	// Set the duration to below the shard group duration. This should return an error.
	duration = rpi.ShardGroupDuration / 2
	if err := c.UpdateRetentionPolicy("db0", "rp0", &meta.RetentionPolicyUpdate{
		Duration: &duration,
	}); err == nil {
		t.Fatal("expected error")
	} else if err != meta.ErrIncompatibleDurations {
		t.Fatalf("expected error '%s', got '%s'", meta.ErrIncompatibleDurations, err)
	}

	// Set the shard duration longer than the overall duration. This should also return an error.
	sgDuration := rpi.Duration * 2
	if err := c.UpdateRetentionPolicy("db0", "rp0", &meta.RetentionPolicyUpdate{
		ShardGroupDuration: &sgDuration,
	}); err == nil {
		t.Fatal("expected error")
	} else if err != meta.ErrIncompatibleDurations {
		t.Fatalf("expected error '%s', got '%s'", meta.ErrIncompatibleDurations, err)
	}

	// Set both values to incompatible values and ensure an error is returned.
	duration = rpi.ShardGroupDuration
	sgDuration = rpi.Duration
	if err := c.UpdateRetentionPolicy("db0", "rp0", &meta.RetentionPolicyUpdate{
		Duration:           &duration,
		ShardGroupDuration: &sgDuration,
	}); err == nil {
		t.Fatal("expected error")
	} else if err != meta.ErrIncompatibleDurations {
		t.Fatalf("expected error '%s', got '%s'", meta.ErrIncompatibleDurations, err)
	}

	// Allow any shard duration if the duration is set to zero.
	duration = time.Duration(0)
	sgDuration = 168 * time.Hour
	if err := c.UpdateRetentionPolicy("db0", "rp0", &meta.RetentionPolicyUpdate{
		Duration:           &duration,
		ShardGroupDuration: &sgDuration,
	}); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}

func TestMetaClient_DropRetentionPolicy(t *testing.T) {
	t.Parallel()

@@ -150,6 +150,10 @@ func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInf
	// do it again to verify input.
	rpi.ShardGroupDuration = normalisedShardDuration(rpi.ShardGroupDuration, rpi.Duration)

	if rpi.Duration > 0 && rpi.Duration < rpi.ShardGroupDuration {
		return ErrIncompatibleDurations
	}

	// Find database.
	di := data.Database(database)
	if di == nil {

@@ -231,6 +235,15 @@ func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPol
		return ErrRetentionPolicyDurationTooLow
	}

	// Enforce duration is at least the shard duration
	if (rpu.Duration != nil && *rpu.Duration > 0 &&
		((rpu.ShardGroupDuration != nil && *rpu.Duration < *rpu.ShardGroupDuration) ||
			(rpu.ShardGroupDuration == nil && *rpu.Duration < rpi.ShardGroupDuration))) ||
		(rpu.Duration == nil && rpi.Duration > 0 &&
			rpu.ShardGroupDuration != nil && rpi.Duration < *rpu.ShardGroupDuration) {
		return ErrIncompatibleDurations
	}

	// Update fields.
	if rpu.Name != nil {
		rpi.Name = *rpu.Name

@@ -241,11 +254,8 @@ func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPol
	if rpu.ReplicaN != nil {
		rpi.ReplicaN = *rpu.ReplicaN
	}

	if rpu.ShardGroupDuration != nil {
		rpi.ShardGroupDuration = *rpu.ShardGroupDuration
	} else {
		rpi.ShardGroupDuration = shardGroupDuration(rpi.Duration)
		rpi.ShardGroupDuration = normalisedShardDuration(*rpu.ShardGroupDuration, rpi.Duration)
	}

	return nil
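
The boolean condition in the UpdateRetentionPolicy hunk above is easier to read once the update is resolved against the stored policy: take the updated duration if one was supplied (otherwise the existing one), do the same for the shard group duration, and reject the update when a non-zero retention duration ends up shorter than the shard group duration. The sketch below is an illustrative restatement of that rule, assuming the stored policy already satisfies the invariant; the policy and update types are stand-ins for meta.RetentionPolicyInfo and meta.RetentionPolicyUpdate, not code from this commit.

package main

import (
	"errors"
	"fmt"
	"time"
)

var errIncompatibleDurations = errors.New("retention policy duration must be greater than the shard duration")

// policy and update are illustrative stand-ins for the meta package types.
type policy struct {
	Duration           time.Duration
	ShardGroupDuration time.Duration
}

type update struct {
	Duration           *time.Duration
	ShardGroupDuration *time.Duration
}

// checkDurations resolves the effective durations and applies the rule the
// hunk enforces: a non-zero retention duration must be at least as long as
// the shard group duration.
func checkDurations(p policy, u update) error {
	d, sgd := p.Duration, p.ShardGroupDuration
	if u.Duration != nil {
		d = *u.Duration
	}
	if u.ShardGroupDuration != nil {
		sgd = *u.ShardGroupDuration
	}
	if d > 0 && d < sgd {
		return errIncompatibleDurations
	}
	return nil
}

func main() {
	p := policy{Duration: 8 * time.Hour, ShardGroupDuration: 4 * time.Hour}

	tooShort := 2 * time.Hour
	fmt.Println(checkDurations(p, update{Duration: &tooShort})) // incompatible: 2h retention < 4h shard groups

	infinite := time.Duration(0)
	week := 168 * time.Hour
	fmt.Println(checkDurations(p, update{Duration: &infinite, ShardGroupDuration: &week})) // <nil>: zero duration allows any shard duration
}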

@@ -55,6 +55,11 @@ var (
	// with an existing policy.
	ErrRetentionPolicyConflict = errors.New("retention policy conflicts with an existing policy")

	// ErrIncompatibleDurations is returned when creating or updating a
	// retention policy that has a duration lower than the current shard
	// duration.
	ErrIncompatibleDurations = errors.New("retention policy duration must be greater than the shard duration")

	// ErrReplicationFactorTooLow is returned when the replication factor is not in an
	// acceptable range.
	ErrReplicationFactorTooLow = errors.New("replication factor must be greater than 0")

@@ -12,6 +12,8 @@ import (

const (
	DefaultHTTPTimeout      = 30 * time.Second
	DefaultWriteConcurrency = 40
	DefaultWriteBufferSize  = 1000
)

// Config represents a configuration of the subscriber service.

@@ -28,6 +30,12 @@ type Config struct {
	// configure the path to the PEM encoded CA certs file. If the
	// empty string, the default system certs will be used
	CaCerts string `toml:"ca-certs"`

	// The number of writer goroutines processing the write channel.
	WriteConcurrency int `toml:"write-concurrency"`

	// The number of in-flight writes buffered in the write channel.
	WriteBufferSize int `toml:"write-buffer-size"`
}

// NewConfig returns a new instance of a subscriber config.

@@ -37,6 +45,8 @@ func NewConfig() Config {
		HTTPTimeout:        toml.Duration(DefaultHTTPTimeout),
		InsecureSkipVerify: false,
		CaCerts:            "",
		WriteConcurrency:   DefaultWriteConcurrency,
		WriteBufferSize:    DefaultWriteBufferSize,
	}
}

@@ -44,6 +54,7 @@ func (c Config) Validate() error {
	if c.HTTPTimeout <= 0 {
		return errors.New("http-timeout must be greater than 0")
	}

	if c.CaCerts != "" && !fileExists(c.CaCerts) {
		abspath, err := filepath.Abs(c.CaCerts)
		if err != nil {

@@ -51,6 +62,15 @@ func (c Config) Validate() error {
		}
		return fmt.Errorf("ca-certs file %s does not exist", abspath)
	}

	if c.WriteBufferSize <= 0 {
		return errors.New("write-buffer-size must be greater than 0")
	}

	if c.WriteConcurrency <= 0 {
		return errors.New("write-concurrency must be greater than 0")
	}

	return nil
}
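
With the Validate additions above, a subscriber config whose new settings resolve to zero or a negative value is rejected up front instead of silently producing an unbuffered channel or no writer goroutines. A tiny illustrative sketch of that intent, using a simplified stand-in struct rather than the real subscriber Config:

package main

import (
	"errors"
	"fmt"
)

// subscriberConfig is a simplified stand-in for the subscriber service config.
type subscriberConfig struct {
	WriteConcurrency int
	WriteBufferSize  int
}

// validate mirrors the two checks added to Config.Validate in the hunk above.
func (c subscriberConfig) validate() error {
	if c.WriteBufferSize <= 0 {
		return errors.New("write-buffer-size must be greater than 0")
	}
	if c.WriteConcurrency <= 0 {
		return errors.New("write-concurrency must be greater than 0")
	}
	return nil
}

func main() {
	fmt.Println(subscriberConfig{WriteConcurrency: 40, WriteBufferSize: 1000}.validate()) // <nil>
	fmt.Println(subscriberConfig{WriteConcurrency: 0, WriteBufferSize: 1000}.validate())  // write-concurrency must be greater than 0
}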

@@ -42,6 +42,8 @@ http-timeout = "60s"
enabled = true
ca-certs = '%s'
insecure-skip-verify = true
write-buffer-size = 1000
write-concurrency = 10
`, abspath), &c); err != nil {
		t.Fatal(err)
	}

@@ -87,6 +89,8 @@ http-timeout = "60s"
enabled = true
ca-certs = '%s'
insecure-skip-verify = false
write-buffer-size = 1000
write-concurrency = 10
`, tmpfile.Name()), &c); err != nil {
		t.Fatal(err)
	}

@@ -24,7 +24,8 @@ const (
)

// PointsWriter is an interface for writing points to a subscription destination.
// Only WritePoints() needs to be satisfied.
// Only WritePoints() needs to be satisfied. PointsWriter implementations
// must be goroutine safe.
type PointsWriter interface {
	WritePoints(p *coordinator.WritePointsRequest) error
}

@@ -287,17 +288,19 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) error {
				return err
			}
			cw := chanWriter{
				writeRequests: make(chan *coordinator.WritePointsRequest, 100),
				writeRequests: make(chan *coordinator.WritePointsRequest, s.conf.WriteBufferSize),
				pw:            sub,
				pointsWritten: &s.stats.PointsWritten,
				failures:      &s.stats.WriteFailures,
				logger:        s.Logger,
			}
			for i := 0; i < s.conf.WriteConcurrency; i++ {
				wg.Add(1)
				go func() {
					defer wg.Done()
					cw.Run()
				}()
			}
			s.subs[se] = cw
			s.Logger.Println("added new subscription for", se.db, se.rp)
		}
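
The updateSubs change above effectively turns each subscription's chanWriter into a small worker pool: one buffered channel holding up to WriteBufferSize in-flight requests, drained by WriteConcurrency goroutines that all run the same writer, which is why the PointsWriter comment now requires implementations to be goroutine safe. Below is a minimal, generic sketch of that fan-out pattern; the names and payload type are illustrative, not the service's actual types.

package main

import (
	"fmt"
	"sync"
)

// writeRequest stands in for *coordinator.WritePointsRequest.
type writeRequest struct{ id int }

func main() {
	const (
		writeBufferSize  = 8 // cap on in-flight requests, like conf.WriteBufferSize
		writeConcurrency = 3 // number of consumer goroutines, like conf.WriteConcurrency
	)

	requests := make(chan writeRequest, writeBufferSize)

	var wg sync.WaitGroup
	for i := 0; i < writeConcurrency; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			// Each worker drains the shared channel until it is closed,
			// mirroring chanWriter.Run consuming writeRequests.
			for req := range requests {
				fmt.Printf("worker %d handled request %d\n", worker, req.id)
			}
		}(i)
	}

	for i := 0; i < 20; i++ {
		requests <- writeRequest{id: i}
	}
	close(requests)
	wg.Wait()
}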