Restructure Daemon with updater and downloader threads. Implement download queue.
This commit is contained in:
Greg 2019-03-15 09:57:33 -04:00
parent 95e73fd219
commit 4e11288508
2 changed files with 237 additions and 22 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
rssd
rssd.conf rssd.conf
output.conf output.conf
*.mp3 *.mp3

258
main.go
View File

@ -1,12 +1,18 @@
package main package main
import ( import (
"fmt"
"log" "log"
"net/url"
"os" "os"
"path"
"sort"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
"github.com/cavaliercoder/grab"
"github.com/mmcdole/gofeed" "github.com/mmcdole/gofeed"
) )
@ -17,6 +23,8 @@ type Config struct {
type Item struct { type Item struct {
Title, Description, Url, Filename string Title, Description, Url, Filename string
Length int Length int
Published time.Time
Podcast *Podcast
} }
type Podcast struct { type Podcast struct {
@ -64,7 +72,6 @@ func (p *pcList) Add(x *Podcast) {
func (p *Podcast) Merge(x *Podcast) { func (p *Podcast) Merge(x *Podcast) {
for _,item := range x.Items { for _,item := range x.Items {
if !p.Has(item) { if !p.Has(item) {
log.Print(" Appending '",item.Title,"'")
p.Items = append(p.Items,item) p.Items = append(p.Items,item)
} }
} }
@ -117,11 +124,11 @@ func daysAgo(x int) Selector {
return newerThan(time.Date(d.Year(),d.Month(),d.Day()-x,0,0,0,0,time.Local)) return newerThan(time.Date(d.Year(),d.Month(),d.Day()-x,0,0,0,0,time.Local))
} }
func toPodcast(sel Selector, url string, feed *gofeed.Feed) (ret *Podcast) { func toPodcast(sel Selector, u string, feed *gofeed.Feed) (ret *Podcast) {
ret = &Podcast{ ret = &Podcast{
Title: feed.Title, Title: feed.Title,
Description: feed.Description, Description: feed.Description,
Url: url, Url: u,
Items: []Item{}, Items: []Item{},
} }
for _, i := range feed.Items { for _, i := range feed.Items {
@ -129,10 +136,17 @@ func toPodcast(sel Selector, url string, feed *gofeed.Feed) (ret *Podcast) {
it := Item{ it := Item{
Title: i.Title, Title: i.Title,
Description: i.Description, Description: i.Description,
Filename: path.Join(ret.Title, i.Title),
Published: *i.PublishedParsed,
} }
for _, n := range i.Enclosures { for _, n := range i.Enclosures {
if n.Type == "audio/mpeg" { if n.Type == "audio/mpeg" {
it.Url = n.URL u,err := url.Parse(n.URL)
if err != nil {
it.Url = n.URL
} else {
it.Url = fmt.Sprintf("%s://%s%s",u.Scheme,u.Host,u.Path)
}
if l, err := strconv.Atoi(n.Length); err == nil { if l, err := strconv.Atoi(n.Length); err == nil {
it.Length = l it.Length = l
} }
@ -144,14 +158,225 @@ func toPodcast(sel Selector, url string, feed *gofeed.Feed) (ret *Podcast) {
return return
} }
func readFeed(url string,sel Selector) *Podcast { func readFeed(u string,sel Selector) *Podcast {
fp := gofeed.NewParser() fp := gofeed.NewParser()
feed, err := fp.ParseURL(url) feed, err := fp.ParseURL(u)
if err != nil { if err != nil {
log.Print(err) log.Print(err)
return nil return nil
} }
return toPodcast(sel,url,feed) return toPodcast(sel,u,feed)
}
// dlItem wraps a podcast Item with the bookkeeping state the download
// queue needs.
type dlItem struct {
	// Item is a private copy of the feed item (dlQueue.Add deep-copies
	// on insert, so mutating the source feed cannot affect the queue).
	Item *Item
	// downloading is set once a grab request has been dispatched for
	// this item, so the dispatch loop does not enqueue it twice.
	downloading bool
}
// ByDate sorts download items in ascending order of publication time
// (oldest first) for use with sort.Sort.
type ByDate []*dlItem

func (a ByDate) Len() int      { return len(a) }
func (a ByDate) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less reports whether item i was published strictly before item j.
func (a ByDate) Less(i, j int) bool {
	return a[i].Item.Published.Before(a[j].Item.Published)
}
// dlQueue is the download work queue shared between the feed-scanning
// goroutine (producer) and the grab worker pool (consumers).
// NOTE: because it embeds a sync.Mutex and holds a sync.WaitGroup,
// values of this type must not be copied; use *dlQueue.
type dlQueue struct {
	items []*dlItem // every known item, dispatched or not; guarded by the embedded Mutex
	reqch chan *grab.Request // requests consumed by the worker pool
	respch chan *grab.Response // in-progress responses observed by Monitor
	sync.Mutex // guards items
	wg sync.WaitGroup // tracks the worker goroutines
}
// Sort orders the queue's items oldest-first by publication date
// (see ByDate.Less). It does not take the queue lock itself;
// NOTE(review): callers presumably hold q's lock — confirm at call sites.
func (q *dlQueue) Sort() {
	sort.Sort(ByDate(q.items))
}
// Find looks up a queued item whose title matches x.Title. It returns
// the item's index and true when found, or (0, false) otherwise.
// Matching is by title only.
func (q *dlQueue) Find(x *Item) (int, bool) {
	for idx := range q.items {
		if q.items[idx].Item.Title == x.Title {
			return idx, true
		}
	}
	return 0, false
}
// Waiting returns a snapshot queue containing only the items that have
// not yet been handed to a download worker. The result shares the item
// pointers and channels with q but gets its own zero-valued Mutex and
// WaitGroup.
//
// BUG FIX: the original copied q's sync.Mutex and sync.WaitGroup by
// value (`Mutex: q.Mutex, wg: q.wg`), which `go vet` flags (copylocks)
// and which is actively wrong here: Downloader calls Waiting while
// holding q's lock, so the copy would be a permanently-locked mutex.
// Nothing reads the snapshot's lock or wait group, so fresh zero
// values are correct.
func (q *dlQueue) Waiting() *dlQueue {
	ret := &dlQueue{
		items:  make([]*dlItem, 0),
		reqch:  q.reqch,
		respch: q.respch,
	}
	for _, it := range q.items {
		if !it.downloading {
			ret.items = append(ret.items, it)
		}
	}
	return ret
}
// Add appends a deep copy of i to the queue unless an item with the
// same title is already queued (deduplicated via Find).
func (q *dlQueue) Add(i *Item) {
	// `ok == true` comparison replaced with the idiomatic bare bool.
	if _, ok := q.Find(i); ok {
		return
	}
	// Copy the item so later mutation of the caller's value cannot
	// affect the queued entry.
	cp := *i
	q.items = append(q.items, &dlItem{Item: &cp})
}
// Daemon ties together the two background loops: the feed updater
// (Updater) and the download manager (Downloader). The embedded Mutex
// guards pl; the download queue carries its own lock.
// NOTE: contains a sync.Mutex — use via *Daemon, never copy by value.
type Daemon struct {
	conf Config // configuration, including the feed URL list
	g *grab.Client // shared download client for all workers
	pl *pcList // merged podcast list, guarded by the embedded Mutex
	queue *dlQueue // download work queue (separately locked)
	sync.Mutex
}
// NewDaemon assembles a Daemon around the given configuration and
// podcast list, with a fresh grab client and an empty download queue.
// Call Start on the result to launch its background loops.
func NewDaemon(conf Config, pl *pcList) *Daemon {
	d := &Daemon{
		conf:  conf,
		g:     grab.NewClient(),
		pl:    pl,
		queue: new(dlQueue),
	}
	return d
}
// Update fetches every feed URL in urls and merges the results into the
// daemon's podcast list, keeping only items from the last 60 days
// (see daysAgo). Network IO happens outside the lock; only the merge
// into d.pl is locked.
//
// CONSISTENCY FIX: the loop variable was named `url`, shadowing the
// imported net/url package; renamed to `u` to match the convention this
// file already adopted in readFeed and toPodcast.
func (d *Daemon) Update(urls []string) {
	sel := daysAgo(60)
	for _, u := range urls {
		log.Print(" -> ", u)
		f := readFeed(u, sel) // do not lock around IO
		d.Lock()
		d.pl.Add(f)
		d.Unlock()
	}
}
// Monitor consumes download responses from the queue's response channel
// forever. For each transfer it logs progress once per second until the
// download completes, then logs a final status line and any terminal
// error. Transfers are watched one at a time, in arrival order.
func (d *Daemon) Monitor() {
	report := func(resp *grab.Response) {
		log.Printf(" %s: %v bytes (%.2f%%)\n",
			resp.Filename,
			resp.BytesComplete(),
			100*resp.Progress())
	}
	watch := func(resp *grab.Response) {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				report(resp)
			case <-resp.Done:
				// Final progress line, then surface any failure.
				report(resp)
				if err := resp.Err(); err != nil {
					log.Printf("Download failed for %s (%s)\n", resp.Filename, err)
				}
				return
			}
		}
	}
	for {
		watch(<-d.queue.respch)
	}
}
// Downloader runs the download side of the daemon. It spawns `workers`
// grab workers fed by the queue's request channel, a Monitor goroutine
// that logs progress, and a scanner goroutine that folds the podcast
// list into the download queue every 30 seconds. The main loop then
// dispatches requests for queued items that are not yet downloading,
// waking on either the ticker or a scanner signal.
//
// FIX: the inner per-request timestamp was named `t`, shadowing the
// 30-second ticker `t` declared above the loop; renamed to `start`
// (and the ticker to `tick`) so the two cannot be confused.
func (d *Daemon) Downloader(workers int) {
	log.Print("Downloader(): starting")
	d.queue.reqch = make(chan *grab.Request)
	d.queue.respch = make(chan *grab.Response)

	log.Print("Downloader(): spawning workers")
	for i := 0; i < workers; i++ {
		d.queue.wg.Add(1)
		go func() {
			d.g.DoChannel(d.queue.reqch, d.queue.respch)
			d.queue.wg.Done()
		}()
	}

	log.Print("Downloader(): starting monitor")
	go d.Monitor()

	if d.queue.items == nil {
		d.queue.Lock()
		d.queue.items = make([]*dlItem, 0)
		d.queue.Unlock()
	}

	upd := make(chan struct{})
	go func() {
		for {
			// lock Podcast list and update download queue
			log.Print("Downloader(): Updating podcast list")
			// NOTE(review): d.pl is read here under d.queue's lock, but
			// Update mutates it under d's lock — confirm these cannot race.
			d.queue.Lock()
			for _, p := range d.pl.Podcasts {
				for _, i := range p.Items {
					// Safe to pass &i: Add copies the Item value immediately.
					d.queue.Add(&i)
				}
			}
			log.Print("Done updating podcast list")
			d.queue.Unlock()
			upd <- struct{}{}
			time.Sleep(30 * time.Second)
		}
	}()

	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()
LOOP:
	for {
		// launch requests for files we are not yet downloading
		d.queue.Lock()
		waiting := d.queue.Waiting()
		d.queue.Unlock()
		log.Print("Download queue length: ", len(waiting.items))
		for _, i := range waiting.items {
			if !i.downloading {
				dst := path.Join("downloads", path.Base(i.Item.Url))
				req, err := grab.NewRequest(dst, i.Item.Url)
				if err != nil {
					log.Print("Request error: ", err)
					continue
				}
				i.downloading = true
				start := time.Now()
				d.queue.reqch <- req // blocks until a worker is free
				if time.Now().After(start.Add(5 * time.Second)) {
					continue LOOP // refresh list
				}
			}
		}
		log.Print("Downloader(): Done launching downloads")
		log.Print("Downloader(): sleeping")
		select {
		case <-tick.C:
			continue LOOP
		case <-upd:
			continue LOOP
		}
	}
}
// Updater periodically refreshes every configured feed and persists the
// merged podcast list to output.conf, sleeping 30 minutes between rounds.
//
// BUG FIX (deadlock): the original held d.Lock() around the whole body,
// but d.Update also acquires d.Lock() per feed — Go mutexes are not
// reentrant, so the first fetch deadlocked the daemon. Update does its
// own locking, so here we lock only while encoding the shared list.
// Also: the Encode error is no longer silently discarded.
func (d *Daemon) Updater() {
	for {
		time.Sleep(1 * time.Second)
		d.Update(d.conf.Urls)
		of, err := os.Create("output.conf")
		if err != nil {
			log.Print("Cannot open output file")
		} else {
			enc := toml.NewEncoder(of)
			log.Print("writing output")
			d.Lock()
			err = enc.Encode(d.pl)
			d.Unlock()
			if err != nil {
				log.Print("Error encoding output: ", err)
			}
			of.Close()
			log.Print("done")
		}
		time.Sleep(30 * time.Minute)
	}
}
// Start launches the daemon's two background goroutines — the download
// manager (with 3 grab workers) and the periodic feed updater — and
// returns immediately; the caller is responsible for blocking.
func (d *Daemon) Start() {
	go d.Downloader(3) // use 3 download queues
	go d.Updater()
} }
func main() { func main() {
@ -161,20 +386,9 @@ func main() {
if _, err := toml.DecodeFile("rssd.conf", &conf); err != nil { if _, err := toml.DecodeFile("rssd.conf", &conf); err != nil {
log.Fatal("Error reading config file:",err) log.Fatal("Error reading config file:",err)
} }
pl := newpcList("output.conf")
sel := daysAgo(60) pl := newpcList("output.conf")
for _,url := range conf.Urls { d := NewDaemon(conf,pl)
log.Print(" -> ",url) d.Start()
pl.Add(readFeed(url,sel)) select { }
}
of,err := os.Create("output.conf")
if err != nil {
log.Fatal("Cannot open output file")
}
enc := toml.NewEncoder(of)
log.Print("writing output")
enc.Encode(pl)
of.Close()
log.Print("done")
} }