172 lines
4.1 KiB
Go
172 lines
4.1 KiB
Go
package cluster
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb"
|
|
"github.com/influxdata/influxdb/influxql"
|
|
"github.com/influxdata/influxdb/services/meta"
|
|
)
|
|
|
|
// Defaults applied by NewMetaExecutor.
const (
	// metaExecutorWriteTimeout is the default value of MetaExecutor.timeout.
	metaExecutorWriteTimeout time.Duration = 5 * time.Second

	// metaExecutorMaxWriteConnections is the default value of
	// MetaExecutor.maxConnections.
	metaExecutorMaxWriteConnections = 10
)
|
|
|
|
// MetaExecutor executes meta queries on all data nodes.
|
|
type MetaExecutor struct {
|
|
mu sync.RWMutex
|
|
timeout time.Duration
|
|
pool *clientPool
|
|
maxConnections int
|
|
Logger *log.Logger
|
|
Node *influxdb.Node
|
|
|
|
nodeExecutor interface {
|
|
executeOnNode(stmt influxql.Statement, database string, node *meta.NodeInfo) error
|
|
}
|
|
|
|
MetaClient interface {
|
|
DataNode(id uint64) (ni *meta.NodeInfo, err error)
|
|
DataNodes() ([]meta.NodeInfo, error)
|
|
}
|
|
}
|
|
|
|
// NewMetaExecutor returns a new initialized *MetaExecutor.
|
|
func NewMetaExecutor() *MetaExecutor {
|
|
m := &MetaExecutor{
|
|
timeout: metaExecutorWriteTimeout,
|
|
pool: newClientPool(),
|
|
maxConnections: metaExecutorMaxWriteConnections,
|
|
Logger: log.New(os.Stderr, "[meta-executor] ", log.LstdFlags),
|
|
}
|
|
m.nodeExecutor = m
|
|
|
|
return m
|
|
}
|
|
|
|
// remoteNodeError pairs an error with the ID of the node that produced it, so
// the caller can tell which remote node failed.
type remoteNodeError struct {
	id  uint64 // ID of the node the error came from
	err error  // underlying error reported for that node
}

// Error implements the error interface. The message names the node and embeds
// the underlying error's text.
func (e remoteNodeError) Error() string {
	const format = "partial success, node %d may be down (%s)"
	return fmt.Sprintf(format, e.id, e.err)
}
|
|
|
|
// ExecuteStatement executes a single InfluxQL statement on all nodes in the cluster concurrently.
|
|
func (m *MetaExecutor) ExecuteStatement(stmt influxql.Statement, database string) error {
|
|
// Get a list of all nodes the query needs to be executed on.
|
|
nodes, err := m.MetaClient.DataNodes()
|
|
if err != nil {
|
|
return err
|
|
} else if len(nodes) < 1 {
|
|
return nil
|
|
}
|
|
|
|
// Start a goroutine to execute the statement on each of the remote nodes.
|
|
var wg sync.WaitGroup
|
|
errs := make(chan error, len(nodes)-1)
|
|
for _, node := range nodes {
|
|
if m.Node.ID == node.ID {
|
|
continue // Don't execute statement on ourselves.
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(node meta.NodeInfo) {
|
|
defer wg.Done()
|
|
if err := m.nodeExecutor.executeOnNode(stmt, database, &node); err != nil {
|
|
errs <- remoteNodeError{id: node.ID, err: err}
|
|
}
|
|
}(node)
|
|
}
|
|
|
|
// Wait on n-1 nodes to execute the statement and respond.
|
|
wg.Wait()
|
|
|
|
select {
|
|
case err = <-errs:
|
|
return err
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// executeOnNode executes a single InfluxQL statement on a single node.
|
|
func (m *MetaExecutor) executeOnNode(stmt influxql.Statement, database string, node *meta.NodeInfo) error {
|
|
// We're executing on a remote node so establish a connection.
|
|
c, err := m.dial(node.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
conn, ok := c.(*pooledConn)
|
|
if !ok {
|
|
panic("wrong connection type in MetaExecutor")
|
|
}
|
|
// Return connection to pool by "closing" it.
|
|
defer conn.Close()
|
|
|
|
// Build RPC request.
|
|
var request ExecuteStatementRequest
|
|
request.SetStatement(stmt.String())
|
|
request.SetDatabase(database)
|
|
|
|
// Marshal into protocol buffer.
|
|
buf, err := request.MarshalBinary()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Send request.
|
|
conn.SetWriteDeadline(time.Now().Add(m.timeout))
|
|
if err := WriteTLV(conn, executeStatementRequestMessage, buf); err != nil {
|
|
conn.MarkUnusable()
|
|
return err
|
|
}
|
|
|
|
// Read the response.
|
|
conn.SetReadDeadline(time.Now().Add(m.timeout))
|
|
_, buf, err = ReadTLV(conn)
|
|
if err != nil {
|
|
conn.MarkUnusable()
|
|
return err
|
|
}
|
|
|
|
// Unmarshal response.
|
|
var response ExecuteStatementResponse
|
|
if err := response.UnmarshalBinary(buf); err != nil {
|
|
return err
|
|
}
|
|
|
|
if response.Code() != 0 {
|
|
return fmt.Errorf("error code %d: %s", response.Code(), response.Message())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// dial returns a connection to a single node in the cluster.
|
|
func (m *MetaExecutor) dial(nodeID uint64) (net.Conn, error) {
|
|
// If we don't have a connection pool for that addr yet, create one
|
|
_, ok := m.pool.getPool(nodeID)
|
|
if !ok {
|
|
factory := &connFactory{nodeID: nodeID, clientPool: m.pool, timeout: m.timeout}
|
|
factory.metaClient = m.MetaClient
|
|
|
|
p, err := NewBoundedPool(1, m.maxConnections, m.timeout, factory.dial)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.pool.setPool(nodeID, p)
|
|
}
|
|
return m.pool.conn(nodeID)
|
|
}
|