From 930134663243df7bb63e50bd093d7d0aa0808c66 Mon Sep 17 00:00:00 2001 From: Greg Date: Fri, 15 Mar 2019 11:23:27 -0400 Subject: [PATCH] Improve configuration options. Better coordination between Update and Download threads. --- main.go | 81 +++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 20 deletions(-) diff --git a/main.go b/main.go index 01a03ea..dfc8ab6 100644 --- a/main.go +++ b/main.go @@ -13,10 +13,28 @@ import ( "github.com/BurntSushi/toml" "github.com/cavaliercoder/grab" + homedir "github.com/mitchellh/go-homedir" "github.com/mmcdole/gofeed" ) +var ( + confFile,dataFile,dstDir string +) + +func init() { + os.Chdir(path.Join()) // go to the root directory + homeDir,err := homedir.Dir() + if err != nil { + log.Fatal("Cannot locate user's home directory") + } + confDir := path.Join(homeDir,".config","rssd") + confFile = path.Join(confDir,"rssd.conf") + dataFile = path.Join(confDir,"podcasts.conf") +} + type Config struct { + Workers int + DestDir string Urls []string } @@ -40,7 +58,7 @@ func newpcList(confs ...string) (ret *pcList) { ret = &pcList{} ret.Podcasts = make([]Podcast,0) if len(confs) > 0 { - if _, err := toml.DecodeFile("output.conf", &ret); err != nil { + if _, err := toml.DecodeFile(dataFile, &ret); err != nil { log.Print("Error reading podcast list:",err) } } @@ -133,10 +151,11 @@ func toPodcast(sel Selector, u string, feed *gofeed.Feed) (ret *Podcast) { } for _, i := range feed.Items { if sel(i) { + fn := i.PublishedParsed.Format("20060102--") + i.Title it := Item{ Title: i.Title, Description: i.Description, - Filename: path.Join(ret.Title, i.Title), + Filename: path.Join(ret.Title, fn), Published: *i.PublishedParsed, } for _, n := range i.Enclosures { @@ -230,6 +249,8 @@ type Daemon struct { pl *pcList queue *dlQueue sync.Mutex + workers int + dlwake chan struct{} } func NewDaemon(conf Config, pl *pcList) *Daemon { @@ -238,6 +259,7 @@ func NewDaemon(conf Config, pl *pcList) *Daemon { g: grab.NewClient(), pl: pl, queue: &dlQueue{}, + dlwake: make(chan struct{}), } } @@ -281,12 +303,12 @@ func (d *Daemon) Monitor() { } } -func (d *Daemon) Downloader(workers int) { +func (d *Daemon) Downloader() { log.Print("Downloader(): starting") d.queue.reqch = make(chan *grab.Request) d.queue.respch = make(chan *grab.Response) log.Print("Downloader(): spawning workers") - for i := 0; i < workers; i++ { + for i := 0; i < d.workers; i++ { d.queue.wg.Add(1) go func() { d.g.DoChannel(d.queue.reqch,d.queue.respch) @@ -303,6 +325,8 @@ func (d *Daemon) Downloader(workers int) { } upd := make(chan struct{}) go func() { + t := time.NewTicker(30 * time.Second) + defer t.Stop() for { // lock Podcast list and update download queue log.Print("Downloader(): Updating podcast list") @@ -312,16 +336,21 @@ func (d *Daemon) Downloader(workers int) { d.queue.Add(&i) } } - log.Print("Done updating podcast list") + log.Print("Downloader(): Done updating podcast list") d.queue.Unlock() upd<- struct{}{} - time.Sleep(30 * time.Second) + select { + case <-t.C: + continue + case <-d.dlwake: + continue + } } }() - t := time.NewTicker(30 * time.Second) - defer t.Stop() LOOP: for { + t := time.NewTicker(30 * time.Second) + defer t.Stop() // launch requests for files we are not yet downloading d.queue.Lock() waiting := d.queue.Waiting() @@ -329,7 +358,7 @@ func (d *Daemon) Downloader(workers int) { log.Print("Download queue length: ",len(waiting.items)) for _,i := range waiting.items { if !i.downloading { - dst := path.Join("downloads",path.Base(i.Item.Url)) + dst := path.Join(dstDir,i.Item.Filename) + path.Ext(i.Item.Url) req,err := grab.NewRequest(dst,i.Item.Url) if err != nil { log.Print("Request error: ",err) @@ -347,35 +376,37 @@ func (d *Daemon) Downloader(workers int) { log.Print("Downloader(): sleeping") select { case <-t.C: - continue LOOP + continue case <-upd: - continue LOOP + continue } } } func (d *Daemon) Updater() { + log.Print("Updater(): starting") + go d.Downloader() for { time.Sleep(1 * time.Second) - d.Lock() d.Update(d.conf.Urls) - of,err := os.Create("output.conf") + of,err := os.Create(dataFile) if err != nil { - log.Print("Cannot open output file") + log.Print("Updater(): Cannot open output file") } else { enc := toml.NewEncoder(of) - log.Print("writing output") + log.Print("Updater(): writing output") + d.Lock() enc.Encode(d.pl) + d.Unlock() of.Close() - log.Print("done") + log.Print("Updater(): done writing") } - d.Unlock() + d.dlwake <-struct{}{} time.Sleep(30 * time.Minute) } } func (d *Daemon) Start() { - go d.Downloader(3) // use 3 download queues go d.Updater() } @@ -383,12 +414,22 @@ func main() { log.Print("rssd") log.Print("reading configuration") var conf Config - if _, err := toml.DecodeFile("rssd.conf", &conf); err != nil { + var err error + if _, err = toml.DecodeFile(confFile, &conf); err != nil { log.Fatal("Error reading config file:",err) } - pl := newpcList("output.conf") + pl := newpcList(dataFile) d := NewDaemon(conf,pl) + if d.conf.Workers != 0 { + d.workers = d.conf.Workers + } else { + d.workers = 3 + } + dstDir,err = homedir.Expand(conf.DestDir) + if err != nil { + log.Fatal("Error locating DestDir.") + } d.Start() select { } }