use tracing.StartSpanFromContext
parent
8d146db9ad
commit
603a1f26e0
|
@ -3,9 +3,8 @@ package authorizer
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
"github.com/influxdata/influxdb"
|
"github.com/influxdata/influxdb"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ influxdb.BucketService = (*BucketService)(nil)
|
var _ influxdb.BucketService = (*BucketService)(nil)
|
||||||
|
@ -28,7 +27,7 @@ func newBucketPermission(a influxdb.Action, orgID, id influxdb.ID) (*influxdb.Pe
|
||||||
}
|
}
|
||||||
|
|
||||||
func authorizeReadBucket(ctx context.Context, orgID, id influxdb.ID) error {
|
func authorizeReadBucket(ctx context.Context, orgID, id influxdb.ID) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "authorizeReadBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
p, err := newBucketPermission(influxdb.ReadAction, orgID, id)
|
p, err := newBucketPermission(influxdb.ReadAction, orgID, id)
|
||||||
|
@ -58,7 +57,7 @@ func authorizeWriteBucket(ctx context.Context, orgID, id influxdb.ID) error {
|
||||||
|
|
||||||
// FindBucketByID checks to see if the authorizer on context has read access to the id provided.
|
// FindBucketByID checks to see if the authorizer on context has read access to the id provided.
|
||||||
func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
|
func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBucketByID")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
b, err := s.s.FindBucketByID(ctx, id)
|
b, err := s.s.FindBucketByID(ctx, id)
|
||||||
|
@ -75,7 +74,7 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*in
|
||||||
|
|
||||||
// FindBucket retrieves the bucket and checks to see if the authorizer on context has read access to the bucket.
|
// FindBucket retrieves the bucket and checks to see if the authorizer on context has read access to the bucket.
|
||||||
func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
|
func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
b, err := s.s.FindBucket(ctx, filter)
|
b, err := s.s.FindBucket(ctx, filter)
|
||||||
|
@ -92,7 +91,7 @@ func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFi
|
||||||
|
|
||||||
// FindBuckets retrieves all buckets that match the provided filter and then filters the list down to only the resources that are authorized.
|
// FindBuckets retrieves all buckets that match the provided filter and then filters the list down to only the resources that are authorized.
|
||||||
func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
|
func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBuckets")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// TODO: we'll likely want to push this operation into the database eventually since fetching the whole list of data
|
// TODO: we'll likely want to push this operation into the database eventually since fetching the whole list of data
|
||||||
|
@ -123,7 +122,7 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketF
|
||||||
|
|
||||||
// CreateBucket checks to see if the authorizer on context has write access to the global buckets resource.
|
// CreateBucket checks to see if the authorizer on context has write access to the global buckets resource.
|
||||||
func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
|
func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.CreateBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
p, err := influxdb.NewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, b.OrganizationID)
|
p, err := influxdb.NewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, b.OrganizationID)
|
||||||
|
|
|
@ -6,11 +6,11 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
bolt "github.com/coreos/bbolt"
|
"github.com/coreos/bbolt"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
platformcontext "github.com/influxdata/influxdb/context"
|
platformcontext "github.com/influxdata/influxdb/context"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -32,7 +32,7 @@ func (c *Client) initializeBuckets(ctx context.Context, tx *bolt.Tx) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) setOrganizationOnBucket(ctx context.Context, tx *bolt.Tx, b *platform.Bucket) *platform.Error {
|
func (c *Client) setOrganizationOnBucket(ctx context.Context, tx *bolt.Tx, b *platform.Bucket) *platform.Error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.setOrganizationOnBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
o, err := c.findOrganizationByID(ctx, tx, b.OrganizationID)
|
o, err := c.findOrganizationByID(ctx, tx, b.OrganizationID)
|
||||||
|
@ -47,7 +47,7 @@ func (c *Client) setOrganizationOnBucket(ctx context.Context, tx *bolt.Tx, b *pl
|
||||||
|
|
||||||
// FindBucketByID retrieves a bucket by id.
|
// FindBucketByID retrieves a bucket by id.
|
||||||
func (c *Client) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) {
|
func (c *Client) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.FindBucketByID")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var b *platform.Bucket
|
var b *platform.Bucket
|
||||||
|
@ -72,7 +72,7 @@ func (c *Client) FindBucketByID(ctx context.Context, id platform.ID) (*platform.
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) findBucketByID(ctx context.Context, tx *bolt.Tx, id platform.ID) (*platform.Bucket, *platform.Error) {
|
func (c *Client) findBucketByID(ctx context.Context, tx *bolt.Tx, id platform.ID) (*platform.Bucket, *platform.Error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.findBucketByID")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var b platform.Bucket
|
var b platform.Bucket
|
||||||
|
@ -111,7 +111,7 @@ func (c *Client) findBucketByID(ctx context.Context, tx *bolt.Tx, id platform.ID
|
||||||
// FindBucketByName returns a bucket by name for a particular organization.
|
// FindBucketByName returns a bucket by name for a particular organization.
|
||||||
// TODO: have method for finding bucket using organization name and bucket name.
|
// TODO: have method for finding bucket using organization name and bucket name.
|
||||||
func (c *Client) FindBucketByName(ctx context.Context, orgID platform.ID, n string) (*platform.Bucket, error) {
|
func (c *Client) FindBucketByName(ctx context.Context, orgID platform.ID, n string) (*platform.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.FindBucketByName")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var b *platform.Bucket
|
var b *platform.Bucket
|
||||||
|
@ -132,7 +132,7 @@ func (c *Client) FindBucketByName(ctx context.Context, orgID platform.ID, n stri
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) findBucketByName(ctx context.Context, tx *bolt.Tx, orgID platform.ID, n string) (*platform.Bucket, *platform.Error) {
|
func (c *Client) findBucketByName(ctx context.Context, tx *bolt.Tx, orgID platform.ID, n string) (*platform.Bucket, *platform.Error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.findBucketByName")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
b := &platform.Bucket{
|
b := &platform.Bucket{
|
||||||
|
@ -168,7 +168,7 @@ func (c *Client) findBucketByName(ctx context.Context, tx *bolt.Tx, orgID platfo
|
||||||
// Filters using ID, or OrganizationID and bucket Name should be efficient.
|
// Filters using ID, or OrganizationID and bucket Name should be efficient.
|
||||||
// Other filters will do a linear scan across buckets until it finds a match.
|
// Other filters will do a linear scan across buckets until it finds a match.
|
||||||
func (c *Client) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) {
|
func (c *Client) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.FindBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var b *platform.Bucket
|
var b *platform.Bucket
|
||||||
|
@ -257,7 +257,7 @@ func filterBucketsFn(filter platform.BucketFilter) func(b *platform.Bucket) bool
|
||||||
// Filters using ID, or OrganizationID and bucket Name should be efficient.
|
// Filters using ID, or OrganizationID and bucket Name should be efficient.
|
||||||
// Other filters will do a linear scan across all buckets searching for a match.
|
// Other filters will do a linear scan across all buckets searching for a match.
|
||||||
func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter, opts ...platform.FindOptions) ([]*platform.Bucket, int, error) {
|
func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter, opts ...platform.FindOptions) ([]*platform.Bucket, int, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.FindBuckets")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if filter.ID != nil {
|
if filter.ID != nil {
|
||||||
|
@ -296,7 +296,7 @@ func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) findBuckets(ctx context.Context, tx *bolt.Tx, filter platform.BucketFilter, opts ...platform.FindOptions) ([]*platform.Bucket, *platform.Error) {
|
func (c *Client) findBuckets(ctx context.Context, tx *bolt.Tx, filter platform.BucketFilter, opts ...platform.FindOptions) ([]*platform.Bucket, *platform.Error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.findBuckets")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
bs := []*platform.Bucket{}
|
bs := []*platform.Bucket{}
|
||||||
|
@ -345,7 +345,7 @@ func (c *Client) findBuckets(ctx context.Context, tx *bolt.Tx, filter platform.B
|
||||||
|
|
||||||
// CreateBucket creates a platform bucket and sets b.ID.
|
// CreateBucket creates a platform bucket and sets b.ID.
|
||||||
func (c *Client) CreateBucket(ctx context.Context, b *platform.Bucket) error {
|
func (c *Client) CreateBucket(ctx context.Context, b *platform.Bucket) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.CreateBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
|
|
@ -9,10 +9,10 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
bolt "github.com/coreos/bbolt"
|
"github.com/coreos/bbolt"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -114,7 +114,7 @@ func (c *Client) initializeKeyValueLog(ctx context.Context, tx *bolt.Tx) error {
|
||||||
var errKeyValueLogBoundsNotFound = fmt.Errorf("oplog not found")
|
var errKeyValueLogBoundsNotFound = fmt.Errorf("oplog not found")
|
||||||
|
|
||||||
func (c *Client) getKeyValueLogBounds(ctx context.Context, tx *bolt.Tx, key []byte) (*keyValueLogBounds, error) {
|
func (c *Client) getKeyValueLogBounds(ctx context.Context, tx *bolt.Tx, key []byte) (*keyValueLogBounds, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "Client.getKeyValueLogBounds")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
k := encodeKeyValueIndexKey(key)
|
k := encodeKeyValueIndexKey(key)
|
||||||
|
@ -149,7 +149,7 @@ func (c *Client) putKeyValueLogBounds(ctx context.Context, tx *bolt.Tx, key []by
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) updateKeyValueLogBounds(ctx context.Context, tx *bolt.Tx, k []byte, t time.Time) error {
|
func (c *Client) updateKeyValueLogBounds(ctx context.Context, tx *bolt.Tx, k []byte, t time.Time) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.updateKeyValueLogBounds")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// retrieve the keyValue log boundaries
|
// retrieve the keyValue log boundaries
|
||||||
|
|
10
bolt/kv.go
10
bolt/kv.go
|
@ -7,10 +7,10 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
bolt "github.com/coreos/bbolt"
|
"github.com/coreos/bbolt"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/kv"
|
"github.com/influxdata/influxdb/kv"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -32,7 +32,7 @@ func NewKVStore(path string) *KVStore {
|
||||||
|
|
||||||
// Open creates boltDB file it doesn't exists and opens it otherwise.
|
// Open creates boltDB file it doesn't exists and opens it otherwise.
|
||||||
func (s *KVStore) Open(ctx context.Context) error {
|
func (s *KVStore) Open(ctx context.Context) error {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "KVStore.Open")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Ensure the required directory structure exists.
|
// Ensure the required directory structure exists.
|
||||||
|
@ -102,7 +102,7 @@ func (s *KVStore) WithDB(db *bolt.DB) {
|
||||||
|
|
||||||
// View opens up a view transaction against the store.
|
// View opens up a view transaction against the store.
|
||||||
func (s *KVStore) View(ctx context.Context, fn func(tx kv.Tx) error) error {
|
func (s *KVStore) View(ctx context.Context, fn func(tx kv.Tx) error) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "KVStore.View")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
return s.db.View(func(tx *bolt.Tx) error {
|
return s.db.View(func(tx *bolt.Tx) error {
|
||||||
|
@ -115,7 +115,7 @@ func (s *KVStore) View(ctx context.Context, fn func(tx kv.Tx) error) error {
|
||||||
|
|
||||||
// Update opens up an update transaction against the store.
|
// Update opens up an update transaction against the store.
|
||||||
func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error {
|
func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "KVStore.Update")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
return s.db.Update(func(tx *bolt.Tx) error {
|
return s.db.Update(func(tx *bolt.Tx) error {
|
||||||
|
|
|
@ -6,11 +6,10 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
bolt "github.com/coreos/bbolt"
|
"github.com/coreos/bbolt"
|
||||||
"github.com/opentracing/opentracing-go"
|
"github.com/influxdata/influxdb"
|
||||||
|
|
||||||
influxdb "github.com/influxdata/influxdb"
|
|
||||||
influxdbcontext "github.com/influxdata/influxdb/context"
|
influxdbcontext "github.com/influxdata/influxdb/context"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -54,7 +53,7 @@ func (c *Client) FindOrganizationByID(ctx context.Context, id influxdb.ID) (*inf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) findOrganizationByID(ctx context.Context, tx *bolt.Tx, id influxdb.ID) (*influxdb.Organization, *influxdb.Error) {
|
func (c *Client) findOrganizationByID(ctx context.Context, tx *bolt.Tx, id influxdb.ID) (*influxdb.Organization, *influxdb.Error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "Client.findOrganizationByID")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
encodedID, err := id.Encode()
|
encodedID, err := id.Encode()
|
||||||
|
@ -100,7 +99,7 @@ func (c *Client) FindOrganizationByName(ctx context.Context, n string) (*influxd
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) findOrganizationByName(ctx context.Context, tx *bolt.Tx, n string) (*influxdb.Organization, *influxdb.Error) {
|
func (c *Client) findOrganizationByName(ctx context.Context, tx *bolt.Tx, n string) (*influxdb.Organization, *influxdb.Error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.findOrganizationByName")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
o := tx.Bucket(organizationIndex).Get(organizationIndexKey(n))
|
o := tx.Bucket(organizationIndex).Get(organizationIndexKey(n))
|
||||||
|
|
|
@ -6,14 +6,13 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/chronograf"
|
"github.com/influxdata/influxdb/chronograf"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AllDB returns all databases from within Influx
|
// AllDB returns all databases from within Influx
|
||||||
func (c *Client) AllDB(ctx context.Context) ([]chronograf.Database, error) {
|
func (c *Client) AllDB(ctx context.Context) ([]chronograf.Database, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.AllDB")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
return c.showDatabases(ctx)
|
return c.showDatabases(ctx)
|
||||||
|
@ -21,7 +20,7 @@ func (c *Client) AllDB(ctx context.Context) ([]chronograf.Database, error) {
|
||||||
|
|
||||||
// CreateDB creates a database within Influx
|
// CreateDB creates a database within Influx
|
||||||
func (c *Client) CreateDB(ctx context.Context, db *chronograf.Database) (*chronograf.Database, error) {
|
func (c *Client) CreateDB(ctx context.Context, db *chronograf.Database) (*chronograf.Database, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.CreateDB")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
_, err := c.Query(ctx, chronograf.Query{
|
_, err := c.Query(ctx, chronograf.Query{
|
||||||
|
@ -38,7 +37,7 @@ func (c *Client) CreateDB(ctx context.Context, db *chronograf.Database) (*chrono
|
||||||
|
|
||||||
// DropDB drops a database within Influx
|
// DropDB drops a database within Influx
|
||||||
func (c *Client) DropDB(ctx context.Context, db string) error {
|
func (c *Client) DropDB(ctx context.Context, db string) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.DropDB")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
_, err := c.Query(ctx, chronograf.Query{
|
_, err := c.Query(ctx, chronograf.Query{
|
||||||
|
@ -53,14 +52,14 @@ func (c *Client) DropDB(ctx context.Context, db string) error {
|
||||||
|
|
||||||
// AllRP returns all the retention policies for a specific database
|
// AllRP returns all the retention policies for a specific database
|
||||||
func (c *Client) AllRP(ctx context.Context, db string) ([]chronograf.RetentionPolicy, error) {
|
func (c *Client) AllRP(ctx context.Context, db string) ([]chronograf.RetentionPolicy, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.AllRP")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
return c.showRetentionPolicies(ctx, db)
|
return c.showRetentionPolicies(ctx, db)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) getRP(ctx context.Context, db, rp string) (chronograf.RetentionPolicy, error) {
|
func (c *Client) getRP(ctx context.Context, db, rp string) (chronograf.RetentionPolicy, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.getRP")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
rs, err := c.AllRP(ctx, db)
|
rs, err := c.AllRP(ctx, db)
|
||||||
|
@ -78,7 +77,7 @@ func (c *Client) getRP(ctx context.Context, db, rp string) (chronograf.Retention
|
||||||
|
|
||||||
// CreateRP creates a retention policy for a specific database
|
// CreateRP creates a retention policy for a specific database
|
||||||
func (c *Client) CreateRP(ctx context.Context, db string, rp *chronograf.RetentionPolicy) (*chronograf.RetentionPolicy, error) {
|
func (c *Client) CreateRP(ctx context.Context, db string, rp *chronograf.RetentionPolicy) (*chronograf.RetentionPolicy, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.CreateRP")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
query := fmt.Sprintf(`CREATE RETENTION POLICY "%s" ON "%s" DURATION %s REPLICATION %d`, rp.Name, db, rp.Duration, rp.Replication)
|
query := fmt.Sprintf(`CREATE RETENTION POLICY "%s" ON "%s" DURATION %s REPLICATION %d`, rp.Name, db, rp.Duration, rp.Replication)
|
||||||
|
@ -108,7 +107,7 @@ func (c *Client) CreateRP(ctx context.Context, db string, rp *chronograf.Retenti
|
||||||
|
|
||||||
// UpdateRP updates a specific retention policy for a specific database
|
// UpdateRP updates a specific retention policy for a specific database
|
||||||
func (c *Client) UpdateRP(ctx context.Context, db string, rp string, upd *chronograf.RetentionPolicy) (*chronograf.RetentionPolicy, error) {
|
func (c *Client) UpdateRP(ctx context.Context, db string, rp string, upd *chronograf.RetentionPolicy) (*chronograf.RetentionPolicy, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.UpdateRP")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var buffer bytes.Buffer
|
var buffer bytes.Buffer
|
||||||
|
@ -163,7 +162,7 @@ func (c *Client) UpdateRP(ctx context.Context, db string, rp string, upd *chrono
|
||||||
|
|
||||||
// DropRP removes a specific retention policy for a specific database
|
// DropRP removes a specific retention policy for a specific database
|
||||||
func (c *Client) DropRP(ctx context.Context, db string, rp string) error {
|
func (c *Client) DropRP(ctx context.Context, db string, rp string) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.DropRP")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
_, err := c.Query(ctx, chronograf.Query{
|
_, err := c.Query(ctx, chronograf.Query{
|
||||||
|
@ -181,14 +180,14 @@ func (c *Client) DropRP(ctx context.Context, db string, rp string) error {
|
||||||
// optional limit and offset. If no limit or offset is provided, it defaults to
|
// optional limit and offset. If no limit or offset is provided, it defaults to
|
||||||
// a limit of 100 measurements with no offset.
|
// a limit of 100 measurements with no offset.
|
||||||
func (c *Client) GetMeasurements(ctx context.Context, db string, limit, offset int) ([]chronograf.Measurement, error) {
|
func (c *Client) GetMeasurements(ctx context.Context, db string, limit, offset int) ([]chronograf.Measurement, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.GetMeasurements")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
return c.showMeasurements(ctx, db, limit, offset)
|
return c.showMeasurements(ctx, db, limit, offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) showDatabases(ctx context.Context) ([]chronograf.Database, error) {
|
func (c *Client) showDatabases(ctx context.Context) ([]chronograf.Database, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.showDatabases")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
res, err := c.Query(ctx, chronograf.Query{
|
res, err := c.Query(ctx, chronograf.Query{
|
||||||
|
@ -211,7 +210,7 @@ func (c *Client) showDatabases(ctx context.Context) ([]chronograf.Database, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) showRetentionPolicies(ctx context.Context, db string) ([]chronograf.RetentionPolicy, error) {
|
func (c *Client) showRetentionPolicies(ctx context.Context, db string) ([]chronograf.RetentionPolicy, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.showRetentionPolicies")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
retentionPolicies, err := c.Query(ctx, chronograf.Query{
|
retentionPolicies, err := c.Query(ctx, chronograf.Query{
|
||||||
|
@ -236,7 +235,7 @@ func (c *Client) showRetentionPolicies(ctx context.Context, db string) ([]chrono
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) showMeasurements(ctx context.Context, db string, limit, offset int) ([]chronograf.Measurement, error) {
|
func (c *Client) showMeasurements(ctx context.Context, db string, limit, offset int) ([]chronograf.Measurement, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.showMeasurements")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
show := fmt.Sprintf(`SHOW MEASUREMENTS ON "%s"`, db)
|
show := fmt.Sprintf(`SHOW MEASUREMENTS ON "%s"`, db)
|
||||||
|
|
|
@ -11,8 +11,6 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/chronograf"
|
"github.com/influxdata/influxdb/chronograf"
|
||||||
"github.com/influxdata/influxdb/kit/tracing"
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
)
|
)
|
||||||
|
@ -50,7 +48,7 @@ func (r Response) MarshalJSON() ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) query(ctx context.Context, u *url.URL, q chronograf.Query) (chronograf.Response, error) {
|
func (c *Client) query(ctx context.Context, u *url.URL, q chronograf.Query) (chronograf.Response, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "Client.query")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u.Path = "query"
|
u.Path = "query"
|
||||||
|
@ -141,7 +139,7 @@ type result struct {
|
||||||
// include both the database and retention policy. In-flight requests can be
|
// include both the database and retention policy. In-flight requests can be
|
||||||
// cancelled using the provided context.
|
// cancelled using the provided context.
|
||||||
func (c *Client) Query(ctx context.Context, q chronograf.Query) (chronograf.Response, error) {
|
func (c *Client) Query(ctx context.Context, q chronograf.Query) (chronograf.Response, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.Query")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
resps := make(chan (result))
|
resps := make(chan (result))
|
||||||
|
@ -160,7 +158,7 @@ func (c *Client) Query(ctx context.Context, q chronograf.Query) (chronograf.Resp
|
||||||
|
|
||||||
// Connect caches the URL and optional Bearer Authorization for the data source
|
// Connect caches the URL and optional Bearer Authorization for the data source
|
||||||
func (c *Client) Connect(ctx context.Context, src *chronograf.Source) error {
|
func (c *Client) Connect(ctx context.Context, src *chronograf.Source) error {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "Client.Connect")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := url.Parse(src.URL)
|
u, err := url.Parse(src.URL)
|
||||||
|
@ -206,7 +204,7 @@ func (c *Client) Type(ctx context.Context) (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) pingTimeout(ctx context.Context) (string, string, error) {
|
func (c *Client) pingTimeout(ctx context.Context) (string, string, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.pingTimeout")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
resps := make(chan (pingResult))
|
resps := make(chan (pingResult))
|
||||||
|
@ -230,7 +228,7 @@ type pingResult struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) ping(ctx context.Context, u *url.URL) (string, string, error) {
|
func (c *Client) ping(ctx context.Context, u *url.URL) (string, string, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "Client.ping")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u.Path = "ping"
|
u.Path = "ping"
|
||||||
|
@ -280,7 +278,7 @@ func (c *Client) ping(ctx context.Context, u *url.URL) (string, string, error) {
|
||||||
|
|
||||||
// Write POSTs line protocol to a database and retention policy
|
// Write POSTs line protocol to a database and retention policy
|
||||||
func (c *Client) Write(ctx context.Context, points []chronograf.Point) error {
|
func (c *Client) Write(ctx context.Context, points []chronograf.Point) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.Write")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
for _, point := range points {
|
for _, point := range points {
|
||||||
|
@ -292,7 +290,7 @@ func (c *Client) Write(ctx context.Context, points []chronograf.Point) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) writePoint(ctx context.Context, point *chronograf.Point) error {
|
func (c *Client) writePoint(ctx context.Context, point *chronograf.Point) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.writePoint")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
lp, err := toLineProtocol(point)
|
lp, err := toLineProtocol(point)
|
||||||
|
@ -327,7 +325,7 @@ func (c *Client) writePoint(ctx context.Context, point *chronograf.Point) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) write(ctx context.Context, u *url.URL, db, rp, lp string) error {
|
func (c *Client) write(ctx context.Context, u *url.URL, db, rp, lp string) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Client.write")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u.Path = "write"
|
u.Path = "write"
|
||||||
|
|
|
@ -30,6 +30,7 @@ import (
|
||||||
"github.com/influxdata/influxdb/internal/fs"
|
"github.com/influxdata/influxdb/internal/fs"
|
||||||
"github.com/influxdata/influxdb/kit/cli"
|
"github.com/influxdata/influxdb/kit/cli"
|
||||||
"github.com/influxdata/influxdb/kit/prom"
|
"github.com/influxdata/influxdb/kit/prom"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/kv"
|
"github.com/influxdata/influxdb/kv"
|
||||||
influxlogger "github.com/influxdata/influxdb/logger"
|
influxlogger "github.com/influxdata/influxdb/logger"
|
||||||
"github.com/influxdata/influxdb/nats"
|
"github.com/influxdata/influxdb/nats"
|
||||||
|
@ -282,7 +283,7 @@ func (m *Launcher) Run(ctx context.Context, args ...string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Launcher) run(ctx context.Context) (err error) {
|
func (m *Launcher) run(ctx context.Context) (err error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Launcher.run")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
m.running = true
|
m.running = true
|
||||||
|
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/julienschmidt/httprouter"
|
"github.com/julienschmidt/httprouter"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb"
|
"github.com/influxdata/influxdb"
|
||||||
|
@ -595,7 +594,7 @@ type BucketService struct {
|
||||||
|
|
||||||
// FindBucketByID returns a single bucket by ID.
|
// FindBucketByID returns a single bucket by ID.
|
||||||
func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
|
func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "BucketService.FindBucketByID")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(s.Addr, bucketIDPath(id))
|
u, err := newURL(s.Addr, bucketIDPath(id))
|
||||||
|
@ -630,7 +629,7 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*in
|
||||||
|
|
||||||
// FindBucket returns the first bucket that matches filter.
|
// FindBucket returns the first bucket that matches filter.
|
||||||
func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
|
func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
bs, n, err := s.FindBuckets(ctx, filter)
|
bs, n, err := s.FindBuckets(ctx, filter)
|
||||||
|
@ -652,7 +651,7 @@ func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFi
|
||||||
// FindBuckets returns a list of buckets that match filter and the total count of matching buckets.
|
// FindBuckets returns a list of buckets that match filter and the total count of matching buckets.
|
||||||
// Additional options provide pagination & sorting.
|
// Additional options provide pagination & sorting.
|
||||||
func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
|
func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "BucketService.FindBuckets")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(s.Addr, bucketPath)
|
u, err := newURL(s.Addr, bucketPath)
|
||||||
|
@ -722,7 +721,7 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketF
|
||||||
|
|
||||||
// CreateBucket creates a new bucket and sets b.ID with the new identifier.
|
// CreateBucket creates a new bucket and sets b.ID with the new identifier.
|
||||||
func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
|
func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "BucketService.CreateBucket")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(s.Addr, bucketPath)
|
u, err := newURL(s.Addr, bucketPath)
|
||||||
|
|
|
@ -5,9 +5,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
// BucketService connects to Influx via HTTP using tokens to manage buckets
|
// BucketService connects to Influx via HTTP using tokens to manage buckets
|
||||||
|
@ -24,7 +23,7 @@ func (s *BucketService) FindBucket(ctx context.Context, filter platform.BucketFi
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) {
|
func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBuckets")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
c, err := newClient(s.Source)
|
c, err := newClient(s.Source)
|
||||||
|
|
|
@ -11,7 +11,6 @@ import (
|
||||||
"github.com/influxdata/flux"
|
"github.com/influxdata/flux"
|
||||||
"github.com/influxdata/flux/csv"
|
"github.com/influxdata/flux/csv"
|
||||||
"github.com/influxdata/flux/lang"
|
"github.com/influxdata/flux/lang"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
platformhttp "github.com/influxdata/influxdb/http"
|
platformhttp "github.com/influxdata/influxdb/http"
|
||||||
|
@ -40,7 +39,7 @@ func (s *SourceProxyQueryService) Query(ctx context.Context, w io.Writer, req *q
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "SourceProxyQueryService.fluxQuery")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
request := struct {
|
request := struct {
|
||||||
Spec *flux.Spec `json:"spec"`
|
Spec *flux.Spec `json:"spec"`
|
||||||
|
@ -111,7 +110,7 @@ func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, re
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceProxyQueryService) influxQuery(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
func (s *SourceProxyQueryService) influxQuery(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "SourceProxyQueryService.influxQuery")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
if len(s.URL) == 0 {
|
if len(s.URL) == 0 {
|
||||||
return flux.Statistics{}, tracing.LogError(span, fmt.Errorf("URL from source cannot be empty if the compiler type is influxql"))
|
return flux.Statistics{}, tracing.LogError(span, fmt.Errorf("URL from source cannot be empty if the compiler type is influxql"))
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
"path"
|
"path"
|
||||||
|
|
||||||
"github.com/julienschmidt/httprouter"
|
"github.com/julienschmidt/httprouter"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb"
|
"github.com/influxdata/influxdb"
|
||||||
|
@ -616,7 +615,7 @@ func (s *OrganizationService) FindOrganization(ctx context.Context, filter influ
|
||||||
|
|
||||||
// FindOrganizations returns all organizations that match the filter via HTTP.
|
// FindOrganizations returns all organizations that match the filter via HTTP.
|
||||||
func (s *OrganizationService) FindOrganizations(ctx context.Context, filter influxdb.OrganizationFilter, opt ...influxdb.FindOptions) ([]*influxdb.Organization, int, error) {
|
func (s *OrganizationService) FindOrganizations(ctx context.Context, filter influxdb.OrganizationFilter, opt ...influxdb.FindOptions) ([]*influxdb.Organization, int, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "OrganizationService.FindOrganizations")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if filter.Name != nil {
|
if filter.Name != nil {
|
||||||
|
@ -689,7 +688,7 @@ func (s *OrganizationService) CreateOrganization(ctx context.Context, o *influxd
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "OrganizationService.CreateOrganization")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if o.Name != "" {
|
if o.Name != "" {
|
||||||
|
@ -740,7 +739,7 @@ func (s *OrganizationService) CreateOrganization(ctx context.Context, o *influxd
|
||||||
|
|
||||||
// UpdateOrganization updates the organization over HTTP.
|
// UpdateOrganization updates the organization over HTTP.
|
||||||
func (s *OrganizationService) UpdateOrganization(ctx context.Context, id influxdb.ID, upd influxdb.OrganizationUpdate) (*influxdb.Organization, error) {
|
func (s *OrganizationService) UpdateOrganization(ctx context.Context, id influxdb.ID, upd influxdb.OrganizationUpdate) (*influxdb.Organization, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "OrganizationService.UpdateOrganization")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
span.LogKV("org-id", id)
|
span.LogKV("org-id", id)
|
||||||
|
@ -787,7 +786,7 @@ func (s *OrganizationService) UpdateOrganization(ctx context.Context, id influxd
|
||||||
|
|
||||||
// DeleteOrganization removes organization id over HTTP.
|
// DeleteOrganization removes organization id over HTTP.
|
||||||
func (s *OrganizationService) DeleteOrganization(ctx context.Context, id influxdb.ID) error {
|
func (s *OrganizationService) DeleteOrganization(ctx context.Context, id influxdb.ID) error {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "OrganizationService.DeleteOrganization")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(s.Addr, organizationIDPath(id))
|
u, err := newURL(s.Addr, organizationIDPath(id))
|
||||||
|
|
|
@ -14,7 +14,6 @@ import (
|
||||||
"github.com/influxdata/influxdb/kit/tracing"
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/query"
|
"github.com/influxdata/influxdb/query"
|
||||||
"github.com/julienschmidt/httprouter"
|
"github.com/julienschmidt/httprouter"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -142,7 +141,7 @@ func (s *ProxyQueryService) Ping(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "ProxyQueryService.Query")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
u, err := newURL(s.Addr, proxyQueryPath)
|
u, err := newURL(s.Addr, proxyQueryPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -18,7 +18,6 @@ import (
|
||||||
"github.com/influxdata/flux/iocounter"
|
"github.com/influxdata/flux/iocounter"
|
||||||
"github.com/influxdata/flux/parser"
|
"github.com/influxdata/flux/parser"
|
||||||
"github.com/julienschmidt/httprouter"
|
"github.com/julienschmidt/httprouter"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
@ -330,7 +329,7 @@ type FluxService struct {
|
||||||
// Query runs a flux query against a influx server and sends the results to the io.Writer.
|
// Query runs a flux query against a influx server and sends the results to the io.Writer.
|
||||||
// Will use the token from the context over the token within the service struct.
|
// Will use the token from the context over the token within the service struct.
|
||||||
func (s *FluxService) Query(ctx context.Context, w io.Writer, r *query.ProxyRequest) (flux.Statistics, error) {
|
func (s *FluxService) Query(ctx context.Context, w io.Writer, r *query.ProxyRequest) (flux.Statistics, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "FluxService.Query")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
u, err := newURL(s.Addr, fluxPath)
|
u, err := newURL(s.Addr, fluxPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -386,7 +385,7 @@ type FluxQueryService struct {
|
||||||
|
|
||||||
// Query runs a flux query against a influx server and decodes the result
|
// Query runs a flux query against a influx server and decodes the result
|
||||||
func (s *FluxQueryService) Query(ctx context.Context, r *query.Request) (flux.ResultIterator, error) {
|
func (s *FluxQueryService) Query(ctx context.Context, r *query.Request) (flux.ResultIterator, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "FluxQueryService.Query")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(s.Addr, fluxPath)
|
u, err := newURL(s.Addr, fluxPath)
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/flux"
|
"github.com/influxdata/flux"
|
||||||
"github.com/influxdata/flux/lang"
|
"github.com/influxdata/flux/lang"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
"github.com/influxdata/influxdb/kit/tracing"
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
|
@ -37,7 +36,7 @@ func (s *SourceProxyQueryService) Query(ctx context.Context, w io.Writer, req *q
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceProxyQueryService) queryFlux(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
func (s *SourceProxyQueryService) queryFlux(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "SourceProxyQueryService.queryFlux")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
u, err := newURL(s.Addr, "/api/v2/query")
|
u, err := newURL(s.Addr, "/api/v2/query")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -75,7 +74,7 @@ func (s *SourceProxyQueryService) queryFlux(ctx context.Context, w io.Writer, re
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceProxyQueryService) queryInfluxQL(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
func (s *SourceProxyQueryService) queryInfluxQL(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "SourceProxyQueryService.queryInfluxQL")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
compiler, ok := req.Request.Compiler.(*influxql.Compiler)
|
compiler, ok := req.Request.Compiler.(*influxql.Compiler)
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,6 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/flux"
|
"github.com/influxdata/flux"
|
||||||
"github.com/julienschmidt/httprouter"
|
"github.com/julienschmidt/httprouter"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
|
@ -1366,7 +1365,7 @@ type TaskService struct {
|
||||||
|
|
||||||
// FindTaskByID returns a single task
|
// FindTaskByID returns a single task
|
||||||
func (t TaskService) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
func (t TaskService) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.FindTaskByID")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(t.Addr, taskIDPath(id))
|
u, err := newURL(t.Addr, taskIDPath(id))
|
||||||
|
@ -1409,7 +1408,7 @@ func (t TaskService) FindTaskByID(ctx context.Context, id platform.ID) (*platfor
|
||||||
// FindTasks returns a list of tasks that match a filter (limit 100) and the total count
|
// FindTasks returns a list of tasks that match a filter (limit 100) and the total count
|
||||||
// of matching tasks.
|
// of matching tasks.
|
||||||
func (t TaskService) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]*platform.Task, int, error) {
|
func (t TaskService) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]*platform.Task, int, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.FindTasks")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(t.Addr, tasksPath)
|
u, err := newURL(t.Addr, tasksPath)
|
||||||
|
@ -1468,7 +1467,7 @@ func (t TaskService) FindTasks(ctx context.Context, filter platform.TaskFilter)
|
||||||
|
|
||||||
// CreateTask creates a new task.
|
// CreateTask creates a new task.
|
||||||
func (t TaskService) CreateTask(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) {
|
func (t TaskService) CreateTask(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.CreateTask")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(t.Addr, tasksPath)
|
u, err := newURL(t.Addr, tasksPath)
|
||||||
|
@ -1511,7 +1510,7 @@ func (t TaskService) CreateTask(ctx context.Context, tc platform.TaskCreate) (*p
|
||||||
|
|
||||||
// UpdateTask updates a single task with changeset.
|
// UpdateTask updates a single task with changeset.
|
||||||
func (t TaskService) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
|
func (t TaskService) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.UpdateTask")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(t.Addr, taskIDPath(id))
|
u, err := newURL(t.Addr, taskIDPath(id))
|
||||||
|
@ -1555,7 +1554,7 @@ func (t TaskService) UpdateTask(ctx context.Context, id platform.ID, upd platfor
|
||||||
|
|
||||||
// DeleteTask removes a task by ID and purges all associated data and scheduled runs.
|
// DeleteTask removes a task by ID and purges all associated data and scheduled runs.
|
||||||
func (t TaskService) DeleteTask(ctx context.Context, id platform.ID) error {
|
func (t TaskService) DeleteTask(ctx context.Context, id platform.ID) error {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.DeleteTask")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(t.Addr, taskIDPath(id))
|
u, err := newURL(t.Addr, taskIDPath(id))
|
||||||
|
@ -1585,7 +1584,7 @@ func (t TaskService) DeleteTask(ctx context.Context, id platform.ID) error {
|
||||||
|
|
||||||
// FindLogs returns logs for a run.
|
// FindLogs returns logs for a run.
|
||||||
func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.FindLogs")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if !filter.Task.Valid() {
|
if !filter.Task.Valid() {
|
||||||
|
@ -1633,7 +1632,7 @@ func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([
|
||||||
|
|
||||||
// FindRuns returns a list of runs that match a filter and the total count of returned runs.
|
// FindRuns returns a list of runs that match a filter and the total count of returned runs.
|
||||||
func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.FindRuns")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if !filter.Task.Valid() {
|
if !filter.Task.Valid() {
|
||||||
|
@ -1689,7 +1688,7 @@ func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([
|
||||||
|
|
||||||
// FindRunByID returns a single run of a specific task.
|
// FindRunByID returns a single run of a specific task.
|
||||||
func (t TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) {
|
func (t TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.FindRunByID")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(t.Addr, taskIDRunIDPath(taskID, runID))
|
u, err := newURL(t.Addr, taskIDRunIDPath(taskID, runID))
|
||||||
|
@ -1732,7 +1731,7 @@ func (t TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID)
|
||||||
|
|
||||||
// RetryRun creates and returns a new run (which is a retry of another run).
|
// RetryRun creates and returns a new run (which is a retry of another run).
|
||||||
func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) {
|
func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.RetryRun")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
p := path.Join(taskIDRunIDPath(taskID, runID), "retry")
|
p := path.Join(taskIDRunIDPath(taskID, runID), "retry")
|
||||||
|
@ -1780,7 +1779,7 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID) (*
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t TaskService) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) {
|
func (t TaskService) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.ForceRun")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(t.Addr, taskIDRunsPath(taskID))
|
u, err := newURL(t.Addr, taskIDRunsPath(taskID))
|
||||||
|
@ -1832,7 +1831,7 @@ func cancelPath(taskID, runID platform.ID) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t TaskService) CancelRun(ctx context.Context, taskID, runID platform.ID) error {
|
func (t TaskService) CancelRun(ctx context.Context, taskID, runID platform.ID) error {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.CancelRun")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
u, err := newURL(t.Addr, cancelPath(taskID, runID))
|
u, err := newURL(t.Addr, cancelPath(taskID, runID))
|
||||||
|
|
|
@ -9,13 +9,12 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/kit/tracing"
|
|
||||||
|
|
||||||
"github.com/julienschmidt/httprouter"
|
"github.com/julienschmidt/httprouter"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
pcontext "github.com/influxdata/influxdb/context"
|
pcontext "github.com/influxdata/influxdb/context"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/models"
|
"github.com/influxdata/influxdb/models"
|
||||||
"github.com/influxdata/influxdb/storage"
|
"github.com/influxdata/influxdb/storage"
|
||||||
"github.com/influxdata/influxdb/tsdb"
|
"github.com/influxdata/influxdb/tsdb"
|
||||||
|
|
|
@ -60,9 +60,11 @@ func ExtractFromHTTPRequest(req *http.Request, handlerName string) (opentracing.
|
||||||
// If this performance penalty is too much, try these, which are also demonstrated in benchmark tests:
|
// If this performance penalty is too much, try these, which are also demonstrated in benchmark tests:
|
||||||
// // Create a root span
|
// // Create a root span
|
||||||
// span := opentracing.StartSpan("operation name")
|
// span := opentracing.StartSpan("operation name")
|
||||||
|
// ctx := opentracing.ContextWithSpan(context.Background(), span)
|
||||||
//
|
//
|
||||||
// // Create a child span
|
// // Create a child span
|
||||||
// span := opentracing.StartSpan("operation name", opentracing.ChildOf(sc))
|
// span := opentracing.StartSpan("operation name", opentracing.ChildOf(sc))
|
||||||
|
// ctx := opentracing.ContextWithSpan(context.Background(), span)
|
||||||
//
|
//
|
||||||
// // Sugar to create a child span
|
// // Sugar to create a child span
|
||||||
// span, ctx := opentracing.StartSpanFromContext(ctx, "operation name")
|
// span, ctx := opentracing.StartSpanFromContext(ctx, "operation name")
|
||||||
|
|
|
@ -3,12 +3,12 @@ package tracing
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/uber/jaeger-client-go"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"runtime"
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/opentracing/opentracing-go"
|
"github.com/opentracing/opentracing-go"
|
||||||
|
"github.com/uber/jaeger-client-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestInjectAndExtractHTTPRequest(t *testing.T) {
|
func TestInjectAndExtractHTTPRequest(t *testing.T) {
|
||||||
|
@ -162,5 +162,3 @@ func BenchmarkOpentracing_StartSpan_child(b *testing.B) {
|
||||||
_ = opentracing.StartSpan("operation name", opentracing.ChildOf(parentSpan.Context()))
|
_ = opentracing.StartSpan("operation name", opentracing.ChildOf(parentSpan.Context()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO test nil and other context problems with StartSpanFromContext.
|
|
35
kv/bucket.go
35
kv/bucket.go
|
@ -6,10 +6,9 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
"github.com/influxdata/influxdb"
|
"github.com/influxdata/influxdb"
|
||||||
icontext "github.com/influxdata/influxdb/context"
|
icontext "github.com/influxdata/influxdb/context"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -49,7 +48,7 @@ func (s *Service) bucketsIndexBucket(tx Tx) (Bucket, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) setOrganizationOnBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) error {
|
func (s *Service) setOrganizationOnBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.setOrganizationOnBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
o, err := s.findOrganizationByID(ctx, tx, b.OrganizationID)
|
o, err := s.findOrganizationByID(ctx, tx, b.OrganizationID)
|
||||||
|
@ -64,7 +63,7 @@ func (s *Service) setOrganizationOnBucket(ctx context.Context, tx Tx, b *influxd
|
||||||
|
|
||||||
// FindBucketByID retrieves a bucket by id.
|
// FindBucketByID retrieves a bucket by id.
|
||||||
func (s *Service) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
|
func (s *Service) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.FindBucketByID")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var b *influxdb.Bucket
|
var b *influxdb.Bucket
|
||||||
|
@ -88,7 +87,7 @@ func (s *Service) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) findBucketByID(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Bucket, error) {
|
func (s *Service) findBucketByID(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.findBucketByID")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var b influxdb.Bucket
|
var b influxdb.Bucket
|
||||||
|
@ -136,7 +135,7 @@ func (s *Service) findBucketByID(ctx context.Context, tx Tx, id influxdb.ID) (*i
|
||||||
// FindBucketByName returns a bucket by name for a particular organization.
|
// FindBucketByName returns a bucket by name for a particular organization.
|
||||||
// TODO: have method for finding bucket using organization name and bucket name.
|
// TODO: have method for finding bucket using organization name and bucket name.
|
||||||
func (s *Service) FindBucketByName(ctx context.Context, orgID influxdb.ID, n string) (*influxdb.Bucket, error) {
|
func (s *Service) FindBucketByName(ctx context.Context, orgID influxdb.ID, n string) (*influxdb.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.FindBucketByName")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var b *influxdb.Bucket
|
var b *influxdb.Bucket
|
||||||
|
@ -156,7 +155,7 @@ func (s *Service) FindBucketByName(ctx context.Context, orgID influxdb.ID, n str
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) findBucketByName(ctx context.Context, tx Tx, orgID influxdb.ID, n string) (*influxdb.Bucket, error) {
|
func (s *Service) findBucketByName(ctx context.Context, tx Tx, orgID influxdb.ID, n string) (*influxdb.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.findBucketByName")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
b := &influxdb.Bucket{
|
b := &influxdb.Bucket{
|
||||||
|
@ -201,7 +200,7 @@ func (s *Service) findBucketByName(ctx context.Context, tx Tx, orgID influxdb.ID
|
||||||
// Filters using ID, or OrganizationID and bucket Name should be efficient.
|
// Filters using ID, or OrganizationID and bucket Name should be efficient.
|
||||||
// Other filters will do a linear scan across buckets until it finds a match.
|
// Other filters will do a linear scan across buckets until it finds a match.
|
||||||
func (s *Service) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
|
func (s *Service) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.FindBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var b *influxdb.Bucket
|
var b *influxdb.Bucket
|
||||||
|
@ -288,7 +287,7 @@ func filterBucketsFn(filter influxdb.BucketFilter) func(b *influxdb.Bucket) bool
|
||||||
// Filters using ID, or OrganizationID and bucket Name should be efficient.
|
// Filters using ID, or OrganizationID and bucket Name should be efficient.
|
||||||
// Other filters will do a linear scan across all buckets searching for a match.
|
// Other filters will do a linear scan across all buckets searching for a match.
|
||||||
func (s *Service) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opts ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
|
func (s *Service) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opts ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.FindBuckets")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if filter.ID != nil {
|
if filter.ID != nil {
|
||||||
|
@ -327,7 +326,7 @@ func (s *Service) FindBuckets(ctx context.Context, filter influxdb.BucketFilter,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) findBuckets(ctx context.Context, tx Tx, filter influxdb.BucketFilter, opts ...influxdb.FindOptions) ([]*influxdb.Bucket, error) {
|
func (s *Service) findBuckets(ctx context.Context, tx Tx, filter influxdb.BucketFilter, opts ...influxdb.FindOptions) ([]*influxdb.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.findBuckets")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
bs := []*influxdb.Bucket{}
|
bs := []*influxdb.Bucket{}
|
||||||
|
@ -376,7 +375,7 @@ func (s *Service) findBuckets(ctx context.Context, tx Tx, filter influxdb.Bucket
|
||||||
|
|
||||||
// CreateBucket creates a influxdb bucket and sets b.ID.
|
// CreateBucket creates a influxdb bucket and sets b.ID.
|
||||||
func (s *Service) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
|
func (s *Service) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.CreateBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
return s.kv.Update(ctx, func(tx Tx) error {
|
return s.kv.Update(ctx, func(tx Tx) error {
|
||||||
|
@ -386,7 +385,7 @@ func (s *Service) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
|
||||||
|
|
||||||
func (s *Service) createBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) error {
|
func (s *Service) createBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) error {
|
||||||
if b.OrganizationID.Valid() {
|
if b.OrganizationID.Valid() {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.createBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
_, pe := s.findOrganizationByID(ctx, tx, b.OrganizationID)
|
_, pe := s.findOrganizationByID(ctx, tx, b.OrganizationID)
|
||||||
|
@ -442,7 +441,7 @@ func (s *Service) PutBucket(ctx context.Context, b *influxdb.Bucket) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) createBucketUserResourceMappings(ctx context.Context, tx Tx, b *influxdb.Bucket) error {
|
func (s *Service) createBucketUserResourceMappings(ctx context.Context, tx Tx, b *influxdb.Bucket) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.createBucketUserResourceMappings")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
ms, err := s.findUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{
|
ms, err := s.findUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{
|
||||||
|
@ -472,7 +471,7 @@ func (s *Service) createBucketUserResourceMappings(ctx context.Context, tx Tx, b
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) putBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) error {
|
func (s *Service) putBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.putBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
b.Organization = ""
|
b.Organization = ""
|
||||||
|
@ -531,7 +530,7 @@ func bucketIndexKey(b *influxdb.Bucket) ([]byte, error) {
|
||||||
|
|
||||||
// forEachBucket will iterate through all buckets while fn returns true.
|
// forEachBucket will iterate through all buckets while fn returns true.
|
||||||
func (s *Service) forEachBucket(ctx context.Context, tx Tx, descending bool, fn func(*influxdb.Bucket) bool) error {
|
func (s *Service) forEachBucket(ctx context.Context, tx Tx, descending bool, fn func(*influxdb.Bucket) bool) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.forEachBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
bkt, err := s.bucketsBucket(tx)
|
bkt, err := s.bucketsBucket(tx)
|
||||||
|
@ -574,7 +573,7 @@ func (s *Service) forEachBucket(ctx context.Context, tx Tx, descending bool, fn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) uniqueBucketName(ctx context.Context, tx Tx, b *influxdb.Bucket) error {
|
func (s *Service) uniqueBucketName(ctx context.Context, tx Tx, b *influxdb.Bucket) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.uniqueBucketName")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
key, err := bucketIndexKey(b)
|
key, err := bucketIndexKey(b)
|
||||||
|
@ -593,7 +592,7 @@ func (s *Service) uniqueBucketName(ctx context.Context, tx Tx, b *influxdb.Bucke
|
||||||
|
|
||||||
// UpdateBucket updates a bucket according the parameters set on upd.
|
// UpdateBucket updates a bucket according the parameters set on upd.
|
||||||
func (s *Service) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
|
func (s *Service) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.UpdateBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var b *influxdb.Bucket
|
var b *influxdb.Bucket
|
||||||
|
@ -610,7 +609,7 @@ func (s *Service) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) updateBucket(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
|
func (s *Service) updateBucket(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.updateBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
b, err := s.findBucketByID(ctx, tx, id)
|
b, err := s.findBucketByID(ctx, tx, id)
|
||||||
|
|
|
@ -6,11 +6,11 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb"
|
"github.com/influxdata/influxdb"
|
||||||
icontext "github.com/influxdata/influxdb/context"
|
icontext "github.com/influxdata/influxdb/context"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -53,7 +53,7 @@ func (s *Service) FindOrganizationByID(ctx context.Context, id influxdb.ID) (*in
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) findOrganizationByID(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Organization, error) {
|
func (s *Service) findOrganizationByID(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Organization, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "Service.findOrganizationByID")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
encodedID, err := id.Encode()
|
encodedID, err := id.Encode()
|
||||||
|
@ -93,7 +93,7 @@ func (s *Service) findOrganizationByID(ctx context.Context, tx Tx, id influxdb.I
|
||||||
|
|
||||||
// FindOrganizationByName returns a organization by name for a particular organization.
|
// FindOrganizationByName returns a organization by name for a particular organization.
|
||||||
func (s *Service) FindOrganizationByName(ctx context.Context, n string) (*influxdb.Organization, error) {
|
func (s *Service) FindOrganizationByName(ctx context.Context, n string) (*influxdb.Organization, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.FindOrganizationByName")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var o *influxdb.Organization
|
var o *influxdb.Organization
|
||||||
|
@ -111,7 +111,7 @@ func (s *Service) FindOrganizationByName(ctx context.Context, n string) (*influx
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) findOrganizationByName(ctx context.Context, tx Tx, n string) (*influxdb.Organization, error) {
|
func (s *Service) findOrganizationByName(ctx context.Context, tx Tx, n string) (*influxdb.Organization, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.findOrganizationByName")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
b, err := tx.Bucket(organizationIndex)
|
b, err := tx.Bucket(organizationIndex)
|
||||||
|
|
|
@ -5,10 +5,9 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
"github.com/influxdata/influxdb"
|
"github.com/influxdata/influxdb"
|
||||||
icontext "github.com/influxdata/influxdb/context"
|
icontext "github.com/influxdata/influxdb/context"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -132,7 +131,7 @@ func (s *Service) CreateUserResourceMapping(ctx context.Context, m *influxdb.Use
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) createUserResourceMapping(ctx context.Context, tx Tx, m *influxdb.UserResourceMapping) error {
|
func (s *Service) createUserResourceMapping(ctx context.Context, tx Tx, m *influxdb.UserResourceMapping) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.createUserResourceMapping")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if err := s.uniqueUserResourceMapping(ctx, tx, m); err != nil {
|
if err := s.uniqueUserResourceMapping(ctx, tx, m); err != nil {
|
||||||
|
@ -167,7 +166,7 @@ func (s *Service) createUserResourceMapping(ctx context.Context, tx Tx, m *influ
|
||||||
|
|
||||||
// This method creates the user/resource mappings for resources that belong to an organization.
|
// This method creates the user/resource mappings for resources that belong to an organization.
|
||||||
func (s *Service) createOrgDependentMappings(ctx context.Context, tx Tx, m *influxdb.UserResourceMapping) error {
|
func (s *Service) createOrgDependentMappings(ctx context.Context, tx Tx, m *influxdb.UserResourceMapping) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.createOrgDependentMappings")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
bf := influxdb.BucketFilter{OrganizationID: &m.ResourceID}
|
bf := influxdb.BucketFilter{OrganizationID: &m.ResourceID}
|
||||||
|
|
|
@ -8,7 +8,6 @@ import (
|
||||||
"github.com/influxdata/flux/csv"
|
"github.com/influxdata/flux/csv"
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
"github.com/influxdata/influxdb/kit/tracing"
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// QueryServiceBridge implements the QueryService interface while consuming the AsyncQueryService interface.
|
// QueryServiceBridge implements the QueryService interface while consuming the AsyncQueryService interface.
|
||||||
|
@ -74,7 +73,7 @@ type ProxyQueryServiceAsyncBridge struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b ProxyQueryServiceAsyncBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (flux.Statistics, error) {
|
func (b ProxyQueryServiceAsyncBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (flux.Statistics, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "ProxyQueryServiceBridge.Query")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
q, err := b.AsyncQueryService.Query(ctx, &req.Request)
|
q, err := b.AsyncQueryService.Query(ctx, &req.Request)
|
||||||
|
|
|
@ -6,10 +6,10 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/flux"
|
"github.com/influxdata/flux"
|
||||||
"github.com/influxdata/flux/control"
|
"github.com/influxdata/flux/control"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/query"
|
"github.com/influxdata/influxdb/query"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -30,7 +30,7 @@ func New(config control.Config) *Controller {
|
||||||
|
|
||||||
// Query satisfies the AsyncQueryService while ensuring the request is propagated on the context.
|
// Query satisfies the AsyncQueryService while ensuring the request is propagated on the context.
|
||||||
func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query, error) {
|
func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Controller.Query")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Set the request on the context so platform specific Flux operations can retrieve it later.
|
// Set the request on the context so platform specific Flux operations can retrieve it later.
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
||||||
"github.com/influxdata/flux"
|
"github.com/influxdata/flux"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/kit/tracing"
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/query"
|
"github.com/influxdata/influxdb/query"
|
||||||
|
@ -32,7 +31,7 @@ type Service struct {
|
||||||
// Query will execute a query for the influxql.Compiler type against an influxdb 1.x endpoint,
|
// Query will execute a query for the influxql.Compiler type against an influxdb 1.x endpoint,
|
||||||
// and return results using the default decoder.
|
// and return results using the default decoder.
|
||||||
func (s *Service) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) {
|
func (s *Service) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.Query")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
resp, err := s.query(ctx, req)
|
resp, err := s.query(ctx, req)
|
||||||
|
@ -53,7 +52,7 @@ func (s *Service) Query(ctx context.Context, req *query.Request) (flux.ResultIte
|
||||||
// QueryRawJSON will execute a query for the influxql.Compiler type against an influxdb 1.x endpoint,
|
// QueryRawJSON will execute a query for the influxql.Compiler type against an influxdb 1.x endpoint,
|
||||||
// and return the body of the response as a byte array.
|
// and return the body of the response as a byte array.
|
||||||
func (s *Service) QueryRawJSON(ctx context.Context, req *query.Request) ([]byte, error) {
|
func (s *Service) QueryRawJSON(ctx context.Context, req *query.Request) ([]byte, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.Query")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
resp, err := s.query(ctx, req)
|
resp, err := s.query(ctx, req)
|
||||||
|
@ -70,7 +69,7 @@ func (s *Service) QueryRawJSON(ctx context.Context, req *query.Request) ([]byte,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) query(ctx context.Context, req *query.Request) (*http.Response, error) {
|
func (s *Service) query(ctx context.Context, req *query.Request) (*http.Response, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Service.query")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Verify that this is an influxql query in the compiler.
|
// Verify that this is an influxql query in the compiler.
|
||||||
|
|
|
@ -8,7 +8,6 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/flux"
|
"github.com/influxdata/flux"
|
||||||
"github.com/influxdata/influxdb/kit/tracing"
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// LoggingServiceBridge implements ProxyQueryService and logs the queries while consuming a QueryService interface.
|
// LoggingServiceBridge implements ProxyQueryService and logs the queries while consuming a QueryService interface.
|
||||||
|
@ -19,7 +18,7 @@ type LoggingServiceBridge struct {
|
||||||
|
|
||||||
// Query executes and logs the query.
|
// Query executes and logs the query.
|
||||||
func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (stats flux.Statistics, err error) {
|
func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (stats flux.Statistics, err error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "LoggingServiceBridge.Query")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var n int64
|
var n int64
|
||||||
|
@ -59,7 +58,7 @@ func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *Prox
|
||||||
}
|
}
|
||||||
// The results iterator may have had an error independent of encoding errors.
|
// The results iterator may have had an error independent of encoding errors.
|
||||||
if err = results.Err(); err != nil {
|
if err = results.Err(); err != nil {
|
||||||
return n, tracing.LogError(span, err)
|
return stats, tracing.LogError(span, err)
|
||||||
}
|
}
|
||||||
return stats, nil
|
return stats, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,9 +4,8 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
// BucketDeleter defines the behaviour of deleting a bucket.
|
// BucketDeleter defines the behaviour of deleting a bucket.
|
||||||
|
@ -35,7 +34,7 @@ func NewBucketService(s platform.BucketService, engine BucketDeleter) *BucketSer
|
||||||
|
|
||||||
// FindBucketByID returns a single bucket by ID.
|
// FindBucketByID returns a single bucket by ID.
|
||||||
func (s *BucketService) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) {
|
func (s *BucketService) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBucketByID")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if s.inner == nil || s.engine == nil {
|
if s.inner == nil || s.engine == nil {
|
||||||
|
@ -46,7 +45,7 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id platform.ID) (*pl
|
||||||
|
|
||||||
// FindBucket returns the first bucket that matches filter.
|
// FindBucket returns the first bucket that matches filter.
|
||||||
func (s *BucketService) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) {
|
func (s *BucketService) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if s.inner == nil || s.engine == nil {
|
if s.inner == nil || s.engine == nil {
|
||||||
|
@ -58,7 +57,7 @@ func (s *BucketService) FindBucket(ctx context.Context, filter platform.BucketFi
|
||||||
// FindBuckets returns a list of buckets that match filter and the total count of matching buckets.
|
// FindBuckets returns a list of buckets that match filter and the total count of matching buckets.
|
||||||
// Additional options provide pagination & sorting.
|
// Additional options provide pagination & sorting.
|
||||||
func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) {
|
func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBuckets")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if s.inner == nil || s.engine == nil {
|
if s.inner == nil || s.engine == nil {
|
||||||
|
@ -69,7 +68,7 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketF
|
||||||
|
|
||||||
// CreateBucket creates a new bucket and sets b.ID with the new identifier.
|
// CreateBucket creates a new bucket and sets b.ID with the new identifier.
|
||||||
func (s *BucketService) CreateBucket(ctx context.Context, b *platform.Bucket) error {
|
func (s *BucketService) CreateBucket(ctx context.Context, b *platform.Bucket) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.CreateBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if s.inner == nil || s.engine == nil {
|
if s.inner == nil || s.engine == nil {
|
||||||
|
@ -81,7 +80,7 @@ func (s *BucketService) CreateBucket(ctx context.Context, b *platform.Bucket) er
|
||||||
// UpdateBucket updates a single bucket with changeset.
|
// UpdateBucket updates a single bucket with changeset.
|
||||||
// Returns the new bucket state after update.
|
// Returns the new bucket state after update.
|
||||||
func (s *BucketService) UpdateBucket(ctx context.Context, id platform.ID, upd platform.BucketUpdate) (*platform.Bucket, error) {
|
func (s *BucketService) UpdateBucket(ctx context.Context, id platform.ID, upd platform.BucketUpdate) (*platform.Bucket, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.UpdateBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if s.inner == nil || s.engine == nil {
|
if s.inner == nil || s.engine == nil {
|
||||||
|
@ -92,7 +91,7 @@ func (s *BucketService) UpdateBucket(ctx context.Context, id platform.ID, upd pl
|
||||||
|
|
||||||
// DeleteBucket removes a bucket by ID.
|
// DeleteBucket removes a bucket by ID.
|
||||||
func (s *BucketService) DeleteBucket(ctx context.Context, bucketID platform.ID) error {
|
func (s *BucketService) DeleteBucket(ctx context.Context, bucketID platform.ID) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.DeleteBucket")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
bucket, err := s.FindBucketByID(ctx, bucketID)
|
bucket, err := s.FindBucketByID(ctx, bucketID)
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/logger"
|
"github.com/influxdata/influxdb/logger"
|
||||||
"github.com/influxdata/influxdb/models"
|
"github.com/influxdata/influxdb/models"
|
||||||
"github.com/influxdata/influxdb/storage/wal"
|
"github.com/influxdata/influxdb/storage/wal"
|
||||||
|
@ -18,7 +19,6 @@ import (
|
||||||
"github.com/influxdata/influxdb/tsdb/tsm1"
|
"github.com/influxdata/influxdb/tsdb/tsm1"
|
||||||
"github.com/influxdata/influxdb/tsdb/value"
|
"github.com/influxdata/influxdb/tsdb/value"
|
||||||
"github.com/influxdata/influxql"
|
"github.com/influxdata/influxql"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -188,7 +188,7 @@ func (e *Engine) Open(ctx context.Context) (err error) {
|
||||||
return nil // Already open
|
return nil // Already open
|
||||||
}
|
}
|
||||||
|
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Engine.Open")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Open the services in order and clean up if any fail.
|
// Open the services in order and clean up if any fail.
|
||||||
|
@ -360,7 +360,7 @@ func (e *Engine) CreateCursorIterator(ctx context.Context) (tsdb.CursorIterator,
|
||||||
//
|
//
|
||||||
// Appropriate errors are returned in those cases.
|
// Appropriate errors are returned in those cases.
|
||||||
func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error {
|
func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Engine.WritePoints")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
collection, j := tsdb.NewSeriesCollection(points), 0
|
collection, j := tsdb.NewSeriesCollection(points), 0
|
||||||
|
@ -445,7 +445,7 @@ func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error {
|
||||||
|
|
||||||
// writePointsLocked does the work of writing points and must be called under some sort of lock.
|
// writePointsLocked does the work of writing points and must be called under some sort of lock.
|
||||||
func (e *Engine) writePointsLocked(ctx context.Context, collection *tsdb.SeriesCollection, values map[string][]value.Value) error {
|
func (e *Engine) writePointsLocked(ctx context.Context, collection *tsdb.SeriesCollection, values map[string][]value.Value) error {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "Engine.writePointsLocked")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// TODO(jeff): keep track of the values in the collection so that partial write
|
// TODO(jeff): keep track of the values in the collection so that partial write
|
||||||
|
@ -478,7 +478,7 @@ func (e *Engine) writePointsLocked(ctx context.Context, collection *tsdb.SeriesC
|
||||||
// AcquireSegments closes the current WAL segment, gets the set of all the currently closed
|
// AcquireSegments closes the current WAL segment, gets the set of all the currently closed
|
||||||
// segments, and calls the callback. It does all of this under the lock on the engine.
|
// segments, and calls the callback. It does all of this under the lock on the engine.
|
||||||
func (e *Engine) AcquireSegments(ctx context.Context, fn func(segs []string) error) error {
|
func (e *Engine) AcquireSegments(ctx context.Context, fn func(segs []string) error) error {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "Engine.AcquireSegments")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb"
|
"github.com/influxdata/influxdb"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/pkg/limiter"
|
"github.com/influxdata/influxdb/pkg/limiter"
|
||||||
"github.com/influxdata/influxdb/pkg/pool"
|
"github.com/influxdata/influxdb/pkg/pool"
|
||||||
"github.com/influxdata/influxdb/tsdb/value"
|
"github.com/influxdata/influxdb/tsdb/value"
|
||||||
|
@ -171,7 +172,7 @@ func (l *WAL) Open(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "WAL.Open")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
span.LogKV("segment_size", l.SegmentSize,
|
span.LogKV("segment_size", l.SegmentSize,
|
||||||
|
@ -310,7 +311,7 @@ func (l *WAL) sync() {
|
||||||
// which the points were written. If an error is returned the segment ID should
|
// which the points were written. If an error is returned the segment ID should
|
||||||
// be ignored. If the WAL is disabled, -1 and nil is returned.
|
// be ignored. If the WAL is disabled, -1 and nil is returned.
|
||||||
func (l *WAL) WriteMulti(ctx context.Context, values map[string][]value.Value) (int, error) {
|
func (l *WAL) WriteMulti(ctx context.Context, values map[string][]value.Value) (int, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "WAL.WriteMulti")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
if !l.enabled {
|
if !l.enabled {
|
||||||
|
@ -374,7 +375,7 @@ func (l *WAL) Remove(ctx context.Context, files []string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "WAL.Remove")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
l.mu.Lock()
|
l.mu.Lock()
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -639,7 +640,7 @@ func (r *runner) clearRunning(id platform.ID) {
|
||||||
func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *zap.Logger) {
|
func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *zap.Logger) {
|
||||||
defer r.wg.Done()
|
defer r.wg.Done()
|
||||||
|
|
||||||
sp, spCtx := opentracing.StartSpanFromContext(ctx, "task.run.execution")
|
sp, spCtx := tracing.StartSpanFromContext(ctx)
|
||||||
defer sp.Finish()
|
defer sp.Finish()
|
||||||
|
|
||||||
rp, err := r.executor.Execute(spCtx, qr)
|
rp, err := r.executor.Execute(spCtx, qr)
|
||||||
|
|
|
@ -6,10 +6,9 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
icontext "github.com/influxdata/influxdb/context"
|
icontext "github.com/influxdata/influxdb/context"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/task/backend"
|
"github.com/influxdata/influxdb/task/backend"
|
||||||
"github.com/influxdata/influxdb/task/options"
|
"github.com/influxdata/influxdb/task/options"
|
||||||
)
|
)
|
||||||
|
@ -38,7 +37,7 @@ type pAdapter struct {
|
||||||
var _ platform.TaskService = pAdapter{}
|
var _ platform.TaskService = pAdapter{}
|
||||||
|
|
||||||
func (p pAdapter) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
func (p pAdapter) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.FindTaskByID")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
t, m, err := p.s.FindTaskByIDWithMeta(ctx, id)
|
t, m, err := p.s.FindTaskByIDWithMeta(ctx, id)
|
||||||
|
@ -54,7 +53,7 @@ func (p pAdapter) FindTaskByID(ctx context.Context, id platform.ID) (*platform.T
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pAdapter) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]*platform.Task, int, error) {
|
func (p pAdapter) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]*platform.Task, int, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.FindTasks")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
params := backend.TaskSearchParams{PageSize: filter.Limit}
|
params := backend.TaskSearchParams{PageSize: filter.Limit}
|
||||||
|
@ -126,7 +125,7 @@ func (p pAdapter) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pAdapter) CreateTask(ctx context.Context, t platform.TaskCreate) (*platform.Task, error) {
|
func (p pAdapter) CreateTask(ctx context.Context, t platform.TaskCreate) (*platform.Task, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.CreateTask")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
auth, err := icontext.GetAuthorizer(ctx)
|
auth, err := icontext.GetAuthorizer(ctx)
|
||||||
|
@ -205,7 +204,7 @@ func (p pAdapter) CreateTask(ctx context.Context, t platform.TaskCreate) (*platf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
|
func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.CreateTask")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
err := upd.Validate()
|
err := upd.Validate()
|
||||||
|
@ -239,7 +238,7 @@ func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.T
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pAdapter) DeleteTask(ctx context.Context, id platform.ID) error {
|
func (p pAdapter) DeleteTask(ctx context.Context, id platform.ID) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.DeleteTask")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
_, err := p.s.DeleteTask(ctx, id)
|
_, err := p.s.DeleteTask(ctx, id)
|
||||||
|
@ -267,7 +266,7 @@ func (p pAdapter) DeleteTask(ctx context.Context, id platform.ID) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pAdapter) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
func (p pAdapter) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.FindLogs")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
task, err := p.s.FindTaskByID(ctx, filter.Task)
|
task, err := p.s.FindTaskByID(ctx, filter.Task)
|
||||||
|
@ -284,7 +283,7 @@ func (p pAdapter) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pAdapter) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
func (p pAdapter) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.FindRuns")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
task, err := p.s.FindTaskByID(ctx, filter.Task)
|
task, err := p.s.FindTaskByID(ctx, filter.Task)
|
||||||
|
@ -297,7 +296,7 @@ func (p pAdapter) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pAdapter) FindRunByID(ctx context.Context, taskID, id platform.ID) (*platform.Run, error) {
|
func (p pAdapter) FindRunByID(ctx context.Context, taskID, id platform.ID) (*platform.Run, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.FindRunByID")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
task, err := p.s.FindTaskByID(ctx, taskID)
|
task, err := p.s.FindTaskByID(ctx, taskID)
|
||||||
|
@ -308,7 +307,7 @@ func (p pAdapter) FindRunByID(ctx context.Context, taskID, id platform.ID) (*pla
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pAdapter) RetryRun(ctx context.Context, taskID, id platform.ID) (*platform.Run, error) {
|
func (p pAdapter) RetryRun(ctx context.Context, taskID, id platform.ID) (*platform.Run, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.RetryRun")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
task, err := p.s.FindTaskByID(ctx, taskID)
|
task, err := p.s.FindTaskByID(ctx, taskID)
|
||||||
|
@ -344,7 +343,7 @@ func (p pAdapter) RetryRun(ctx context.Context, taskID, id platform.ID) (*platfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pAdapter) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) {
|
func (p pAdapter) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.ForceRun")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
requestedAt := time.Now()
|
requestedAt := time.Now()
|
||||||
|
@ -362,7 +361,7 @@ func (p pAdapter) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pAdapter) CancelRun(ctx context.Context, taskID, runID platform.ID) error {
|
func (p pAdapter) CancelRun(ctx context.Context, taskID, runID platform.ID) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.CancelRun")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
return p.rc.CancelRun(ctx, taskID, runID)
|
return p.rc.CancelRun(ctx, taskID, runID)
|
||||||
|
|
|
@ -7,10 +7,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/flux"
|
"github.com/influxdata/flux"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
|
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
platcontext "github.com/influxdata/influxdb/context"
|
platcontext "github.com/influxdata/influxdb/context"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/query"
|
"github.com/influxdata/influxdb/query"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ func NewValidator(ts platform.TaskService, bs platform.BucketService) platform.T
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.FindTaskByID")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Unauthenticated task lookup, to identify the task's organization.
|
// Unauthenticated task lookup, to identify the task's organization.
|
||||||
|
@ -61,7 +61,7 @@ func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]*platform.Task, int, error) {
|
func (ts *taskServiceValidator) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]*platform.Task, int, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.FindTasks")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// First, get the tasks in the organization, without authentication.
|
// First, get the tasks in the organization, without authentication.
|
||||||
|
@ -90,7 +90,7 @@ func (ts *taskServiceValidator) FindTasks(ctx context.Context, filter platform.T
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskCreate) (*platform.Task, error) {
|
func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskCreate) (*platform.Task, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.CreateTask")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
p, err := platform.NewPermission(platform.WriteAction, platform.TasksResourceType, t.OrganizationID)
|
p, err := platform.NewPermission(platform.WriteAction, platform.TasksResourceType, t.OrganizationID)
|
||||||
|
@ -110,7 +110,7 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskC
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
|
func (ts *taskServiceValidator) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.UpdateTask")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Unauthenticated task lookup, to identify the task's organization.
|
// Unauthenticated task lookup, to identify the task's organization.
|
||||||
|
@ -136,7 +136,7 @@ func (ts *taskServiceValidator) UpdateTask(ctx context.Context, id platform.ID,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) DeleteTask(ctx context.Context, id platform.ID) error {
|
func (ts *taskServiceValidator) DeleteTask(ctx context.Context, id platform.ID) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.DeleteTask")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Unauthenticated task lookup, to identify the task's organization.
|
// Unauthenticated task lookup, to identify the task's organization.
|
||||||
|
@ -158,7 +158,7 @@ func (ts *taskServiceValidator) DeleteTask(ctx context.Context, id platform.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.FindLogs")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Look up the task first, through the validator, to ensure we have permission to view the task.
|
// Look up the task first, through the validator, to ensure we have permission to view the task.
|
||||||
|
@ -171,7 +171,7 @@ func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter platform.Lo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.FindRuns")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Look up the task first, through the validator, to ensure we have permission to view the task.
|
// Look up the task first, through the validator, to ensure we have permission to view the task.
|
||||||
|
@ -194,7 +194,7 @@ func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter platform.Ru
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) {
|
func (ts *taskServiceValidator) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.FindRunByID")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Unauthenticated task lookup, to identify the task's organization.
|
// Unauthenticated task lookup, to identify the task's organization.
|
||||||
|
@ -216,7 +216,7 @@ func (ts *taskServiceValidator) FindRunByID(ctx context.Context, taskID, runID p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) CancelRun(ctx context.Context, taskID, runID platform.ID) error {
|
func (ts *taskServiceValidator) CancelRun(ctx context.Context, taskID, runID platform.ID) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.CancelRun")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Unauthenticated task lookup, to identify the task's organization.
|
// Unauthenticated task lookup, to identify the task's organization.
|
||||||
|
@ -238,7 +238,7 @@ func (ts *taskServiceValidator) CancelRun(ctx context.Context, taskID, runID pla
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) {
|
func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.RetryRun")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Unauthenticated task lookup, to identify the task's organization.
|
// Unauthenticated task lookup, to identify the task's organization.
|
||||||
|
@ -260,7 +260,7 @@ func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID plat
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) {
|
func (ts *taskServiceValidator) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.ForceRun")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Unauthenticated task lookup, to identify the task's organization.
|
// Unauthenticated task lookup, to identify the task's organization.
|
||||||
|
|
|
@ -6,13 +6,13 @@ import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/cespare/xxhash"
|
"github.com/cespare/xxhash"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/logger"
|
"github.com/influxdata/influxdb/logger"
|
||||||
"github.com/influxdata/influxdb/models"
|
"github.com/influxdata/influxdb/models"
|
||||||
"github.com/influxdata/influxdb/pkg/binaryutil"
|
"github.com/influxdata/influxdb/pkg/binaryutil"
|
||||||
|
@ -92,7 +92,7 @@ func (f *SeriesFile) Open(ctx context.Context) error {
|
||||||
return errors.New("series file already opened")
|
return errors.New("series file already opened")
|
||||||
}
|
}
|
||||||
|
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "SeriesFile.Open")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
_, logEnd := logger.NewOperation(f.Logger, "Opening Series File", "series_file_open", zap.String("path", f.path))
|
_, logEnd := logger.NewOperation(f.Logger, "Opening Series File", "series_file_open", zap.String("path", f.path))
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -18,6 +17,7 @@ import (
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
"github.com/cespare/xxhash"
|
"github.com/cespare/xxhash"
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/models"
|
"github.com/influxdata/influxdb/models"
|
||||||
"github.com/influxdata/influxdb/pkg/lifecycle"
|
"github.com/influxdata/influxdb/pkg/lifecycle"
|
||||||
"github.com/influxdata/influxdb/pkg/slices"
|
"github.com/influxdata/influxdb/pkg/slices"
|
||||||
|
@ -217,7 +217,7 @@ func (i *Index) Open(ctx context.Context) error {
|
||||||
return errors.New("index already open")
|
return errors.New("index already open")
|
||||||
}
|
}
|
||||||
|
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "Index.Open")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Ensure root exists.
|
// Ensure root exists.
|
||||||
|
|
|
@ -16,7 +16,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
|
@ -27,6 +26,7 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/pkg/limiter"
|
"github.com/influxdata/influxdb/pkg/limiter"
|
||||||
"github.com/influxdata/influxdb/tsdb"
|
"github.com/influxdata/influxdb/tsdb"
|
||||||
)
|
)
|
||||||
|
@ -811,7 +811,7 @@ func (c *Compactor) EnableCompactions() {
|
||||||
|
|
||||||
// WriteSnapshot writes a Cache snapshot to one or more new TSM files.
|
// WriteSnapshot writes a Cache snapshot to one or more new TSM files.
|
||||||
func (c *Compactor) WriteSnapshot(ctx context.Context, cache *Cache) ([]string, error) {
|
func (c *Compactor) WriteSnapshot(ctx context.Context, cache *Cache) ([]string, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "Compactor.WriteSnapshot")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/logger"
|
"github.com/influxdata/influxdb/logger"
|
||||||
"github.com/influxdata/influxdb/models"
|
"github.com/influxdata/influxdb/models"
|
||||||
"github.com/influxdata/influxdb/pkg/lifecycle"
|
"github.com/influxdata/influxdb/pkg/lifecycle"
|
||||||
|
@ -495,7 +496,7 @@ func (e *Engine) initTrackers() {
|
||||||
|
|
||||||
// Open opens and initializes the engine.
|
// Open opens and initializes the engine.
|
||||||
func (e *Engine) Open(ctx context.Context) (err error) {
|
func (e *Engine) Open(ctx context.Context) (err error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Engine.Open")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -775,7 +776,7 @@ func (t *compactionTracker) SetFullQueue(length uint64) { t.SetQueue(5, length)
|
||||||
|
|
||||||
// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
|
// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
|
||||||
func (e *Engine) WriteSnapshot(ctx context.Context) error {
|
func (e *Engine) WriteSnapshot(ctx context.Context) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "Engine.WriteSnapshot")
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Lock and grab the cache snapshot along with all the closed WAL
|
// Lock and grab the cache snapshot along with all the closed WAL
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
|
@ -18,6 +17,7 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/pkg/file"
|
"github.com/influxdata/influxdb/pkg/file"
|
||||||
"github.com/influxdata/influxdb/pkg/limiter"
|
"github.com/influxdata/influxdb/pkg/limiter"
|
||||||
"github.com/influxdata/influxdb/pkg/metrics"
|
"github.com/influxdata/influxdb/pkg/metrics"
|
||||||
|
@ -517,7 +517,7 @@ func (f *FileStore) Open(ctx context.Context) error {
|
||||||
return errors.New("cannot open FileStore without an OpenLimiter (is EngineOptions.OpenLimiter set?)")
|
return errors.New("cannot open FileStore without an OpenLimiter (is EngineOptions.OpenLimiter set?)")
|
||||||
}
|
}
|
||||||
|
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "FileStore.Open")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// find the current max ID for temp directories
|
// find the current max ID for temp directories
|
||||||
|
@ -1104,7 +1104,7 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
|
||||||
// CreateSnapshot creates hardlinks for all tsm and tombstone files
|
// CreateSnapshot creates hardlinks for all tsm and tombstone files
|
||||||
// in the path provided.
|
// in the path provided.
|
||||||
func (f *FileStore) CreateSnapshot(ctx context.Context) (string, error) {
|
func (f *FileStore) CreateSnapshot(ctx context.Context) (string, error) {
|
||||||
span, _ := opentracing.StartSpanFromContext(ctx, "FileStore.CreateSnapshot")
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
span.LogKV("dir", f.dir)
|
span.LogKV("dir", f.dir)
|
||||||
|
|
Loading…
Reference in New Issue