package kv

import (
	"bytes"
	"context"
	"errors"
	"fmt"
)

const (
	// defaultPopulateBatchSize is the batch size an Index uses unless
	// WithIndexPopulateBatchSize overrides it.
	defaultPopulateBatchSize = 100
)

// Index is used to define and manage an index for a source bucket.
//
// When using the index you must provide it with an IndexMapping.
// The IndexMapping provides the index with the contract it needs to populate
// the entire index and traverse a populated index correctly.
// The IndexMapping provides a way to retrieve the key on which to index with
// when provided with the value from the source.
// It also provides the way to access the source bucket.
//
// The following is an illustration of its use:
//
//	byUserID := func(v []byte) ([]byte, error) {
//		auth := &influxdb.Authorization{}
//
//		if err := json.Unmarshal(v, auth); err != nil {
//			return nil, err
//		}
//
//		return auth.UserID.Encode()
//	}
//
//	// configure a write only index (Walk returns nothing until the
//	// WithIndexReadPathEnabled option is supplied)
//	indexByUser := NewIndex(NewIndexMapping(
//		[]byte(`authorizationsv1`),       // source bucket (illustrative name)
//		[]byte(`authorizationsbyuserv1`), // index bucket
//		byUserID,
//	))
//
//	indexByUser.Insert(tx, someUserID, someAuthID)
//
//	indexByUser.Delete(tx, someUserID, someAuthID)
//
//	indexByUser.Walk(tx, someUserID, func(k, v []byte) error {
//		auth := &influxdb.Authorization{}
//		if err := json.Unmarshal(v, auth); err != nil {
//			return err
//		}
//
//		// do something with auth
//
//		return nil
//	})
//
//	// populate entire index from source
//	indexedCount, err := indexByUser.Populate(ctx, store)
//
//	// verify the current index against the source and return the differences
//	// found in each
//	diff, err := indexByUser.Verify(ctx, store)
type Index struct {
	IndexMapping

	// populateBatchSize configures the size of the batch used for insertion
	populateBatchSize int

	// canRead configures whether or not Walk accesses the index at all
	// or skips the index altogether and returns nothing.
	// This is used when you want to integrate only the write path before
	// releasing the read path.
	canRead bool
}

// IndexOption is a function which configures an index.
type IndexOption func(*Index)

// WithIndexReadPathEnabled enables the read paths of the index (Walk).
// This should be enabled once the index has been fully populated and
// the Insert and Delete paths are correctly integrated.
//
// It has the shape of an IndexOption, so it is passed to NewIndex directly
// (without being called).
func WithIndexReadPathEnabled(i *Index) {
	i.canRead = true
}

// WithIndexPopulateBatchSize configures the size of each batch
// used when fully populating an index. (number of puts per tx)
func WithIndexPopulateBatchSize(n int) IndexOption {
	return func(i *Index) {
		i.populateBatchSize = n
	}
}

// IndexMapping is a type which configures an Index to map items
// from a source bucket to an index bucket via a mapping known as
// IndexSourceOn. This function is called on the values in the source
// to derive the foreign key on which to index each item.
type IndexMapping interface {
	// SourceBucket returns the name of the bucket being indexed.
	SourceBucket() []byte
	// IndexBucket returns the name of the bucket holding the index entries.
	IndexBucket() []byte
	// IndexSourceOn derives the foreign key from a value of the source bucket.
	IndexSourceOn(value []byte) (foreignKey []byte, err error)
}

// IndexSourceOnFunc is a function which can be used to derive the foreign key
// of a value in a source bucket.
type IndexSourceOnFunc func([]byte) ([]byte, error)

// indexMapping is the IndexMapping implementation returned by NewIndexMapping.
type indexMapping struct {
	source []byte
	index  []byte
	fn     IndexSourceOnFunc
}

// SourceBucket returns the configured source bucket name.
func (i indexMapping) SourceBucket() []byte { return i.source }

// IndexBucket returns the configured index bucket name.
func (i indexMapping) IndexBucket() []byte { return i.index }

// IndexSourceOn delegates to the configured IndexSourceOnFunc.
func (i indexMapping) IndexSourceOn(v []byte) ([]byte, error) {
	return i.fn(v)
}

// NewIndexMapping creates an implementation of IndexMapping for the provided source bucket
// to a destination index bucket.
func NewIndexMapping(sourceBucket, indexBucket []byte, fn IndexSourceOnFunc) IndexMapping {
	return indexMapping{
		source: sourceBucket,
		index:  indexBucket,
		fn:     fn,
	}
}

// NewIndex configures and returns a new *Index for a given index mapping.
// By default the read path (Walk) is disabled. This is because the index needs to
// be fully populated before depending upon the read path.
// The read path can be enabled using WithIndexReadPathEnabled option. func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index { index := &Index{ IndexMapping: mapping, populateBatchSize: defaultPopulateBatchSize, } for _, opt := range opts { opt(index) } return index } // Initialize creates the bucket if it does not already exist. func (i *Index) Initialize(ctx context.Context, store Store) error { return store.Update(ctx, func(tx Tx) error { // create bucket if not exist _, err := tx.Bucket(i.IndexBucket()) return err }) } func (i *Index) indexBucket(tx Tx) (Bucket, error) { return tx.Bucket(i.IndexBucket()) } func (i *Index) sourceBucket(tx Tx) (Bucket, error) { return tx.Bucket(i.SourceBucket()) } func indexKey(foreignKey, primaryKey []byte) (newKey []byte) { newKey = make([]byte, len(primaryKey)+len(foreignKey)+1) copy(newKey, foreignKey) newKey[len(foreignKey)] = '/' copy(newKey[len(foreignKey)+1:], primaryKey) return } func indexKeyParts(indexKey []byte) (fk, pk []byte, err error) { // this function is called with items missing in index parts := bytes.SplitN(indexKey, []byte("/"), 2) if len(parts) < 2 { return nil, nil, errors.New("malformed index key") } // parts are fk/pk fk, pk = parts[0], parts[1] return } // Insert creates a single index entry for the provided primary key on the foreign key. func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error { bkt, err := i.indexBucket(tx) if err != nil { return err } return bkt.Put(indexKey(foreignKey, primaryKey), primaryKey) } // Delete removes the foreignKey and primaryKey mapping from the underlying index. func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error { bkt, err := i.indexBucket(tx) if err != nil { return err } return bkt.Delete(indexKey(foreignKey, primaryKey)) } // Walk walks the source bucket using keys found in the index using the provided foreign key // given the index has been fully populated. 
func (i *Index) Walk(tx Tx, foreignKey []byte, visitFn VisitFunc) error { // skip walking if configured to do so as the index // is currently being used purely to write the index if !i.canRead { return nil } sourceBucket, err := i.sourceBucket(tx) if err != nil { return err } indexBucket, err := i.indexBucket(tx) if err != nil { return err } cursor, err := indexBucket.ForwardCursor(foreignKey, WithCursorPrefix(foreignKey)) if err != nil { return err } return indexWalk(cursor, sourceBucket, visitFn) } // PopulateConfig configures a call to Populate type PopulateConfig struct { RemoveDanglingForeignKeys bool } // PopulateOption is a functional option for the Populate call type PopulateOption func(*PopulateConfig) // WithPopulateRemoveDanglingForeignKeys removes index entries which point to // missing items in the source bucket. func WithPopulateRemoveDanglingForeignKeys(c *PopulateConfig) { c.RemoveDanglingForeignKeys = true } // Populate does a full population of the index using the IndexSourceOn IndexMapping function. // Once completed it marks the index as ready for use. // It return a nil error on success and the count of inserted items. 
func (i *Index) Populate(ctx context.Context, store Store, opts ...PopulateOption) (n int, err error) { var config PopulateConfig for _, opt := range opts { opt(&config) } // verify the index to derive missing index // we can skip missing source lookup as we're // only interested in populating the missing index diff, err := i.verify(ctx, store, config.RemoveDanglingForeignKeys) if err != nil { return 0, fmt.Errorf("looking up missing indexes: %w", err) } flush := func(batch kvSlice) error { if len(batch) == 0 { return nil } if err := store.Update(ctx, func(tx Tx) error { indexBucket, err := i.indexBucket(tx) if err != nil { return err } for _, pair := range batch { // insert missing item into index if err := indexBucket.Put(pair[0], pair[1]); err != nil { return err } n++ } return nil }); err != nil { return fmt.Errorf("updating index: %w", err) } return nil } var batch kvSlice for fk, fkm := range diff.MissingFromIndex { for pk := range fkm { batch = append(batch, [2][]byte{indexKey([]byte(fk), []byte(pk)), []byte(pk)}) if len(batch) >= i.populateBatchSize { if err := flush(batch); err != nil { return n, err } batch = batch[:0] } } } if err := flush(batch); err != nil { return n, err } if config.RemoveDanglingForeignKeys { return n, i.remove(ctx, store, diff.MissingFromSource) } return n, nil } // DeleteAll removes the entire index in batches func (i *Index) DeleteAll(ctx context.Context, store Store) error { diff, err := i.verify(ctx, store, true) if err != nil { return err } for k, v := range diff.MissingFromSource { if fkm, ok := diff.PresentInIndex[k]; ok { for pk := range v { fkm[pk] = struct{}{} } continue } diff.PresentInIndex[k] = v } return i.remove(ctx, store, diff.PresentInIndex) } func (i *Index) remove(ctx context.Context, store Store, mappings map[string]map[string]struct{}) error { var ( batch [][]byte flush = func(batch [][]byte) error { if len(batch) == 0 { return nil } if err := store.Update(ctx, func(tx Tx) error { indexBucket, err := 
i.indexBucket(tx) if err != nil { return err } for _, indexKey := range batch { // delete dangling foreign key if err := indexBucket.Delete(indexKey); err != nil { return err } } return nil }); err != nil { return fmt.Errorf("removing dangling foreign keys: %w", err) } return nil } ) for fk, fkm := range mappings { for pk := range fkm { batch = append(batch, indexKey([]byte(fk), []byte(pk))) if len(batch) >= i.populateBatchSize { if err := flush(batch); err != nil { return err } batch = batch[:0] } } } return flush(batch) } // IndexDiff contains a set of items present in the source not in index, // along with a set of things in the index which are not in the source. type IndexDiff struct { // PresentInIndex is a map of foreign key to primary keys // present in the index. PresentInIndex map[string]map[string]struct{} // MissingFromIndex is a map of foreign key to associated primary keys // missing from the index given the source bucket. // These items could be due to the fact an index populate migration has // not yet occured, the index populate code is incorrect or the write path // for your resource type does not yet insert into the index as well (Create actions). MissingFromIndex map[string]map[string]struct{} // MissingFromSource is a map of foreign key to associated primary keys // missing from the source but accounted for in the index. // This happens when index items are not properly removed from the index // when an item is removed from the source (Delete actions). 
MissingFromSource map[string]map[string]struct{} } func (i *IndexDiff) addMissingSource(fk, pk []byte) { if i.MissingFromSource == nil { i.MissingFromSource = map[string]map[string]struct{}{} } if _, ok := i.MissingFromSource[string(fk)]; !ok { i.MissingFromSource[string(fk)] = map[string]struct{}{} } i.MissingFromSource[string(fk)][string(pk)] = struct{}{} } func (i *IndexDiff) addMissingIndex(fk, pk []byte) { if i.MissingFromIndex == nil { i.MissingFromIndex = map[string]map[string]struct{}{} } if _, ok := i.MissingFromIndex[string(fk)]; !ok { i.MissingFromIndex[string(fk)] = map[string]struct{}{} } i.MissingFromIndex[string(fk)][string(pk)] = struct{}{} } // Corrupt returns a list of foreign keys which have corrupted indexes (partial) // These are foreign keys which map to a subset of the primary keys which they should // be associated with. func (i *IndexDiff) Corrupt() (corrupt []string) { for fk := range i.MissingFromIndex { if _, ok := i.PresentInIndex[fk]; ok { corrupt = append(corrupt, fk) } } return } // Verify returns the difference between a source and its index // The difference contains items in the source that are not in the index // and vice-versa. 
func (i *Index) Verify(ctx context.Context, store Store) (diff IndexDiff, err error) { return i.verify(ctx, store, true) } func (i *Index) verify(ctx context.Context, store Store, includeMissingSource bool) (diff IndexDiff, err error) { diff.PresentInIndex, err = i.readEntireIndex(ctx, store) if err != nil { return diff, err } sourceKVs, err := consumeBucket(ctx, store, i.sourceBucket) if err != nil { return diff, err } // pks is a map of primary keys in source pks := map[string]struct{}{} // look for items missing from index for _, kv := range sourceKVs { pk, v := kv[0], kv[1] if includeMissingSource { // this is only useful for missing source pks[string(pk)] = struct{}{} } fk, err := i.IndexSourceOn(v) if err != nil { return diff, err } fkm, ok := diff.PresentInIndex[string(fk)] if ok { _, ok = fkm[string(pk)] } if !ok { diff.addMissingIndex(fk, pk) } } if includeMissingSource { // look for items missing from source for fk, fkm := range diff.PresentInIndex { for pk := range fkm { if _, ok := pks[pk]; !ok { diff.addMissingSource([]byte(fk), []byte(pk)) } } } } return } // indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their // associated primaryKey's value in the provided source bucket. // When an item is located in the source, the provided visit function is called with primary key and associated value. 
func indexWalk(indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) { defer func() { if cerr := indexCursor.Close(); cerr != nil && err == nil { err = cerr } }() for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() { v, err := sourceBucket.Get(pk) if err != nil { return err } if err := visit(pk, v); err != nil { return fmt.Errorf("for index entry %q: %w", string(ik), err) } } return indexCursor.Err() } // readEntireIndex returns the entire current state of the index func (i *Index) readEntireIndex(ctx context.Context, store Store) (map[string]map[string]struct{}, error) { kvs, err := consumeBucket(ctx, store, i.indexBucket) if err != nil { return nil, err } index := map[string]map[string]struct{}{} for _, kv := range kvs { fk, pk, err := indexKeyParts(kv[0]) if err != nil { return nil, err } if fkm, ok := index[string(fk)]; ok { fkm[string(pk)] = struct{}{} continue } index[string(fk)] = map[string]struct{}{string(pk): struct{}{}} } return index, nil } type kvSlice [][2][]byte // consumeBucket consumes the entire k/v space for the provided bucket function // applied to the provided store func consumeBucket(ctx context.Context, store Store, fn func(tx Tx) (Bucket, error)) (kvs kvSlice, err error) { return kvs, store.View(ctx, func(tx Tx) error { bkt, err := fn(tx) if err != nil { return err } cursor, err := bkt.ForwardCursor(nil) if err != nil { return err } return WalkCursor(ctx, cursor, func(k, v []byte) error { kvs = append(kvs, [2][]byte{k, v}) return nil }) }) }