From e609afeb02a1f575469ddff80c279a7d80db32ce Mon Sep 17 00:00:00 2001 From: Greg Date: Fri, 15 Mar 2019 12:41:32 -0400 Subject: [PATCH] Restructuring update/download threads. --- main.go | 76 +++++++++++++++++++++++++++++++-------------------------- 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/main.go b/main.go index dfc8ab6..6956f7d 100644 --- a/main.go +++ b/main.go @@ -203,6 +203,7 @@ type dlQueue struct { respch chan *grab.Response sync.Mutex wg sync.WaitGroup + wake chan struct{} } func (q *dlQueue) Sort() { @@ -225,6 +226,7 @@ func (q *dlQueue) Waiting() *dlQueue { respch: q.respch, Mutex: q.Mutex, wg: q.wg, + wake: q.wake, } for _,i := range q.items { if i.downloading == false { @@ -258,7 +260,7 @@ func NewDaemon(conf Config, pl *pcList) *Daemon { conf: conf, g: grab.NewClient(), pl: pl, - queue: &dlQueue{}, + queue: &dlQueue{ wake: make(chan struct{}) }, dlwake: make(chan struct{}), } } @@ -282,7 +284,7 @@ func (d *Daemon) Monitor() { 100*resp.Progress()) } mon := func(resp *grab.Response) { - t := time.NewTicker(1 * time.Second) + t := time.NewTicker(5 * time.Second) defer t.Stop() Loop: for { @@ -303,7 +305,7 @@ func (d *Daemon) Monitor() { } } -func (d *Daemon) Downloader() { +func (d *Daemon) StartDownloader() { log.Print("Downloader(): starting") d.queue.reqch = make(chan *grab.Request) d.queue.respch = make(chan *grab.Response) @@ -323,38 +325,44 @@ func (d *Daemon) Downloader() { d.queue.items = make([]*dlItem,0) d.queue.Unlock() } - upd := make(chan struct{}) - go func() { - t := time.NewTicker(30 * time.Second) - defer t.Stop() - for { - // lock Podcast list and update download queue - log.Print("Downloader(): Updating podcast list") - d.queue.Lock() - for _,p := range d.pl.Podcasts { - for _,i := range p.Items { - d.queue.Add(&i) - } - } - log.Print("Downloader(): Done updating podcast list") - d.queue.Unlock() - upd<- struct{}{} - select { - case <-t.C: - continue - case <-d.dlwake: - continue + go d.QueueUpdater() + go d.queue.Downloader() +} + +func (d *Daemon) QueueUpdater() { + t := time.NewTicker(30 * time.Second) + defer t.Stop() + for { + // lock Podcast list and update download queue + log.Print("QueueUpdater(): Updating download queue") + d.queue.Lock() + for _,p := range d.pl.Podcasts { + for _,i := range p.Items { + d.queue.Add(&i) } } - }() + log.Print("QueueUpdater(): Done updating download queue") + d.queue.Unlock() + d.queue.wake<- struct{}{} + log.Print("QueueUpdater(): Sleeping") + select { + case <-t.C: + continue + case <-d.dlwake: + continue + } + } +} + +func (q *dlQueue) Downloader() { + t := time.NewTicker(30 * time.Second) + defer t.Stop() LOOP: for { - t := time.NewTicker(30 * time.Second) - defer t.Stop() // launch requests for files we are not yet downloading - d.queue.Lock() - waiting := d.queue.Waiting() - d.queue.Unlock() + q.Lock() + waiting := q.Waiting() + q.Unlock() log.Print("Download queue length: ",len(waiting.items)) for _,i := range waiting.items { if !i.downloading { @@ -366,18 +374,17 @@ func (d *Daemon) Downloader() { } i.downloading = true t := time.Now() - d.queue.reqch <- req + q.reqch <- req if time.Now().After(t.Add(5 * time.Second)) { continue LOOP // refresh list } } } - log.Print("Downloader(): Done launching downloads") log.Print("Downloader(): sleeping") select { case <-t.C: continue - case <-upd: + case <-q.wake: continue } } @@ -385,7 +392,7 @@ func (d *Daemon) Downloader() { func (d *Daemon) Updater() { log.Print("Updater(): starting") - go d.Downloader() + d.StartDownloader() for { time.Sleep(1 * time.Second) d.Update(d.conf.Urls) @@ -399,7 +406,6 @@ func (d *Daemon) Updater() { enc.Encode(d.pl) d.Unlock() of.Close() - log.Print("Updater(): done writing") } d.dlwake <-struct{}{} time.Sleep(30 * time.Minute)