Рецепты PostgreSQL: асинхронные уведомления в… реплике!?

от автора

Для приготовления асинхронных уведомлений listen/notify в реплике нам понадобится postgres. Как говорится в документации:

Транзакции, запущенные в режиме горячего резерва, никогда не получают ID транзакции и не могут быть записаны в журнал предзаписи. Поэтому при попытке выполнить следующие действия возникнут ошибки:

LISTEN, NOTIFY

Поэтому берём файл async.c файл из исходников, переименовываем в нём все публичные методы (не static-функции), удаляем связь с транзакциями и добавляем обработку сигнала SIGUSR1, чтобы получилось так:

src/backend/commands/async.c
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 5739d2b40f..9f62d4ca6b 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -1,3 +1,5 @@ +#include <include.h> +  /*-------------------------------------------------------------------------   *   * async.c @@ -46,7 +48,7 @@   *	  to. In case there is a match it delivers the notification event to its   *	  frontend.  Non-matching events are simply skipped.   * - * 4. The NOTIFY statement (routine Async_Notify) stores the notification in + * 4. The NOTIFY statement (routine Async_Notify_My) stores the notification in   *	  a backend-local list which will not be processed until transaction end.   *   *	  Duplicate notifications from the same transaction are sent out as one @@ -56,7 +58,7 @@   *	  that has been sent, it can easily add some unique string into the extra   *	  payload parameter.   * - *	  When the transaction is ready to commit, PreCommit_Notify() adds the + *	  When the transaction is ready to commit, PreCommit_Notify_My() adds the   *	  pending notifications to the head of the queue. The head pointer of the   *	  queue always points to the next free position and a position is just a   *	  page number and the offset in that page. This is done before marking the @@ -67,7 +69,7 @@   *	  Once we have put all of the notifications into the queue, we return to   *	  CommitTransaction() which will then do the actual transaction commit.   * - *	  After commit we are called another time (AtCommit_Notify()). Here we + *	  After commit we are called another time (AtCommit_Notify_My()). Here we   *	  make the actual updates to the effective listen state (listenChannels).   *   *	  Finally, after we are out of the transaction altogether, we check if @@ -171,7 +173,7 @@ typedef struct AsyncQueueEntry  {  	int			length;			/* total allocated length of entry */  	Oid			dboid;			/* sender's database OID */ -	TransactionId xid;			/* sender's XID */ +//	TransactionId xid;			/* sender's XID */  	int32		srcPid;			/* sender's PID */  	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];  } AsyncQueueEntry; @@ -414,14 +416,16 @@ typedef struct NotificationHash    static NotificationList *pendingNotifies = NULL;   +static pqsigfunc pg_async_signal_original = NULL; +  /* - * Inbound notifications are initially processed by HandleNotifyInterrupt(), + * Inbound notifications are initially processed by HandleNotifyInterruptMy(),   * called from inside a signal handler. That just sets the   * notifyInterruptPending flag and sets the process - * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to + * latch. ProcessNotifyInterruptMy() will then be called whenever it's safe to   * actually deal with the interrupt.   */ -volatile sig_atomic_t notifyInterruptPending = false; +//volatile sig_atomic_t notifyInterruptPending = false;    /* True if we've registered an on_shmem_exit cleanup */  static bool unlistenExitRegistered = false; @@ -436,7 +440,7 @@ static bool backendHasSentNotifications = false;  static bool backendTryAdvanceTail = false;    /* GUC parameter */ -bool		Trace_notify = false; +//bool		Trace_notify = false;    /* local function prototypes */  static int	asyncQueuePageDiff(int p, int q); @@ -469,6 +473,12 @@ static uint32 notification_hash(const void *key, Size keysize);  static int	notification_match(const void *key1, const void *key2, Size keysize);  static void ClearPendingActionsAndNotifies(void);   +static void pg_async_signal(SIGNAL_ARGS) { +    HandleNotifyInterruptMy(); +    if (notifyInterruptPending) ProcessNotifyInterruptMy(); +    pg_async_signal_original(postgres_signal_arg); +} +  /*   * Compute the difference between two queue page numbers (i.e., p - q),   * accounting for wraparound. @@ -509,11 +519,11 @@ asyncQueuePagePrecedes(int p, int q)   * Report space needed for our shared memory area   */  Size -AsyncShmemSize(void) +AsyncShmemSizeMy(void)  {  	Size		size;   -	/* This had better match AsyncShmemInit */ +	/* This had better match AsyncShmemInitMy */  	size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));  	size = add_size(size, offsetof(AsyncQueueControl, backend));   @@ -526,7 +536,7 @@ AsyncShmemSize(void)   * Initialize our shared memory area   */  void -AsyncShmemInit(void) +AsyncShmemInitMy(void)  {  	bool		found;  	Size		size; @@ -585,7 +595,7 @@ AsyncShmemInit(void)   *	  SQL function to send a notification event   */  Datum -pg_notify(PG_FUNCTION_ARGS) +pg_notify_my(PG_FUNCTION_ARGS)  {  	const char *channel;  	const char *payload; @@ -601,16 +611,16 @@ pg_notify(PG_FUNCTION_ARGS)  		payload = text_to_cstring(PG_GETARG_TEXT_PP(1));    	/* For NOTIFY as a statement, this is checked in ProcessUtility */ -	PreventCommandDuringRecovery("NOTIFY"); +//	PreventCommandDuringRecovery("NOTIFY");   -	Async_Notify(channel, payload); +	Async_Notify_My(channel, payload);    	PG_RETURN_VOID();  }      /* - * Async_Notify + * Async_Notify_My   *   *		This is executed by the SQL notify command.   * @@ -619,7 +629,7 @@ pg_notify(PG_FUNCTION_ARGS)   *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^   */  void -Async_Notify(const char *channel, const char *payload) +Async_Notify_My(const char *channel, const char *payload)  {  	int			my_level = GetCurrentTransactionNestLevel();  	size_t		channel_len; @@ -631,7 +641,7 @@ Async_Notify(const char *channel, const char *payload)  		elog(ERROR, "cannot send notifications from a parallel worker");    	if (Trace_notify) -		elog(DEBUG1, "Async_Notify(%s)", channel); +		elog(DEBUG1, "Async_Notify_My(%s)", channel);    	channel_len = channel ? strlen(channel) : 0;  	payload_len = payload ? strlen(payload) : 0; @@ -679,7 +689,7 @@ Async_Notify(const char *channel, const char *payload)  		/*  		 * First notify event in current (sub)xact. Note that we allocate the  		 * NotificationList in TopTransactionContext; the nestingLevel might -		 * get changed later by AtSubCommit_Notify. +		 * get changed later by AtSubCommit_Notify_My.  		 */  		notifies = (NotificationList *)  			MemoryContextAlloc(TopTransactionContext, @@ -725,7 +735,7 @@ queue_listen(ListenActionKind action, const char *channel)  	int			my_level = GetCurrentTransactionNestLevel();    	/* -	 * Unlike Async_Notify, we don't try to collapse out duplicates. It would +	 * Unlike Async_Notify_My, we don't try to collapse out duplicates. It would  	 * be too complicated to ensure we get the right interactions of  	 * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there  	 * would be any performance benefit anyway in sane applications. @@ -745,7 +755,7 @@ queue_listen(ListenActionKind action, const char *channel)  		/*  		 * First action in current sub(xact). Note that we allocate the  		 * ActionList in TopTransactionContext; the nestingLevel might get -		 * changed later by AtSubCommit_Notify. +		 * changed later by AtSubCommit_Notify_My.  		 */  		actions = (ActionList *)  			MemoryContextAlloc(TopTransactionContext, sizeof(ActionList)); @@ -761,29 +771,29 @@ queue_listen(ListenActionKind action, const char *channel)  }    /* - * Async_Listen + * Async_Listen_My   *   *		This is executed by the SQL listen command.   */  void -Async_Listen(const char *channel) +Async_Listen_My(const char *channel)  {  	if (Trace_notify) -		elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid); +		elog(DEBUG1, "Async_Listen_My(%s,%d)", channel, MyProcPid);    	queue_listen(LISTEN_LISTEN, channel);  }    /* - * Async_Unlisten + * Async_Unlisten_My   *   *		This is executed by the SQL unlisten command.   */  void -Async_Unlisten(const char *channel) +Async_Unlisten_My(const char *channel)  {  	if (Trace_notify) -		elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid); +		elog(DEBUG1, "Async_Unlisten_My(%s,%d)", channel, MyProcPid);    	/* If we couldn't possibly be listening, no need to queue anything */  	if (pendingActions == NULL && !unlistenExitRegistered) @@ -793,15 +803,15 @@ Async_Unlisten(const char *channel)  }    /* - * Async_UnlistenAll + * Async_UnlistenAll_My   *   *		This is invoked by UNLISTEN * command, and also at backend exit.   */  void -Async_UnlistenAll(void) +Async_UnlistenAll_My(void)  {  	if (Trace_notify) -		elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid); +		elog(DEBUG1, "Async_UnlistenAll_My(%d)", MyProcPid);    	/* If we couldn't possibly be listening, no need to queue anything */  	if (pendingActions == NULL && !unlistenExitRegistered) @@ -818,7 +828,7 @@ Async_UnlistenAll(void)   * change within a transaction.   */  Datum -pg_listening_channels(PG_FUNCTION_ARGS) +pg_listening_channels_my(PG_FUNCTION_ARGS)  {  	FuncCallContext *funcctx;   @@ -858,13 +868,13 @@ Async_UnlistenOnExit(int code, Datum arg)  }    /* - * AtPrepare_Notify + * AtPrepare_Notify_My   *   *		This is called at the prepare phase of a two-phase   *		transaction.  Save the state for possible commit later.   */  void -AtPrepare_Notify(void) +AtPrepare_Notify_My(void)  {  	/* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */  	if (pendingActions || pendingNotifies) @@ -874,7 +884,7 @@ AtPrepare_Notify(void)  }    /* - * PreCommit_Notify + * PreCommit_Notify_My   *   *		This is called at transaction commit, before actually committing to   *		clog. @@ -889,7 +899,7 @@ AtPrepare_Notify(void)   *		we can still throw error if we run out of queue space.   */  void -PreCommit_Notify(void) +PreCommit_Notify_My(void)  {  	ListCell   *p;   @@ -897,7 +907,7 @@ PreCommit_Notify(void)  		return;					/* no relevant statements in this xact */    	if (Trace_notify) -		elog(DEBUG1, "PreCommit_Notify"); +		elog(DEBUG1, "PreCommit_Notify_My");    	/* Preflight for any pending listen/unlisten actions */  	if (pendingActions != NULL) @@ -932,7 +942,7 @@ PreCommit_Notify(void)  		 * so cheap if we don't, and we'd prefer not to do that work while  		 * holding NotifyQueueLock.  		 */ -		(void) GetCurrentTransactionId(); +//		(void) GetCurrentTransactionId();    		/*  		 * Serialize writers by acquiring a special lock that we hold till @@ -951,7 +961,7 @@ PreCommit_Notify(void)  		 * used by the flatfiles mechanism.)  		 */  		LockSharedObject(DatabaseRelationId, InvalidOid, 0, -						 AccessExclusiveLock); +						 RowExclusiveLock);    		/* Now push the notifications into the queue */  		backendHasSentNotifications = true; @@ -984,14 +994,14 @@ PreCommit_Notify(void)  }    /* - * AtCommit_Notify + * AtCommit_Notify_My   *   *		This is called at transaction commit, after committing to clog.   *   *		Update listenChannels and clear transaction-local state.   */  void -AtCommit_Notify(void) +AtCommit_Notify_My(void)  {  	ListCell   *p;   @@ -1003,7 +1013,7 @@ AtCommit_Notify(void)  		return;    	if (Trace_notify) -		elog(DEBUG1, "AtCommit_Notify"); +		elog(DEBUG1, "AtCommit_Notify_My");    	/* Perform any pending listen/unlisten actions */  	if (pendingActions != NULL) @@ -1036,7 +1046,7 @@ AtCommit_Notify(void)  }    /* - * Exec_ListenPreCommit --- subroutine for PreCommit_Notify + * Exec_ListenPreCommit --- subroutine for PreCommit_Notify_My   *   * This function must make sure we are ready to catch any incoming messages.   */ @@ -1131,7 +1141,7 @@ Exec_ListenPreCommit(void)  }    /* - * Exec_ListenCommit --- subroutine for AtCommit_Notify + * Exec_ListenCommit --- subroutine for AtCommit_Notify_My   *   * Add the channel to the list of channels we are listening on.   */ @@ -1155,10 +1165,12 @@ Exec_ListenCommit(const char *channel)  	oldcontext = MemoryContextSwitchTo(TopMemoryContext);  	listenChannels = lappend(listenChannels, pstrdup(channel));  	MemoryContextSwitchTo(oldcontext); + +	if (!pg_async_signal_original) pg_async_signal_original = pqsignal(SIGUSR1, pg_async_signal);  }    /* - * Exec_UnlistenCommit --- subroutine for AtCommit_Notify + * Exec_UnlistenCommit --- subroutine for AtCommit_Notify_My   *   * Remove the specified channel name from listenChannels.   */ @@ -1186,10 +1198,15 @@ Exec_UnlistenCommit(const char *channel)  	 * We do not complain about unlistening something not being listened;  	 * should we?  	 */ + +	if (!list_length(listenChannels) && pg_async_signal_original) { +		pqsignal(SIGUSR1, pg_async_signal_original); +		pg_async_signal_original = NULL; +	}  }    /* - * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify + * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify_My   *   *		Unlisten on all channels for this backend.   */ @@ -1201,10 +1218,15 @@ Exec_UnlistenAllCommit(void)    	list_free_deep(listenChannels);  	listenChannels = NIL; + +	if (pg_async_signal_original) { +		pqsignal(SIGUSR1, pg_async_signal_original); +		pg_async_signal_original = NULL; +	}  }    /* - * ProcessCompletedNotifies --- send out signals and self-notifies + * ProcessCompletedNotifiesMy --- send out signals and self-notifies   *   * This is called from postgres.c just before going idle at the completion   * of a transaction.  If we issued any notifications in the just-completed @@ -1213,10 +1235,10 @@ Exec_UnlistenAllCommit(void)   * Also, if we filled enough queue pages with new notifies, try to advance   * the queue tail pointer.   * - * The reason that this is not done in AtCommit_Notify is that there is + * The reason that this is not done in AtCommit_Notify_My is that there is   * a nonzero chance of errors here (for example, encoding conversion errors   * while trying to format messages to our frontend).  An error during - * AtCommit_Notify would be a PANIC condition.  The timing is also arranged + * AtCommit_Notify_My would be a PANIC condition.  The timing is also arranged   * to ensure that a transaction's self-notifies are delivered to the frontend   * before it gets the terminating ReadyForQuery message.   * @@ -1227,8 +1249,9 @@ Exec_UnlistenAllCommit(void)   * NOTE: we are outside of any transaction here.   */  void -ProcessCompletedNotifies(void) +ProcessCompletedNotifiesMy(void)  { +	bool idle = !IsTransactionOrTransactionBlock();  	MemoryContext caller_context;    	/* Nothing to do if we didn't send any notifications */ @@ -1249,12 +1272,13 @@ ProcessCompletedNotifies(void)  	caller_context = CurrentMemoryContext;    	if (Trace_notify) -		elog(DEBUG1, "ProcessCompletedNotifies"); +		elog(DEBUG1, "ProcessCompletedNotifiesMy");    	/*  	 * We must run asyncQueueReadAllNotifications inside a transaction, else  	 * bad things happen if it gets an error.  	 */ +	if (idle)  	StartTransactionCommand();    	/* Send signals to other backends */ @@ -1275,6 +1299,7 @@ ProcessCompletedNotifies(void)  		asyncQueueAdvanceTail();  	}   +	if (idle)  	CommitTransactionCommand();    	MemoryContextSwitchTo(caller_context); @@ -1431,7 +1456,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)  	entryLength = QUEUEALIGN(entryLength);  	qe->length = entryLength;  	qe->dboid = MyDatabaseId; -	qe->xid = GetCurrentTransactionId(); +//	qe->xid = GetCurrentTransactionId();  	qe->srcPid = MyProcPid;  	memcpy(qe->data, n->data, channellen + payloadlen + 2);  } @@ -1567,7 +1592,7 @@ asyncQueueAddEntries(ListCell *nextNotify)   * occupied.   */  Datum -pg_notification_queue_usage(PG_FUNCTION_ARGS) +pg_notification_queue_usage_my(PG_FUNCTION_ARGS)  {  	double		usage;   @@ -1749,7 +1774,7 @@ SignalBackends(void)  }    /* - * AtAbort_Notify + * AtAbort_Notify_My   *   *	This is called at transaction abort.   * @@ -1757,10 +1782,10 @@ SignalBackends(void)   *	executed if the transaction got committed.   */  void -AtAbort_Notify(void) +AtAbort_Notify_My(void)  {  	/* -	 * If we LISTEN but then roll back the transaction after PreCommit_Notify, +	 * If we LISTEN but then roll back the transaction after PreCommit_Notify_My,  	 * we have registered as a listener but have not made any entry in  	 * listenChannels.  In that case, deregister again.  	 */ @@ -1772,12 +1797,12 @@ AtAbort_Notify(void)  }    /* - * AtSubCommit_Notify() --- Take care of subtransaction commit. + * AtSubCommit_Notify_My() --- Take care of subtransaction commit.   *   * Reassign all items in the pending lists to the parent transaction.   */  void -AtSubCommit_Notify(void) +AtSubCommit_Notify_My(void)  {  	int			my_level = GetCurrentTransactionNestLevel();   @@ -1844,10 +1869,10 @@ AtSubCommit_Notify(void)  }    /* - * AtSubAbort_Notify() --- Take care of subtransaction abort. + * AtSubAbort_Notify_My() --- Take care of subtransaction abort.   */  void -AtSubAbort_Notify(void) +AtSubAbort_Notify_My(void)  {  	int			my_level = GetCurrentTransactionNestLevel();   @@ -1882,15 +1907,15 @@ AtSubAbort_Notify(void)  }    /* - * HandleNotifyInterrupt + * HandleNotifyInterruptMy   *   *		Signal handler portion of interrupt handling. Let the backend know   *		that there's a pending notify interrupt. If we're currently reading   *		from the client, this will interrupt the read and - *		ProcessClientReadInterrupt() will call ProcessNotifyInterrupt(). + *		ProcessClientReadInterrupt() will call ProcessNotifyInterruptMy().   */  void -HandleNotifyInterrupt(void) +HandleNotifyInterruptMy(void)  {  	/*  	 * Note: this is called by a SIGNAL HANDLER. You must be very wary what @@ -1905,18 +1930,18 @@ HandleNotifyInterrupt(void)  }    /* - * ProcessNotifyInterrupt + * ProcessNotifyInterruptMy   *   *		This is called if we see notifyInterruptPending set, just before   *		transmitting ReadyForQuery at the end of a frontend command, and   *		also if a notify signal occurs while reading from the frontend. - *		HandleNotifyInterrupt() will cause the read to be interrupted + *		HandleNotifyInterruptMy() will cause the read to be interrupted   *		via the process's latch, and this routine will get called.   *		If we are truly idle (ie, *not* inside a transaction block),   *		process the incoming notifies.   */  void -ProcessNotifyInterrupt(void) +ProcessNotifyInterruptMy(void)  {  	if (IsTransactionOrTransactionBlock())  		return;					/* not really idle */ @@ -1999,7 +2024,7 @@ asyncQueueReadAllNotifications(void)  	 * before we see them.  	 *----------  	 */ -	snapshot = RegisterSnapshot(GetLatestSnapshot()); +//	snapshot = RegisterSnapshot(GetLatestSnapshot());    	/*  	 * It is possible that we fail while trying to send a message to our @@ -2078,7 +2103,7 @@ asyncQueueReadAllNotifications(void)  	PG_END_TRY();    	/* Done with snapshot */ -	UnregisterSnapshot(snapshot); +//	UnregisterSnapshot(snapshot);  }    /* @@ -2126,6 +2151,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,  		/* Ignore messages destined for other databases */  		if (qe->dboid == MyDatabaseId)  		{ +#if 0  			if (XidInMVCCSnapshot(qe->xid, snapshot))  			{  				/* @@ -2153,6 +2179,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,  			}  			else if (TransactionIdDidCommit(qe->xid))  			{ +#endif  				/* qe->data is the null-terminated channel name */  				char	   *channel = qe->data;   @@ -2161,8 +2188,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,  					/* payload follows channel name */  					char	   *payload = qe->data + strlen(channel) + 1;   -					NotifyMyFrontEnd(channel, payload, qe->srcPid); +					NotifyMyFrontEndMy(channel, payload, qe->srcPid);  				} +#if 0  			}  			else  			{ @@ -2171,6 +2199,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,  				 * ignore its notifications.  				 */  			} +#endif  		}    		/* Loop back if we're not at end of page */ @@ -2271,6 +2300,7 @@ static void  ProcessIncomingNotify(void)  {  	/* We *must* reset the flag */ +	bool idle = !IsTransactionOrTransactionBlock();  	notifyInterruptPending = false;    	/* Do nothing else if we aren't actively listening */ @@ -2286,10 +2316,12 @@ ProcessIncomingNotify(void)  	 * We must run asyncQueueReadAllNotifications inside a transaction, else  	 * bad things happen if it gets an error.  	 */ +	if (idle)  	StartTransactionCommand();    	asyncQueueReadAllNotifications();   +	if (idle)  	CommitTransactionCommand();    	/* @@ -2307,7 +2339,7 @@ ProcessIncomingNotify(void)   * Send NOTIFY message to my front end.   */  void -NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) +NotifyMyFrontEndMy(const char *channel, const char *payload, int32 srcPid)  {  	if (whereToSendOutput == DestRemote)  	{ 

Теперь делаем расширение для функций

pg_async—1.0.sql
-- complain if script is sourced in psql, rather than via CREATE EXTENSION \echo Use "CREATE EXTENSION pg_async" to load this file. \quit  CREATE FUNCTION pg_listen(channel pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_listen' LANGUAGE C; CREATE FUNCTION pg_listening_channels() RETURNS setof pg_catalog.text STRICT AS 'MODULE_PATHNAME', 'pg_async_listening_channels' LANGUAGE C; CREATE FUNCTION pg_notification_queue_usage() RETURNS pg_catalog.float8 STRICT AS 'MODULE_PATHNAME', 'pg_async_notification_queue_usage' LANGUAGE C; CREATE FUNCTION pg_notify(channel pg_catalog.text default null, payload pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_notify' LANGUAGE C; CREATE FUNCTION pg_unlisten_all() RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_unlisten_all' LANGUAGE C; CREATE FUNCTION pg_unlisten(channel pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_unlisten' LANGUAGE C;

Здесь к стандартным pg_listening_channels, pg_notification_queue_usage и pg_notify добавлены новые удобные функции pg_listen, pg_unlisten и pg_unlisten_all, дополняющие соответствующие команды LISTEN, UNLISTEN и UNLISTEN *.

Делаем реализацию этих функций, вызывая на ведущем оригинальные функции, а на реплике функции из изменённого скопированного файла async.c:

pg_async.c
#define EXTENSION(function) Datum (function)(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(function); Datum (function)(PG_FUNCTION_ARGS)  EXTENSION(pg_async_listen) {     const char *channel = PG_ARGISNULL(0) ? "" : text_to_cstring(PG_GETARG_TEXT_PP(0));     !XactReadOnly ? Async_Listen(channel) : Async_Listen_My(channel);     PG_RETURN_VOID(); }  EXTENSION(pg_async_listening_channels) {     return !XactReadOnly ? pg_listening_channels(fcinfo) : pg_listening_channels_my(fcinfo); }  EXTENSION(pg_async_notification_queue_usage) {     return !XactReadOnly ? pg_notification_queue_usage(fcinfo) : pg_notification_queue_usage_my(fcinfo); }  EXTENSION(pg_async_notify) {     return !XactReadOnly ? pg_notify(fcinfo) : pg_notify_my(fcinfo); }  EXTENSION(pg_async_unlisten_all) {     !XactReadOnly ? Async_UnlistenAll() : Async_UnlistenAll_My();     PG_RETURN_VOID(); }  EXTENSION(pg_async_unlisten) {     const char *channel = PG_ARGISNULL(0) ? "" : text_to_cstring(PG_GETARG_TEXT_PP(0));     !XactReadOnly ? Async_Unlisten(channel) : Async_Unlisten_My(channel);     PG_RETURN_VOID(); } 

Также, регистрируем хуки на выполнение команд, на транзакции и разделяемую память:

pg_async.c
static ProcessUtility_hook_type pg_async_ProcessUtility_hook_original = NULL; static shmem_startup_hook_type pg_async_shmem_startup_hook_original = NULL;  void _PG_init(void); void _PG_init(void) {     if (!process_shared_preload_libraries_in_progress) return;     pg_async_ProcessUtility_hook_original = ProcessUtility_hook;     ProcessUtility_hook = pg_async_ProcessUtility_hook;     pg_async_shmem_startup_hook_original = shmem_startup_hook;     shmem_startup_hook = pg_async_shmem_startup_hook;     RequestAddinShmemSpace(AsyncShmemSizeMy());     RegisterSubXactCallback(pg_async_SubXactCallback, NULL);     RegisterXactCallback(pg_async_XactCallback, NULL); }  void _PG_fini(void); void _PG_fini(void) {     ProcessUtility_hook = pg_async_ProcessUtility_hook_original;     shmem_startup_hook = pg_async_shmem_startup_hook_original;     UnregisterSubXactCallback(pg_async_SubXactCallback, NULL);     UnregisterXactCallback(pg_async_XactCallback, NULL); } 

В хуке на разделяемую память регистрируем её из изменённого скопированного файла async.c:

pg_async.c
static void pg_async_shmem_startup_hook(void) {     if (pg_async_shmem_startup_hook_original) pg_async_shmem_startup_hook_original();     LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);     AsyncShmemInitMy();     LWLockRelease(AddinShmemInitLock); } 

В хуке на транзакции на реплике вызываем соответсвующие функции из изменённого скопированного файла async.c:

pg_async.c
static void pg_async_XactCallback(XactEvent event, void *arg) {     if (!XactReadOnly) return;     switch (event) {         case XACT_EVENT_ABORT: AtAbort_Notify_My(); break;         case XACT_EVENT_COMMIT: AtCommit_Notify_My(); ProcessCompletedNotifiesMy(); break;         case XACT_EVENT_PRE_COMMIT: PreCommit_Notify_My(); break;         case XACT_EVENT_PREPARE: AtPrepare_Notify_My(); break;         default: break;     } } 

В хуке на выполнение команд на реплике для команд LISTEN, UNLISTEN и NOTIFY вызываем соответсвующие функции из изменённого скопированного файла async.c:

pg_async.c
static void CheckRestrictedOperation(const char *cmdname) {     if (InSecurityRestrictedOperation()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("cannot execute %s within security-restricted operation", cmdname))); }  static void pg_async_ProcessUtility_hook(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc) {     Node *parsetree = pstmt->utilityStmt;     if (!XactReadOnly) return pg_async_ProcessUtility_hook_original ? pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc) : standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc);     check_stack_depth();     switch (nodeTag(parsetree)) {         case T_ListenStmt: {             ListenStmt *stmt = (ListenStmt *)parsetree;             CheckRestrictedOperation("LISTEN");             Async_Listen_My(stmt->conditionname);         } break;         case T_NotifyStmt: {             NotifyStmt *stmt = (NotifyStmt *)parsetree;             Async_Notify_My(stmt->conditionname, stmt->payload);         } break;         case T_UnlistenStmt: {             UnlistenStmt *stmt = (UnlistenStmt *)parsetree;             CheckRestrictedOperation("UNLISTEN");             stmt->conditionname ? Async_Unlisten_My(stmt->conditionname) : Async_UnlistenAll_My();         } break;         default: return pg_async_ProcessUtility_hook_original ? pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc) : standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc);     }     CommandCounterIncrement(); } 

Всё это можно посмотреть в репозитории.

ссылка на оригинал статьи https://habr.com/ru/post/550104/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *