Compare commits

...

5 Commits

Author SHA1 Message Date
Greg 8b02e05f2d Save and restore download queue status. Remember when downloading is
complete for a podcast.
2019-03-18 13:23:33 -04:00
gmp 92374e120f update .gitignore 2019-03-15 14:58:03 -04:00
Greg 661f42516f Slow down automatic wake-up of Queueupdater and Downloader. 2019-03-15 13:51:44 -04:00
gmp ac96eaab6d Change lookback to 30 days. 2019-03-15 12:44:57 -04:00
Greg e609afeb02 Restructuring update/download threads. 2019-03-15 12:41:32 -04:00
2 changed files with 123 additions and 62 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
rssd rssd
log log
nohup.out

184
main.go
View File

@ -1,7 +1,10 @@
package main package main
import ( import (
"bufio"
"bytes"
"fmt" "fmt"
"io"
"log" "log"
"net/url" "net/url"
"os" "os"
@ -18,7 +21,7 @@ import (
) )
var ( var (
confFile,dataFile,dstDir string confFile,dataFile,queueFile,dstDir string
) )
func init() { func init() {
@ -30,6 +33,7 @@ func init() {
confDir := path.Join(homeDir,".config","rssd") confDir := path.Join(homeDir,".config","rssd")
confFile = path.Join(confDir,"rssd.conf") confFile = path.Join(confDir,"rssd.conf")
dataFile = path.Join(confDir,"podcasts.conf") dataFile = path.Join(confDir,"podcasts.conf")
queueFile = path.Join(confDir,"queue.conf")
} }
type Config struct { type Config struct {
@ -189,7 +193,9 @@ func readFeed(u string,sel Selector) *Podcast {
type dlItem struct { type dlItem struct {
Item *Item Item *Item
downloading bool Filename string
Downloading bool
Complete bool
} }
type ByDate []*dlItem type ByDate []*dlItem
@ -198,19 +204,20 @@ func (a ByDate) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDate) Less(i, j int) bool { return a[j].Item.Published.After(a[i].Item.Published) } func (a ByDate) Less(i, j int) bool { return a[j].Item.Published.After(a[i].Item.Published) }
type dlQueue struct { type dlQueue struct {
items []*dlItem Items []*dlItem
reqch chan *grab.Request reqch chan *grab.Request
respch chan *grab.Response respch chan *grab.Response
sync.Mutex sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
wake chan struct{}
} }
func (q *dlQueue) Sort() { func (q *dlQueue) Sort() {
sort.Sort(ByDate(q.items)) sort.Sort(ByDate(q.Items))
} }
func (q *dlQueue) Find(x *Item) (int, bool) { func (q *dlQueue) Find(x *Item) (int, bool) {
for i,y := range q.items { for i,y := range q.Items {
if y.Item.Title == x.Title { if y.Item.Title == x.Title {
return i, true return i, true
} }
@ -218,17 +225,27 @@ func (q *dlQueue) Find(x *Item) (int, bool) {
return 0, false return 0, false
} }
func (q *dlQueue) FindFilename(x string) (int, bool) {
for i,y := range q.Items {
if y.Filename == x {
return i, true
}
}
return 0, false
}
func (q *dlQueue) Waiting() *dlQueue { func (q *dlQueue) Waiting() *dlQueue {
ret := &dlQueue{ ret := &dlQueue{
items: make([]*dlItem,0), Items: make([]*dlItem,0),
reqch: q.reqch, reqch: q.reqch,
respch: q.respch, respch: q.respch,
Mutex: q.Mutex, Mutex: q.Mutex,
wg: q.wg, wg: q.wg,
wake: q.wake,
} }
for _,i := range q.items { for _,i := range q.Items {
if i.downloading == false { if i.Downloading == false {
ret.items = append(ret.items,i) ret.Items = append(ret.Items,i)
} }
} }
return ret return ret
@ -240,7 +257,44 @@ func (q *dlQueue) Add(i *Item) {
} }
di := &dlItem{Item: &Item{} } di := &dlItem{Item: &Item{} }
*di.Item = *i *di.Item = *i
q.items = append(q.items,di) di.Filename = path.Join(dstDir,i.Filename) + path.Ext(i.Url)
q.Items = append(q.Items,di)
}
func NewQueue() *dlQueue {
ret := &dlQueue{
Items: make([]*dlItem,0),
reqch: make(chan *grab.Request),
respch: make(chan *grab.Response),
wake: make(chan struct{}),
}
return ret
}
func (q *dlQueue) Load() {
q.Lock()
if _, err := toml.DecodeFile(queueFile, q); err != nil {
log.Print("Cannot read queue status file:",err)
}
q.Unlock()
}
func (q *dlQueue) Save() {
of,err := os.Create(queueFile)
if err != nil {
log.Print("dlQueue.Save(): Cannot open output file")
} else {
var buf bytes.Buffer
w := bufio.NewWriter(&buf)
enc := toml.NewEncoder(w)
log.Printf("dlQueue.Save(): encoding %d entries\n",len(q.Items))
q.Lock() // do not lock around IO
enc.Encode(q)
q.Unlock()
io.Copy(of,&buf)
of.Close()
log.Print("dlQueue.Save(): done")
}
} }
type Daemon struct { type Daemon struct {
@ -254,17 +308,19 @@ type Daemon struct {
} }
func NewDaemon(conf Config, pl *pcList) *Daemon { func NewDaemon(conf Config, pl *pcList) *Daemon {
return &Daemon{ ret := &Daemon{
conf: conf, conf: conf,
g: grab.NewClient(), g: grab.NewClient(),
pl: pl, pl: pl,
queue: &dlQueue{}, queue: NewQueue(),
dlwake: make(chan struct{}), dlwake: make(chan struct{}),
} }
ret.queue.Load()
return ret
} }
func (d *Daemon) Update(urls []string) { func (d *Daemon) Update(urls []string) {
sel := daysAgo(60) sel := daysAgo(30)
for _,url := range urls { for _,url := range urls {
log.Print(" -> ",url) log.Print(" -> ",url)
f := readFeed(url,sel) // do not lock around IO f := readFeed(url,sel) // do not lock around IO
@ -282,7 +338,7 @@ func (d *Daemon) Monitor() {
100*resp.Progress()) 100*resp.Progress())
} }
mon := func(resp *grab.Response) { mon := func(resp *grab.Response) {
t := time.NewTicker(1 * time.Second) t := time.NewTicker(5 * time.Second)
defer t.Stop() defer t.Stop()
Loop: Loop:
for { for {
@ -291,6 +347,15 @@ func (d *Daemon) Monitor() {
status(resp) status(resp)
case <-resp.Done: case <-resp.Done:
status(resp) status(resp)
d.queue.Lock()
if i, ok := d.queue.FindFilename(resp.Filename); ok {
d.queue.Items[i].Complete = true
d.queue.Unlock()
d.queue.Save()
} else {
d.queue.Unlock()
log.Printf("Error finding queue entry for %s\n",resp.Filename)
}
break Loop break Loop
} }
} }
@ -298,15 +363,12 @@ func (d *Daemon) Monitor() {
log.Printf("Download failed for %s (%s)\n",resp.Filename,err) log.Printf("Download failed for %s (%s)\n",resp.Filename,err)
} }
} }
for { for r := range d.queue.respch {
mon(<-d.queue.respch) go mon(r)
} }
} }
func (d *Daemon) Downloader() { func (d *Daemon) StartDownloader() {
log.Print("Downloader(): starting")
d.queue.reqch = make(chan *grab.Request)
d.queue.respch = make(chan *grab.Response)
log.Print("Downloader(): spawning workers") log.Print("Downloader(): spawning workers")
for i := 0; i < d.workers; i++ { for i := 0; i < d.workers; i++ {
d.queue.wg.Add(1) d.queue.wg.Add(1)
@ -317,67 +379,66 @@ func (d *Daemon) Downloader() {
} }
log.Print("Downloader(): starting monitor") log.Print("Downloader(): starting monitor")
go d.Monitor() go d.Monitor()
go d.QueueUpdater()
go d.queue.Downloader()
}
if d.queue.items == nil { func (d *Daemon) QueueUpdater() {
t := time.NewTicker(30 * time.Minute)
defer t.Stop()
for {
// lock Podcast list and update download queue
log.Print("QueueUpdater(): Updating download queue")
d.queue.Lock() d.queue.Lock()
d.queue.items = make([]*dlItem,0) for _,p := range d.pl.Podcasts {
d.queue.Unlock() for _,i := range p.Items {
} d.queue.Add(&i)
upd := make(chan struct{})
go func() {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
for {
// lock Podcast list and update download queue
log.Print("Downloader(): Updating podcast list")
d.queue.Lock()
for _,p := range d.pl.Podcasts {
for _,i := range p.Items {
d.queue.Add(&i)
}
}
log.Print("Downloader(): Done updating podcast list")
d.queue.Unlock()
upd<- struct{}{}
select {
case <-t.C:
continue
case <-d.dlwake:
continue
} }
} }
}() log.Print("QueueUpdater(): Done updating download queue")
d.queue.Unlock()
d.queue.Save()
d.queue.wake<- struct{}{}
log.Print("QueueUpdater(): Sleeping")
select {
case <-t.C:
continue
case <-d.dlwake:
continue
}
}
}
func (q *dlQueue) Downloader() {
t := time.NewTicker(30 * time.Minute)
defer t.Stop()
LOOP: LOOP:
for { for {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
// launch requests for files we are not yet downloading // launch requests for files we are not yet downloading
d.queue.Lock() q.Lock()
waiting := d.queue.Waiting() waiting := q.Waiting()
d.queue.Unlock() q.Unlock()
log.Print("Download queue length: ",len(waiting.items)) log.Print("Download queue length: ",len(waiting.Items))
for _,i := range waiting.items { for _,i := range waiting.Items {
if !i.downloading { if !i.Downloading && !i.Complete {
dst := path.Join(dstDir,i.Item.Filename) + path.Ext(i.Item.Url) req,err := grab.NewRequest(i.Filename,i.Item.Url)
req,err := grab.NewRequest(dst,i.Item.Url)
if err != nil { if err != nil {
log.Print("Request error: ",err) log.Print("Request error: ",err)
continue continue
} }
i.downloading = true i.Downloading = true
t := time.Now() t := time.Now()
d.queue.reqch <- req q.reqch <- req
if time.Now().After(t.Add(5 * time.Second)) { if time.Now().After(t.Add(5 * time.Second)) {
continue LOOP // refresh list continue LOOP // refresh list
} }
} }
} }
log.Print("Downloader(): Done launching downloads")
log.Print("Downloader(): sleeping") log.Print("Downloader(): sleeping")
select { select {
case <-t.C: case <-t.C:
continue continue
case <-upd: case <-q.wake:
continue continue
} }
} }
@ -385,7 +446,7 @@ func (d *Daemon) Downloader() {
func (d *Daemon) Updater() { func (d *Daemon) Updater() {
log.Print("Updater(): starting") log.Print("Updater(): starting")
go d.Downloader() d.StartDownloader()
for { for {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
d.Update(d.conf.Urls) d.Update(d.conf.Urls)
@ -399,7 +460,6 @@ func (d *Daemon) Updater() {
enc.Encode(d.pl) enc.Encode(d.pl)
d.Unlock() d.Unlock()
of.Close() of.Close()
log.Print("Updater(): done writing")
} }
d.dlwake <-struct{}{} d.dlwake <-struct{}{}
time.Sleep(30 * time.Minute) time.Sleep(30 * time.Minute)