Compare commits
5 Commits
9a8aa9d21d
...
8b02e05f2d
Author | SHA1 | Date | |
---|---|---|---|
8b02e05f2d | |||
92374e120f | |||
661f42516f | |||
ac96eaab6d | |||
e609afeb02 |
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -1,2 +1,3 @@
|
||||||
rssd
|
rssd
|
||||||
log
|
log
|
||||||
|
nohup.out
|
||||||
|
|
156
main.go
156
main.go
|
@ -1,7 +1,10 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
@ -18,7 +21,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
confFile,dataFile,dstDir string
|
confFile,dataFile,queueFile,dstDir string
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -30,6 +33,7 @@ func init() {
|
||||||
confDir := path.Join(homeDir,".config","rssd")
|
confDir := path.Join(homeDir,".config","rssd")
|
||||||
confFile = path.Join(confDir,"rssd.conf")
|
confFile = path.Join(confDir,"rssd.conf")
|
||||||
dataFile = path.Join(confDir,"podcasts.conf")
|
dataFile = path.Join(confDir,"podcasts.conf")
|
||||||
|
queueFile = path.Join(confDir,"queue.conf")
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
|
@ -189,7 +193,9 @@ func readFeed(u string,sel Selector) *Podcast {
|
||||||
|
|
||||||
type dlItem struct {
|
type dlItem struct {
|
||||||
Item *Item
|
Item *Item
|
||||||
downloading bool
|
Filename string
|
||||||
|
Downloading bool
|
||||||
|
Complete bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type ByDate []*dlItem
|
type ByDate []*dlItem
|
||||||
|
@ -198,19 +204,20 @@ func (a ByDate) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
func (a ByDate) Less(i, j int) bool { return a[j].Item.Published.After(a[i].Item.Published) }
|
func (a ByDate) Less(i, j int) bool { return a[j].Item.Published.After(a[i].Item.Published) }
|
||||||
|
|
||||||
type dlQueue struct {
|
type dlQueue struct {
|
||||||
items []*dlItem
|
Items []*dlItem
|
||||||
reqch chan *grab.Request
|
reqch chan *grab.Request
|
||||||
respch chan *grab.Response
|
respch chan *grab.Response
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
wake chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *dlQueue) Sort() {
|
func (q *dlQueue) Sort() {
|
||||||
sort.Sort(ByDate(q.items))
|
sort.Sort(ByDate(q.Items))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *dlQueue) Find(x *Item) (int, bool) {
|
func (q *dlQueue) Find(x *Item) (int, bool) {
|
||||||
for i,y := range q.items {
|
for i,y := range q.Items {
|
||||||
if y.Item.Title == x.Title {
|
if y.Item.Title == x.Title {
|
||||||
return i, true
|
return i, true
|
||||||
}
|
}
|
||||||
|
@ -218,17 +225,27 @@ func (q *dlQueue) Find(x *Item) (int, bool) {
|
||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (q *dlQueue) FindFilename(x string) (int, bool) {
|
||||||
|
for i,y := range q.Items {
|
||||||
|
if y.Filename == x {
|
||||||
|
return i, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
func (q *dlQueue) Waiting() *dlQueue {
|
func (q *dlQueue) Waiting() *dlQueue {
|
||||||
ret := &dlQueue{
|
ret := &dlQueue{
|
||||||
items: make([]*dlItem,0),
|
Items: make([]*dlItem,0),
|
||||||
reqch: q.reqch,
|
reqch: q.reqch,
|
||||||
respch: q.respch,
|
respch: q.respch,
|
||||||
Mutex: q.Mutex,
|
Mutex: q.Mutex,
|
||||||
wg: q.wg,
|
wg: q.wg,
|
||||||
|
wake: q.wake,
|
||||||
}
|
}
|
||||||
for _,i := range q.items {
|
for _,i := range q.Items {
|
||||||
if i.downloading == false {
|
if i.Downloading == false {
|
||||||
ret.items = append(ret.items,i)
|
ret.Items = append(ret.Items,i)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
|
@ -240,7 +257,44 @@ func (q *dlQueue) Add(i *Item) {
|
||||||
}
|
}
|
||||||
di := &dlItem{Item: &Item{} }
|
di := &dlItem{Item: &Item{} }
|
||||||
*di.Item = *i
|
*di.Item = *i
|
||||||
q.items = append(q.items,di)
|
di.Filename = path.Join(dstDir,i.Filename) + path.Ext(i.Url)
|
||||||
|
q.Items = append(q.Items,di)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewQueue() *dlQueue {
|
||||||
|
ret := &dlQueue{
|
||||||
|
Items: make([]*dlItem,0),
|
||||||
|
reqch: make(chan *grab.Request),
|
||||||
|
respch: make(chan *grab.Response),
|
||||||
|
wake: make(chan struct{}),
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *dlQueue) Load() {
|
||||||
|
q.Lock()
|
||||||
|
if _, err := toml.DecodeFile(queueFile, q); err != nil {
|
||||||
|
log.Print("Cannot read queue status file:",err)
|
||||||
|
}
|
||||||
|
q.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *dlQueue) Save() {
|
||||||
|
of,err := os.Create(queueFile)
|
||||||
|
if err != nil {
|
||||||
|
log.Print("dlQueue.Save(): Cannot open output file")
|
||||||
|
} else {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
w := bufio.NewWriter(&buf)
|
||||||
|
enc := toml.NewEncoder(w)
|
||||||
|
log.Printf("dlQueue.Save(): encoding %d entries\n",len(q.Items))
|
||||||
|
q.Lock() // do not lock around IO
|
||||||
|
enc.Encode(q)
|
||||||
|
q.Unlock()
|
||||||
|
io.Copy(of,&buf)
|
||||||
|
of.Close()
|
||||||
|
log.Print("dlQueue.Save(): done")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Daemon struct {
|
type Daemon struct {
|
||||||
|
@ -254,17 +308,19 @@ type Daemon struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDaemon(conf Config, pl *pcList) *Daemon {
|
func NewDaemon(conf Config, pl *pcList) *Daemon {
|
||||||
return &Daemon{
|
ret := &Daemon{
|
||||||
conf: conf,
|
conf: conf,
|
||||||
g: grab.NewClient(),
|
g: grab.NewClient(),
|
||||||
pl: pl,
|
pl: pl,
|
||||||
queue: &dlQueue{},
|
queue: NewQueue(),
|
||||||
dlwake: make(chan struct{}),
|
dlwake: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
ret.queue.Load()
|
||||||
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Daemon) Update(urls []string) {
|
func (d *Daemon) Update(urls []string) {
|
||||||
sel := daysAgo(60)
|
sel := daysAgo(30)
|
||||||
for _,url := range urls {
|
for _,url := range urls {
|
||||||
log.Print(" -> ",url)
|
log.Print(" -> ",url)
|
||||||
f := readFeed(url,sel) // do not lock around IO
|
f := readFeed(url,sel) // do not lock around IO
|
||||||
|
@ -282,7 +338,7 @@ func (d *Daemon) Monitor() {
|
||||||
100*resp.Progress())
|
100*resp.Progress())
|
||||||
}
|
}
|
||||||
mon := func(resp *grab.Response) {
|
mon := func(resp *grab.Response) {
|
||||||
t := time.NewTicker(1 * time.Second)
|
t := time.NewTicker(5 * time.Second)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
Loop:
|
Loop:
|
||||||
for {
|
for {
|
||||||
|
@ -291,6 +347,15 @@ func (d *Daemon) Monitor() {
|
||||||
status(resp)
|
status(resp)
|
||||||
case <-resp.Done:
|
case <-resp.Done:
|
||||||
status(resp)
|
status(resp)
|
||||||
|
d.queue.Lock()
|
||||||
|
if i, ok := d.queue.FindFilename(resp.Filename); ok {
|
||||||
|
d.queue.Items[i].Complete = true
|
||||||
|
d.queue.Unlock()
|
||||||
|
d.queue.Save()
|
||||||
|
} else {
|
||||||
|
d.queue.Unlock()
|
||||||
|
log.Printf("Error finding queue entry for %s\n",resp.Filename)
|
||||||
|
}
|
||||||
break Loop
|
break Loop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -298,15 +363,12 @@ func (d *Daemon) Monitor() {
|
||||||
log.Printf("Download failed for %s (%s)\n",resp.Filename,err)
|
log.Printf("Download failed for %s (%s)\n",resp.Filename,err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for {
|
for r := range d.queue.respch {
|
||||||
mon(<-d.queue.respch)
|
go mon(r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Daemon) Downloader() {
|
func (d *Daemon) StartDownloader() {
|
||||||
log.Print("Downloader(): starting")
|
|
||||||
d.queue.reqch = make(chan *grab.Request)
|
|
||||||
d.queue.respch = make(chan *grab.Response)
|
|
||||||
log.Print("Downloader(): spawning workers")
|
log.Print("Downloader(): spawning workers")
|
||||||
for i := 0; i < d.workers; i++ {
|
for i := 0; i < d.workers; i++ {
|
||||||
d.queue.wg.Add(1)
|
d.queue.wg.Add(1)
|
||||||
|
@ -317,28 +379,27 @@ func (d *Daemon) Downloader() {
|
||||||
}
|
}
|
||||||
log.Print("Downloader(): starting monitor")
|
log.Print("Downloader(): starting monitor")
|
||||||
go d.Monitor()
|
go d.Monitor()
|
||||||
|
go d.QueueUpdater()
|
||||||
if d.queue.items == nil {
|
go d.queue.Downloader()
|
||||||
d.queue.Lock()
|
|
||||||
d.queue.items = make([]*dlItem,0)
|
|
||||||
d.queue.Unlock()
|
|
||||||
}
|
}
|
||||||
upd := make(chan struct{})
|
|
||||||
go func() {
|
func (d *Daemon) QueueUpdater() {
|
||||||
t := time.NewTicker(30 * time.Second)
|
t := time.NewTicker(30 * time.Minute)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
for {
|
for {
|
||||||
// lock Podcast list and update download queue
|
// lock Podcast list and update download queue
|
||||||
log.Print("Downloader(): Updating podcast list")
|
log.Print("QueueUpdater(): Updating download queue")
|
||||||
d.queue.Lock()
|
d.queue.Lock()
|
||||||
for _,p := range d.pl.Podcasts {
|
for _,p := range d.pl.Podcasts {
|
||||||
for _,i := range p.Items {
|
for _,i := range p.Items {
|
||||||
d.queue.Add(&i)
|
d.queue.Add(&i)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Print("Downloader(): Done updating podcast list")
|
log.Print("QueueUpdater(): Done updating download queue")
|
||||||
d.queue.Unlock()
|
d.queue.Unlock()
|
||||||
upd<- struct{}{}
|
d.queue.Save()
|
||||||
|
d.queue.wake<- struct{}{}
|
||||||
|
log.Print("QueueUpdater(): Sleeping")
|
||||||
select {
|
select {
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
continue
|
continue
|
||||||
|
@ -346,38 +407,38 @@ func (d *Daemon) Downloader() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}
|
||||||
|
|
||||||
|
func (q *dlQueue) Downloader() {
|
||||||
|
t := time.NewTicker(30 * time.Minute)
|
||||||
|
defer t.Stop()
|
||||||
LOOP:
|
LOOP:
|
||||||
for {
|
for {
|
||||||
t := time.NewTicker(30 * time.Second)
|
|
||||||
defer t.Stop()
|
|
||||||
// launch requests for files we are not yet downloading
|
// launch requests for files we are not yet downloading
|
||||||
d.queue.Lock()
|
q.Lock()
|
||||||
waiting := d.queue.Waiting()
|
waiting := q.Waiting()
|
||||||
d.queue.Unlock()
|
q.Unlock()
|
||||||
log.Print("Download queue length: ",len(waiting.items))
|
log.Print("Download queue length: ",len(waiting.Items))
|
||||||
for _,i := range waiting.items {
|
for _,i := range waiting.Items {
|
||||||
if !i.downloading {
|
if !i.Downloading && !i.Complete {
|
||||||
dst := path.Join(dstDir,i.Item.Filename) + path.Ext(i.Item.Url)
|
req,err := grab.NewRequest(i.Filename,i.Item.Url)
|
||||||
req,err := grab.NewRequest(dst,i.Item.Url)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("Request error: ",err)
|
log.Print("Request error: ",err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
i.downloading = true
|
i.Downloading = true
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
d.queue.reqch <- req
|
q.reqch <- req
|
||||||
if time.Now().After(t.Add(5 * time.Second)) {
|
if time.Now().After(t.Add(5 * time.Second)) {
|
||||||
continue LOOP // refresh list
|
continue LOOP // refresh list
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Print("Downloader(): Done launching downloads")
|
|
||||||
log.Print("Downloader(): sleeping")
|
log.Print("Downloader(): sleeping")
|
||||||
select {
|
select {
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
continue
|
continue
|
||||||
case <-upd:
|
case <-q.wake:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -385,7 +446,7 @@ func (d *Daemon) Downloader() {
|
||||||
|
|
||||||
func (d *Daemon) Updater() {
|
func (d *Daemon) Updater() {
|
||||||
log.Print("Updater(): starting")
|
log.Print("Updater(): starting")
|
||||||
go d.Downloader()
|
d.StartDownloader()
|
||||||
for {
|
for {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
d.Update(d.conf.Urls)
|
d.Update(d.conf.Urls)
|
||||||
|
@ -399,7 +460,6 @@ func (d *Daemon) Updater() {
|
||||||
enc.Encode(d.pl)
|
enc.Encode(d.pl)
|
||||||
d.Unlock()
|
d.Unlock()
|
||||||
of.Close()
|
of.Close()
|
||||||
log.Print("Updater(): done writing")
|
|
||||||
}
|
}
|
||||||
d.dlwake <-struct{}{}
|
d.dlwake <-struct{}{}
|
||||||
time.Sleep(30 * time.Minute)
|
time.Sleep(30 * time.Minute)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user