add position to log_entry protobuf
parent
d286dfdc6c
commit
d170e3e958
10
log.go
10
log.go
|
@ -154,21 +154,20 @@ func (l *Log) open(path string) error {
|
|||
l.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
|
||||
debugln("log.open.create ", path)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
// set commitIndex to 0
|
||||
_, err = fmt.Fprintf(l.file, "%8x\n", 0)
|
||||
|
||||
if err != nil {
|
||||
l.file.Close()
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
debugln("log.open.exist ", path)
|
||||
|
||||
// if the log file exists
|
||||
// we read out the commitIndex and apply all the commands
|
||||
// seek to the end of log file
|
||||
|
@ -192,8 +191,6 @@ func (l *Log) open(path string) error {
|
|||
entry, _ := newLogEntry(l, 0, 0, nil)
|
||||
n, err := entry.decode(reader)
|
||||
if err != nil {
|
||||
//panic(err)
|
||||
//l.file.Close()
|
||||
if err = os.Truncate(path, readBytes); err != nil {
|
||||
return fmt.Errorf("raft.Log: Unable to recover: %v", err)
|
||||
}
|
||||
|
@ -203,6 +200,7 @@ func (l *Log) open(path string) error {
|
|||
// Append entry.
|
||||
l.entries = append(l.entries, entry)
|
||||
debugln("open.log.append log index ", entry.Index)
|
||||
|
||||
// if the entry index less than the known commitIndex
|
||||
// commit it
|
||||
if entry.Index < commitIndex {
|
||||
|
|
|
@ -58,6 +58,7 @@ func (e *LogEntry) encode(w io.Writer) (int, error) {
|
|||
Term: proto.Uint64(e.Term),
|
||||
CommandName: proto.String(e.CommandName),
|
||||
Command: e.Command,
|
||||
Position: proto.Int64(e.Position),
|
||||
}
|
||||
|
||||
err := p.Marshal(pb)
|
||||
|
@ -106,6 +107,7 @@ func (e *LogEntry) decode(r io.Reader) (int, error) {
|
|||
e.Index = pb.GetIndex()
|
||||
e.CommandName = pb.GetCommandName()
|
||||
e.Command = pb.Command
|
||||
e.Position = pb.GetPosition()
|
||||
|
||||
return length, nil
|
||||
}
|
||||
|
|
27
log_test.go
27
log_test.go
|
@ -168,7 +168,7 @@ func TestLogTruncate(t *testing.T) {
|
|||
if err := log.open(path); err != nil {
|
||||
t.Fatalf("Unable to open log: %v", err)
|
||||
}
|
||||
defer log.close()
|
||||
|
||||
defer os.Remove(path)
|
||||
|
||||
entry1, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||
|
@ -208,4 +208,29 @@ func TestLogTruncate(t *testing.T) {
|
|||
t.Fatalf("Truncating at last commit should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1, entry2})
|
||||
}
|
||||
|
||||
// Append after truncate
|
||||
if err := log.appendEntry(entry3); err != nil {
|
||||
t.Fatalf("Unable to append after truncate: %v", err)
|
||||
}
|
||||
|
||||
log.close()
|
||||
|
||||
// Recovery the truncated log
|
||||
log = newLog()
|
||||
if err := log.open(path); err != nil {
|
||||
t.Fatalf("Unable to open log: %v", err)
|
||||
}
|
||||
// Validate existing log entries.
|
||||
if len(log.entries) != 3 {
|
||||
t.Fatalf("Expected 3 entries, got %d", len(log.entries))
|
||||
}
|
||||
if log.entries[0].Index != 1 || log.entries[0].Term != 1 {
|
||||
t.Fatalf("Unexpected entry[0]: %v", log.entries[0])
|
||||
}
|
||||
if log.entries[1].Index != 2 || log.entries[1].Term != 1 {
|
||||
t.Fatalf("Unexpected entry[1]: %v", log.entries[1])
|
||||
}
|
||||
if log.entries[2].Index != 3 || log.entries[2].Term != 2 {
|
||||
t.Fatalf("Unexpected entry[2]: %v", log.entries[2])
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ type ProtoLogEntry struct {
|
|||
Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"`
|
||||
CommandName *string `protobuf:"bytes,3,req" json:"CommandName,omitempty"`
|
||||
Command []byte `protobuf:"bytes,4,opt" json:"Command,omitempty"`
|
||||
Position *int64 `protobuf:"varint,5,req" json:"Position,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -53,5 +54,12 @@ func (m *ProtoLogEntry) GetCommand() []byte {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *ProtoLogEntry) GetPosition() int64 {
|
||||
if m != nil && m.Position != nil {
|
||||
return *m.Position
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func init() {
|
||||
}
|
||||
|
|
|
@ -5,4 +5,5 @@ message ProtoLogEntry {
|
|||
required uint64 Term=2;
|
||||
required string CommandName=3;
|
||||
optional bytes Command=4; // for nop-command
|
||||
required int64 Position=5;
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
// Code generated by protoc-gen-go.
|
||||
// source: snapshot_recovery_resquest.proto
|
||||
// source: snapshot_recovery_request.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
package protobuf
|
||||
|
|
Loading…
Reference in New Issue