package storage

import (
	"bytes"
	"errors"
	"sort"

	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/lifecycle"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxdb/tsdb/tsi1"
	"github.com/influxdata/influxql"
)

var (
	errUnexpectedOrg                   = errors.New("seriesCursor: unexpected org")
	errUnexpectedTagComparisonOperator = errors.New("seriesCursor: unexpected tag comparison operator")
)

type SeriesCursor interface {
	Close() error
	Next() (*SeriesCursorRow, error)
}

type SeriesCursorRequest struct {
	// Name contains the tsdb encoded org and bucket ID
	Name [influxdb.IDLength]byte
}

// seriesCursor is an implementation of SeriesCursor over an tsi1.Index.
type seriesCursor struct {
	index    *tsi1.Index
	indexref *lifecycle.Reference
	sfile    *tsdb.SeriesFile
	sfileref *lifecycle.Reference
	name     [influxdb.IDLength]byte
	keys     [][]byte
	ofs      int
	row      SeriesCursorRow
	cond     influxql.Expr
	init     bool
}

type SeriesCursorRow struct {
	Name []byte
	Tags models.Tags
}

// newSeriesCursor returns a new instance of SeriesCursor.
func newSeriesCursor(req SeriesCursorRequest, index *tsi1.Index, sfile *tsdb.SeriesFile, cond influxql.Expr) (SeriesCursor, error) {
	if cond != nil {
		var err error
		influxql.WalkFunc(cond, func(node influxql.Node) {
			switch n := node.(type) {
			case *influxql.BinaryExpr:
				switch n.Op {
				case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX, influxql.OR, influxql.AND:
				default:
					err = errUnexpectedTagComparisonOperator
				}
			}
		})
		if err != nil {
			return nil, err
		}
	}

	indexref, err := index.Acquire()
	if err != nil {
		return nil, err
	}
	sfileref, err := sfile.Acquire()
	if err != nil {
		indexref.Release()
		return nil, err
	}

	return &seriesCursor{
		index:    index,
		indexref: indexref,
		sfile:    sfile,
		sfileref: sfileref,
		name:     req.Name,
		cond:     cond,
	}, nil
}

// Close closes the iterator. Safe to call multiple times.
func (cur *seriesCursor) Close() error {
	cur.sfileref.Release()
	cur.indexref.Release()
	return nil
}

// Next emits the next point in the iterator.
func (cur *seriesCursor) Next() (*SeriesCursorRow, error) {
	if !cur.init {
		if err := cur.readSeriesKeys(); err != nil {
			return nil, err
		}
		cur.init = true
	}

	if cur.ofs < len(cur.keys) {
		cur.row.Name, cur.row.Tags = tsdb.ParseSeriesKey(cur.keys[cur.ofs])
		if !bytes.HasPrefix(cur.row.Name, cur.name[:influxdb.OrgIDLength]) {
			return nil, errUnexpectedOrg
		}
		cur.ofs++
		return &cur.row, nil
	}

	return nil, nil
}

func (cur *seriesCursor) readSeriesKeys() error {
	sitr, err := cur.index.MeasurementSeriesByExprIterator(cur.name[:], cur.cond)
	if err != nil {
		return err
	} else if sitr == nil {
		return nil
	}
	defer sitr.Close()

	for {
		elem, err := sitr.Next()
		if err != nil {
			return err
		} else if elem.SeriesID.IsZero() {
			break
		}

		key := cur.sfile.SeriesKey(elem.SeriesID)
		if len(key) == 0 {
			continue
		}
		cur.keys = append(cur.keys, key)
	}

	// Sort keys.
	sort.Sort(seriesKeys(cur.keys))
	return nil
}

type seriesKeys [][]byte

func (a seriesKeys) Len() int      { return len(a) }
func (a seriesKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a seriesKeys) Less(i, j int) bool {
	return tsdb.CompareSeriesKeys(a[i], a[j]) == -1
}