Compare commits
	
		
			5 Commits
		
	
	
		
			9a8aa9d21d
			...
			8b02e05f2d
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 8b02e05f2d | |||
| 92374e120f | |||
| 661f42516f | |||
| ac96eaab6d | |||
| e609afeb02 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							|  | @ -1,2 +1,3 @@ | |||
| rssd | ||||
| log | ||||
| nohup.out | ||||
|  |  | |||
							
								
								
									
										184
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										184
									
								
								main.go
									
									
									
									
									
								
							|  | @ -1,7 +1,10 @@ | |||
| package main | ||||
| 
 | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"bytes" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"log" | ||||
| 	"net/url" | ||||
| 	"os" | ||||
|  | @ -18,7 +21,7 @@ import ( | |||
| ) | ||||
| 
 | ||||
| var ( | ||||
| 	confFile,dataFile,dstDir string | ||||
| 	confFile,dataFile,queueFile,dstDir string | ||||
| ) | ||||
| 
 | ||||
| func init() { | ||||
|  | @ -30,6 +33,7 @@ func init() { | |||
| 	confDir := path.Join(homeDir,".config","rssd") | ||||
| 	confFile = path.Join(confDir,"rssd.conf") | ||||
| 	dataFile = path.Join(confDir,"podcasts.conf") | ||||
| 	queueFile = path.Join(confDir,"queue.conf") | ||||
| } | ||||
| 
 | ||||
| type Config struct { | ||||
|  | @ -189,7 +193,9 @@ func readFeed(u string,sel Selector) *Podcast { | |||
| 
 | ||||
| type dlItem struct { | ||||
| 	Item *Item | ||||
| 	downloading bool | ||||
| 	Filename string | ||||
| 	Downloading bool | ||||
| 	Complete bool | ||||
| } | ||||
| 
 | ||||
| type ByDate []*dlItem | ||||
|  | @ -198,19 +204,20 @@ func (a ByDate) Swap(i, j int)		{ a[i], a[j] = a[j], a[i] } | |||
| func (a ByDate) Less(i, j int) bool	{ return a[j].Item.Published.After(a[i].Item.Published) } | ||||
| 
 | ||||
| type dlQueue struct { | ||||
| 	items []*dlItem | ||||
| 	Items []*dlItem | ||||
| 	reqch chan *grab.Request | ||||
| 	respch chan *grab.Response | ||||
| 	sync.Mutex | ||||
| 	wg sync.WaitGroup | ||||
| 	wake chan struct{} | ||||
| } | ||||
| 
 | ||||
| func (q *dlQueue) Sort() { | ||||
| 	sort.Sort(ByDate(q.items)) | ||||
| 	sort.Sort(ByDate(q.Items)) | ||||
| } | ||||
| 
 | ||||
| func (q *dlQueue) Find(x *Item) (int, bool) { | ||||
| 	for i,y := range q.items { | ||||
| 	for i,y := range q.Items { | ||||
| 		if y.Item.Title == x.Title { | ||||
| 			return i, true | ||||
| 		} | ||||
|  | @ -218,17 +225,27 @@ func (q *dlQueue) Find(x *Item) (int, bool) { | |||
| 	return 0, false | ||||
| } | ||||
| 
 | ||||
| func (q *dlQueue) FindFilename(x string) (int, bool) { | ||||
| 	for i,y := range q.Items { | ||||
| 		if y.Filename == x { | ||||
| 			return i, true | ||||
| 		} | ||||
| 	} | ||||
| 	return 0, false | ||||
| } | ||||
| 
 | ||||
| func (q *dlQueue) Waiting() *dlQueue { | ||||
| 	ret := &dlQueue{ | ||||
| 		items: make([]*dlItem,0), | ||||
| 		Items: make([]*dlItem,0), | ||||
| 		reqch: q.reqch, | ||||
| 		respch: q.respch, | ||||
| 		Mutex: q.Mutex, | ||||
| 		wg: q.wg, | ||||
| 		wake: q.wake, | ||||
| 	} | ||||
| 	for _,i := range q.items { | ||||
| 		if i.downloading == false { | ||||
| 			ret.items = append(ret.items,i) | ||||
| 	for _,i := range q.Items { | ||||
| 		if i.Downloading == false { | ||||
| 			ret.Items = append(ret.Items,i) | ||||
| 		} | ||||
| 	} | ||||
| 	return ret | ||||
|  | @ -240,7 +257,44 @@ func (q *dlQueue) Add(i *Item) { | |||
| 	} | ||||
| 	di := &dlItem{Item: &Item{} } | ||||
| 	*di.Item = *i | ||||
| 	q.items = append(q.items,di) | ||||
| 	di.Filename = path.Join(dstDir,i.Filename) + path.Ext(i.Url) | ||||
| 	q.Items = append(q.Items,di) | ||||
| } | ||||
| 
 | ||||
| func NewQueue() *dlQueue { | ||||
| 	ret := &dlQueue{ | ||||
| 			Items: make([]*dlItem,0), | ||||
| 			reqch: make(chan *grab.Request), | ||||
| 			respch: make(chan *grab.Response), | ||||
| 			wake: make(chan struct{}), | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
| 
 | ||||
| func (q *dlQueue) Load() { | ||||
| 	q.Lock() | ||||
| 	if _, err := toml.DecodeFile(queueFile, q); err != nil { | ||||
| 		log.Print("Cannot read queue status file:",err) | ||||
| 	} | ||||
| 	q.Unlock() | ||||
| } | ||||
| 
 | ||||
| func (q *dlQueue) Save() { | ||||
| 	of,err := os.Create(queueFile) | ||||
| 	if err != nil { | ||||
| 		log.Print("dlQueue.Save(): Cannot open output file") | ||||
| 	} else { | ||||
| 		var buf bytes.Buffer | ||||
| 		w := bufio.NewWriter(&buf) | ||||
| 		enc := toml.NewEncoder(w) | ||||
| 		log.Printf("dlQueue.Save(): encoding %d entries\n",len(q.Items)) | ||||
| 		q.Lock() // do not lock around IO
 | ||||
| 		enc.Encode(q) | ||||
| 		q.Unlock() | ||||
| 		io.Copy(of,&buf) | ||||
| 		of.Close() | ||||
| 		log.Print("dlQueue.Save(): done") | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| type Daemon struct { | ||||
|  | @ -254,17 +308,19 @@ type Daemon struct { | |||
| } | ||||
| 
 | ||||
| func NewDaemon(conf Config, pl *pcList) *Daemon { | ||||
| 	return &Daemon{ | ||||
| 	ret := &Daemon{ | ||||
| 		conf: conf, | ||||
| 		g: grab.NewClient(), | ||||
| 		pl: pl, | ||||
| 		queue: &dlQueue{}, | ||||
| 		queue: NewQueue(), | ||||
| 		dlwake: make(chan struct{}), | ||||
| 	} | ||||
| 	ret.queue.Load() | ||||
| 	return ret | ||||
| } | ||||
| 
 | ||||
| func (d *Daemon) Update(urls []string) { | ||||
| 	sel := daysAgo(60) | ||||
| 	sel := daysAgo(30) | ||||
| 	for _,url := range urls { | ||||
| 		log.Print("  -> ",url) | ||||
| 		f := readFeed(url,sel) // do not lock around IO
 | ||||
|  | @ -282,7 +338,7 @@ func (d *Daemon) Monitor() { | |||
| 			100*resp.Progress()) | ||||
| 	} | ||||
| 	mon := func(resp *grab.Response) { | ||||
| 		t := time.NewTicker(1 * time.Second) | ||||
| 		t := time.NewTicker(5 * time.Second) | ||||
| 		defer t.Stop() | ||||
| 		Loop: | ||||
| 		for { | ||||
|  | @ -291,6 +347,15 @@ func (d *Daemon) Monitor() { | |||
| 				status(resp) | ||||
| 			case <-resp.Done: | ||||
| 				status(resp) | ||||
| 				d.queue.Lock() | ||||
| 				if i, ok := d.queue.FindFilename(resp.Filename); ok { | ||||
| 					d.queue.Items[i].Complete = true | ||||
| 					d.queue.Unlock() | ||||
| 					d.queue.Save() | ||||
| 				} else { | ||||
| 					d.queue.Unlock() | ||||
| 					log.Printf("Error finding queue entry for %s\n",resp.Filename) | ||||
| 				} | ||||
| 				break Loop | ||||
| 			} | ||||
| 		} | ||||
|  | @ -298,15 +363,12 @@ func (d *Daemon) Monitor() { | |||
| 			log.Printf("Download failed for %s (%s)\n",resp.Filename,err) | ||||
| 		} | ||||
| 	} | ||||
| 	for { | ||||
| 		mon(<-d.queue.respch) | ||||
| 	for r := range d.queue.respch { | ||||
| 		go mon(r) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (d *Daemon) Downloader() { | ||||
| 	log.Print("Downloader(): starting") | ||||
| 	d.queue.reqch = make(chan *grab.Request) | ||||
| 	d.queue.respch = make(chan *grab.Response) | ||||
| func (d *Daemon) StartDownloader() { | ||||
| 	log.Print("Downloader(): spawning workers") | ||||
| 	for i := 0; i < d.workers; i++ { | ||||
| 		d.queue.wg.Add(1) | ||||
|  | @ -317,67 +379,66 @@ func (d *Daemon) Downloader() { | |||
| 	} | ||||
| 	log.Print("Downloader(): starting monitor") | ||||
| 	go d.Monitor() | ||||
| 	go d.QueueUpdater() | ||||
| 	go d.queue.Downloader() | ||||
| } | ||||
| 
 | ||||
| 	if d.queue.items == nil { | ||||
| func (d *Daemon) QueueUpdater() { | ||||
| 	t := time.NewTicker(30 * time.Minute) | ||||
| 	defer t.Stop() | ||||
| 	for { | ||||
| 		// lock Podcast list and update download queue
 | ||||
| 		log.Print("QueueUpdater(): Updating download queue") | ||||
| 		d.queue.Lock() | ||||
| 		d.queue.items = make([]*dlItem,0) | ||||
| 		d.queue.Unlock() | ||||
| 	} | ||||
| 	upd := make(chan struct{}) | ||||
| 	go func() { | ||||
| 		t := time.NewTicker(30 * time.Second) | ||||
| 		defer t.Stop() | ||||
| 		for { | ||||
| 			// lock Podcast list and update download queue
 | ||||
| 			log.Print("Downloader(): Updating podcast list") | ||||
| 			d.queue.Lock() | ||||
| 			for _,p := range d.pl.Podcasts { | ||||
| 				for _,i := range p.Items { | ||||
| 					d.queue.Add(&i) | ||||
| 				} | ||||
| 			} | ||||
| 			log.Print("Downloader(): Done updating podcast list") | ||||
| 			d.queue.Unlock() | ||||
| 			upd<- struct{}{} | ||||
| 			select { | ||||
| 			case <-t.C: | ||||
| 				continue | ||||
| 			case <-d.dlwake: | ||||
| 				continue | ||||
| 		for _,p := range d.pl.Podcasts { | ||||
| 			for _,i := range p.Items { | ||||
| 				d.queue.Add(&i) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 		log.Print("QueueUpdater(): Done updating download queue") | ||||
| 		d.queue.Unlock() | ||||
| 		d.queue.Save() | ||||
| 		d.queue.wake<- struct{}{} | ||||
| 		log.Print("QueueUpdater(): Sleeping") | ||||
| 		select { | ||||
| 		case <-t.C: | ||||
| 			continue | ||||
| 		case <-d.dlwake: | ||||
| 			continue | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (q *dlQueue) Downloader() { | ||||
| 	t := time.NewTicker(30 * time.Minute) | ||||
| 	defer t.Stop() | ||||
| 	LOOP: | ||||
| 	for { | ||||
| 		t := time.NewTicker(30 * time.Second) | ||||
| 		defer t.Stop() | ||||
| 		// launch requests for files we are not yet downloading
 | ||||
| 		d.queue.Lock() | ||||
| 		waiting := d.queue.Waiting() | ||||
| 		d.queue.Unlock() | ||||
| 		log.Print("Download queue length: ",len(waiting.items)) | ||||
| 		for _,i := range waiting.items { | ||||
| 			if !i.downloading { | ||||
| 				dst := path.Join(dstDir,i.Item.Filename) + path.Ext(i.Item.Url) | ||||
| 				req,err := grab.NewRequest(dst,i.Item.Url) | ||||
| 		q.Lock() | ||||
| 		waiting := q.Waiting() | ||||
| 		q.Unlock() | ||||
| 		log.Print("Download queue length: ",len(waiting.Items)) | ||||
| 		for _,i := range waiting.Items { | ||||
| 			if !i.Downloading && !i.Complete { | ||||
| 				req,err := grab.NewRequest(i.Filename,i.Item.Url) | ||||
| 				if err != nil { | ||||
| 					log.Print("Request error: ",err) | ||||
| 					continue | ||||
| 				} | ||||
| 				i.downloading = true | ||||
| 				i.Downloading = true | ||||
| 				t := time.Now() | ||||
| 				d.queue.reqch <- req | ||||
| 				q.reqch <- req | ||||
| 				if time.Now().After(t.Add(5 * time.Second)) { | ||||
| 					continue LOOP // refresh list
 | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		log.Print("Downloader(): Done launching downloads") | ||||
| 		log.Print("Downloader(): sleeping") | ||||
| 		select { | ||||
| 		case <-t.C: | ||||
| 			continue | ||||
| 		case <-upd: | ||||
| 		case <-q.wake: | ||||
| 			continue | ||||
| 		} | ||||
| 	} | ||||
|  | @ -385,7 +446,7 @@ func (d *Daemon) Downloader() { | |||
| 
 | ||||
| func (d *Daemon) Updater() { | ||||
| 	log.Print("Updater(): starting") | ||||
| 	go d.Downloader() | ||||
| 	d.StartDownloader() | ||||
| 	for { | ||||
| 		time.Sleep(1 * time.Second) | ||||
| 		d.Update(d.conf.Urls) | ||||
|  | @ -399,7 +460,6 @@ func (d *Daemon) Updater() { | |||
| 			enc.Encode(d.pl) | ||||
| 			d.Unlock() | ||||
| 			of.Close() | ||||
| 			log.Print("Updater(): done writing") | ||||
| 		} | ||||
| 		d.dlwake <-struct{}{} | ||||
| 		time.Sleep(30 * time.Minute) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	Block a user