Нагрузочный тест на Go, версия 2

в 21:28, , рубрики: Веб-разработка, нагрузочное тестирование, тестирование, метки:

Никак не доходили руки переписать go-meter. Увеличить производительность, получить более полный контроль над процессом и довести до приближения к wrk. В идеале хочется увидеть легко и удобно расширяемую альтернативу. Да, в wrk недавно появилась поддержка Lua скриптов, которые решают многие неудобства, но и там тоже есть неприятные нюансы, например, расширенную статистику собирать не получится, так как методы вывода статистики работают только на первом потоке, и к собранным данным на других потоках доступа нет, поэтому сводится опять к тому, что-бы разбираться в исходниках и делать под себя, но это не тривиальная задача. И так, готовим нагрузочный тест на Go, c плюшками. Кому интересно, прошу под кат.

Что есть и что нужно

С начала разберемся что нам нужно:
— отправка GET/POST/PUT/DELETE запросов
— перебор URL, и POST body
— контроль над открытыми соединениями
— контроль над потоками
— указание продолжительности тестирования
— ограничения по максимальному количеству запросов в секунду
— возможность исключить несколько первых секунд из статистики, чтобы избежать искажения в момент прогрева HTTP сервера

План

— пул соединений
— простые Request/Response
— статистика
— profit

мысли вслух

Раз нужно контролировать соединения, стандартный http.Client нам не подходит (да и большой он для такой задачи), умеет слишком много из-за чего страдает производительность. Так как у нас подразумевается несколько потоков воркеров для отправки запросов, то нам нужен пул соединений, которые они будут между собой делить. Воркеру ждать ответа от сервера не имеет смысла, мы просто теряем на это драгоценное время. Как оценить проходящий трафик? Стандартные http.Request, http.Respose такой информации не дают, использовать их не получится, значит нужно реализовывать простой Request/Response, который нам все неоходимое даст. Собирать сырые данные и в конце их агрегировать не получится, так как память не резиновая. Собираем стату на лету.

Поехали

Пул соединений пишем на основе ограниченного канала. Выглядеть он будет как простой пул объектов, взяли объект из канала, поработали, положили обратно.

type Connection struct {
	conn    net.Conn
	manager *ConnectionManager
}

type ConnectionManager struct {
	conns  chan *Connection
	config *Config
}

func NewConnectionManager(config *Config) (result *ConnectionManager) {

	result = &ConnectionManager{config: config, conns: make(chan *Connection, config.Connections)}
	for i := 0; i < config.Connections; i++ {
		connection := &Connection{manager: result}
		if connection.Dial() != nil {
			ConnectionErrors++
		}
		result.conns <- connection
	}
	return
}

func (this *ConnectionManager) Get() *Connection {
	return <-this.conns
}

func (this *Connection) Dial() error {
	if this.IsConnected() {
		this.Disconnect()
	}
	conn, err := net.Dial("tcp4", this.manager.config.Url.Host)
	if err == nil {
		this.conn = conn
	}
	return err
}

func (this *Connection) Disconnect() {
	this.conn.Close()
	this.conn = nil
}

func (this *Connection) IsConnected() bool {
	return this.conn != nil
}

func (this *Connection) Return() {
	this.manager.conns <- this
}

Request/Response тут можно почитать исходники Go, посмотреть как реализовано там, и сделать упрощенную аналогию, главным отличием будет возможность получить объем трафика каждого запроса/ответа и сэкономить драгоценное время.

Request

type Request struct {
	Method string

	URL *url.URL

	Header map[string][]string

	Body          io.Reader
	ContentLength int64

	Host string

	BufferSize int64
}

func (req *Request) Write(w io.Writer) error {

	bw := &bytes.Buffer{}

	fmt.Fprintf(bw, "%s %s HTTP/1.1rn", valueOrDefault(req.Method, "GET"), req.URL.RequestURI())
	fmt.Fprintf(bw, "Host: %srn", req.Host)

	userAgent := ""
	if req.Header != nil {
		if ua := req.Header["User-Agent"]; len(ua) > 0 {
			userAgent = ua[0]
		}
	}
	if userAgent != "" {
		fmt.Fprintf(bw, "User-Agent: %srn", userAgent)
	}

	if req.Method == "POST" || req.Method == "PUT" {
		fmt.Fprintf(bw, "Content-Length: %drn", req.ContentLength)
	}

	if req.Header != nil {
		for key, values := range req.Header {
			if key == "User-Agent" || key == "Content-Length" || key == "Host" {
				continue
			}
			for _, value := range values {
				fmt.Fprintf(bw, "%s: %srn", key, value)
			}
		}
	}

	io.WriteString(bw, "rn")

	if req.Method == "POST" || req.Method == "PUT" {
		bodyReader := bufio.NewReader(req.Body)
		_, err := bodyReader.WriteTo(bw)
		if err != nil {
			return err
		}
	}
	req.BufferSize = int64(bw.Len())
	_, err := bw.WriteTo(w)
	return err
}

Response
type Response struct {
	Status     string
	StatusCode int

	Header map[string][]string

	ContentLength int64

	BufferSize int64
}

func ReadResponse(r *bufio.Reader) (*Response, error) {
	tp := textproto.NewReader(r)
	resp := &Response{}

	line, err := tp.ReadLine()
	if err != nil {
		return nil, err
	}
	f := strings.SplitN(line, " ", 3)
	resp.BufferSize += int64(len(f) + 2)

	if len(f) < 2 {
		return nil, errors.New("Response Header ERROR")
	}

	reasonPhrase := ""
	if len(f) > 2 {
		reasonPhrase = f[2]
	}
	resp.Status = f[1] + " " + reasonPhrase
	resp.StatusCode, err = strconv.Atoi(f[1])
	if err != nil {
		return nil, errors.New("malformed HTTP status code")
	}

	resp.Header = make(map[string][]string)
	for {
		line, err := tp.ReadLine()
		if err != nil {
			return nil, errors.New("Response Header ERROR")
		}
		resp.BufferSize += int64(len(line) + 2)
		if len(line) == 0 {
			break
		} else {
			f := strings.SplitN(line, ":", 2)
			resp.Header[f[0]] = append(resp.Header[strings.TrimSpace(f[0])], strings.TrimSpace(f[1]))
		}
	}

	if cl := resp.Header["Content-Length"]; len(cl) > 0 {
		i, err := strconv.ParseInt(cl[0], 10, 0)
		if err == nil {
			resp.ContentLength = i
		}
	}

	buff := make([]byte, resp.ContentLength)
	r.Read(buff)
	resp.BufferSize += int64(resp.ContentLength)

	return resp, nil
}

Для того что бы наши потоки выключились, когда время тестирования закончится, сделаем канал для завершения работы потоков и канал, по которому каждый поток будет сообщать, что он корректно завершил свою работу

WorkerQuit := make(chan bool, *_threads)
WorkerQuited := make(chan bool, *_threads)

засечем время, и также будем ждать Ctr+C(SIGTERM), чтобы наше приложение могло завершить тестирование в любой момент

//Start Ctr+C listen
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

//Wait timers or SIGTERM
select {
case <-time.After(config.Duration):
case <-signalChan:
}
for i := 0; i < config.Threads; i++ {
	config.WorkerQuit <- true
}
//Wait for threads complete
for i := 0; i < config.Threads; i++ {
	<-config.WorkerQuited
}

Теперь посмотрим на сам воркер: для ограничения по количеству запросов в секунду возьмем для каждого его долю от общего количества, 4 раза в секунду будем увеличивать счетчик и ждать либо освободившиеся соединение либо завершение работы

func NewThread(config *Config) {
	timerAllow := time.NewTicker(time.Duration(250) * time.Millisecond)
	allow := int32(config.MRQ / 4 / config.Threads)
	if config.MRQ == -1 {
		allow = 2147483647
	} else if allow <= 0 {
		allow = 1
	}
	var connectionErrors int32 = 0
	currentAllow := allow
	for {
		select {
		//По таймеру выставляем счетчик на количество разрешенных запросов
		case <-timerAllow.C:
			currentAllow = allow
		//Получаем свободное соединение
		case connection := <-config.ConnectionManager.conns:
			currentAllow--
			//Если разрешенные запросы кончились - возвращаем соединение в пул
			if currentAllow < 0 {
				connection.Return()
			} else {
				//Формируем запрос
				req := getRequest(config.Method, config.Url, config.Source.GetNext())
				//Если нужно переподключаться на каждом запросе
				if config.Reconnect && connection.IsConnected() {
					connection.Disconnect()
				}
				//Если соединение разорвано, то пробуем его восстановить
				if !connection.IsConnected() {
					if connection.Dial() != nil {
						connectionErrors++
					}
				}
				//Отправляем запрос если есть соединение, иначе возвращаем соединение
				if connection.IsConnected() {
					go writeSocket(connection, req, config.RequestStats)
				} else {
					connection.Return()
				}
			}
		//Ждем завершения
		case <-config.WorkerQuit:
			//Записываем ошибки по соединениям
			atomic.AddInt32(&ConnectionErrors, connectionErrors)
			//Подтверждаем завершение
			config.WorkerQuited <- true
			return
		}
	}
}

Как только соединение освободится, формируем следующий запрос и запускаем асинхронно отправку его, так по кругу пока не кончится время. После того как запрос отправлен, а ответ прочитан, соединение возвращаем в пул, и поток снова подхватит его.

Отправка запроса

func writeSocket(connection *Connection, req *http.Request, read chan *RequestStats) {
	result := &RequestStats{}
	//По окончанию обязательно отправляем статус и отдаем соединение в пул
	defer func() {
		connection.Return()
		read <- result
	}()

	now := time.Now()
	conn := connection.conn
	bw := bufio.NewWriter(conn)
	//Пишем запрос
	err := req.Write(bw)
	if err != nil {
		result.WriteError = err
		return
	}
	err = bw.Flush()
	if err != nil {
		result.WriteError = err
		return
	}
	//Ждем ответа
	res, err := http.ReadResponse(bufio.NewReader(conn))
	if err != nil {
		result.ReadError = err
		return
	}
	//Собираем нужную информацию
	result.Duration = time.Now().Sub(now)
	result.NetOut = req.BufferSize
	result.NetIn = res.BufferSize
	result.ResponseCode = res.StatusCode
	req.Body = nil
}

Осталось дело за малым, собрать статистику из объектов RequestStats и оформить ее

//Вся статистика
type StatsSource struct {
	Readed          int64
	Writed          int64
	Requests        int
	Skiped          int
	Min             time.Duration
	Max             time.Duration
	Sum             int64
	Codes           map[int]int
	DurationPercent map[time.Duration]int
	ReadErrors      int
	WriteErrors     int
	Work            time.Duration
}

//Статистика для посекундных отчетов
type StatsSourcePerSecond struct {
	Readed   int64
	Writed   int64
	Requests int
	Skiped   int
	Sum      int64
}

//Агрегатор статистики
func StartStatsAggregator(config *Config) {
	allowStore := true
	allowStoreTime := time.After(config.ExcludeSeconds)
	if config.ExcludeSeconds.Seconds() > 0 {
		allowStore = false
	}
       
       
	verboseTimer := time.NewTicker(time.Duration(1) * time.Second)
	if config.Verbose {
		fmt.Printf("%s %s %s %s %s %sn",
			newSpancesFormatRightf("Second", 10, "%s"),
			newSpancesFormatRightf("Total", 10, "%s"),
			newSpancesFormatRightf("Req/sec", 10, "%s"),
			newSpancesFormatRightf("Avg/sec", 10, "%s"),
			newSpancesFormatRightf("In/sec", 10, "%s"),
			newSpancesFormatRightf("Out/sec", 10, "%s"),
		)
	} else {
		verboseTimer.Stop()
	}

	source = StatsSource{
		Codes:           make(map[int]int),
		DurationPercent: make(map[time.Duration]int),
	}

	perSecond := StatsSourcePerSecond{}

	start := time.Now()
	for {
		select {
		 //Таймер для посекундных отчетов
		case <-verboseTimer.C:
			if perSecond.Requests-perSecond.Skiped > 0 && config.Verbose {
				//Считаем среднее время ответа
				avgMilliseconds := perSecond.Sum / int64(perSecond.Requests-perSecond.Skiped)
				avg := time.Duration(avgMilliseconds) * time.Millisecond
				//Пишем статистику
				fmt.Printf("%s %s %s %s %s %sn",
					newSpancesFormatRightf(roundToSecondDuration(time.Now().Sub(start)), 10, "%v"),
					newSpancesFormatRightf(source.Requests, 10, "%d"),
					newSpancesFormatRightf(perSecond.Requests, 10, "%d"),
					newSpancesFormatRightf(avg, 10, "%v"),
					newSpancesFormatRightf(Bites(perSecond.Readed), 10, "%s"),
					newSpancesFormatRightf(Bites(perSecond.Writed), 10, "%s"),
				)
			}
			//Сбрасываем данные
			perSecond = StatsSourcePerSecond{}
		//Таймер для разрешения сбора статистики нужен для пропуска на старте
		case <-allowStoreTime:
			allowStore = true
		//Получаем ответ от сервера
		case res := <-config.RequestStats:
			//Если были ошибки - просто их записываем, остальная информация нам не интересна
			if res.ReadError != nil {
				source.ReadErrors++
				continue
			} else if res.WriteError != nil {
				source.WriteErrors++
				continue
			}
			//Инкрементируем счетчики
			source.Requests++
			perSecond.Requests++
			perSecond.Readed += res.NetIn
			perSecond.Writed += res.NetOut
			source.Readed += res.NetIn
			source.Writed += res.NetOut
			//Собираем статистику по запросам в разрезе HTTP кодов
			source.Codes[res.ResponseCode]++
			if !allowStore {
				perSecond.Skiped++
				source.Skiped++
				continue
			}
			//Для среднего времени ответа
			sum := int64(res.Duration.Seconds() * 1000)
			source.Sum += sum
			perSecond.Sum += sum

			//Максимальное и минимальное время ответа
			if source.Min > res.Duration {
				source.Min = roundDuration(res.Duration)
			}
			if source.Max < res.Duration {
				source.Max = roundDuration(res.Duration)
			}
			//Количество запросов в разрезе времени ответа округленная до 10 миллисекунд
			duration := time.Duration(res.Duration.Nanoseconds()/10000000) * time.Millisecond * 10
			source.DurationPercent[duration]++
		//Завершение сбора статистики
		case <-config.StatsQuit:
			//Записываем общее время теста
			source.Work = time.Duration(time.Now().Sub(start).Seconds()*1000) * time.Millisecond
			if config.Verbose {
				s := ""
				for {
					if len(s) >= 61 {
						break
					}
					s += "-"
				}
				fmt.Println(s)
			}
			//Подтверждаем завершение
			config.StatsQuit <- true
			return
		}
	}
}
Подводим итоги

Как парсить аргументы запуска и форматировать вывод статистики я опущу, так как это не интересно. А теперь давайте проверим, что у нас получилось. Для пробы натравим wrk на Node.js кластер

% ./wrk -c 21 -t 7 -d 30s -L http://localhost:3001/index.html
Running 30s test @ http://localhost:3001/index.html
  7 threads and 21 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.09ms    6.55ms 152.07ms   99.63%
    Req/Sec     5.20k     3.08k   14.33k    58.75%
  Latency Distribution
     50%  490.00us
     75%    0.89ms
     90%    1.83ms
     99%    5.04ms
  1031636 requests in 30.00s, 153.48MB read
Requests/sec:  34388.25
Transfer/sec:      5.12MB

и тоже самое на go с GOMAXPROCS=1

% ./go-meter -t 7 -c 21 -d 30s -u http://localhost:3001/index.html    
Running test threads: 7, connections: 21 in 30s GET http://localhost:3001/index.html
Stats:            Min       Avg       Max
  Latency           0         0      83ms
  843183 requests in 30s, net: in 103MB, out 62MB
HTTP Codes: 
     200       100.00%
Latency: 
               0        99.99%
     10ms - 80ms         0.01%
Requests: 28106.10/sec
Net In: 27MBit/sec
Net Out: 17MBit/sec
Transfer: 5.5MB/sec

Получаем 28106 против 34388 запросов в секунду — это примерно на 20% меньше, по сравнению с чистым Cи + event loop + nio. Довольно неплохо, при изменении GOMAXPROCS разницы практически нет, так как большую часть процессорного времени отбирает Node.js.
Минусы:
— потеря 20% производительности, можно попробовать упростить Request/Response, может дать немного производительности
— еще нет поддержи HTTPS
— еще нельзя указать пользовательские HTTP заголовки и timeout

Все исходники тут — Github

Как пользоваться

% go get github.com/a696385/go-meter 
% $GOPATH/bin/go-meter -h 

Спасибо за внимание!

Автор: a696385

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js