From 8b02e05f2d93db9d6eb17f08bc1c422d0ea90ae6 Mon Sep 17 00:00:00 2001 From: Greg Date: Mon, 18 Mar 2019 13:23:33 -0400 Subject: [PATCH] Save and restore download queue status. Remember when downloading is complete for a podcast. --- main.go | 112 +++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 83 insertions(+), 29 deletions(-) diff --git a/main.go b/main.go index dbfb931..83d6691 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,10 @@ package main import ( + "bufio" + "bytes" "fmt" + "io" "log" "net/url" "os" @@ -18,7 +21,7 @@ import ( ) var ( - confFile,dataFile,dstDir string + confFile,dataFile,queueFile,dstDir string ) func init() { @@ -30,6 +33,7 @@ func init() { confDir := path.Join(homeDir,".config","rssd") confFile = path.Join(confDir,"rssd.conf") dataFile = path.Join(confDir,"podcasts.conf") + queueFile = path.Join(confDir,"queue.conf") } type Config struct { @@ -189,7 +193,9 @@ func readFeed(u string,sel Selector) *Podcast { type dlItem struct { Item *Item - downloading bool + Filename string + Downloading bool + Complete bool } type ByDate []*dlItem @@ -198,7 +204,7 @@ func (a ByDate) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a ByDate) Less(i, j int) bool { return a[j].Item.Published.After(a[i].Item.Published) } type dlQueue struct { - items []*dlItem + Items []*dlItem reqch chan *grab.Request respch chan *grab.Response sync.Mutex @@ -207,11 +213,11 @@ type dlQueue struct { } func (q *dlQueue) Sort() { - sort.Sort(ByDate(q.items)) + sort.Sort(ByDate(q.Items)) } func (q *dlQueue) Find(x *Item) (int, bool) { - for i,y := range q.items { + for i,y := range q.Items { if y.Item.Title == x.Title { return i, true } @@ -219,18 +225,27 @@ func (q *dlQueue) Find(x *Item) (int, bool) { return 0, false } +func (q *dlQueue) FindFilename(x string) (int, bool) { + for i,y := range q.Items { + if y.Filename == x { + return i, true + } + } + return 0, false +} + func (q *dlQueue) Waiting() *dlQueue { ret := &dlQueue{ - items: make([]*dlItem,0), + Items: make([]*dlItem,0), reqch: q.reqch, respch: q.respch, Mutex: q.Mutex, wg: q.wg, wake: q.wake, } - for _,i := range q.items { - if i.downloading == false { - ret.items = append(ret.items,i) + for _,i := range q.Items { + if i.Downloading == false { + ret.Items = append(ret.Items,i) } } return ret @@ -242,7 +257,44 @@ func (q *dlQueue) Add(i *Item) { } di := &dlItem{Item: &Item{} } *di.Item = *i - q.items = append(q.items,di) + di.Filename = path.Join(dstDir,i.Filename) + path.Ext(i.Url) + q.Items = append(q.Items,di) +} + +func NewQueue() *dlQueue { + ret := &dlQueue{ + Items: make([]*dlItem,0), + reqch: make(chan *grab.Request), + respch: make(chan *grab.Response), + wake: make(chan struct{}), + } + return ret +} + +func (q *dlQueue) Load() { + q.Lock() + if _, err := toml.DecodeFile(queueFile, q); err != nil { + log.Print("Cannot read queue status file:",err) + } + q.Unlock() +} + +func (q *dlQueue) Save() { + of,err := os.Create(queueFile) + if err != nil { + log.Print("dlQueue.Save(): Cannot open output file") + } else { + var buf bytes.Buffer + w := bufio.NewWriter(&buf) + enc := toml.NewEncoder(w) + log.Printf("dlQueue.Save(): encoding %d entries\n",len(q.Items)) + q.Lock() // do not lock around IO + enc.Encode(q) + q.Unlock() + io.Copy(of,&buf) + of.Close() + log.Print("dlQueue.Save(): done") + } } type Daemon struct { @@ -256,13 +308,15 @@ type Daemon struct { } func NewDaemon(conf Config, pl *pcList) *Daemon { - return &Daemon{ + ret := &Daemon{ conf: conf, g: grab.NewClient(), pl: pl, - queue: &dlQueue{ wake: make(chan struct{}) }, + queue: NewQueue(), dlwake: make(chan struct{}), } + ret.queue.Load() + return ret } func (d *Daemon) Update(urls []string) { @@ -293,6 +347,15 @@ func (d *Daemon) Monitor() { status(resp) case <-resp.Done: status(resp) + d.queue.Lock() + if i, ok := d.queue.FindFilename(resp.Filename); ok { + d.queue.Items[i].Complete = true + d.queue.Unlock() + d.queue.Save() + } else { + d.queue.Unlock() + log.Printf("Error finding queue entry for %s\n",resp.Filename) + } break Loop } } @@ -300,15 +363,12 @@ func (d *Daemon) Monitor() { log.Printf("Download failed for %s (%s)\n",resp.Filename,err) } } - for { - mon(<-d.queue.respch) + for r := range d.queue.respch { + go mon(r) } } func (d *Daemon) StartDownloader() { - log.Print("Downloader(): starting") - d.queue.reqch = make(chan *grab.Request) - d.queue.respch = make(chan *grab.Response) log.Print("Downloader(): spawning workers") for i := 0; i < d.workers; i++ { d.queue.wg.Add(1) @@ -319,12 +379,6 @@ func (d *Daemon) StartDownloader() { } log.Print("Downloader(): starting monitor") go d.Monitor() - - if d.queue.items == nil { - d.queue.Lock() - d.queue.items = make([]*dlItem,0) - d.queue.Unlock() - } go d.QueueUpdater() go d.queue.Downloader() } @@ -343,6 +397,7 @@ func (d *Daemon) QueueUpdater() { } log.Print("QueueUpdater(): Done updating download queue") d.queue.Unlock() + d.queue.Save() d.queue.wake<- struct{}{} log.Print("QueueUpdater(): Sleeping") select { @@ -363,16 +418,15 @@ func (q *dlQueue) Downloader() { q.Lock() waiting := q.Waiting() q.Unlock() - log.Print("Download queue length: ",len(waiting.items)) - for _,i := range waiting.items { - if !i.downloading { - dst := path.Join(dstDir,i.Item.Filename) + path.Ext(i.Item.Url) - req,err := grab.NewRequest(dst,i.Item.Url) + log.Print("Download queue length: ",len(waiting.Items)) + for _,i := range waiting.Items { + if !i.Downloading && !i.Complete { + req,err := grab.NewRequest(i.Filename,i.Item.Url) if err != nil { log.Print("Request error: ",err) continue } - i.downloading = true + i.Downloading = true t := time.Now() q.reqch <- req if time.Now().After(t.Add(5 * time.Second)) {