influxdb/cluster/meta_executor.go

172 lines
4.1 KiB
Go

package cluster
import (
"fmt"
"log"
"net"
"os"
"sync"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
)
const (
metaExecutorWriteTimeout = 5 * time.Second
metaExecutorMaxWriteConnections = 10
)
// MetaExecutor executes meta queries on all data nodes.
type MetaExecutor struct {
mu sync.RWMutex
timeout time.Duration
pool *clientPool
maxConnections int
Logger *log.Logger
Node *influxdb.Node
nodeExecutor interface {
executeOnNode(stmt influxql.Statement, database string, node *meta.NodeInfo) error
}
MetaClient interface {
DataNode(id uint64) (ni *meta.NodeInfo, err error)
DataNodes() ([]meta.NodeInfo, error)
}
}
// NewMetaExecutor returns a new initialized *MetaExecutor.
func NewMetaExecutor() *MetaExecutor {
m := &MetaExecutor{
timeout: metaExecutorWriteTimeout,
pool: newClientPool(),
maxConnections: metaExecutorMaxWriteConnections,
Logger: log.New(os.Stderr, "[meta-executor] ", log.LstdFlags),
}
m.nodeExecutor = m
return m
}
// remoteNodeError wraps an error with context about a node that
// returned the error.
type remoteNodeError struct {
id uint64
err error
}
func (e remoteNodeError) Error() string {
return fmt.Sprintf("partial success, node %d may be down (%s)", e.id, e.err)
}
// ExecuteStatement executes a single InfluxQL statement on all nodes in the cluster concurrently.
func (m *MetaExecutor) ExecuteStatement(stmt influxql.Statement, database string) error {
// Get a list of all nodes the query needs to be executed on.
nodes, err := m.MetaClient.DataNodes()
if err != nil {
return err
} else if len(nodes) < 1 {
return nil
}
// Start a goroutine to execute the statement on each of the remote nodes.
var wg sync.WaitGroup
errs := make(chan error, len(nodes)-1)
for _, node := range nodes {
if m.Node.ID == node.ID {
continue // Don't execute statement on ourselves.
}
wg.Add(1)
go func(node meta.NodeInfo) {
defer wg.Done()
if err := m.nodeExecutor.executeOnNode(stmt, database, &node); err != nil {
errs <- remoteNodeError{id: node.ID, err: err}
}
}(node)
}
// Wait on n-1 nodes to execute the statement and respond.
wg.Wait()
select {
case err = <-errs:
return err
default:
return nil
}
}
// executeOnNode executes a single InfluxQL statement on a single node.
func (m *MetaExecutor) executeOnNode(stmt influxql.Statement, database string, node *meta.NodeInfo) error {
// We're executing on a remote node so establish a connection.
c, err := m.dial(node.ID)
if err != nil {
return err
}
conn, ok := c.(*pooledConn)
if !ok {
panic("wrong connection type in MetaExecutor")
}
// Return connection to pool by "closing" it.
defer conn.Close()
// Build RPC request.
var request ExecuteStatementRequest
request.SetStatement(stmt.String())
request.SetDatabase(database)
// Marshal into protocol buffer.
buf, err := request.MarshalBinary()
if err != nil {
return err
}
// Send request.
conn.SetWriteDeadline(time.Now().Add(m.timeout))
if err := WriteTLV(conn, executeStatementRequestMessage, buf); err != nil {
conn.MarkUnusable()
return err
}
// Read the response.
conn.SetReadDeadline(time.Now().Add(m.timeout))
_, buf, err = ReadTLV(conn)
if err != nil {
conn.MarkUnusable()
return err
}
// Unmarshal response.
var response ExecuteStatementResponse
if err := response.UnmarshalBinary(buf); err != nil {
return err
}
if response.Code() != 0 {
return fmt.Errorf("error code %d: %s", response.Code(), response.Message())
}
return nil
}
// dial returns a connection to a single node in the cluster.
func (m *MetaExecutor) dial(nodeID uint64) (net.Conn, error) {
// If we don't have a connection pool for that addr yet, create one
_, ok := m.pool.getPool(nodeID)
if !ok {
factory := &connFactory{nodeID: nodeID, clientPool: m.pool, timeout: m.timeout}
factory.metaClient = m.MetaClient
p, err := NewBoundedPool(1, m.maxConnections, m.timeout, factory.dial)
if err != nil {
return nil, err
}
m.pool.setPool(nodeID, p)
}
return m.pool.conn(nodeID)
}