pg_stat_statements + pg_stat_activity + loq_query = pg_ash?

от автора

В качестве короткого дополнения к статье Попытка создать аналог ASH для PostgreSQL.

Задача.

Необходимо связать историю представлений pg_stat_statemenets, pg_stat_activity. В результате, используя историю планов выполнения из сервисной таблицы log_query, можно получить очень много полезной информации, для использования в процессе решения инцидентов производительности и оптимизации запросов.

Предупреждение.
В связи с продолжением тестирования и разработки, статья не может претендовать на описание готового промышленного решения.
Критика и замечания по реализации всячески приветствуются и ожидаются.

Входные данные.

Таблица history_pg_stat_activity

--ACTIVITY_HIST.HISTORY_PG_STAT_ACTIVITY DROP TABLE IF EXISTS activity_hist.history_pg_stat_activity; CREATE TABLE activity_hist.history_pg_stat_activity (   timepoint timestamp without time zone ,   datid             oid  ,    datname           name ,   pid               integer,   usesysid          oid    ,   usename           name   ,   application_name  text   ,   client_addr       inet   ,   client_hostname   text   ,   client_port       integer,   backend_start     timestamp without time zone ,   xact_start        timestamp without time zone ,   query_start       timestamp without time zone ,   state_change      timestamp without time zone ,   wait_event_type   text ,                        wait_event        text ,                      state             text ,                     backend_xid       xid  ,                    backend_xmin      xid  ,                   query             text ,                  backend_type      text ,   queryid           bigint );

Таблица pg_stat_db_queries

CREATE TABLE pg_stat_db_queries (   database_id integer ,   queryid bigint ,   query text ,   max_time double precision );(

Материализованное представление mvw_pg_stat_queries

CREATE MATERIALIZED VIEW public.mvw_pg_stat_queries AS  SELECT t.queryid,     t.max_time,     t.query    FROM public.dblink('LINK1'::text, 'SELECT queryid , max_time , query FROM pg_stat_statements WHERE dbid=(SELECT oid FROM  pg_database WHERE datname=current_database() ) AND max_time >= 0 '::text) t(queryid bigint, max_time double precision, query text)   WITH NO DATA; 

Таблица log_query

CREATE TABLE log_query (   id  integer ,   queryid  bigint ,   query_md5hash  text ,   database_id  integer ,   timepoint  timestamp without time zone ,    query  text ,   explained_plan  text[] ,    plan_md5hash  text ,   explained_plan_wo_costs  text[] ,    plan_hash_value  text ,   ip  text,   port  text ,    pid  integer  );

Общий алгоритм

Обновить таблицу pg_stat_db_queries

Обновить материальное представление mvw_pg_stat_queries

CREATE OR REPLACE FUNCTION refresh_pg_stat_queries_list( database_id int) RETURNS BOOLEAN AS $$ DECLARE  result BOOLEAN ;  database_rec record ;   BEGIN      SELECT *   INTO database_rec   FROM endpoint e JOIN database d ON e.id = d.endpoint_id    WHERE d.id = database_id  ;        IF NOT database_rec.is_need_monitoring THEN RAISE NOTICE 'NO NEED MONITORING FOR database_id=%',database_id; return TRUE ; END IF ;      EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' port=5432 dbname='||database_rec.name|| 		                                         ' user='||database_rec.s_name||' password='||database_rec.s_pass|| ' '')';       REFRESH MATERIALIZED VIEW mvw_pg_stat_queries ;      PERFORM dblink_disconnect('LINK1');      RETURN result; END $$ LANGUAGE plpgsql;

Заполнить таблицу pg_stat_db_queries

CREATE OR REPLACE FUNCTION refresh_pg_stat_db_queries( ) RETURNS BOOLEAN AS $$ DECLARE  result BOOLEAN ;  database_rec record ;    pg_stat_rec record ; BEGIN    TRUNCATE pg_stat_db_queries;         FOR database_rec IN   SELECT *   FROM database d    LOOP        IF NOT database_rec.is_need_monitoring THEN RAISE NOTICE 'NO NEED MONITORING FOR database_id=%',database_rec.id; CONTINUE ; END IF ;         PERFORM refresh_pg_stat_queries_list( database_rec.id ) ;  	 	FOR pg_stat_rec IN 	SELECT *  	FROM mvw_pg_stat_queries  	LOOP 	  INSERT INTO pg_stat_db_queries 	  ( database_id , queryid , query , max_time ) 	  VALUES 	  ( database_rec.id , pg_stat_rec.queryid , pg_stat_rec.query , pg_stat_rec.max_time); 	END LOOP;        END LOOP;     RETURN TRUE; END $$ LANGUAGE plpgsql;

В результате таблица содержит нормализованные тексты запросов, queryid, максимальное время выполнения запроса на текущий момент (используется для мониторинга).

Заполнение log_query и формирование истории планов выполения.

Актуальный текст запроса берется из log-файла. Log-файл с целевого хоста на хост мониторинга по частям, bash скриптом, по cron. Для экономии места и в связи с тривиальностью задачи копирования куска текстового файла с хоста на хост скрипт не приводится.

Парсинг log-файла и выделение текста запроса

#!/bin/bash ######################################################### # upload_log_query.sh # Upload table table from dowloaded aws file  # version 12.0 ###########################################################   echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '  source_file=$1 echo 'source_file='$source_file  database_id=$2 echo 'database_id='$database_id  database_name=$3 echo 'database_name='$database_name   beginer=' ' first_line='1' let "line_count=0" sql_line=' ' sql_flag=' '     space=' ' cat $source_file | while read line do   #first line will be passed   if [[ $line_count == '0' ]]; then      let "line_count++"      continue    fi      line="$space$line"   #echo 'line='$line    if [[ $first_line == "1" ]]; then     beginer=`echo $line | awk -F" " '{ print $1}' `     first_line='0'   fi    current_beginer=`echo $line | awk -F" " '{ print $1}' `   #echo 'current_beginer='$current_beginer   #echo 'beginer='$beginer       if [[ $current_beginer == $beginer ]]; then    if [[ $sql_flag == '1' ]]; then      sql_flag='0'       #echo 'TIMESTAMP:'$(date +%c)' Upload log_query table : SQL STATEMENT ='"$sql_line"       log_date=`echo $sql_line | awk -F" " '{ print $1}' `      #echo 'log_date='$log_date       log_time=`echo $sql_line | awk -F" " '{ print $2}' `      #echo 'log_time='$log_time            duration=`echo $sql_line | awk -F" " '{ print $5}' `      #echo 'duration='$duration  	 connect=`echo $sql_line | awk -F" " '{ print $3}' ` 	 userdb=`echo $connect | awk -F":" '{ print $3}' ` 	 userdb2=$userdb'@' 	 db_port_log=`echo $connect | awk -F"@" '{ print $2}' ` 	 log_database_name=`echo $db_port_log | awk -F":" '{ print $1}' ` 	  	  	 #echo 'connect='$connect 	 #echo 'userdb='$userdb 	 #echo 'userdb2='$userdb2 	 #echo 'db_port_log='$db_port_log 	 #echo 'log_database_name='$log_database_name 	  	 if [[  "$log_database_name" != "$database_name" ]]; 	 then 	   echo '*** database_name '$log_database_name' from log is not equal '$database_name' CONTINUE ' 	   continue; 	 fi 	       #replace ' to ''      sql_modline=`echo "$sql_line" | sed 's/'\''/'\'''\''/g'`      sql_line=' ' 	 #echo '*********************************log_query start'      #echo 'pid_str='$pid_str  	 #echo 'ip_port='$ip_port  	 #echo 'database_id='$database_id  	 #echo 'log_date='$log_date 	 #echo 'log_time='$log_time 	 #echo 'duration='$duration 	 #echo 'sql_modline='$sql_modline      if ! psql -U monitor -d monitor -v ON_ERROR_STOP=1 -A -t -q -c "select log_query( '$pid_str' , '$ip_port' , $database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )"      then         echo 'FATAL_ERROR - log_query '         exit 1      fi 	 #echo '**********************************log_query finish'      fi #if [[ $sql_flag == '1' ]]; then      let "line_count=line_count+1"     #echo 'line_count= '$line_count     #echo $line      #check=`echo $line | awk -F" " '{ print $8}' `     #check_sql=${check^^}          #echo 'check_sql='$check_sql      	if [[ ${line^^} =~ "SELECT" ]];  	then  	 if [[ $line =~ "duration:" ]]; 	 then 	    test_statement=`echo $line | awk -F" " '{ print $8}'`  		is_select=${test_statement^^}		 		 		#echo 'test_statement='$test_statement 		#echo 'is_select='$is_select 		         if [[ $is_select == 'SELECT' ]];          then		 		  sql_flag='1'     		  sql_line="$sql_line$line" 		  ip_port=`echo $sql_line | awk -F":" '{ print $4}' ` 		  pid_str=`echo $sql_line | awk -F":" '{ print $6}' ` 		fi 	 fi	      fi   else            #echo $line     #echo 'sql_flag ='$sql_flag      if [[ $sql_flag == '1' ]]; then       sql_line="$sql_line$line"     fi           fi #if [[ $current_beginer == $beginer ]]; then  done

Заполнение таблицы log_query

--log_query.sql --insert new query into log_query table CREATE OR REPLACE FUNCTION log_query( pid_str text , ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text   ) RETURNS boolean AS $$ DECLARE   result boolean ;   log_timepoint timestamp without time zone ;   log_duration double precision ;    pos integer ;   log_query text ;   activity_string text ;   log_md5hash text ;   log_explain_plan text[] ;      log_planhash text ;   log_plan_wo_costs text[] ;       database_rec record ;      pg_stat_query text ;    test_log_query text ;   metric_rec record;   log_query_rec record;   found_flag boolean;      pg_stat_history_rec record ;   port_start integer ;   port_end integer ;   client_ip text ;   client_port text ;   log_queryid bigint ;   log_query_text text ;   pg_stat_query_text text ;    current_pid_str text ;   current_pid integer;   pid_start_pos integer ;   pid_finish_pos integer ;    BEGIN   result = TRUE ;          IF ip_port != '[local]' THEN     port_start = position('(' in ip_port);     port_end = position(')' in ip_port);     client_ip = substring( ip_port from 1 for port_start-1 );     client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );   ELSE      client_ip = 'local'; 	client_port = 'local';   END IF;       pid_start_pos = position('[' in pid_str);   pid_finish_pos = position(']' in pid_str);   current_pid_str=substring( pid_str from 2 for pid_finish_pos - pid_start_pos -1 );   current_pid = to_number(current_pid_str , '999999999999');      SELECT e.host , d.name , d.owner_pwd , d.owner_user   INTO database_rec   FROM database d JOIN endpoint e ON e.id = d.endpoint_id   WHERE d.id = log_database_id ;      log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');   log_duration = duration::double precision ;         pos = position ('SELECT' in UPPER(sql_line) );   log_query = substring( sql_line from pos for LENGTH(sql_line));   log_query = regexp_replace(log_query,' +',' ','g');   log_query = regexp_replace(log_query,';+','','g');   log_query = trim(trailing ' ' from log_query);      log_md5hash = md5( log_query::text );      --Explain execution plan-- EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' port=5432 dbname='||database_rec.name||' user='||database_rec.owner_user||' password='||database_rec.owner_pwd||' '')';       log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );   log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );      PERFORM dblink_disconnect('LINK1');   --------------------------   BEGIN 	INSERT INTO log_query 	( 		query_md5hash , 		database_id ,  		timepoint , 		duration , 		query , 		explained_plan , 		plan_md5hash ,  		explained_plan_wo_costs ,  		plan_hash_value ,  		ip ,  		port , 		pid 	)  	VALUES  	( 		log_md5hash , 		log_database_id ,  		log_timepoint ,  		log_duration ,  		log_query , 		log_explain_plan ,  		md5(log_explain_plan::text) , 		log_plan_wo_costs ,  		md5(log_plan_wo_costs::text), 		client_ip ,  		client_port ,  		current_pid 		 	); 	activity_string = 	'New query has logged '|| 						' database_id = '|| log_database_id || 						' query_md5hash='||log_md5hash|| 						' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS'); 	PERFORM pg_log( log_database_id , 'log_query' , activity_string);    	EXCEPTION 	  WHEN unique_violation THEN 		activity_string = 	'EXCEPTION *** query already has logged '|| 							' database_id = '|| log_database_id || 							' query_md5hash='||log_md5hash|| 							' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');					          PERFORM pg_log( log_database_id , 'log_query' , activity_string); 	END;  	SELECT 	queryid 	INTO   	log_queryid 	FROM 	log_query  	WHERE 	query_md5hash = log_md5hash AND 			timepoint = log_timepoint;  	IF log_queryid IS NOT NULL  	THEN  	  RETURN result; 	END IF; 	 	------------------------------------------------ 	SELECT *  	INTO log_query_rec 	FROM log_query 	WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;  	 	log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');  	FOR pg_stat_history_rec IN 	 SELECT              queryid ,   	    query  	 FROM         pg_stat_db_queries       WHERE   	   database_id = log_database_id AND        queryid is not null  	LOOP 	  pg_stat_query = pg_stat_history_rec.query ;  	  pg_stat_query=regexp_replace(pg_stat_query,'\n+',' ','g'); 	  pg_stat_query=regexp_replace(pg_stat_query,'\t+',' ','g'); 	  pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g'); 	  pg_stat_query=regexp_replace(pg_stat_query,'\$.','%','g'); 	 	  log_query_text = trim(trailing ' ' from log_query_rec.query); 	  pg_stat_query_text = pg_stat_query;    	   	  IF (log_query_text LIKE pg_stat_query_text) THEN 		found_flag = TRUE ; 	  ELSE 		found_flag = FALSE ; 	  END IF;	    	  IF found_flag THEN 	     		UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; 		activity_string = 	' updated queryid = '||pg_stat_history_rec.queryid|| 		                    ' for log_query with id = '||log_query_rec.id;					 		EXIT ; 	  END IF ;	   	END LOOP ;	   RETURN result ; END $$ LANGUAGE plpgsql;

В результате таблица содержит актуальный текст запроса, планы выполнения, хэш-значение плана выполнения, хэш-значение текста запроса.

Заполнить значение queryid в таблице history_pg_stat_activity

update_history_pg_stat_activity_by_queryid.sql

--update_history_pg_stat_activity_by_queryid.sql CREATE OR REPLACE FUNCTION update_history_pg_stat_activity_by_queryid() RETURNS boolean AS $$ DECLARE   result boolean ;   history_pg_stat_activity_rec record ;    pg_stat_query text ;   pg_stat_query_text text ;   pg_stat_history_rec record;   found_flag boolean;   history_pg_stat_activity_query text ;    query_text text ;   activity_string text ;     BEGIN   RAISE NOTICE '***update_history_pg_stat_activity_by_queryid';      result = TRUE ;      FOR history_pg_stat_activity_rec IN    SELECT DISTINCT(query) AS query   FROM activity_hist.history_pg_stat_activity   WHERE queryid IS NULL   LOOP 		history_pg_stat_activity_query = regexp_replace(history_pg_stat_activity_rec.query,'\n+',' ','g'); 		history_pg_stat_activity_query = regexp_replace(history_pg_stat_activity_query,'\t+',' ','g'); 		history_pg_stat_activity_query = regexp_replace(history_pg_stat_activity_query,' +',' ','g'); 		history_pg_stat_activity_query = regexp_replace(history_pg_stat_activity_query,';','','g'); 		query_text = trim(trailing ' ' from history_pg_stat_activity_query); 		 		FOR pg_stat_history_rec IN 		SELECT  			queryid , 			query  		FROM  			--pg_stat_history 			pg_stat_db_queries 		WHERE   			queryid is not null  		GROUP BY queryid ,	query  		LOOP 			pg_stat_query = pg_stat_history_rec.query ;  			pg_stat_query=regexp_replace(pg_stat_query,'\n+',' ','g'); 			pg_stat_query=regexp_replace(pg_stat_query,'\t+',' ','g'); 			pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g'); 			pg_stat_query=regexp_replace(pg_stat_query,'\$.','%','g');	 			 			pg_stat_query_text = pg_stat_query;  	   			IF (query_text LIKE pg_stat_query_text) THEN 				found_flag = TRUE ; 			ELSE 				found_flag = FALSE ; 			END IF;	   			 			IF found_flag  			THEN 				UPDATE activity_hist.history_pg_stat_activity 				SET queryid = pg_stat_history_rec.queryid 				WHERE regexp_replace(regexp_replace(regexp_replace(regexp_replace(query,'\n+',' ','g'),'\t+',' ','g'),' +',' ','g'),';','','g')  				      LIKE query_text||'%' ; 		 				activity_string = 	'history_pg_stat_activity has updated by queryid = '||pg_stat_history_rec.queryid; 				RAISE NOTICE '%',activity_string;	 				 				PERFORM pg_log( 999 , 'update_history_pg_stat_activity_by_queryid' , activity_string);  				 				EXIT ; 				 			END IF ;	   		END LOOP ; 		 		IF NOT found_flag  		THEN 			activity_string = 'WARNING : Not FOUND queryid for the query : '||query_text ; 			 			RAISE NOTICE '%',activity_string;	 				 			PERFORM pg_log( 999 , 'update_history_pg_stat_activity_by_queryid' , activity_string);  		END IF ; 		 	RAISE NOTICE 'UPDATE log_query if query has not logged in log-file'; 		   END LOOP;      RETURN result ; END $$ LANGUAGE plpgsql;

В результате таблица содержит значение queryid соответствующее значению queryid запроса.

Итог.

Связав pg_stat_activity, pg_stat_statements, log_query, можно получить много полезной информации о запросе, в частности:
-История планов выполнения.
-История CPU-time запроса.
-История ожиданий запроса.
Данные и множество дополнительных отчетов, будут описаны в следующей статье.

Развитие.

Связав имеющуюся информацию с историей представления pg_locks можно получить информацию о том, какой конкретно блокировки ждал запрос и самое главное какой процесс(запрос) удерживал эту блокировку.
Решение этой задачи будет описано в следующей статье. Сейчас идет тестирование и доработка.


ссылка на оригинал статьи https://habr.com/ru/post/467277/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *