package raft

import (
	"errors"
	"sync"
	"time"
)

//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------

// Peer is a reference to another server involved in the consensus protocol.
type Peer struct {
	// server is the local server this peer reference belongs to; requests to
	// the peer are sent through server.transporter.
	server         *Server
	// name identifies the peer.
	name           string
	// prevLogIndex is the log index handed to createAppendEntriesRequest when
	// flushing; it is adjusted after each AppendEntries/snapshot response.
	prevLogIndex   uint64
	// mutex is held by stop and clone.
	mutex          sync.Mutex
	// heartbeatTimer drives the heartbeat loop started by StartHeartbeat.
	heartbeatTimer *Timer
}

// FlushResponse carries the outcome of a single Peer.flush call. The heartbeat
// loop fills it in and sends it on the server's response channel.
type FlushResponse struct {
	// term, success and err are the three values returned by Peer.flush.
	term    uint64
	success bool
	err     error
	// peer is the peer that performed the flush.
	peer    *Peer
}

//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------

// NewPeer builds a peer named name that is attached to server. The heartbeat
// timer is created with heartbeatTimeout passed as both arguments to NewTimer.
// The heartbeat loop is not started here; see StartHeartbeat.
func NewPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
	return &Peer{
		server:         server,
		name:           name,
		heartbeatTimer: NewTimer(heartbeatTimeout, heartbeatTimeout),
	}
}

//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------

// Name returns the name of the peer.
func (p *Peer) Name() string {
	return p.name
}

// HeartbeatTimeout returns the heartbeat timeout, read as the minimum duration
// of the peer's heartbeat timer.
func (p *Peer) HeartbeatTimeout() time.Duration {
	return p.heartbeatTimer.MinDuration()
}

// SetHeartbeatTimeout sets the heartbeat timeout by forwarding duration to the
// peer's heartbeat timer.
func (p *Peer) SetHeartbeatTimeout(duration time.Duration) {
	p.heartbeatTimer.SetDuration(duration)
}

// StartHeartbeat launches the peer's heartbeat loop in a new goroutine and
// returns immediately.
func (p *Peer) StartHeartbeat() {
	go p.heartbeat()
}

//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------

//--------------------------------------
// State
//--------------------------------------

// stop stops the peer's heartbeat timer while holding the peer mutex.
// NOTE(review): the heartbeat loop returns when heartbeatTimer.Start reports
// false; presumably Stop is what triggers that — confirm against Timer.
func (p *Peer) stop() {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	p.heartbeatTimer.Stop()
}

//--------------------------------------
// Copying
//--------------------------------------

// clone returns a copy of the peer that carries only its name and
// prevLogIndex. The copy has no server and no heartbeat timer. The peer mutex
// is held while the fields are read.
func (p *Peer) clone() *Peer {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	dup := new(Peer)
	dup.name = p.name
	dup.prevLogIndex = p.prevLogIndex
	return dup
}

//--------------------------------------
// Flush
//--------------------------------------

// flush brings the peer up to date from prevLogIndex. It asks the server for
// an AppendEntries request starting at prevLogIndex and sends it; when the
// server returns no such request, a snapshot request is sent instead.
//
// It returns the term reported by the peer, whether the request succeeded,
// and any error from sending. It does not obtain a lock on the server.
func (p *Peer) flush() (uint64, bool, error) {
	// NOTE(review): a nil request presumably means the needed entries are no
	// longer in the log (e.g. removed by a snapshot) — confirm against
	// createAppendEntriesRequest.
	if appendReq := p.server.createAppendEntriesRequest(p.prevLogIndex); appendReq != nil {
		return p.sendFlushRequest(appendReq)
	}

	snapshotReq := p.server.createSnapshotRequest()
	return p.sendSnapshotRequest(snapshotReq)
}

// sendVoteRequest sends a RequestVote request to the peer through the
// server's transporter. A non-nil response is tagged with this peer and
// delivered on c; when the transporter yields no response nothing is sent on
// c. A transporter error is logged rather than returned, since the caller only
// consumes responses from c.
func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
	req.peer = p
	debugln(p.server.Name(), "Send Vote Request to ", p.Name())

	resp, err := p.server.transporter.SendVoteRequest(p.server, p, req)
	if err != nil {
		// Best effort: a failed vote request simply yields no vote, but the
		// reason should not vanish silently.
		debugln(p.server.Name(), "Vote Request to ", p.Name(), " failed: ", err)
	}
	if resp == nil {
		return
	}

	resp.peer = p
	c <- resp
}

// sendSnapshotRequest sends a snapshot request to the peer through the
// server's transporter.
//
// It returns the term reported by the peer, whether the peer accepted the
// snapshot, and the transporter's error. A nil req yields an error. When the
// transporter returns no response the result is (0, false, err).
//
// On success prevLogIndex is moved to req.LastIndex. On rejection
// prevLogIndex is left untouched and failure is reported to the caller, which
// can retry on a later flush or react to the returned term.
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error) {
	// Ignore any null requests.
	if req == nil {
		return 0, false, errors.New("raft.Peer: Request required")
	}

	// Send the request through the user-provided transporter.
	resp, err := p.server.transporter.SendSnapshotRequest(p.server, p, req)
	if resp == nil {
		return 0, false, err
	}

	if !resp.Success {
		// A rejected snapshot is an ordinary protocol outcome, not a broken
		// invariant: report it instead of panicking.
		debugln("Peer ", p.Name(), " rejected snapshot request")
		return resp.Term, false, err
	}

	p.prevLogIndex = req.LastIndex
	return resp.Term, true, err
}

// sendFlushRequest sends an AppendEntries request to the peer through the
// server's transporter and updates prevLogIndex from the response.
//
// It waits at most p.server.heartbeatTimeout for a response. It returns the
// term reported by the peer, whether the append succeeded, and an error when
// req is nil, when no response was obtained in time (or the transporter
// produced a nil response), or when the peer reported a term greater than the
// server's current term.
func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) {
	// Ignore any null requests.
	if req == nil {
		return 0, false, errors.New("raft.Peer: Request required")
	}

	debugln("[HeartBeat] Leader ", p.server.Name(), " to ", p.Name(), " ", len(req.Entries), " ", time.Now())

	// Buffered so the sending goroutine can always deliver its result and
	// exit, even when the timeout below wins the race.
	respChan := make(chan *AppendEntriesResponse, 1)
	go func() {
		tranResp, err := p.server.transporter.SendAppendEntriesRequest(p.server, p, req)
		if err != nil {
			debugln("[HeartBeat] Leader ", p.server.Name(), " to ", p.Name(), " failed: ", err)
		}
		respChan <- tranResp
	}()

	// resp stays nil when the timeout fires first.
	var resp *AppendEntriesResponse
	select {
	case resp = <-respChan:
	case <-time.After(p.server.heartbeatTimeout):
	}

	debugln("receive flush response from ", p.Name())

	if resp == nil {
		return 0, false, errors.New("Network problem")
	}

	// On success advance prevLogIndex to the last entry that was sent; an
	// empty request leaves it unchanged.
	if resp.Success {
		if n := len(req.Entries); n > 0 {
			p.prevLogIndex = req.Entries[n-1].Index
			debugln("Peer ", p.Name(), "'s' log update to ", p.prevLogIndex)
		}
		return resp.Term, true, nil
	}

	// The peer has seen a newer term: tell the caller to step down.
	if resp.Term > p.server.currentTerm {
		return resp.Term, false, errors.New("Step down")
	}

	if resp.CommitIndex > p.prevLogIndex {
		// We may have missed a response from the peer; jump forward to the
		// commit index it reports.
		p.prevLogIndex = resp.CommitIndex
	} else if p.prevLogIndex > 0 {
		// Step back one entry (never below zero) so the next flush retries
		// from an earlier index until a match is found.
		p.prevLogIndex--
	}

	return resp.Term, false, nil
}

//--------------------------------------
// Heartbeat
//--------------------------------------

// heartbeat is the peer's heartbeat loop. Each time the heartbeat timer's
// Start call reports true the peer is flushed; when it reports false the loop
// exits.
//
// After a successful flush that moved prevLogIndex past the server log's
// commit index, the result is sent on the server's response channel. After a
// failed flush that reported a term greater than the server's current term,
// the term is offered on the server's stepDown channel without blocking and
// the loop exits.
func (p *Peer) heartbeat() {
	for {
		// Start reporting false means the timer was shut down rather than fired.
		if !p.heartbeatTimer.Start() {
			return
		}

		result := FlushResponse{peer: p}
		result.term, result.success, result.err = p.flush()

		if result.success {
			// Only report to the commit center when this peer now holds
			// entries beyond the current commit index.
			if p.prevLogIndex > p.server.log.CommitIndex() {
				debugln("[Heartbeat] Peer", p.Name(), "send to commit center")
				p.server.response <- result
				debugln("[Heartbeat] Peer", p.Name(), "back from commit center")
			}
			continue
		}

		// Any other failure is retried on the next timer tick.
		if result.term <= p.server.currentTerm {
			continue
		}

		// The peer reported a newer term: shut down the heartbeat. The
		// notification is dropped if nobody is ready to receive it.
		debugln("[Heartbeat] SetpDown!")
		select {
		case p.server.stepDown <- result.term:
		default:
		}
		return
	}
}