Знакомство с Go, часть 2: пишем граббер изображений с балансировщиком и извращениями

в 21:25, , рубрики: golang, метки:

1382132010577733282106

Задание

Недавно я рассказывал, как выполнял секретную миссию и с помощью Go загружал дамп цитат с «пустоты». Пришло время снова вступить в бой, на этот раз дело касается «Ататы», и не только потому что в рифму с пустотой.

Для тех кто пропустил — в свое время «Тематические Медиа» (да-да, хозяева хабра) запустили проект «Респектива» ( этакую гламурную имиджбордутумблр с одним разделом), хитроумными баннерами заманили туда девушек и программистов и стали ждать, видимо надеясь что срастется. Не срослось, и через некоторое время «Респективу» мутировали в «Атату», которая уже имела функционал для создания пользовательских бордпотоков, хабов для объединения тематических потоков и так далее. Какое-то время проект развивался, но… Девушки ушли, остались программисты, да и их стало куда меньше. Сейчас это место почти покинуто, там чумные ветра, брошенные поезда, темнота и только редкие вопли нескольких выживших старожилов изредка прорезают хтмл в ночи.

И ладно, атата с ней, с этой ататой. Но отличного контента в виде картинок там осталось немало, один тред с обоями чего стоит! Поэтому неудивительно, что следующее задание из штаба выглядело следующим образом:
«Срочно сохранить для потомков все изображения из потока #949 #291. ps. Извращенно.»

Ну, что же, есть задание — нужно работать. Вообще-то по извращенным структурам у нас уже есть некий кофейный чемпион, но и на Go тоже можно что-нибудь сообразить! После 9 часов работы в Paint у меня родился следующий план атаки:

Знакомство с Go, часть 2: пишем граббер изображений с балансировщиком и извращениями

Степень изврата поражает, начальство будет довольно! Рассмотрим что тут и как:

«Генератор» будет загружать с ататы хтмл код страниц, выдергивать урлы картинок и передавать их «Балансировщику», который раскидывает эти урлы некоторому количеству «Рабочих» (каждый из которых имеет свою небольшую очередь заданий) следя за тем, чтобы все рабочие были равномерно нагружены. Ну а «рабочие» загружают картинки и радуют глаз умиленного балансировщика. Выглядит избыточно, но зато про войну! Поехали:

Генератор

Это будет самое простое, в цикле загружаем страницы, разбираем их и скармливаем вытянутые урлы картинок в канал (про каналы, гоурутины и некоторые другие штуки я поверхностно рассказал в прошлой статье, поэтому тут повторяться не буду).
Атата разбита на разделы (которые там зовутся «потоками»), одна страница каждого «потока» содержит 20 постов. Ссылки выглядят следующим образом: «home.atata.com/streams/291?order=date&from=40», где 291 это айди потока, а 40 — отступ в постах с конца (то есть чем больше тем раньше). Интересующий нас код шаблона поста выглядит так:

<figure>
	<figcaption></figcaption>
	<a class='image' href=''>
		<img class='thumb' rel='' src='' />
	</a>
</figure>

Будем идти по страницам увеличивая "from" в url, дергать ссылки с классом image и отдавать их в канал. Только вот до каких пор? Оказывается, что на последней странице потока есть опознавательный знак в виде скрытой кнопки загрузки следующей партии постов (<li class="last hide">), вот за ним и будем следить. Есть много вариантов как оповестить программу о том что достигнута последняя страница и ссылок больше не будет. Мы же, для простоты, дойдя до конца отправим в поток некую кодовую фразу, чтобы на другом конце поняли что пора закругляться.

Парсинг страниц, как и в прошлый раз, возложим на пакет goquery (http://github.com/opesun/goquery). Итак, ближе к коду:

// atatagrab project main.go
package main

import (
	"fmt"
	//Добавим пакеты:
	"github.com/opesun/goquery"
	"strconv"
)

const (
	//Зададим кодовую фразу на случай конца потока:
	ENDMESSAGE = "TooLateToDieYoung"
)

//Генератор загружает страницы и достает из них ссылки на картинки
func generator(out chan string, stream, start int) {
	for pos := start; ; pos += 20 {
		//Разбираем страницу:
		x, err := goquery.ParseUrl("http://home.atata.com/streams/" + strconv.Itoa(stream) + "?order=date&from=" + strconv.Itoa(pos))
		if err == nil {
			//Отправляем все найденные ссылки в поток:
			for _, url := range x.Find("figure a.image").Attrs("href") {
				out <- "http://atata.com/" + url
			}
			//А если встретили признак последней страницы - отправляем кодовую фразу..
			if len(x.Find("li.last.hide")) > 0 {
				out <- ENDMESSAGE
				//..и прекращаем работу генератора
				return
			}
		}
	}
}

Балансировщик и Рабочие

Балансировщик и Рабочие плотно связаны друг с другом, поэтому мы и будем рассматривать их вместе
Итак, мы получаем из канала от генератора ссылки, и отдаем их рабочим следя за тем чтобы все были равномерно загружены (потому как каждый рабочий у нас имеет свою собственную конечную очередь заданий). Вроде все просто — крутись себе в бесконечном цикле, бери из одних каналов, клади в другие и радуйся. Дьявол, как обычно, сидит в мелочах, спросим себя:

«Что будет, если все рабочие уже заняты по-максимуму а пришли еще задания?»

Логично будет притормозить генератор, до тех пор пока не появится свободное место. Для этого предусмотрим в балансировщике реализацию технологии PMFC (Poor Man's Flow Control или «регулятор потока для бедных». Для бедных, потому что это будет ну очень топорный (но очень эффективный) регулятор).

«Если генератор имеет конечное количество работы, то и балансировщик должен иметь конечный цикл работы, да?»

Да! У нас работа может прекращаться в двух случаях — «кончились задания» и «пользователь запросил остановку». Почему так важно обработать эти ситуации? Потому что и там и там у нас есть некая работа, которую нужно завершить:

  • В случае окончания заданий (последняя страница потока достигнута и разобрана) — нужно подождать пока все рабочие не завершат свои очереди заданий.
  • В случае запроса остановки — подождать пока рабочие не закончат свои текущие задания (то есть докачают файл который уже начат — зачем нам огрызки?).

Будем иметь эти моменты в виду и приступим к коду. Объявим «рабочего»:

//Рабочий
type Worker struct {
	urls    chan string     // канал для заданий
	pending int             // кол-во оставшихся задач
	index   int             // позиция в куче
	wg      *sync.WaitGroup //указатель на группу ожидания
}

Что это все значит: мы объявили новый тип Worker, который представляет собой структуру struct — этакий отдаленный аналог record из паскаля.
«А почему не класс?» — спросят некоторые? Потому что в Go немного другой подход: у нас есть типы данных к которым мы можем объявить необходимые методы. Этим мы сегодня будем часто заниматься, а пока пробежимся по полям — с первыми все понятно, с индексом станет понятно чуть позже, а вот про последний стоит сказать отдельно:

Так как нам необходимо будет ждать завершения заданий нужно каким-то образом отслеживать статус каждого рабочего чтобы знать — делает он что-то или уже отдыхает. Для этого мы воспользуемся «группой ожидания» (WaitGroup) из пакета sync. В чем ее суть: WaitGroup реализует некий счетчик, который мы можем увеличивать в начале работы и уменьшать, когда работа будет сделана. А также предлагает инструмент позволяющий затормозить выполнение подпрограммы до того момента, пока счетчик не будет равен нулю.
Ждать у нас будет балансировщик, а вот работать со счетчиком мы будем в экземплярах Worker'a.

Напишем для «рабочего» метод work (синтаксически метод отличается от обычной функции тем, что в объявлении задается «получатель», этакий this. В самом теле метода им можно пользоваться чтобы получить доступ данным объекта для которого он был вызван):

//В качестве аргумента получаем указатель на канал завершения
func (w *Worker) work(done chan *Worker) {
	for {
		url := <-w.urls //читаем следующее задание
		w.wg.Add(1)     //инкриминируем счетчик группы ожидания
		download(url)   //загружаем файл
		w.wg.Done()     //сигнализируем группе ожидания что закончили
		done <- w       //показываем что завершили работу
	}
}

И не забудем добавить пакет sync в список импорта:

import (
	...
	"sync"
)

Тут есть интересный момент: после завершения задания рабочий отправляет указатель на себя в канал done, полученный из аргументов метода work(). Таким образом он не только сигнализирует об окончании работы, но и удобно предоставляет доступ к своим полям для манипуляций (позднее это будет использоваться в балансировщике).
Функция download() не является стандартной — мы напишем ее чуть позже, чтобы не отвлекаться. С рабочим почти все.

Теперь реализуем Worker Pool для нашего балансировщика, но не простой, а на базе кучи (Wikipedia: Куча (структура данных)), чтобы было просто и удобно получать наименее загруженного рабочего для распределения задачи. Реализация кучи есть в стандартном пакете "container/heap", и чтобы использовать ее с нашими типами данных необходимо реализовать ее интерфейс (это что-то вроде абстрактного класса).

Как это выглядит: есть объявление неких функций составляющих интерфейс и чтобы его реализовать для какого-либо типа достаточно чтобы они присутствовали в качестве его методов, например:

package main
import ("fmt")

//Общий интерфейс
type Animal interface {
	Say() string
}

type Cat struct{}
func (c Cat) Say() string { return "Мяу!" }

type Parrot struct {
	name string
}
func (p Parrot) Say() string { return p.name + " дурак!" }

func main() {
	kitty := Cat{}
	popka := Parrot{name: "Попка"}
	animals := []Animal{kitty, popka}
	for _, animal := range animals {
		fmt.Println(animal.Say())
	}
}

Посмотрим в документации, какие методы нужно реализовать для использования container.Heap (http://golang.org/pkg/container/heap/#Interface):

type Interface interface {
        sort.Interface
        Push(x interface{}) // add x as element Len()
        Pop() interface{}   // remove and return element Len() - 1.
}

Так, помимо Push и Pop нужно еще полностью удовлетворить sort.Interface (http://golang.org/pkg/sort/#Interface), а это:

type Interface interface {
        // Len is the number of elements in the collection.
        Len() int
        // Less returns whether the element with index i should sort
        // before the element with index j.
        Less(i, j int) bool
        // Swap swaps the elements with indexes i and j.
        Swap(i, j int)
}

Примеры кода для обоих интерфейсов есть в документации, поэтому нам несложно будет написать свой:

//Это будет наша "куча":
type Pool []*Worker

//Проверка кто меньше - в нашем случае меньше тот у кого меньше заданий:
func (p Pool) Less(i, j int) bool { return p[i].pending < p[j].pending }

//Вернем количество рабочих в пуле:
func (p Pool) Len() int { return len(p) }

//Реализуем обмен местами:
func (p Pool) Swap(i, j int) {
	if i >= 0 && i < len(p) && j >= 0 && j < len(p) {
		p[i], p[j] = p[j], p[i]
		p[i].index, p[j].index = i, j
	}
}

//Заталкивание элемента:
func (p *Pool) Push(x interface{}) {
	n := len(*p)
	worker := x.(*Worker)
	worker.index = n
	*p = append(*p, worker)
}

//И выталкивание:
func (p *Pool) Pop() interface{} {
	old := *p
	n := len(old)
	item := old[n-1]
	item.index = -1
	*p = old[0 : n-1]
	return item
}

Пара слов о том, что такое interface{} — это минимальный интерфейс, которому удовлетворяют все типы (потому что любой тип реализует как минимум ноль методов, а значит подходит под требования interface{}). Таким образом это некий any, использовав который в качестве аргумента мы можем передать любой тип. Подробнее хорошо написано тут: http://research.swtch.com/interfaces
Вот и все, теперь мы можем пользоваться функциями heap.* к нашему Pool, и всю работу по представлению нашей плоской структуры в «кучу» они возьмут на себя.
Осталось добавить пакет «container/heap» в список импорта:

import (
	...
	"container/heap"
)

Теперь приступим к балансировщику. Принцип работы мы уже обсудили, инструменты подготовили, о подводных камнях подумали, поэтому я в основном буду просто давать код с очень подробными комментариями:

//Балансировщик
type Balancer struct {
	pool     Pool         //Наша "куча" рабочих
	done     chan *Worker //Канал уведомления о завершении для рабочих
	requests chan string  //Канал для получения новых заданий
	flowctrl chan bool    //Канал для PMFC
	queue    int          //Количество незавершенных заданий переданных рабочим
	wg       *sync.WaitGroup //Группа ожидания для рабочих
}

У нас много всего требующего инициализации перед началом работы, поэтому приготовим функцию init():

//Инициализируем балансировщик. Аргументом получаем канал по которому приходят задания
func (b *Balancer) init(in chan string) {
	b.requests = make(chan string)
	b.flowctrl = make(chan bool)
	b.done = make(chan *Worker)
	b.wg = new(sync.WaitGroup)

	//Запускаем наш Flow Control:
	go func() {
		for {
			b.requests <- <-in //получаем новое задание и пересылаем его на внутренний канал
			<-b.flowctrl       //а потом ждем получения подтверждения
		}
	}()

	//Инициализируем кучу и создаем рабочих:
	heap.Init(&b.pool)
	for i := 0; i < WORKERS; i++ {
		w := &Worker{
			urls:    make(chan string, WORKERSCAP),
			index:   0,
			pending: 0,
			wg:      b.wg,
		}
		go w.work(b.done)     //запускаем рабочего
		heap.Push(&b.pool, w) //и заталкиваем его в кучу
	}
}

Здесь заслуживает упоминания наш регулятор потока — он «врезается» в канал между генератором и балансировщиком и согласует его исходя из подтверждения команд балансировщиком. А генератор послушно тормозит, и даже и знать не знает что кто-то искусственно держит его в узде — еще один шаг на пути к мировому господству и тирании!
Обратите внимание, как создается канал для рабочего: функции make() дается еще один аргумент, который задает размер буфера канала. Это забавная возможность, которая делает запись в такой канал асинхронной — подпрограммы больше не будут прерываться для ожидания доступности получателя «на другом конце» канала; таким образом можно посылать несколько сообщений, которые будут дожидаться обработки в буфере.

В init() мы использовали переменные WORKERS и WORKERSCAP которые планируется задавать в аргументах командной строки, объявим их:

var (
	WORKERS     = 5      //количество рабочих 
	WORKERSCAP  = 5      //размер очереди каждого рабочего
)

Теперь напишем саму рабочую функцию балансировщика. Не забудем, что она должна поддерживать остановку по требованию и по истечению заданий, ожидая завершения различных операций (Тут много чего происходит и все важное, но понятное. Поэтому либо комментировать почти все либо, либо вообще ничего. Я выбрал первый вариант, так что комментариев будет больше чем кода, простите):

//Рабочая функция балансировщика получает аргументом канал уведомлений от главного цикла
func (b *Balancer) balance(quit chan bool) {
	lastjobs := false //Флаг завершения, поднимаем когда кончились задания
	for {
		select { //В цикле ожидаем коммуникации по каналам:

		case <-quit: //пришло указание на остановку работы
			b.wg.Wait()  //ждем завершения текущих загрузок рабочими..
			quit <- true //..и отправляем сигнал что закончили

		case url := <-b.requests: //Получено новое задание (от flow controller)
			if url != ENDMESSAGE {  //Проверяем - а не кодовая ли это фраза?
				b.dispatch(url) // если нет, то отправляем рабочим
			} else {
				lastjobs = true //иначе поднимаем флаг завершения
			}

		case w := <-b.done: //пришло уведомление, что рабочий закончил загрузку
			b.completed(w) //обновляем его данные
			if lastjobs {
				if w.pending == 0 { //если у рабочего кончились задания..
					heap.Remove(&b.pool, w.index) //то удаляем его из кучи
				}
				if len(b.pool) == 0 { //а если куча стала пуста
					//значит все рабочие закончили свои очереди
					quit <- true //и можно отправлять сигнал подтверждения готовности к останову
				}
			}
		}
	}
}

Реализуем функцию отправки:

// Функция отправки задания
func (b *Balancer) dispatch(url string) {
	w := heap.Pop(&b.pool).(*Worker) //Берем из кучи самого незагруженного рабочего..
	w.urls <- url                    //..и отправляем ему задание.
	w.pending++                      //Добавляем ему "весу"..
	heap.Push(&b.pool, w)            //..и отправляем назад в кучу
	if b.queue++; b.queue < WORKERS*WORKERSCAP {
		b.flowctrl <- true
	}
}

… в которой отдаем задание рабочему. В условии мы проверяем: есть ли хоть одно свободное место в очередях рабочих, и, если да, отправляем регулятору потока сигнал о том что можно продолжать получение заданий от генератора.
Еще мы использовали метод completed на рабочем завершившем задание, вот что там происходит:

//Обработка завершения задания
func (b *Balancer) completed(w *Worker) {
	w.pending--
	heap.Remove(&b.pool, w.index)
	heap.Push(&b.pool, w)
	if b.queue--; b.queue == WORKERS*WORKERSCAP-1 {
		b.flowctrl <- true
	}
}

Рабочий после завершения очередной загрузки возвращает в канале указатель на себя, а мы, пользуясь этим, актуализируем информацию о его загруженности и обновляем позицию в куче на ее основе. Условие служит следующей цели: как вы помните, в dispatch мы не отправляли сигнала на продолжение если все очереди были полны, теперь же, после завершения очередного задания, самое время проверить — может кризис миновал и пора возобновить работу?
Основной функционал готов, остались мелочи. Во-первых напишем функцию download(), которую наши рабочие используют для загрузки картинки:

//Загрузка изображения
func download(url string) {
	fileName := IMGDIR + "/" + url[strings.LastIndex(url, "/")+1:]
	output, err := os.Create(fileName)
	defer output.Close()

	response, err := http.Get(url)
	if err != nil {
		fmt.Println("Error while downloading", url, "-", err)
		return
	}
	defer response.Body.Close()
	io.Copy(output, response.Body)
}

Во-вторых у нас был ряд переменных которые мы собирались брать из командной строки — объявим их (а также пару новых про запас) и подготовим указания для разбора флагов (такие фокусы мы уже делали в прошлой серии, поэтому подробно останавливаться не буду):

//Вот так у нас теперь выглядит объявление переменных:
var (
	WORKERS     = 5     //количество рабочих
	WORKERSCAP  = 5     //размер очереди каждого рабочего
	ATATASTREAM = 291   //id потока ататы
	ATATAPOS    = 0     //стартовая позиция в потоке ататы
	IMGDIR      = "img" //директория для сохранения картинок
)

//Назначим флаги командной строки:
func init() {
	flag.IntVar(&WORKERS, "w", WORKERS, "количество рабочих")
	flag.IntVar(&ATATASTREAM, "s", ATATASTREAM, "id потока ататы")
	flag.IntVar(&ATATAPOS, "p", ATATAPOS, "стартовая позиция")
	flag.StringVar(&IMGDIR, "d", IMGDIR, "директория для картинок")
}

В-третьих приведем в порядок секцию импорта добавив туда все использованные пакеты. И еще, заодно, добавим os/signal, с помощью которого будем ловить ctrl-c в основном цикле:

import (
	"io"
	"os"
	"os/signal"
	"sync"
	
	"flag"
	"fmt"
	"container/heap"
	"strconv"
	"strings"
		
	"github.com/opesun/goquery"
	"net/http"
)

Теперь все готово к финальному аккорду!

Основная функция

func main() {
	//разберем флаги
	flag.Parse()
	//создадим директорию для загрузки, если её еще нет
	if err := os.MkdirAll(IMGDIR, 666); err != nil {
		panic(err)
	}

	//Подготовим каналы и балансировщик
	links := make(chan string)
	quit := make(chan bool)
	b := new(Balancer)
	b.init(links)

	//Приготовимся перехватывать сигнал останова в канал keys
	keys := make(chan os.Signal, 1)
	signal.Notify(keys, os.Interrupt)

	//Запускаем балансировщик и генератор
	go b.balance(quit)
	go generator(links, ATATASTREAM, ATATAPOS)

	fmt.Println("Начинаем загрузку изображений...")
	//Основной цикл программы:
	for {
		select {
		case <-keys: //пришла информация от нотификатора сигналов:
			fmt.Println("CTRL-C: Ожидаю завершения активных загрузок")
			quit <- true //посылаем сигнал останова балансировщику

		case <-quit: //пришло подтверждение о завершении от балансировщика
			fmt.Println("Загрузки завершены!")
			return
		}
	}
}

Запускаем… (для пробного запуска взял первый попавшийся поток покороче)

Знакомство с Go, часть 2: пишем граббер изображений с балансировщиком и извращениями

… и видим что все работает. Очередное задание штаба выполнено, и извращенно-то как! Все как было заказано.

Осталось только вставить time.Sleep в воркер потому что я уверен, что сервер ататы не будет спокойно смотреть как его мучают выливая гигабайты картинок и начнет огрызаться. А потом запустить работать на ночь, спасая с тонущей ататы потоки с обоями и «тупичок» с тысячами и тысячами постов. На досуге можно прикрутить к программе отчеты о процессе загрузки, продолжение с места окончания и прочие чисто технические и совсем не интересные для разбора в статье вещи.

Код примера целиком на pastbin.com

P.S. Если будете тестировать программу с параметрами по умолчанию, то она будет выкачивать поток #291 (Обои) а там как раз в начале некий «фотограф» набросал своих фото по 10 метров вместо обоев, так что или указываете точку старта подальше ( например "-p=200" ) или не удивляйтесь почему так долго и ползет одна фигня.

Для тех, кому хочется еще

vimeo.com/49718712 — Rob Pike — 'Concurrency Is Not Parallelism'
sites.google.com/site/gopatterns/ — Go Language Patterns
golangtutorials.blogspot.ru/2011/05/table-of-contents.html — GoLang Tutorials
talks.golang.org/ — интерактивные слайды к лекциям по Go

На этом я прощаюсь, удачи!

(И кстати — я понятия не имею откуда берутся цифры в начале поста — я их туда не пишу и на предпросмотре их нет… Хабр их сам вставляет?)

Автор: Dreadd

Источник

Поделиться

* - обязательные к заполнению поля