package kv import ( "context" "encoding/json" "fmt" influxdb "github.com/influxdata/influxdb" ) var ( sourceBucket = []byte("sourcesv1") ) // DefaultSource is the default source. var DefaultSource = influxdb.Source{ Default: true, Name: "autogen", Type: influxdb.SelfSourceType, } const ( // DefaultSourceID it the default source identifier DefaultSourceID = "020f755c3c082000" // DefaultSourceOrganizationID is the default source's organization identifier DefaultSourceOrganizationID = "50616e67652c206c" ) func init() { if err := DefaultSource.ID.DecodeFromString(DefaultSourceID); err != nil { panic(fmt.Sprintf("failed to decode default source id: %v", err)) } if err := DefaultSource.OrganizationID.DecodeFromString(DefaultSourceOrganizationID); err != nil { panic(fmt.Sprintf("failed to decode default source organization id: %v", err)) } } func (s *Service) initializeSources(ctx context.Context, tx Tx) error { if _, err := tx.Bucket(sourceBucket); err != nil { return err } _, pe := s.findSourceByID(ctx, tx, DefaultSource.ID) if pe != nil && influxdb.ErrorCode(pe) != influxdb.ENotFound { return pe } if influxdb.ErrorCode(pe) == influxdb.ENotFound { if err := s.putSource(ctx, tx, &DefaultSource); err != nil { return err } } return nil } // DefaultSource retrieves the default source. func (s *Service) DefaultSource(ctx context.Context) (*influxdb.Source, error) { var sr *influxdb.Source err := s.kv.View(ctx, func(tx Tx) error { // TODO(desa): make this faster by putting the default source in an index. srcs, err := s.findSources(ctx, tx, influxdb.FindOptions{}) if err != nil { return err } for _, src := range srcs { if src.Default { sr = src return nil } } return &influxdb.Error{ Code: influxdb.ENotFound, Msg: "no default source found", } }) if err != nil { return nil, &influxdb.Error{ Err: err, } } return sr, nil } // FindSourceByID retrieves a source by id. func (s *Service) FindSourceByID(ctx context.Context, id influxdb.ID) (*influxdb.Source, error) { var sr *influxdb.Source err := s.kv.View(ctx, func(tx Tx) error { src, pe := s.findSourceByID(ctx, tx, id) if pe != nil { return &influxdb.Error{ Err: pe, } } sr = src return nil }) return sr, err } func (s *Service) findSourceByID(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Source, error) { encodedID, err := id.Encode() if err != nil { return nil, &influxdb.Error{ Err: err, } } b, err := tx.Bucket(sourceBucket) if err != nil { return nil, err } v, err := b.Get(encodedID) if IsNotFound(err) { return nil, &influxdb.Error{ Code: influxdb.ENotFound, Msg: influxdb.ErrSourceNotFound, } } if err != nil { return nil, err } if err != nil { return nil, err } var sr influxdb.Source if err := json.Unmarshal(v, &sr); err != nil { return nil, &influxdb.Error{ Err: err, } } return &sr, nil } // FindSources retrives all sources that match an arbitrary source filter. // Filters using ID, or OrganizationID and source Name should be efficient. // Other filters will do a linear scan across all sources searching for a match. func (s *Service) FindSources(ctx context.Context, opt influxdb.FindOptions) ([]*influxdb.Source, int, error) { ss := []*influxdb.Source{} err := s.kv.View(ctx, func(tx Tx) error { srcs, err := s.findSources(ctx, tx, opt) if err != nil { return err } ss = srcs return nil }) if err != nil { return nil, 0, &influxdb.Error{ Op: influxdb.OpFindSources, Err: err, } } return ss, len(ss), nil } func (s *Service) findSources(ctx context.Context, tx Tx, opt influxdb.FindOptions) ([]*influxdb.Source, error) { ss := []*influxdb.Source{} err := s.forEachSource(ctx, tx, func(s *influxdb.Source) bool { ss = append(ss, s) return true }) if err != nil { return nil, err } return ss, nil } // CreateSource creates a influxdb source and sets s.ID. func (s *Service) CreateSource(ctx context.Context, src *influxdb.Source) error { err := s.kv.Update(ctx, func(tx Tx) error { src.ID = s.IDGenerator.ID() // Generating an organization id if it missing or invalid if !src.OrganizationID.Valid() { src.OrganizationID = s.IDGenerator.ID() } return s.putSource(ctx, tx, src) }) if err != nil { return &influxdb.Error{ Err: err, } } return nil } // PutSource will put a source without setting an ID. func (s *Service) PutSource(ctx context.Context, src *influxdb.Source) error { return s.kv.Update(ctx, func(tx Tx) error { return s.putSource(ctx, tx, src) }) } func (s *Service) putSource(ctx context.Context, tx Tx, src *influxdb.Source) error { v, err := json.Marshal(src) if err != nil { return err } encodedID, err := src.ID.Encode() if err != nil { return err } b, err := tx.Bucket(sourceBucket) if err != nil { return err } if err := b.Put(encodedID, v); err != nil { return err } return nil } // forEachSource will iterate through all sources while fn returns true. func (s *Service) forEachSource(ctx context.Context, tx Tx, fn func(*influxdb.Source) bool) error { b, err := tx.Bucket(sourceBucket) if err != nil { return err } cur, err := b.Cursor() if err != nil { return err } for k, v := cur.First(); k != nil; k, v = cur.Next() { s := &influxdb.Source{} if err := json.Unmarshal(v, s); err != nil { return err } if !fn(s) { break } } return nil } // UpdateSource updates a source according the parameters set on upd. func (s *Service) UpdateSource(ctx context.Context, id influxdb.ID, upd influxdb.SourceUpdate) (*influxdb.Source, error) { var sr *influxdb.Source err := s.kv.Update(ctx, func(tx Tx) error { src, err := s.updateSource(ctx, tx, id, upd) if err != nil { return &influxdb.Error{ Err: err, } } sr = src return nil }) return sr, err } func (s *Service) updateSource(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.SourceUpdate) (*influxdb.Source, error) { src, pe := s.findSourceByID(ctx, tx, id) if pe != nil { return nil, pe } if err := upd.Apply(src); err != nil { return nil, err } if err := s.putSource(ctx, tx, src); err != nil { return nil, err } return src, nil } // DeleteSource deletes a source and prunes it from the index. func (s *Service) DeleteSource(ctx context.Context, id influxdb.ID) error { return s.kv.Update(ctx, func(tx Tx) error { pe := s.deleteSource(ctx, tx, id) if pe != nil { return &influxdb.Error{ Err: pe, } } return nil }) } func (s *Service) deleteSource(ctx context.Context, tx Tx, id influxdb.ID) error { if id == DefaultSource.ID { return &influxdb.Error{ Code: influxdb.EForbidden, Msg: "cannot delete autogen source", } } _, pe := s.findSourceByID(ctx, tx, id) if pe != nil { return pe } encodedID, err := id.Encode() if err != nil { return &influxdb.Error{ Err: err, } } b, err := tx.Bucket(sourceBucket) if err != nil { return err } if err = b.Delete(encodedID); err != nil { return &influxdb.Error{ Err: err, } } return nil }