fix: allow limiting shards during flux iterator reads (#23052) (#23053)

(cherry picked from commit 3ce6cb45aa)
pull/23059/head
Sam Arnold 2022-01-12 19:43:30 -05:00 committed by GitHub
parent 367b0c2a2d
commit 30d6aec839
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 46 additions and 7 deletions

View File

@ -109,24 +109,45 @@ func (s *Store) validateArgs(database, rp string, start, end int64) (string, str
return database, rp, start, end, nil
}
func (s *Store) ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (reads.ResultSet, error) {
func limitToShardIDs(list []uint64, allowed []uint64) []uint64 {
shardOk := make(map[uint64]struct{}, len(allowed))
for _, id := range allowed {
shardOk[id] = struct{}{}
}
newShardIDs := make([]uint64, 0, len(list))
for _, id := range list {
if _, ok := shardOk[id]; ok {
newShardIDs = append(newShardIDs, id)
}
}
return newShardIDs
}
func (s *Store) ReadFilterLimit(ctx context.Context, req *datatypes.ReadFilterRequest, limitShardIDs []uint64) (reads.ResultSet, error) {
if req.ReadSource == nil {
return nil, errors.New("missing read source")
}
var database, rp string
var start, end int64
var shardIDs []uint64
source, err := GetReadSource(req.ReadSource)
if err != nil {
return nil, err
}
database, rp, start, end, err = s.validateArgs(source.Database, source.RetentionPolicy, req.Range.GetStart(), req.Range.GetEnd())
if err != nil {
return nil, err
}
database, rp, start, end, err := s.validateArgs(source.Database, source.RetentionPolicy, req.Range.GetStart(), req.Range.GetEnd())
shardIDs, err = s.findShardIDs(database, rp, false, start, end)
if err != nil {
return nil, err
}
shardIDs, err := s.findShardIDs(database, rp, false, start, end)
if err != nil {
return nil, err
if len(limitShardIDs) > 0 {
shardIDs = limitToShardIDs(shardIDs, limitShardIDs)
}
if len(shardIDs) == 0 { // TODO(jeff): this was a typed nil
return nil, nil
@ -149,7 +170,11 @@ func (s *Store) ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest
return reads.NewFilteredResultSet(ctx, req.Range.GetStart(), req.Range.GetEnd(), cur), nil
}
func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) (reads.GroupResultSet, error) {
func (s *Store) ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (reads.ResultSet, error) {
return s.ReadFilterLimit(ctx, req, nil)
}
func (s *Store) ReadGroupLimit(ctx context.Context, req *datatypes.ReadGroupRequest, limitShardIDs []uint64) (reads.GroupResultSet, error) {
if req.ReadSource == nil {
return nil, errors.New("missing read source")
}
@ -172,6 +197,9 @@ func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest)
if err != nil {
return nil, err
}
if len(limitShardIDs) > 0 {
shardIDs = limitToShardIDs(shardIDs, limitShardIDs)
}
if len(shardIDs) == 0 {
return nil, nil
}
@ -199,7 +227,11 @@ func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest)
return rs, nil
}
func (s *Store) WindowAggregate(ctx context.Context, req *datatypes.ReadWindowAggregateRequest) (reads.ResultSet, error) {
func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) (reads.GroupResultSet, error) {
return s.ReadGroupLimit(ctx, req, nil)
}
func (s *Store) WindowAggregateLimit(ctx context.Context, req *datatypes.ReadWindowAggregateRequest, limitShardIDs []uint64) (reads.ResultSet, error) {
if req.ReadSource == nil {
return nil, errors.New("missing read source")
}
@ -222,6 +254,9 @@ func (s *Store) WindowAggregate(ctx context.Context, req *datatypes.ReadWindowAg
if err != nil {
return nil, err
}
if len(limitShardIDs) > 0 {
shardIDs = limitToShardIDs(shardIDs, limitShardIDs)
}
if len(shardIDs) == 0 { // TODO(jeff): this was a typed nil
return nil, nil
}
@ -238,6 +273,10 @@ func (s *Store) WindowAggregate(ctx context.Context, req *datatypes.ReadWindowAg
return reads.NewWindowAggregateResultSet(ctx, req, cur)
}
func (s *Store) WindowAggregate(ctx context.Context, req *datatypes.ReadWindowAggregateRequest) (reads.ResultSet, error) {
return s.WindowAggregateLimit(ctx, req, nil)
}
func (s *Store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error) {
if req.TagsSource == nil {
return nil, errors.New("missing read source")