package persist import ( "bytes" "encoding/binary" "encoding/gob" "errors" "fmt" "log" "os" "path" "reflect" "runtime" "sync" "time" bolt "github.com/coreos/bbolt" ) // allow persistant storage and reloading of variables to bolt database. // everything is time stamped. // bucket is the variable name type Var struct { X interface{} Time time.Time name string permanent bool } type encVar struct { X interface{} } type Option func(*Config) type Config struct { configured bool MaxVars int DBPath string Tidy TidyConfig } func MaxVars(i int) Option { return func(c *Config) { c.MaxVars = i } } func DBPath(s string) Option { return func(c *Config) { c.DBPath = s } } func TidyInterval(d time.Duration) Option { return func(c *Config) { c.Tidy.Interval = d } } func Init(confs ...Option) { mux.Lock() tc := TidyConfig { Interval: 10 * time.Second, } conf = Config { // default config configured: true, MaxVars: 64, DBPath: "db", Tidy: tc, } for _,f := range(confs) { f(&conf) } if conf.Tidy.Func == nil { conf.Tidy.Func = ExpireAfterFunc(24 * time.Hour) } mux.Unlock() } func New(name string, x interface{},ps ...bool) *Var { start() mux.Lock() if vars == nil { vars = make(map[string]*Var) } ret := vars[name] if ret == nil { ret = &Var{X: x, name: name} vars[name] = ret } if len(ps) > 1 { ret.permanent = ps[0] } mux.Unlock() err := ret.Load() if err != nil { // save default value if loading failed fmt.Println("Load error:",err) wchan <- encode(ret) } fmt.Println("New(): ",ret.name) mux.Lock() if loaded == nil { loaded = make(chan *Var,conf.MaxVars) // buffered channel } mux.Unlock() loaded<- ret return ret } func (p *Var) Set(x interface{}) error { if reflect.TypeOf(x) == reflect.TypeOf(p.X) { p.X = x p.Save() return nil } else { return errors.New("Set(): Type mismatch") } } func (p *Var) SaveSync() { donech := make(chan struct{}) wchan <- encode(p,donech) <-donech } func (p *Var) Save(sync ...bool) { dirty = true wchan <- encode(p) } func (p *Var) Load(ts ...time.Time) error { var err error var v []byte var t time.Time if len(ts) == 0 { v, t, err = retrieve(p.name,lastCheckpoint) } else { v, t, err = retrieve(p.name,ts[0]) } if err == nil { var p2 Var dec := gob.NewDecoder(bytes.NewBuffer(v)) err := dec.Decode(&p2) if reflect.TypeOf(p2.X) != reflect.TypeOf(p.X) { err = errors.New("Load(): Type mismatch") } if err != nil { fmt.Println("Load(): ",err) } else { p.X = p2.X p.Time = t } } else { fmt.Println("Load(",p.name,"): type =",reflect.TypeOf(p.X)) } return err } func (p *Var) Get(ts ...time.Time) interface{} { if len(ts) == 0 { return p.X } else { p2 := *p p2.Load(ts[0]) return p2.X } } func (p *Var) Delete(t time.Time) { wchan<- dencode(p,t) } func (p *Var) DeleteSync(t time.Time) { donech := make(chan struct{}) wchan<- dencode(p,t,donech) <-donech } func Commit() { mux.Lock() if dirty { dirty = false __checkpoints.SaveSync() } mux.Unlock() } func Checkpoints() { fmt.Println(__checkpoints.History()) } func Undo() { fmt.Println("Undo()") mux.Lock() __checkpoints.DeleteSync(lastCheckpoint) h,_ := __checkpoints.History() var t time.Time if len(h) > 0 { t = h[len(h)-1].Time } else { return } lastCheckpoint = t for _,v := range vars { v.Load(t) } dirty = false mux.Unlock() } func Shutdown() { close(tchan) tg.Wait() close(wchan) wg.Wait() mux.Lock() launched = false vars = make(map[string]*Var) mux.Unlock() } var ( launched bool wchan chan encoded tchan chan struct{} mux sync.Mutex wmux sync.Mutex wg sync.WaitGroup tg sync.WaitGroup db *bolt.DB loaded chan *Var conf Config __checkpoints *Var lastCheckpoint time.Time vars map[string]*Var dirty bool ) type encoded struct { name []byte value interface{} donech chan struct{} } func encode(p *Var,chs ...chan struct{}) encoded { ret := encoded{name:[]byte(p.name)} ep := encVar{ p.X } if len(chs) > 0 { ret.donech = chs[0] } var buf bytes.Buffer enc := gob.NewEncoder(&buf) err := enc.Encode(ep) if err != nil { fmt.Println("encode(): ",err) } ret.value = buf.Bytes() return ret } func dencode(p *Var,t time.Time,chs ...chan struct{}) encoded { ret := encoded{name:[]byte(p.name),value:t} if len(chs) > 0 { ret.donech = chs[0] } return ret } // tencode creates a DB key based on the time provided func tencode(t time.Time) []byte { buf := make([]byte, 8) ns := t.UnixNano() binary.BigEndian.PutUint64(buf, uint64(ns)) return buf } func tdecode(x []byte) time.Time { ns := int64(binary.BigEndian.Uint64(x)) return time.Unix(0,ns) } func start() { mux.Lock() if launched { mux.Unlock() return } launched = true if conf.configured == false { mux.Unlock() Init() mux.Lock() } var err error dbdir := path.Dir(conf.DBPath) if _, err = os.Stat(dbdir); os.IsNotExist(err) { log.Print("creating db dir") err = os.MkdirAll(dbdir,0700) if err != nil { log.Fatal(err) } } db, err = bolt.Open(conf.DBPath, 0600, &bolt.Options{Timeout:5 * time.Second}) if err != nil { log.Fatal("persist.start() bolt.Open(): ", err) } tchan = make(chan struct{}) tg.Add(1) go tidyThread() wchan = make(chan encoded) wg.Add(1) go writer() __checkpoints = &Var{name:"__checkpoints", X: nil} h, err := __checkpoints.History() if len(h) > 0 { lastCheckpoint = h[len(h)-1].Time fmt.Println("lastCheckpoint = ",lastCheckpoint) } mux.Unlock() } func writer() { fmt.Println("launching writer thread") runtime.UnlockOSThread() for p := range wchan { wmux.Lock() err := db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(p.name) if b == nil { _, err := tx.CreateBucket(p.name) if err != nil { log.Fatal("Cannot create bucket for ",string(p.name)) } fmt.Println("writer(): created bucket for ",string(p.name)) b = tx.Bucket(p.name) if b == nil { log.Fatal("Bucket error") } } var err error switch v := p.value.(type) { case time.Time: vv := b.Get(tencode(v)) if len(vv) == 0 { return errors.New("writer() error: can't delete") } err = b.Delete(tencode(v)) case []byte: t := time.Now() if string(p.name) == "__checkpoints" { fmt.Println("saving: ",string(p.name),t) lastCheckpoint = t } err = b.Put(tencode(t),v) } if err != nil { log.Fatal("writer error", err) } return nil }) wmux.Unlock() if err != nil { fmt.Println("writer() error: ",err) } if p.donech != nil { // close out synchronous writes close(p.donech) } } fmt.Println("leaving writer thread") db.Close() fmt.Println("DB is closed") wg.Done() } func (p *Var) History() ([]Var, error) { ret := make([]Var,0) wmux.Lock() err := db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(p.name)) if b == nil { return errors.New("DB entry not found") } c := b.Cursor() for k, x := c.First(); x != nil; k, x = c.Next() { t := tdecode(k) p2 := Var{} dec := gob.NewDecoder(bytes.NewBuffer(x)) err := dec.Decode(&p2) if err != nil { return err } if reflect.TypeOf(p2.X) != reflect.TypeOf(p.X) { return errors.New("History(): Type mismatch") } p3 := *p p3.X = p2.X p3.Time = t ret = append(ret,p3) } return nil }) wmux.Unlock() return ret, err } // retrieve a variable from the database. If ts is provided, return the first version // that is not after ts[0]. func retrieve(name string, ts ...time.Time) ([]byte, time.Time, error) { var ret []byte var t time.Time err := db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(name)) if b == nil { return errors.New("DB entry not found") } c := b.Cursor() if len(ts) == 0 { var k []byte k, ret = c.Last() t = tdecode(k) } else { var k []byte for k, ret = c.Last(); ret != nil; k, ret = c.Prev() { t = tdecode(k) if ts[0].After(t) { return nil } } return errors.New("Not found") } return nil }) return ret, t, err } type TidyFunc func(*bolt.Bucket) int type TidyConfig struct { Interval time.Duration Func TidyFunc } func ExpireAfter(exp time.Duration) Option { return func(c *Config) { c.Tidy.Func = ExpireAfterFunc(exp) } } func ExpireAfterFunc(exp time.Duration) TidyFunc { if exp > 0 { exp = -1 * exp } fmt.Println("ExpireAfter(",exp,")") return func(b *bolt.Bucket) int { stime := time.Now() etime := stime.Add(exp) i := 0 c := b.Cursor() end,_ := c.Last() // always keep the most recent on for k,_ := c.First(); time.Since(stime) < 10 * time.Millisecond; k,_ = c.Next() { if tdecode(k).After(etime) { return i // entry not expired } if reflect.DeepEqual(k,end) { return i // always keep the most recent entry } b.Delete(k) i++ } return i } } func tidyThread() { fmt.Println("tidyThread() starting") lchan := make(chan string) go func() { fmt.Println("starting loaded reading thread") if loaded == nil { loaded = make(chan *Var,conf.MaxVars) // buffered channel } for { for p := range loaded { if p.permanent == false { lchan<- p.name loaded<- p // put it back in the queue } } fmt.Println("closing lchan") close(lchan) } }() tick := time.NewTicker(conf.Tidy.Interval) for { select { case <-tchan: fmt.Println("closing loaded channel") close(loaded) tg.Done() fmt.Println("tidy(): returning") return case <-tick.C: name := <-lchan fmt.Println("tidyThread(): tidying ", name) Tidy(name) } } } // discard entries if they are too old. func Tidy(name string) { stime := time.Now() mux.Lock() i := 0 db.Update(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(name)) if b == nil { fmt.Println("Can't open bucket for ",name) return nil } i = conf.Tidy.Func(b) return nil }) mux.Unlock() if i > 0 { fmt.Println("tidy(): deleted ",i," entries in ",time.Since(stime)) } fmt.Println("tidy(): done tidying",name) }