package main

import (
	"fmt"
	"log"
	"net/url"
	"os"
	"path"
	"sort"
	"strconv"
	"sync"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/cavaliercoder/grab"
	homedir "github.com/mitchellh/go-homedir"
	"github.com/mmcdole/gofeed"
)

var (
	confFile, dataFile, dstDir string
)

func init() {
	os.Chdir("/") // go to the root directory
	homeDir, err := homedir.Dir()
	if err != nil {
		log.Fatal("Cannot locate user's home directory")
	}
	confDir := path.Join(homeDir, ".config", "rssd")
	confFile = path.Join(confDir, "rssd.conf")
	dataFile = path.Join(confDir, "podcasts.conf")
}

// Config is the daemon configuration read from rssd.conf.
type Config struct {
	Workers int
	DestDir string
	Urls    []string
}

// Item is a single downloadable episode of a podcast.
type Item struct {
	Title, Description, Url, Filename string
	Length                            int
	Published                         time.Time
	Podcast                           *Podcast
}

// Podcast is a feed plus the items selected from it.
type Podcast struct {
	Title, Description, Url string
	Items                   []Item
}

type pcList struct {
	Podcasts []Podcast
}

// newpcList returns a podcast list, optionally preloaded from a TOML file.
func newpcList(confs ...string) (ret *pcList) {
	ret = &pcList{}
	ret.Podcasts = make([]Podcast, 0)
	if len(confs) > 0 {
		if _, err := toml.DecodeFile(confs[0], ret); err != nil {
			log.Print("Error reading podcast list:", err)
		}
	}
	return
}

// Find reports the index of a podcast with the same title, if present.
func (p *pcList) Find(x *Podcast) (int, bool) {
	for i, y := range p.Podcasts {
		if y.Title == x.Title {
			return i, true
		}
	}
	return 0, false
}

// Add merges x into an existing podcast with the same title, or appends it.
func (p *pcList) Add(x *Podcast) {
	if x == nil {
		return
	}
	if i, ok := p.Find(x); ok {
		log.Print(" Existing podcast")
		p.Podcasts[i].Merge(x)
	} else {
		log.Print(" New podcast")
		p.Podcasts = append(p.Podcasts, *x)
	}
}

// Merge appends items from x that p does not already have.
func (p *Podcast) Merge(x *Podcast) {
	for _, item := range x.Items {
		if !p.Has(item) {
			p.Items = append(p.Items, item)
		}
	}
}

// Has reports whether p already contains an item with the same title.
func (p *Podcast) Has(i Item) bool {
	for _, x := range p.Items {
		if x.Title == i.Title {
			return true
		}
	}
	return false
}

// Selector decides whether a feed item should be kept.
type Selector func(*gofeed.Item) bool

// AllSelectors combines selectors; the result accepts an item only if all do.
func AllSelectors(ss ...Selector) Selector {
	return func(i *gofeed.Item) bool {
		for _, s := range ss {
			if !s(i) {
				return false
			}
		}
		return true
	}
}

// AnySelector combines selectors; the result accepts an item if any one does.
func AnySelector(ss ...Selector) Selector {
	return func(i *gofeed.Item) bool {
		for _, s := range ss {
			if s(i) {
				return true
			}
		}
		return false
	}
}

// newerThan accepts items published after t; items without a parsed
// publication date are rejected.
func newerThan(t time.Time) Selector {
	return func(i *gofeed.Item) bool {
		return i.PublishedParsed != nil && i.PublishedParsed.After(t)
	}
}

// daysAgo accepts items published within the last x days.
func daysAgo(x int) Selector {
	d := time.Now()
	return newerThan(time.Date(d.Year(), d.Month(), d.Day()-x, 0, 0, 0, 0, time.Local))
}

// toPodcast converts a parsed feed into a Podcast, keeping the items the
// selector accepts and extracting the audio enclosure URL and length.
func toPodcast(sel Selector, u string, feed *gofeed.Feed) (ret *Podcast) {
	ret = &Podcast{
		Title:       feed.Title,
		Description: feed.Description,
		Url:         u,
		Items:       []Item{},
	}
	for _, i := range feed.Items {
		if sel(i) {
			fn := i.PublishedParsed.Format("20060102--") + i.Title
			it := Item{
				Title:       i.Title,
				Description: i.Description,
				Filename:    path.Join(ret.Title, fn),
				Published:   *i.PublishedParsed,
			}
			for _, n := range i.Enclosures {
				if n.Type == "audio/mpeg" {
					u, err := url.Parse(n.URL)
					if err != nil {
						it.Url = n.URL
					} else {
						// keep only scheme, host and path of the enclosure URL
						it.Url = fmt.Sprintf("%s://%s%s", u.Scheme, u.Host, u.Path)
					}
					if l, err := strconv.Atoi(n.Length); err == nil {
						it.Length = l
					}
				}
			}
			ret.Items = append(ret.Items, it)
		}
	}
	return
}

// readFeed fetches and parses a feed URL, returning nil on error.
func readFeed(u string, sel Selector) *Podcast {
	fp := gofeed.NewParser()
	feed, err := fp.ParseURL(u)
	if err != nil {
		log.Print(err)
		return nil
	}
	return toPodcast(sel, u, feed)
}

// dlItem is a queued download and its current state.
type dlItem struct {
	Item        *Item
	downloading bool
}

// ByDate sorts queued items by publication date, oldest first.
type ByDate []*dlItem

func (a ByDate) Len() int      { return len(a) }
func (a ByDate) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDate) Less(i, j int) bool {
	return a[j].Item.Published.After(a[i].Item.Published)
}

// dlQueue tracks queued downloads and the channels feeding the grab workers.
type dlQueue struct {
	items  []*dlItem
	reqch  chan *grab.Request
	respch chan *grab.Response
	sync.Mutex
	wg   sync.WaitGroup
	wake chan struct{}
}
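// How the pieces below fit together: Updater() refreshes the feeds and merges
// them into the podcast list; QueueUpdater() copies podcast items into the
// dlQueue; Downloader() turns waiting items into grab requests and sends them
// on reqch; the worker goroutines started by StartDownloader() consume reqch
// and report on respch, which Monitor() watches to log progress. The wake
// channel lets QueueUpdater() nudge Downloader() out of its sleep.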
// Sort orders the queue by publication date.
func (q *dlQueue) Sort() { sort.Sort(ByDate(q.items)) }

// Find reports the index of a queued item with the same title, if present.
func (q *dlQueue) Find(x *Item) (int, bool) {
	for i, y := range q.items {
		if y.Item.Title == x.Title {
			return i, true
		}
	}
	return 0, false
}

// Waiting returns a snapshot queue containing only the items that are not yet
// downloading. The embedded Mutex and WaitGroup are deliberately left at their
// zero values: copying them from q would copy locks.
func (q *dlQueue) Waiting() *dlQueue {
	ret := &dlQueue{
		items:  make([]*dlItem, 0),
		reqch:  q.reqch,
		respch: q.respch,
		wake:   q.wake,
	}
	for _, i := range q.items {
		if !i.downloading {
			ret.items = append(ret.items, i)
		}
	}
	return ret
}

// Add queues a copy of i unless an item with the same title is already queued.
func (q *dlQueue) Add(i *Item) {
	if _, ok := q.Find(i); ok {
		return
	}
	di := &dlItem{Item: &Item{}}
	*di.Item = *i
	q.items = append(q.items, di)
}

// Daemon ties together the configuration, the podcast list and the download queue.
type Daemon struct {
	conf  Config
	g     *grab.Client
	pl    *pcList
	queue *dlQueue
	sync.Mutex
	workers int
	dlwake  chan struct{}
}

func NewDaemon(conf Config, pl *pcList) *Daemon {
	return &Daemon{
		conf:   conf,
		g:      grab.NewClient(),
		pl:     pl,
		queue:  &dlQueue{wake: make(chan struct{})},
		dlwake: make(chan struct{}),
	}
}

// Update fetches every feed URL and merges the results into the podcast list.
func (d *Daemon) Update(urls []string) {
	sel := daysAgo(30)
	for _, u := range urls {
		log.Print(" -> ", u)
		f := readFeed(u, sel) // do not lock around IO
		d.Lock()
		d.pl.Add(f)
		d.Unlock()
	}
}

// Monitor logs download progress; responses are watched one at a time as they
// arrive from the workers.
func (d *Daemon) Monitor() {
	status := func(resp *grab.Response) {
		log.Printf(" %s: %v bytes (%.2f%%)\n",
			resp.Filename, resp.BytesComplete(), 100*resp.Progress())
	}
	mon := func(resp *grab.Response) {
		t := time.NewTicker(5 * time.Second)
		defer t.Stop()
	Loop:
		for {
			select {
			case <-t.C:
				status(resp)
			case <-resp.Done:
				status(resp)
				break Loop
			}
		}
		if err := resp.Err(); err != nil {
			log.Printf("Download failed for %s (%s)\n", resp.Filename, err)
		}
	}
	for {
		mon(<-d.queue.respch)
	}
}

// StartDownloader spawns the grab worker pool, the progress monitor, the
// queue updater and the downloader.
func (d *Daemon) StartDownloader() {
	log.Print("Downloader(): starting")
	d.queue.reqch = make(chan *grab.Request)
	d.queue.respch = make(chan *grab.Response)
	log.Print("Downloader(): spawning workers")
	for i := 0; i < d.workers; i++ {
		d.queue.wg.Add(1)
		go func() {
			d.g.DoChannel(d.queue.reqch, d.queue.respch)
			d.queue.wg.Done()
		}()
	}
	log.Print("Downloader(): starting monitor")
	go d.Monitor()
	if d.queue.items == nil {
		d.queue.Lock()
		d.queue.items = make([]*dlItem, 0)
		d.queue.Unlock()
	}
	go d.QueueUpdater()
	go d.queue.Downloader()
}

// QueueUpdater periodically copies podcast items into the download queue and
// wakes the downloader.
func (d *Daemon) QueueUpdater() {
	t := time.NewTicker(30 * time.Minute)
	defer t.Stop()
	for {
		// lock the podcast list and the queue while updating the download queue
		log.Print("QueueUpdater(): Updating download queue")
		d.Lock()
		d.queue.Lock()
		for _, p := range d.pl.Podcasts {
			for _, i := range p.Items {
				d.queue.Add(&i)
			}
		}
		log.Print("QueueUpdater(): Done updating download queue")
		d.queue.Unlock()
		d.Unlock()
		d.queue.wake <- struct{}{}
		log.Print("QueueUpdater(): Sleeping")
		select {
		case <-t.C:
			continue
		case <-d.dlwake:
			continue
		}
	}
}

// Downloader launches grab requests for queued items that are not yet
// downloading, then sleeps until the next tick or wake-up.
func (q *dlQueue) Downloader() {
	t := time.NewTicker(30 * time.Minute)
	defer t.Stop()
LOOP:
	for {
		// launch requests for files we are not yet downloading
		q.Lock()
		waiting := q.Waiting()
		q.Unlock()
		log.Print("Download queue length: ", len(waiting.items))
		for _, i := range waiting.items {
			if !i.downloading {
				dst := path.Join(dstDir, i.Item.Filename) + path.Ext(i.Item.Url)
				req, err := grab.NewRequest(dst, i.Item.Url)
				if err != nil {
					log.Print("Request error: ", err)
					continue
				}
				i.downloading = true
				start := time.Now()
				q.reqch <- req
				if time.Now().After(start.Add(5 * time.Second)) {
					continue LOOP // the send blocked for a while; refresh the list
				}
			}
		}
		log.Print("Downloader(): sleeping")
		select {
		case <-t.C:
			continue
		case <-q.wake:
			continue
		}
	}
}

// Updater periodically refreshes all feeds, persists the podcast list and
// nudges the queue updater.
func (d *Daemon) Updater() {
	log.Print("Updater(): starting")
	d.StartDownloader()
	for {
		time.Sleep(1 * time.Second)
		d.Update(d.conf.Urls)
		of, err := os.Create(dataFile)
		if err != nil {
			log.Print("Updater(): Cannot open output file")
		} else {
			enc := toml.NewEncoder(of)
			log.Print("Updater(): writing output")
			d.Lock()
			if err := enc.Encode(d.pl); err != nil {
				log.Print("Updater(): error writing podcast list: ", err)
			}
			d.Unlock()
			of.Close()
		}
		d.dlwake <- struct{}{}
		time.Sleep(30 * time.Minute)
	}
}

func (d *Daemon) Start() {
	go d.Updater()
}

func main() {
	log.Print("rssd")
	log.Print("reading configuration")
	var conf Config
	var err error
	if _, err = toml.DecodeFile(confFile, &conf); err != nil {
		log.Fatal("Error reading config file:", err)
	}
	pl := newpcList(dataFile)
	d := NewDaemon(conf, pl)
	if d.conf.Workers != 0 {
		d.workers = d.conf.Workers
	} else {
		d.workers = 3
	}
	dstDir, err = homedir.Expand(conf.DestDir)
	if err != nil {
		log.Fatal("Error locating DestDir.")
	}
	d.Start()
	select {}
}
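// For reference, a minimal sketch of the rssd.conf that main() decodes into
// Config above. TOML keys default to the Go field names; the worker count,
// destination directory and feed URLs below are illustrative placeholders,
// not values shipped with the program:
//
//	Workers = 3
//	DestDir = "~/Podcasts"
//	Urls = [
//	    "https://example.org/feed.xml",
//	    "https://example.com/podcast.rss",
//	]
//
// DestDir is passed through homedir.Expand, so a leading "~" resolves to the
// user's home directory.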