Improve configuration options. Better coordination between Update and Download

threads.
This commit is contained in:
Greg 2019-03-15 11:23:27 -04:00
parent 4e11288508
commit 9301346632

81
main.go
View File

@ -13,10 +13,28 @@ import (
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
"github.com/cavaliercoder/grab" "github.com/cavaliercoder/grab"
homedir "github.com/mitchellh/go-homedir"
"github.com/mmcdole/gofeed" "github.com/mmcdole/gofeed"
) )
var (
confFile,dataFile,dstDir string
)
func init() {
os.Chdir(path.Join()) // go to the root directory
homeDir,err := homedir.Dir()
if err != nil {
log.Fatal("Cannot locate user's home directory")
}
confDir := path.Join(homeDir,".config","rssd")
confFile = path.Join(confDir,"rssd.conf")
dataFile = path.Join(confDir,"podcasts.conf")
}
type Config struct { type Config struct {
Workers int
DestDir string
Urls []string Urls []string
} }
@ -40,7 +58,7 @@ func newpcList(confs ...string) (ret *pcList) {
ret = &pcList{} ret = &pcList{}
ret.Podcasts = make([]Podcast,0) ret.Podcasts = make([]Podcast,0)
if len(confs) > 0 { if len(confs) > 0 {
if _, err := toml.DecodeFile("output.conf", &ret); err != nil { if _, err := toml.DecodeFile(dataFile, &ret); err != nil {
log.Print("Error reading podcast list:",err) log.Print("Error reading podcast list:",err)
} }
} }
@ -133,10 +151,11 @@ func toPodcast(sel Selector, u string, feed *gofeed.Feed) (ret *Podcast) {
} }
for _, i := range feed.Items { for _, i := range feed.Items {
if sel(i) { if sel(i) {
fn := i.PublishedParsed.Format("20060102--") + i.Title
it := Item{ it := Item{
Title: i.Title, Title: i.Title,
Description: i.Description, Description: i.Description,
Filename: path.Join(ret.Title, i.Title), Filename: path.Join(ret.Title, fn),
Published: *i.PublishedParsed, Published: *i.PublishedParsed,
} }
for _, n := range i.Enclosures { for _, n := range i.Enclosures {
@ -230,6 +249,8 @@ type Daemon struct {
pl *pcList pl *pcList
queue *dlQueue queue *dlQueue
sync.Mutex sync.Mutex
workers int
dlwake chan struct{}
} }
func NewDaemon(conf Config, pl *pcList) *Daemon { func NewDaemon(conf Config, pl *pcList) *Daemon {
@ -238,6 +259,7 @@ func NewDaemon(conf Config, pl *pcList) *Daemon {
g: grab.NewClient(), g: grab.NewClient(),
pl: pl, pl: pl,
queue: &dlQueue{}, queue: &dlQueue{},
dlwake: make(chan struct{}),
} }
} }
@ -281,12 +303,12 @@ func (d *Daemon) Monitor() {
} }
} }
func (d *Daemon) Downloader(workers int) { func (d *Daemon) Downloader() {
log.Print("Downloader(): starting") log.Print("Downloader(): starting")
d.queue.reqch = make(chan *grab.Request) d.queue.reqch = make(chan *grab.Request)
d.queue.respch = make(chan *grab.Response) d.queue.respch = make(chan *grab.Response)
log.Print("Downloader(): spawning workers") log.Print("Downloader(): spawning workers")
for i := 0; i < workers; i++ { for i := 0; i < d.workers; i++ {
d.queue.wg.Add(1) d.queue.wg.Add(1)
go func() { go func() {
d.g.DoChannel(d.queue.reqch,d.queue.respch) d.g.DoChannel(d.queue.reqch,d.queue.respch)
@ -303,6 +325,8 @@ func (d *Daemon) Downloader(workers int) {
} }
upd := make(chan struct{}) upd := make(chan struct{})
go func() { go func() {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
for { for {
// lock Podcast list and update download queue // lock Podcast list and update download queue
log.Print("Downloader(): Updating podcast list") log.Print("Downloader(): Updating podcast list")
@ -312,16 +336,21 @@ func (d *Daemon) Downloader(workers int) {
d.queue.Add(&i) d.queue.Add(&i)
} }
} }
log.Print("Done updating podcast list") log.Print("Downloader(): Done updating podcast list")
d.queue.Unlock() d.queue.Unlock()
upd<- struct{}{} upd<- struct{}{}
time.Sleep(30 * time.Second) select {
case <-t.C:
continue
case <-d.dlwake:
continue
}
} }
}() }()
t := time.NewTicker(30 * time.Second)
defer t.Stop()
LOOP: LOOP:
for { for {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
// launch requests for files we are not yet downloading // launch requests for files we are not yet downloading
d.queue.Lock() d.queue.Lock()
waiting := d.queue.Waiting() waiting := d.queue.Waiting()
@ -329,7 +358,7 @@ func (d *Daemon) Downloader(workers int) {
log.Print("Download queue length: ",len(waiting.items)) log.Print("Download queue length: ",len(waiting.items))
for _,i := range waiting.items { for _,i := range waiting.items {
if !i.downloading { if !i.downloading {
dst := path.Join("downloads",path.Base(i.Item.Url)) dst := path.Join(dstDir,i.Item.Filename) + path.Ext(i.Item.Url)
req,err := grab.NewRequest(dst,i.Item.Url) req,err := grab.NewRequest(dst,i.Item.Url)
if err != nil { if err != nil {
log.Print("Request error: ",err) log.Print("Request error: ",err)
@ -347,35 +376,37 @@ func (d *Daemon) Downloader(workers int) {
log.Print("Downloader(): sleeping") log.Print("Downloader(): sleeping")
select { select {
case <-t.C: case <-t.C:
continue LOOP continue
case <-upd: case <-upd:
continue LOOP continue
} }
} }
} }
func (d *Daemon) Updater() { func (d *Daemon) Updater() {
log.Print("Updater(): starting")
go d.Downloader()
for { for {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
d.Lock()
d.Update(d.conf.Urls) d.Update(d.conf.Urls)
of,err := os.Create("output.conf") of,err := os.Create(dataFile)
if err != nil { if err != nil {
log.Print("Cannot open output file") log.Print("Updater(): Cannot open output file")
} else { } else {
enc := toml.NewEncoder(of) enc := toml.NewEncoder(of)
log.Print("writing output") log.Print("Updater(): writing output")
d.Lock()
enc.Encode(d.pl) enc.Encode(d.pl)
d.Unlock()
of.Close() of.Close()
log.Print("done") log.Print("Updater(): done writing")
} }
d.Unlock() d.dlwake <-struct{}{}
time.Sleep(30 * time.Minute) time.Sleep(30 * time.Minute)
} }
} }
func (d *Daemon) Start() { func (d *Daemon) Start() {
go d.Downloader(3) // use 3 download queues
go d.Updater() go d.Updater()
} }
@ -383,12 +414,22 @@ func main() {
log.Print("rssd") log.Print("rssd")
log.Print("reading configuration") log.Print("reading configuration")
var conf Config var conf Config
if _, err := toml.DecodeFile("rssd.conf", &conf); err != nil { var err error
if _, err = toml.DecodeFile(confFile, &conf); err != nil {
log.Fatal("Error reading config file:",err) log.Fatal("Error reading config file:",err)
} }
pl := newpcList("output.conf") pl := newpcList(dataFile)
d := NewDaemon(conf,pl) d := NewDaemon(conf,pl)
if d.conf.Workers != 0 {
d.workers = d.conf.Workers
} else {
d.workers = 3
}
dstDir,err = homedir.Expand(conf.DestDir)
if err != nil {
log.Fatal("Error locating DestDir.")
}
d.Start() d.Start()
select { } select { }
} }