// 831 lines, 23 KiB, Go (file-viewer header preserved as a comment)
package coordinator
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
log "code.google.com/p/log4go"
|
|
"github.com/gorilla/mux"
|
|
"github.com/influxdb/influxdb/_vendor/raft"
|
|
"github.com/influxdb/influxdb/cluster"
|
|
"github.com/influxdb/influxdb/common"
|
|
"github.com/influxdb/influxdb/configuration"
|
|
"github.com/influxdb/influxdb/parser"
|
|
"github.com/influxdb/influxdb/protocol"
|
|
)
|
|
|
|
const (
	// DEFAULT_ROOT_PWD is the initial root admin password used when the
	// environment variable below is unset.
	DEFAULT_ROOT_PWD = "root"
	// DEFAULT_ROOT_PWD_ENVKEY names the environment variable that
	// overrides the initial root password.
	DEFAULT_ROOT_PWD_ENVKEY = "INFLUXDB_INIT_PWD"
	// RAFT_NAME_SIZE is the number of random bytes used to generate a
	// raft server name (rendered as 8 hex digits).
	RAFT_NAME_SIZE = 8
)
|
|
|
|
// The raftd server is a combination of the Raft server and an HTTP
// server which acts as the transport.
type RaftServer struct {
	name         string      // persistent raft node name, stored in <path>/name
	path         string      // raft data directory (config.RaftDir)
	bind_address string      // NOTE(review): appears unused in this file — confirm
	router       *mux.Router // routes for the raft transport, /join and /process_command
	raftServer   raft.Server
	httpServer   *http.Server
	clusterConfig *cluster.ClusterConfiguration
	mutex        sync.RWMutex // serializes snapshots; see ForceLogCompaction
	listener     net.Listener
	closing      bool
	config       *configuration.Configuration
	notLeader    chan bool // signals the leader loop to exit on demotion
	coordinator  *Coordinator
	processContinuousQueries bool // gate for checkContinuousQueries
}
|
|
|
|
var registeredCommands bool
|
|
|
|
// NewRaftServer creates a new raft server rooted at config.RaftDir.
// The node's raft name is read from <RaftDir>/name when present;
// otherwise a random 8-hex-digit name is generated (from /dev/urandom
// when available, falling back to math/rand) and persisted there.
// Panics when entropy or the name file cannot be read/written.
func NewRaftServer(config *configuration.Configuration, clusterConfig *cluster.ClusterConfiguration) *RaftServer {
	// Raft commands may only be registered with the raft library once
	// per process. NOTE(review): this flag is not synchronized — assumes
	// servers are constructed from a single goroutine.
	if !registeredCommands {
		registeredCommands = true
		for _, command := range internalRaftCommands {
			raft.RegisterCommand(command)
		}
	}

	s := &RaftServer{
		path:          config.RaftDir,
		clusterConfig: clusterConfig,
		notLeader:     make(chan bool, 1),
		router:        mux.NewRouter(),
		config:        config,
	}
	// Read existing name or generate a new one.
	if b, err := ioutil.ReadFile(filepath.Join(s.path, "name")); err == nil {
		s.name = string(b)
	} else {
		var i uint64
		if _, err := os.Stat("/dev/urandom"); err == nil {
			log.Info("Using /dev/urandom to initialize the raft server name")
			f, err := os.Open("/dev/urandom")
			if err != nil {
				panic(err)
			}
			defer f.Close()
			// Read exactly RAFT_NAME_SIZE bytes of entropy; a short read
			// is retried until the buffer is full.
			readBytes := 0
			b := make([]byte, RAFT_NAME_SIZE)
			for readBytes < RAFT_NAME_SIZE {
				n, err := f.Read(b[readBytes:])
				if err != nil {
					panic(err)
				}
				readBytes += n
			}
			// Interpret the random bytes as a big-endian uint64.
			err = binary.Read(bytes.NewBuffer(b), binary.BigEndian, &i)
			if err != nil {
				panic(err)
			}
		} else {
			log.Info("Using rand package to generate raft server name")
			rand.Seed(time.Now().UnixNano())
			i = uint64(rand.Int())
		}
		s.name = fmt.Sprintf("%08x", i)
		log.Info("Setting raft name to %s", s.name)
		// Persist the name so the node keeps its identity across restarts.
		if err = ioutil.WriteFile(filepath.Join(s.path, "name"), []byte(s.name), 0644); err != nil {
			panic(err)
		}
	}

	return s
}
|
|
|
|
// GetRaftName returns this node's persistent raft name.
func (s *RaftServer) GetRaftName() string {
	return s.name
}
|
|
|
|
// GetLeaderRaftName returns the raft name of the current cluster
// leader as reported by the raft server.
func (s *RaftServer) GetLeaderRaftName() string {
	return s.raftServer.Leader()
}
|
|
|
|
func (s *RaftServer) IsLeaderByRaftName(name string) bool {
|
|
//s.raftServer.State() == raft.Leader
|
|
return s.raftServer.Leader() == name
|
|
}
|
|
|
|
/**
|
|
* return a consistant leader raft connection string when called on all living nodes include leader.
|
|
*/
|
|
func (s *RaftServer) GetLeaderRaftConnectString() (string, bool) {
|
|
if s.IsLeaderByRaftName(s.name) {
|
|
return s.config.RaftConnectionString(), true
|
|
} else {
|
|
return s.leaderConnectString()
|
|
}
|
|
}
|
|
|
|
func (s *RaftServer) leaderConnectString() (string, bool) {
|
|
leader := s.raftServer.Leader()
|
|
peers := s.raftServer.Peers()
|
|
if peer, ok := peers[leader]; !ok {
|
|
return "", false
|
|
} else {
|
|
return peer.ConnectionString, true
|
|
}
|
|
}
|
|
|
|
func (s *RaftServer) doOrProxyCommand(command raft.Command) (interface{}, error) {
|
|
var err error
|
|
var value interface{}
|
|
for i := 0; i < 3; i++ {
|
|
value, err = s.doOrProxyCommandOnce(command)
|
|
if err == nil {
|
|
return value, nil
|
|
}
|
|
if strings.Contains(err.Error(), "node failure") {
|
|
continue
|
|
}
|
|
return nil, err
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
func (s *RaftServer) doOrProxyCommandOnce(command raft.Command) (interface{}, error) {
|
|
|
|
if s.raftServer.State() == raft.Leader {
|
|
value, err := s.raftServer.Do(command)
|
|
if err != nil {
|
|
log.Error("Cannot run command %#v. %s", command, err)
|
|
}
|
|
return value, err
|
|
} else {
|
|
if leader, ok := s.leaderConnectString(); !ok {
|
|
return nil, errors.New("Couldn't connect to the cluster leader...")
|
|
} else {
|
|
return SendCommandToServer(leader, command)
|
|
}
|
|
}
|
|
}
|
|
|
|
func SendCommandToServer(url string, command raft.Command) (interface{}, error) {
|
|
var b bytes.Buffer
|
|
if err := json.NewEncoder(&b).Encode(command); err != nil {
|
|
return nil, err
|
|
}
|
|
resp, err := http.Post(url+"/process_command/"+command.CommandName(), "application/json", &b)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
body, err2 := ioutil.ReadAll(resp.Body)
|
|
|
|
if resp.StatusCode != 200 {
|
|
return nil, errors.New(strings.TrimSpace(string(body)))
|
|
}
|
|
|
|
return body, err2
|
|
|
|
}
|
|
|
|
func (s *RaftServer) CreateDatabase(name string) error {
|
|
command := NewCreateDatabaseCommand(name)
|
|
_, err := s.doOrProxyCommand(command)
|
|
return err
|
|
}
|
|
|
|
func (s *RaftServer) DropDatabase(name string) error {
|
|
command := NewDropDatabaseCommand(name)
|
|
_, err := s.doOrProxyCommand(command)
|
|
// TODO: Dropping database from the metastore is synchronous, but the underlying data
|
|
// delete is asynchronous. If the server crashes or restarts while this is happening
|
|
// there will be orphaned data sitting around. Not a huge deal, but we should fix this
|
|
// at some point.
|
|
// force a log compaction because we don't want this replaying after a server restarts
|
|
if err == nil {
|
|
err = s.ForceLogCompaction()
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *RaftServer) SaveDbUser(u *cluster.DbUser) error {
|
|
command := NewSaveDbUserCommand(u)
|
|
_, err := s.doOrProxyCommand(command)
|
|
return err
|
|
}
|
|
|
|
func (s *RaftServer) ChangeDbUserPassword(db, username string, hash []byte) error {
|
|
command := NewChangeDbUserPasswordCommand(db, username, string(hash))
|
|
_, err := s.doOrProxyCommand(command)
|
|
return err
|
|
}
|
|
|
|
func (s *RaftServer) ChangeDbUserPermissions(db, username, readPermissions, writePermissions string) error {
|
|
command := NewChangeDbUserPermissionsCommand(db, username, readPermissions, writePermissions)
|
|
_, err := s.doOrProxyCommand(command)
|
|
return err
|
|
}
|
|
|
|
func (s *RaftServer) SaveClusterAdminUser(u *cluster.ClusterAdmin) error {
|
|
command := NewSaveClusterAdminCommand(u)
|
|
_, err := s.doOrProxyCommand(command)
|
|
return err
|
|
}
|
|
|
|
func (s *RaftServer) CreateRootUser() error {
|
|
u := &cluster.ClusterAdmin{cluster.CommonUser{Name: "root", Hash: "", IsUserDeleted: false, CacheKey: "root"}}
|
|
password := os.Getenv(DEFAULT_ROOT_PWD_ENVKEY)
|
|
if password == "" {
|
|
password = DEFAULT_ROOT_PWD
|
|
}
|
|
hash, _ := cluster.HashPassword(password)
|
|
u.ChangePassword(string(hash))
|
|
return s.SaveClusterAdminUser(u)
|
|
}
|
|
|
|
func (s *RaftServer) SetContinuousQueryTimestamp(timestamp time.Time) error {
|
|
command := NewSetContinuousQueryTimestampCommand(timestamp)
|
|
_, err := s.doOrProxyCommand(command)
|
|
return err
|
|
}
|
|
|
|
// CreateContinuousQuery validates the continuous query, kicks off an
// asynchronous backfill when the query requests one and other
// continuous queries have already run, and finally commits the query
// definition through raft. Validation errors are returned without
// committing anything.
func (s *RaftServer) CreateContinuousQuery(db string, query string) error {
	selectQuery, err := parser.ParseSelectQuery(query)
	if err != nil {
		return fmt.Errorf("Failed to parse continuous query: %s", query)
	}

	if !selectQuery.IsValidContinuousQuery() {
		return fmt.Errorf("Continuous queries with a group by clause must include time(...) as one of the elements")
	}

	if !selectQuery.IsNonRecursiveContinuousQuery() {
		return fmt.Errorf("Continuous queries with :series_name interpolation must use a regular expression in the from clause that prevents recursion")
	}

	duration, _, err := selectQuery.GetGroupByClause().GetGroupByTime()
	if err != nil {
		return fmt.Errorf("Couldn't get group by time for continuous query: %s", err)
	}

	// if there are already-running queries, we need to initiate a backfill
	if selectQuery.GetIntoClause().Backfill &&
		!s.clusterConfig.LastContinuousQueryRunTime().IsZero() {
		zeroTime := time.Time{}
		// Align the backfill's end to the query's group-by interval when
		// one is present (duration may be nil for queries without time(...)).
		currentBoundary := time.Now()
		if duration != nil {
			currentBoundary = currentBoundary.Truncate(*duration)
		}
		// Backfill from the beginning of time up to the current boundary,
		// asynchronously.
		go s.runContinuousQuery(db, selectQuery, zeroTime, currentBoundary)
	} else {
		// TODO: make continuous queries backfill for queries that don't have a group by time
	}

	command := NewCreateContinuousQueryCommand(db, query)
	_, err = s.doOrProxyCommand(command)
	return err
}
|
|
|
|
func (s *RaftServer) DeleteContinuousQuery(db string, id uint32) error {
|
|
command := NewDeleteContinuousQueryCommand(db, id)
|
|
_, err := s.doOrProxyCommand(command)
|
|
return err
|
|
}
|
|
|
|
func (s *RaftServer) ChangeConnectionString(raftName, protobufConnectionString, raftConnectionString string, forced bool) error {
|
|
command := &InfluxChangeConnectionStringCommand{
|
|
Force: true,
|
|
Name: raftName,
|
|
ConnectionString: raftConnectionString,
|
|
ProtobufConnectionString: protobufConnectionString,
|
|
}
|
|
for _, s := range s.raftServer.Peers() {
|
|
// send the command and ignore errors in case a server is down
|
|
SendCommandToServer(s.ConnectionString, command)
|
|
}
|
|
|
|
// make the change permament
|
|
command.Force = false
|
|
_, err := s.doOrProxyCommand(command)
|
|
log.Info("Running the actual command")
|
|
return err
|
|
}
|
|
|
|
// AssignCoordinator wires in the coordinator used to execute continuous
// queries (see runContinuousQuery). Always returns nil.
func (s *RaftServer) AssignCoordinator(coordinator *Coordinator) error {
	s.coordinator = coordinator
	return nil
}
|
|
|
|
const (
	// MAX_SIZE is the raft log size threshold (in bytes) above which the
	// periodic check in CompactLog triggers a log compaction. MEGABYTE is
	// defined elsewhere in the package.
	MAX_SIZE = 10 * MEGABYTE
)
|
|
|
|
func (s *RaftServer) ForceLogCompaction() error {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
err := s.raftServer.TakeSnapshot()
|
|
if err != nil {
|
|
log.Error("Cannot take snapshot: %s", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *RaftServer) CompactLog() {
|
|
checkSizeTicker := time.Tick(time.Minute)
|
|
forceCompactionTicker := time.Tick(time.Hour * 24)
|
|
|
|
for {
|
|
select {
|
|
case <-checkSizeTicker:
|
|
log.Debug("Testing if we should compact the raft logs")
|
|
|
|
path := s.raftServer.LogPath()
|
|
size, err := common.GetFileSize(path)
|
|
if err != nil {
|
|
log.Error("Error getting size of file '%s': %s", path, err)
|
|
}
|
|
if size < MAX_SIZE {
|
|
continue
|
|
}
|
|
s.ForceLogCompaction()
|
|
case <-forceCompactionTicker:
|
|
s.ForceLogCompaction()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *RaftServer) CommittedAllChanges() bool {
|
|
entries := s.raftServer.LogEntries()
|
|
if len(entries) == 0 {
|
|
if s.raftServer.CommitIndex() == 0 {
|
|
// commit index should never be zero, we have at least one
|
|
// raft join command that should be committed, something is
|
|
// wrong, return false to be safe
|
|
return false
|
|
}
|
|
// may be we recovered from a snapshot ?
|
|
return true
|
|
}
|
|
lastIndex := entries[len(entries)-1].Index()
|
|
return s.raftServer.CommitIndex() == lastIndex
|
|
}
|
|
|
|
// startRaft initializes and starts the underlying raft server. A node
// whose log is non-empty simply recovers from it; a node with no seed
// servers bootstraps a brand-new cluster (committing its own join and
// creating the root user); otherwise it retries joining the configured
// seed servers until one succeeds. May block indefinitely when no seed
// can be joined.
func (s *RaftServer) startRaft() error {
	log.Info("Initializing Raft Server: %s", s.config.RaftConnectionString())

	// Initialize and start Raft server.
	transporter := raft.NewHTTPTransporter("/raft", raft.DefaultElectionTimeout)
	var err error
	s.raftServer, err = raft.NewServer(s.name, s.path, transporter, s.clusterConfig, s.clusterConfig, s.config.RaftConnectionString())
	if err != nil {
		return err
	}

	s.raftServer.SetElectionTimeout(s.config.RaftTimeout.Duration)

	// A missing/unreadable snapshot is not fatal; just log it.
	err = s.raftServer.LoadSnapshot()
	if err != nil {
		log.Info(err)
	}

	// React to leadership changes (start/stop the leader loop).
	s.raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventHandler)

	transporter.Install(s.raftServer, s)
	s.raftServer.Start()

	// Background log compaction for the life of the process.
	go s.CompactLog()

	if !s.raftServer.IsLogEmpty() {
		log.Info("Recovered from log")
		return nil
	}

	potentialLeaders := s.config.SeedServers

	if len(potentialLeaders) == 0 {
		// No seeds: bootstrap a new single-node cluster by committing our
		// own join command.
		log.Info("Starting as new Raft leader...")
		name := s.raftServer.Name()
		_, err := s.raftServer.Do(&InfluxJoinCommand{
			Name:                     name,
			ConnectionString:         s.config.RaftConnectionString(),
			ProtobufConnectionString: s.config.ProtobufConnectionString(),
		})

		if err != nil {
			log.Error(err)
		}
		// Seed the fresh cluster with the default root admin account.
		err = s.CreateRootUser()
		return err
	}

	// Keep cycling through the seed list until a join succeeds.
	for {
		for _, leader := range potentialLeaders {
			log.Info("(raft:%s) Attempting to join leader: %s", s.raftServer.Name(), leader)

			if err := s.Join(leader); err == nil {
				log.Info("Joined: %s", leader)
				return nil
			}
		}

		log.Warn("Couldn't join any of the seeds, sleeping and retrying...")
		time.Sleep(100 * time.Millisecond)
	}
}
|
|
|
|
func (s *RaftServer) raftEventHandler(e raft.Event) {
|
|
if e.Value() == "leader" {
|
|
log.Info("(raft:%s) Selected as leader. Starting leader loop.", s.raftServer.Name())
|
|
config := s.clusterConfig.GetLocalConfiguration()
|
|
retentionSweepPeriod := config.StorageRetentionSweepPeriod.Duration
|
|
retentionSweepTimer := time.NewTicker(retentionSweepPeriod)
|
|
go s.raftLeaderLoop(time.NewTicker(1*time.Second),
|
|
retentionSweepTimer)
|
|
}
|
|
|
|
if e.PrevValue() == "leader" {
|
|
log.Info("(raft:%s) Demoted from leader. Ending leader loop.", s.raftServer.Name())
|
|
s.notLeader <- true
|
|
}
|
|
}
|
|
|
|
func (s *RaftServer) raftLeaderLoop(loopTimer *time.Ticker,
|
|
retentionSweepTimer *time.Ticker) {
|
|
for {
|
|
select {
|
|
case <-loopTimer.C:
|
|
log.Debug("(raft:%s) Executing leader loop.", s.raftServer.Name())
|
|
s.checkContinuousQueries()
|
|
break
|
|
case <-retentionSweepTimer.C:
|
|
s.dropShardsWithRetentionPolicies()
|
|
break
|
|
case <-s.notLeader:
|
|
log.Debug("(raft:%s) Exiting leader loop.", s.raftServer.Name())
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// StartProcessingContinuousQueries enables continuous-query processing
// in the leader loop (see checkContinuousQueries).
func (s *RaftServer) StartProcessingContinuousQueries() {
	s.processContinuousQueries = true
}
|
|
|
|
// checkContinuousQueries runs every group-by continuous query whose
// current time boundary has advanced past its last run, then records
// the run time both locally and through raft. Called from the leader
// loop once per second.
func (s *RaftServer) checkContinuousQueries() {
	// Processing must have been explicitly enabled.
	if !s.processContinuousQueries {
		return
	}

	if !s.clusterConfig.HasContinuousQueries() {
		return
	}

	runTime := time.Now()
	queriesDidRun := false

	for db, queries := range s.clusterConfig.ParsedContinuousQueries {
		for _, query := range queries {
			groupByClause := query.GetGroupByClause()

			// if there's no group by clause, it's handled as a fanout query
			if groupByClause.Elems == nil {
				continue
			}

			duration, _, err := query.GetGroupByClause().GetGroupByTime()
			if err != nil {
				log.Error("Couldn't get group by time for continuous query:", err)
				continue
			}
			// NOTE(review): duration is dereferenced below; this assumes
			// GetGroupByTime never returns a nil duration without also
			// returning an error — confirm.

			// Round down to the start of the current group-by interval.
			currentBoundary := runTime.Truncate(*duration)
			lastRun := s.clusterConfig.LastContinuousQueryRunTime()
			if lastRun.IsZero() && !query.GetIntoClause().Backfill {
				// don't backfill for this continuous query, leave it for next
				// iteration. Otherwise, we're going to run the continuous
				// query from lastRun which is the beginning of time until
				// now. This is equivalent to backfilling.
				continue
			}

			lastBoundary := lastRun.Truncate(*duration)

			if currentBoundary.After(lastRun) {
				s.runContinuousQuery(db, query, lastBoundary, currentBoundary)
				queriesDidRun = true
			}
		}
	}

	if queriesDidRun {
		// Record the run time in memory and replicate it through raft so
		// all nodes agree on the last run.
		s.clusterConfig.SetLastContinuousQueryRunTime(runTime)
		s.SetContinuousQueryTimestamp(runTime)
	}
}
|
|
|
|
func (s *RaftServer) dropShardsWithRetentionPolicies() {
|
|
log.Info("Checking for shards to drop")
|
|
shards := s.clusterConfig.GetExpiredShards()
|
|
for _, shard := range shards {
|
|
s.DropShard(shard.Id(), shard.ServerIds())
|
|
}
|
|
}
|
|
|
|
func (s *RaftServer) runContinuousQuery(db string, query *parser.SelectQuery, start time.Time, end time.Time) {
|
|
adminName := s.clusterConfig.GetClusterAdmins()[0]
|
|
clusterAdmin := s.clusterConfig.GetClusterAdmin(adminName)
|
|
intoClause := query.GetIntoClause()
|
|
targetName := intoClause.Target.Name
|
|
queryString := query.GetQueryStringWithTimesAndNoIntoClause(start, end)
|
|
|
|
writer := NewContinuousQueryWriter(s.coordinator, db, targetName, query)
|
|
s.coordinator.RunQuery(clusterAdmin, db, queryString, writer)
|
|
}
|
|
|
|
func (s *RaftServer) ListenAndServe() error {
|
|
l, err := net.Listen("tcp", s.config.RaftListenString())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return s.Serve(l)
|
|
}
|
|
|
|
// Serve starts the raft HTTP transport on l, registers the /join and
// /process_command endpoints, boots the raft state machine in the
// background, and blocks until raft startup completes, returning its
// error. The HTTP server keeps serving in its own goroutine.
func (s *RaftServer) Serve(l net.Listener) error {
	s.listener = l

	log.Info("Initializing Raft HTTP server")

	// Initialize and start HTTP server.
	s.httpServer = &http.Server{
		Handler: s.router,
	}

	s.router.HandleFunc("/join", s.joinHandler).Methods("POST")
	s.router.HandleFunc("/process_command/{command_type}", s.processCommandHandler).Methods("POST")

	log.Info("Raft Server Listening at %s", s.config.RaftListenString())

	go func() {
		// http.Server.Serve always returns a non-nil error; a "closed
		// network" error is the normal shutdown path (listener closed in
		// Close), anything else is fatal.
		err := s.httpServer.Serve(l)
		if !strings.Contains(err.Error(), "closed network") {
			panic(err)
		}
	}()
	// Block until raft has started (or failed to).
	started := make(chan error)
	go func() {
		started <- s.startRaft()
	}()
	err := <-started
	// time.Sleep(3 * time.Second)
	return err
}
|
|
|
|
func (self *RaftServer) Close() {
|
|
if !self.closing || self.raftServer == nil {
|
|
self.closing = true
|
|
self.raftServer.Stop()
|
|
self.listener.Close()
|
|
self.notLeader <- true
|
|
}
|
|
}
|
|
|
|
// This is a hack around Gorilla mux not providing the correct net/http
// HandleFunc() interface. It simply delegates to the internal router.
func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
	s.router.HandleFunc(pattern, handler)
}
|
|
|
|
// Joins to the leader of an existing cluster.
|
|
func (s *RaftServer) RemoveServer(id uint32) error {
|
|
command := &InfluxForceLeaveCommand{
|
|
Id: id,
|
|
}
|
|
for _, s := range s.raftServer.Peers() {
|
|
// send the command and ignore errors in case a server is down
|
|
SendCommandToServer(s.ConnectionString, command)
|
|
}
|
|
|
|
if _, err := command.Apply(s.raftServer); err != nil {
|
|
return err
|
|
}
|
|
|
|
// make the change permament
|
|
log.Info("Running the actual command")
|
|
_, err := s.doOrProxyCommand(command)
|
|
return err
|
|
}
|
|
|
|
// Joins to the leader of an existing cluster: posts this node's join
// command to the given seed server, following one /join redirect hop
// to the actual leader. A nil return means only that the POST
// succeeded.
func (s *RaftServer) Join(leader string) error {
	command := &InfluxJoinCommand{
		Name:                     s.raftServer.Name(),
		ConnectionString:         s.config.RaftConnectionString(),
		ProtobufConnectionString: s.config.ProtobufConnectionString(),
	}
	// Normalize the seed address into a full http://host/join URL.
	connectUrl := leader
	if !strings.HasPrefix(connectUrl, "http://") {
		connectUrl = "http://" + connectUrl
	}
	if !strings.HasSuffix(connectUrl, "/join") {
		connectUrl = connectUrl + "/join"
	}

	var b bytes.Buffer
	// NOTE(review): the Encode error is deliberately ignored; encoding
	// this fixed struct is not expected to fail.
	json.NewEncoder(&b).Encode(command)
	log.Debug("(raft:%s) Posting to seed server %s", s.raftServer.Name(), connectUrl)
	// Bound how long we wait for the seed's response headers.
	tr := &http.Transport{
		ResponseHeaderTimeout: time.Second,
	}
	client := &http.Client{Transport: tr}
	resp, err := client.Post(connectUrl, "application/json", &b)
	if err != nil {
		log.Error(err)
		return err
	}
	defer resp.Body.Close()
	// A non-leader redirects us to the current leader; recurse with the
	// leader's address.
	if resp.StatusCode == http.StatusTemporaryRedirect {
		address := resp.Header.Get("Location")
		log.Debug("Redirected to %s to join leader", address)
		return s.Join(address)
	}

	log.Debug("(raft:%s) Posted to seed server %s", s.raftServer.Name(), connectUrl)
	return nil
}
|
|
|
|
func (s *RaftServer) retryCommand(command raft.Command, retries int) (ret interface{}, err error) {
|
|
for ; retries > 0; retries-- {
|
|
ret, err = s.raftServer.Do(command)
|
|
if err == nil {
|
|
return ret, nil
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
log.Info("Retrying RAFT command...")
|
|
}
|
|
return
|
|
}
|
|
|
|
// joinHandler handles POST /join. The leader decodes and applies the
// join command directly; a follower redirects the caller to the
// current leader, or returns a 500 when no leader is known.
func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
	// if this is the leader, process the command
	if s.raftServer.State() == raft.Leader {
		command := &InfluxJoinCommand{}
		if err := json.NewDecoder(req.Body).Decode(&command); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		log.Debug("ON RAFT LEADER - JOIN: %v", command)
		// during the test suite the join command will sometimes time out.. just retry a few times
		if _, err := s.raftServer.Do(command); err != nil {
			log.Error("Can't process %v: %s", command, err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
		return
	}

	// Not the leader: tell the caller where the leader is.
	leader, ok := s.leaderConnectString()
	log.Debug("Non-leader redirecting to: (%v, %v)", leader, ok)
	if ok {
		log.Debug("redirecting to leader to join...")
		http.Redirect(w, req, leader+"/join", http.StatusTemporaryRedirect)
	} else {
		http.Error(w, errors.New("Couldn't find leader of the cluster to join").Error(), http.StatusInternalServerError)
	}
}
|
|
|
|
func (s *RaftServer) marshalAndDoCommandFromBody(command raft.Command, req *http.Request) (interface{}, error) {
|
|
if err := json.NewDecoder(req.Body).Decode(&command); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if c, ok := command.(*InfluxChangeConnectionStringCommand); ok && c.Force {
|
|
// if this is a forced change, just do it now and return. Note
|
|
// that this isn't a permanent change, since on restart the old
|
|
// connection strings will be used
|
|
return c.Apply(s.raftServer)
|
|
}
|
|
|
|
if result, err := s.raftServer.Do(command); err != nil {
|
|
return nil, err
|
|
} else {
|
|
return result, nil
|
|
}
|
|
}
|
|
|
|
func (s *RaftServer) processCommandHandler(w http.ResponseWriter, req *http.Request) {
|
|
vars := mux.Vars(req)
|
|
value := vars["command_type"]
|
|
command := internalRaftCommands[value]
|
|
v := reflect.New(reflect.Indirect(reflect.ValueOf(command)).Type()).Interface()
|
|
copy, ok := v.(raft.Command)
|
|
if !ok {
|
|
panic(fmt.Sprintf("raft: Unable to copy command: %s (%v)", command.CommandName(), reflect.ValueOf(v).Kind().String()))
|
|
}
|
|
|
|
if result, err := s.marshalAndDoCommandFromBody(copy, req); err != nil {
|
|
log.Error("command %T failed: %s", copy, err)
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
} else {
|
|
if result != nil {
|
|
js, _ := json.Marshal(result)
|
|
w.Write(js)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (self *RaftServer) CreateShards(shards []*cluster.NewShardData) ([]*cluster.ShardData, error) {
|
|
log.Debug("RAFT: CreateShards")
|
|
command := NewCreateShardsCommand(shards)
|
|
createShardsResult, err := self.doOrProxyCommand(command)
|
|
if err != nil {
|
|
log.Error("RAFT: CreateShards: ", err)
|
|
return nil, err
|
|
}
|
|
if x, k := createShardsResult.([]byte); k {
|
|
newShards := make([]*cluster.NewShardData, 0)
|
|
err = json.Unmarshal(x, &newShards)
|
|
if err != nil {
|
|
log.Error("RAFT: error parsing new shard result: ", err)
|
|
return nil, err
|
|
}
|
|
return self.clusterConfig.MarshalNewShardArrayToShards(newShards)
|
|
}
|
|
if x, k := createShardsResult.([]*cluster.NewShardData); k {
|
|
return self.clusterConfig.MarshalNewShardArrayToShards(x)
|
|
}
|
|
|
|
return nil, fmt.Errorf("Unable to marshal Raft AddShards result!")
|
|
}
|
|
|
|
func (self *RaftServer) DropShard(id uint32, serverIds []uint32) error {
|
|
if self.clusterConfig.GetShard(id) == nil {
|
|
log.Warn("Attempted to drop shard that doesn't exist: ", id)
|
|
return nil
|
|
}
|
|
command := NewDropShardCommand(id, serverIds)
|
|
_, err := self.doOrProxyCommand(command)
|
|
return err
|
|
}
|
|
|
|
func (self *RaftServer) GetOrSetFieldIdsForSeries(database string, series []*protocol.Series) ([]*protocol.Series, error) {
|
|
command := NewCreateSeriesFieldIdsCommand(database, series)
|
|
result, err := self.doOrProxyCommand(command)
|
|
if result == nil || err != nil {
|
|
return nil, err
|
|
}
|
|
if x, k := result.([]byte); k {
|
|
s := []*protocol.Series{}
|
|
err := json.Unmarshal(x, &s)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s, nil
|
|
}
|
|
if x, k := result.([]*protocol.Series); k {
|
|
return x, nil
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (self *RaftServer) DropSeries(database, series string) error {
|
|
command := NewDropSeriesCommand(database, series)
|
|
_, err := self.doOrProxyCommand(command)
|
|
|
|
// TODO: Dropping series from the metastore is synchronous, but the underlying data
|
|
// delete is asynchronous. If the server crashes or restarts while this is happening
|
|
// there will be orphaned data sitting around. Not a huge deal, but we should fix this
|
|
// at some point.
|
|
// force a log compaction because we don't want this replaying after a server restarts
|
|
if err == nil {
|
|
err = self.ForceLogCompaction()
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (self *RaftServer) CreateShardSpace(shardSpace *cluster.ShardSpace) error {
|
|
command := NewCreateShardSpaceCommand(shardSpace)
|
|
_, err := self.doOrProxyCommand(command)
|
|
return err
|
|
}
|
|
|
|
func (self *RaftServer) DropShardSpace(database, name string) error {
|
|
command := NewDropShardSpaceCommand(database, name)
|
|
_, err := self.doOrProxyCommand(command)
|
|
return err
|
|
}
|
|
|
|
func (self *RaftServer) UpdateShardSpace(shardSpace *cluster.ShardSpace) error {
|
|
command := NewUpdateShardSpaceCommand(shardSpace)
|
|
_, err := self.doOrProxyCommand(command)
|
|
return err
|
|
}
|