Compare commits
5 Commits
9653dab9f4
...
814c6ffa20
Author | SHA1 | Date | |
---|---|---|---|
814c6ffa20 | |||
20c89c6e56 | |||
24d96151f4 | |||
17beaca380 | |||
d6f4920507 |
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -2,3 +2,5 @@ test/basic/basic
|
||||||
test/basic/pgen.go
|
test/basic/pgen.go
|
||||||
test/basic/pgen_aaa.go
|
test/basic/pgen_aaa.go
|
||||||
test/basic/db
|
test/basic/db
|
||||||
|
test/tidy/db
|
||||||
|
test/tidy/pgen.go
|
||||||
|
|
|
@ -12,9 +12,9 @@ import (
|
||||||
|
|
||||||
type Var_N persist.Var
|
type Var_N persist.Var
|
||||||
|
|
||||||
func New(name string, x _T, opt ...persist.Option) *Var_N {
|
func New(name string, x _T, ps ...bool) *Var_N {
|
||||||
gob.Register(x)
|
gob.Register(x)
|
||||||
ptr := persist.New(name, x, opt...)
|
ptr := persist.New(name, x, ps...)
|
||||||
ret := (*Var_N)(unsafe.Pointer(ptr))
|
ret := (*Var_N)(unsafe.Pointer(ptr))
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
@ -49,6 +49,11 @@ func (v *Var_N) Delete(t time.Time) {
|
||||||
ptr.Delete(t)
|
ptr.Delete(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (v *Var_N) DeleteSync(t time.Time) {
|
||||||
|
ptr := (*persist.Var)(unsafe.Pointer(v))
|
||||||
|
ptr.Delete(t)
|
||||||
|
}
|
||||||
|
|
||||||
func (v *Var_N) History() ([]Var_N, error) {
|
func (v *Var_N) History() ([]Var_N, error) {
|
||||||
ptr := (*persist.Var)(unsafe.Pointer(v))
|
ptr := (*persist.Var)(unsafe.Pointer(v))
|
||||||
h, err := ptr.History()
|
h, err := ptr.History()
|
||||||
|
|
158
persist.go
158
persist.go
|
@ -17,6 +17,8 @@ import (
|
||||||
bolt "github.com/coreos/bbolt"
|
bolt "github.com/coreos/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var Debug bool
|
||||||
|
|
||||||
// allow persistant storage and reloading of variables to bolt database.
|
// allow persistant storage and reloading of variables to bolt database.
|
||||||
// everything is time stamped.
|
// everything is time stamped.
|
||||||
// bucket is the variable name
|
// bucket is the variable name
|
||||||
|
@ -25,42 +27,63 @@ type Var struct {
|
||||||
X interface{}
|
X interface{}
|
||||||
Time time.Time
|
Time time.Time
|
||||||
name string
|
name string
|
||||||
opt Option
|
permanent bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type encVar struct {
|
type encVar struct {
|
||||||
X interface{}
|
X interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Option struct {
|
type Option func(*Config)
|
||||||
Permanent bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
configured bool
|
configured bool
|
||||||
MaxVars int
|
MaxVars int
|
||||||
DBPath string
|
DBPath string
|
||||||
|
Tidy TidyConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func Init(x ...Config) {
|
func MaxVars(i int) Option {
|
||||||
|
return func(c *Config) {
|
||||||
|
c.MaxVars = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func DBPath(s string) Option {
|
||||||
|
return func(c *Config) {
|
||||||
|
c.DBPath = s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TidyInterval(d time.Duration) Option {
|
||||||
|
return func(c *Config) {
|
||||||
|
c.Tidy.Interval = d
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Init(confs ...Option) {
|
||||||
mux.Lock()
|
mux.Lock()
|
||||||
c := Config{MaxVars: 64, DBPath: "db"} // default config
|
tc := TidyConfig {
|
||||||
conf = c
|
Interval: 10 * time.Second,
|
||||||
if len(x) > 0 {
|
|
||||||
c = x[0]
|
|
||||||
}
|
}
|
||||||
if c.MaxVars != 0 {
|
conf = Config { // default config
|
||||||
conf.MaxVars = c.MaxVars
|
configured: true,
|
||||||
conf.configured = true
|
MaxVars: 64,
|
||||||
|
DBPath: "db",
|
||||||
|
Tidy: tc,
|
||||||
}
|
}
|
||||||
if c.DBPath != "" {
|
|
||||||
conf.DBPath = c.DBPath
|
for _,f := range(confs) {
|
||||||
conf.configured = true
|
f(&conf)
|
||||||
|
}
|
||||||
|
|
||||||
|
if conf.Tidy.Func == nil {
|
||||||
|
conf.Tidy.Func = ExpireAfterFunc(24 * time.Hour)
|
||||||
}
|
}
|
||||||
mux.Unlock()
|
mux.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(name string, x interface{},opt ...Option) *Var {
|
func New(name string, x interface{},ps ...bool) *Var {
|
||||||
start()
|
start()
|
||||||
mux.Lock()
|
mux.Lock()
|
||||||
if vars == nil {
|
if vars == nil {
|
||||||
|
@ -71,8 +94,8 @@ func New(name string, x interface{},opt ...Option) *Var {
|
||||||
ret = &Var{X: x, name: name}
|
ret = &Var{X: x, name: name}
|
||||||
vars[name] = ret
|
vars[name] = ret
|
||||||
}
|
}
|
||||||
if len(opt) == 1 {
|
if len(ps) > 1 {
|
||||||
ret.opt = opt[0]
|
ret.permanent = ps[0]
|
||||||
}
|
}
|
||||||
mux.Unlock()
|
mux.Unlock()
|
||||||
err := ret.Load()
|
err := ret.Load()
|
||||||
|
@ -80,13 +103,15 @@ func New(name string, x interface{},opt ...Option) *Var {
|
||||||
fmt.Println("Load error:",err)
|
fmt.Println("Load error:",err)
|
||||||
wchan <- encode(ret)
|
wchan <- encode(ret)
|
||||||
}
|
}
|
||||||
fmt.Println("New(): ",ret.name)
|
if Debug { fmt.Println("New(): ",ret.name) }
|
||||||
mux.Lock()
|
mux.Lock()
|
||||||
if loaded == nil {
|
if loaded == nil {
|
||||||
loaded = make(chan *Var,conf.MaxVars) // buffered channel
|
loaded = make(chan *Var,conf.MaxVars) // buffered channel
|
||||||
}
|
}
|
||||||
mux.Unlock()
|
mux.Unlock()
|
||||||
|
if Debug { fmt.Println("New(): loaded<- ret") }
|
||||||
loaded<- ret
|
loaded<- ret
|
||||||
|
if Debug { fmt.Println("New(): returning") }
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,7 +159,7 @@ func (p *Var) Load(ts ...time.Time) error {
|
||||||
p.Time = t
|
p.Time = t
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
fmt.Println("Load(",p.name,"): type =",reflect.TypeOf(p.X))
|
if Debug { fmt.Println("Load(",p.name,"): type =",reflect.TypeOf(p.X)) }
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -173,7 +198,7 @@ func Checkpoints() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func Undo() {
|
func Undo() {
|
||||||
fmt.Println("Undo()")
|
if Debug { fmt.Println("Undo()") }
|
||||||
mux.Lock()
|
mux.Lock()
|
||||||
__checkpoints.DeleteSync(lastCheckpoint)
|
__checkpoints.DeleteSync(lastCheckpoint)
|
||||||
h,_ := __checkpoints.History()
|
h,_ := __checkpoints.History()
|
||||||
|
@ -274,13 +299,15 @@ func start() {
|
||||||
launched = true
|
launched = true
|
||||||
|
|
||||||
if conf.configured == false {
|
if conf.configured == false {
|
||||||
|
mux.Unlock()
|
||||||
Init()
|
Init()
|
||||||
|
mux.Lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
dbdir := path.Dir(conf.DBPath)
|
dbdir := path.Dir(conf.DBPath)
|
||||||
if _, err = os.Stat(dbdir); os.IsNotExist(err) {
|
if _, err = os.Stat(dbdir); os.IsNotExist(err) {
|
||||||
log.Print("creating db dir")
|
if Debug { log.Print("creating db dir") }
|
||||||
err = os.MkdirAll(dbdir,0700)
|
err = os.MkdirAll(dbdir,0700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
@ -304,13 +331,13 @@ func start() {
|
||||||
h, err := __checkpoints.History()
|
h, err := __checkpoints.History()
|
||||||
if len(h) > 0 {
|
if len(h) > 0 {
|
||||||
lastCheckpoint = h[len(h)-1].Time
|
lastCheckpoint = h[len(h)-1].Time
|
||||||
fmt.Println("lastCheckpoint = ",lastCheckpoint)
|
if Debug { fmt.Println("lastCheckpoint = ",lastCheckpoint) }
|
||||||
}
|
}
|
||||||
mux.Unlock()
|
mux.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func writer() {
|
func writer() {
|
||||||
fmt.Println("launching writer thread")
|
if Debug { fmt.Println("launching writer thread") }
|
||||||
runtime.UnlockOSThread()
|
runtime.UnlockOSThread()
|
||||||
for p := range wchan {
|
for p := range wchan {
|
||||||
wmux.Lock()
|
wmux.Lock()
|
||||||
|
@ -321,7 +348,7 @@ func writer() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Cannot create bucket for ",string(p.name))
|
log.Fatal("Cannot create bucket for ",string(p.name))
|
||||||
}
|
}
|
||||||
fmt.Println("writer(): created bucket for ",string(p.name))
|
if Debug { fmt.Println("writer(): created bucket for ",string(p.name)) }
|
||||||
b = tx.Bucket(p.name)
|
b = tx.Bucket(p.name)
|
||||||
if b == nil {
|
if b == nil {
|
||||||
log.Fatal("Bucket error")
|
log.Fatal("Bucket error")
|
||||||
|
@ -338,7 +365,7 @@ func writer() {
|
||||||
case []byte:
|
case []byte:
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
if string(p.name) == "__checkpoints" {
|
if string(p.name) == "__checkpoints" {
|
||||||
fmt.Println("saving: ",string(p.name),t)
|
if Debug { fmt.Println("saving: ",string(p.name),t) }
|
||||||
lastCheckpoint = t
|
lastCheckpoint = t
|
||||||
}
|
}
|
||||||
err = b.Put(tencode(t),v)
|
err = b.Put(tencode(t),v)
|
||||||
|
@ -356,9 +383,9 @@ func writer() {
|
||||||
close(p.donech)
|
close(p.donech)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fmt.Println("leaving writer thread")
|
if Debug { fmt.Println("leaving writer thread") }
|
||||||
db.Close()
|
db.Close()
|
||||||
fmt.Println("DB is closed")
|
if Debug { fmt.Println("DB is closed") }
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -423,46 +450,81 @@ func retrieve(name string, ts ...time.Time) ([]byte, time.Time, error) {
|
||||||
return ret, t, err
|
return ret, t, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TidyFunc func(*bolt.Bucket) int
|
||||||
|
|
||||||
|
type TidyConfig struct {
|
||||||
|
Interval time.Duration
|
||||||
|
Func TidyFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExpireAfter(exp time.Duration) Option {
|
||||||
|
return func(c *Config) {
|
||||||
|
c.Tidy.Func = ExpireAfterFunc(exp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExpireAfterFunc(exp time.Duration) TidyFunc {
|
||||||
|
if exp > 0 {
|
||||||
|
exp = -1 * exp
|
||||||
|
}
|
||||||
|
if Debug { fmt.Println("ExpireAfter(",exp,")") }
|
||||||
|
return func(b *bolt.Bucket) int {
|
||||||
|
stime := time.Now()
|
||||||
|
etime := stime.Add(exp)
|
||||||
|
i := 0
|
||||||
|
c := b.Cursor()
|
||||||
|
end,_ := c.Last() // always keep the most recent on
|
||||||
|
for k,_ := c.First(); time.Since(stime) < 10 * time.Millisecond; k,_ = c.Next() {
|
||||||
|
if tdecode(k).After(etime) {
|
||||||
|
return i // entry not expired
|
||||||
|
}
|
||||||
|
if reflect.DeepEqual(k,end) {
|
||||||
|
return i // always keep the most recent entry
|
||||||
|
}
|
||||||
|
b.Delete(k)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func tidyThread() {
|
func tidyThread() {
|
||||||
fmt.Println("tidyThread() starting")
|
if Debug { fmt.Println("tidyThread() starting") }
|
||||||
lchan := make(chan string)
|
lchan := make(chan string)
|
||||||
go func() {
|
go func() {
|
||||||
fmt.Println("starting loaded reading thread")
|
if Debug { fmt.Println("starting loaded reading thread") }
|
||||||
if loaded == nil {
|
if loaded == nil {
|
||||||
loaded = make(chan *Var,conf.MaxVars) // buffered channel
|
loaded = make(chan *Var,conf.MaxVars) // buffered channel
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
for p := range loaded {
|
for p := range loaded {
|
||||||
if p.opt.Permanent == false {
|
if p.permanent == false {
|
||||||
lchan<- p.name
|
lchan<- p.name
|
||||||
loaded<- p // put it back in the queue
|
loaded<- p // put it back in the queue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fmt.Println("closing lchan")
|
if Debug { fmt.Println("closing lchan") }
|
||||||
close(lchan)
|
close(lchan)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
tick := time.NewTicker(time.Second * 10)
|
tick := time.NewTicker(conf.Tidy.Interval)
|
||||||
for { select {
|
for { select {
|
||||||
case <-tchan:
|
case <-tchan:
|
||||||
fmt.Println("closing loaded channel")
|
if Debug { fmt.Println("closing loaded channel") }
|
||||||
close(loaded)
|
close(loaded)
|
||||||
tg.Done()
|
tg.Done()
|
||||||
fmt.Println("tidy(): returning")
|
if Debug { fmt.Println("tidy(): returning") }
|
||||||
return
|
return
|
||||||
case <-tick.C:
|
case <-tick.C:
|
||||||
name := <-lchan
|
name := <-lchan
|
||||||
fmt.Println("tidyThread(): tidying ", name)
|
if Debug { fmt.Println("tidyThread(): tidying ", name) }
|
||||||
Tidy(name)
|
Tidy(name)
|
||||||
} }
|
} }
|
||||||
}
|
}
|
||||||
|
|
||||||
const expiration time.Duration = -24 * time.Hour
|
|
||||||
|
|
||||||
// discard entries if they are too old.
|
// discard entries if they are too old.
|
||||||
func Tidy(name string) {
|
func Tidy(name string) {
|
||||||
stime := time.Now()
|
stime := time.Now()
|
||||||
etime := stime.Add(expiration)
|
|
||||||
mux.Lock()
|
mux.Lock()
|
||||||
i := 0
|
i := 0
|
||||||
db.Update(func(tx *bolt.Tx) error {
|
db.Update(func(tx *bolt.Tx) error {
|
||||||
|
@ -471,25 +533,13 @@ func Tidy(name string) {
|
||||||
fmt.Println("Can't open bucket for ",name)
|
fmt.Println("Can't open bucket for ",name)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
c := b.Cursor()
|
i = conf.Tidy.Func(b)
|
||||||
end,_ := c.Last() // always keep the most recent on
|
|
||||||
for time.Since(stime) < 10 * time.Millisecond {
|
|
||||||
k,_ := c.First()
|
|
||||||
if tdecode(k).After(etime) {
|
|
||||||
return nil // entry not expired
|
|
||||||
}
|
|
||||||
if reflect.DeepEqual(k,end) {
|
|
||||||
return nil // always keep the most recent entry
|
|
||||||
}
|
|
||||||
b.Delete(k)
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
mux.Unlock()
|
mux.Unlock()
|
||||||
if i > 0 {
|
if i > 0 {
|
||||||
fmt.Println("tidy(): deleted ",i," entries in ",time.Since(stime))
|
if Debug { fmt.Println("tidy(): deleted ",i," entries in ",time.Since(stime)) }
|
||||||
}
|
}
|
||||||
fmt.Println("tidy(): done tidying",name)
|
if Debug { fmt.Println("tidy(): done tidying",name) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
28
test/tidy/main.go
Normal file
28
test/tidy/main.go
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
"gitlab.wow.st/gmp/persist"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
persist.Init(
|
||||||
|
persist.TidyInterval(2 * time.Second),
|
||||||
|
persist.ExpireAfter(-5 * time.Second),
|
||||||
|
)
|
||||||
|
|
||||||
|
x := persistInt("x",0)
|
||||||
|
persist.Commit()
|
||||||
|
x.Set(2)
|
||||||
|
persist.Commit()
|
||||||
|
x.Set(3)
|
||||||
|
persist.Commit()
|
||||||
|
fmt.Println(x.History())
|
||||||
|
time.Sleep(time.Second * 3)
|
||||||
|
fmt.Println(x.History())
|
||||||
|
time.Sleep(time.Second * 3)
|
||||||
|
fmt.Println(x.History())
|
||||||
|
persist.Shutdown()
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user