package dbrp

// The DBRP Mapping `Service` maps database, retention policy pairs to buckets.
// Every `DBRPMapping` stored is scoped to an organization ID.
// The service must ensure the following invariants are valid at any time:
//  - each orgID, database, retention policy triple must be unique;
//  - for each orgID and database there must exist one and only one default mapping (`mapping.Default` set to `true`).
// The service does so using three kv buckets:
//  - one for storing mappings;
//  - one for storing an index of mappings by orgID and database;
//  - one for storing the current default mapping for an orgID and a database.
//
// On *create*, the service creates the mapping.
// If another mapping with the same orgID, database, and retention policy exists, it fails.
// If the mapping is the first one for the specified orgID-database couple, it will be the default one.
//
// On *find*, the service finds mappings.
// Every mapping returned uses the kv bucket where the default is specified to update the `mapping.Default` field.
//
// On *update*, the service updates the mapping.
// If the update causes another mapping to have the same orgID, database, and retention policy, it fails.
// If the update unsets `mapping.Default`, the first mapping found is set as default.
//
// On *delete*, the service deletes the mapping.
// If the deletion deletes the default mapping, the first mapping found is set as default.
import ( "bytes" "context" "encoding/json" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kv" "github.com/influxdata/influxdb/v2/snowflake" ) var ( bucket = []byte("dbrpv1") indexBucket = []byte("dbrpbyorganddbindexv1") defaultBucket = []byte("dbrpdefaultv1") ) var _ influxdb.DBRPMappingServiceV2 = (*AuthorizedService)(nil) type Service struct { store kv.Store IDGen influxdb.IDGenerator bucketSvc influxdb.BucketService byOrgAndDatabase *kv.Index } func indexForeignKey(dbrp influxdb.DBRPMappingV2) []byte { return composeForeignKey(dbrp.OrganizationID, dbrp.Database) } func composeForeignKey(orgID influxdb.ID, db string) []byte { encID, _ := orgID.Encode() key := make([]byte, len(encID)+len(db)) copy(key, encID) copy(key[len(encID):], db) return key } func NewService(ctx context.Context, bucketSvc influxdb.BucketService, st kv.Store) influxdb.DBRPMappingServiceV2 { return &Service{ store: st, IDGen: snowflake.NewDefaultIDGenerator(), bucketSvc: bucketSvc, byOrgAndDatabase: kv.NewIndex(kv.NewIndexMapping(bucket, indexBucket, func(v []byte) ([]byte, error) { var dbrp influxdb.DBRPMappingV2 if err := json.Unmarshal(v, &dbrp); err != nil { return nil, err } return indexForeignKey(dbrp), nil }), kv.WithIndexReadPathEnabled), } } // getDefault gets the default mapping ID inside of a transaction. func (s *Service) getDefault(tx kv.Tx, compKey []byte) ([]byte, error) { b, err := tx.Bucket(defaultBucket) if err != nil { return nil, err } defID, err := b.Get(compKey) if err != nil { return nil, err } return defID, nil } // getDefaultID returns the default mapping ID for the given orgID and db. func (s *Service) getDefaultID(tx kv.Tx, compKey []byte) (influxdb.ID, error) { defID, err := s.getDefault(tx, compKey) if err != nil { return 0, err } id := new(influxdb.ID) if err := id.Decode(defID); err != nil { return 0, err } return *id, nil } // isDefault tells whether a mapping is the default one. 
func (s *Service) isDefault(tx kv.Tx, compKey []byte, id []byte) (bool, error) { defID, err := s.getDefault(tx, compKey) if kv.IsNotFound(err) { return false, nil } if err != nil { return false, err } return bytes.Equal(id, defID), nil } // isDefaultSet tells if there is a default mapping for the given composite key. func (s *Service) isDefaultSet(tx kv.Tx, compKey []byte) (bool, error) { b, err := tx.Bucket(defaultBucket) if err != nil { return false, ErrInternalService(err) } _, err = b.Get(compKey) if kv.IsNotFound(err) { return false, nil } if err != nil { return false, ErrInternalService(err) } return true, nil } // setAsDefault sets the given id as default for the given composite key. func (s *Service) setAsDefault(tx kv.Tx, compKey []byte, id []byte) error { b, err := tx.Bucket(defaultBucket) if err != nil { return ErrInternalService(err) } if err := b.Put(compKey, id); err != nil { return ErrInternalService(err) } return nil } // unsetDefault un-sets the default for the given composite key. // Useful when a db/rp pair does not exist anymore. func (s *Service) unsetDefault(tx kv.Tx, compKey []byte) error { b, err := tx.Bucket(defaultBucket) if err != nil { return ErrInternalService(err) } if err = b.Delete(compKey); err != nil { return ErrInternalService(err) } return nil } // getFirstBut returns the first element in the db/rp index (not accounting for the `skipID`). // If the length of the returned ID is 0, it means no element was found. // The skip value is useful, for instance, if one wants to delete an element based on the result of this operation. func (s *Service) getFirstBut(tx kv.Tx, compKey []byte, skipID []byte) (next []byte, err error) { err = s.byOrgAndDatabase.Walk(context.Background(), tx, compKey, func(k, v []byte) (bool, error) { if bytes.Equal(skipID, k) { return true, nil } next = k return false, nil }) return } // isDBRPUnique verifies if the triple orgID-database-retention-policy is unique. 
func (s *Service) isDBRPUnique(ctx context.Context, m influxdb.DBRPMappingV2) error { return s.store.View(ctx, func(tx kv.Tx) error { return s.byOrgAndDatabase.Walk(ctx, tx, composeForeignKey(m.OrganizationID, m.Database), func(k, v []byte) (bool, error) { dbrp := &influxdb.DBRPMappingV2{} if err := json.Unmarshal(v, dbrp); err != nil { return false, ErrInternalService(err) } if dbrp.ID == m.ID { // Corner case. // This is the very same DBRP, just skip it! return true, nil } if dbrp.RetentionPolicy == m.RetentionPolicy { return false, ErrDBRPAlreadyExists("another DBRP mapping with same orgID, db, and rp exists") } return true, nil }) }) } // FindBy returns the mapping for the given ID. func (s *Service) FindByID(ctx context.Context, orgID, id influxdb.ID) (*influxdb.DBRPMappingV2, error) { encodedID, err := id.Encode() if err != nil { return nil, ErrInvalidDBRPID } m := &influxdb.DBRPMappingV2{} if err := s.store.View(ctx, func(tx kv.Tx) error { bucket, err := tx.Bucket(bucket) if err != nil { return ErrInternalService(err) } b, err := bucket.Get(encodedID) if err != nil { return ErrDBRPNotFound } if err := json.Unmarshal(b, m); err != nil { return ErrInternalService(err) } // If the given orgID is wrong, it is as if we did not found a mapping scoped to this org. if m.OrganizationID != orgID { return ErrDBRPNotFound } // Update the default value for this mapping. m.Default, err = s.isDefault(tx, indexForeignKey(*m), encodedID) if err != nil { return ErrInternalService(err) } return nil }); err != nil { return nil, err } return m, nil } // FindMany returns a list of mappings that match filter and the total count of matching dbrp mappings. // TODO(affo): find a smart way to apply FindOptions to a list of items. func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilterV2, opts ...influxdb.FindOptions) ([]*influxdb.DBRPMappingV2, int, error) { // Memoize default IDs. 
defs := make(map[string]*influxdb.ID) get := func(tx kv.Tx, orgID influxdb.ID, db string) (*influxdb.ID, error) { k := orgID.String() + db if _, ok := defs[k]; !ok { id, err := s.getDefaultID(tx, composeForeignKey(orgID, db)) if kv.IsNotFound(err) { // Still need to store a not-found result. defs[k] = nil } else if err != nil { return nil, err } else { defs[k] = &id } } return defs[k], nil } ms := []*influxdb.DBRPMappingV2{} add := func(tx kv.Tx) func(k, v []byte) (bool, error) { return func(k, v []byte) (bool, error) { m := influxdb.DBRPMappingV2{} if err := json.Unmarshal(v, &m); err != nil { return false, ErrInternalService(err) } // Updating the Default field must be done before filtering. defID, err := get(tx, m.OrganizationID, m.Database) if err != nil { return false, ErrInternalService(err) } m.Default = m.ID == *defID if filterFunc(&m, filter) { ms = append(ms, &m) } return true, nil } } err := s.store.View(ctx, func(tx kv.Tx) error { // Optimized path, use index. if orgID := filter.OrgID; orgID != nil { // The index performs a prefix search. // The foreign key is `orgID + db`. // If you want to look by orgID only, just pass orgID as prefix. db := "" if filter.Database != nil { db = *filter.Database } compKey := composeForeignKey(*orgID, db) if len(db) > 0 { // Even more optimized, looking for the default given an orgID and database. // No walking index needed. 
if def := filter.Default; def != nil && *def { defID, err := s.getDefault(tx, compKey) if kv.IsNotFound(err) { return nil } if err != nil { return ErrInternalService(err) } bucket, err := tx.Bucket(bucket) if err != nil { return ErrInternalService(err) } v, err := bucket.Get(defID) if err != nil { return ErrInternalService(err) } _, err = add(tx)(defID, v) return err } } return s.byOrgAndDatabase.Walk(ctx, tx, compKey, add(tx)) } bucket, err := tx.Bucket(bucket) if err != nil { return ErrInternalService(err) } cur, err := bucket.Cursor() if err != nil { return ErrInternalService(err) } for k, v := cur.First(); k != nil; k, v = cur.Next() { if _, err := add(tx)(k, v); err != nil { return err } } return nil }) return ms, len(ms), err } // Create creates a new mapping. // If another mapping with same organization ID, database, and retention policy exists, an error is returned. // If the mapping already contains a valid ID, that one is used for storing the mapping. func (s *Service) Create(ctx context.Context, dbrp *influxdb.DBRPMappingV2) error { if !dbrp.ID.Valid() { dbrp.ID = s.IDGen.ID() } if err := dbrp.Validate(); err != nil { return ErrInvalidDBRP(err) } if _, err := s.bucketSvc.FindBucketByID(ctx, dbrp.BucketID); err != nil { return err } // If a dbrp with this particular ID already exists an error is returned. if _, err := s.FindByID(ctx, dbrp.OrganizationID, dbrp.ID); err == nil { return ErrDBRPAlreadyExists("dbrp already exist for this particular ID. If you are trying an update use the right function .Update") } // If a dbrp with this orgID, db, and rp exists an error is returned. 
if err := s.isDBRPUnique(ctx, *dbrp); err != nil { return err } encodedID, err := dbrp.ID.Encode() if err != nil { return ErrInvalidDBRPID } return s.store.Update(ctx, func(tx kv.Tx) error { bucket, err := tx.Bucket(bucket) if err != nil { return ErrInternalService(err) } compKey := indexForeignKey(*dbrp) if err := s.byOrgAndDatabase.Insert(tx, compKey, encodedID); err != nil { return err } defSet, err := s.isDefaultSet(tx, compKey) if err != nil { return err } if !defSet { dbrp.Default = true } b, err := json.Marshal(dbrp) if err != nil { return ErrInternalService(err) } if err := bucket.Put(encodedID, b); err != nil { return ErrInternalService(err) } if dbrp.Default { if err := s.setAsDefault(tx, compKey, encodedID); err != nil { return err } } return nil }) } // Updates a mapping. // If another mapping with same organization ID, database, and retention policy exists, an error is returned. // Un-setting `Default` for a mapping will cause the first one to become the default. func (s *Service) Update(ctx context.Context, dbrp *influxdb.DBRPMappingV2) error { if err := dbrp.Validate(); err != nil { return ErrInvalidDBRP(err) } oldDBRP, err := s.FindByID(ctx, dbrp.OrganizationID, dbrp.ID) if err != nil { return ErrDBRPNotFound } // Overwrite fields that cannot change. dbrp.ID = oldDBRP.ID dbrp.OrganizationID = oldDBRP.OrganizationID dbrp.BucketID = oldDBRP.BucketID dbrp.Database = oldDBRP.Database // If a dbrp with this orgID, db, and rp exists an error is returned. 
if err := s.isDBRPUnique(ctx, *dbrp); err != nil { return err } encodedID, err := dbrp.ID.Encode() if err != nil { return ErrInternalService(err) } b, err := json.Marshal(dbrp) if err != nil { return ErrInternalService(err) } return s.store.Update(ctx, func(tx kv.Tx) error { bucket, err := tx.Bucket(bucket) if err != nil { return ErrInternalService(err) } if err := bucket.Put(encodedID, b); err != nil { return err } compKey := indexForeignKey(*dbrp) if dbrp.Default { err = s.setAsDefault(tx, compKey, encodedID) } else if oldDBRP.Default { // This means default was unset. // Need to find a new default. first, ferr := s.getFirstBut(tx, compKey, encodedID) if ferr != nil { return ferr } if len(first) > 0 { err = s.setAsDefault(tx, compKey, first) } // If no first was found, then this will remain the default. } return err }) } // Delete removes a mapping. // Deleting a mapping that does not exists is not an error. // Deleting the default mapping will cause the first one (if any) to become the default. func (s *Service) Delete(ctx context.Context, orgID, id influxdb.ID) error { dbrp, err := s.FindByID(ctx, orgID, id) if err != nil { return nil } encodedID, err := id.Encode() if err != nil { return ErrInternalService(err) } return s.store.Update(ctx, func(tx kv.Tx) error { bucket, err := tx.Bucket(bucket) if err != nil { return ErrInternalService(err) } compKey := indexForeignKey(*dbrp) if err := bucket.Delete(encodedID); err != nil { return err } if err := s.byOrgAndDatabase.Delete(tx, compKey, encodedID); err != nil { return ErrInternalService(err) } // If this was the default, we need to set a new default. var derr error if dbrp.Default { first, err := s.getFirstBut(tx, compKey, encodedID) if err != nil { return err } if len(first) > 0 { derr = s.setAsDefault(tx, compKey, first) } else { // This means no other mapping is in the index. 
// Unset the default derr = s.unsetDefault(tx, compKey) } } return derr }) } // filterFunc is capable to validate if the dbrp is valid from a given filter. // it runs true if the filtering data are contained in the dbrp. func filterFunc(dbrp *influxdb.DBRPMappingV2, filter influxdb.DBRPMappingFilterV2) bool { return (filter.ID == nil || (*filter.ID) == dbrp.ID) && (filter.OrgID == nil || (*filter.OrgID) == dbrp.OrganizationID) && (filter.BucketID == nil || (*filter.BucketID) == dbrp.BucketID) && (filter.Database == nil || (*filter.Database) == dbrp.Database) && (filter.RetentionPolicy == nil || (*filter.RetentionPolicy) == dbrp.RetentionPolicy) && (filter.Default == nil || (*filter.Default) == dbrp.Default) }