Restructuring update/download threads.

This commit is contained in:
Greg 2019-03-15 12:41:32 -04:00
parent 9a8aa9d21d
commit e609afeb02
1 changed files with 41 additions and 35 deletions

76
main.go
View File

@ -203,6 +203,7 @@ type dlQueue struct {
respch chan *grab.Response respch chan *grab.Response
sync.Mutex sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
wake chan struct{}
} }
func (q *dlQueue) Sort() { func (q *dlQueue) Sort() {
@ -225,6 +226,7 @@ func (q *dlQueue) Waiting() *dlQueue {
respch: q.respch, respch: q.respch,
Mutex: q.Mutex, Mutex: q.Mutex,
wg: q.wg, wg: q.wg,
wake: q.wake,
} }
for _,i := range q.items { for _,i := range q.items {
if i.downloading == false { if i.downloading == false {
@ -258,7 +260,7 @@ func NewDaemon(conf Config, pl *pcList) *Daemon {
conf: conf, conf: conf,
g: grab.NewClient(), g: grab.NewClient(),
pl: pl, pl: pl,
queue: &dlQueue{}, queue: &dlQueue{ wake: make(chan struct{}) },
dlwake: make(chan struct{}), dlwake: make(chan struct{}),
} }
} }
@ -282,7 +284,7 @@ func (d *Daemon) Monitor() {
100*resp.Progress()) 100*resp.Progress())
} }
mon := func(resp *grab.Response) { mon := func(resp *grab.Response) {
t := time.NewTicker(1 * time.Second) t := time.NewTicker(5 * time.Second)
defer t.Stop() defer t.Stop()
Loop: Loop:
for { for {
@ -303,7 +305,7 @@ func (d *Daemon) Monitor() {
} }
} }
func (d *Daemon) Downloader() { func (d *Daemon) StartDownloader() {
log.Print("Downloader(): starting") log.Print("Downloader(): starting")
d.queue.reqch = make(chan *grab.Request) d.queue.reqch = make(chan *grab.Request)
d.queue.respch = make(chan *grab.Response) d.queue.respch = make(chan *grab.Response)
@ -323,38 +325,44 @@ func (d *Daemon) Downloader() {
d.queue.items = make([]*dlItem,0) d.queue.items = make([]*dlItem,0)
d.queue.Unlock() d.queue.Unlock()
} }
upd := make(chan struct{}) go d.QueueUpdater()
go func() { go d.queue.Downloader()
t := time.NewTicker(30 * time.Second) }
defer t.Stop()
for { func (d *Daemon) QueueUpdater() {
// lock Podcast list and update download queue t := time.NewTicker(30 * time.Second)
log.Print("Downloader(): Updating podcast list") defer t.Stop()
d.queue.Lock() for {
for _,p := range d.pl.Podcasts { // lock Podcast list and update download queue
for _,i := range p.Items { log.Print("QueueUpdater(): Updating download queue")
d.queue.Add(&i) d.queue.Lock()
} for _,p := range d.pl.Podcasts {
} for _,i := range p.Items {
log.Print("Downloader(): Done updating podcast list") d.queue.Add(&i)
d.queue.Unlock()
upd<- struct{}{}
select {
case <-t.C:
continue
case <-d.dlwake:
continue
} }
} }
}() log.Print("QueueUpdater(): Done updating download queue")
d.queue.Unlock()
d.queue.wake<- struct{}{}
log.Print("QueueUpdater(): Sleeping")
select {
case <-t.C:
continue
case <-d.dlwake:
continue
}
}
}
func (q *dlQueue) Downloader() {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
LOOP: LOOP:
for { for {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
// launch requests for files we are not yet downloading // launch requests for files we are not yet downloading
d.queue.Lock() q.Lock()
waiting := d.queue.Waiting() waiting := q.Waiting()
d.queue.Unlock() q.Unlock()
log.Print("Download queue length: ",len(waiting.items)) log.Print("Download queue length: ",len(waiting.items))
for _,i := range waiting.items { for _,i := range waiting.items {
if !i.downloading { if !i.downloading {
@ -366,18 +374,17 @@ func (d *Daemon) Downloader() {
} }
i.downloading = true i.downloading = true
t := time.Now() t := time.Now()
d.queue.reqch <- req q.reqch <- req
if time.Now().After(t.Add(5 * time.Second)) { if time.Now().After(t.Add(5 * time.Second)) {
continue LOOP // refresh list continue LOOP // refresh list
} }
} }
} }
log.Print("Downloader(): Done launching downloads")
log.Print("Downloader(): sleeping") log.Print("Downloader(): sleeping")
select { select {
case <-t.C: case <-t.C:
continue continue
case <-upd: case <-q.wake:
continue continue
} }
} }
@ -385,7 +392,7 @@ func (d *Daemon) Downloader() {
func (d *Daemon) Updater() { func (d *Daemon) Updater() {
log.Print("Updater(): starting") log.Print("Updater(): starting")
go d.Downloader() d.StartDownloader()
for { for {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
d.Update(d.conf.Urls) d.Update(d.conf.Urls)
@ -399,7 +406,6 @@ func (d *Daemon) Updater() {
enc.Encode(d.pl) enc.Encode(d.pl)
d.Unlock() d.Unlock()
of.Close() of.Close()
log.Print("Updater(): done writing")
} }
d.dlwake <-struct{}{} d.dlwake <-struct{}{}
time.Sleep(30 * time.Minute) time.Sleep(30 * time.Minute)