influxdb/kv/index.go

629 lines
16 KiB
Go

package kv
import (
"bytes"
"context"
"errors"
"fmt"
)
// defaultPopulateBatchSize is the batch size NewIndex assigns to an Index
// before any IndexOption (such as WithIndexPopulateBatchSize) is applied.
const (
defaultPopulateBatchSize = 100
)
// Index is used to define and manage an index for a source bucket.
//
// When using the index you must provide it with an IndexMapping.
// The IndexMapping provides the index with the contract it needs to populate
// the entire index and traverse a populated index correctly.
// The IndexMapping provides a way to retrieve the key on which to index
// when provided with the value from the source.
// It also provides the way to access the source bucket.
//
// The following is an illustration of its use:
//
//	byUserID := func(v []byte) ([]byte, error) {
//		auth := &influxdb.Authorization{}
//
//		if err := json.Unmarshal(v, auth); err != nil {
//			return nil, err
//		}
//
//		return auth.UserID.Encode()
//	}
//
//	// configure a write only index
//	indexByUser := NewIndex(NewIndexMapping(sourceBucket, []byte(`authorizationsbyuserv1`), byUserID))
//
//	indexByUser.Insert(tx, someUserID, someAuthID)
//
//	indexByUser.Delete(tx, someUserID, someAuthID)
//
//	indexByUser.Walk(ctx, tx, someUserID, func(k, v []byte) error {
//		auth := &influxdb.Authorization{}
//		if err := json.Unmarshal(v, auth); err != nil {
//			return err
//		}
//
//		// do something with auth
//
//		return nil
//	})
//
//	// populate entire index from source
//	indexedCount, err := indexByUser.Populate(ctx, store)
//
//	// verify the current index against the source and return the differences
//	// found in each
//	diff, err := indexByUser.Verify(ctx, store)
type Index struct {
// IndexMapping supplies the source bucket name, the index bucket name and
// the function deriving a foreign key from a source value.
IndexMapping
// populateBatchSize configures the size of the batch used for insertion
// (and for removal) of index entries; NewIndex defaults it to
// defaultPopulateBatchSize.
populateBatchSize int
// canRead configures whether or not Walk accesses the index at all
// or skips the index altogether and returns nothing.
// This is used when you want to integrate only the write path before
// releasing the read path.
canRead bool
}
// IndexOption is a function which configures an Index.
// NewIndex applies each supplied option, in order, to the Index it builds.
type IndexOption func(*Index)
// WithIndexReadPathEnabled enables the read path of the index (Walk).
// Without it Walk returns nil immediately without visiting anything.
// This should be enabled once the index has been fully populated and
// the Insert and Delete paths are correctly integrated.
// It has the IndexOption signature, so it is passed to NewIndex uncalled.
func WithIndexReadPathEnabled(i *Index) {
i.canRead = true
}
// WithIndexPopulateBatchSize returns an IndexOption which overrides the
// number of index entries written (or removed) per store transaction when
// the index is populated or cleaned up in bulk.
func WithIndexPopulateBatchSize(n int) IndexOption {
	setBatchSize := func(target *Index) {
		target.populateBatchSize = n
	}
	return setBatchSize
}
// IndexMapping is a type which configures an Index to map items
// from a source bucket to an index bucket via a mapping known as
// IndexSourceOn. This function is called on the values in the source
// to derive the foreign key on which to index each item.
type IndexMapping interface {
// SourceBucket returns the name of the bucket holding the indexed items.
SourceBucket() []byte
// IndexBucket returns the name of the bucket holding the index entries.
IndexBucket() []byte
// IndexSourceOn derives the foreign key for a single source value.
IndexSourceOn(value []byte) (foreignKey []byte, err error)
}
// IndexSourceOnFunc derives the foreign key for a value read from a
// source bucket.
type IndexSourceOnFunc func([]byte) ([]byte, error)

// indexMapping is a plain value carrying the two bucket names and the
// foreign-key derivation function handed to NewIndexMapping.
type indexMapping struct {
	source []byte
	index  []byte
	fn     IndexSourceOnFunc
}

// SourceBucket returns the configured source bucket name.
func (m indexMapping) SourceBucket() []byte {
	return m.source
}

// IndexBucket returns the configured index bucket name.
func (m indexMapping) IndexBucket() []byte {
	return m.index
}

// IndexSourceOn delegates to the configured derivation function.
func (m indexMapping) IndexSourceOn(v []byte) ([]byte, error) {
	foreignKey, err := m.fn(v)
	return foreignKey, err
}
// NewIndexMapping builds an IndexMapping which reports sourceBucket and
// indexBucket as its bucket names and uses fn to derive foreign keys.
func NewIndexMapping(sourceBucket, indexBucket []byte, fn IndexSourceOnFunc) IndexMapping {
	var mapping indexMapping
	mapping.source = sourceBucket
	mapping.index = indexBucket
	mapping.fn = fn
	return mapping
}
// NewIndex builds an *Index around the given mapping.
// The batch size starts at defaultPopulateBatchSize and the read path (Walk)
// starts disabled, since the index has to be fully populated before reads
// can rely on it; pass WithIndexReadPathEnabled to turn it on.
// Options are applied in the order given.
func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index {
	idx := new(Index)
	idx.IndexMapping = mapping
	idx.populateBatchSize = defaultPopulateBatchSize

	for n := range opts {
		opts[n](idx)
	}

	return idx
}
// initialize opens an update transaction on store and requests the index
// bucket, returning whatever error that yields.
func (i *Index) initialize(ctx context.Context, store Store) error {
	ensureBucket := func(tx Tx) error {
		// per the original note, requesting the bucket creates it if it
		// does not exist yet
		_, err := tx.Bucket(i.IndexBucket())
		return err
	}

	return store.Update(ctx, ensureBucket)
}
// indexBucket resolves the index bucket within the given transaction.
func (i *Index) indexBucket(tx Tx) (Bucket, error) {
	name := i.IndexBucket()
	return tx.Bucket(name)
}
// sourceBucket resolves the source bucket within the given transaction.
func (i *Index) sourceBucket(tx Tx) (Bucket, error) {
	name := i.SourceBucket()
	return tx.Bucket(name)
}
// indexKey builds the key stored in the index bucket for a foreign/primary
// key pair: the foreign key, a '/' separator, then the primary key.
// The result is always a freshly allocated slice.
func indexKey(foreignKey, primaryKey []byte) (newKey []byte) {
	newKey = make([]byte, 0, len(primaryKey)+len(foreignKey)+1)
	newKey = append(newKey, foreignKey...)
	newKey = append(newKey, '/')
	newKey = append(newKey, primaryKey...)
	return newKey
}
// indexKeyParts splits an index bucket key of the form fk/pk at its first
// '/' and returns the two halves. A key without any '/' is reported as
// malformed. Everything after the first separator, including further '/'
// bytes, belongs to the primary key.
func indexKeyParts(indexKey []byte) (fk, pk []byte, err error) {
	sep := bytes.IndexByte(indexKey, '/')
	if sep < 0 {
		return nil, nil, errors.New("malformed index key")
	}

	// cap the foreign key at the separator so appending to it can never
	// spill into the primary key bytes
	return indexKey[:sep:sep], indexKey[sep+1:], nil
}
// Compile-time check that *IndexMigration implements MigrationSpec;
// the blank identifier means no value is kept.
var _ MigrationSpec = (*IndexMigration)(nil)
// IndexMigration is a migration for adding and removing an index.
// These are constructed via the Index.Migration function.
type IndexMigration struct {
// Index is the index being migrated; its methods are promoted.
*Index
// opts are forwarded to Index.Populate when the migration runs Up.
opts []PopulateOption
}
// MigrationName returns a readable name for the index migration, built
// from the quoted index bucket name.
func (i *IndexMigration) MigrationName() string {
	bucket := i.IndexBucket()
	// %q quotes a byte slice exactly as it would the equivalent string
	return fmt.Sprintf("add index %q", bucket)
}
// Up initializes the index bucket and then populates the index using the
// options the migration was created with. Any failure is wrapped with the
// migration name.
func (i *IndexMigration) Up(ctx context.Context, store Store) (err error) {
	// populate only runs once the bucket has been initialized successfully
	if err = i.initialize(ctx, store); err == nil {
		_, err = i.Populate(ctx, store, i.opts...)
	}

	if err != nil {
		return fmt.Errorf("migration (up) %s: %w", i.MigrationName(), err)
	}

	return nil
}
// Down removes every entry from the index, wrapping any failure with the
// migration name.
func (i *IndexMigration) Down(ctx context.Context, store Store) error {
	err := i.DeleteAll(ctx, store)
	if err == nil {
		return nil
	}

	return fmt.Errorf("migration (down) %s: %w", i.MigrationName(), err)
}
// Migration wraps the Index in an IndexMigration. The supplied options are
// kept and handed to Populate when the migration is run Up.
func (i *Index) Migration(opts ...PopulateOption) *IndexMigration {
	migration := new(IndexMigration)
	migration.Index = i
	migration.opts = opts
	return migration
}
// Insert writes one index entry: the key combines foreignKey and
// primaryKey, and the stored value is primaryKey.
func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error {
	idx, err := i.indexBucket(tx)
	if err == nil {
		err = idx.Put(indexKey(foreignKey, primaryKey), primaryKey)
	}
	return err
}
// Delete drops the index entry keyed by the foreignKey/primaryKey pair
// from the index bucket.
func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error {
	idx, err := i.indexBucket(tx)
	if err != nil {
		return err
	}

	entryKey := indexKey(foreignKey, primaryKey)
	return idx.Delete(entryKey)
}
// Walk visits the source bucket items whose primary keys are recorded in
// the index under the provided foreign key. It assumes the index has been
// fully populated.
//
// When the read path is disabled (see WithIndexReadPathEnabled) Walk returns
// nil without touching either bucket. Otherwise it returns the first error
// encountered resolving the buckets, opening the cursor or walking it.
func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn VisitFunc) error {
	// skip walking if configured to do so as the index
	// is currently being used purely to write the index
	if !i.canRead {
		return nil
	}

	sourceBucket, err := i.sourceBucket(tx)
	if err != nil {
		return err
	}

	indexBucket, err := i.indexBucket(tx)
	if err != nil {
		return err
	}

	// Index keys are laid out as "<foreignKey>/<primaryKey>". Scanning on the
	// bare foreign key would also match every other foreign key that merely
	// starts with the same bytes (e.g. "ab" matching "abc/..."), so the
	// separator is made part of both the seek position and the prefix.
	prefix := make([]byte, 0, len(foreignKey)+1)
	prefix = append(prefix, foreignKey...)
	prefix = append(prefix, '/')

	cursor, err := indexBucket.ForwardCursor(prefix,
		WithCursorPrefix(prefix))
	if err != nil {
		return err
	}

	return indexWalk(ctx, cursor, sourceBucket, visitFn)
}
// PopulateConfig configures a call to Populate.
type PopulateConfig struct {
// RemoveDanglingForeignKeys makes Populate also delete index entries
// whose primary key is no longer present in the source bucket.
RemoveDanglingForeignKeys bool
}
// PopulateOption is a functional option for the Populate call.
// Populate applies each option to a zero-valued PopulateConfig.
type PopulateOption func(*PopulateConfig)
// WithPopulateRemoveDanglingForeignKeys removes index entries which point to
// missing items in the source bucket.
// It has the PopulateOption signature, so it is passed to Populate (or
// Index.Migration) uncalled.
func WithPopulateRemoveDanglingForeignKeys(c *PopulateConfig) {
c.RemoveDanglingForeignKeys = true
}
// Populate does a full population of the index using the IndexSourceOn
// IndexMapping function.
//
// It first verifies the index against the source to find the entries that
// are missing, then writes them in batches of populateBatchSize entries, one
// store transaction per batch. When WithPopulateRemoveDanglingForeignKeys is
// supplied it afterwards removes index entries whose primary key is absent
// from the source.
//
// It returns the number of index entries inserted by batches whose
// transaction completed without error, and a nil error on success.
func (i *Index) Populate(ctx context.Context, store Store, opts ...PopulateOption) (n int, err error) {
	var config PopulateConfig
	for _, opt := range opts {
		opt(&config)
	}

	// verify the index to derive missing index
	// we can skip missing source lookup as we're
	// only interested in populating the missing index
	diff, err := i.verify(ctx, store, config.RemoveDanglingForeignKeys)
	if err != nil {
		return 0, fmt.Errorf("looking up missing indexes: %w", err)
	}

	flush := func(batch kvSlice) error {
		if len(batch) == 0 {
			return nil
		}

		if err := store.Update(ctx, func(tx Tx) error {
			indexBucket, err := i.indexBucket(tx)
			if err != nil {
				return err
			}

			for _, pair := range batch {
				// insert missing item into index
				if err := indexBucket.Put(pair[0], pair[1]); err != nil {
					return err
				}
			}

			return nil
		}); err != nil {
			return fmt.Errorf("updating index: %w", err)
		}

		// Count the batch only once Update has returned without error.
		// Counting inside the transaction function would include puts
		// belonging to a batch that ultimately failed.
		n += len(batch)

		return nil
	}

	var batch kvSlice

	for fk, fkm := range diff.MissingFromIndex {
		for pk := range fkm {
			batch = append(batch, [2][]byte{indexKey([]byte(fk), []byte(pk)), []byte(pk)})

			if len(batch) >= i.populateBatchSize {
				if err := flush(batch); err != nil {
					return n, err
				}

				// flush is synchronous, so the backing array can be reused
				batch = batch[:0]
			}
		}
	}

	// write whatever is left over from the final, partial batch
	if err := flush(batch); err != nil {
		return n, err
	}

	if config.RemoveDanglingForeignKeys {
		return n, i.remove(ctx, store, diff.MissingFromSource)
	}

	return n, nil
}
// DeleteAll removes the entire index in batches.
//
// Only the index bucket is read: every entry currently present in it is
// collected and then deleted. The source bucket is not consulted, so
// clearing the index does not depend on the source values being decodable by
// IndexSourceOn.
func (i *Index) DeleteAll(ctx context.Context, store Store) error {
	// Entries "missing from source" are by construction a subset of what is
	// present in the index, so the index contents alone are the complete set
	// of keys to remove.
	entries, err := i.readEntireIndex(ctx, store)
	if err != nil {
		return err
	}

	return i.remove(ctx, store, entries)
}
// remove deletes the index entries described by mappings (foreign key to
// set of primary keys). Keys are deleted in batches of populateBatchSize,
// one store transaction per batch; the first failure aborts the removal.
func (i *Index) remove(ctx context.Context, store Store, mappings map[string]map[string]struct{}) error {
	// deleteKeys removes one batch of index keys in a single transaction
	deleteKeys := func(keys [][]byte) error {
		if len(keys) == 0 {
			return nil
		}

		err := store.Update(ctx, func(tx Tx) error {
			bkt, err := i.indexBucket(tx)
			if err != nil {
				return err
			}

			for _, key := range keys {
				if err := bkt.Delete(key); err != nil {
					return err
				}
			}

			return nil
		})
		if err != nil {
			return fmt.Errorf("removing dangling foreign keys: %w", err)
		}

		return nil
	}

	var pending [][]byte

	for fk, pks := range mappings {
		for pk := range pks {
			pending = append(pending, indexKey([]byte(fk), []byte(pk)))
			if len(pending) < i.populateBatchSize {
				continue
			}

			if err := deleteKeys(pending); err != nil {
				return err
			}

			// keep the backing array for the next batch
			pending = pending[:0]
		}
	}

	// remove the final, partial batch
	return deleteKeys(pending)
}
// IndexDiff contains a set of items present in the source not in index,
// along with a set of things in the index which are not in the source.
type IndexDiff struct {
	// PresentInIndex is a map of foreign key to primary keys
	// present in the index.
	PresentInIndex map[string]map[string]struct{}
	// MissingFromIndex is a map of foreign key to associated primary keys
	// missing from the index given the source bucket.
	// These items could be due to the fact an index populate migration has
	// not yet occurred, the index populate code is incorrect or the write path
	// for your resource type does not yet insert into the index as well (Create actions).
	MissingFromIndex map[string]map[string]struct{}
	// MissingFromSource is a map of foreign key to associated primary keys
	// missing from the source but accounted for in the index.
	// This happens when index items are not properly removed from the index
	// when an item is removed from the source (Delete actions).
	MissingFromSource map[string]map[string]struct{}
}

// addMissingSource records pk under fk in MissingFromSource, creating the
// outer and inner maps on first use.
func (i *IndexDiff) addMissingSource(fk, pk []byte) {
	if i.MissingFromSource == nil {
		i.MissingFromSource = make(map[string]map[string]struct{})
	}

	pks, found := i.MissingFromSource[string(fk)]
	if !found {
		pks = make(map[string]struct{})
		i.MissingFromSource[string(fk)] = pks
	}

	pks[string(pk)] = struct{}{}
}

// addMissingIndex records pk under fk in MissingFromIndex, creating the
// outer and inner maps on first use.
func (i *IndexDiff) addMissingIndex(fk, pk []byte) {
	if i.MissingFromIndex == nil {
		i.MissingFromIndex = make(map[string]map[string]struct{})
	}

	pks, found := i.MissingFromIndex[string(fk)]
	if !found {
		pks = make(map[string]struct{})
		i.MissingFromIndex[string(fk)] = pks
	}

	pks[string(pk)] = struct{}{}
}

// Corrupt returns the foreign keys whose index is only partially populated:
// those that appear in the index yet are still missing at least one of the
// primary keys they should map to. The order of the result follows map
// iteration and is therefore not stable.
func (i *IndexDiff) Corrupt() (corrupt []string) {
	for fk := range i.MissingFromIndex {
		if _, indexed := i.PresentInIndex[fk]; !indexed {
			continue
		}
		corrupt = append(corrupt, fk)
	}

	return corrupt
}
// Verify compares the source bucket with its index and reports the
// difference in both directions: items in the source that are not indexed
// and index entries whose item is not in the source.
func (i *Index) Verify(ctx context.Context, store Store) (diff IndexDiff, err error) {
	const includeMissingSource = true

	diff, err = i.verify(ctx, store, includeMissingSource)
	return diff, err
}
// verify reads the whole index and the whole source bucket and computes the
// difference between them. Items in the source without a matching index
// entry are always reported; index entries without a matching source item
// are only reported when includeMissingSource is set.
func (i *Index) verify(ctx context.Context, store Store, includeMissingSource bool) (diff IndexDiff, err error) {
	if diff.PresentInIndex, err = i.readEntireIndex(ctx, store); err != nil {
		return diff, err
	}

	sourceKVs, err := consumeBucket(ctx, store, i.sourceBucket)
	if err != nil {
		return diff, err
	}

	// sourcePKs is the set of primary keys found in the source; it is only
	// filled when the missing-source pass below is going to run
	sourcePKs := map[string]struct{}{}

	// look for items missing from index
	for _, entry := range sourceKVs {
		primary, value := entry[0], entry[1]

		if includeMissingSource {
			sourcePKs[string(primary)] = struct{}{}
		}

		foreign, err := i.IndexSourceOn(value)
		if err != nil {
			return diff, err
		}

		// indexing a missing foreign key yields a nil inner map, whose
		// lookup simply reports "not present"
		if _, indexed := diff.PresentInIndex[string(foreign)][string(primary)]; !indexed {
			diff.addMissingIndex(foreign, primary)
		}
	}

	if !includeMissingSource {
		return diff, nil
	}

	// look for items missing from source
	for fk, indexedPKs := range diff.PresentInIndex {
		for pk := range indexedPKs {
			if _, inSource := sourcePKs[pk]; inSource {
				continue
			}
			diff.addMissingSource([]byte(fk), []byte(pk))
		}
	}

	return diff, nil
}
// indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their
// associated primaryKey's value in the provided source bucket.
// When an item is located in the source, the provided visit function is called with primary key and associated value.
//
// The cursor is drained and closed before the source bucket is queried, and
// it is closed on the iteration-error path as well. Primary keys whose
// looked-up value is nil are skipped. The first error from the cursor, the
// batch lookup or visit stops the walk and is returned.
func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) {
	var keys [][]byte
	for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() {
		keys = append(keys, pk)
	}

	if err := indexCursor.Err(); err != nil {
		// Release the cursor on this path too. The iteration error is the
		// one worth reporting, so a secondary Close failure is deliberately
		// dropped.
		_ = indexCursor.Close()
		return err
	}

	if err := indexCursor.Close(); err != nil {
		return err
	}

	values, err := sourceBucket.GetBatch(keys...)
	if err != nil {
		return err
	}

	// values are matched to keys by position
	for i, value := range values {
		if value != nil {
			if err := visit(keys[i], value); err != nil {
				return err
			}
		}
	}

	return nil
}
// readEntireIndex loads every key of the index bucket and groups the
// entries as foreign key -> set of primary keys. A key that cannot be split
// into its two parts aborts the read with that error.
func (i *Index) readEntireIndex(ctx context.Context, store Store) (map[string]map[string]struct{}, error) {
	entries, err := consumeBucket(ctx, store, i.indexBucket)
	if err != nil {
		return nil, err
	}

	byForeignKey := make(map[string]map[string]struct{})
	for _, entry := range entries {
		// only the key is needed; it carries both the fk and the pk
		fk, pk, err := indexKeyParts(entry[0])
		if err != nil {
			return nil, err
		}

		pks, seen := byForeignKey[string(fk)]
		if !seen {
			pks = make(map[string]struct{})
			byForeignKey[string(fk)] = pks
		}
		pks[string(pk)] = struct{}{}
	}

	return byForeignKey, nil
}
// kvSlice is a list of key/value pairs; element [0] is the key and
// element [1] is the value.
type kvSlice [][2][]byte
// consumeBucket consumes the entire k/v space for the provided bucket function
// applied to the provided store.
//
// fn resolves the bucket inside a read-only transaction; every key/value
// pair yielded by a forward cursor over that bucket is collected and
// returned. On error no pairs are returned.
//
// NOTE(review): the collected slices are kept after the View transaction
// has ended — confirm the Store implementations hand out slices that remain
// valid beyond the transaction.
func consumeBucket(ctx context.Context, store Store, fn func(tx Tx) (Bucket, error)) (kvs kvSlice, err error) {
	// Run the transaction to completion before reading kvs for the return.
	// Writing `return kvs, store.View(...)` leaves the order between reading
	// kvs and the call that fills it unspecified by the language.
	err = store.View(ctx, func(tx Tx) error {
		bkt, err := fn(tx)
		if err != nil {
			return err
		}

		cursor, err := bkt.ForwardCursor(nil)
		if err != nil {
			return err
		}

		return WalkCursor(ctx, cursor, func(k, v []byte) error {
			kvs = append(kvs, [2][]byte{k, v})
			return nil
		})
	})
	if err != nil {
		return nil, err
	}

	return kvs, nil
}