546 lines
10 KiB
Go
546 lines
10 KiB
Go
package persist
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/gob"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"path"
|
|
"reflect"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
bolt "github.com/coreos/bbolt"
|
|
)
|
|
|
|
var Debug bool
|
|
|
|
// allow persistant storage and reloading of variables to bolt database.
|
|
// everything is time stamped.
|
|
// bucket is the variable name
|
|
|
|
type Var struct {
|
|
X interface{}
|
|
Time time.Time
|
|
name string
|
|
permanent bool
|
|
}
|
|
|
|
type encVar struct {
|
|
X interface{}
|
|
}
|
|
|
|
type Option func(*Config)
|
|
|
|
type Config struct {
|
|
configured bool
|
|
MaxVars int
|
|
DBPath string
|
|
Tidy TidyConfig
|
|
}
|
|
|
|
func MaxVars(i int) Option {
|
|
return func(c *Config) {
|
|
c.MaxVars = i
|
|
}
|
|
}
|
|
|
|
func DBPath(s string) Option {
|
|
return func(c *Config) {
|
|
c.DBPath = s
|
|
}
|
|
}
|
|
|
|
func TidyInterval(d time.Duration) Option {
|
|
return func(c *Config) {
|
|
c.Tidy.Interval = d
|
|
}
|
|
}
|
|
|
|
func Init(confs ...Option) {
|
|
mux.Lock()
|
|
tc := TidyConfig {
|
|
Interval: 10 * time.Second,
|
|
}
|
|
conf = Config { // default config
|
|
configured: true,
|
|
MaxVars: 64,
|
|
DBPath: "db",
|
|
Tidy: tc,
|
|
}
|
|
|
|
for _,f := range(confs) {
|
|
f(&conf)
|
|
}
|
|
|
|
if conf.Tidy.Func == nil {
|
|
conf.Tidy.Func = ExpireAfterFunc(24 * time.Hour)
|
|
}
|
|
mux.Unlock()
|
|
}
|
|
|
|
func New(name string, x interface{},ps ...bool) *Var {
|
|
start()
|
|
mux.Lock()
|
|
if vars == nil {
|
|
vars = make(map[string]*Var)
|
|
}
|
|
ret := vars[name]
|
|
if ret == nil {
|
|
ret = &Var{X: x, name: name}
|
|
vars[name] = ret
|
|
}
|
|
if len(ps) > 1 {
|
|
ret.permanent = ps[0]
|
|
}
|
|
mux.Unlock()
|
|
err := ret.Load()
|
|
if err != nil { // save default value if loading failed
|
|
fmt.Println("Load error:",err)
|
|
wchan <- encode(ret)
|
|
}
|
|
if Debug { fmt.Println("New(): ",ret.name) }
|
|
mux.Lock()
|
|
if loaded == nil {
|
|
loaded = make(chan *Var,conf.MaxVars) // buffered channel
|
|
}
|
|
mux.Unlock()
|
|
if Debug { fmt.Println("New(): loaded<- ret") }
|
|
loaded<- ret
|
|
if Debug { fmt.Println("New(): returning") }
|
|
return ret
|
|
}
|
|
|
|
func (p *Var) Set(x interface{}) error {
|
|
if reflect.TypeOf(x) == reflect.TypeOf(p.X) {
|
|
p.X = x
|
|
p.Save()
|
|
return nil
|
|
} else {
|
|
return errors.New("Set(): Type mismatch")
|
|
}
|
|
}
|
|
|
|
func (p *Var) SaveSync() {
|
|
donech := make(chan struct{})
|
|
wchan <- encode(p,donech)
|
|
<-donech
|
|
}
|
|
|
|
func (p *Var) Save(sync ...bool) {
|
|
dirty = true
|
|
wchan <- encode(p)
|
|
}
|
|
|
|
func (p *Var) Load(ts ...time.Time) error {
|
|
var err error
|
|
var v []byte
|
|
var t time.Time
|
|
if len(ts) == 0 {
|
|
v, t, err = retrieve(p.name,lastCheckpoint)
|
|
} else {
|
|
v, t, err = retrieve(p.name,ts[0])
|
|
}
|
|
if err == nil {
|
|
var p2 Var
|
|
dec := gob.NewDecoder(bytes.NewBuffer(v))
|
|
err := dec.Decode(&p2)
|
|
if reflect.TypeOf(p2.X) != reflect.TypeOf(p.X) {
|
|
err = errors.New("Load(): Type mismatch")
|
|
}
|
|
if err != nil {
|
|
fmt.Println("Load(): ",err)
|
|
} else {
|
|
p.X = p2.X
|
|
p.Time = t
|
|
}
|
|
} else {
|
|
if Debug { fmt.Println("Load(",p.name,"): type =",reflect.TypeOf(p.X)) }
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (p *Var) Get(ts ...time.Time) interface{} {
|
|
if len(ts) == 0 {
|
|
return p.X
|
|
} else {
|
|
p2 := *p
|
|
p2.Load(ts[0])
|
|
return p2.X
|
|
}
|
|
}
|
|
|
|
func (p *Var) Delete(t time.Time) {
|
|
wchan<- dencode(p,t)
|
|
}
|
|
|
|
func (p *Var) DeleteSync(t time.Time) {
|
|
donech := make(chan struct{})
|
|
wchan<- dencode(p,t,donech)
|
|
<-donech
|
|
}
|
|
|
|
func Commit() {
|
|
mux.Lock()
|
|
if dirty {
|
|
dirty = false
|
|
__checkpoints.SaveSync()
|
|
}
|
|
mux.Unlock()
|
|
}
|
|
|
|
func Checkpoints() {
|
|
fmt.Println(__checkpoints.History())
|
|
}
|
|
|
|
func Undo() {
|
|
if Debug { fmt.Println("Undo()") }
|
|
mux.Lock()
|
|
__checkpoints.DeleteSync(lastCheckpoint)
|
|
h,_ := __checkpoints.History()
|
|
var t time.Time
|
|
if len(h) > 0 {
|
|
t = h[len(h)-1].Time
|
|
} else {
|
|
return
|
|
}
|
|
lastCheckpoint = t
|
|
for _,v := range vars {
|
|
v.Load(t)
|
|
}
|
|
dirty = false
|
|
mux.Unlock()
|
|
}
|
|
|
|
func Shutdown() {
|
|
close(tchan)
|
|
tg.Wait()
|
|
close(wchan)
|
|
wg.Wait()
|
|
mux.Lock()
|
|
launched = false
|
|
vars = make(map[string]*Var)
|
|
mux.Unlock()
|
|
}
|
|
|
|
var (
|
|
launched bool
|
|
wchan chan encoded
|
|
tchan chan struct{}
|
|
mux sync.Mutex
|
|
wmux sync.Mutex
|
|
wg sync.WaitGroup
|
|
tg sync.WaitGroup
|
|
db *bolt.DB
|
|
loaded chan *Var
|
|
conf Config
|
|
__checkpoints *Var
|
|
lastCheckpoint time.Time
|
|
vars map[string]*Var
|
|
dirty bool
|
|
)
|
|
|
|
|
|
type encoded struct {
|
|
name []byte
|
|
value interface{}
|
|
donech chan struct{}
|
|
}
|
|
|
|
func encode(p *Var,chs ...chan struct{}) encoded {
|
|
ret := encoded{name:[]byte(p.name)}
|
|
ep := encVar{ p.X }
|
|
if len(chs) > 0 {
|
|
ret.donech = chs[0]
|
|
}
|
|
var buf bytes.Buffer
|
|
enc := gob.NewEncoder(&buf)
|
|
err := enc.Encode(ep)
|
|
if err != nil {
|
|
fmt.Println("encode(): ",err)
|
|
}
|
|
ret.value = buf.Bytes()
|
|
|
|
return ret
|
|
}
|
|
|
|
func dencode(p *Var,t time.Time,chs ...chan struct{}) encoded {
|
|
ret := encoded{name:[]byte(p.name),value:t}
|
|
if len(chs) > 0 {
|
|
ret.donech = chs[0]
|
|
}
|
|
return ret
|
|
}
|
|
|
|
// tencode creates a DB key based on the time provided
|
|
func tencode(t time.Time) []byte {
|
|
buf := make([]byte, 8)
|
|
ns := t.UnixNano()
|
|
binary.BigEndian.PutUint64(buf, uint64(ns))
|
|
|
|
return buf
|
|
}
|
|
|
|
func tdecode(x []byte) time.Time {
|
|
ns := int64(binary.BigEndian.Uint64(x))
|
|
return time.Unix(0,ns)
|
|
}
|
|
|
|
func start() {
|
|
mux.Lock()
|
|
if launched {
|
|
mux.Unlock()
|
|
return
|
|
}
|
|
launched = true
|
|
|
|
if conf.configured == false {
|
|
mux.Unlock()
|
|
Init()
|
|
mux.Lock()
|
|
}
|
|
|
|
var err error
|
|
dbdir := path.Dir(conf.DBPath)
|
|
if _, err = os.Stat(dbdir); os.IsNotExist(err) {
|
|
if Debug { log.Print("creating db dir") }
|
|
err = os.MkdirAll(dbdir,0700)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
}
|
|
db, err = bolt.Open(conf.DBPath, 0600, &bolt.Options{Timeout:5 * time.Second})
|
|
|
|
if err != nil {
|
|
log.Fatal("persist.start() bolt.Open(): ", err)
|
|
}
|
|
|
|
tchan = make(chan struct{})
|
|
tg.Add(1)
|
|
go tidyThread()
|
|
|
|
wchan = make(chan encoded)
|
|
wg.Add(1)
|
|
go writer()
|
|
|
|
__checkpoints = &Var{name:"__checkpoints", X: nil}
|
|
h, err := __checkpoints.History()
|
|
if len(h) > 0 {
|
|
lastCheckpoint = h[len(h)-1].Time
|
|
if Debug { fmt.Println("lastCheckpoint = ",lastCheckpoint) }
|
|
}
|
|
mux.Unlock()
|
|
}
|
|
|
|
func writer() {
|
|
if Debug { fmt.Println("launching writer thread") }
|
|
runtime.UnlockOSThread()
|
|
for p := range wchan {
|
|
wmux.Lock()
|
|
err := db.Update(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket(p.name)
|
|
if b == nil {
|
|
_, err := tx.CreateBucket(p.name)
|
|
if err != nil {
|
|
log.Fatal("Cannot create bucket for ",string(p.name))
|
|
}
|
|
if Debug { fmt.Println("writer(): created bucket for ",string(p.name)) }
|
|
b = tx.Bucket(p.name)
|
|
if b == nil {
|
|
log.Fatal("Bucket error")
|
|
}
|
|
}
|
|
var err error
|
|
switch v := p.value.(type) {
|
|
case time.Time:
|
|
vv := b.Get(tencode(v))
|
|
if len(vv) == 0 {
|
|
return errors.New("writer() error: can't delete")
|
|
}
|
|
err = b.Delete(tencode(v))
|
|
case []byte:
|
|
t := time.Now()
|
|
if string(p.name) == "__checkpoints" {
|
|
if Debug { fmt.Println("saving: ",string(p.name),t) }
|
|
lastCheckpoint = t
|
|
}
|
|
err = b.Put(tencode(t),v)
|
|
}
|
|
if err != nil {
|
|
log.Fatal("writer error", err)
|
|
}
|
|
return nil
|
|
})
|
|
wmux.Unlock()
|
|
if err != nil {
|
|
fmt.Println("writer() error: ",err)
|
|
}
|
|
if p.donech != nil { // close out synchronous writes
|
|
close(p.donech)
|
|
}
|
|
}
|
|
if Debug { fmt.Println("leaving writer thread") }
|
|
db.Close()
|
|
if Debug { fmt.Println("DB is closed") }
|
|
wg.Done()
|
|
}
|
|
|
|
func (p *Var) History() ([]Var, error) {
|
|
ret := make([]Var,0)
|
|
wmux.Lock()
|
|
err := db.View(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket([]byte(p.name))
|
|
if b == nil {
|
|
return errors.New("DB entry not found")
|
|
}
|
|
c := b.Cursor()
|
|
for k, x := c.First(); x != nil; k, x = c.Next() {
|
|
t := tdecode(k)
|
|
p2 := Var{}
|
|
dec := gob.NewDecoder(bytes.NewBuffer(x))
|
|
err := dec.Decode(&p2)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if reflect.TypeOf(p2.X) != reflect.TypeOf(p.X) {
|
|
return errors.New("History(): Type mismatch")
|
|
}
|
|
p3 := *p
|
|
p3.X = p2.X
|
|
p3.Time = t
|
|
ret = append(ret,p3)
|
|
}
|
|
return nil
|
|
})
|
|
wmux.Unlock()
|
|
return ret, err
|
|
}
|
|
|
|
// retrieve a variable from the database. If ts is provided, return the first version
|
|
// that is not after ts[0].
|
|
func retrieve(name string, ts ...time.Time) ([]byte, time.Time, error) {
|
|
var ret []byte
|
|
var t time.Time
|
|
err := db.View(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket([]byte(name))
|
|
if b == nil {
|
|
return errors.New("DB entry not found")
|
|
}
|
|
c := b.Cursor()
|
|
if len(ts) == 0 {
|
|
var k []byte
|
|
k, ret = c.Last()
|
|
t = tdecode(k)
|
|
} else {
|
|
var k []byte
|
|
for k, ret = c.Last(); ret != nil; k, ret = c.Prev() {
|
|
t = tdecode(k)
|
|
if ts[0].After(t) {
|
|
return nil
|
|
}
|
|
}
|
|
return errors.New("Not found")
|
|
}
|
|
return nil
|
|
})
|
|
return ret, t, err
|
|
}
|
|
|
|
type TidyFunc func(*bolt.Bucket) int
|
|
|
|
type TidyConfig struct {
|
|
Interval time.Duration
|
|
Func TidyFunc
|
|
}
|
|
|
|
func ExpireAfter(exp time.Duration) Option {
|
|
return func(c *Config) {
|
|
c.Tidy.Func = ExpireAfterFunc(exp)
|
|
}
|
|
}
|
|
|
|
func ExpireAfterFunc(exp time.Duration) TidyFunc {
|
|
if exp > 0 {
|
|
exp = -1 * exp
|
|
}
|
|
if Debug { fmt.Println("ExpireAfter(",exp,")") }
|
|
return func(b *bolt.Bucket) int {
|
|
stime := time.Now()
|
|
etime := stime.Add(exp)
|
|
i := 0
|
|
c := b.Cursor()
|
|
end,_ := c.Last() // always keep the most recent on
|
|
for k,_ := c.First(); time.Since(stime) < 10 * time.Millisecond; k,_ = c.Next() {
|
|
if tdecode(k).After(etime) {
|
|
return i // entry not expired
|
|
}
|
|
if reflect.DeepEqual(k,end) {
|
|
return i // always keep the most recent entry
|
|
}
|
|
b.Delete(k)
|
|
i++
|
|
}
|
|
return i
|
|
}
|
|
}
|
|
|
|
func tidyThread() {
|
|
if Debug { fmt.Println("tidyThread() starting") }
|
|
lchan := make(chan string)
|
|
go func() {
|
|
if Debug { fmt.Println("starting loaded reading thread") }
|
|
if loaded == nil {
|
|
loaded = make(chan *Var,conf.MaxVars) // buffered channel
|
|
}
|
|
for {
|
|
for p := range loaded {
|
|
if p.permanent == false {
|
|
lchan<- p.name
|
|
loaded<- p // put it back in the queue
|
|
}
|
|
}
|
|
if Debug { fmt.Println("closing lchan") }
|
|
close(lchan)
|
|
}
|
|
}()
|
|
tick := time.NewTicker(conf.Tidy.Interval)
|
|
for { select {
|
|
case <-tchan:
|
|
if Debug { fmt.Println("closing loaded channel") }
|
|
close(loaded)
|
|
tg.Done()
|
|
if Debug { fmt.Println("tidy(): returning") }
|
|
return
|
|
case <-tick.C:
|
|
name := <-lchan
|
|
if Debug { fmt.Println("tidyThread(): tidying ", name) }
|
|
Tidy(name)
|
|
} }
|
|
}
|
|
|
|
// discard entries if they are too old.
|
|
func Tidy(name string) {
|
|
stime := time.Now()
|
|
mux.Lock()
|
|
i := 0
|
|
db.Update(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket([]byte(name))
|
|
if b == nil {
|
|
fmt.Println("Can't open bucket for ",name)
|
|
return nil
|
|
}
|
|
i = conf.Tidy.Func(b)
|
|
return nil
|
|
})
|
|
mux.Unlock()
|
|
if i > 0 {
|
|
if Debug { fmt.Println("tidy(): deleted ",i," entries in ",time.Since(stime)) }
|
|
}
|
|
if Debug { fmt.Println("tidy(): done tidying",name) }
|
|
}
|
|
|