From 603a1f26e07d871da72181b4773c962eaed5d5bc Mon Sep 17 00:00:00 2001 From: Jacob Marble Date: Tue, 5 Mar 2019 16:18:04 -0800 Subject: [PATCH] use tracing.StartSpanFromContext --- authorizer/bucket.go | 13 ++++---- bolt/bucket.go | 22 ++++++------- bolt/keyvalue_log.go | 8 ++--- bolt/kv.go | 10 +++--- bolt/organization.go | 11 +++---- chronograf/influx/databases.go | 27 ++++++++-------- chronograf/influx/influx.go | 18 +++++------ cmd/influxd/launcher/launcher.go | 3 +- http/bucket_service.go | 9 +++--- http/influxdb/bucket.go | 5 ++- http/influxdb/source_proxy_query_service.go | 5 ++- http/org_service.go | 9 +++--- http/proxy_query_service.go | 3 +- http/query_handler.go | 5 ++- http/source_proxy_service.go | 5 ++- http/task_service.go | 23 +++++++------- http/write_handler.go | 3 +- kit/tracing/tracing.go | 2 ++ kit/tracing/tracing_test.go | 4 +-- kv/bucket.go | 35 ++++++++++----------- kv/org.go | 8 ++--- kv/urm.go | 7 ++--- query/bridges.go | 3 +- query/control/controller.go | 4 +-- query/influxql/service.go | 7 ++--- query/logging.go | 5 ++- storage/bucket_service.go | 15 +++++---- storage/engine.go | 10 +++--- storage/wal/wal.go | 7 +++-- task/backend/scheduler.go | 3 +- task/platform_adapter.go | 25 +++++++-------- task/validator.go | 24 +++++++------- tsdb/series_file.go | 4 +-- tsdb/tsi1/index.go | 4 +-- tsdb/tsm1/compact.go | 4 +-- tsdb/tsm1/engine.go | 5 +-- tsdb/tsm1/file_store.go | 6 ++-- 37 files changed, 172 insertions(+), 189 deletions(-) diff --git a/authorizer/bucket.go b/authorizer/bucket.go index 303079f4d4..fe98dd0441 100644 --- a/authorizer/bucket.go +++ b/authorizer/bucket.go @@ -3,9 +3,8 @@ package authorizer import ( "context" - "github.com/opentracing/opentracing-go" - "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/kit/tracing" ) var _ influxdb.BucketService = (*BucketService)(nil) @@ -28,7 +27,7 @@ func newBucketPermission(a influxdb.Action, orgID, id influxdb.ID) (*influxdb.Pe } func authorizeReadBucket(ctx context.Context, orgID, id influxdb.ID) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "authorizeReadBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() p, err := newBucketPermission(influxdb.ReadAction, orgID, id) @@ -58,7 +57,7 @@ func authorizeWriteBucket(ctx context.Context, orgID, id influxdb.ID) error { // FindBucketByID checks to see if the authorizer on context has read access to the id provided. func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBucketByID") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() b, err := s.s.FindBucketByID(ctx, id) @@ -75,7 +74,7 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*in // FindBucket retrieves the bucket and checks to see if the authorizer on context has read access to the bucket. func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() b, err := s.s.FindBucket(ctx, filter) @@ -92,7 +91,7 @@ func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFi // FindBuckets retrieves all buckets that match the provided filter and then filters the list down to only the resources that are authorized. func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBuckets") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // TODO: we'll likely want to push this operation into the database eventually since fetching the whole list of data @@ -123,7 +122,7 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketF // CreateBucket checks to see if the authorizer on context has write access to the global buckets resource. func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.CreateBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() p, err := influxdb.NewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, b.OrganizationID) diff --git a/bolt/bucket.go b/bolt/bucket.go index e09baf289b..64f984a38e 100644 --- a/bolt/bucket.go +++ b/bolt/bucket.go @@ -6,11 +6,11 @@ import ( "fmt" "time" - bolt "github.com/coreos/bbolt" - "github.com/opentracing/opentracing-go" + "github.com/coreos/bbolt" platform "github.com/influxdata/influxdb" platformcontext "github.com/influxdata/influxdb/context" + "github.com/influxdata/influxdb/kit/tracing" ) var ( @@ -32,7 +32,7 @@ func (c *Client) initializeBuckets(ctx context.Context, tx *bolt.Tx) error { } func (c *Client) setOrganizationOnBucket(ctx context.Context, tx *bolt.Tx, b *platform.Bucket) *platform.Error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.setOrganizationOnBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() o, err := c.findOrganizationByID(ctx, tx, b.OrganizationID) @@ -47,7 +47,7 @@ func (c *Client) setOrganizationOnBucket(ctx context.Context, tx *bolt.Tx, b *pl // FindBucketByID retrieves a bucket by id. func (c *Client) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.FindBucketByID") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var b *platform.Bucket @@ -72,7 +72,7 @@ func (c *Client) FindBucketByID(ctx context.Context, id platform.ID) (*platform. } func (c *Client) findBucketByID(ctx context.Context, tx *bolt.Tx, id platform.ID) (*platform.Bucket, *platform.Error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.findBucketByID") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var b platform.Bucket @@ -111,7 +111,7 @@ func (c *Client) findBucketByID(ctx context.Context, tx *bolt.Tx, id platform.ID // FindBucketByName returns a bucket by name for a particular organization. // TODO: have method for finding bucket using organization name and bucket name. func (c *Client) FindBucketByName(ctx context.Context, orgID platform.ID, n string) (*platform.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.FindBucketByName") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var b *platform.Bucket @@ -132,7 +132,7 @@ func (c *Client) FindBucketByName(ctx context.Context, orgID platform.ID, n stri } func (c *Client) findBucketByName(ctx context.Context, tx *bolt.Tx, orgID platform.ID, n string) (*platform.Bucket, *platform.Error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.findBucketByName") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() b := &platform.Bucket{ @@ -168,7 +168,7 @@ func (c *Client) findBucketByName(ctx context.Context, tx *bolt.Tx, orgID platfo // Filters using ID, or OrganizationID and bucket Name should be efficient. // Other filters will do a linear scan across buckets until it finds a match. func (c *Client) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.FindBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var b *platform.Bucket @@ -257,7 +257,7 @@ func filterBucketsFn(filter platform.BucketFilter) func(b *platform.Bucket) bool // Filters using ID, or OrganizationID and bucket Name should be efficient. // Other filters will do a linear scan across all buckets searching for a match. func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter, opts ...platform.FindOptions) ([]*platform.Bucket, int, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.FindBuckets") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() if filter.ID != nil { @@ -296,7 +296,7 @@ func (c *Client) FindBuckets(ctx context.Context, filter platform.BucketFilter, } func (c *Client) findBuckets(ctx context.Context, tx *bolt.Tx, filter platform.BucketFilter, opts ...platform.FindOptions) ([]*platform.Bucket, *platform.Error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.findBuckets") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() bs := []*platform.Bucket{} @@ -345,7 +345,7 @@ func (c *Client) findBuckets(ctx context.Context, tx *bolt.Tx, filter platform.B // CreateBucket creates a platform bucket and sets b.ID. func (c *Client) CreateBucket(ctx context.Context, b *platform.Bucket) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.CreateBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var err error diff --git a/bolt/keyvalue_log.go b/bolt/keyvalue_log.go index 819ffa3a4f..acd54f5c05 100644 --- a/bolt/keyvalue_log.go +++ b/bolt/keyvalue_log.go @@ -9,10 +9,10 @@ import ( "fmt" "time" - bolt "github.com/coreos/bbolt" - "github.com/opentracing/opentracing-go" + "github.com/coreos/bbolt" platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/kit/tracing" ) var ( @@ -114,7 +114,7 @@ func (c *Client) initializeKeyValueLog(ctx context.Context, tx *bolt.Tx) error { var errKeyValueLogBoundsNotFound = fmt.Errorf("oplog not found") func (c *Client) getKeyValueLogBounds(ctx context.Context, tx *bolt.Tx, key []byte) (*keyValueLogBounds, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "Client.getKeyValueLogBounds") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() k := encodeKeyValueIndexKey(key) @@ -149,7 +149,7 @@ func (c *Client) putKeyValueLogBounds(ctx context.Context, tx *bolt.Tx, key []by } func (c *Client) updateKeyValueLogBounds(ctx context.Context, tx *bolt.Tx, k []byte, t time.Time) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.updateKeyValueLogBounds") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // retrieve the keyValue log boundaries diff --git a/bolt/kv.go b/bolt/kv.go index 907293a33f..68b00bdc6a 100644 --- a/bolt/kv.go +++ b/bolt/kv.go @@ -7,10 +7,10 @@ import ( "path/filepath" "time" - bolt "github.com/coreos/bbolt" - "github.com/opentracing/opentracing-go" + "github.com/coreos/bbolt" "go.uber.org/zap" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/kv" ) @@ -32,7 +32,7 @@ func NewKVStore(path string) *KVStore { // Open creates boltDB file it doesn't exists and opens it otherwise. func (s *KVStore) Open(ctx context.Context) error { - span, _ := opentracing.StartSpanFromContext(ctx, "KVStore.Open") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() // Ensure the required directory structure exists. @@ -102,7 +102,7 @@ func (s *KVStore) WithDB(db *bolt.DB) { // View opens up a view transaction against the store. func (s *KVStore) View(ctx context.Context, fn func(tx kv.Tx) error) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "KVStore.View") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() return s.db.View(func(tx *bolt.Tx) error { @@ -115,7 +115,7 @@ func (s *KVStore) View(ctx context.Context, fn func(tx kv.Tx) error) error { // Update opens up an update transaction against the store. func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "KVStore.Update") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() return s.db.Update(func(tx *bolt.Tx) error { diff --git a/bolt/organization.go b/bolt/organization.go index fa1087c3d3..1e2c1f1773 100644 --- a/bolt/organization.go +++ b/bolt/organization.go @@ -6,11 +6,10 @@ import ( "fmt" "time" - bolt "github.com/coreos/bbolt" - "github.com/opentracing/opentracing-go" - - influxdb "github.com/influxdata/influxdb" + "github.com/coreos/bbolt" + "github.com/influxdata/influxdb" influxdbcontext "github.com/influxdata/influxdb/context" + "github.com/influxdata/influxdb/kit/tracing" ) var ( @@ -54,7 +53,7 @@ func (c *Client) FindOrganizationByID(ctx context.Context, id influxdb.ID) (*inf } func (c *Client) findOrganizationByID(ctx context.Context, tx *bolt.Tx, id influxdb.ID) (*influxdb.Organization, *influxdb.Error) { - span, _ := opentracing.StartSpanFromContext(ctx, "Client.findOrganizationByID") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() encodedID, err := id.Encode() @@ -100,7 +99,7 @@ func (c *Client) FindOrganizationByName(ctx context.Context, n string) (*influxd } func (c *Client) findOrganizationByName(ctx context.Context, tx *bolt.Tx, n string) (*influxdb.Organization, *influxdb.Error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.findOrganizationByName") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() o := tx.Bucket(organizationIndex).Get(organizationIndexKey(n)) diff --git a/chronograf/influx/databases.go b/chronograf/influx/databases.go index 43c17ff37f..0630096f1a 100644 --- a/chronograf/influx/databases.go +++ b/chronograf/influx/databases.go @@ -6,14 +6,13 @@ import ( "encoding/json" "fmt" - "github.com/opentracing/opentracing-go" - "github.com/influxdata/influxdb/chronograf" + "github.com/influxdata/influxdb/kit/tracing" ) // AllDB returns all databases from within Influx func (c *Client) AllDB(ctx context.Context) ([]chronograf.Database, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.AllDB") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() return c.showDatabases(ctx) @@ -21,7 +20,7 @@ func (c *Client) AllDB(ctx context.Context) ([]chronograf.Database, error) { // CreateDB creates a database within Influx func (c *Client) CreateDB(ctx context.Context, db *chronograf.Database) (*chronograf.Database, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.CreateDB") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() _, err := c.Query(ctx, chronograf.Query{ @@ -38,7 +37,7 @@ func (c *Client) CreateDB(ctx context.Context, db *chronograf.Database) (*chrono // DropDB drops a database within Influx func (c *Client) DropDB(ctx context.Context, db string) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.DropDB") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() _, err := c.Query(ctx, chronograf.Query{ @@ -53,14 +52,14 @@ func (c *Client) DropDB(ctx context.Context, db string) error { // AllRP returns all the retention policies for a specific database func (c *Client) AllRP(ctx context.Context, db string) ([]chronograf.RetentionPolicy, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.AllRP") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() return c.showRetentionPolicies(ctx, db) } func (c *Client) getRP(ctx context.Context, db, rp string) (chronograf.RetentionPolicy, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.getRP") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() rs, err := c.AllRP(ctx, db) @@ -78,7 +77,7 @@ func (c *Client) getRP(ctx context.Context, db, rp string) (chronograf.Retention // CreateRP creates a retention policy for a specific database func (c *Client) CreateRP(ctx context.Context, db string, rp *chronograf.RetentionPolicy) (*chronograf.RetentionPolicy, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.CreateRP") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() query := fmt.Sprintf(`CREATE RETENTION POLICY "%s" ON "%s" DURATION %s REPLICATION %d`, rp.Name, db, rp.Duration, rp.Replication) @@ -108,7 +107,7 @@ func (c *Client) CreateRP(ctx context.Context, db string, rp *chronograf.Retenti // UpdateRP updates a specific retention policy for a specific database func (c *Client) UpdateRP(ctx context.Context, db string, rp string, upd *chronograf.RetentionPolicy) (*chronograf.RetentionPolicy, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.UpdateRP") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var buffer bytes.Buffer @@ -163,7 +162,7 @@ func (c *Client) UpdateRP(ctx context.Context, db string, rp string, upd *chrono // DropRP removes a specific retention policy for a specific database func (c *Client) DropRP(ctx context.Context, db string, rp string) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.DropRP") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() _, err := c.Query(ctx, chronograf.Query{ @@ -181,14 +180,14 @@ func (c *Client) DropRP(ctx context.Context, db string, rp string) error { // optional limit and offset. If no limit or offset is provided, it defaults to // a limit of 100 measurements with no offset. func (c *Client) GetMeasurements(ctx context.Context, db string, limit, offset int) ([]chronograf.Measurement, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.GetMeasurements") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() return c.showMeasurements(ctx, db, limit, offset) } func (c *Client) showDatabases(ctx context.Context) ([]chronograf.Database, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.showDatabases") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() res, err := c.Query(ctx, chronograf.Query{ @@ -211,7 +210,7 @@ func (c *Client) showDatabases(ctx context.Context) ([]chronograf.Database, erro } func (c *Client) showRetentionPolicies(ctx context.Context, db string) ([]chronograf.RetentionPolicy, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.showRetentionPolicies") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() retentionPolicies, err := c.Query(ctx, chronograf.Query{ @@ -236,7 +235,7 @@ func (c *Client) showRetentionPolicies(ctx context.Context, db string) ([]chrono } func (c *Client) showMeasurements(ctx context.Context, db string, limit, offset int) ([]chronograf.Measurement, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.showMeasurements") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() show := fmt.Sprintf(`SHOW MEASUREMENTS ON "%s"`, db) diff --git a/chronograf/influx/influx.go b/chronograf/influx/influx.go index 8608b7872c..88cbba9cef 100644 --- a/chronograf/influx/influx.go +++ b/chronograf/influx/influx.go @@ -11,8 +11,6 @@ import ( "net/url" "strings" - "github.com/opentracing/opentracing-go" - "github.com/influxdata/influxdb/chronograf" "github.com/influxdata/influxdb/kit/tracing" ) @@ -50,7 +48,7 @@ func (r Response) MarshalJSON() ([]byte, error) { } func (c *Client) query(ctx context.Context, u *url.URL, q chronograf.Query) (chronograf.Response, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "Client.query") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u.Path = "query" @@ -141,7 +139,7 @@ type result struct { // include both the database and retention policy. In-flight requests can be // cancelled using the provided context. func (c *Client) Query(ctx context.Context, q chronograf.Query) (chronograf.Response, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.Query") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() resps := make(chan (result)) @@ -160,7 +158,7 @@ func (c *Client) Query(ctx context.Context, q chronograf.Query) (chronograf.Resp // Connect caches the URL and optional Bearer Authorization for the data source func (c *Client) Connect(ctx context.Context, src *chronograf.Source) error { - span, _ := opentracing.StartSpanFromContext(ctx, "Client.Connect") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := url.Parse(src.URL) @@ -206,7 +204,7 @@ func (c *Client) Type(ctx context.Context) (string, error) { } func (c *Client) pingTimeout(ctx context.Context) (string, string, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.pingTimeout") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() resps := make(chan (pingResult)) @@ -230,7 +228,7 @@ type pingResult struct { } func (c *Client) ping(ctx context.Context, u *url.URL) (string, string, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "Client.ping") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u.Path = "ping" @@ -280,7 +278,7 @@ func (c *Client) ping(ctx context.Context, u *url.URL) (string, string, error) { // Write POSTs line protocol to a database and retention policy func (c *Client) Write(ctx context.Context, points []chronograf.Point) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.Write") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() for _, point := range points { @@ -292,7 +290,7 @@ func (c *Client) Write(ctx context.Context, points []chronograf.Point) error { } func (c *Client) writePoint(ctx context.Context, point *chronograf.Point) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.writePoint") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() lp, err := toLineProtocol(point) @@ -327,7 +325,7 @@ func (c *Client) writePoint(ctx context.Context, point *chronograf.Point) error } func (c *Client) write(ctx context.Context, u *url.URL, db, rp, lp string) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Client.write") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() u.Path = "write" diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 4ac8fdb3db..dab7bb6af1 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -30,6 +30,7 @@ import ( "github.com/influxdata/influxdb/internal/fs" "github.com/influxdata/influxdb/kit/cli" "github.com/influxdata/influxdb/kit/prom" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/kv" influxlogger "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/nats" @@ -282,7 +283,7 @@ func (m *Launcher) Run(ctx context.Context, args ...string) error { } func (m *Launcher) run(ctx context.Context) (err error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Launcher.run") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() m.running = true diff --git a/http/bucket_service.go b/http/bucket_service.go index 614ec3084f..a9c88aa507 100644 --- a/http/bucket_service.go +++ b/http/bucket_service.go @@ -10,7 +10,6 @@ import ( "time" "github.com/julienschmidt/httprouter" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "github.com/influxdata/influxdb" @@ -595,7 +594,7 @@ type BucketService struct { // FindBucketByID returns a single bucket by ID. func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "BucketService.FindBucketByID") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(s.Addr, bucketIDPath(id)) @@ -630,7 +629,7 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*in // FindBucket returns the first bucket that matches filter. func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() bs, n, err := s.FindBuckets(ctx, filter) @@ -652,7 +651,7 @@ func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFi // FindBuckets returns a list of buckets that match filter and the total count of matching buckets. // Additional options provide pagination & sorting. func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "BucketService.FindBuckets") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(s.Addr, bucketPath) @@ -722,7 +721,7 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketF // CreateBucket creates a new bucket and sets b.ID with the new identifier. func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error { - span, _ := opentracing.StartSpanFromContext(ctx, "BucketService.CreateBucket") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(s.Addr, bucketPath) diff --git a/http/influxdb/bucket.go b/http/influxdb/bucket.go index a5888bb481..620878a905 100644 --- a/http/influxdb/bucket.go +++ b/http/influxdb/bucket.go @@ -5,9 +5,8 @@ import ( "fmt" "time" - "github.com/opentracing/opentracing-go" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/kit/tracing" ) // BucketService connects to Influx via HTTP using tokens to manage buckets @@ -24,7 +23,7 @@ func (s *BucketService) FindBucket(ctx context.Context, filter platform.BucketFi } func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBuckets") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() c, err := newClient(s.Source) diff --git a/http/influxdb/source_proxy_query_service.go b/http/influxdb/source_proxy_query_service.go index 5c647698f3..6fbd79c783 100644 --- a/http/influxdb/source_proxy_query_service.go +++ b/http/influxdb/source_proxy_query_service.go @@ -11,7 +11,6 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/csv" "github.com/influxdata/flux/lang" - "github.com/opentracing/opentracing-go" platform "github.com/influxdata/influxdb" platformhttp "github.com/influxdata/influxdb/http" @@ -40,7 +39,7 @@ func (s *SourceProxyQueryService) Query(ctx context.Context, w io.Writer, req *q } func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "SourceProxyQueryService.fluxQuery") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() request := struct { Spec *flux.Spec `json:"spec"` @@ -111,7 +110,7 @@ func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, re } func (s *SourceProxyQueryService) influxQuery(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "SourceProxyQueryService.influxQuery") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() if len(s.URL) == 0 { return flux.Statistics{}, tracing.LogError(span, fmt.Errorf("URL from source cannot be empty if the compiler type is influxql")) diff --git a/http/org_service.go b/http/org_service.go index 39b6203910..4fc447d596 100644 --- a/http/org_service.go +++ b/http/org_service.go @@ -9,7 +9,6 @@ import ( "path" "github.com/julienschmidt/httprouter" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "github.com/influxdata/influxdb" @@ -616,7 +615,7 @@ func (s *OrganizationService) FindOrganization(ctx context.Context, filter influ // FindOrganizations returns all organizations that match the filter via HTTP. func (s *OrganizationService) FindOrganizations(ctx context.Context, filter influxdb.OrganizationFilter, opt ...influxdb.FindOptions) ([]*influxdb.Organization, int, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "OrganizationService.FindOrganizations") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() if filter.Name != nil { @@ -689,7 +688,7 @@ func (s *OrganizationService) CreateOrganization(ctx context.Context, o *influxd } } - span, _ := opentracing.StartSpanFromContext(ctx, "OrganizationService.CreateOrganization") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() if o.Name != "" { @@ -740,7 +739,7 @@ func (s *OrganizationService) CreateOrganization(ctx context.Context, o *influxd // UpdateOrganization updates the organization over HTTP. func (s *OrganizationService) UpdateOrganization(ctx context.Context, id influxdb.ID, upd influxdb.OrganizationUpdate) (*influxdb.Organization, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "OrganizationService.UpdateOrganization") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() span.LogKV("org-id", id) @@ -787,7 +786,7 @@ func (s *OrganizationService) UpdateOrganization(ctx context.Context, id influxd // DeleteOrganization removes organization id over HTTP. func (s *OrganizationService) DeleteOrganization(ctx context.Context, id influxdb.ID) error { - span, _ := opentracing.StartSpanFromContext(ctx, "OrganizationService.DeleteOrganization") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(s.Addr, organizationIDPath(id)) diff --git a/http/proxy_query_service.go b/http/proxy_query_service.go index 3ec350414a..a77e1738cc 100644 --- a/http/proxy_query_service.go +++ b/http/proxy_query_service.go @@ -14,7 +14,6 @@ import ( "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/query" "github.com/julienschmidt/httprouter" - "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -142,7 +141,7 @@ func (s *ProxyQueryService) Ping(ctx context.Context) error { } func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "ProxyQueryService.Query") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(s.Addr, proxyQueryPath) if err != nil { diff --git a/http/query_handler.go b/http/query_handler.go index b1d83c4cb0..d5cca810b3 100644 --- a/http/query_handler.go +++ b/http/query_handler.go @@ -18,7 +18,6 @@ import ( "github.com/influxdata/flux/iocounter" "github.com/influxdata/flux/parser" "github.com/julienschmidt/httprouter" - "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -330,7 +329,7 @@ type FluxService struct { // Query runs a flux query against a influx server and sends the results to the io.Writer. // Will use the token from the context over the token within the service struct. func (s *FluxService) Query(ctx context.Context, w io.Writer, r *query.ProxyRequest) (flux.Statistics, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "FluxService.Query") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(s.Addr, fluxPath) if err != nil { @@ -386,7 +385,7 @@ type FluxQueryService struct { // Query runs a flux query against a influx server and decodes the result func (s *FluxQueryService) Query(ctx context.Context, r *query.Request) (flux.ResultIterator, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "FluxQueryService.Query") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(s.Addr, fluxPath) diff --git a/http/source_proxy_service.go b/http/source_proxy_service.go index 1aa042eace..0b1d20b18a 100644 --- a/http/source_proxy_service.go +++ b/http/source_proxy_service.go @@ -12,7 +12,6 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/lang" - "github.com/opentracing/opentracing-go" platform "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/kit/tracing" @@ -37,7 +36,7 @@ func (s *SourceProxyQueryService) Query(ctx context.Context, w io.Writer, req *q } func (s *SourceProxyQueryService) queryFlux(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "SourceProxyQueryService.queryFlux") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(s.Addr, "/api/v2/query") if err != nil { @@ -75,7 +74,7 @@ func (s *SourceProxyQueryService) queryFlux(ctx context.Context, w io.Writer, re } func (s *SourceProxyQueryService) queryInfluxQL(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "SourceProxyQueryService.queryInfluxQL") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() compiler, ok := req.Request.Compiler.(*influxql.Compiler) diff --git a/http/task_service.go b/http/task_service.go index c181ed556c..87c71127f4 100644 --- a/http/task_service.go +++ b/http/task_service.go @@ -15,7 +15,6 @@ import ( "github.com/influxdata/flux" "github.com/julienschmidt/httprouter" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" platform "github.com/influxdata/influxdb" @@ -1366,7 +1365,7 @@ type TaskService struct { // FindTaskByID returns a single task func (t TaskService) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.FindTaskByID") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(t.Addr, taskIDPath(id)) @@ -1409,7 +1408,7 @@ func (t TaskService) FindTaskByID(ctx context.Context, id platform.ID) (*platfor // FindTasks returns a list of tasks that match a filter (limit 100) and the total count // of matching tasks. func (t TaskService) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]*platform.Task, int, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.FindTasks") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(t.Addr, tasksPath) @@ -1468,7 +1467,7 @@ func (t TaskService) FindTasks(ctx context.Context, filter platform.TaskFilter) // CreateTask creates a new task. func (t TaskService) CreateTask(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.CreateTask") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(t.Addr, tasksPath) @@ -1511,7 +1510,7 @@ func (t TaskService) CreateTask(ctx context.Context, tc platform.TaskCreate) (*p // UpdateTask updates a single task with changeset. func (t TaskService) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.UpdateTask") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(t.Addr, taskIDPath(id)) @@ -1555,7 +1554,7 @@ func (t TaskService) UpdateTask(ctx context.Context, id platform.ID, upd platfor // DeleteTask removes a task by ID and purges all associated data and scheduled runs. func (t TaskService) DeleteTask(ctx context.Context, id platform.ID) error { - span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.DeleteTask") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(t.Addr, taskIDPath(id)) @@ -1585,7 +1584,7 @@ func (t TaskService) DeleteTask(ctx context.Context, id platform.ID) error { // FindLogs returns logs for a run. func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.FindLogs") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() if !filter.Task.Valid() { @@ -1633,7 +1632,7 @@ func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([ // FindRuns returns a list of runs that match a filter and the total count of returned runs. func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.FindRuns") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() if !filter.Task.Valid() { @@ -1689,7 +1688,7 @@ func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([ // FindRunByID returns a single run of a specific task. func (t TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.FindRunByID") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(t.Addr, taskIDRunIDPath(taskID, runID)) @@ -1732,7 +1731,7 @@ func (t TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID) // RetryRun creates and returns a new run (which is a retry of another run). func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.RetryRun") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() p := path.Join(taskIDRunIDPath(taskID, runID), "retry") @@ -1780,7 +1779,7 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID) (* } func (t TaskService) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.ForceRun") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(t.Addr, taskIDRunsPath(taskID)) @@ -1832,7 +1831,7 @@ func cancelPath(taskID, runID platform.ID) string { } func (t TaskService) CancelRun(ctx context.Context, taskID, runID platform.ID) error { - span, _ := opentracing.StartSpanFromContext(ctx, "TaskService.CancelRun") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() u, err := newURL(t.Addr, cancelPath(taskID, runID)) diff --git a/http/write_handler.go b/http/write_handler.go index bd2abedcff..341c93509e 100644 --- a/http/write_handler.go +++ b/http/write_handler.go @@ -9,13 +9,12 @@ import ( "net/http" "time" - "github.com/influxdata/influxdb/kit/tracing" - "github.com/julienschmidt/httprouter" "go.uber.org/zap" platform "github.com/influxdata/influxdb" pcontext "github.com/influxdata/influxdb/context" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/storage" "github.com/influxdata/influxdb/tsdb" diff --git a/kit/tracing/tracing.go b/kit/tracing/tracing.go index dc0d6837b7..5e5eb1cbfb 100644 --- a/kit/tracing/tracing.go +++ b/kit/tracing/tracing.go @@ -60,9 +60,11 @@ func ExtractFromHTTPRequest(req *http.Request, handlerName string) (opentracing. // If this performance penalty is too much, try these, which are also demonstrated in benchmark tests: // // Create a root span // span := opentracing.StartSpan("operation name") +// ctx := opentracing.ContextWithSpan(context.Background(), span) // // // Create a child span // span := opentracing.StartSpan("operation name", opentracing.ChildOf(sc)) +// ctx := opentracing.ContextWithSpan(context.Background(), span) // // // Sugar to create a child span // span, ctx := opentracing.StartSpanFromContext(ctx, "operation name") diff --git a/kit/tracing/tracing_test.go b/kit/tracing/tracing_test.go index 27657fd716..c37d627166 100644 --- a/kit/tracing/tracing_test.go +++ b/kit/tracing/tracing_test.go @@ -3,12 +3,12 @@ package tracing import ( "context" "fmt" - "github.com/uber/jaeger-client-go" "net/http" "runtime" "testing" "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go" ) func TestInjectAndExtractHTTPRequest(t *testing.T) { @@ -162,5 +162,3 @@ func BenchmarkOpentracing_StartSpan_child(b *testing.B) { _ = opentracing.StartSpan("operation name", opentracing.ChildOf(parentSpan.Context())) } } - -// TODO test nil and other context problems with StartSpanFromContext. \ No newline at end of file diff --git a/kv/bucket.go b/kv/bucket.go index d5bf000427..8e66200833 100644 --- a/kv/bucket.go +++ b/kv/bucket.go @@ -6,10 +6,9 @@ import ( "fmt" "time" - "github.com/opentracing/opentracing-go" - "github.com/influxdata/influxdb" icontext "github.com/influxdata/influxdb/context" + "github.com/influxdata/influxdb/kit/tracing" ) var ( @@ -49,7 +48,7 @@ func (s *Service) bucketsIndexBucket(tx Tx) (Bucket, error) { } func (s *Service) setOrganizationOnBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.setOrganizationOnBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() o, err := s.findOrganizationByID(ctx, tx, b.OrganizationID) @@ -64,7 +63,7 @@ func (s *Service) setOrganizationOnBucket(ctx context.Context, tx Tx, b *influxd // FindBucketByID retrieves a bucket by id. func (s *Service) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.FindBucketByID") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var b *influxdb.Bucket @@ -88,7 +87,7 @@ func (s *Service) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb } func (s *Service) findBucketByID(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.findBucketByID") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var b influxdb.Bucket @@ -136,7 +135,7 @@ func (s *Service) findBucketByID(ctx context.Context, tx Tx, id influxdb.ID) (*i // FindBucketByName returns a bucket by name for a particular organization. // TODO: have method for finding bucket using organization name and bucket name. func (s *Service) FindBucketByName(ctx context.Context, orgID influxdb.ID, n string) (*influxdb.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.FindBucketByName") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var b *influxdb.Bucket @@ -156,7 +155,7 @@ func (s *Service) FindBucketByName(ctx context.Context, orgID influxdb.ID, n str } func (s *Service) findBucketByName(ctx context.Context, tx Tx, orgID influxdb.ID, n string) (*influxdb.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.findBucketByName") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() b := &influxdb.Bucket{ @@ -201,7 +200,7 @@ func (s *Service) findBucketByName(ctx context.Context, tx Tx, orgID influxdb.ID // Filters using ID, or OrganizationID and bucket Name should be efficient. // Other filters will do a linear scan across buckets until it finds a match. func (s *Service) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.FindBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var b *influxdb.Bucket @@ -288,7 +287,7 @@ func filterBucketsFn(filter influxdb.BucketFilter) func(b *influxdb.Bucket) bool // Filters using ID, or OrganizationID and bucket Name should be efficient. // Other filters will do a linear scan across all buckets searching for a match. func (s *Service) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opts ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.FindBuckets") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() if filter.ID != nil { @@ -327,7 +326,7 @@ func (s *Service) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, } func (s *Service) findBuckets(ctx context.Context, tx Tx, filter influxdb.BucketFilter, opts ...influxdb.FindOptions) ([]*influxdb.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.findBuckets") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() bs := []*influxdb.Bucket{} @@ -376,7 +375,7 @@ func (s *Service) findBuckets(ctx context.Context, tx Tx, filter influxdb.Bucket // CreateBucket creates a influxdb bucket and sets b.ID. func (s *Service) CreateBucket(ctx context.Context, b *influxdb.Bucket) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.CreateBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() return s.kv.Update(ctx, func(tx Tx) error { @@ -386,7 +385,7 @@ func (s *Service) CreateBucket(ctx context.Context, b *influxdb.Bucket) error { func (s *Service) createBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) error { if b.OrganizationID.Valid() { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.createBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() _, pe := s.findOrganizationByID(ctx, tx, b.OrganizationID) @@ -442,7 +441,7 @@ func (s *Service) PutBucket(ctx context.Context, b *influxdb.Bucket) error { } func (s *Service) createBucketUserResourceMappings(ctx context.Context, tx Tx, b *influxdb.Bucket) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.createBucketUserResourceMappings") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() ms, err := s.findUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{ @@ -472,7 +471,7 @@ func (s *Service) createBucketUserResourceMappings(ctx context.Context, tx Tx, b } func (s *Service) putBucket(ctx context.Context, tx Tx, b *influxdb.Bucket) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.putBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() b.Organization = "" @@ -531,7 +530,7 @@ func bucketIndexKey(b *influxdb.Bucket) ([]byte, error) { // forEachBucket will iterate through all buckets while fn returns true. func (s *Service) forEachBucket(ctx context.Context, tx Tx, descending bool, fn func(*influxdb.Bucket) bool) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.forEachBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() bkt, err := s.bucketsBucket(tx) @@ -574,7 +573,7 @@ func (s *Service) forEachBucket(ctx context.Context, tx Tx, descending bool, fn } func (s *Service) uniqueBucketName(ctx context.Context, tx Tx, b *influxdb.Bucket) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.uniqueBucketName") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() key, err := bucketIndexKey(b) @@ -593,7 +592,7 @@ func (s *Service) uniqueBucketName(ctx context.Context, tx Tx, b *influxdb.Bucke // UpdateBucket updates a bucket according the parameters set on upd. func (s *Service) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.UpdateBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var b *influxdb.Bucket @@ -610,7 +609,7 @@ func (s *Service) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb } func (s *Service) updateBucket(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.updateBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() b, err := s.findBucketByID(ctx, tx, id) diff --git a/kv/org.go b/kv/org.go index 673bb2b941..422611c878 100644 --- a/kv/org.go +++ b/kv/org.go @@ -6,11 +6,11 @@ import ( "fmt" "time" - "github.com/opentracing/opentracing-go" "go.uber.org/zap" "github.com/influxdata/influxdb" icontext "github.com/influxdata/influxdb/context" + "github.com/influxdata/influxdb/kit/tracing" ) var ( @@ -53,7 +53,7 @@ func (s *Service) FindOrganizationByID(ctx context.Context, id influxdb.ID) (*in } func (s *Service) findOrganizationByID(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Organization, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "Service.findOrganizationByID") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() encodedID, err := id.Encode() @@ -93,7 +93,7 @@ func (s *Service) findOrganizationByID(ctx context.Context, tx Tx, id influxdb.I // FindOrganizationByName returns a organization by name for a particular organization. func (s *Service) FindOrganizationByName(ctx context.Context, n string) (*influxdb.Organization, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.FindOrganizationByName") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var o *influxdb.Organization @@ -111,7 +111,7 @@ func (s *Service) FindOrganizationByName(ctx context.Context, n string) (*influx } func (s *Service) findOrganizationByName(ctx context.Context, tx Tx, n string) (*influxdb.Organization, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.findOrganizationByName") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() b, err := tx.Bucket(organizationIndex) diff --git a/kv/urm.go b/kv/urm.go index ea673281e5..32de3a4252 100644 --- a/kv/urm.go +++ b/kv/urm.go @@ -5,10 +5,9 @@ import ( "encoding/json" "fmt" - "github.com/opentracing/opentracing-go" - "github.com/influxdata/influxdb" icontext "github.com/influxdata/influxdb/context" + "github.com/influxdata/influxdb/kit/tracing" ) var ( @@ -132,7 +131,7 @@ func (s *Service) CreateUserResourceMapping(ctx context.Context, m *influxdb.Use } func (s *Service) createUserResourceMapping(ctx context.Context, tx Tx, m *influxdb.UserResourceMapping) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.createUserResourceMapping") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() if err := s.uniqueUserResourceMapping(ctx, tx, m); err != nil { @@ -167,7 +166,7 @@ func (s *Service) createUserResourceMapping(ctx context.Context, tx Tx, m *influ // This method creates the user/resource mappings for resources that belong to an organization. func (s *Service) createOrgDependentMappings(ctx context.Context, tx Tx, m *influxdb.UserResourceMapping) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.createOrgDependentMappings") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() bf := influxdb.BucketFilter{OrganizationID: &m.ResourceID} diff --git a/query/bridges.go b/query/bridges.go index b555e191f2..ef2df5a93b 100644 --- a/query/bridges.go +++ b/query/bridges.go @@ -8,7 +8,6 @@ import ( "github.com/influxdata/flux/csv" platform "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/kit/tracing" - "github.com/opentracing/opentracing-go" ) // QueryServiceBridge implements the QueryService interface while consuming the AsyncQueryService interface. @@ -74,7 +73,7 @@ type ProxyQueryServiceAsyncBridge struct { } func (b ProxyQueryServiceAsyncBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (flux.Statistics, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "ProxyQueryServiceBridge.Query") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() q, err := b.AsyncQueryService.Query(ctx, &req.Request) diff --git a/query/control/controller.go b/query/control/controller.go index 12dd31d296..893b3bf5ae 100644 --- a/query/control/controller.go +++ b/query/control/controller.go @@ -6,10 +6,10 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/control" - "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/query" ) @@ -30,7 +30,7 @@ func New(config control.Config) *Controller { // Query satisfies the AsyncQueryService while ensuring the request is propagated on the context. func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Controller.Query") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Set the request on the context so platform specific Flux operations can retrieve it later. diff --git a/query/influxql/service.go b/query/influxql/service.go index 85e5c54617..653a05f943 100644 --- a/query/influxql/service.go +++ b/query/influxql/service.go @@ -9,7 +9,6 @@ import ( "net/url" "github.com/influxdata/flux" - "github.com/opentracing/opentracing-go" "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/query" @@ -32,7 +31,7 @@ type Service struct { // Query will execute a query for the influxql.Compiler type against an influxdb 1.x endpoint, // and return results using the default decoder. func (s *Service) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.Query") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() resp, err := s.query(ctx, req) @@ -53,7 +52,7 @@ func (s *Service) Query(ctx context.Context, req *query.Request) (flux.ResultIte // QueryRawJSON will execute a query for the influxql.Compiler type against an influxdb 1.x endpoint, // and return the body of the response as a byte array. func (s *Service) QueryRawJSON(ctx context.Context, req *query.Request) ([]byte, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.Query") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() resp, err := s.query(ctx, req) @@ -70,7 +69,7 @@ func (s *Service) QueryRawJSON(ctx context.Context, req *query.Request) ([]byte, } func (s *Service) query(ctx context.Context, req *query.Request) (*http.Response, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Service.query") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Verify that this is an influxql query in the compiler. diff --git a/query/logging.go b/query/logging.go index 049f80f7bf..e3b86ea38a 100644 --- a/query/logging.go +++ b/query/logging.go @@ -8,7 +8,6 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/influxdb/kit/tracing" - "github.com/opentracing/opentracing-go" ) // LoggingServiceBridge implements ProxyQueryService and logs the queries while consuming a QueryService interface. @@ -19,7 +18,7 @@ type LoggingServiceBridge struct { // Query executes and logs the query. func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (stats flux.Statistics, err error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "LoggingServiceBridge.Query") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() var n int64 @@ -59,7 +58,7 @@ func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *Prox } // The results iterator may have had an error independent of encoding errors. if err = results.Err(); err != nil { - return n, tracing.LogError(span, err) + return stats, tracing.LogError(span, err) } return stats, nil } diff --git a/storage/bucket_service.go b/storage/bucket_service.go index 34de4259de..a7a6b75726 100644 --- a/storage/bucket_service.go +++ b/storage/bucket_service.go @@ -4,9 +4,8 @@ import ( "context" "errors" - "github.com/opentracing/opentracing-go" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/kit/tracing" ) // BucketDeleter defines the behaviour of deleting a bucket. @@ -35,7 +34,7 @@ func NewBucketService(s platform.BucketService, engine BucketDeleter) *BucketSer // FindBucketByID returns a single bucket by ID. func (s *BucketService) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBucketByID") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() if s.inner == nil || s.engine == nil { @@ -46,7 +45,7 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id platform.ID) (*pl // FindBucket returns the first bucket that matches filter. func (s *BucketService) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() if s.inner == nil || s.engine == nil { @@ -58,7 +57,7 @@ func (s *BucketService) FindBucket(ctx context.Context, filter platform.BucketFi // FindBuckets returns a list of buckets that match filter and the total count of matching buckets. // Additional options provide pagination & sorting. func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.FindBuckets") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() if s.inner == nil || s.engine == nil { @@ -69,7 +68,7 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketF // CreateBucket creates a new bucket and sets b.ID with the new identifier. func (s *BucketService) CreateBucket(ctx context.Context, b *platform.Bucket) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.CreateBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() if s.inner == nil || s.engine == nil { @@ -81,7 +80,7 @@ func (s *BucketService) CreateBucket(ctx context.Context, b *platform.Bucket) er // UpdateBucket updates a single bucket with changeset. // Returns the new bucket state after update. func (s *BucketService) UpdateBucket(ctx context.Context, id platform.ID, upd platform.BucketUpdate) (*platform.Bucket, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.UpdateBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() if s.inner == nil || s.engine == nil { @@ -92,7 +91,7 @@ func (s *BucketService) UpdateBucket(ctx context.Context, id platform.ID, upd pl // DeleteBucket removes a bucket by ID. func (s *BucketService) DeleteBucket(ctx context.Context, bucketID platform.ID) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "BucketService.DeleteBucket") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() bucket, err := s.FindBucketByID(ctx, bucketID) diff --git a/storage/engine.go b/storage/engine.go index 97179e9e8e..620a9e2474 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -10,6 +10,7 @@ import ( "time" platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/storage/wal" @@ -18,7 +19,6 @@ import ( "github.com/influxdata/influxdb/tsdb/tsm1" "github.com/influxdata/influxdb/tsdb/value" "github.com/influxdata/influxql" - "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -188,7 +188,7 @@ func (e *Engine) Open(ctx context.Context) (err error) { return nil // Already open } - span, ctx := opentracing.StartSpanFromContext(ctx, "Engine.Open") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Open the services in order and clean up if any fail. @@ -360,7 +360,7 @@ func (e *Engine) CreateCursorIterator(ctx context.Context) (tsdb.CursorIterator, // // Appropriate errors are returned in those cases. func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Engine.WritePoints") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() collection, j := tsdb.NewSeriesCollection(points), 0 @@ -445,7 +445,7 @@ func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error { // writePointsLocked does the work of writing points and must be called under some sort of lock. func (e *Engine) writePointsLocked(ctx context.Context, collection *tsdb.SeriesCollection, values map[string][]value.Value) error { - span, _ := opentracing.StartSpanFromContext(ctx, "Engine.writePointsLocked") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() // TODO(jeff): keep track of the values in the collection so that partial write @@ -478,7 +478,7 @@ func (e *Engine) writePointsLocked(ctx context.Context, collection *tsdb.SeriesC // AcquireSegments closes the current WAL segment, gets the set of all the currently closed // segments, and calls the callback. It does all of this under the lock on the engine. func (e *Engine) AcquireSegments(ctx context.Context, fn func(segs []string) error) error { - span, _ := opentracing.StartSpanFromContext(ctx, "Engine.AcquireSegments") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() e.mu.Lock() diff --git a/storage/wal/wal.go b/storage/wal/wal.go index 24a7beae59..f93ba1b3fd 100644 --- a/storage/wal/wal.go +++ b/storage/wal/wal.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/pkg/pool" "github.com/influxdata/influxdb/tsdb/value" @@ -171,7 +172,7 @@ func (l *WAL) Open(ctx context.Context) error { return nil } - span, _ := opentracing.StartSpanFromContext(ctx, "WAL.Open") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() span.LogKV("segment_size", l.SegmentSize, @@ -310,7 +311,7 @@ func (l *WAL) sync() { // which the points were written. If an error is returned the segment ID should // be ignored. If the WAL is disabled, -1 and nil is returned. func (l *WAL) WriteMulti(ctx context.Context, values map[string][]value.Value) (int, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "WAL.WriteMulti") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() if !l.enabled { @@ -374,7 +375,7 @@ func (l *WAL) Remove(ctx context.Context, files []string) error { return nil } - span, _ := opentracing.StartSpanFromContext(ctx, "WAL.Remove") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() l.mu.Lock() diff --git a/task/backend/scheduler.go b/task/backend/scheduler.go index 14f97a1b33..3f365f2219 100644 --- a/task/backend/scheduler.go +++ b/task/backend/scheduler.go @@ -16,6 +16,7 @@ import ( "go.uber.org/zap" platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/kit/tracing" ) var ( @@ -639,7 +640,7 @@ func (r *runner) clearRunning(id platform.ID) { func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *zap.Logger) { defer r.wg.Done() - sp, spCtx := opentracing.StartSpanFromContext(ctx, "task.run.execution") + sp, spCtx := tracing.StartSpanFromContext(ctx) defer sp.Finish() rp, err := r.executor.Execute(spCtx, qr) diff --git a/task/platform_adapter.go b/task/platform_adapter.go index 7c94690c1d..3afa419b87 100644 --- a/task/platform_adapter.go +++ b/task/platform_adapter.go @@ -6,10 +6,9 @@ import ( "fmt" "time" - "github.com/opentracing/opentracing-go" - platform "github.com/influxdata/influxdb" icontext "github.com/influxdata/influxdb/context" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/task/backend" "github.com/influxdata/influxdb/task/options" ) @@ -38,7 +37,7 @@ type pAdapter struct { var _ platform.TaskService = pAdapter{} func (p pAdapter) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.FindTaskByID") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() t, m, err := p.s.FindTaskByIDWithMeta(ctx, id) @@ -54,7 +53,7 @@ func (p pAdapter) FindTaskByID(ctx context.Context, id platform.ID) (*platform.T } func (p pAdapter) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]*platform.Task, int, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.FindTasks") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() params := backend.TaskSearchParams{PageSize: filter.Limit} @@ -126,7 +125,7 @@ func (p pAdapter) FindTasks(ctx context.Context, filter platform.TaskFilter) ([] } func (p pAdapter) CreateTask(ctx context.Context, t platform.TaskCreate) (*platform.Task, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.CreateTask") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() auth, err := icontext.GetAuthorizer(ctx) @@ -205,7 +204,7 @@ func (p pAdapter) CreateTask(ctx context.Context, t platform.TaskCreate) (*platf } func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.CreateTask") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() err := upd.Validate() @@ -239,7 +238,7 @@ func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.T } func (p pAdapter) DeleteTask(ctx context.Context, id platform.ID) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.DeleteTask") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() _, err := p.s.DeleteTask(ctx, id) @@ -267,7 +266,7 @@ func (p pAdapter) DeleteTask(ctx context.Context, id platform.ID) error { } func (p pAdapter) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.FindLogs") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() task, err := p.s.FindTaskByID(ctx, filter.Task) @@ -284,7 +283,7 @@ func (p pAdapter) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*p } func (p pAdapter) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.FindRuns") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() task, err := p.s.FindTaskByID(ctx, filter.Task) @@ -297,7 +296,7 @@ func (p pAdapter) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*p } func (p pAdapter) FindRunByID(ctx context.Context, taskID, id platform.ID) (*platform.Run, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.FindRunByID") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() task, err := p.s.FindTaskByID(ctx, taskID) @@ -308,7 +307,7 @@ func (p pAdapter) FindRunByID(ctx context.Context, taskID, id platform.ID) (*pla } func (p pAdapter) RetryRun(ctx context.Context, taskID, id platform.ID) (*platform.Run, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.RetryRun") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() task, err := p.s.FindTaskByID(ctx, taskID) @@ -344,7 +343,7 @@ func (p pAdapter) RetryRun(ctx context.Context, taskID, id platform.ID) (*platfo } func (p pAdapter) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.ForceRun") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() requestedAt := time.Now() @@ -362,7 +361,7 @@ func (p pAdapter) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor } func (p pAdapter) CancelRun(ctx context.Context, taskID, runID platform.ID) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "pAdapter.CancelRun") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() return p.rc.CancelRun(ctx, taskID, runID) diff --git a/task/validator.go b/task/validator.go index 7359294f3d..3513d4b9d5 100644 --- a/task/validator.go +++ b/task/validator.go @@ -7,10 +7,10 @@ import ( "time" "github.com/influxdata/flux" - "github.com/opentracing/opentracing-go" platform "github.com/influxdata/influxdb" platcontext "github.com/influxdata/influxdb/context" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/query" ) @@ -39,7 +39,7 @@ func NewValidator(ts platform.TaskService, bs platform.BucketService) platform.T } func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.FindTaskByID") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Unauthenticated task lookup, to identify the task's organization. @@ -61,7 +61,7 @@ func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID } func (ts *taskServiceValidator) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]*platform.Task, int, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.FindTasks") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // First, get the tasks in the organization, without authentication. @@ -90,7 +90,7 @@ func (ts *taskServiceValidator) FindTasks(ctx context.Context, filter platform.T } func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskCreate) (*platform.Task, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.CreateTask") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() p, err := platform.NewPermission(platform.WriteAction, platform.TasksResourceType, t.OrganizationID) @@ -110,7 +110,7 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskC } func (ts *taskServiceValidator) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.UpdateTask") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Unauthenticated task lookup, to identify the task's organization. @@ -136,7 +136,7 @@ func (ts *taskServiceValidator) UpdateTask(ctx context.Context, id platform.ID, } func (ts *taskServiceValidator) DeleteTask(ctx context.Context, id platform.ID) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.DeleteTask") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Unauthenticated task lookup, to identify the task's organization. @@ -158,7 +158,7 @@ func (ts *taskServiceValidator) DeleteTask(ctx context.Context, id platform.ID) } func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.FindLogs") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Look up the task first, through the validator, to ensure we have permission to view the task. @@ -171,7 +171,7 @@ func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter platform.Lo } func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.FindRuns") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Look up the task first, through the validator, to ensure we have permission to view the task. @@ -194,7 +194,7 @@ func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter platform.Ru } func (ts *taskServiceValidator) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.FindRunByID") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Unauthenticated task lookup, to identify the task's organization. @@ -216,7 +216,7 @@ func (ts *taskServiceValidator) FindRunByID(ctx context.Context, taskID, runID p } func (ts *taskServiceValidator) CancelRun(ctx context.Context, taskID, runID platform.ID) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.CancelRun") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Unauthenticated task lookup, to identify the task's organization. @@ -238,7 +238,7 @@ func (ts *taskServiceValidator) CancelRun(ctx context.Context, taskID, runID pla } func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.RetryRun") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Unauthenticated task lookup, to identify the task's organization. @@ -260,7 +260,7 @@ func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID plat } func (ts *taskServiceValidator) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "taskServiceValidator.ForceRun") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Unauthenticated task lookup, to identify the task's organization. diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 3ed8334ae2..e5c1650d3c 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -6,13 +6,13 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/opentracing/opentracing-go" "os" "path/filepath" "sort" "sync" "github.com/cespare/xxhash" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/binaryutil" @@ -92,7 +92,7 @@ func (f *SeriesFile) Open(ctx context.Context) error { return errors.New("series file already opened") } - span, _ := opentracing.StartSpanFromContext(ctx, "SeriesFile.Open") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() _, logEnd := logger.NewOperation(f.Logger, "Opening Series File", "series_file_open", zap.String("path", f.path)) diff --git a/tsdb/tsi1/index.go b/tsdb/tsi1/index.go index b647e960ce..2105f0265e 100644 --- a/tsdb/tsi1/index.go +++ b/tsdb/tsi1/index.go @@ -5,7 +5,6 @@ import ( "context" "errors" "fmt" - "github.com/opentracing/opentracing-go" "io/ioutil" "os" "path/filepath" @@ -18,6 +17,7 @@ import ( "unsafe" "github.com/cespare/xxhash" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/lifecycle" "github.com/influxdata/influxdb/pkg/slices" @@ -217,7 +217,7 @@ func (i *Index) Open(ctx context.Context) error { return errors.New("index already open") } - span, _ := opentracing.StartSpanFromContext(ctx, "Index.Open") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() // Ensure root exists. diff --git a/tsdb/tsm1/compact.go b/tsdb/tsm1/compact.go index 5ded049651..7ffeda0e35 100644 --- a/tsdb/tsm1/compact.go +++ b/tsdb/tsm1/compact.go @@ -16,7 +16,6 @@ import ( "bytes" "context" "fmt" - "github.com/opentracing/opentracing-go" "io" "math" "os" @@ -27,6 +26,7 @@ import ( "sync/atomic" "time" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/tsdb" ) @@ -811,7 +811,7 @@ func (c *Compactor) EnableCompactions() { // WriteSnapshot writes a Cache snapshot to one or more new TSM files. func (c *Compactor) WriteSnapshot(ctx context.Context, cache *Cache) ([]string, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "Compactor.WriteSnapshot") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() c.mu.RLock() diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index 4f008edf44..c45732cbca 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -16,6 +16,7 @@ import ( "sync/atomic" "time" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/lifecycle" @@ -495,7 +496,7 @@ func (e *Engine) initTrackers() { // Open opens and initializes the engine. func (e *Engine) Open(ctx context.Context) (err error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Engine.Open") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() defer func() { @@ -775,7 +776,7 @@ func (t *compactionTracker) SetFullQueue(length uint64) { t.SetQueue(5, length) // WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done. func (e *Engine) WriteSnapshot(ctx context.Context) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "Engine.WriteSnapshot") + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() // Lock and grab the cache snapshot along with all the closed WAL diff --git a/tsdb/tsm1/file_store.go b/tsdb/tsm1/file_store.go index 9f41b07f96..672a2b8c61 100644 --- a/tsdb/tsm1/file_store.go +++ b/tsdb/tsm1/file_store.go @@ -5,7 +5,6 @@ import ( "context" "errors" "fmt" - "github.com/opentracing/opentracing-go" "io/ioutil" "math" "os" @@ -18,6 +17,7 @@ import ( "sync/atomic" "time" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/pkg/file" "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/pkg/metrics" @@ -517,7 +517,7 @@ func (f *FileStore) Open(ctx context.Context) error { return errors.New("cannot open FileStore without an OpenLimiter (is EngineOptions.OpenLimiter set?)") } - span, _ := opentracing.StartSpanFromContext(ctx, "FileStore.Open") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() // find the current max ID for temp directories @@ -1104,7 +1104,7 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location { // CreateSnapshot creates hardlinks for all tsm and tombstone files // in the path provided. func (f *FileStore) CreateSnapshot(ctx context.Context) (string, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "FileStore.CreateSnapshot") + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() span.LogKV("dir", f.dir)