Нагрузочный тест на Go, версия 2

от автора

Никак не доходили руки переписать go-meter. Увеличить производительность, получить более полный контроль над процессом и довести до приближения к wrk. В идеале хочется увидеть легко и удобно расширяемую альтернативу. Да, в wrk недавно появилась поддержка Lua скриптов, которые решают многие неудобства, но и там тоже есть неприятные нюансы, например, расширенную статистику собирать не получится, так как методы вывода статистики работают только на первом потоке, и к собранным данным на других потоках доступа нет, поэтому сводится опять к тому, что-бы разбираться в исходниках и делать под себя, но это не тривиальная задача. И так, готовим нагрузочный тест на Go, c плюшками. Кому интересно, прошу под кат.

Что есть и что нужно

С начала разберемся что нам нужно:
— отправка GET/POST/PUT/DELETE запросов
— перебор URL, и POST body
— контроль над открытыми соединениями
— контроль над потоками
— указание продолжительности тестирования
— ограничения по максимальному количеству запросов в секунду
— возможность исключить несколько первых секунд из статистики, чтобы избежать искажения в момент прогрева HTTP сервера

План

— пул соединений
— простые Request/Response
— статистика
— profit

мысли вслух

Раз нужно контролировать соединения, стандартный http.Client нам не подходит (да и большой он для такой задачи), умеет слишком много из-за чего страдает производительность. Так как у нас подразумевается несколько потоков воркеров для отправки запросов, то нам нужен пул соединений, которые они будут между собой делить. Воркеру ждать ответа от сервера не имеет смысла, мы просто теряем на это драгоценное время. Как оценить проходящий трафик? Стандартные http.Request, http.Respose такой информации не дают, использовать их не получится, значит нужно реализовывать простой Request/Response, который нам все неоходимое даст. Собирать сырые данные и в конце их агрегировать не получится, так как память не резиновая. Собираем стату на лету.

Поехали

Пул соединений пишем на основе ограниченного канала. Выглядеть он будет как простой пул объектов, взяли объект из канала, поработали, положили обратно.

type Connection struct { 	conn    net.Conn 	manager *ConnectionManager }  type ConnectionManager struct { 	conns  chan *Connection 	config *Config }  func NewConnectionManager(config *Config) (result *ConnectionManager) {  	result = &ConnectionManager{config: config, conns: make(chan *Connection, config.Connections)} 	for i := 0; i < config.Connections; i++ { 		connection := &Connection{manager: result} 		if connection.Dial() != nil { 			ConnectionErrors++ 		} 		result.conns <- connection 	} 	return }  func (this *ConnectionManager) Get() *Connection { 	return <-this.conns }  func (this *Connection) Dial() error { 	if this.IsConnected() { 		this.Disconnect() 	} 	conn, err := net.Dial("tcp4", this.manager.config.Url.Host) 	if err == nil { 		this.conn = conn 	} 	return err }  func (this *Connection) Disconnect() { 	this.conn.Close() 	this.conn = nil }  func (this *Connection) IsConnected() bool { 	return this.conn != nil }  func (this *Connection) Return() { 	this.manager.conns <- this } 

Request/Response тут можно почитать исходники Go, посмотреть как реализовано там, и сделать упрощенную аналогию, главным отличием будет возможность получить объем трафика каждого запроса/ответа и сэкономить драгоценное время.

Request

type Request struct { 	Method string  	URL *url.URL  	Header map[string][]string  	Body          io.Reader 	ContentLength int64  	Host string  	BufferSize int64 }  func (req *Request) Write(w io.Writer) error {  	bw := &bytes.Buffer{}  	fmt.Fprintf(bw, "%s %s HTTP/1.1\r\n", valueOrDefault(req.Method, "GET"), req.URL.RequestURI()) 	fmt.Fprintf(bw, "Host: %s\r\n", req.Host)  	userAgent := "" 	if req.Header != nil { 		if ua := req.Header["User-Agent"]; len(ua) > 0 { 			userAgent = ua[0] 		} 	} 	if userAgent != "" { 		fmt.Fprintf(bw, "User-Agent: %s\r\n", userAgent) 	}  	if req.Method == "POST" || req.Method == "PUT" { 		fmt.Fprintf(bw, "Content-Length: %d\r\n", req.ContentLength) 	}  	if req.Header != nil { 		for key, values := range req.Header { 			if key == "User-Agent" || key == "Content-Length" || key == "Host" { 				continue 			} 			for _, value := range values { 				fmt.Fprintf(bw, "%s: %s\r\n", key, value) 			} 		} 	}  	io.WriteString(bw, "\r\n")  	if req.Method == "POST" || req.Method == "PUT" { 		bodyReader := bufio.NewReader(req.Body) 		_, err := bodyReader.WriteTo(bw) 		if err != nil { 			return err 		} 	} 	req.BufferSize = int64(bw.Len()) 	_, err := bw.WriteTo(w) 	return err } 

Response

type Response struct { 	Status     string 	StatusCode int  	Header map[string][]string  	ContentLength int64  	BufferSize int64 }  func ReadResponse(r *bufio.Reader) (*Response, error) { 	tp := textproto.NewReader(r) 	resp := &Response{}  	line, err := tp.ReadLine() 	if err != nil { 		return nil, err 	} 	f := strings.SplitN(line, " ", 3) 	resp.BufferSize += int64(len(f) + 2)  	if len(f) < 2 { 		return nil, errors.New("Response Header ERROR") 	}  	reasonPhrase := "" 	if len(f) > 2 { 		reasonPhrase = f[2] 	} 	resp.Status = f[1] + " " + reasonPhrase 	resp.StatusCode, err = strconv.Atoi(f[1]) 	if err != nil { 		return nil, errors.New("malformed HTTP status code") 	}  	resp.Header = make(map[string][]string) 	for { 		line, err := tp.ReadLine() 		if err != nil { 			return nil, errors.New("Response Header ERROR") 		} 		resp.BufferSize += int64(len(line) + 2) 		if len(line) == 0 { 			break 		} else { 			f := strings.SplitN(line, ":", 2) 			resp.Header[f[0]] = append(resp.Header[strings.TrimSpace(f[0])], strings.TrimSpace(f[1])) 		} 	}  	if cl := resp.Header["Content-Length"]; len(cl) > 0 { 		i, err := strconv.ParseInt(cl[0], 10, 0) 		if err == nil { 			resp.ContentLength = i 		} 	}  	buff := make([]byte, resp.ContentLength) 	r.Read(buff) 	resp.BufferSize += int64(resp.ContentLength)  	return resp, nil } 

Для того что бы наши потоки выключились, когда время тестирования закончится, сделаем канал для завершения работы потоков и канал, по которому каждый поток будет сообщать, что он корректно завершил свою работу

WorkerQuit := make(chan bool, *_threads) WorkerQuited := make(chan bool, *_threads) 

засечем время, и также будем ждать Ctr+C(SIGTERM), чтобы наше приложение могло завершить тестирование в любой момент

//Start Ctr+C listen signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)  //Wait timers or SIGTERM select { case <-time.After(config.Duration): case <-signalChan: } for i := 0; i < config.Threads; i++ { 	config.WorkerQuit <- true } //Wait for threads complete for i := 0; i < config.Threads; i++ { 	<-config.WorkerQuited } 

Теперь посмотрим на сам воркер: для ограничения по количеству запросов в секунду возьмем для каждого его долю от общего количества, 4 раза в секунду будем увеличивать счетчик и ждать либо освободившиеся соединение либо завершение работы

func NewThread(config *Config) { 	timerAllow := time.NewTicker(time.Duration(250) * time.Millisecond) 	allow := int32(config.MRQ / 4 / config.Threads) 	if config.MRQ == -1 { 		allow = 2147483647 	} else if allow <= 0 { 		allow = 1 	} 	var connectionErrors int32 = 0 	currentAllow := allow 	for { 		select { 		//По таймеру выставляем счетчик на количество разрешенных запросов 		case <-timerAllow.C: 			currentAllow = allow 		//Получаем свободное соединение 		case connection := <-config.ConnectionManager.conns: 			currentAllow-- 			//Если разрешенные запросы кончились - возвращаем соединение в пул 			if currentAllow < 0 { 				connection.Return() 			} else { 				//Формируем запрос 				req := getRequest(config.Method, config.Url, config.Source.GetNext()) 				//Если нужно переподключаться на каждом запросе 				if config.Reconnect && connection.IsConnected() { 					connection.Disconnect() 				} 				//Если соединение разорвано, то пробуем его восстановить 				if !connection.IsConnected() { 					if connection.Dial() != nil { 						connectionErrors++ 					} 				} 				//Отправляем запрос если есть соединение, иначе возвращаем соединение 				if connection.IsConnected() { 					go writeSocket(connection, req, config.RequestStats) 				} else { 					connection.Return() 				} 			} 		//Ждем завершения 		case <-config.WorkerQuit: 			//Записываем ошибки по соединениям 			atomic.AddInt32(&ConnectionErrors, connectionErrors) 			//Подтверждаем завершение 			config.WorkerQuited <- true 			return 		} 	} } 

Как только соединение освободится, формируем следующий запрос и запускаем асинхронно отправку его, так по кругу пока не кончится время. После того как запрос отправлен, а ответ прочитан, соединение возвращаем в пул, и поток снова подхватит его.

Отправка запроса

func writeSocket(connection *Connection, req *http.Request, read chan *RequestStats) { 	result := &RequestStats{} 	//По окончанию обязательно отправляем статус и отдаем соединение в пул 	defer func() { 		connection.Return() 		read <- result 	}()  	now := time.Now() 	conn := connection.conn 	bw := bufio.NewWriter(conn) 	//Пишем запрос 	err := req.Write(bw) 	if err != nil { 		result.WriteError = err 		return 	} 	err = bw.Flush() 	if err != nil { 		result.WriteError = err 		return 	} 	//Ждем ответа 	res, err := http.ReadResponse(bufio.NewReader(conn)) 	if err != nil { 		result.ReadError = err 		return 	} 	//Собираем нужную информацию 	result.Duration = time.Now().Sub(now) 	result.NetOut = req.BufferSize 	result.NetIn = res.BufferSize 	result.ResponseCode = res.StatusCode 	req.Body = nil } 

Осталось дело за малым, собрать статистику из объектов RequestStats и оформить ее

//Вся статистика type StatsSource struct { 	Readed          int64 	Writed          int64 	Requests        int 	Skiped          int 	Min             time.Duration 	Max             time.Duration 	Sum             int64 	Codes           map[int]int 	DurationPercent map[time.Duration]int 	ReadErrors      int 	WriteErrors     int 	Work            time.Duration }  //Статистика для посекундных отчетов type StatsSourcePerSecond struct { 	Readed   int64 	Writed   int64 	Requests int 	Skiped   int 	Sum      int64 }  //Агрегатор статистики func StartStatsAggregator(config *Config) { 	allowStore := true 	allowStoreTime := time.After(config.ExcludeSeconds) 	if config.ExcludeSeconds.Seconds() > 0 { 		allowStore = false 	}                 	verboseTimer := time.NewTicker(time.Duration(1) * time.Second) 	if config.Verbose { 		fmt.Printf("%s %s %s %s %s %s\n", 			newSpancesFormatRightf("Second", 10, "%s"), 			newSpancesFormatRightf("Total", 10, "%s"), 			newSpancesFormatRightf("Req/sec", 10, "%s"), 			newSpancesFormatRightf("Avg/sec", 10, "%s"), 			newSpancesFormatRightf("In/sec", 10, "%s"), 			newSpancesFormatRightf("Out/sec", 10, "%s"), 		) 	} else { 		verboseTimer.Stop() 	}  	source = StatsSource{ 		Codes:           make(map[int]int), 		DurationPercent: make(map[time.Duration]int), 	}  	perSecond := StatsSourcePerSecond{}  	start := time.Now() 	for { 		select { 		 //Таймер для посекундных отчетов 		case <-verboseTimer.C: 			if perSecond.Requests-perSecond.Skiped > 0 && config.Verbose { 				//Считаем среднее время ответа 				avgMilliseconds := perSecond.Sum / int64(perSecond.Requests-perSecond.Skiped) 				avg := time.Duration(avgMilliseconds) * time.Millisecond 				//Пишем статистику 				fmt.Printf("%s %s %s %s %s %s\n", 					newSpancesFormatRightf(roundToSecondDuration(time.Now().Sub(start)), 10, "%v"), 					newSpancesFormatRightf(source.Requests, 10, "%d"), 					newSpancesFormatRightf(perSecond.Requests, 10, "%d"), 					newSpancesFormatRightf(avg, 10, "%v"), 					newSpancesFormatRightf(Bites(perSecond.Readed), 10, "%s"), 					newSpancesFormatRightf(Bites(perSecond.Writed), 10, "%s"), 				) 			} 			//Сбрасываем данные 			perSecond = StatsSourcePerSecond{} 		//Таймер для разрешения сбора статистики нужен для пропуска на старте 		case <-allowStoreTime: 			allowStore = true 		//Получаем ответ от сервера 		case res := <-config.RequestStats: 			//Если были ошибки - просто их записываем, остальная информация нам не интересна 			if res.ReadError != nil { 				source.ReadErrors++ 				continue 			} else if res.WriteError != nil { 				source.WriteErrors++ 				continue 			} 			//Инкрементируем счетчики 			source.Requests++ 			perSecond.Requests++ 			perSecond.Readed += res.NetIn 			perSecond.Writed += res.NetOut 			source.Readed += res.NetIn 			source.Writed += res.NetOut 			//Собираем статистику по запросам в разрезе HTTP кодов 			source.Codes[res.ResponseCode]++ 			if !allowStore { 				perSecond.Skiped++ 				source.Skiped++ 				continue 			} 			//Для среднего времени ответа 			sum := int64(res.Duration.Seconds() * 1000) 			source.Sum += sum 			perSecond.Sum += sum  			//Максимальное и минимальное время ответа 			if source.Min > res.Duration { 				source.Min = roundDuration(res.Duration) 			} 			if source.Max < res.Duration { 				source.Max = roundDuration(res.Duration) 			} 			//Количество запросов в разрезе времени ответа округленная до 10 миллисекунд 			duration := time.Duration(res.Duration.Nanoseconds()/10000000) * time.Millisecond * 10 			source.DurationPercent[duration]++ 		//Завершение сбора статистики 		case <-config.StatsQuit: 			//Записываем общее время теста 			source.Work = time.Duration(time.Now().Sub(start).Seconds()*1000) * time.Millisecond 			if config.Verbose { 				s := "" 				for { 					if len(s) >= 61 { 						break 					} 					s += "-" 				} 				fmt.Println(s) 			} 			//Подтверждаем завершение 			config.StatsQuit <- true 			return 		} 	} } 
Подводим итоги

Как парсить аргументы запуска и форматировать вывод статистики я опущу, так как это не интересно. А теперь давайте проверим, что у нас получилось. Для пробы натравим wrk на Node.js кластер

% ./wrk -c 21 -t 7 -d 30s -L http://localhost:3001/index.html Running 30s test @ http://localhost:3001/index.html   7 threads and 21 connections   Thread Stats   Avg      Stdev     Max   +/- Stdev     Latency     1.09ms    6.55ms 152.07ms   99.63%     Req/Sec     5.20k     3.08k   14.33k    58.75%   Latency Distribution      50%  490.00us      75%    0.89ms      90%    1.83ms      99%    5.04ms   1031636 requests in 30.00s, 153.48MB read Requests/sec:  34388.25 Transfer/sec:      5.12MB 

и тоже самое на go с GOMAXPROCS=1

% ./go-meter -t 7 -c 21 -d 30s -u http://localhost:3001/index.html     Running test threads: 7, connections: 21 in 30s GET http://localhost:3001/index.html Stats:            Min       Avg       Max   Latency           0         0      83ms   843183 requests in 30s, net: in 103MB, out 62MB HTTP Codes:       200       100.00% Latency:                 0        99.99%      10ms - 80ms         0.01% Requests: 28106.10/sec Net In: 27MBit/sec Net Out: 17MBit/sec Transfer: 5.5MB/sec 

Получаем 28106 против 34388 запросов в секунду — это примерно на 20% меньше, по сравнению с чистым Cи + event loop + nio. Довольно неплохо, при изменении GOMAXPROCS разницы практически нет, так как большую часть процессорного времени отбирает Node.js.
Минусы:
— потеря 20% производительности, можно попробовать упростить Request/Response, может дать немного производительности
— еще нет поддержи HTTPS
— еще нельзя указать пользовательские HTTP заголовки и timeout

Все исходники тут — Github

Как пользоваться

% go get github.com/a696385/go-meter  % $GOPATH/bin/go-meter -h  

Спасибо за внимание!

ссылка на оригинал статьи http://habrahabr.ru/post/203328/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *