package persist import ( "bytes" "encoding/binary" "encoding/gob" "errors" "fmt" "log" "os" "path" "reflect" "runtime" "sync" "time" bolt "github.com/coreos/bbolt" ) // allow persistant storage and reloading of variables to bolt database. // everything is time stamped. // bucket is the variable name type Var struct { X interface{} name string opt Option } type Option struct { Permanent bool } type Config struct { configured bool MaxVars int DBPath string } func Init(x ...Config) { mux.Lock() c := Config{MaxVars: 64, DBPath: "db"} // default config conf = c if len(x) > 0 { c = x[0] } if c.MaxVars != 0 { conf.MaxVars = c.MaxVars conf.configured = true } if c.DBPath != "" { conf.DBPath = c.DBPath conf.configured = true } mux.Unlock() } func New(name string, x interface{},opt ...Option) *Var { start() mux.Lock() if vars == nil { vars = make(map[string]*Var) } ret := vars[name] if ret == nil { ret = &Var{X: x, name: name} vars[name] = ret } if len(opt) == 1 { ret.opt = opt[0] } mux.Unlock() err := ret.Load() if err != nil { // save default value if loading failed fmt.Println("Load error:",err) wchan <- encode(ret) } fmt.Println("New(): ",ret.name) mux.Lock() if loaded == nil { loaded = make(chan *Var,conf.MaxVars) // buffered channel } mux.Unlock() loaded<- ret return ret } func (p *Var) Set(x interface{}) error { if reflect.TypeOf(x) == reflect.TypeOf(p.X) { p.X = x p.Save() return nil } else { return errors.New("Set(): Type mismatch") } } func (p *Var) SaveSync() { donech := make(chan struct{}) wchan <- encode(p,donech) <-donech } func (p *Var) Save(sync ...bool) { dirty = true wchan <- encode(p) } func (p *Var) Load(ts ...time.Time) error { var err error var v []byte if len(ts) == 0 { v, err = retrieve(p.name,lastCheckpoint) } else { v, err = retrieve(p.name,ts[0]) } if err == nil { var p2 Var dec := gob.NewDecoder(bytes.NewBuffer(v)) err := dec.Decode(&p2) if reflect.TypeOf(p2.X) != reflect.TypeOf(p.X) { err = errors.New("Load(): Type mismatch") } if err != nil { fmt.Println("Load(): ",err) } else { p.X = p2.X } } else { fmt.Println("Load(",p.name,"): type =",reflect.TypeOf(p.X)) } return err } func (p *Var) Get(ts ...time.Time) interface{} { if len(ts) == 0 { return p.X } else { p2 := *p p2.Load(ts[0]) return p2.X } } func (p *Var) Delete(t time.Time) { wchan<- dencode(p,t) } func (p *Var) DeleteSync(t time.Time) { donech := make(chan struct{}) wchan<- dencode(p,t,donech) <-donech } func Commit() { mux.Lock() if dirty { dirty = false __checkpoints.SaveSync() } mux.Unlock() } func Checkpoints() { fmt.Println(__checkpoints.History()) } func Undo() { fmt.Println("Undo()") mux.Lock() __checkpoints.DeleteSync(lastCheckpoint) h,_ := __checkpoints.History() var t time.Time if len(h) > 0 { t = h[len(h)-1].Time } else { return } lastCheckpoint = t for _,v := range vars { v.Load(t) } dirty = false mux.Unlock() } func Shutdown() { close(tchan) tg.Wait() close(wchan) wg.Wait() mux.Lock() launched = false vars = make(map[string]*Var) mux.Unlock() } var ( launched bool wchan chan encoded tchan chan struct{} mux sync.Mutex wmux sync.Mutex wg sync.WaitGroup tg sync.WaitGroup db *bolt.DB loaded chan *Var conf Config __checkpoints *Var lastCheckpoint time.Time vars map[string]*Var dirty bool ) type encoded struct { name []byte value interface{} donech chan struct{} } func encode(p *Var,chs ...chan struct{}) encoded { ret := encoded{name:[]byte(p.name)} if len(chs) > 0 { ret.donech = chs[0] } var buf bytes.Buffer enc := gob.NewEncoder(&buf) err := enc.Encode(*p) if err != nil { fmt.Println("encode(): ",err) } ret.value = buf.Bytes() return ret } func dencode(p *Var,t time.Time,chs ...chan struct{}) encoded { ret := encoded{name:[]byte(p.name),value:t} if len(chs) > 0 { ret.donech = chs[0] } return ret } // tencode creates a DB key based on the time provided func tencode(t time.Time) []byte { buf := make([]byte, 8) ns := t.UnixNano() binary.BigEndian.PutUint64(buf, uint64(ns)) return buf } func tdecode(x []byte) time.Time { ns := int64(binary.BigEndian.Uint64(x)) return time.Unix(0,ns) } func start() { mux.Lock() if launched { mux.Unlock() return } launched = true if conf.configured == false { Init() } var err error dbdir := path.Dir(conf.DBPath) if _, err = os.Stat(dbdir); os.IsNotExist(err) { log.Print("creating db dir") err = os.MkdirAll(dbdir,0700) if err != nil { log.Fatal(err) } } db, err = bolt.Open(conf.DBPath, 0600, &bolt.Options{Timeout:5 * time.Second}) if err != nil { log.Fatal("persist.start() bolt.Open(): ", err) } tchan = make(chan struct{}) tg.Add(1) go tidyThread() wchan = make(chan encoded) wg.Add(1) go writer() __checkpoints = &Var{name:"__checkpoints", X: nil} h, err := __checkpoints.History() if len(h) > 0 { lastCheckpoint = h[len(h)-1].Time fmt.Println("lastCheckpoint = ",lastCheckpoint) } mux.Unlock() } func writer() { fmt.Println("launching writer thread") runtime.UnlockOSThread() for p := range wchan { wmux.Lock() err := db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(p.name) if b == nil { _, err := tx.CreateBucket(p.name) if err != nil { log.Fatal("Cannot create bucket for ",string(p.name)) } fmt.Println("writer(): created bucket for ",string(p.name)) b = tx.Bucket(p.name) if b == nil { log.Fatal("Bucket error") } } var err error switch v := p.value.(type) { case time.Time: vv := b.Get(tencode(v)) if len(vv) == 0 { return errors.New("writer() error: can't delete") } err = b.Delete(tencode(v)) case []byte: t := time.Now() if string(p.name) == "__checkpoints" { fmt.Println("saving: ",string(p.name),t) lastCheckpoint = t } err = b.Put(tencode(t),v) } if err != nil { log.Fatal("writer error", err) } return nil }) wmux.Unlock() if err != nil { fmt.Println("writer() error: ",err) } if p.donech != nil { // close out synchronous writes close(p.donech) } } fmt.Println("leaving writer thread") db.Close() fmt.Println("DB is closed") wg.Done() } type TVar struct { Time time.Time Value Var } func (p *Var) History() ([]TVar, error) { ret := make([]TVar,0) wmux.Lock() err := db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(p.name)) if b == nil { return errors.New("DB entry not found") } c := b.Cursor() for k, x := c.First(); x != nil; k, x = c.Next() { t := tdecode(k) p2 := Var{} dec := gob.NewDecoder(bytes.NewBuffer(x)) err := dec.Decode(&p2) if err != nil { return err } if reflect.TypeOf(p2.X) != reflect.TypeOf(p.X) { return errors.New("History(): Type mismatch") } p3 := *p p3.X = p2.X ret = append(ret,TVar{t,p3}) } return nil }) wmux.Unlock() return ret, err } // retrieve a variable from the database. If ts is provided, return the first version // that is not after ts[0]. func retrieve(name string, ts ...time.Time) ([]byte, error) { var ret []byte err := db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(name)) if b == nil { return errors.New("DB entry not found") } c := b.Cursor() if len(ts) == 0 { _, ret = c.Last() } else { var k []byte for k, ret = c.Last(); ret != nil; k, ret = c.Prev() { if ts[0].After(tdecode(k)) { return nil } } return errors.New("Not found") } return nil }) return ret, err } func tidyThread() { fmt.Println("tidyThread() starting") lchan := make(chan string) go func() { fmt.Println("starting loaded reading thread") if loaded == nil { loaded = make(chan *Var,conf.MaxVars) // buffered channel } for { for p := range loaded { if p.opt.Permanent == false { lchan<- p.name loaded<- p // put it back in the queue } } fmt.Println("closing lchan") close(lchan) } }() tick := time.NewTicker(time.Second * 10) for { select { case <-tchan: fmt.Println("closing loaded channel") close(loaded) tg.Done() fmt.Println("tidy(): returning") return case <-tick.C: name := <-lchan fmt.Println("tidyThread(): tidying ", name) Tidy(name) } } } const expiration time.Duration = -24 * time.Hour // discard entries if they are too old. func Tidy(name string) { stime := time.Now() etime := stime.Add(expiration) mux.Lock() i := 0 db.Update(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(name)) if b == nil { fmt.Println("Can't open bucket for ",name) return nil } c := b.Cursor() end,_ := c.Last() // always keep the most recent on for time.Since(stime) < 10 * time.Millisecond { k,_ := c.First() if tdecode(k).After(etime) { return nil // entry not expired } if reflect.DeepEqual(k,end) { return nil // always keep the most recent entry } b.Delete(k) i++ } return nil }) mux.Unlock() if i > 0 { fmt.Println("tidy(): deleted ",i," entries in ",time.Since(stime)) } fmt.Println("tidy(): done tidying",name) }