Merge pull request #6693 from influxdata/js-6599-fix-shard-groups-near-max-time
Truncate the shard group end time if it exceeds MaxNanoTimepull/6729/head
commit
1d467ab2e3
|
@ -45,6 +45,7 @@
|
|||
- [#6702](https://github.com/influxdata/influxdb/issues/6702): Fix SELECT statement required privileges.
|
||||
- [#6701](https://github.com/influxdata/influxdb/issues/6701): Filter out sources that do not match the shard database/retention policy.
|
||||
- [#6683](https://github.com/influxdata/influxdb/issues/6683): Fix compaction planning re-compacting large TSM files
|
||||
- [#6693](https://github.com/influxdata/influxdb/pull/6693): Truncate the shard group end time if it exceeds MaxNanoTime.
|
||||
|
||||
## v0.13.0 [2016-05-12]
|
||||
|
||||
|
|
|
@ -6149,6 +6149,50 @@ func TestServer_Query_DuplicateMeasurements(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestServer_Query_LargeTimestamp(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := OpenDefaultServer(NewConfig())
|
||||
defer s.Close()
|
||||
|
||||
writes := []string{
|
||||
fmt.Sprintf(`cpu value=100 %d`, models.MaxNanoTime.UnixNano()),
|
||||
}
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.writes = Writes{
|
||||
&Write{data: strings.Join(writes, "\n")},
|
||||
}
|
||||
test.addQueries([]*Query{
|
||||
&Query{
|
||||
name: `select value at max nano time`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: fmt.Sprintf(`SELECT value FROM cpu WHERE time <= %d`, models.MaxNanoTime.UnixNano()),
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["` + models.MaxNanoTime.Format(time.RFC3339Nano) + `",100]]}]}]}`,
|
||||
},
|
||||
}...)
|
||||
|
||||
if err := test.init(s); err != nil {
|
||||
t.Fatalf("test init failed: %s", err)
|
||||
}
|
||||
|
||||
// Open a new server with the same configuration file.
|
||||
// This is to ensure the meta data was marshaled correctly.
|
||||
s2 := OpenServer(s.Config)
|
||||
defer s2.Close()
|
||||
|
||||
for _, query := range test.queries {
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
if err := query.Execute(s); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This test reproduced a data race with closing the
|
||||
// Subscriber points channel while writes were in-flight in the PointsWriter.
|
||||
func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
internal "github.com/influxdata/influxdb/services/meta/internal"
|
||||
)
|
||||
|
||||
|
@ -371,6 +372,9 @@ func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time)
|
|||
sgi.ID = data.MaxShardGroupID
|
||||
sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
|
||||
sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()
|
||||
if sgi.EndTime.After(models.MaxNanoTime) {
|
||||
sgi.EndTime = models.MaxNanoTime
|
||||
}
|
||||
|
||||
data.MaxShardID++
|
||||
sgi.Shards = []ShardInfo{
|
||||
|
|
Loading…
Reference in New Issue