diff --git a/.gitignore b/.gitignore index 5991c74..1bdea3a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +rssd rssd.conf output.conf *.mp3 diff --git a/main.go b/main.go index e348174..01a03ea 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,18 @@ package main import ( + "fmt" "log" + "net/url" "os" + "path" + "sort" "strconv" + "sync" "time" "github.com/BurntSushi/toml" + "github.com/cavaliercoder/grab" "github.com/mmcdole/gofeed" ) @@ -17,6 +23,8 @@ type Config struct { type Item struct { Title, Description, Url, Filename string Length int + Published time.Time + Podcast *Podcast } type Podcast struct { @@ -64,7 +72,6 @@ func (p *pcList) Add(x *Podcast) { func (p *Podcast) Merge(x *Podcast) { for _,item := range x.Items { if !p.Has(item) { - log.Print(" Appending '",item.Title,"'") p.Items = append(p.Items,item) } } @@ -117,11 +124,11 @@ func daysAgo(x int) Selector { return newerThan(time.Date(d.Year(),d.Month(),d.Day()-x,0,0,0,0,time.Local)) } -func toPodcast(sel Selector, url string, feed *gofeed.Feed) (ret *Podcast) { +func toPodcast(sel Selector, u string, feed *gofeed.Feed) (ret *Podcast) { ret = &Podcast{ Title: feed.Title, Description: feed.Description, - Url: url, + Url: u, Items: []Item{}, } for _, i := range feed.Items { @@ -129,10 +136,17 @@ func toPodcast(sel Selector, url string, feed *gofeed.Feed) (ret *Podcast) { it := Item{ Title: i.Title, Description: i.Description, + Filename: path.Join(ret.Title, i.Title), + Published: *i.PublishedParsed, } for _, n := range i.Enclosures { if n.Type == "audio/mpeg" { - it.Url = n.URL + u,err := url.Parse(n.URL) + if err != nil { + it.Url = n.URL + } else { + it.Url = fmt.Sprintf("%s://%s%s",u.Scheme,u.Host,u.Path) + } if l, err := strconv.Atoi(n.Length); err == nil { it.Length = l } @@ -144,14 +158,225 @@ func toPodcast(sel Selector, url string, feed *gofeed.Feed) (ret *Podcast) { return } -func readFeed(url string,sel Selector) *Podcast { +func readFeed(u string,sel Selector) *Podcast { fp := 
gofeed.NewParser() - feed, err := fp.ParseURL(url) + feed, err := fp.ParseURL(u) if err != nil { log.Print(err) return nil } - return toPodcast(sel,url,feed) + return toPodcast(sel,u,feed) +} + +type dlItem struct { + Item *Item + downloading bool +} + +type ByDate []*dlItem +func (a ByDate) Len() int { return len(a) } +func (a ByDate) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByDate) Less(i, j int) bool { return a[j].Item.Published.After(a[i].Item.Published) } + +type dlQueue struct { + items []*dlItem + reqch chan *grab.Request + respch chan *grab.Response + sync.Mutex + wg sync.WaitGroup +} + +func (q *dlQueue) Sort() { + sort.Sort(ByDate(q.items)) +} + +func (q *dlQueue) Find(x *Item) (int, bool) { + for i,y := range q.items { + if y.Item.Title == x.Title { + return i, true + } + } + return 0, false +} + +func (q *dlQueue) Waiting() *dlQueue { + ret := &dlQueue{ + items: make([]*dlItem,0), + reqch: q.reqch, + respch: q.respch, + // Mutex and wg deliberately stay zero-valued: copying + // q.Mutex / q.wg by value is a copylocks violation (go vet). + } + for _,i := range q.items { + if i.downloading == false { + ret.items = append(ret.items,i) + } + } + return ret +} + +func (q *dlQueue) Add(i *Item) { + if _, ok := q.Find(i); ok == true { + return + } + di := &dlItem{Item: &Item{} } + *di.Item = *i + q.items = append(q.items,di) +} + +type Daemon struct { + conf Config + g *grab.Client + pl *pcList + queue *dlQueue + sync.Mutex +} + +func NewDaemon(conf Config, pl *pcList) *Daemon { + return &Daemon{ + conf: conf, + g: grab.NewClient(), + pl: pl, + queue: &dlQueue{}, + } +} + +func (d *Daemon) Update(urls []string) { + sel := daysAgo(60) + for _,url := range urls { + log.Print(" -> ",url) + f := readFeed(url,sel) // do not lock around IO + d.Lock() + d.pl.Add(f) + d.Unlock() + } +} + +func (d *Daemon) Monitor() { + status := func(resp *grab.Response) { + log.Printf(" %s: %v bytes (%.2f%%)\n", + resp.Filename, + resp.BytesComplete(), + 100*resp.Progress()) + } + mon := func(resp *grab.Response) { + t := time.NewTicker(1 * time.Second) + defer 
t.Stop() + Loop: + for { + select { + case <-t.C: + status(resp) + case <-resp.Done: + status(resp) + break Loop + } + } + if err := resp.Err(); err != nil { + log.Printf("Download failed for %s (%s)\n",resp.Filename,err) + } + } + for { + mon(<-d.queue.respch) + } +} + +func (d *Daemon) Downloader(workers int) { + log.Print("Downloader(): starting") + d.queue.reqch = make(chan *grab.Request) + d.queue.respch = make(chan *grab.Response) + log.Print("Downloader(): spawning workers") + for i := 0; i < workers; i++ { + d.queue.wg.Add(1) + go func() { + d.g.DoChannel(d.queue.reqch,d.queue.respch) + d.queue.wg.Done() + }() + } + log.Print("Downloader(): starting monitor") + go d.Monitor() + + if d.queue.items == nil { + d.queue.Lock() + d.queue.items = make([]*dlItem,0) + d.queue.Unlock() + } + upd := make(chan struct{}) + go func() { + for { + // lock Podcast list and update download queue + log.Print("Downloader(): Updating podcast list") + d.queue.Lock() + for _,p := range d.pl.Podcasts { + for _,i := range p.Items { + d.queue.Add(&i) + } + } + log.Print("Done updating podcast list") + d.queue.Unlock() + upd<- struct{}{} + time.Sleep(30 * time.Second) + } + }() + t := time.NewTicker(30 * time.Second) + defer t.Stop() + LOOP: + for { + // launch requests for files we are not yet downloading + d.queue.Lock() + waiting := d.queue.Waiting() + d.queue.Unlock() + log.Print("Download queue length: ",len(waiting.items)) + for _,i := range waiting.items { + if !i.downloading { + dst := path.Join("downloads",path.Base(i.Item.Url)) + req,err := grab.NewRequest(dst,i.Item.Url) + if err != nil { + log.Print("Request error: ",err) + continue + } + i.downloading = true + t := time.Now() + d.queue.reqch <- req + if time.Now().After(t.Add(5 * time.Second)) { + continue LOOP // refresh list + } + } + } + log.Print("Downloader(): Done launching downloads") + log.Print("Downloader(): sleeping") + select { + case <-t.C: + continue LOOP + case <-upd: + continue LOOP + } + } +} + +func (d 
*Daemon) Updater() { + for { + time.Sleep(1 * time.Second) + d.Lock() + d.Update(d.conf.Urls) + of,err := os.Create("output.conf") + if err != nil { + log.Print("Cannot open output file") + } else { + enc := toml.NewEncoder(of) + log.Print("writing output") + enc.Encode(d.pl) + of.Close() + log.Print("done") + } + d.Unlock() + time.Sleep(30 * time.Minute) + } +} + +func (d *Daemon) Start() { + go d.Downloader(3) // use 3 download queues + go d.Updater() } func main() { @@ -161,20 +386,9 @@ func main() { if _, err := toml.DecodeFile("rssd.conf", &conf); err != nil { log.Fatal("Error reading config file:",err) } - pl := newpcList("output.conf") - sel := daysAgo(60) - for _,url := range conf.Urls { - log.Print(" -> ",url) - pl.Add(readFeed(url,sel)) - } - of,err := os.Create("output.conf") - if err != nil { - log.Fatal("Cannot open output file") - } - enc := toml.NewEncoder(of) - log.Print("writing output") - enc.Encode(pl) - of.Close() - log.Print("done") + pl := newpcList("output.conf") + d := NewDaemon(conf,pl) + d.Start() + select { } }