persist/persist.go

491 lines
9.0 KiB
Go
Raw Normal View History

2018-08-03 15:08:30 -04:00
package persist
import (
"bytes"
"encoding/binary"
"encoding/gob"
"errors"
"fmt"
"log"
"os"
"path"
"reflect"
"runtime"
"sync"
"time"
bolt "github.com/coreos/bbolt"
)
// allow persistant storage and reloading of variables to bolt database.
// everything is time stamped.
// bucket is the variable name
type Var struct {
X interface{}
Time time.Time
2018-08-03 15:08:30 -04:00
name string
opt Option
}
type Option struct {
Permanent bool
}
type Config struct {
configured bool
MaxVars int
DBPath string
}
func Init(x ...Config) {
mux.Lock()
c := Config{MaxVars: 64, DBPath: "db"} // default config
conf = c
if len(x) > 0 {
c = x[0]
}
if c.MaxVars != 0 {
conf.MaxVars = c.MaxVars
conf.configured = true
}
if c.DBPath != "" {
conf.DBPath = c.DBPath
conf.configured = true
}
mux.Unlock()
}
func New(name string, x interface{},opt ...Option) *Var {
start()
mux.Lock()
if vars == nil {
vars = make(map[string]*Var)
}
ret := vars[name]
if ret == nil {
ret = &Var{X: x, name: name}
vars[name] = ret
}
if len(opt) == 1 {
ret.opt = opt[0]
}
mux.Unlock()
err := ret.Load()
if err != nil { // save default value if loading failed
fmt.Println("Load error:",err)
wchan <- encode(ret)
}
fmt.Println("New(): ",ret.name)
mux.Lock()
if loaded == nil {
loaded = make(chan *Var,conf.MaxVars) // buffered channel
}
mux.Unlock()
loaded<- ret
return ret
}
func (p *Var) Set(x interface{}) error {
if reflect.TypeOf(x) == reflect.TypeOf(p.X) {
p.X = x
p.Save()
return nil
} else {
return errors.New("Set(): Type mismatch")
}
}
func (p *Var) SaveSync() {
donech := make(chan struct{})
wchan <- encode(p,donech)
<-donech
}
func (p *Var) Save(sync ...bool) {
dirty = true
wchan <- encode(p)
}
func (p *Var) Load(ts ...time.Time) error {
var err error
var v []byte
var t time.Time
2018-08-03 15:08:30 -04:00
if len(ts) == 0 {
v, t, err = retrieve(p.name,lastCheckpoint)
2018-08-03 15:08:30 -04:00
} else {
v, t, err = retrieve(p.name,ts[0])
2018-08-03 15:08:30 -04:00
}
if err == nil {
var p2 Var
dec := gob.NewDecoder(bytes.NewBuffer(v))
err := dec.Decode(&p2)
if reflect.TypeOf(p2.X) != reflect.TypeOf(p.X) {
err = errors.New("Load(): Type mismatch")
}
if err != nil {
fmt.Println("Load(): ",err)
} else {
p.X = p2.X
p.Time = t
2018-08-03 15:08:30 -04:00
}
} else {
fmt.Println("Load(",p.name,"): type =",reflect.TypeOf(p.X))
}
return err
}
func (p *Var) Get(ts ...time.Time) interface{} {
if len(ts) == 0 {
return p.X
} else {
p2 := *p
p2.Load(ts[0])
return p2.X
}
}
func (p *Var) Delete(t time.Time) {
wchan<- dencode(p,t)
}
func (p *Var) DeleteSync(t time.Time) {
donech := make(chan struct{})
wchan<- dencode(p,t,donech)
<-donech
}
func Commit() {
mux.Lock()
if dirty {
dirty = false
__checkpoints.SaveSync()
}
mux.Unlock()
}
func Checkpoints() {
fmt.Println(__checkpoints.History())
}
func Undo() {
fmt.Println("Undo()")
mux.Lock()
__checkpoints.DeleteSync(lastCheckpoint)
h,_ := __checkpoints.History()
var t time.Time
if len(h) > 0 {
t = h[len(h)-1].Time
} else {
return
}
lastCheckpoint = t
for _,v := range vars {
v.Load(t)
}
dirty = false
mux.Unlock()
}
func Shutdown() {
close(tchan)
tg.Wait()
close(wchan)
wg.Wait()
mux.Lock()
launched = false
vars = make(map[string]*Var)
mux.Unlock()
}
var (
launched bool
wchan chan encoded
tchan chan struct{}
mux sync.Mutex
wmux sync.Mutex
wg sync.WaitGroup
tg sync.WaitGroup
db *bolt.DB
loaded chan *Var
conf Config
__checkpoints *Var
lastCheckpoint time.Time
vars map[string]*Var
dirty bool
)
type encoded struct {
name []byte
value interface{}
donech chan struct{}
}
func encode(p *Var,chs ...chan struct{}) encoded {
ret := encoded{name:[]byte(p.name)}
if len(chs) > 0 {
ret.donech = chs[0]
}
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(*p)
if err != nil {
fmt.Println("encode(): ",err)
}
ret.value = buf.Bytes()
return ret
}
func dencode(p *Var,t time.Time,chs ...chan struct{}) encoded {
ret := encoded{name:[]byte(p.name),value:t}
if len(chs) > 0 {
ret.donech = chs[0]
}
return ret
}
// tencode creates a DB key based on the time provided
func tencode(t time.Time) []byte {
buf := make([]byte, 8)
ns := t.UnixNano()
binary.BigEndian.PutUint64(buf, uint64(ns))
return buf
}
func tdecode(x []byte) time.Time {
ns := int64(binary.BigEndian.Uint64(x))
return time.Unix(0,ns)
}
func start() {
mux.Lock()
if launched {
mux.Unlock()
return
}
launched = true
if conf.configured == false {
Init()
}
var err error
dbdir := path.Dir(conf.DBPath)
if _, err = os.Stat(dbdir); os.IsNotExist(err) {
log.Print("creating db dir")
err = os.MkdirAll(dbdir,0700)
if err != nil {
log.Fatal(err)
}
}
db, err = bolt.Open(conf.DBPath, 0600, &bolt.Options{Timeout:5 * time.Second})
if err != nil {
log.Fatal("persist.start() bolt.Open(): ", err)
}
tchan = make(chan struct{})
tg.Add(1)
go tidyThread()
wchan = make(chan encoded)
wg.Add(1)
go writer()
__checkpoints = &Var{name:"__checkpoints", X: nil}
h, err := __checkpoints.History()
if len(h) > 0 {
lastCheckpoint = h[len(h)-1].Time
fmt.Println("lastCheckpoint = ",lastCheckpoint)
}
mux.Unlock()
}
func writer() {
fmt.Println("launching writer thread")
runtime.UnlockOSThread()
for p := range wchan {
wmux.Lock()
err := db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(p.name)
if b == nil {
_, err := tx.CreateBucket(p.name)
if err != nil {
log.Fatal("Cannot create bucket for ",string(p.name))
}
fmt.Println("writer(): created bucket for ",string(p.name))
b = tx.Bucket(p.name)
if b == nil {
log.Fatal("Bucket error")
}
}
var err error
switch v := p.value.(type) {
case time.Time:
vv := b.Get(tencode(v))
if len(vv) == 0 {
return errors.New("writer() error: can't delete")
}
err = b.Delete(tencode(v))
case []byte:
t := time.Now()
if string(p.name) == "__checkpoints" {
fmt.Println("saving: ",string(p.name),t)
lastCheckpoint = t
}
err = b.Put(tencode(t),v)
}
if err != nil {
log.Fatal("writer error", err)
}
return nil
})
wmux.Unlock()
if err != nil {
fmt.Println("writer() error: ",err)
}
if p.donech != nil { // close out synchronous writes
close(p.donech)
}
}
fmt.Println("leaving writer thread")
db.Close()
fmt.Println("DB is closed")
wg.Done()
}
func (p *Var) History() ([]Var, error) {
ret := make([]Var,0)
2018-08-03 15:08:30 -04:00
wmux.Lock()
err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(p.name))
if b == nil {
return errors.New("DB entry not found")
}
c := b.Cursor()
for k, x := c.First(); x != nil; k, x = c.Next() {
t := tdecode(k)
p2 := Var{}
dec := gob.NewDecoder(bytes.NewBuffer(x))
err := dec.Decode(&p2)
if err != nil {
return err
}
if reflect.TypeOf(p2.X) != reflect.TypeOf(p.X) {
return errors.New("History(): Type mismatch")
}
p3 := *p
p3.X = p2.X
p3.Time = t
ret = append(ret,p3)
2018-08-03 15:08:30 -04:00
}
return nil
})
wmux.Unlock()
return ret, err
}
// retrieve a variable from the database. If ts is provided, return the first version
// that is not after ts[0].
func retrieve(name string, ts ...time.Time) ([]byte, time.Time, error) {
2018-08-03 15:08:30 -04:00
var ret []byte
var t time.Time
2018-08-03 15:08:30 -04:00
err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(name))
if b == nil {
return errors.New("DB entry not found")
}
c := b.Cursor()
if len(ts) == 0 {
var k []byte
k, ret = c.Last()
t = tdecode(k)
2018-08-03 15:08:30 -04:00
} else {
var k []byte
for k, ret = c.Last(); ret != nil; k, ret = c.Prev() {
t = tdecode(k)
if ts[0].After(t) {
2018-08-03 15:08:30 -04:00
return nil
}
}
return errors.New("Not found")
}
return nil
})
return ret, t, err
2018-08-03 15:08:30 -04:00
}
func tidyThread() {
fmt.Println("tidyThread() starting")
lchan := make(chan string)
go func() {
fmt.Println("starting loaded reading thread")
if loaded == nil {
loaded = make(chan *Var,conf.MaxVars) // buffered channel
}
for {
for p := range loaded {
if p.opt.Permanent == false {
lchan<- p.name
loaded<- p // put it back in the queue
}
}
fmt.Println("closing lchan")
close(lchan)
}
}()
tick := time.NewTicker(time.Second * 10)
for { select {
case <-tchan:
fmt.Println("closing loaded channel")
close(loaded)
tg.Done()
fmt.Println("tidy(): returning")
return
case <-tick.C:
name := <-lchan
fmt.Println("tidyThread(): tidying ", name)
Tidy(name)
} }
}
const expiration time.Duration = -24 * time.Hour
// discard entries if they are too old.
func Tidy(name string) {
stime := time.Now()
etime := stime.Add(expiration)
mux.Lock()
i := 0
db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(name))
if b == nil {
fmt.Println("Can't open bucket for ",name)
return nil
}
c := b.Cursor()
end,_ := c.Last() // always keep the most recent on
for time.Since(stime) < 10 * time.Millisecond {
k,_ := c.First()
if tdecode(k).After(etime) {
return nil // entry not expired
}
if reflect.DeepEqual(k,end) {
return nil // always keep the most recent entry
}
b.Delete(k)
i++
}
return nil
})
mux.Unlock()
if i > 0 {
fmt.Println("tidy(): deleted ",i," entries in ",time.Since(stime))
}
fmt.Println("tidy(): done tidying",name)
}