influxdb/cmd/influx_tools/generate/plan.go

246 lines
5.8 KiB
Go

package generate
import (
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"text/tabwriter"
"time"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist"
"github.com/influxdata/influxdb/cmd/influx_tools/server"
"github.com/influxdata/influxdb/services/meta"
"github.com/pkg/errors"
)
// StoragePlan describes the metadata and on-disk layout for generated data:
// the target database and retention policy, how many shard groups to create
// and how long each one spans, and the directory the shard files live in.
type StoragePlan struct {
	// Database is the name of the database that InitMetadata drops and recreates.
	Database string
	// Retention is the retention policy name; it is also the directory under
	// DatabasePath that holds the shard directories (see ShardPath).
	Retention string
	// ReplicaN is the replication factor passed in the retention policy spec.
	ReplicaN int
	// StartTime is the requested start of the data set. Shard groups are
	// created starting at StartTime truncated to ShardDuration (in UTC).
	StartTime time.Time
	// ShardCount is the number of consecutive shard groups to create.
	ShardCount int
	// ShardDuration is the time span covered by each shard group.
	ShardDuration time.Duration
	// DatabasePath is the root directory for this database's data.
	// InitFileSystem deletes it with os.RemoveAll and recreates shard
	// directories beneath it.
	DatabasePath string

	// info is the database metadata captured by InitFileSystem.
	info *meta.DatabaseInfo
	// groups holds the node-local shard groups captured by InitFileSystem.
	groups []meta.ShardGroupInfo
}
// String returns the same tab-aligned summary that PrintPlan writes.
func (p *StoragePlan) String() string {
	var out strings.Builder
	p.PrintPlan(&out)
	return out.String()
}
// PrintPlan writes a human-readable, tab-aligned summary of the plan to w.
// The summary lists the shard directory, the shard count, the
// database/retention policy with its shard duration, and the start and end
// times of the planned range.
func (p *StoragePlan) PrintPlan(w io.Writer) {
	table := tabwriter.NewWriter(w, 25, 4, 2, ' ', 0)

	fmt.Fprintf(table, "Data Path\t%s\n", p.ShardPath())
	fmt.Fprintf(table, "Shard Count\t%d\n", p.ShardCount)
	fmt.Fprintf(table, "Database\t%s/%s (Shard duration: %s)\n", p.Database, p.Retention, p.ShardDuration)
	fmt.Fprintf(table, "Start time\t%s\n", p.StartTime)
	fmt.Fprintf(table, "End time\t%s\n", p.EndTime())

	// PrintPlan has no error return, so a Flush failure is deliberately dropped.
	_ = table.Flush()
}
// ShardPath returns the directory that contains the plan's shard
// directories: the retention policy name joined onto DatabasePath.
func (p *StoragePlan) ShardPath() string {
	dir := filepath.Join(p.DatabasePath, p.Retention)
	return dir
}
// TimeSpan returns the total duration covered by the planned data set,
// i.e. ShardDuration multiplied by ShardCount.
func (p *StoragePlan) TimeSpan() time.Duration {
	shards := time.Duration(p.ShardCount)
	return shards * p.ShardDuration
}
// EndTime returns StartTime advanced by TimeSpan. StartTime is used as-is
// here; it is not truncated to ShardDuration the way TimeRange does.
func (p *StoragePlan) EndTime() time.Time {
	span := p.TimeSpan()
	return p.StartTime.Add(span)
}
// InitMetadata drops p.Database and then recreates it with a retention
// policy named p.Retention, using p.ShardDuration as the shard group
// duration and p.ReplicaN as the replication factor. Finally, it creates
// the plan's shard groups in the new database's default retention policy.
// The first error from the meta client is returned unchanged.
func (p *StoragePlan) InitMetadata(client server.MetaClient) (err error) {
	err = client.DropDatabase(p.Database)
	if err != nil {
		return err
	}

	spec := &meta.RetentionPolicySpec{
		Name:               p.Retention,
		ShardGroupDuration: p.ShardDuration,
		ReplicaN:           &p.ReplicaN,
	}
	info, err := client.CreateDatabaseWithRetentionPolicy(p.Database, spec)
	if err != nil {
		return err
	}

	return p.createShardGroupMetadata(client, info.DefaultRetentionPolicy)
}
// InitFileSystem initializes the file system structure, cleaning up
// existing files and re-creating the appropriate shard directories.
//
// It removes DatabasePath entirely. It then fetches this node's shard groups
// for the plan's TimeRange and creates one directory per shard under
// ShardPath, named by the shard ID. Each shard group must contain exactly
// one shard for this node; a group with zero or several shards is reported
// as an error. The fetched shard groups and the database metadata are kept
// on the plan for NodeShardGroups and ShardGroups.
func (p *StoragePlan) InitFileSystem(client server.MetaClient) error {
	if err := os.RemoveAll(p.DatabasePath); err != nil {
		return err
	}

	minT, maxT := p.TimeRange()
	groups, err := client.NodeShardGroupsByTimeRange(p.Database, p.Retention, minT, maxT)
	if err != nil {
		return err
	}
	p.groups = groups

	shardPath := p.ShardPath()
	for i := range groups {
		sgi := &groups[i]
		// Shards[0] is indexed below, so an empty group must be rejected
		// here rather than allowed to panic.
		if len(sgi.Shards) == 0 {
			return fmt.Errorf("no shards on this node for shard group %d", sgi.ID)
		}
		if len(sgi.Shards) > 1 {
			return fmt.Errorf("multiple shards for the same owner %v", sgi.Shards[0].Owners)
		}
		dir := filepath.Join(shardPath, strconv.FormatUint(sgi.Shards[0].ID, 10))
		if err := os.MkdirAll(dir, 0777); err != nil {
			return err
		}
	}

	p.info = client.Database(p.Database)
	return nil
}
// NodeShardGroups returns the shard groups captured by InitFileSystem, whose
// Shards are limited to the current node. The result is nil until
// InitFileSystem has populated it.
func (p *StoragePlan) NodeShardGroups() []meta.ShardGroupInfo {
	nodeGroups := p.groups
	return nodeGroups
}
// ShardGroups returns every shard group of the database's default retention
// policy, as recorded in the metadata captured by InitFileSystem.
//
// It returns nil when that metadata is unavailable, instead of panicking on a
// nil dereference. This happens when InitFileSystem has not run yet, or when
// the lookup returned no database or no default retention policy.
func (p *StoragePlan) ShardGroups() []meta.ShardGroupInfo {
	if p.info == nil {
		return nil
	}
	rpi := p.info.RetentionPolicy(p.info.DefaultRetentionPolicy)
	if rpi == nil {
		return nil
	}
	return rpi.ShardGroups
}
// createShardGroupMetadata creates p.ShardCount consecutive shard groups in
// retention policy rp of p.Database. The first group starts at StartTime
// truncated to ShardDuration (in UTC), and each later group starts
// ShardDuration after the previous one. It stops at, and returns, the first
// error from the meta client.
func (p *StoragePlan) createShardGroupMetadata(client server.MetaClient, rp string) error {
	ts := p.StartTime.Truncate(p.ShardDuration).UTC()
	for i := 0; i < p.ShardCount; i++ {
		// The created ShardGroupInfo is not needed here; InitFileSystem
		// reads the shard groups back from the meta client.
		if _, err := client.CreateShardGroup(p.Database, rp, ts); err != nil {
			return err
		}
		ts = ts.Add(p.ShardDuration)
	}
	return nil
}
// TimeRange returns the span covered by the plan's shard groups. The start
// is StartTime truncated to ShardDuration and converted to UTC; the end is
// that start plus ShardDuration*ShardCount.
func (p *StoragePlan) TimeRange() (start, end time.Time) {
	span := p.TimeSpan()
	first := p.StartTime.Truncate(p.ShardDuration).UTC()
	return first, first.Add(span)
}
// Validate fills in defaults for unset fields of p (mutating p) and then
// checks the resulting plan. It returns the accumulated validation errors
// as reported by planValidator.Err.
func (p *StoragePlan) Validate() error {
	// Defaults must be applied first so the validator sees effective values.
	WalkPlan(&planDefaults{}, p)

	checker := new(planValidator)
	WalkPlan(checker, p)
	return checker.Err()
}
// Visitor is called by WalkPlan for a plan node. If Visit returns nil,
// WalkPlan stops without examining the node any further.
type Visitor interface {
	Visit(node Node) Visitor
}
// Node is implemented by plan types that can be passed to WalkPlan. Because
// its only method is unexported, only types in this package can implement it.
type Node interface{ node() }

// node marks *StoragePlan as a Node.
func (*StoragePlan) node() {}
// WalkPlan calls v.Visit(node). If Visit returns nil, the walk stops there.
// Otherwise node must be a known plan node type; *StoragePlan has no child
// nodes, so nothing further is visited. WalkPlan panics on an unknown node
// type, unless Visit has already returned nil.
func WalkPlan(v Visitor, node Node) {
	if v = v.Visit(node); v == nil {
		return
	}

	switch n := node.(type) {
	case *StoragePlan:
		// Leaf node: nothing further to walk.
	default:
		panic(fmt.Sprintf("WalkPlan: unexpected node type %T", n))
	}
}
// planValidator is a Visitor that checks a StoragePlan for invalid settings.
// It accumulates every problem it finds rather than stopping at the first.
type planValidator struct {
	errs errlist.ErrorList
}

// Visit validates node when it is a *StoragePlan. It expects defaults to have
// been applied already (Validate runs planDefaults first), so zero-valued
// fields that planDefaults would fill in are reported as errors here.
func (v *planValidator) Visit(node Node) Visitor {
	switch n := node.(type) {
	case *StoragePlan:
		if n.DatabasePath == "" {
			v.errs.Add(errors.New("missing DataPath"))
		}
		if n.ShardCount < 1 {
			v.errs.Add(fmt.Errorf("shard count must be ≥ 1, got %d", n.ShardCount))
		}
		if n.ShardDuration <= 0 {
			v.errs.Add(fmt.Errorf("shard duration must be > 0, got %s", n.ShardDuration))
		}
		// Sample the clock once so the check and the suggested bound agree.
		now := time.Now()
		if n.StartTime.Add(n.TimeSpan()).After(now) {
			v.errs.Add(fmt.Errorf("start time must be ≤ %s", now.Truncate(n.ShardDuration).UTC().Add(-n.TimeSpan())))
		}
	}
	return v
}

// Err returns the accumulated validation errors, as reported by
// errlist.ErrorList.Err.
func (v *planValidator) Err() error {
	return v.errs.Err()
}
// planDefaults is a Visitor that fills in default values for zero-valued
// StoragePlan fields.
type planDefaults struct{}

// Visit applies defaults to node when it is a *StoragePlan.
//
// Ordering matters. Database is defaulted before DatabasePath, because the
// default path includes the database name. ShardDuration and ShardCount are
// defaulted before StartTime, because the default start time is derived
// from them.
func (v *planDefaults) Visit(node Node) Visitor {
	switch n := node.(type) {
	case *StoragePlan:
		if n.Database == "" {
			n.Database = "db"
		}
		if n.Retention == "" {
			n.Retention = "autogen"
		}
		if n.DatabasePath == "" {
			// DatabasePath is the database's own directory. ShardPath joins
			// the retention policy onto it, and InitFileSystem removes it
			// wholesale, so the default must include the database name.
			// Go never expands "${HOME}" in paths, so resolve the home
			// directory explicitly. If that fails, leave the path empty so
			// planValidator reports "missing DataPath".
			if home, err := os.UserHomeDir(); err == nil {
				n.DatabasePath = filepath.Join(home, ".influxdb", "data", n.Database)
			}
		}
		if n.ShardDuration == 0 {
			n.ShardDuration = 24 * time.Hour
		}
		if n.ShardCount == 0 {
			n.ShardCount = 1
		}
		if n.StartTime.IsZero() {
			n.StartTime = time.Now().Truncate(n.ShardDuration).Add(-n.TimeSpan())
		}
	}
	return v
}
// SchemaPlan describes the series to generate on top of a StoragePlan.
type SchemaPlan struct {
	// StoragePlan supplies the shard layout; PrintPlan reads its ShardCount
	// to report the total number of points.
	StoragePlan *StoragePlan
	// Tags holds the tag cardinalities; Tags.Cardinality() is reported by
	// PrintPlan as the total number of series.
	Tags TagCardinalities
	// PointsPerSeriesPerShard is the number of points per series in each shard.
	PointsPerSeriesPerShard int
}
// String returns the same tab-aligned summary that PrintPlan writes.
func (p *SchemaPlan) String() string {
	var out strings.Builder
	p.PrintPlan(&out)
	return out.String()
}
// PrintPlan writes a human-readable, tab-aligned summary of the schema plan
// to w. The summary covers the tag cardinalities, the points per series per
// shard, and the derived totals. The "Total points" row needs the shard
// count, so it is omitted when StoragePlan is nil instead of panicking.
func (p *SchemaPlan) PrintPlan(w io.Writer) {
	tw := tabwriter.NewWriter(w, 25, 4, 2, ' ', 0)
	series := p.Tags.Cardinality()

	fmt.Fprintf(tw, "Tag cardinalities\t%s\n", p.Tags)
	fmt.Fprintf(tw, "Points per series per shard\t%d\n", p.PointsPerSeriesPerShard)
	fmt.Fprintf(tw, "Total points per shard\t%d\n", series*p.PointsPerSeriesPerShard)
	fmt.Fprintf(tw, "Total series\t%d\n", series)
	if p.StoragePlan != nil {
		fmt.Fprintf(tw, "Total points\t%d\n", series*p.StoragePlan.ShardCount*p.PointsPerSeriesPerShard)
	}

	// PrintPlan has no error return, so a Flush failure is deliberately dropped.
	_ = tw.Flush()
}