feat(http): buckets now use duration strings

pull/10616/head
Chris Goller 2018-09-25 11:16:24 -05:00
parent 2e7ce4e0bd
commit e936671c67
5 changed files with 356 additions and 38 deletions

View File

@ -6,9 +6,11 @@ import (
"time"
)
// BucketType defines known system-buckets.
type BucketType int
const (
// BucketTypeLogs defines the bucket ID of the system logs.
BucketTypeLogs = BucketType(iota + 10)
)

View File

@ -35,9 +35,94 @@ func NewBucketHandler() *BucketHandler {
return h
}
// bucket is used for serialization/deserialization with duration string syntax.
type bucket struct {
ID platform.ID `json:"id,omitempty"`
OrganizationID platform.ID `json:"organizationID,omitempty"`
Organization string `json:"organization,omitempty"`
Name string `json:"name"`
RetentionPolicyName string `json:"rp,omitempty"` // This to support v1 sources
RetentionPeriod string `json:"retentionPeriod"`
}
func (b *bucket) toPlatform() (*platform.Bucket, error) {
if b == nil {
return nil, nil
}
d, err := ParseDuration(b.RetentionPeriod)
if err != nil {
return nil, err
}
return &platform.Bucket{
ID: b.ID,
OrganizationID: b.OrganizationID,
Organization: b.Organization,
Name: b.Name,
RetentionPolicyName: b.RetentionPolicyName,
RetentionPeriod: d,
}, nil
}
func newBucket(pb *platform.Bucket) *bucket {
if pb == nil {
return nil
}
return &bucket{
ID: pb.ID,
OrganizationID: pb.OrganizationID,
Organization: pb.Organization,
Name: pb.Name,
RetentionPolicyName: pb.RetentionPolicyName,
RetentionPeriod: FormatDuration(pb.RetentionPeriod),
}
}
// bucketUpdate is used for serialization/deserialization with duration string syntax.
type bucketUpdate struct {
Name *string `json:"name,omitempty"`
RetentionPeriod *string `json:"retentionPeriod,omitempty"`
}
func (b *bucketUpdate) toPlatform() (*platform.BucketUpdate, error) {
if b == nil {
return nil, nil
}
up := &platform.BucketUpdate{
Name: b.Name,
}
if b.RetentionPeriod != nil {
d, err := ParseDuration(*b.RetentionPeriod)
if err != nil {
return nil, err
}
up.RetentionPeriod = &d
}
return up, nil
}
func newBucketUpdate(pb *platform.BucketUpdate) *bucketUpdate {
if pb == nil {
return nil
}
up := &bucketUpdate{
Name: pb.Name,
}
if pb.RetentionPeriod != nil {
d := FormatDuration(*pb.RetentionPeriod)
up.RetentionPeriod = &d
}
return up
}
type bucketResponse struct {
Links map[string]string `json:"links"`
platform.Bucket
bucket
}
func newBucketResponse(b *platform.Bucket) *bucketResponse {
@ -46,7 +131,7 @@ func newBucketResponse(b *platform.Bucket) *bucketResponse {
"self": fmt.Sprintf("/v1/buckets/%s", b.ID),
"org": fmt.Sprintf("/v1/orgs/%s", b.OrganizationID),
},
Bucket: *b,
bucket: *newBucket(b),
}
}
@ -103,14 +188,20 @@ func (b postBucketRequest) Validate() error {
}
func decodePostBucketRequest(ctx context.Context, r *http.Request) (*postBucketRequest, error) {
b := &platform.Bucket{}
b := &bucket{}
if err := json.NewDecoder(r.Body).Decode(b); err != nil {
return nil, err
}
req := &postBucketRequest{
Bucket: b,
pb, err := b.toPlatform()
if err != nil {
return nil, err
}
req := &postBucketRequest{
Bucket: pb,
}
return req, req.Validate()
}
@ -305,13 +396,18 @@ func decodePatchBucketRequest(ctx context.Context, r *http.Request) (*patchBucke
return nil, err
}
var upd platform.BucketUpdate
if err := json.NewDecoder(r.Body).Decode(&upd); err != nil {
bu := &bucketUpdate{}
if err := json.NewDecoder(r.Body).Decode(bu); err != nil {
return nil, err
}
upd, err := bu.toPlatform()
if err != nil {
return nil, err
}
return &patchBucketRequest{
Update: upd,
Update: *upd,
BucketID: i,
}, nil
}
@ -350,13 +446,12 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id platform.ID) (*pl
return nil, err
}
var b platform.Bucket
if err := json.NewDecoder(resp.Body).Decode(&b); err != nil {
var br bucketResponse
if err := json.NewDecoder(resp.Body).Decode(&br); err != nil {
return nil, err
}
defer resp.Body.Close()
return &b, nil
return br.toPlatform()
}
// FindBucket returns the first bucket that matches filter.
@ -421,7 +516,12 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketF
buckets := make([]*platform.Bucket, 0, len(bs.Buckets))
for _, b := range bs.Buckets {
buckets = append(buckets, &b.Bucket)
pb, err := b.bucket.toPlatform()
if err != nil {
return nil, 0, err
}
buckets = append(buckets, pb)
}
return buckets, len(buckets), nil
@ -434,7 +534,7 @@ func (s *BucketService) CreateBucket(ctx context.Context, b *platform.Bucket) er
return err
}
octets, err := json.Marshal(b)
octets, err := json.Marshal(newBucket(b))
if err != nil {
return err
}
@ -459,11 +559,13 @@ func (s *BucketService) CreateBucket(ctx context.Context, b *platform.Bucket) er
return err
}
if err := json.NewDecoder(resp.Body).Decode(b); err != nil {
var br bucketResponse
if err := json.NewDecoder(resp.Body).Decode(&br); err != nil {
return err
}
return nil
b, err = br.toPlatform()
return err
}
// UpdateBucket updates a single bucket with changeset.
@ -474,7 +576,8 @@ func (s *BucketService) UpdateBucket(ctx context.Context, id platform.ID, upd pl
return nil, err
}
octets, err := json.Marshal(upd)
bu := newBucketUpdate(&upd)
octets, err := json.Marshal(bu)
if err != nil {
return nil, err
}
@ -498,13 +601,12 @@ func (s *BucketService) UpdateBucket(ctx context.Context, id platform.ID, upd pl
return nil, err
}
var b platform.Bucket
if err := json.NewDecoder(resp.Body).Decode(&b); err != nil {
var br bucketResponse
if err := json.NewDecoder(resp.Body).Decode(&br); err != nil {
return nil, err
}
defer resp.Body.Close()
return &b, nil
return br.toPlatform()
}
// DeleteBucket removes a bucket by ID.

View File

@ -44,14 +44,16 @@ func TestService_handleGetBuckets(t *testing.T) {
FindBucketsFn: func(ctx context.Context, filter platform.BucketFilter, opts ...platform.FindOptions) ([]*platform.Bucket, int, error) {
return []*platform.Bucket{
{
ID: platform.ID("0"),
Name: "hello",
OrganizationID: platform.ID("10"),
ID: platform.ID("0"),
Name: "hello",
OrganizationID: platform.ID("10"),
RetentionPeriod: 2 * time.Second,
},
{
ID: platform.ID("2"),
Name: "example",
OrganizationID: platform.ID("20"),
ID: platform.ID("2"),
Name: "example",
OrganizationID: platform.ID("20"),
RetentionPeriod: 24 * time.Hour,
},
}, 2, nil
},
@ -75,7 +77,7 @@ func TestService_handleGetBuckets(t *testing.T) {
"id": "30",
"organizationID": "3130",
"name": "hello",
"retentionPeriod": 0
"retentionPeriod": "2s"
},
{
"links": {
@ -85,7 +87,7 @@ func TestService_handleGetBuckets(t *testing.T) {
"id": "32",
"organizationID": "3230",
"name": "example",
"retentionPeriod": 0
"retentionPeriod": "1d"
}
]
}
@ -179,9 +181,10 @@ func TestService_handleGetBucket(t *testing.T) {
FindBucketByIDFn: func(ctx context.Context, id platform.ID) (*platform.Bucket, error) {
if bytes.Equal(id, mustParseID("020f755c3c082000")) {
return &platform.Bucket{
ID: mustParseID("020f755c3c082000"),
OrganizationID: mustParseID("020f755c3c082000"),
Name: "hello",
ID: mustParseID("020f755c3c082000"),
OrganizationID: mustParseID("020f755c3c082000"),
Name: "hello",
RetentionPeriod: 30 * time.Second,
}, nil
}
@ -204,7 +207,7 @@ func TestService_handleGetBucket(t *testing.T) {
"id": "020f755c3c082000",
"organizationID": "020f755c3c082000",
"name": "hello",
"retentionPeriod": 0
"retentionPeriod": "30s"
}
`,
},
@ -313,7 +316,7 @@ func TestService_handlePostBucket(t *testing.T) {
"id": "020f755c3c082000",
"organizationID": "30",
"name": "hello",
"retentionPeriod": 0
"retentionPeriod": "0s"
}
`,
},
@ -325,7 +328,7 @@ func TestService_handlePostBucket(t *testing.T) {
h := NewBucketHandler()
h.BucketService = tt.fields.BucketService
b, err := json.Marshal(tt.args.bucket)
b, err := json.Marshal(newBucket(tt.args.bucket))
if err != nil {
t.Fatalf("failed to unmarshal bucket: %v", err)
}
@ -340,7 +343,8 @@ func TestService_handlePostBucket(t *testing.T) {
body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode != tt.wants.statusCode {
t.Errorf("%q. handlePostBucket() = %v, want %v", tt.name, res.StatusCode, tt.wants.statusCode)
msg := res.Header.Get(ErrorHeader)
t.Errorf("%q. handlePostBucket() = %v, want %v: %s", tt.name, res.StatusCode, tt.wants.statusCode, msg)
}
if tt.wants.contentType != "" && content != tt.wants.contentType {
t.Errorf("%q. handlePostBucket() = %v, want %v", tt.name, content, tt.wants.contentType)
@ -512,7 +516,7 @@ func TestService_handlePatchBucket(t *testing.T) {
"id": "020f755c3c082000",
"organizationID": "020f755c3c082000",
"name": "example",
"retentionPeriod": 1234
"retentionPeriod": "1u"
}
`,
},
@ -549,7 +553,7 @@ func TestService_handlePatchBucket(t *testing.T) {
upd.RetentionPeriod = &tt.args.retention
}
b, err := json.Marshal(upd)
b, err := json.Marshal(newBucketUpdate(&upd))
if err != nil {
t.Fatalf("failed to unmarshal bucket update: %v", err)
}

133
http/duration.go Normal file
View File

@ -0,0 +1,133 @@
package http
import (
"fmt"
"strconv"
"time"
"github.com/influxdata/platform/kit/errors"
)
// ErrInvalidDuration is returned when parsing a malformatted duration.
var ErrInvalidDuration = errors.New("invalid duration", errors.MalformedData)
// ParseDuration parses a time duration from a string.
// This is needed instead of time.ParseDuration because this will support
// the full syntax that InfluxQL supports for specifying durations
// including weeks and days.
func ParseDuration(s string) (time.Duration, error) {
// Return an error if the string is blank or one character
if len(s) < 2 {
return 0, ErrInvalidDuration
}
// Split string into individual runes.
a := split(s)
// Start with a zero duration.
var d time.Duration
i := 0
// Check for a negative.
isNegative := false
if a[i] == '-' {
isNegative = true
i++
}
var measure int64
var unit string
// Parsing loop.
for i < len(a) {
// Find the number portion.
start := i
for ; i < len(a) && isDigit(a[i]); i++ {
// Scan for the digits.
}
// Check if we reached the end of the string prematurely.
if i >= len(a) || i == start {
return 0, ErrInvalidDuration
}
// Parse the numeric part.
n, err := strconv.ParseInt(string(a[start:i]), 10, 64)
if err != nil {
return 0, ErrInvalidDuration
}
measure = n
// Extract the unit of measure.
// If the last two characters are "ms" then parse as milliseconds.
// Otherwise just use the last character as the unit of measure.
unit = string(a[i])
switch a[i] {
case 'u', 'µ':
d += time.Duration(n) * time.Microsecond
case 'm':
if i+1 < len(a) && a[i+1] == 's' {
unit = string(a[i : i+2])
d += time.Duration(n) * time.Millisecond
i += 2
continue
}
d += time.Duration(n) * time.Minute
case 's':
d += time.Duration(n) * time.Second
case 'h':
d += time.Duration(n) * time.Hour
case 'd':
d += time.Duration(n) * 24 * time.Hour
case 'w':
d += time.Duration(n) * 7 * 24 * time.Hour
default:
return 0, ErrInvalidDuration
}
i++
}
// Check to see if we overflowed a duration
if d < 0 && !isNegative {
return 0, fmt.Errorf("overflowed duration %d%s: choose a smaller duration or INF", measure, unit)
}
if isNegative {
d = -d
}
return d, nil
}
// FormatDuration formats a duration to a string.
func FormatDuration(d time.Duration) string {
if d == 0 {
return "0s"
} else if d%(7*24*time.Hour) == 0 {
return fmt.Sprintf("%dw", d/(7*24*time.Hour))
} else if d%(24*time.Hour) == 0 {
return fmt.Sprintf("%dd", d/(24*time.Hour))
} else if d%time.Hour == 0 {
return fmt.Sprintf("%dh", d/time.Hour)
} else if d%time.Minute == 0 {
return fmt.Sprintf("%dm", d/time.Minute)
} else if d%time.Second == 0 {
return fmt.Sprintf("%ds", d/time.Second)
} else if d%time.Millisecond == 0 {
return fmt.Sprintf("%dms", d/time.Millisecond)
}
// Although we accept both "u" and "µ" when reading microsecond durations,
// we output with "u", which can be represented in 1 byte,
// instead of "µ", which requires 2 bytes.
return fmt.Sprintf("%du", d/time.Microsecond)
}
// split splits a string into a slice of runes.
func split(s string) (a []rune) {
for _, ch := range s {
a = append(a, ch)
}
return
}
// isDigit returns true if the rune is a digit.
func isDigit(ch rune) bool { return (ch >= '0' && ch <= '9') }

77
http/duration_test.go Normal file
View File

@ -0,0 +1,77 @@
package http
import (
"fmt"
"testing"
"time"
)
// Ensure a time duration can be parsed.
func TestParseDuration(t *testing.T) {
var tests = []struct {
s string
want time.Duration
wantErr bool
}{
{s: `10u`, want: 10 * time.Microsecond},
{s: `10µ`, want: 10 * time.Microsecond},
{s: `15ms`, want: 15 * time.Millisecond},
{s: `100s`, want: 100 * time.Second},
{s: `2m`, want: 2 * time.Minute},
{s: `2h`, want: 2 * time.Hour},
{s: `2d`, want: 2 * 24 * time.Hour},
{s: `2w`, want: 2 * 7 * 24 * time.Hour},
{s: `1h30m`, want: time.Hour + 30*time.Minute},
{s: `30ms3000u`, want: 30*time.Millisecond + 3000*time.Microsecond},
{s: `-5s`, want: -5 * time.Second},
{s: `-5m30s`, want: -5*time.Minute - 30*time.Second},
{s: ``, wantErr: true},
{s: `3`, wantErr: true},
{s: `1000`, wantErr: true},
{s: `w`, wantErr: true},
{s: `ms`, wantErr: true},
{s: `1.2w`, wantErr: true},
{s: `10x`, wantErr: true},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
got, err := ParseDuration(tt.s)
if (err != nil) != tt.wantErr {
t.Errorf("ParseDuration() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("ParseDuration() = %v, want %v", got, tt.want)
}
})
}
}
// Ensure a time duration can be formatted.
func TestFormatDuration(t *testing.T) {
var tests = []struct {
d time.Duration
want string
}{
{d: 3 * time.Microsecond, want: `3u`},
{d: 1001 * time.Microsecond, want: `1001u`},
{d: 15 * time.Millisecond, want: `15ms`},
{d: 100 * time.Second, want: `100s`},
{d: 2 * time.Minute, want: `2m`},
{d: 2 * time.Hour, want: `2h`},
{d: 2 * 24 * time.Hour, want: `2d`},
{d: 2 * 7 * 24 * time.Hour, want: `2w`},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
got := FormatDuration(tt.d)
if got != tt.want {
t.Errorf("FormatDuration() = %s, want %s", got, tt.want)
}
})
}
}