Implement ExplorationStore with boltDB and protobuf

pull/142/head
gunnaraasen 2016-09-22 16:22:41 -07:00
parent 73d84cd608
commit 36f0115e57
12 changed files with 559 additions and 5 deletions

57
store/bolt/client.go Normal file
View File

@ -0,0 +1,57 @@
package bolt
import (
"time"
"github.com/boltdb/bolt"
)
// Client is a client for the boltDB data store.
type Client struct {
Path string
db *bolt.DB
Now func() time.Time
}
func NewClient() *Client {
return &Client{
Now: time.Now,
}
}
// Open and initializ boltDB. Initial buckets are created if they do not exist.
func (c *Client) Open() error {
// Open database file.
db, err := bolt.Open(c.Path, 0666, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return err
}
c.db = db
tx, err := c.db.Begin(true)
if err != nil {
return err
}
defer tx.Rollback()
// Always create explorations bucket.
if _, err := tx.CreateBucketIfNotExists([]byte("Explorations")); err != nil {
return err
}
return tx.Commit()
}
func (c *Client) Close() error {
if c.db != nil {
return c.db.Close()
}
return nil
}
// Connect creates a new session for boltDB.
func (c *Client) Connect() *Session {
s := newSession(c.db)
s.now = c.Now()
return s
}

45
store/bolt/client_test.go Normal file
View File

@ -0,0 +1,45 @@
package bolt_test
import (
"io/ioutil"
"os"
"time"
"github.com/influxdata/mrfusion/store/bolt"
)
// Mock specific time for testing.
var Now = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
type Client struct {
*bolt.Client
}
func NewClient() *Client {
f, err := ioutil.TempFile("", "mrfusion-bolt-")
if err != nil {
panic(err)
}
f.Close()
c := &Client{
Client: bolt.NewClient(),
}
c.Path = f.Name()
c.Now = func() time.Time { return Now }
return c
}
func MustOpenClient() *Client {
c := NewClient()
if err := c.Open(); err != nil {
panic(err)
}
return c
}
func (c *Client) Close() error {
defer os.Remove(c.Path)
return c.Client.Close()
}

7
store/bolt/errors.go Normal file
View File

@ -0,0 +1,7 @@
package bolt
import "errors"
var (
ErrExplorationNotFound = errors.New("exploration not found")
)

View File

@ -0,0 +1,131 @@
package bolt
import (
"github.com/influxdata/mrfusion"
"github.com/influxdata/mrfusion/store/bolt/internal"
"golang.org/x/net/context"
)
// Ensure ExplorationStore implements mrfusion.ExplorationStore.
var _ mrfusion.ExplorationStore = &ExplorationStore{}
type ExplorationStore struct {
session *Session
}
// Search the ExplorationStore for all explorations owned by userID.
func (s *ExplorationStore) Query(ctx context.Context, uid mrfusion.UserID) ([]mrfusion.Exploration, error) {
// Begin read transaction.
tx, err := s.session.db.Begin(false)
if err != nil {
return nil, err
}
defer tx.Rollback()
var explorations []mrfusion.Exploration
if err := tx.Bucket([]byte("Explorations")).ForEach(func(k, v []byte) error {
var e mrfusion.Exploration
if err := internal.UnmarshalExploration(v, &e); err != nil {
return err
} else if e.UserID != uid {
return nil
}
explorations = append(explorations, e)
return nil
}); err != nil {
return nil, err
}
return explorations, nil
}
// Create a new Exploration in the ExplorationStore.
func (s *ExplorationStore) Add(ctx context.Context, e mrfusion.Exploration) error {
// Begin read-write transaction.
tx, err := s.session.db.Begin(true)
if err != nil {
return err
}
defer tx.Rollback()
b := tx.Bucket([]byte("Explorations"))
seq, _ := b.NextSequence()
e.ID = mrfusion.ExplorationID(seq)
e.CreatedAt = s.session.now
if v, err := internal.MarshalExploration(&e); err != nil {
return err
} else if err := b.Put(itob(int(e.ID)), v); err != nil {
return err
}
return tx.Commit()
}
// Delete the exploration from the ExplorationStore
func (s *ExplorationStore) Delete(ctx context.Context, e mrfusion.Exploration) error {
// Begin read transaction.
tx, err := s.session.db.Begin(false)
if err != nil {
return err
}
defer tx.Rollback()
if err := tx.Bucket([]byte("Explorations")).Delete(itob(int(e.ID))); err != nil {
return err
}
return nil
}
// Retrieve an exploration for an id exists.
func (s *ExplorationStore) Get(ctx context.Context, id mrfusion.ExplorationID) (mrfusion.Exploration, error) {
var e mrfusion.Exploration
// Begin read transaction.
tx, err := s.session.db.Begin(false)
if err != nil {
return e, err
}
defer tx.Rollback()
if v := tx.Bucket([]byte("Explorations")).Get(itob(int(id))); v == nil {
return e, nil
} else if err := internal.UnmarshalExploration(v, &e); err != nil {
return e, err
}
return e, nil
}
// Update the exploration with the exploration `ne`; will update `UpdatedAt`.
func (s *ExplorationStore) Update(ctx context.Context, ne mrfusion.Exploration) error {
// Begin read-write transaction.
tx, err := s.session.db.Begin(true)
if err != nil {
return err
}
defer tx.Rollback()
b := tx.Bucket([]byte("Explorations"))
var e mrfusion.Exploration
if v := b.Get(itob(int(ne.ID))); v == nil {
return ErrExplorationNotFound
} else if err := internal.UnmarshalExploration(v, &e); err != nil {
return err
}
e.Name = ne.Name
e.UserID = ne.UserID
e.Data = ne.Data
e.UpdatedAt = s.session.now
if v, err := internal.MarshalExploration(&e); err != nil {
return err
} else if err := b.Put(itob(int(e.ID)), v); err != nil {
return err
}
return tx.Commit()
}

View File

@ -0,0 +1,125 @@
package bolt_test
import (
"testing"
"github.com/influxdata/mrfusion"
)
// Ensure Exploration can be added and retrieved.
func TestExplorationStore_Add(t *testing.T) {
c := MustOpenClient()
defer c.Close()
s := c.Connect().ExplorationStore()
data := "{\"panels\":{\"123\":{\"id\":\"123\",\"queryIds\":[\"456\"]}},\"queryConfigs\":{\"456\":{\"id\":\"456\",\"database\":null,\"measurement\":null,\"retentionPolicy\":null,\"fields\":[],\"tags\":{},\"groupBy\":{\"time\":null,\"tags\":[]},\"areTagsAccepted\":true,\"rawText\":null}}}"
exploration := mrfusion.Exploration{
Name: "Ferdinand Magellan",
UserID: 2,
Data: data,
}
// Add new exploration.
if err := s.Add(nil, exploration); err != nil {
t.Fatal(err)
} else if exploration.ID != 0 {
t.Fatalf("exploration ID error: got %v, expected %v", exploration.ID, 1)
}
// Confirm exploration in the store is the same as the original.
e, err := s.Get(nil, 1)
if err != nil {
t.Fatal(err)
} else if e.Name != exploration.Name {
t.Fatalf("exploration Name error: got %v, expected %v", e.Name, exploration.Name)
} else if e.UserID != exploration.UserID {
t.Fatalf("exploration UserID error: got %v, expected %v", e.UserID, exploration.UserID)
} else if e.Data != exploration.Data {
t.Fatalf("exploration Data error: got %v, expected %v", e.Data, exploration.Data)
}
}
// Ensure Explorations can be queried by UserID.
func TestExplorationStore_Query(t *testing.T) {
c := MustOpenClient()
defer c.Close()
s := c.Connect().ExplorationStore()
explorations := make(map[int]mrfusion.Exploration)
explorations[0] = mrfusion.Exploration{
Name: "Ferdinand Magellan",
UserID: 2,
Data: "{\"panels\":{\"123\":{\"id\":\"123\",\"queryIds\":[\"456\"]}},\"queryConfigs\":{\"456\":{\"id\":\"456\",\"database\":null,\"measurement\":null,\"retentionPolicy\":null,\"fields\":[],\"tags\":{},\"groupBy\":{\"time\":null,\"tags\":[]},\"areTagsAccepted\":true,\"rawText\":null}}}",
}
explorations[1] = mrfusion.Exploration{
Name: "Marco Polo",
UserID: 3,
Data: "{\"panels\":{\"123\":{\"id\":\"123\",\"queryIds\":[\"456\"]}},\"queryConfigs\":{\"456\":{\"id\":\"456\",\"database\":null,\"measurement\":null,\"retentionPolicy\":null,\"fields\":[],\"tags\":{},\"groupBy\":{\"time\":null,\"tags\":[]},\"areTagsAccepted\":true,\"rawText\":null}}}",
}
explorations[2] = mrfusion.Exploration{
Name: "Leif Ericson",
UserID: 3,
Data: "{\"panels\":{\"123\":{\"id\":\"123\",\"queryIds\":[\"456\"]}},\"queryConfigs\":{\"456\":{\"id\":\"456\",\"database\":null,\"measurement\":null,\"retentionPolicy\":null,\"fields\":[],\"tags\":{},\"groupBy\":{\"time\":null,\"tags\":[]},\"areTagsAccepted\":true,\"rawText\":null}}}",
}
for i := range explorations {
if err := s.Add(nil, explorations[i]); err != nil {
t.Fatal(err)
}
}
// Query for explorations.
if explorations, err := s.Query(nil, 3); err != nil {
t.Fatal(err)
} else if len(explorations) != 2 {
t.Fatalf("exploration query length error: got %v, expected %v", len(explorations), 2)
} else if explorations[0].Name != "Marco Polo" {
t.Fatalf("exploration query error: got %v, expected %v", explorations[0].Name, "Marco Polo")
} else if explorations[1].Name != "Leif Ericson" {
t.Fatalf("exploration query error: got %v, expected %v", explorations[1].Name, "Leif Ericson")
}
}
// Ensure an exploration can be deleted.
func TestExplorationStore_Delete(t *testing.T) {
// TODO: Make sure deleting an exploration works.
t.Skip()
}
// Ensure explorations can be updated.
func TestExplorationStore_Update(t *testing.T) {
c := MustOpenClient()
defer c.Close()
s := c.Connect().ExplorationStore()
if err := s.Add(nil, mrfusion.Exploration{Name: "Ferdinand Magellan"}); err != nil {
t.Fatal(err)
}
if err := s.Add(nil, mrfusion.Exploration{UserID: 3}); err != nil {
t.Fatal(err)
}
// Update explorations.
if err := s.Update(nil, mrfusion.Exploration{ID: 1, Name: "Francis Drake"}); err != nil {
t.Fatal(err)
}
if err := s.Update(nil, mrfusion.Exploration{ID: 2, UserID: 4}); err != nil {
t.Fatal(err)
}
// Confirm first Exploration update updated Name.
if e, err := s.Get(nil, 1); err != nil {
t.Fatal(err)
} else if e.Name != "Francis Drake" {
t.Fatalf("exploration 1 update error: got %v, expected %v", e.Name, "Francis Drake")
}
// Confirm second Exploration has updated UserID.
if e, err := s.Get(nil, 2); err != nil {
t.Fatal(err)
} else if e.UserID != 4 {
t.Fatalf("exploration 2 update error: got: %v, expected: %v", e.UserID, 4)
}
}

View File

@ -0,0 +1,39 @@
package internal
import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/influxdata/mrfusion"
)
//go:generate protoc --gogo_out=. internal.proto
// MarshalExploration encodes an exploration to binary protobuf format.
func MarshalExploration(e *mrfusion.Exploration) ([]byte, error) {
return proto.Marshal(&Exploration{
ID: int64(e.ID),
Name: e.Name,
UserID: int64(e.UserID),
Data: e.Data,
CreatedAt: e.CreatedAt.UnixNano(),
UpdatedAt: e.UpdatedAt.UnixNano(),
})
}
// UnmarshalExploration decodes an exploration from binary protobuf data.
func UnmarshalExploration(data []byte, e *mrfusion.Exploration) error {
var pb Exploration
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
e.ID = mrfusion.ExplorationID(pb.ID)
e.Name = pb.Name
e.UserID = mrfusion.UserID(pb.UserID)
e.Data = pb.Data
e.CreatedAt = time.Unix(0, pb.CreatedAt).UTC()
e.UpdatedAt = time.Unix(0, pb.UpdatedAt).UTC()
return nil
}

View File

@ -0,0 +1,64 @@
// Code generated by protoc-gen-gogo.
// source: internal.proto
// DO NOT EDIT!
/*
Package internal is a generated protocol buffer package.
It is generated from these files:
internal.proto
It has these top-level messages:
Exploration
*/
package internal
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type Exploration struct {
ID int64 `protobuf:"varint,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"`
Name string `protobuf:"bytes,2,opt,name=Name,json=name,proto3" json:"Name,omitempty"`
UserID int64 `protobuf:"varint,3,opt,name=UserID,json=userID,proto3" json:"UserID,omitempty"`
Data string `protobuf:"bytes,4,opt,name=Data,json=data,proto3" json:"Data,omitempty"`
CreatedAt int64 `protobuf:"varint,5,opt,name=CreatedAt,json=createdAt,proto3" json:"CreatedAt,omitempty"`
UpdatedAt int64 `protobuf:"varint,6,opt,name=UpdatedAt,json=updatedAt,proto3" json:"UpdatedAt,omitempty"`
}
func (m *Exploration) Reset() { *m = Exploration{} }
func (m *Exploration) String() string { return proto.CompactTextString(m) }
func (*Exploration) ProtoMessage() {}
func (*Exploration) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{0} }
func init() {
proto.RegisterType((*Exploration)(nil), "internal.Exploration")
}
func init() { proto.RegisterFile("internal.proto", fileDescriptorInternal) }
var fileDescriptorInternal = []byte{
// 166 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xcb, 0xcc, 0x2b, 0x49,
0x2d, 0xca, 0x4b, 0xcc, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf1, 0x95, 0x66,
0x32, 0x72, 0x71, 0xbb, 0x56, 0x14, 0xe4, 0xe4, 0x17, 0x25, 0x96, 0x64, 0xe6, 0xe7, 0x09, 0xf1,
0x71, 0x31, 0x79, 0xba, 0x48, 0x30, 0x2a, 0x30, 0x6a, 0x30, 0x07, 0x31, 0x65, 0xba, 0x08, 0x09,
0x71, 0xb1, 0xf8, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x29, 0x30, 0x6a, 0x70, 0x06, 0xb1, 0xe4, 0x25,
0xe6, 0xa6, 0x0a, 0x89, 0x71, 0xb1, 0x85, 0x16, 0xa7, 0x16, 0x79, 0xba, 0x48, 0x30, 0x83, 0xd5,
0xb1, 0x95, 0x82, 0x79, 0x20, 0xb5, 0x2e, 0x89, 0x25, 0x89, 0x12, 0x2c, 0x10, 0xb5, 0x29, 0x89,
0x25, 0x89, 0x42, 0x32, 0x5c, 0x9c, 0xce, 0x45, 0xa9, 0x89, 0x25, 0xa9, 0x29, 0x8e, 0x25, 0x12,
0xac, 0x60, 0xe5, 0x9c, 0xc9, 0x30, 0x01, 0x90, 0x6c, 0x68, 0x41, 0x0a, 0x54, 0x96, 0x0d, 0x22,
0x5b, 0x0a, 0x13, 0x48, 0x62, 0x03, 0x3b, 0xd6, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x64, 0xd7,
0x3d, 0xfe, 0xbe, 0x00, 0x00, 0x00,
}

View File

@ -0,0 +1,11 @@
syntax = "proto3";
package internal;
message Exploration {
int64 ID = 1;
string Name = 2;
int64 UserID = 3;
string Data = 4;
int64 CreatedAt = 5;
int64 UpdatedAt = 6;
}

View File

@ -0,0 +1,31 @@
package internal_test
import (
"reflect"
"testing"
"time"
"github.com/influxdata/mrfusion"
"github.com/influxdata/mrfusion/store/bolt/internal"
)
// Ensure an exploration can be marshaled and unmarshaled.
func TestMarshalExploration(t *testing.T) {
v := mrfusion.Exploration{
ID: 12,
Name: "Some Exploration",
UserID: 34,
Data: "{\"data\":\"something\"}",
CreatedAt: time.Now().UTC(),
UpdatedAt: time.Now().UTC(),
}
var other mrfusion.Exploration
if buf, err := internal.MarshalExploration(&v); err != nil {
t.Fatal(err)
} else if err := internal.UnmarshalExploration(buf, &other); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(v, other) {
t.Fatalf("unexpected copy: %#v", other)
}
}

26
store/bolt/session.go Normal file
View File

@ -0,0 +1,26 @@
package bolt
import (
"time"
"github.com/boltdb/bolt"
"github.com/influxdata/mrfusion"
)
// Session is a connection to a boltDB database.
// TODO: Hook up authentication here.
type Session struct {
db *bolt.DB
now time.Time
// Services
explorationStore ExplorationStore
}
func newSession(db *bolt.DB) *Session {
s := &Session{db: db}
s.explorationStore.session = s
return s
}
func (s *Session) ExplorationStore() mrfusion.ExplorationStore { return &s.explorationStore }

12
store/bolt/util.go Normal file
View File

@ -0,0 +1,12 @@
package bolt
import (
"encoding/binary"
)
// itob returns an 8-byte big endian representation of v.
func itob(v int) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(v))
return b
}

View File

@ -57,26 +57,32 @@ type AuthStore struct {
}
}
// UserID is a unique ID for a source user.
type UserID int
// ExplorationID is a unique ID for an exploration.
type ExplorationID int
// Exploration is a serialization of front-end Data Explorer.
type Exploration struct {
ID int
ID ExplorationID
Name string // User provided name of the exploration.
UserID int // UserID is the owner of this exploration.
UserID UserID // UserID is the owner of this exploration.
Data string // Opaque blob of JSON data
CreatedAt time.Time // Time the exploration was first created
UpdatedAt time.Time // Latest time the exploration was updated.
}
// ExplorationStore stores front-end serializations of explorater sessions
// ExplorationStore stores front-end serializations of data explorer sessions.
type ExplorationStore interface {
// Search the ExplorationStore for all explorations owned by userID.
Query(ctx context.Context, userID int) ([]Exploration, error)
Query(ctx context.Context, userID UserID) ([]Exploration, error)
// Create a new Exploration in the ExplorationStore
Add(context.Context, Exploration) error
// Delete the exploration from the ExplorationStore
Delete(context.Context, Exploration) error
// Retrieve an exploration if `ID` exists.
Get(ctx context.Context, ID int) (Exploration, error)
Get(ctx context.Context, ID ExplorationID) (Exploration, error)
// Update the exploration; will update `UpdatedAt`.
Update(context.Context, Exploration) error
}