// rssd/main.go — podcast feed watcher and episode download daemon.
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"net/url"
"os"
"path"
"sort"
"strconv"
"sync"
"time"
"github.com/BurntSushi/toml"
"github.com/cavaliercoder/grab"
homedir "github.com/mitchellh/go-homedir"
"github.com/mmcdole/gofeed"
)
// File paths resolved at startup (init) and the download destination
// directory resolved from the configuration (main).
var (
	confFile, dataFile, queueFile, dstDir string
)
// init resolves the configuration file paths under ~/.config/rssd and
// moves the process to the filesystem root so relative paths never
// depend on the invocation directory.
func init() {
	// The original called os.Chdir(path.Join()), i.e. Chdir(""), which
	// always fails and was silently ignored; "/" is what the intent
	// comment ("go to the root directory") asked for.
	if err := os.Chdir("/"); err != nil {
		log.Print("Cannot change to root directory: ", err)
	}
	homeDir, err := homedir.Dir()
	if err != nil {
		log.Fatal("Cannot locate user's home directory")
	}
	confDir := path.Join(homeDir, ".config", "rssd")
	confFile = path.Join(confDir, "rssd.conf")
	dataFile = path.Join(confDir, "podcasts.conf")
	queueFile = path.Join(confDir, "queue.conf")
}
// Config models rssd.conf, the daemon's main configuration file.
type Config struct {
Workers int // parallel download workers; 0 selects the default (3, see main)
DestDir string // download destination; may start with "~" (expanded in main)
Urls []string // feed URLs polled by the updater
}
// Item is one podcast episode plus the metadata needed to download it.
type Item struct {
Title, Description, Url, Filename string // Url: audio enclosure URL (query stripped); Filename: "<podcast>/<YYYYMMDD>--<title>"
Length int // enclosure size in bytes, when the feed reports one
Published time.Time
Podcast *Podcast // NOTE(review): never assigned anywhere in this file — confirm it is needed
}
// Podcast is a feed plus the episodes harvested from it; the set of
// podcasts is persisted to podcasts.conf as TOML.
type Podcast struct {
Title, Description, Url string
Items []Item
}
// pcList is the collection of all known podcasts, as serialized to
// podcasts.conf.
type pcList struct {
Podcasts []Podcast
}
// newpcList builds a podcast list, optionally restoring saved state
// from the first path supplied. With no arguments an empty list is
// returned. Decode errors are logged and leave the list empty.
func newpcList(confs ...string) (ret *pcList) {
	ret = &pcList{Podcasts: make([]Podcast, 0)}
	if len(confs) > 0 {
		// Decode from the caller-supplied path; the original ignored
		// its argument and always read the global dataFile (and passed
		// &ret, a **pcList, instead of the pointer itself).
		if _, err := toml.DecodeFile(confs[0], ret); err != nil {
			log.Print("Error reading podcast list:", err)
		}
	}
	return
}
// Find reports the index of the podcast whose Title matches x and
// whether such a podcast exists.
func (p *pcList) Find(x *Podcast) (int, bool) {
	for idx := range p.Podcasts {
		if p.Podcasts[idx].Title == x.Title {
			return idx, true
		}
	}
	return 0, false
}
// Add merges podcast x into the list: episodes of an already-known
// podcast (matched by title) are merged in, otherwise x is appended
// as a new entry. A nil x is ignored.
func (p *pcList) Add(x *Podcast) {
	if x == nil {
		return
	}
	// Idiom: test the bool directly instead of "ok == true", and drop
	// the redundant (*p) dereference.
	if i, ok := p.Find(x); ok {
		log.Print(" Existing podcast")
		p.Podcasts[i].Merge(x)
	} else {
		log.Print(" New podcast")
		p.Podcasts = append(p.Podcasts, *x)
	}
}
// Merge appends every episode of x that p does not already contain
// (matched by title).
func (p *Podcast) Merge(x *Podcast) {
	for idx := range x.Items {
		it := x.Items[idx]
		if p.Has(it) {
			continue
		}
		p.Items = append(p.Items, it)
	}
}
// Has reports whether p already holds an episode with i's title.
func (p *Podcast) Has(i Item) bool {
	for idx := range p.Items {
		if p.Items[idx].Title == i.Title {
			return true
		}
	}
	return false
}
type Selector func(*gofeed.Item) bool
// AllSelectors combines selectors conjunctively: the returned
// Selector accepts an item only when every given selector accepts it
// (vacuously true for an empty list).
func AllSelectors(ss ...Selector) Selector {
	return func(i *gofeed.Item) bool {
		for _, accept := range ss {
			if accept(i) {
				continue
			}
			return false
		}
		return true
	}
}
// AnySelector combines selectors disjunctively: the returned Selector
// accepts an item when at least one given selector accepts it
// (vacuously false for an empty list).
func AnySelector(ss ...Selector) Selector {
	return func(i *gofeed.Item) bool {
		for _, accept := range ss {
			if !accept(i) {
				continue
			}
			return true
		}
		return false
	}
}
// newerThan builds a Selector accepting items published strictly
// after t. Items without a parseable publication date
// (PublishedParsed == nil) are rejected; the original dereferenced
// the nil pointer and panicked on such feeds.
func newerThan(t time.Time) Selector {
	return func(i *gofeed.Item) bool {
		if i.PublishedParsed == nil {
			return false
		}
		return i.PublishedParsed.After(t)
	}
}
// daysAgo returns a Selector accepting items published after local
// midnight x days before today (time.Date normalizes Day()-x).
func daysAgo(x int) Selector {
	now := time.Now()
	cutoff := time.Date(now.Year(), now.Month(), now.Day()-x, 0, 0, 0, 0, time.Local)
	return newerThan(cutoff)
}
// toPodcast converts a parsed gofeed.Feed into our Podcast model,
// keeping only items accepted by sel. The last audio/mpeg enclosure
// supplies the download URL (query string stripped so path.Ext later
// yields a clean extension) and length; the target filename is
// "<podcast title>/<YYYYMMDD>--<item title>".
func toPodcast(sel Selector, u string, feed *gofeed.Feed) (ret *Podcast) {
	ret = &Podcast{
		Title:       feed.Title,
		Description: feed.Description,
		Url:         u,
		Items:       []Item{},
	}
	for _, i := range feed.Items {
		// Guard: without a parseable date the Format call below would
		// dereference a nil pointer and panic.
		if i.PublishedParsed == nil {
			continue
		}
		if !sel(i) {
			continue
		}
		fn := i.PublishedParsed.Format("20060102--") + i.Title
		it := Item{
			Title:       i.Title,
			Description: i.Description,
			Filename:    path.Join(ret.Title, fn),
			Published:   *i.PublishedParsed,
		}
		for _, n := range i.Enclosures {
			if n.Type != "audio/mpeg" {
				continue
			}
			// Strip query/fragment; fall back to the raw URL when it
			// does not parse.
			if parsed, err := url.Parse(n.URL); err == nil {
				it.Url = fmt.Sprintf("%s://%s%s", parsed.Scheme, parsed.Host, parsed.Path)
			} else {
				it.Url = n.URL
			}
			if l, err := strconv.Atoi(n.Length); err == nil {
				it.Length = l
			}
		}
		ret.Items = append(ret.Items, it)
	}
	return
}
// readFeed fetches and parses the feed at u and converts it into a
// Podcast holding the items accepted by sel. Returns nil (after
// logging) when the feed cannot be fetched or parsed.
func readFeed(u string, sel Selector) *Podcast {
	parser := gofeed.NewParser()
	feed, err := parser.ParseURL(u)
	if err != nil {
		log.Print(err)
		return nil
	}
	return toPodcast(sel, u, feed)
}
// dlItem is one download-queue entry: the episode plus its on-disk
// target path and transfer state (persisted to queue.conf).
type dlItem struct {
Item *Item
Filename string // destination path: dstDir + Item.Filename + extension of Item.Url
Downloading bool // a grab request has been issued for this entry
Complete bool // transfer finished; Monitor sets this even when the transfer errored
}
// ByDate sorts download items oldest-first by publication date.
type ByDate []*dlItem

func (a ByDate) Len() int      { return len(a) }
func (a ByDate) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDate) Less(i, j int) bool {
	// i sorts before j when i was published earlier (equivalent to the
	// original a[j].After(a[i]) formulation).
	return a[i].Item.Published.Before(a[j].Item.Published)
}
// dlQueue is the download work list plus the plumbing that feeds the
// grab worker goroutines. The embedded Mutex guards Items.
type dlQueue struct {
Items []*dlItem
reqch chan *grab.Request // requests consumed by the grab workers
respch chan *grab.Response // in-flight responses, watched by Daemon.Monitor
sync.Mutex
wg sync.WaitGroup // tracks the grab worker goroutines
wake chan struct{} // pokes Downloader to rescan the queue
}
// Sort orders the queue oldest-first by publication date.
// NOTE(review): does not take the queue lock — presumably the caller
// holds it; confirm (no caller is visible in this file).
func (q *dlQueue) Sort() {
sort.Sort(ByDate(q.Items))
}
// Find reports the index of the queue entry whose episode title
// matches x, and whether such an entry exists.
func (q *dlQueue) Find(x *Item) (int, bool) {
	for idx := range q.Items {
		if q.Items[idx].Item.Title == x.Title {
			return idx, true
		}
	}
	return 0, false
}
// FindFilename reports the index of the queue entry whose destination
// path equals x, and whether such an entry exists.
func (q *dlQueue) FindFilename(x string) (int, bool) {
	for idx := range q.Items {
		if q.Items[idx].Filename == x {
			return idx, true
		}
	}
	return 0, false
}
// Waiting returns a snapshot queue holding only the entries that have
// not yet started downloading (the dlItem pointers are shared with q).
// The original copied the embedded sync.Mutex and sync.WaitGroup by
// value — a go-vet copylocks violation; the snapshot now carries
// fresh zero values, which is safe because callers only iterate its
// Items and never lock or wait on the copy.
func (q *dlQueue) Waiting() *dlQueue {
	ret := &dlQueue{
		Items:  make([]*dlItem, 0, len(q.Items)),
		reqch:  q.reqch,
		respch: q.respch,
		wake:   q.wake,
	}
	for _, i := range q.Items {
		if !i.Downloading {
			ret.Items = append(ret.Items, i)
		}
	}
	return ret
}
// Add appends episode i to the queue unless an entry with the same
// title is already present. The destination path is
// dstDir/<Filename> plus the extension taken from the episode URL.
// i is copied, so callers may safely pass a loop-variable address.
func (q *dlQueue) Add(i *Item) {
	if _, ok := q.Find(i); ok { // idiom: was "ok == true"
		return
	}
	item := *i // private copy: the queue outlives the caller's variable
	q.Items = append(q.Items, &dlItem{
		Item:     &item,
		Filename: path.Join(dstDir, i.Filename) + path.Ext(i.Url),
	})
}
// NewQueue returns an empty download queue with its worker channels
// initialized (unbuffered: sends rendezvous with the grab workers).
func NewQueue() *dlQueue {
	return &dlQueue{
		Items:  make([]*dlItem, 0),
		reqch:  make(chan *grab.Request),
		respch: make(chan *grab.Response),
		wake:   make(chan struct{}),
	}
}
// Load restores previously saved queue state from queueFile, holding
// the queue lock for the duration. Decode errors are logged and leave
// the queue unchanged.
func (q *dlQueue) Load() {
	q.Lock()
	defer q.Unlock()
	if _, err := toml.DecodeFile(queueFile, q); err != nil {
		log.Print("Cannot read queue status file:", err)
	}
}
// Save persists the queue to queueFile. TOML encoding happens into an
// in-memory buffer while the lock is held; the file IO runs after the
// lock is released. Fixes a real bug in the original: the
// bufio.Writer was never flushed, so any encoding smaller than the
// bufio buffer (4 KiB) produced an empty or truncated file. Encode
// and write errors are now checked instead of silently dropped.
func (q *dlQueue) Save() {
	of, err := os.Create(queueFile)
	if err != nil {
		log.Print("dlQueue.Save(): Cannot open output file")
		return
	}
	defer of.Close()
	var buf bytes.Buffer
	w := bufio.NewWriter(&buf)
	enc := toml.NewEncoder(w)
	log.Printf("dlQueue.Save(): encoding %d entries\n", len(q.Items))
	q.Lock() // encode to memory only; do not lock around IO
	err = enc.Encode(q)
	q.Unlock()
	if err == nil {
		err = w.Flush() // push buffered bytes into buf before copying
	}
	if err != nil {
		log.Print("dlQueue.Save(): encoding failed: ", err)
		return
	}
	if _, err := io.Copy(of, &buf); err != nil {
		log.Print("dlQueue.Save(): write failed: ", err)
		return
	}
	log.Print("dlQueue.Save(): done")
}
// Daemon ties together the configuration, download client, podcast
// list and download queue. The embedded Mutex guards pl (see Update
// and Updater).
type Daemon struct {
conf Config
g *grab.Client
pl *pcList
queue *dlQueue
sync.Mutex
workers int // concurrent download workers (set in main from conf)
dlwake chan struct{} // Updater pokes QueueUpdater through this
}
// NewDaemon assembles a daemon from the given configuration and
// podcast list, restoring any previously saved download queue state.
func NewDaemon(conf Config, pl *pcList) *Daemon {
	d := &Daemon{
		conf:   conf,
		g:      grab.NewClient(),
		pl:     pl,
		queue:  NewQueue(),
		dlwake: make(chan struct{}),
	}
	d.queue.Load()
	return d
}
// Update fetches every feed URL and merges the results into the
// podcast list, keeping only items from the last 30 days. Network IO
// runs outside the Daemon lock; only the list merge is locked.
func (d *Daemon) Update(urls []string) {
	sel := daysAgo(30)
	// Loop variable renamed from "url": it shadowed the imported
	// net/url package within this function.
	for _, feedURL := range urls {
		log.Print(" -> ", feedURL)
		f := readFeed(feedURL, sel) // do not lock around IO
		d.Lock()
		d.pl.Add(f)
		d.Unlock()
	}
}
// Monitor consumes grab responses from the queue's respch, spawning
// one watcher goroutine per download. Each watcher logs progress
// every 5 seconds and, when the transfer finishes, marks the matching
// queue entry Complete and persists the queue to disk.
func (d *Daemon) Monitor() {
// status logs filename, bytes transferred and percent complete.
status := func(resp *grab.Response) {
log.Printf(" %s: %v bytes (%.2f%%)\n",
resp.Filename,
resp.BytesComplete(),
100*resp.Progress())
}
// mon follows a single response until resp.Done is closed.
mon := func(resp *grab.Response) {
t := time.NewTicker(5 * time.Second)
defer t.Stop()
Loop:
for {
select {
case <-t.C:
status(resp)
case <-resp.Done:
status(resp)
// Mark the entry complete. The queue lock is released before
// Save() because Save() takes the lock itself.
d.queue.Lock()
if i, ok := d.queue.FindFilename(resp.Filename); ok {
d.queue.Items[i].Complete = true
d.queue.Unlock()
d.queue.Save()
} else {
d.queue.Unlock()
log.Printf("Error finding queue entry for %s\n",resp.Filename)
}
break Loop
}
}
// Done only means "finished", not "succeeded": surface errors here.
// Note: Complete was set above even when the transfer failed.
if err := resp.Err(); err != nil {
log.Printf("Download failed for %s (%s)\n",resp.Filename,err)
}
}
// One watcher goroutine per response; this loop ends when respch closes.
for r := range d.queue.respch {
go mon(r)
}
}
// StartDownloader launches the pool of grab worker goroutines plus
// the monitor, queue-updater and downloader goroutines.
func (d *Daemon) StartDownloader() {
	log.Print("Downloader(): spawning workers")
	for n := 0; n < d.workers; n++ {
		d.queue.wg.Add(1)
		go func() {
			defer d.queue.wg.Done()
			// Each worker pulls requests from reqch and publishes its
			// responses on respch until reqch is closed.
			d.g.DoChannel(d.queue.reqch, d.queue.respch)
		}()
	}
	log.Print("Downloader(): starting monitor")
	go d.Monitor()
	go d.QueueUpdater()
	go d.queue.Downloader()
}
// QueueUpdater copies every known podcast episode into the download
// queue, persists the queue, and wakes the Downloader — repeating
// every 30 minutes or whenever Updater pokes d.dlwake.
func (d *Daemon) QueueUpdater() {
t := time.NewTicker(30 * time.Minute)
defer t.Stop()
for {
// lock Podcast list and update download queue
log.Print("QueueUpdater(): Updating download queue")
d.queue.Lock()
// NOTE(review): d.pl is read while holding the queue lock, not the
// Daemon lock that Update() uses when writing it — confirm this is
// not a data race.
for _,p := range d.pl.Podcasts {
for _,i := range p.Items {
// &i aliases the loop variable, but Add() copies the Item
// immediately, so the aliasing is harmless.
d.queue.Add(&i)
}
}
log.Print("QueueUpdater(): Done updating download queue")
d.queue.Unlock()
d.queue.Save()
// Blocking send: waits until Downloader reaches its select.
d.queue.wake<- struct{}{}
log.Print("QueueUpdater(): Sleeping")
select {
case <-t.C:
continue
case <-d.dlwake:
continue
}
}
}
// Downloader feeds not-yet-started queue entries to the grab workers
// via reqch. It rescans every 30 minutes or when poked through
// q.wake; if a blocking send stalled for more than 5 seconds the
// snapshot may be stale, so the scan restarts from the top.
func (q *dlQueue) Downloader() {
t := time.NewTicker(30 * time.Minute)
defer t.Stop()
LOOP:
for {
// launch requests for files we are not yet downloading
q.Lock()
waiting := q.Waiting()
q.Unlock()
log.Print("Download queue length: ",len(waiting.Items))
for _,i := range waiting.Items {
if !i.Downloading && !i.Complete {
req,err := grab.NewRequest(i.Filename,i.Item.Url)
if err != nil {
log.Print("Request error: ",err)
continue
}
// Mark before sending so later scans skip this entry; the
// dlItem pointer is shared with the main queue.
i.Downloading = true
// NOTE(review): this t shadows the outer ticker t (scoped to
// this if-block); a distinct name would be clearer.
t := time.Now()
q.reqch <- req
if time.Now().After(t.Add(5 * time.Second)) {
continue LOOP // refresh list
}
}
}
log.Print("Downloader(): sleeping")
select {
case <-t.C:
continue
case <-q.wake:
continue
}
}
}
// Updater is the daemon's main loop: it starts the download pipeline,
// then forever refreshes every feed, persists the podcast list to
// dataFile, and wakes QueueUpdater, sleeping 30 minutes between
// rounds. Unlike the original, the TOML encoding now happens into an
// in-memory buffer while the Daemon lock is held and the file write
// runs afterwards — the same "do not lock around IO" convention as
// dlQueue.Save() — and the Encode error is no longer ignored.
func (d *Daemon) Updater() {
	log.Print("Updater(): starting")
	d.StartDownloader()
	for {
		time.Sleep(1 * time.Second)
		d.Update(d.conf.Urls)
		of, err := os.Create(dataFile)
		if err != nil {
			log.Print("Updater(): Cannot open output file")
		} else {
			var buf bytes.Buffer
			enc := toml.NewEncoder(&buf)
			log.Print("Updater(): writing output")
			d.Lock()
			err = enc.Encode(d.pl)
			d.Unlock()
			if err != nil {
				log.Print("Updater(): encoding failed: ", err)
			} else if _, err := io.Copy(of, &buf); err != nil {
				log.Print("Updater(): write failed: ", err)
			}
			of.Close()
		}
		// Blocking send: waits until QueueUpdater is ready to rescan.
		d.dlwake <- struct{}{}
		time.Sleep(30 * time.Minute)
	}
}
// Start launches the daemon's update loop in the background.
func (d *Daemon) Start() {
go d.Updater()
}
// main loads the configuration, resolves the download directory,
// restores saved podcast state, and runs the daemon forever.
func main() {
	log.Print("rssd")
	log.Print("reading configuration")
	var conf Config
	if _, err := toml.DecodeFile(confFile, &conf); err != nil {
		log.Fatal("Error reading config file:", err)
	}
	// Resolve dstDir before the daemon exists so no goroutine can ever
	// observe it empty (the original assigned it after NewDaemon).
	var err error
	dstDir, err = homedir.Expand(conf.DestDir)
	if err != nil {
		log.Fatal("Error locating DestDir.")
	}
	pl := newpcList(dataFile)
	d := NewDaemon(conf, pl)
	// Fall back to 3 workers when the config does not set a count.
	d.workers = d.conf.Workers
	if d.workers == 0 {
		d.workers = 3
	}
	d.Start()
	select {} // block forever; all work happens in goroutines
}