Compare commits

...

5 Commits

Author SHA1 Message Date
Greg 814c6ffa20 Add Debug variable to control verbose output. 2019-02-07 10:18:53 -05:00
Greg 20c89c6e56 Functional options to persist.Init(). 2018-08-30 09:23:21 -04:00
Greg 24d96151f4 Test changes. 2018-08-10 08:19:32 -04:00
Greg 17beaca380 Allow configuration of tidy functions. Use a closure to decide what entries
to delete. The closure takes a *bolt.Bucket and returns an integer of the
number of entries deleted.
2018-08-09 18:21:52 -04:00
Greg d6f4920507 Add DeleteSync() to generate/template.go. 2018-08-09 16:15:00 -04:00
4 changed files with 141 additions and 56 deletions

2
.gitignore vendored
View File

@ -2,3 +2,5 @@ test/basic/basic
test/basic/pgen.go
test/basic/pgen_aaa.go
test/basic/db
test/tidy/db
test/tidy/pgen.go

View File

@ -12,9 +12,9 @@ import (
type Var_N persist.Var
func New(name string, x _T, opt ...persist.Option) *Var_N {
func New(name string, x _T, ps ...bool) *Var_N {
gob.Register(x)
ptr := persist.New(name, x, opt...)
ptr := persist.New(name, x, ps...)
ret := (*Var_N)(unsafe.Pointer(ptr))
return ret
}
@ -49,6 +49,11 @@ func (v *Var_N) Delete(t time.Time) {
ptr.Delete(t)
}
func (v *Var_N) DeleteSync(t time.Time) {
ptr := (*persist.Var)(unsafe.Pointer(v))
ptr.Delete(t)
}
func (v *Var_N) History() ([]Var_N, error) {
ptr := (*persist.Var)(unsafe.Pointer(v))
h, err := ptr.History()

View File

@ -17,6 +17,8 @@ import (
bolt "github.com/coreos/bbolt"
)
var Debug bool
// allow persistant storage and reloading of variables to bolt database.
// everything is time stamped.
// bucket is the variable name
@ -25,42 +27,63 @@ type Var struct {
X interface{}
Time time.Time
name string
opt Option
permanent bool
}
type encVar struct {
X interface{}
}
type Option struct {
Permanent bool
}
type Option func(*Config)
type Config struct {
configured bool
MaxVars int
DBPath string
Tidy TidyConfig
}
func Init(x ...Config) {
func MaxVars(i int) Option {
return func(c *Config) {
c.MaxVars = i
}
}
func DBPath(s string) Option {
return func(c *Config) {
c.DBPath = s
}
}
func TidyInterval(d time.Duration) Option {
return func(c *Config) {
c.Tidy.Interval = d
}
}
func Init(confs ...Option) {
mux.Lock()
c := Config{MaxVars: 64, DBPath: "db"} // default config
conf = c
if len(x) > 0 {
c = x[0]
tc := TidyConfig {
Interval: 10 * time.Second,
}
if c.MaxVars != 0 {
conf.MaxVars = c.MaxVars
conf.configured = true
conf = Config { // default config
configured: true,
MaxVars: 64,
DBPath: "db",
Tidy: tc,
}
if c.DBPath != "" {
conf.DBPath = c.DBPath
conf.configured = true
for _,f := range(confs) {
f(&conf)
}
if conf.Tidy.Func == nil {
conf.Tidy.Func = ExpireAfterFunc(24 * time.Hour)
}
mux.Unlock()
}
func New(name string, x interface{},opt ...Option) *Var {
func New(name string, x interface{},ps ...bool) *Var {
start()
mux.Lock()
if vars == nil {
@ -71,8 +94,8 @@ func New(name string, x interface{},opt ...Option) *Var {
ret = &Var{X: x, name: name}
vars[name] = ret
}
if len(opt) == 1 {
ret.opt = opt[0]
if len(ps) > 1 {
ret.permanent = ps[0]
}
mux.Unlock()
err := ret.Load()
@ -80,13 +103,15 @@ func New(name string, x interface{},opt ...Option) *Var {
fmt.Println("Load error:",err)
wchan <- encode(ret)
}
fmt.Println("New(): ",ret.name)
if Debug { fmt.Println("New(): ",ret.name) }
mux.Lock()
if loaded == nil {
loaded = make(chan *Var,conf.MaxVars) // buffered channel
}
mux.Unlock()
if Debug { fmt.Println("New(): loaded<- ret") }
loaded<- ret
if Debug { fmt.Println("New(): returning") }
return ret
}
@ -134,7 +159,7 @@ func (p *Var) Load(ts ...time.Time) error {
p.Time = t
}
} else {
fmt.Println("Load(",p.name,"): type =",reflect.TypeOf(p.X))
if Debug { fmt.Println("Load(",p.name,"): type =",reflect.TypeOf(p.X)) }
}
return err
}
@ -173,7 +198,7 @@ func Checkpoints() {
}
func Undo() {
fmt.Println("Undo()")
if Debug { fmt.Println("Undo()") }
mux.Lock()
__checkpoints.DeleteSync(lastCheckpoint)
h,_ := __checkpoints.History()
@ -274,13 +299,15 @@ func start() {
launched = true
if conf.configured == false {
mux.Unlock()
Init()
mux.Lock()
}
var err error
dbdir := path.Dir(conf.DBPath)
if _, err = os.Stat(dbdir); os.IsNotExist(err) {
log.Print("creating db dir")
if Debug { log.Print("creating db dir") }
err = os.MkdirAll(dbdir,0700)
if err != nil {
log.Fatal(err)
@ -304,13 +331,13 @@ func start() {
h, err := __checkpoints.History()
if len(h) > 0 {
lastCheckpoint = h[len(h)-1].Time
fmt.Println("lastCheckpoint = ",lastCheckpoint)
if Debug { fmt.Println("lastCheckpoint = ",lastCheckpoint) }
}
mux.Unlock()
}
func writer() {
fmt.Println("launching writer thread")
if Debug { fmt.Println("launching writer thread") }
runtime.UnlockOSThread()
for p := range wchan {
wmux.Lock()
@ -321,7 +348,7 @@ func writer() {
if err != nil {
log.Fatal("Cannot create bucket for ",string(p.name))
}
fmt.Println("writer(): created bucket for ",string(p.name))
if Debug { fmt.Println("writer(): created bucket for ",string(p.name)) }
b = tx.Bucket(p.name)
if b == nil {
log.Fatal("Bucket error")
@ -338,7 +365,7 @@ func writer() {
case []byte:
t := time.Now()
if string(p.name) == "__checkpoints" {
fmt.Println("saving: ",string(p.name),t)
if Debug { fmt.Println("saving: ",string(p.name),t) }
lastCheckpoint = t
}
err = b.Put(tencode(t),v)
@ -356,9 +383,9 @@ func writer() {
close(p.donech)
}
}
fmt.Println("leaving writer thread")
if Debug { fmt.Println("leaving writer thread") }
db.Close()
fmt.Println("DB is closed")
if Debug { fmt.Println("DB is closed") }
wg.Done()
}
@ -423,46 +450,81 @@ func retrieve(name string, ts ...time.Time) ([]byte, time.Time, error) {
return ret, t, err
}
type TidyFunc func(*bolt.Bucket) int
type TidyConfig struct {
Interval time.Duration
Func TidyFunc
}
func ExpireAfter(exp time.Duration) Option {
return func(c *Config) {
c.Tidy.Func = ExpireAfterFunc(exp)
}
}
func ExpireAfterFunc(exp time.Duration) TidyFunc {
if exp > 0 {
exp = -1 * exp
}
if Debug { fmt.Println("ExpireAfter(",exp,")") }
return func(b *bolt.Bucket) int {
stime := time.Now()
etime := stime.Add(exp)
i := 0
c := b.Cursor()
end,_ := c.Last() // always keep the most recent on
for k,_ := c.First(); time.Since(stime) < 10 * time.Millisecond; k,_ = c.Next() {
if tdecode(k).After(etime) {
return i // entry not expired
}
if reflect.DeepEqual(k,end) {
return i // always keep the most recent entry
}
b.Delete(k)
i++
}
return i
}
}
func tidyThread() {
fmt.Println("tidyThread() starting")
if Debug { fmt.Println("tidyThread() starting") }
lchan := make(chan string)
go func() {
fmt.Println("starting loaded reading thread")
if Debug { fmt.Println("starting loaded reading thread") }
if loaded == nil {
loaded = make(chan *Var,conf.MaxVars) // buffered channel
}
for {
for p := range loaded {
if p.opt.Permanent == false {
if p.permanent == false {
lchan<- p.name
loaded<- p // put it back in the queue
}
}
fmt.Println("closing lchan")
if Debug { fmt.Println("closing lchan") }
close(lchan)
}
}()
tick := time.NewTicker(time.Second * 10)
tick := time.NewTicker(conf.Tidy.Interval)
for { select {
case <-tchan:
fmt.Println("closing loaded channel")
if Debug { fmt.Println("closing loaded channel") }
close(loaded)
tg.Done()
fmt.Println("tidy(): returning")
if Debug { fmt.Println("tidy(): returning") }
return
case <-tick.C:
name := <-lchan
fmt.Println("tidyThread(): tidying ", name)
if Debug { fmt.Println("tidyThread(): tidying ", name) }
Tidy(name)
} }
}
const expiration time.Duration = -24 * time.Hour
// discard entries if they are too old.
func Tidy(name string) {
stime := time.Now()
etime := stime.Add(expiration)
mux.Lock()
i := 0
db.Update(func(tx *bolt.Tx) error {
@ -471,25 +533,13 @@ func Tidy(name string) {
fmt.Println("Can't open bucket for ",name)
return nil
}
c := b.Cursor()
end,_ := c.Last() // always keep the most recent on
for time.Since(stime) < 10 * time.Millisecond {
k,_ := c.First()
if tdecode(k).After(etime) {
return nil // entry not expired
}
if reflect.DeepEqual(k,end) {
return nil // always keep the most recent entry
}
b.Delete(k)
i++
}
i = conf.Tidy.Func(b)
return nil
})
mux.Unlock()
if i > 0 {
fmt.Println("tidy(): deleted ",i," entries in ",time.Since(stime))
if Debug { fmt.Println("tidy(): deleted ",i," entries in ",time.Since(stime)) }
}
fmt.Println("tidy(): done tidying",name)
if Debug { fmt.Println("tidy(): done tidying",name) }
}

28
test/tidy/main.go Normal file
View File

@ -0,0 +1,28 @@
package main
import (
"fmt"
"time"
"gitlab.wow.st/gmp/persist"
)
func main() {
persist.Init(
persist.TidyInterval(2 * time.Second),
persist.ExpireAfter(-5 * time.Second),
)
x := persistInt("x",0)
persist.Commit()
x.Set(2)
persist.Commit()
x.Set(3)
persist.Commit()
fmt.Println(x.History())
time.Sleep(time.Second * 3)
fmt.Println(x.History())
time.Sleep(time.Second * 3)
fmt.Println(x.History())
persist.Shutdown()
}