Что есть и что нужно
С начала разберемся что нам нужно:
— отправка GET/POST/PUT/DELETE запросов
— перебор URL, и POST body
— контроль над открытыми соединениями
— контроль над потоками
— указание продолжительности тестирования
— ограничения по максимальному количеству запросов в секунду
— возможность исключить несколько первых секунд из статистики, чтобы избежать искажения в момент прогрева HTTP сервера
План
— пул соединений
— простые Request/Response
— статистика
— profit
Поехали
Пул соединений пишем на основе ограниченного канала. Выглядеть он будет как простой пул объектов, взяли объект из канала, поработали, положили обратно.
type Connection struct { conn net.Conn manager *ConnectionManager } type ConnectionManager struct { conns chan *Connection config *Config } func NewConnectionManager(config *Config) (result *ConnectionManager) { result = &ConnectionManager{config: config, conns: make(chan *Connection, config.Connections)} for i := 0; i < config.Connections; i++ { connection := &Connection{manager: result} if connection.Dial() != nil { ConnectionErrors++ } result.conns <- connection } return } func (this *ConnectionManager) Get() *Connection { return <-this.conns } func (this *Connection) Dial() error { if this.IsConnected() { this.Disconnect() } conn, err := net.Dial("tcp4", this.manager.config.Url.Host) if err == nil { this.conn = conn } return err } func (this *Connection) Disconnect() { this.conn.Close() this.conn = nil } func (this *Connection) IsConnected() bool { return this.conn != nil } func (this *Connection) Return() { this.manager.conns <- this }
Request/Response тут можно почитать исходники Go, посмотреть как реализовано там, и сделать упрощенную аналогию, главным отличием будет возможность получить объем трафика каждого запроса/ответа и сэкономить драгоценное время.
type Request struct { Method string URL *url.URL Header map[string][]string Body io.Reader ContentLength int64 Host string BufferSize int64 } func (req *Request) Write(w io.Writer) error { bw := &bytes.Buffer{} fmt.Fprintf(bw, "%s %s HTTP/1.1\r\n", valueOrDefault(req.Method, "GET"), req.URL.RequestURI()) fmt.Fprintf(bw, "Host: %s\r\n", req.Host) userAgent := "" if req.Header != nil { if ua := req.Header["User-Agent"]; len(ua) > 0 { userAgent = ua[0] } } if userAgent != "" { fmt.Fprintf(bw, "User-Agent: %s\r\n", userAgent) } if req.Method == "POST" || req.Method == "PUT" { fmt.Fprintf(bw, "Content-Length: %d\r\n", req.ContentLength) } if req.Header != nil { for key, values := range req.Header { if key == "User-Agent" || key == "Content-Length" || key == "Host" { continue } for _, value := range values { fmt.Fprintf(bw, "%s: %s\r\n", key, value) } } } io.WriteString(bw, "\r\n") if req.Method == "POST" || req.Method == "PUT" { bodyReader := bufio.NewReader(req.Body) _, err := bodyReader.WriteTo(bw) if err != nil { return err } } req.BufferSize = int64(bw.Len()) _, err := bw.WriteTo(w) return err }
type Response struct { Status string StatusCode int Header map[string][]string ContentLength int64 BufferSize int64 } func ReadResponse(r *bufio.Reader) (*Response, error) { tp := textproto.NewReader(r) resp := &Response{} line, err := tp.ReadLine() if err != nil { return nil, err } f := strings.SplitN(line, " ", 3) resp.BufferSize += int64(len(f) + 2) if len(f) < 2 { return nil, errors.New("Response Header ERROR") } reasonPhrase := "" if len(f) > 2 { reasonPhrase = f[2] } resp.Status = f[1] + " " + reasonPhrase resp.StatusCode, err = strconv.Atoi(f[1]) if err != nil { return nil, errors.New("malformed HTTP status code") } resp.Header = make(map[string][]string) for { line, err := tp.ReadLine() if err != nil { return nil, errors.New("Response Header ERROR") } resp.BufferSize += int64(len(line) + 2) if len(line) == 0 { break } else { f := strings.SplitN(line, ":", 2) resp.Header[f[0]] = append(resp.Header[strings.TrimSpace(f[0])], strings.TrimSpace(f[1])) } } if cl := resp.Header["Content-Length"]; len(cl) > 0 { i, err := strconv.ParseInt(cl[0], 10, 0) if err == nil { resp.ContentLength = i } } buff := make([]byte, resp.ContentLength) r.Read(buff) resp.BufferSize += int64(resp.ContentLength) return resp, nil }
Для того что бы наши потоки выключились, когда время тестирования закончится, сделаем канал для завершения работы потоков и канал, по которому каждый поток будет сообщать, что он корректно завершил свою работу
WorkerQuit := make(chan bool, *_threads) WorkerQuited := make(chan bool, *_threads)
засечем время, и также будем ждать Ctr+C(SIGTERM), чтобы наше приложение могло завершить тестирование в любой момент
//Start Ctr+C listen signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) //Wait timers or SIGTERM select { case <-time.After(config.Duration): case <-signalChan: } for i := 0; i < config.Threads; i++ { config.WorkerQuit <- true } //Wait for threads complete for i := 0; i < config.Threads; i++ { <-config.WorkerQuited }
Теперь посмотрим на сам воркер: для ограничения по количеству запросов в секунду возьмем для каждого его долю от общего количества, 4 раза в секунду будем увеличивать счетчик и ждать либо освободившиеся соединение либо завершение работы
func NewThread(config *Config) { timerAllow := time.NewTicker(time.Duration(250) * time.Millisecond) allow := int32(config.MRQ / 4 / config.Threads) if config.MRQ == -1 { allow = 2147483647 } else if allow <= 0 { allow = 1 } var connectionErrors int32 = 0 currentAllow := allow for { select { //По таймеру выставляем счетчик на количество разрешенных запросов case <-timerAllow.C: currentAllow = allow //Получаем свободное соединение case connection := <-config.ConnectionManager.conns: currentAllow-- //Если разрешенные запросы кончились - возвращаем соединение в пул if currentAllow < 0 { connection.Return() } else { //Формируем запрос req := getRequest(config.Method, config.Url, config.Source.GetNext()) //Если нужно переподключаться на каждом запросе if config.Reconnect && connection.IsConnected() { connection.Disconnect() } //Если соединение разорвано, то пробуем его восстановить if !connection.IsConnected() { if connection.Dial() != nil { connectionErrors++ } } //Отправляем запрос если есть соединение, иначе возвращаем соединение if connection.IsConnected() { go writeSocket(connection, req, config.RequestStats) } else { connection.Return() } } //Ждем завершения case <-config.WorkerQuit: //Записываем ошибки по соединениям atomic.AddInt32(&ConnectionErrors, connectionErrors) //Подтверждаем завершение config.WorkerQuited <- true return } } }
Как только соединение освободится, формируем следующий запрос и запускаем асинхронно отправку его, так по кругу пока не кончится время. После того как запрос отправлен, а ответ прочитан, соединение возвращаем в пул, и поток снова подхватит его.
func writeSocket(connection *Connection, req *http.Request, read chan *RequestStats) { result := &RequestStats{} //По окончанию обязательно отправляем статус и отдаем соединение в пул defer func() { connection.Return() read <- result }() now := time.Now() conn := connection.conn bw := bufio.NewWriter(conn) //Пишем запрос err := req.Write(bw) if err != nil { result.WriteError = err return } err = bw.Flush() if err != nil { result.WriteError = err return } //Ждем ответа res, err := http.ReadResponse(bufio.NewReader(conn)) if err != nil { result.ReadError = err return } //Собираем нужную информацию result.Duration = time.Now().Sub(now) result.NetOut = req.BufferSize result.NetIn = res.BufferSize result.ResponseCode = res.StatusCode req.Body = nil }
Осталось дело за малым, собрать статистику из объектов RequestStats и оформить ее
//Вся статистика type StatsSource struct { Readed int64 Writed int64 Requests int Skiped int Min time.Duration Max time.Duration Sum int64 Codes map[int]int DurationPercent map[time.Duration]int ReadErrors int WriteErrors int Work time.Duration } //Статистика для посекундных отчетов type StatsSourcePerSecond struct { Readed int64 Writed int64 Requests int Skiped int Sum int64 } //Агрегатор статистики func StartStatsAggregator(config *Config) { allowStore := true allowStoreTime := time.After(config.ExcludeSeconds) if config.ExcludeSeconds.Seconds() > 0 { allowStore = false } verboseTimer := time.NewTicker(time.Duration(1) * time.Second) if config.Verbose { fmt.Printf("%s %s %s %s %s %s\n", newSpancesFormatRightf("Second", 10, "%s"), newSpancesFormatRightf("Total", 10, "%s"), newSpancesFormatRightf("Req/sec", 10, "%s"), newSpancesFormatRightf("Avg/sec", 10, "%s"), newSpancesFormatRightf("In/sec", 10, "%s"), newSpancesFormatRightf("Out/sec", 10, "%s"), ) } else { verboseTimer.Stop() } source = StatsSource{ Codes: make(map[int]int), DurationPercent: make(map[time.Duration]int), } perSecond := StatsSourcePerSecond{} start := time.Now() for { select { //Таймер для посекундных отчетов case <-verboseTimer.C: if perSecond.Requests-perSecond.Skiped > 0 && config.Verbose { //Считаем среднее время ответа avgMilliseconds := perSecond.Sum / int64(perSecond.Requests-perSecond.Skiped) avg := time.Duration(avgMilliseconds) * time.Millisecond //Пишем статистику fmt.Printf("%s %s %s %s %s %s\n", newSpancesFormatRightf(roundToSecondDuration(time.Now().Sub(start)), 10, "%v"), newSpancesFormatRightf(source.Requests, 10, "%d"), newSpancesFormatRightf(perSecond.Requests, 10, "%d"), newSpancesFormatRightf(avg, 10, "%v"), newSpancesFormatRightf(Bites(perSecond.Readed), 10, "%s"), newSpancesFormatRightf(Bites(perSecond.Writed), 10, "%s"), ) } //Сбрасываем данные perSecond = StatsSourcePerSecond{} //Таймер для разрешения сбора статистики нужен для пропуска на старте case <-allowStoreTime: allowStore = true //Получаем ответ от сервера case res := <-config.RequestStats: //Если были ошибки - просто их записываем, остальная информация нам не интересна if res.ReadError != nil { source.ReadErrors++ continue } else if res.WriteError != nil { source.WriteErrors++ continue } //Инкрементируем счетчики source.Requests++ perSecond.Requests++ perSecond.Readed += res.NetIn perSecond.Writed += res.NetOut source.Readed += res.NetIn source.Writed += res.NetOut //Собираем статистику по запросам в разрезе HTTP кодов source.Codes[res.ResponseCode]++ if !allowStore { perSecond.Skiped++ source.Skiped++ continue } //Для среднего времени ответа sum := int64(res.Duration.Seconds() * 1000) source.Sum += sum perSecond.Sum += sum //Максимальное и минимальное время ответа if source.Min > res.Duration { source.Min = roundDuration(res.Duration) } if source.Max < res.Duration { source.Max = roundDuration(res.Duration) } //Количество запросов в разрезе времени ответа округленная до 10 миллисекунд duration := time.Duration(res.Duration.Nanoseconds()/10000000) * time.Millisecond * 10 source.DurationPercent[duration]++ //Завершение сбора статистики case <-config.StatsQuit: //Записываем общее время теста source.Work = time.Duration(time.Now().Sub(start).Seconds()*1000) * time.Millisecond if config.Verbose { s := "" for { if len(s) >= 61 { break } s += "-" } fmt.Println(s) } //Подтверждаем завершение config.StatsQuit <- true return } } }
Подводим итоги
Как парсить аргументы запуска и форматировать вывод статистики я опущу, так как это не интересно. А теперь давайте проверим, что у нас получилось. Для пробы натравим wrk на Node.js кластер
% ./wrk -c 21 -t 7 -d 30s -L http://localhost:3001/index.html Running 30s test @ http://localhost:3001/index.html 7 threads and 21 connections Thread Stats Avg Stdev Max +/- Stdev Latency 1.09ms 6.55ms 152.07ms 99.63% Req/Sec 5.20k 3.08k 14.33k 58.75% Latency Distribution 50% 490.00us 75% 0.89ms 90% 1.83ms 99% 5.04ms 1031636 requests in 30.00s, 153.48MB read Requests/sec: 34388.25 Transfer/sec: 5.12MB
и тоже самое на go с GOMAXPROCS=1
% ./go-meter -t 7 -c 21 -d 30s -u http://localhost:3001/index.html Running test threads: 7, connections: 21 in 30s GET http://localhost:3001/index.html Stats: Min Avg Max Latency 0 0 83ms 843183 requests in 30s, net: in 103MB, out 62MB HTTP Codes: 200 100.00% Latency: 0 99.99% 10ms - 80ms 0.01% Requests: 28106.10/sec Net In: 27MBit/sec Net Out: 17MBit/sec Transfer: 5.5MB/sec
Получаем 28106 против 34388 запросов в секунду — это примерно на 20% меньше, по сравнению с чистым Cи + event loop + nio. Довольно неплохо, при изменении GOMAXPROCS разницы практически нет, так как большую часть процессорного времени отбирает Node.js.
Минусы:
— потеря 20% производительности, можно попробовать упростить Request/Response, может дать немного производительности
— еще нет поддержи HTTPS
— еще нельзя указать пользовательские HTTP заголовки и timeout
Все исходники тут — Github
Как пользоваться
% go get github.com/a696385/go-meter % $GOPATH/bin/go-meter -h
Спасибо за внимание!
ссылка на оригинал статьи http://habrahabr.ru/post/203328/
Добавить комментарий