Compare commits

..

No commits in common. "814c6ffa20ab2fe98f42f097af517a8818557f6b" and "9653dab9f4170e82a712570b033eb6fba0a11895" have entirely different histories.

4 changed files with 56 additions and 141 deletions

2
.gitignore vendored
View File

@ -2,5 +2,3 @@ test/basic/basic
test/basic/pgen.go test/basic/pgen.go
test/basic/pgen_aaa.go test/basic/pgen_aaa.go
test/basic/db test/basic/db
test/tidy/db
test/tidy/pgen.go

View File

@ -12,9 +12,9 @@ import (
type Var_N persist.Var type Var_N persist.Var
func New(name string, x _T, ps ...bool) *Var_N { func New(name string, x _T, opt ...persist.Option) *Var_N {
gob.Register(x) gob.Register(x)
ptr := persist.New(name, x, ps...) ptr := persist.New(name, x, opt...)
ret := (*Var_N)(unsafe.Pointer(ptr)) ret := (*Var_N)(unsafe.Pointer(ptr))
return ret return ret
} }
@ -49,11 +49,6 @@ func (v *Var_N) Delete(t time.Time) {
ptr.Delete(t) ptr.Delete(t)
} }
func (v *Var_N) DeleteSync(t time.Time) {
ptr := (*persist.Var)(unsafe.Pointer(v))
ptr.Delete(t)
}
func (v *Var_N) History() ([]Var_N, error) { func (v *Var_N) History() ([]Var_N, error) {
ptr := (*persist.Var)(unsafe.Pointer(v)) ptr := (*persist.Var)(unsafe.Pointer(v))
h, err := ptr.History() h, err := ptr.History()

View File

@ -17,8 +17,6 @@ import (
bolt "github.com/coreos/bbolt" bolt "github.com/coreos/bbolt"
) )
var Debug bool
// allow persistant storage and reloading of variables to bolt database. // allow persistant storage and reloading of variables to bolt database.
// everything is time stamped. // everything is time stamped.
// bucket is the variable name // bucket is the variable name
@ -27,63 +25,42 @@ type Var struct {
X interface{} X interface{}
Time time.Time Time time.Time
name string name string
permanent bool opt Option
} }
type encVar struct { type encVar struct {
X interface{} X interface{}
} }
type Option func(*Config) type Option struct {
Permanent bool
}
type Config struct { type Config struct {
configured bool configured bool
MaxVars int MaxVars int
DBPath string DBPath string
Tidy TidyConfig
} }
func MaxVars(i int) Option { func Init(x ...Config) {
return func(c *Config) {
c.MaxVars = i
}
}
func DBPath(s string) Option {
return func(c *Config) {
c.DBPath = s
}
}
func TidyInterval(d time.Duration) Option {
return func(c *Config) {
c.Tidy.Interval = d
}
}
func Init(confs ...Option) {
mux.Lock() mux.Lock()
tc := TidyConfig { c := Config{MaxVars: 64, DBPath: "db"} // default config
Interval: 10 * time.Second, conf = c
if len(x) > 0 {
c = x[0]
} }
conf = Config { // default config if c.MaxVars != 0 {
configured: true, conf.MaxVars = c.MaxVars
MaxVars: 64, conf.configured = true
DBPath: "db",
Tidy: tc,
} }
if c.DBPath != "" {
for _,f := range(confs) { conf.DBPath = c.DBPath
f(&conf) conf.configured = true
}
if conf.Tidy.Func == nil {
conf.Tidy.Func = ExpireAfterFunc(24 * time.Hour)
} }
mux.Unlock() mux.Unlock()
} }
func New(name string, x interface{},ps ...bool) *Var { func New(name string, x interface{},opt ...Option) *Var {
start() start()
mux.Lock() mux.Lock()
if vars == nil { if vars == nil {
@ -94,8 +71,8 @@ func New(name string, x interface{},ps ...bool) *Var {
ret = &Var{X: x, name: name} ret = &Var{X: x, name: name}
vars[name] = ret vars[name] = ret
} }
if len(ps) > 1 { if len(opt) == 1 {
ret.permanent = ps[0] ret.opt = opt[0]
} }
mux.Unlock() mux.Unlock()
err := ret.Load() err := ret.Load()
@ -103,15 +80,13 @@ func New(name string, x interface{},ps ...bool) *Var {
fmt.Println("Load error:",err) fmt.Println("Load error:",err)
wchan <- encode(ret) wchan <- encode(ret)
} }
if Debug { fmt.Println("New(): ",ret.name) } fmt.Println("New(): ",ret.name)
mux.Lock() mux.Lock()
if loaded == nil { if loaded == nil {
loaded = make(chan *Var,conf.MaxVars) // buffered channel loaded = make(chan *Var,conf.MaxVars) // buffered channel
} }
mux.Unlock() mux.Unlock()
if Debug { fmt.Println("New(): loaded<- ret") }
loaded<- ret loaded<- ret
if Debug { fmt.Println("New(): returning") }
return ret return ret
} }
@ -159,7 +134,7 @@ func (p *Var) Load(ts ...time.Time) error {
p.Time = t p.Time = t
} }
} else { } else {
if Debug { fmt.Println("Load(",p.name,"): type =",reflect.TypeOf(p.X)) } fmt.Println("Load(",p.name,"): type =",reflect.TypeOf(p.X))
} }
return err return err
} }
@ -198,7 +173,7 @@ func Checkpoints() {
} }
func Undo() { func Undo() {
if Debug { fmt.Println("Undo()") } fmt.Println("Undo()")
mux.Lock() mux.Lock()
__checkpoints.DeleteSync(lastCheckpoint) __checkpoints.DeleteSync(lastCheckpoint)
h,_ := __checkpoints.History() h,_ := __checkpoints.History()
@ -299,15 +274,13 @@ func start() {
launched = true launched = true
if conf.configured == false { if conf.configured == false {
mux.Unlock()
Init() Init()
mux.Lock()
} }
var err error var err error
dbdir := path.Dir(conf.DBPath) dbdir := path.Dir(conf.DBPath)
if _, err = os.Stat(dbdir); os.IsNotExist(err) { if _, err = os.Stat(dbdir); os.IsNotExist(err) {
if Debug { log.Print("creating db dir") } log.Print("creating db dir")
err = os.MkdirAll(dbdir,0700) err = os.MkdirAll(dbdir,0700)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -331,13 +304,13 @@ func start() {
h, err := __checkpoints.History() h, err := __checkpoints.History()
if len(h) > 0 { if len(h) > 0 {
lastCheckpoint = h[len(h)-1].Time lastCheckpoint = h[len(h)-1].Time
if Debug { fmt.Println("lastCheckpoint = ",lastCheckpoint) } fmt.Println("lastCheckpoint = ",lastCheckpoint)
} }
mux.Unlock() mux.Unlock()
} }
func writer() { func writer() {
if Debug { fmt.Println("launching writer thread") } fmt.Println("launching writer thread")
runtime.UnlockOSThread() runtime.UnlockOSThread()
for p := range wchan { for p := range wchan {
wmux.Lock() wmux.Lock()
@ -348,7 +321,7 @@ func writer() {
if err != nil { if err != nil {
log.Fatal("Cannot create bucket for ",string(p.name)) log.Fatal("Cannot create bucket for ",string(p.name))
} }
if Debug { fmt.Println("writer(): created bucket for ",string(p.name)) } fmt.Println("writer(): created bucket for ",string(p.name))
b = tx.Bucket(p.name) b = tx.Bucket(p.name)
if b == nil { if b == nil {
log.Fatal("Bucket error") log.Fatal("Bucket error")
@ -365,7 +338,7 @@ func writer() {
case []byte: case []byte:
t := time.Now() t := time.Now()
if string(p.name) == "__checkpoints" { if string(p.name) == "__checkpoints" {
if Debug { fmt.Println("saving: ",string(p.name),t) } fmt.Println("saving: ",string(p.name),t)
lastCheckpoint = t lastCheckpoint = t
} }
err = b.Put(tencode(t),v) err = b.Put(tencode(t),v)
@ -383,9 +356,9 @@ func writer() {
close(p.donech) close(p.donech)
} }
} }
if Debug { fmt.Println("leaving writer thread") } fmt.Println("leaving writer thread")
db.Close() db.Close()
if Debug { fmt.Println("DB is closed") } fmt.Println("DB is closed")
wg.Done() wg.Done()
} }
@ -450,81 +423,46 @@ func retrieve(name string, ts ...time.Time) ([]byte, time.Time, error) {
return ret, t, err return ret, t, err
} }
type TidyFunc func(*bolt.Bucket) int
type TidyConfig struct {
Interval time.Duration
Func TidyFunc
}
func ExpireAfter(exp time.Duration) Option {
return func(c *Config) {
c.Tidy.Func = ExpireAfterFunc(exp)
}
}
func ExpireAfterFunc(exp time.Duration) TidyFunc {
if exp > 0 {
exp = -1 * exp
}
if Debug { fmt.Println("ExpireAfter(",exp,")") }
return func(b *bolt.Bucket) int {
stime := time.Now()
etime := stime.Add(exp)
i := 0
c := b.Cursor()
end,_ := c.Last() // always keep the most recent on
for k,_ := c.First(); time.Since(stime) < 10 * time.Millisecond; k,_ = c.Next() {
if tdecode(k).After(etime) {
return i // entry not expired
}
if reflect.DeepEqual(k,end) {
return i // always keep the most recent entry
}
b.Delete(k)
i++
}
return i
}
}
func tidyThread() { func tidyThread() {
if Debug { fmt.Println("tidyThread() starting") } fmt.Println("tidyThread() starting")
lchan := make(chan string) lchan := make(chan string)
go func() { go func() {
if Debug { fmt.Println("starting loaded reading thread") } fmt.Println("starting loaded reading thread")
if loaded == nil { if loaded == nil {
loaded = make(chan *Var,conf.MaxVars) // buffered channel loaded = make(chan *Var,conf.MaxVars) // buffered channel
} }
for { for {
for p := range loaded { for p := range loaded {
if p.permanent == false { if p.opt.Permanent == false {
lchan<- p.name lchan<- p.name
loaded<- p // put it back in the queue loaded<- p // put it back in the queue
} }
} }
if Debug { fmt.Println("closing lchan") } fmt.Println("closing lchan")
close(lchan) close(lchan)
} }
}() }()
tick := time.NewTicker(conf.Tidy.Interval) tick := time.NewTicker(time.Second * 10)
for { select { for { select {
case <-tchan: case <-tchan:
if Debug { fmt.Println("closing loaded channel") } fmt.Println("closing loaded channel")
close(loaded) close(loaded)
tg.Done() tg.Done()
if Debug { fmt.Println("tidy(): returning") } fmt.Println("tidy(): returning")
return return
case <-tick.C: case <-tick.C:
name := <-lchan name := <-lchan
if Debug { fmt.Println("tidyThread(): tidying ", name) } fmt.Println("tidyThread(): tidying ", name)
Tidy(name) Tidy(name)
} } } }
} }
const expiration time.Duration = -24 * time.Hour
// discard entries if they are too old. // discard entries if they are too old.
func Tidy(name string) { func Tidy(name string) {
stime := time.Now() stime := time.Now()
etime := stime.Add(expiration)
mux.Lock() mux.Lock()
i := 0 i := 0
db.Update(func(tx *bolt.Tx) error { db.Update(func(tx *bolt.Tx) error {
@ -533,13 +471,25 @@ func Tidy(name string) {
fmt.Println("Can't open bucket for ",name) fmt.Println("Can't open bucket for ",name)
return nil return nil
} }
i = conf.Tidy.Func(b) c := b.Cursor()
end,_ := c.Last() // always keep the most recent on
for time.Since(stime) < 10 * time.Millisecond {
k,_ := c.First()
if tdecode(k).After(etime) {
return nil // entry not expired
}
if reflect.DeepEqual(k,end) {
return nil // always keep the most recent entry
}
b.Delete(k)
i++
}
return nil return nil
}) })
mux.Unlock() mux.Unlock()
if i > 0 { if i > 0 {
if Debug { fmt.Println("tidy(): deleted ",i," entries in ",time.Since(stime)) } fmt.Println("tidy(): deleted ",i," entries in ",time.Since(stime))
} }
if Debug { fmt.Println("tidy(): done tidying",name) } fmt.Println("tidy(): done tidying",name)
} }

View File

@ -1,28 +0,0 @@
package main
import (
"fmt"
"time"
"gitlab.wow.st/gmp/persist"
)
func main() {
persist.Init(
persist.TidyInterval(2 * time.Second),
persist.ExpireAfter(-5 * time.Second),
)
x := persistInt("x",0)
persist.Commit()
x.Set(2)
persist.Commit()
x.Set(3)
persist.Commit()
fmt.Println(x.History())
time.Sleep(time.Second * 3)
fmt.Println(x.History())
time.Sleep(time.Second * 3)
fmt.Println(x.History())
persist.Shutdown()
}