package influxql

import (
	"fmt"
	"sort"
	"strconv"
	"time"

	"github.com/apache/arrow/go/arrow/array"
	"github.com/influxdata/flux"
	"github.com/influxdata/flux/arrow"
	"github.com/influxdata/flux/execute"
	"github.com/influxdata/flux/memory"
	"github.com/influxdata/flux/values"
)

// responseIterator implements flux.ResultIterator for a Response.
type responseIterator struct {
	response  *Response
	resultIdx int
}

// NewResponseIterator constructs a flux.ResultIterator from a Response.
func NewResponseIterator(r *Response) flux.ResultIterator {
	return &responseIterator{
		response: r,
	}
}

// More returns true if there are results left to iterate through.
// It is used to implement flux.ResultIterator.
func (r *responseIterator) More() bool {
	return r.resultIdx < len(r.response.Results)
}

// Next retrieves the next flux.Result.
// It is used to implement flux.ResultIterator.
func (r *responseIterator) Next() flux.Result {
	res := r.response.Results[r.resultIdx]
	r.resultIdx++
	return newQueryResult(&res)
}

// Release is a noop.
// It is used to implement flux.ResultIterator.
func (r *responseIterator) Release() {}

// Err returns an error if the response contained an error.
// It is used to implement flux.ResultIterator.
func (r *responseIterator) Err() error {
	if r.response.Err != "" {
		return fmt.Errorf(r.response.Err)
	}

	return nil
}

func (r *responseIterator) Statistics() flux.Statistics {
	return flux.Statistics{}
}

// seriesIterator is a simple wrapper for Result that implements flux.Result and flux.TableIterator.
type seriesIterator struct {
	result *Result
}

func newQueryResult(r *Result) *seriesIterator {
	return &seriesIterator{
		result: r,
	}
}

// Name returns the results statement id.
// It is used to implement flux.Result.
func (r *seriesIterator) Name() string {
	return strconv.Itoa(r.result.StatementID)
}

// Tables returns the original as a flux.TableIterator.
// It is used to implement flux.Result.
func (r *seriesIterator) Tables() flux.TableIterator {
	return r
}

// Do iterates through the series of a Result.
// It is used to implement flux.TableIterator.
func (r *seriesIterator) Do(f func(flux.Table) error) error {
	for _, row := range r.result.Series {
		t, err := newQueryTable(row)
		if err != nil {
			return err
		}
		if err := f(t); err != nil {
			return err
		}
	}

	return nil
}

func (r *seriesIterator) Statistics() flux.Statistics {
	return flux.Statistics{}
}

// queryTable implements flux.Table and flux.ColReader.
type queryTable struct {
	row      *Row
	groupKey flux.GroupKey
	colMeta  []flux.ColMeta
	cols     []array.Interface
}

func newQueryTable(r *Row) (*queryTable, error) {
	t := &queryTable{
		row: r,
	}
	if err := t.translateRowsToColumns(); err != nil {
		return nil, err
	}
	return t, nil
}

func (t *queryTable) Statistics() flux.Statistics {
	return flux.Statistics{}
}

// Data in a column is laid out in the following way:
//   [ r.row.Columns... , r.tagKeys()... , r.row.Name ]
func (t *queryTable) translateRowsToColumns() error {
	t.cols = make([]array.Interface, len(t.Cols()))
	for i := range t.row.Columns {
		col := t.Cols()[i]
		switch col.Type {
		case flux.TFloat:
			b := arrow.NewFloatBuilder(&memory.Allocator{})
			b.Reserve(t.Len())
			for _, row := range t.row.Values {
				val, ok := row[i].(float64)
				if !ok {
					return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
				}
				b.Append(val)
			}
			t.cols[i] = b.NewArray()
			b.Release()
		case flux.TInt:
			b := arrow.NewIntBuilder(&memory.Allocator{})
			b.Reserve(t.Len())
			for _, row := range t.row.Values {
				val, ok := row[i].(int64)
				if !ok {
					return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
				}
				b.Append(val)
			}
			t.cols[i] = b.NewArray()
			b.Release()
		case flux.TUInt:
			b := arrow.NewUintBuilder(&memory.Allocator{})
			b.Reserve(t.Len())
			for _, row := range t.row.Values {
				val, ok := row[i].(uint64)
				if !ok {
					return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
				}
				b.Append(val)
			}
			t.cols[i] = b.NewArray()
			b.Release()
		case flux.TString:
			b := arrow.NewStringBuilder(&memory.Allocator{})
			b.Reserve(t.Len())
			for _, row := range t.row.Values {
				val, ok := row[i].(string)
				if !ok {
					return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
				}
				b.AppendString(val)
			}
			t.cols[i] = b.NewArray()
			b.Release()
		case flux.TBool:
			b := arrow.NewBoolBuilder(&memory.Allocator{})
			b.Reserve(t.Len())
			for _, row := range t.row.Values {
				val, ok := row[i].(bool)
				if !ok {
					return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
				}
				b.Append(val)
			}
			t.cols[i] = b.NewArray()
			b.Release()
		case flux.TTime:
			b := arrow.NewIntBuilder(&memory.Allocator{})
			b.Reserve(t.Len())
			for _, row := range t.row.Values {
				switch val := row[i].(type) {
				case int64:
					b.Append(val)
				case float64:
					b.Append(int64(val))
				case string:
					tm, err := time.Parse(time.RFC3339, val)
					if err != nil {
						return fmt.Errorf("could not parse string %q as time: %v", val, err)
					}
					b.Append(tm.UnixNano())
				default:
					return fmt.Errorf("unsupported type %T found in column %s", val, col.Label)
				}
			}
			t.cols[i] = b.NewArray()
			b.Release()
		default:
			return fmt.Errorf("invalid type %T found in column %s", col.Type, col.Label)
		}
	}

	for j := len(t.row.Columns); j < len(t.Cols()); j++ {
		b := arrow.NewStringBuilder(&memory.Allocator{})
		b.Reserve(t.Len())

		var value string
		if key := t.Cols()[j].Label; key == "_measurement" {
			value = t.row.Name
		} else {
			value = t.row.Tags[key]
		}

		for i := 0; i < t.Len(); i++ {
			b.AppendString(value)
		}
		t.cols[j] = b.NewArray()
		b.Release()
	}
	return nil
}

// Key constructs the flux.GroupKey for a Row from the rows
// tags and measurement.
// It is used to implement flux.Table and flux.ColReader.
func (r *queryTable) Key() flux.GroupKey {
	if r.groupKey == nil {
		cols := make([]flux.ColMeta, len(r.row.Tags)+1) // plus one is for measurement
		vs := make([]values.Value, len(r.row.Tags)+1)
		kvs := make([]interface{}, len(r.row.Tags)+1)
		colMeta := r.Cols()
		labels := append(r.tagKeys(), "_measurement")
		for j, label := range labels {
			idx := execute.ColIdx(label, colMeta)
			if idx < 0 {
				panic(fmt.Errorf("table invalid: missing group column %q", label))
			}
			cols[j] = colMeta[idx]
			kvs[j] = "string"
			v := values.New(kvs[j])
			if v == values.InvalidValue {
				panic(fmt.Sprintf("unsupported value kind %T", kvs[j]))
			}
			vs[j] = v
		}
		r.groupKey = execute.NewGroupKey(cols, vs)
	}

	return r.groupKey
}

// tags returns the tag keys for a Row.
func (r *queryTable) tagKeys() []string {
	tags := []string{}
	for t := range r.row.Tags {
		tags = append(tags, t)
	}
	sort.Strings(tags)
	return tags
}

// Cols returns the columns for a row where the data is laid out in the following way:
//   [ r.row.Columns... , r.tagKeys()... , r.row.Name ]
// It is used to implement flux.Table and flux.ColReader.
func (r *queryTable) Cols() []flux.ColMeta {
	if r.colMeta == nil {
		colMeta := make([]flux.ColMeta, len(r.row.Columns)+len(r.row.Tags)+1)
		for i, col := range r.row.Columns {
			colMeta[i] = flux.ColMeta{
				Label: col,
				Type:  flux.TInvalid,
			}
			if col == "time" {
				// rename the time column
				colMeta[i].Label = "_time"
				colMeta[i].Type = flux.TTime
			}
		}

		if len(r.row.Values) < 1 {
			panic("must have at least one value")
		}
		data := r.row.Values[0]
		for i := range r.row.Columns {
			v := data[i]
			if colMeta[i].Label == "_time" {
				continue
			}
			switch v.(type) {
			case float64:
				colMeta[i].Type = flux.TFloat
			case int64:
				colMeta[i].Type = flux.TInt
			case uint64:
				colMeta[i].Type = flux.TUInt
			case bool:
				colMeta[i].Type = flux.TBool
			case string:
				colMeta[i].Type = flux.TString
			}
		}

		tags := r.tagKeys()

		leng := len(r.row.Columns)
		for i, tag := range tags {
			colMeta[leng+i] = flux.ColMeta{
				Label: tag,
				Type:  flux.TString,
			}
		}

		leng = leng + len(tags)
		colMeta[leng] = flux.ColMeta{
			Label: "_measurement",
			Type:  flux.TString,
		}
		r.colMeta = colMeta
	}

	return r.colMeta
}

// Do applies f to itself. This is because Row is a flux.ColReader.
// It is used to implement flux.Table.
func (r *queryTable) Do(f func(flux.ColReader) error) error {
	return f(r)
}

func (r *queryTable) Done() {}

// Empty returns true if a Row has no values.
// It is used to implement flux.Table.
func (r *queryTable) Empty() bool { return r.Len() == 0 }

// Len returns the length or r.row.Values
// It is used to implement flux.ColReader.
func (r *queryTable) Len() int {
	return len(r.row.Values)
}

func (r *queryTable) Retain()  {}
func (r *queryTable) Release() {}

// Bools returns the values in column index j as bools.
// It will panic if the column is not a []bool.
// It is used to implement flux.ColReader.
func (r *queryTable) Bools(j int) *array.Boolean {
	return r.cols[j].(*array.Boolean)
}

// Ints returns the values in column index j as ints.
// It will panic if the column is not a []int64.
// It is used to implement flux.ColReader.
func (r *queryTable) Ints(j int) *array.Int64 {
	return r.cols[j].(*array.Int64)
}

// UInts returns the values in column index j as ints.
// It will panic if the column is not a []uint64.
// It is used to implement flux.ColReader.
func (r *queryTable) UInts(j int) *array.Uint64 {
	return r.cols[j].(*array.Uint64)
}

// Floats returns the values in column index j as floats.
// It will panic if the column is not a []float64.
// It is used to implement flux.ColReader.
func (r *queryTable) Floats(j int) *array.Float64 {
	return r.cols[j].(*array.Float64)
}

// Strings returns the values in column index j as strings.
// It will panic if the column is not a []string.
// It is used to implement flux.ColReader.
func (r *queryTable) Strings(j int) *array.Binary {
	return r.cols[j].(*array.Binary)
}

// Times returns the values in column index j as values.Times.
// It will panic if the column is not a []values.Time.
// It is used to implement flux.ColReader.
func (r *queryTable) Times(j int) *array.Int64 {
	return r.cols[j].(*array.Int64)
}