package kv

import (
	"bytes"
	"context"
	"errors"
)

// Index is used to define and manage an index for a source bucket.
//
// When using the index you must provide it with an IndexMapping.
// The IndexMapping provides the index with the contract it needs to populate
// the entire index and traverse a populated index correctly.
// The IndexMapping provides a way to retrieve the key on which to index with
// when provided with the value from the source.
// It also provides the way to access the source bucket.
//
// The following is an illustration of its use:
//
//  byUserID := func(v []byte) ([]byte, error) {
//      auth := &influxdb.Authorization{}
//
//      if err := json.Unmarshal(v, auth); err != nil {
//          return err
//      }
//
//      return auth.UserID.Encode()
//  }
//
//  // configure a write only index
//  indexByUser := NewIndex(NewSource([]byte(`authorizationsbyuserv1/), byUserID))
//
//  indexByUser.Insert(tx, someUserID, someAuthID)
//
//  indexByUser.Delete(tx, someUserID, someAuthID)
//
//  indexByUser.Walk(tx, someUserID, func(k, v []byte) error {
//      auth := &influxdb.Authorization{}
//      if err := json.Unmarshal(v, auth); err != nil {
//          return err
//      }
//
//      // do something with auth
//
//      return nil
//  })
//
//  // verify the current index against the source and return the differences
//  // found in each
//  diff, err := indexByUser.Verify(ctx, tx)
type Index struct {
	IndexMapping

	// canRead configures whether or not Walk accesses the index at all
	// or skips the index altogether and returns nothing.
	// This is used when you want to integrate only the write path before
	// releasing the read path.
	canRead bool
}

// IndexOption is a function which configures an index
type IndexOption func(*Index)

// WithIndexReadPathEnabled enables the read paths of the index (Walk)
// This should be enabled once the index has been fully populated and
// the Insert and Delete paths are correctly integrated.
func WithIndexReadPathEnabled(i *Index) {
	i.canRead = true
}

// IndexMapping is a type which configures and Index to map items
// from a source bucket to an index bucket via a mapping known as
// IndexSourceOn. This function is called on the values in the source
// to derive the foreign key on which to index each item.
type IndexMapping interface {
	SourceBucket() []byte
	IndexBucket() []byte
	IndexSourceOn(value []byte) (foreignKey []byte, err error)
}

// IndexSourceOnFunc is a function which can be used to derive the foreign key
// of a value in a source bucket.
type IndexSourceOnFunc func([]byte) ([]byte, error)

type indexMapping struct {
	source []byte
	index  []byte
	fn     IndexSourceOnFunc
}

func (i indexMapping) SourceBucket() []byte { return i.source }

func (i indexMapping) IndexBucket() []byte { return i.index }

func (i indexMapping) IndexSourceOn(v []byte) ([]byte, error) {
	return i.fn(v)
}

// NewIndexMapping creates an implementation of IndexMapping for the provided source bucket
// to a destination index bucket.
func NewIndexMapping(sourceBucket, indexBucket []byte, fn IndexSourceOnFunc) IndexMapping {
	return indexMapping{
		source: sourceBucket,
		index:  indexBucket,
		fn:     fn,
	}
}

// NewIndex configures and returns a new *Index for a given index mapping.
// By default the read path (Walk) is disabled. This is because the index needs to
// be fully populated before depending upon the read path.
// The read path can be enabled using WithIndexReadPathEnabled option.
func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index {
	index := &Index{IndexMapping: mapping}

	for _, opt := range opts {
		opt(index)
	}

	return index
}

func (i *Index) indexBucket(tx Tx) (Bucket, error) {
	return tx.Bucket(i.IndexBucket())
}

func (i *Index) sourceBucket(tx Tx) (Bucket, error) {
	return tx.Bucket(i.SourceBucket())
}

// IndexKey returns a value suitable for use as the key component
// when storing values in the index.
func IndexKey(foreignKey, primaryKey []byte) (newKey []byte) {
	newKey = make([]byte, len(primaryKey)+len(foreignKey)+1)
	copy(newKey, foreignKey)
	newKey[len(foreignKey)] = '/'
	copy(newKey[len(foreignKey)+1:], primaryKey)

	return
}

func indexKeyParts(indexKey []byte) (fk, pk []byte, err error) {
	// this function is called with items missing in index
	parts := bytes.SplitN(indexKey, []byte("/"), 2)
	if len(parts) < 2 {
		return nil, nil, errors.New("malformed index key")
	}

	// parts are fk/pk
	fk, pk = parts[0], parts[1]

	return
}

// Insert creates a single index entry for the provided primary key on the foreign key.
func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error {
	bkt, err := i.indexBucket(tx)
	if err != nil {
		return err
	}

	return bkt.Put(IndexKey(foreignKey, primaryKey), primaryKey)
}

// Delete removes the foreignKey and primaryKey mapping from the underlying index.
func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error {
	bkt, err := i.indexBucket(tx)
	if err != nil {
		return err
	}

	return bkt.Delete(IndexKey(foreignKey, primaryKey))
}

// Walk walks the source bucket using keys found in the index using the provided foreign key
// given the index has been fully populated.
func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn VisitFunc) error {
	// skip walking if configured to do so as the index
	// is currently being used purely to write the index
	if !i.canRead {
		return nil
	}

	sourceBucket, err := i.sourceBucket(tx)
	if err != nil {
		return err
	}

	indexBucket, err := i.indexBucket(tx)
	if err != nil {
		return err
	}

	cursor, err := indexBucket.ForwardCursor(foreignKey,
		WithCursorPrefix(foreignKey))
	if err != nil {
		return err
	}

	return indexWalk(ctx, foreignKey, cursor, sourceBucket, visitFn)
}

// indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their
// associated primaryKey's value in the provided source bucket.
// When an item is located in the source, the provided visit function is called with primary key and associated value.
func indexWalk(ctx context.Context, foreignKey []byte, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) {
	var keys [][]byte
	for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() {
		if fk, _, err := indexKeyParts(ik); err != nil {
			return err
		} else if string(fk) == string(foreignKey) {
			keys = append(keys, pk)
		}
	}

	if err := indexCursor.Err(); err != nil {
		return err
	}

	if err := indexCursor.Close(); err != nil {
		return err
	}

	values, err := sourceBucket.GetBatch(keys...)
	if err != nil {
		return err
	}

	for i, value := range values {
		if value != nil {
			if cont, err := visit(keys[i], value); !cont || err != nil {
				return err
			}
		}
	}

	return nil
}

// IndexDiff contains a set of items present in the source not in index,
// along with a set of things in the index which are not in the source.
type IndexDiff struct {
	// PresentInIndex is a map of foreign key to primary keys
	// present in the index.
	PresentInIndex map[string]map[string]struct{}
	// MissingFromIndex is a map of foreign key to associated primary keys
	// missing from the index given the source bucket.
	// These items could be due to the fact an index populate migration has
	// not yet occurred, the index populate code is incorrect or the write path
	// for your resource type does not yet insert into the index as well (Create actions).
	MissingFromIndex map[string]map[string]struct{}
	// MissingFromSource is a map of foreign key to associated primary keys
	// missing from the source but accounted for in the index.
	// This happens when index items are not properly removed from the index
	// when an item is removed from the source (Delete actions).
	MissingFromSource map[string]map[string]struct{}
}

func (i *IndexDiff) addMissingSource(fk, pk []byte) {
	if i.MissingFromSource == nil {
		i.MissingFromSource = map[string]map[string]struct{}{}
	}

	if _, ok := i.MissingFromSource[string(fk)]; !ok {
		i.MissingFromSource[string(fk)] = map[string]struct{}{}
	}

	i.MissingFromSource[string(fk)][string(pk)] = struct{}{}
}

func (i *IndexDiff) addMissingIndex(fk, pk []byte) {
	if i.MissingFromIndex == nil {
		i.MissingFromIndex = map[string]map[string]struct{}{}
	}

	if _, ok := i.MissingFromIndex[string(fk)]; !ok {
		i.MissingFromIndex[string(fk)] = map[string]struct{}{}
	}

	i.MissingFromIndex[string(fk)][string(pk)] = struct{}{}
}

// Corrupt returns a list of foreign keys which have corrupted indexes (partial)
// These are foreign keys which map to a subset of the primary keys which they should
// be associated with.
func (i *IndexDiff) Corrupt() (corrupt []string) {
	for fk := range i.MissingFromIndex {
		if _, ok := i.PresentInIndex[fk]; ok {
			corrupt = append(corrupt, fk)
		}
	}
	return
}

// Verify returns the difference between a source and its index
// The difference contains items in the source that are not in the index
// and vice-versa.
func (i *Index) Verify(ctx context.Context, store Store) (diff IndexDiff, err error) {
	return indexVerify(ctx, i, store, true)
}

func indexVerify(ctx context.Context, mapping IndexMapping, store Store, includeMissingSource bool) (diff IndexDiff, err error) {
	diff.PresentInIndex, err = indexReadAll(ctx, store, func(tx Tx) (Bucket, error) {
		return tx.Bucket(mapping.IndexBucket())
	})
	if err != nil {
		return diff, err
	}

	sourceKVs, err := consumeBucket(ctx, store, func(tx Tx) (Bucket, error) {
		return tx.Bucket(mapping.SourceBucket())
	})
	if err != nil {
		return diff, err
	}

	// pks is a map of primary keys in source
	pks := map[string]struct{}{}

	// look for items missing from index
	for _, kv := range sourceKVs {
		pk, v := kv[0], kv[1]

		if includeMissingSource {
			// this is only useful for missing source
			pks[string(pk)] = struct{}{}
		}

		fk, err := mapping.IndexSourceOn(v)
		if err != nil {
			return diff, err
		}

		fkm, ok := diff.PresentInIndex[string(fk)]
		if ok {
			_, ok = fkm[string(pk)]
		}

		if !ok {
			diff.addMissingIndex(fk, pk)
		}
	}

	if includeMissingSource {
		// look for items missing from source
		for fk, fkm := range diff.PresentInIndex {
			for pk := range fkm {
				if _, ok := pks[pk]; !ok {
					diff.addMissingSource([]byte(fk), []byte(pk))
				}
			}
		}
	}

	return
}

// indexReadAll returns the entire current state of the index
func indexReadAll(ctx context.Context, store Store, indexBucket func(Tx) (Bucket, error)) (map[string]map[string]struct{}, error) {
	kvs, err := consumeBucket(ctx, store, indexBucket)
	if err != nil {
		return nil, err
	}

	index := map[string]map[string]struct{}{}
	for _, kv := range kvs {
		fk, pk, err := indexKeyParts(kv[0])
		if err != nil {
			return nil, err
		}

		if fkm, ok := index[string(fk)]; ok {
			fkm[string(pk)] = struct{}{}
			continue
		}

		index[string(fk)] = map[string]struct{}{string(pk): {}}
	}

	return index, nil
}

type kvSlice [][2][]byte

// consumeBucket consumes the entire k/v space for the provided bucket function
// applied to the provided store
func consumeBucket(ctx context.Context, store Store, fn func(tx Tx) (Bucket, error)) (kvs kvSlice, err error) {
	return kvs, store.View(ctx, func(tx Tx) error {
		bkt, err := fn(tx)
		if err != nil {
			return err
		}

		cursor, err := bkt.ForwardCursor(nil)
		if err != nil {
			return err
		}

		return WalkCursor(ctx, cursor, func(k, v []byte) (bool, error) {
			kvs = append(kvs, [2][]byte{k, v})
			return true, nil
		})
	})
}