Support return of shard groups for timerange
parent
5626c2c725
commit
64ace983e8
20
meta/data.go
20
meta/data.go
|
@ -254,6 +254,26 @@ func (data *Data) ShardGroups(database, policy string) ([]ShardGroupInfo, error)
|
|||
return groups, nil
|
||||
}
|
||||
|
||||
// ShardGroupsByTimeRange returns a list of all shard groups on a database and policy that may contain data
|
||||
// for the specified time range.
|
||||
func (data *Data) ShardGroupsByTimeRange(database, policy string, tmin, tmax time.Time) ([]ShardGroupInfo, error) {
|
||||
// Find retention policy.
|
||||
rpi, err := data.RetentionPolicy(database, policy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if rpi == nil {
|
||||
return nil, ErrRetentionPolicyNotFound
|
||||
}
|
||||
groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups))
|
||||
for _, g := range rpi.ShardGroups {
|
||||
if g.Deleted() || !g.Overlaps(tmin, tmax) {
|
||||
continue
|
||||
}
|
||||
groups = append(groups, g)
|
||||
}
|
||||
return groups, nil
|
||||
}
|
||||
|
||||
// ShardGroupByTimestamp returns the shard group on a database and policy for a given timestamp.
|
||||
func (data *Data) ShardGroupByTimestamp(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
|
||||
// Find retention policy.
|
||||
|
|
|
@ -863,6 +863,20 @@ func (s *Store) ShardGroups(database, policy string) (a []ShardGroupInfo, err er
|
|||
return
|
||||
}
|
||||
|
||||
// ShardGroupsByTimeRange returns a slice of ShardGroups that may contain data for the given time range
|
||||
func (s *Store) ShardGroupsByTimeRange(database, policy string, tmin, tmax time.Time) (a []ShardGroupInfo, err error) {
|
||||
err = s.read(func(data *Data) error {
|
||||
a, err = data.ShardGroupsByTimeRange(database, policy, tmin, tmax)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if a == nil {
|
||||
return errInvalidate
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// VisitRetentionPolicies calls the given function with full retention policy details.
|
||||
func (s *Store) VisitRetentionPolicies(f func(d DatabaseInfo, r RetentionPolicyInfo)) {
|
||||
s.read(func(data *Data) error {
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -389,6 +390,75 @@ func TestStore_CreateShardGroup(t *testing.T) {
|
|||
} else if sgi.ID != 1 {
|
||||
t.Fatalf("unexpected shard group: %#v", sgi)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestStore_ShardGroupsRetrieval(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := MustOpenStore()
|
||||
defer s.Close()
|
||||
|
||||
// Create resources for testing.
|
||||
if _, err := s.CreateNode("host0"); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := s.CreateDatabase("db0"); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if sgi, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sgi.ID != 1 {
|
||||
t.Fatalf("unexpected shard group: %#v", sgi)
|
||||
}
|
||||
|
||||
// Function to compare actual and expected. Works on integers only, as results require sorting.
|
||||
assertShardGroupsInTimeRange := func(database, policy string, actualGroups []meta.ShardGroupInfo, expectedGroupIDs []int) {
|
||||
if len(actualGroups) != len(expectedGroupIDs) {
|
||||
t.Fatalf(("number of actual groups (%d) does not equal number expected groups (%d)"), len(actualGroups), len(expectedGroupIDs))
|
||||
}
|
||||
|
||||
actualGroupIDs := []int{}
|
||||
for i := range actualGroups {
|
||||
actualGroupIDs = append(actualGroupIDs, int(actualGroups[i].ID))
|
||||
}
|
||||
|
||||
sort.Ints(actualGroupIDs)
|
||||
sort.Ints(expectedGroupIDs)
|
||||
for i := range actualGroupIDs {
|
||||
if actualGroupIDs[i] != expectedGroupIDs[i] {
|
||||
t.Fatalf("actual group IDs (%v) does not match expected group IDs (%v)", actualGroupIDs, expectedGroupIDs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check that it is returned correctly when requested.
|
||||
if sgs, err := s.ShardGroups("db0", "rp0"); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
assertShardGroupsInTimeRange("db0", "rp0", sgs, []int{1})
|
||||
}
|
||||
|
||||
if sgs, err := s.ShardGroupsByTimeRange("db0", "rp0", time.Date(1999, time.January, 1, 0, 0, 0, 0, time.UTC), time.Date(1999, time.January, 2, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
assertShardGroupsInTimeRange("db0", "rp0", sgs, []int{})
|
||||
}
|
||||
if sgs, err := s.ShardGroupsByTimeRange("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), time.Date(2001, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
assertShardGroupsInTimeRange("db0", "rp0", sgs, []int{1})
|
||||
}
|
||||
if sgs, err := s.ShardGroupsByTimeRange("db0", "rp0", time.Date(1999, time.January, 1, 0, 0, 0, 0, time.UTC), time.Date(2001, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
assertShardGroupsInTimeRange("db0", "rp0", sgs, []int{1})
|
||||
}
|
||||
if sgs, err := s.ShardGroupsByTimeRange("db0", "rp0", time.Date(2002, time.January, 1, 0, 0, 0, 0, time.UTC), time.Date(2002, time.January, 2, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
assertShardGroupsInTimeRange("db0", "rp0", sgs, []int{})
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the store can delete an existing shard group.
|
||||
|
|
Loading…
Reference in New Issue