{"id":320666,"date":"2021-04-01T09:01:04","date_gmt":"2021-04-01T09:01:04","guid":{"rendered":"http:\/\/savepearlharbor.com\/?p=320666"},"modified":"-0001-11-30T00:00:00","modified_gmt":"-0001-11-29T21:00:00","slug":"","status":"publish","type":"post","link":"https:\/\/savepearlharbor.com\/?p=320666","title":{"rendered":"\u0420\u0435\u0446\u0435\u043f\u0442\u044b PostgreSQL: \u0430\u0441\u0438\u043d\u0445\u0440\u043e\u043d\u043d\u044b\u0435 \u0443\u0432\u0435\u0434\u043e\u043c\u043b\u0435\u043d\u0438\u044f \u0432\u2026 \u0440\u0435\u043f\u043b\u0438\u043a\u0435!?"},"content":{"rendered":"\n<div class=\"post__text post__text_v2\" id=\"post-content-body\">\n<p>\u0414\u043b\u044f \u043f\u0440\u0438\u0433\u043e\u0442\u043e\u0432\u043b\u0435\u043d\u0438\u044f \u0430\u0441\u0438\u043d\u0445\u0440\u043e\u043d\u043d\u044b\u0445 \u0443\u0432\u0435\u0434\u043e\u043c\u043b\u0435\u043d\u0438\u0439 <a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/sql-listen\" rel=\"noopener noreferrer nofollow\">listen<\/a>\/<a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/sql-notify\" rel=\"noopener noreferrer nofollow\">notify<\/a> \u0432 <a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/hot-standby\" rel=\"noopener noreferrer nofollow\">\u0440\u0435\u043f\u043b\u0438\u043a\u0435<\/a> \u043d\u0430\u043c \u043f\u043e\u043d\u0430\u0434\u043e\u0431\u0438\u0442\u0441\u044f <a href=\"https:\/\/www.postgresql.org\/\" rel=\"noopener noreferrer nofollow\">postgres<\/a>. \u041a\u0430\u043a \u0433\u043e\u0432\u043e\u0440\u0438\u0442\u0441\u044f \u0432 <a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/hot-standby\" rel=\"noopener noreferrer nofollow\">\u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0430\u0446\u0438\u0438<\/a>:<\/p>\n<blockquote>\n<p>\u0422\u0440\u0430\u043d\u0437\u0430\u043a\u0446\u0438\u0438, \u0437\u0430\u043f\u0443\u0449\u0435\u043d\u043d\u044b\u0435 \u0432 \u0440\u0435\u0436\u0438\u043c\u0435 \u0433\u043e\u0440\u044f\u0447\u0435\u0433\u043e \u0440\u0435\u0437\u0435\u0440\u0432\u0430, \u043d\u0438\u043a\u043e\u0433\u0434\u0430 \u043d\u0435 \u043f\u043e\u043b\u0443\u0447\u0430\u044e\u0442 ID  \u0442\u0440\u0430\u043d\u0437\u0430\u043a\u0446\u0438\u0438 \u0438 \u043d\u0435 \u043c\u043e\u0433\u0443\u0442 \u0431\u044b\u0442\u044c \u0437\u0430\u043f\u0438\u0441\u0430\u043d\u044b \u0432 \u0436\u0443\u0440\u043d\u0430\u043b \u043f\u0440\u0435\u0434\u0437\u0430\u043f\u0438\u0441\u0438. \u041f\u043e\u044d\u0442\u043e\u043c\u0443 \u043f\u0440\u0438  \u043f\u043e\u043f\u044b\u0442\u043a\u0435 \u0432\u044b\u043f\u043e\u043b\u043d\u0438\u0442\u044c \u0441\u043b\u0435\u0434\u0443\u044e\u0449\u0438\u0435 \u0434\u0435\u0439\u0441\u0442\u0432\u0438\u044f \u0432\u043e\u0437\u043d\u0438\u043a\u043d\u0443\u0442 \u043e\u0448\u0438\u0431\u043a\u0438:<\/p>\n<p><code>LISTEN<\/code>, <code>NOTIFY<\/code><\/p>\n<\/blockquote>\n<p>\u041f\u043e\u044d\u0442\u043e\u043c\u0443 \u0431\u0435\u0440\u0451\u043c \u0444\u0430\u0439\u043b <strong>async.c <\/strong>\u0444\u0430\u0439\u043b \u0438\u0437 \u0438\u0441\u0445\u043e\u0434\u043d\u0438\u043a\u043e\u0432, \u043f\u0435\u0440\u0435\u0438\u043c\u0435\u043d\u043e\u0432\u044b\u0432\u0430\u0435\u043c \u0432 \u043d\u0451\u043c \u0432\u0441\u0435 \u043f\u0443\u0431\u043b\u0438\u0447\u043d\u044b\u0435 \u043c\u0435\u0442\u043e\u0434\u044b (\u043d\u0435 static-\u0444\u0443\u043d\u043a\u0446\u0438\u0438), \u0443\u0434\u0430\u043b\u044f\u0435\u043c \u0441\u0432\u044f\u0437\u044c \u0441 \u0442\u0440\u0430\u043d\u0437\u0430\u043a\u0446\u0438\u044f\u043c\u0438 \u0438 \u0434\u043e\u0431\u0430\u0432\u043b\u044f\u0435\u043c \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0443 \u0441\u0438\u0433\u043d\u0430\u043b\u0430 SIGUSR1, \u0447\u0442\u043e\u0431\u044b \u043f\u043e\u043b\u0443\u0447\u0438\u043b\u043e\u0441\u044c \u0442\u0430\u043a:<\/p>\n<details class=\"spoiler\">\n<summary>src\/backend\/commands\/async.c<\/summary>\n<div class=\"spoiler__content\">\n<pre><code class=\"diff\">diff --git a\/src\/backend\/commands\/async.c b\/src\/backend\/commands\/async.c index 5739d2b40f..9f62d4ca6b 100644 --- a\/src\/backend\/commands\/async.c +++ b\/src\/backend\/commands\/async.c @@ -1,3 +1,5 @@ +#include &lt;include.h&gt; +  \/*-------------------------------------------------------------------------   *   * async.c @@ -46,7 +48,7 @@   *\t  to. In case there is a match it delivers the notification event to its   *\t  frontend.  Non-matching events are simply skipped.   * - * 4. The NOTIFY statement (routine Async_Notify) stores the notification in + * 4. The NOTIFY statement (routine Async_Notify_My) stores the notification in   *\t  a backend-local list which will not be processed until transaction end.   *   *\t  Duplicate notifications from the same transaction are sent out as one @@ -56,7 +58,7 @@   *\t  that has been sent, it can easily add some unique string into the extra   *\t  payload parameter.   * - *\t  When the transaction is ready to commit, PreCommit_Notify() adds the + *\t  When the transaction is ready to commit, PreCommit_Notify_My() adds the   *\t  pending notifications to the head of the queue. The head pointer of the   *\t  queue always points to the next free position and a position is just a   *\t  page number and the offset in that page. This is done before marking the @@ -67,7 +69,7 @@   *\t  Once we have put all of the notifications into the queue, we return to   *\t  CommitTransaction() which will then do the actual transaction commit.   * - *\t  After commit we are called another time (AtCommit_Notify()). Here we + *\t  After commit we are called another time (AtCommit_Notify_My()). Here we   *\t  make the actual updates to the effective listen state (listenChannels).   *   *\t  Finally, after we are out of the transaction altogether, we check if @@ -171,7 +173,7 @@ typedef struct AsyncQueueEntry  {  \tint\t\t\tlength;\t\t\t\/* total allocated length of entry *\/  \tOid\t\t\tdboid;\t\t\t\/* sender's database OID *\/ -\tTransactionId xid;\t\t\t\/* sender's XID *\/ +\/\/\tTransactionId xid;\t\t\t\/* sender's XID *\/  \tint32\t\tsrcPid;\t\t\t\/* sender's PID *\/  \tchar\t\tdata[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];  } AsyncQueueEntry; @@ -414,14 +416,16 @@ typedef struct NotificationHash    static NotificationList *pendingNotifies = NULL;   +static pqsigfunc pg_async_signal_original = NULL; +  \/* - * Inbound notifications are initially processed by HandleNotifyInterrupt(), + * Inbound notifications are initially processed by HandleNotifyInterruptMy(),   * called from inside a signal handler. That just sets the   * notifyInterruptPending flag and sets the process - * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to + * latch. ProcessNotifyInterruptMy() will then be called whenever it's safe to   * actually deal with the interrupt.   *\/ -volatile sig_atomic_t notifyInterruptPending = false; +\/\/volatile sig_atomic_t notifyInterruptPending = false;    \/* True if we've registered an on_shmem_exit cleanup *\/  static bool unlistenExitRegistered = false; @@ -436,7 +440,7 @@ static bool backendHasSentNotifications = false;  static bool backendTryAdvanceTail = false;    \/* GUC parameter *\/ -bool\t\tTrace_notify = false; +\/\/bool\t\tTrace_notify = false;    \/* local function prototypes *\/  static int\tasyncQueuePageDiff(int p, int q); @@ -469,6 +473,12 @@ static uint32 notification_hash(const void *key, Size keysize);  static int\tnotification_match(const void *key1, const void *key2, Size keysize);  static void ClearPendingActionsAndNotifies(void);   +static void pg_async_signal(SIGNAL_ARGS) { +    HandleNotifyInterruptMy(); +    if (notifyInterruptPending) ProcessNotifyInterruptMy(); +    pg_async_signal_original(postgres_signal_arg); +} +  \/*   * Compute the difference between two queue page numbers (i.e., p - q),   * accounting for wraparound. @@ -509,11 +519,11 @@ asyncQueuePagePrecedes(int p, int q)   * Report space needed for our shared memory area   *\/  Size -AsyncShmemSize(void) +AsyncShmemSizeMy(void)  {  \tSize\t\tsize;   -\t\/* This had better match AsyncShmemInit *\/ +\t\/* This had better match AsyncShmemInitMy *\/  \tsize = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));  \tsize = add_size(size, offsetof(AsyncQueueControl, backend));   @@ -526,7 +536,7 @@ AsyncShmemSize(void)   * Initialize our shared memory area   *\/  void -AsyncShmemInit(void) +AsyncShmemInitMy(void)  {  \tbool\t\tfound;  \tSize\t\tsize; @@ -585,7 +595,7 @@ AsyncShmemInit(void)   *\t  SQL function to send a notification event   *\/  Datum -pg_notify(PG_FUNCTION_ARGS) +pg_notify_my(PG_FUNCTION_ARGS)  {  \tconst char *channel;  \tconst char *payload; @@ -601,16 +611,16 @@ pg_notify(PG_FUNCTION_ARGS)  \t\tpayload = text_to_cstring(PG_GETARG_TEXT_PP(1));    \t\/* For NOTIFY as a statement, this is checked in ProcessUtility *\/ -\tPreventCommandDuringRecovery(\"NOTIFY\"); +\/\/\tPreventCommandDuringRecovery(\"NOTIFY\");   -\tAsync_Notify(channel, payload); +\tAsync_Notify_My(channel, payload);    \tPG_RETURN_VOID();  }      \/* - * Async_Notify + * Async_Notify_My   *   *\t\tThis is executed by the SQL notify command.   * @@ -619,7 +629,7 @@ pg_notify(PG_FUNCTION_ARGS)   *\t\t^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^   *\/  void -Async_Notify(const char *channel, const char *payload) +Async_Notify_My(const char *channel, const char *payload)  {  \tint\t\t\tmy_level = GetCurrentTransactionNestLevel();  \tsize_t\t\tchannel_len; @@ -631,7 +641,7 @@ Async_Notify(const char *channel, const char *payload)  \t\telog(ERROR, \"cannot send notifications from a parallel worker\");    \tif (Trace_notify) -\t\telog(DEBUG1, \"Async_Notify(%s)\", channel); +\t\telog(DEBUG1, \"Async_Notify_My(%s)\", channel);    \tchannel_len = channel ? strlen(channel) : 0;  \tpayload_len = payload ? strlen(payload) : 0; @@ -679,7 +689,7 @@ Async_Notify(const char *channel, const char *payload)  \t\t\/*  \t\t * First notify event in current (sub)xact. Note that we allocate the  \t\t * NotificationList in TopTransactionContext; the nestingLevel might -\t\t * get changed later by AtSubCommit_Notify. +\t\t * get changed later by AtSubCommit_Notify_My.  \t\t *\/  \t\tnotifies = (NotificationList *)  \t\t\tMemoryContextAlloc(TopTransactionContext, @@ -725,7 +735,7 @@ queue_listen(ListenActionKind action, const char *channel)  \tint\t\t\tmy_level = GetCurrentTransactionNestLevel();    \t\/* -\t * Unlike Async_Notify, we don't try to collapse out duplicates. It would +\t * Unlike Async_Notify_My, we don't try to collapse out duplicates. It would  \t * be too complicated to ensure we get the right interactions of  \t * conflicting LISTEN\/UNLISTEN\/UNLISTEN_ALL, and it's unlikely that there  \t * would be any performance benefit anyway in sane applications. @@ -745,7 +755,7 @@ queue_listen(ListenActionKind action, const char *channel)  \t\t\/*  \t\t * First action in current sub(xact). Note that we allocate the  \t\t * ActionList in TopTransactionContext; the nestingLevel might get -\t\t * changed later by AtSubCommit_Notify. +\t\t * changed later by AtSubCommit_Notify_My.  \t\t *\/  \t\tactions = (ActionList *)  \t\t\tMemoryContextAlloc(TopTransactionContext, sizeof(ActionList)); @@ -761,29 +771,29 @@ queue_listen(ListenActionKind action, const char *channel)  }    \/* - * Async_Listen + * Async_Listen_My   *   *\t\tThis is executed by the SQL listen command.   *\/  void -Async_Listen(const char *channel) +Async_Listen_My(const char *channel)  {  \tif (Trace_notify) -\t\telog(DEBUG1, \"Async_Listen(%s,%d)\", channel, MyProcPid); +\t\telog(DEBUG1, \"Async_Listen_My(%s,%d)\", channel, MyProcPid);    \tqueue_listen(LISTEN_LISTEN, channel);  }    \/* - * Async_Unlisten + * Async_Unlisten_My   *   *\t\tThis is executed by the SQL unlisten command.   *\/  void -Async_Unlisten(const char *channel) +Async_Unlisten_My(const char *channel)  {  \tif (Trace_notify) -\t\telog(DEBUG1, \"Async_Unlisten(%s,%d)\", channel, MyProcPid); +\t\telog(DEBUG1, \"Async_Unlisten_My(%s,%d)\", channel, MyProcPid);    \t\/* If we couldn't possibly be listening, no need to queue anything *\/  \tif (pendingActions == NULL &amp;&amp; !unlistenExitRegistered) @@ -793,15 +803,15 @@ Async_Unlisten(const char *channel)  }    \/* - * Async_UnlistenAll + * Async_UnlistenAll_My   *   *\t\tThis is invoked by UNLISTEN * command, and also at backend exit.   *\/  void -Async_UnlistenAll(void) +Async_UnlistenAll_My(void)  {  \tif (Trace_notify) -\t\telog(DEBUG1, \"Async_UnlistenAll(%d)\", MyProcPid); +\t\telog(DEBUG1, \"Async_UnlistenAll_My(%d)\", MyProcPid);    \t\/* If we couldn't possibly be listening, no need to queue anything *\/  \tif (pendingActions == NULL &amp;&amp; !unlistenExitRegistered) @@ -818,7 +828,7 @@ Async_UnlistenAll(void)   * change within a transaction.   *\/  Datum -pg_listening_channels(PG_FUNCTION_ARGS) +pg_listening_channels_my(PG_FUNCTION_ARGS)  {  \tFuncCallContext *funcctx;   @@ -858,13 +868,13 @@ Async_UnlistenOnExit(int code, Datum arg)  }    \/* - * AtPrepare_Notify + * AtPrepare_Notify_My   *   *\t\tThis is called at the prepare phase of a two-phase   *\t\ttransaction.  Save the state for possible commit later.   *\/  void -AtPrepare_Notify(void) +AtPrepare_Notify_My(void)  {  \t\/* It's not allowed to have any pending LISTEN\/UNLISTEN\/NOTIFY actions *\/  \tif (pendingActions || pendingNotifies) @@ -874,7 +884,7 @@ AtPrepare_Notify(void)  }    \/* - * PreCommit_Notify + * PreCommit_Notify_My   *   *\t\tThis is called at transaction commit, before actually committing to   *\t\tclog. @@ -889,7 +899,7 @@ AtPrepare_Notify(void)   *\t\twe can still throw error if we run out of queue space.   *\/  void -PreCommit_Notify(void) +PreCommit_Notify_My(void)  {  \tListCell   *p;   @@ -897,7 +907,7 @@ PreCommit_Notify(void)  \t\treturn;\t\t\t\t\t\/* no relevant statements in this xact *\/    \tif (Trace_notify) -\t\telog(DEBUG1, \"PreCommit_Notify\"); +\t\telog(DEBUG1, \"PreCommit_Notify_My\");    \t\/* Preflight for any pending listen\/unlisten actions *\/  \tif (pendingActions != NULL) @@ -932,7 +942,7 @@ PreCommit_Notify(void)  \t\t * so cheap if we don't, and we'd prefer not to do that work while  \t\t * holding NotifyQueueLock.  \t\t *\/ -\t\t(void) GetCurrentTransactionId(); +\/\/\t\t(void) GetCurrentTransactionId();    \t\t\/*  \t\t * Serialize writers by acquiring a special lock that we hold till @@ -951,7 +961,7 @@ PreCommit_Notify(void)  \t\t * used by the flatfiles mechanism.)  \t\t *\/  \t\tLockSharedObject(DatabaseRelationId, InvalidOid, 0, -\t\t\t\t\t\t AccessExclusiveLock); +\t\t\t\t\t\t RowExclusiveLock);    \t\t\/* Now push the notifications into the queue *\/  \t\tbackendHasSentNotifications = true; @@ -984,14 +994,14 @@ PreCommit_Notify(void)  }    \/* - * AtCommit_Notify + * AtCommit_Notify_My   *   *\t\tThis is called at transaction commit, after committing to clog.   *   *\t\tUpdate listenChannels and clear transaction-local state.   *\/  void -AtCommit_Notify(void) +AtCommit_Notify_My(void)  {  \tListCell   *p;   @@ -1003,7 +1013,7 @@ AtCommit_Notify(void)  \t\treturn;    \tif (Trace_notify) -\t\telog(DEBUG1, \"AtCommit_Notify\"); +\t\telog(DEBUG1, \"AtCommit_Notify_My\");    \t\/* Perform any pending listen\/unlisten actions *\/  \tif (pendingActions != NULL) @@ -1036,7 +1046,7 @@ AtCommit_Notify(void)  }    \/* - * Exec_ListenPreCommit --- subroutine for PreCommit_Notify + * Exec_ListenPreCommit --- subroutine for PreCommit_Notify_My   *   * This function must make sure we are ready to catch any incoming messages.   *\/ @@ -1131,7 +1141,7 @@ Exec_ListenPreCommit(void)  }    \/* - * Exec_ListenCommit --- subroutine for AtCommit_Notify + * Exec_ListenCommit --- subroutine for AtCommit_Notify_My   *   * Add the channel to the list of channels we are listening on.   *\/ @@ -1155,10 +1165,12 @@ Exec_ListenCommit(const char *channel)  \toldcontext = MemoryContextSwitchTo(TopMemoryContext);  \tlistenChannels = lappend(listenChannels, pstrdup(channel));  \tMemoryContextSwitchTo(oldcontext); + +\tif (!pg_async_signal_original) pg_async_signal_original = pqsignal(SIGUSR1, pg_async_signal);  }    \/* - * Exec_UnlistenCommit --- subroutine for AtCommit_Notify + * Exec_UnlistenCommit --- subroutine for AtCommit_Notify_My   *   * Remove the specified channel name from listenChannels.   *\/ @@ -1186,10 +1198,15 @@ Exec_UnlistenCommit(const char *channel)  \t * We do not complain about unlistening something not being listened;  \t * should we?  \t *\/ + +\tif (!list_length(listenChannels) &amp;&amp; pg_async_signal_original) { +\t\tpqsignal(SIGUSR1, pg_async_signal_original); +\t\tpg_async_signal_original = NULL; +\t}  }    \/* - * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify + * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify_My   *   *\t\tUnlisten on all channels for this backend.   *\/ @@ -1201,10 +1218,15 @@ Exec_UnlistenAllCommit(void)    \tlist_free_deep(listenChannels);  \tlistenChannels = NIL; + +\tif (pg_async_signal_original) { +\t\tpqsignal(SIGUSR1, pg_async_signal_original); +\t\tpg_async_signal_original = NULL; +\t}  }    \/* - * ProcessCompletedNotifies --- send out signals and self-notifies + * ProcessCompletedNotifiesMy --- send out signals and self-notifies   *   * This is called from postgres.c just before going idle at the completion   * of a transaction.  If we issued any notifications in the just-completed @@ -1213,10 +1235,10 @@ Exec_UnlistenAllCommit(void)   * Also, if we filled enough queue pages with new notifies, try to advance   * the queue tail pointer.   * - * The reason that this is not done in AtCommit_Notify is that there is + * The reason that this is not done in AtCommit_Notify_My is that there is   * a nonzero chance of errors here (for example, encoding conversion errors   * while trying to format messages to our frontend).  An error during - * AtCommit_Notify would be a PANIC condition.  The timing is also arranged + * AtCommit_Notify_My would be a PANIC condition.  The timing is also arranged   * to ensure that a transaction's self-notifies are delivered to the frontend   * before it gets the terminating ReadyForQuery message.   * @@ -1227,8 +1249,9 @@ Exec_UnlistenAllCommit(void)   * NOTE: we are outside of any transaction here.   *\/  void -ProcessCompletedNotifies(void) +ProcessCompletedNotifiesMy(void)  { +\tbool idle = !IsTransactionOrTransactionBlock();  \tMemoryContext caller_context;    \t\/* Nothing to do if we didn't send any notifications *\/ @@ -1249,12 +1272,13 @@ ProcessCompletedNotifies(void)  \tcaller_context = CurrentMemoryContext;    \tif (Trace_notify) -\t\telog(DEBUG1, \"ProcessCompletedNotifies\"); +\t\telog(DEBUG1, \"ProcessCompletedNotifiesMy\");    \t\/*  \t * We must run asyncQueueReadAllNotifications inside a transaction, else  \t * bad things happen if it gets an error.  \t *\/ +\tif (idle)  \tStartTransactionCommand();    \t\/* Send signals to other backends *\/ @@ -1275,6 +1299,7 @@ ProcessCompletedNotifies(void)  \t\tasyncQueueAdvanceTail();  \t}   +\tif (idle)  \tCommitTransactionCommand();    \tMemoryContextSwitchTo(caller_context); @@ -1431,7 +1456,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)  \tentryLength = QUEUEALIGN(entryLength);  \tqe-&gt;length = entryLength;  \tqe-&gt;dboid = MyDatabaseId; -\tqe-&gt;xid = GetCurrentTransactionId(); +\/\/\tqe-&gt;xid = GetCurrentTransactionId();  \tqe-&gt;srcPid = MyProcPid;  \tmemcpy(qe-&gt;data, n-&gt;data, channellen + payloadlen + 2);  } @@ -1567,7 +1592,7 @@ asyncQueueAddEntries(ListCell *nextNotify)   * occupied.   *\/  Datum -pg_notification_queue_usage(PG_FUNCTION_ARGS) +pg_notification_queue_usage_my(PG_FUNCTION_ARGS)  {  \tdouble\t\tusage;   @@ -1749,7 +1774,7 @@ SignalBackends(void)  }    \/* - * AtAbort_Notify + * AtAbort_Notify_My   *   *\tThis is called at transaction abort.   * @@ -1757,10 +1782,10 @@ SignalBackends(void)   *\texecuted if the transaction got committed.   *\/  void -AtAbort_Notify(void) +AtAbort_Notify_My(void)  {  \t\/* -\t * If we LISTEN but then roll back the transaction after PreCommit_Notify, +\t * If we LISTEN but then roll back the transaction after PreCommit_Notify_My,  \t * we have registered as a listener but have not made any entry in  \t * listenChannels.  In that case, deregister again.  \t *\/ @@ -1772,12 +1797,12 @@ AtAbort_Notify(void)  }    \/* - * AtSubCommit_Notify() --- Take care of subtransaction commit. + * AtSubCommit_Notify_My() --- Take care of subtransaction commit.   *   * Reassign all items in the pending lists to the parent transaction.   *\/  void -AtSubCommit_Notify(void) +AtSubCommit_Notify_My(void)  {  \tint\t\t\tmy_level = GetCurrentTransactionNestLevel();   @@ -1844,10 +1869,10 @@ AtSubCommit_Notify(void)  }    \/* - * AtSubAbort_Notify() --- Take care of subtransaction abort. + * AtSubAbort_Notify_My() --- Take care of subtransaction abort.   *\/  void -AtSubAbort_Notify(void) +AtSubAbort_Notify_My(void)  {  \tint\t\t\tmy_level = GetCurrentTransactionNestLevel();   @@ -1882,15 +1907,15 @@ AtSubAbort_Notify(void)  }    \/* - * HandleNotifyInterrupt + * HandleNotifyInterruptMy   *   *\t\tSignal handler portion of interrupt handling. Let the backend know   *\t\tthat there's a pending notify interrupt. If we're currently reading   *\t\tfrom the client, this will interrupt the read and - *\t\tProcessClientReadInterrupt() will call ProcessNotifyInterrupt(). + *\t\tProcessClientReadInterrupt() will call ProcessNotifyInterruptMy().   *\/  void -HandleNotifyInterrupt(void) +HandleNotifyInterruptMy(void)  {  \t\/*  \t * Note: this is called by a SIGNAL HANDLER. You must be very wary what @@ -1905,18 +1930,18 @@ HandleNotifyInterrupt(void)  }    \/* - * ProcessNotifyInterrupt + * ProcessNotifyInterruptMy   *   *\t\tThis is called if we see notifyInterruptPending set, just before   *\t\ttransmitting ReadyForQuery at the end of a frontend command, and   *\t\talso if a notify signal occurs while reading from the frontend. - *\t\tHandleNotifyInterrupt() will cause the read to be interrupted + *\t\tHandleNotifyInterruptMy() will cause the read to be interrupted   *\t\tvia the process's latch, and this routine will get called.   *\t\tIf we are truly idle (ie, *not* inside a transaction block),   *\t\tprocess the incoming notifies.   *\/  void -ProcessNotifyInterrupt(void) +ProcessNotifyInterruptMy(void)  {  \tif (IsTransactionOrTransactionBlock())  \t\treturn;\t\t\t\t\t\/* not really idle *\/ @@ -1999,7 +2024,7 @@ asyncQueueReadAllNotifications(void)  \t * before we see them.  \t *----------  \t *\/ -\tsnapshot = RegisterSnapshot(GetLatestSnapshot()); +\/\/\tsnapshot = RegisterSnapshot(GetLatestSnapshot());    \t\/*  \t * It is possible that we fail while trying to send a message to our @@ -2078,7 +2103,7 @@ asyncQueueReadAllNotifications(void)  \tPG_END_TRY();    \t\/* Done with snapshot *\/ -\tUnregisterSnapshot(snapshot); +\/\/\tUnregisterSnapshot(snapshot);  }    \/* @@ -2126,6 +2151,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,  \t\t\/* Ignore messages destined for other databases *\/  \t\tif (qe-&gt;dboid == MyDatabaseId)  \t\t{ +#if 0  \t\t\tif (XidInMVCCSnapshot(qe-&gt;xid, snapshot))  \t\t\t{  \t\t\t\t\/* @@ -2153,6 +2179,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,  \t\t\t}  \t\t\telse if (TransactionIdDidCommit(qe-&gt;xid))  \t\t\t{ +#endif  \t\t\t\t\/* qe-&gt;data is the null-terminated channel name *\/  \t\t\t\tchar\t   *channel = qe-&gt;data;   @@ -2161,8 +2188,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,  \t\t\t\t\t\/* payload follows channel name *\/  \t\t\t\t\tchar\t   *payload = qe-&gt;data + strlen(channel) + 1;   -\t\t\t\t\tNotifyMyFrontEnd(channel, payload, qe-&gt;srcPid); +\t\t\t\t\tNotifyMyFrontEndMy(channel, payload, qe-&gt;srcPid);  \t\t\t\t} +#if 0  \t\t\t}  \t\t\telse  \t\t\t{ @@ -2171,6 +2199,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,  \t\t\t\t * ignore its notifications.  \t\t\t\t *\/  \t\t\t} +#endif  \t\t}    \t\t\/* Loop back if we're not at end of page *\/ @@ -2271,6 +2300,7 @@ static void  ProcessIncomingNotify(void)  {  \t\/* We *must* reset the flag *\/ +\tbool idle = !IsTransactionOrTransactionBlock();  \tnotifyInterruptPending = false;    \t\/* Do nothing else if we aren't actively listening *\/ @@ -2286,10 +2316,12 @@ ProcessIncomingNotify(void)  \t * We must run asyncQueueReadAllNotifications inside a transaction, else  \t * bad things happen if it gets an error.  \t *\/ +\tif (idle)  \tStartTransactionCommand();    \tasyncQueueReadAllNotifications();   +\tif (idle)  \tCommitTransactionCommand();    \t\/* @@ -2307,7 +2339,7 @@ ProcessIncomingNotify(void)   * Send NOTIFY message to my front end.   *\/  void -NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) +NotifyMyFrontEndMy(const char *channel, const char *payload, int32 srcPid)  {  \tif (whereToSendOutput == DestRemote)  \t{ <\/code><\/pre>\n<\/div>\n<\/details>\n<p>\u0422\u0435\u043f\u0435\u0440\u044c \u0434\u0435\u043b\u0430\u0435\u043c \u0440\u0430\u0441\u0448\u0438\u0440\u0435\u043d\u0438\u0435 \u0434\u043b\u044f \u0444\u0443\u043d\u043a\u0446\u0438\u0439<\/p>\n<details class=\"spoiler\">\n<summary>pg_async&#8212;1.0.sql<\/summary>\n<div class=\"spoiler__content\">\n<pre><code class=\"pgsql\">-- complain if script is sourced in psql, rather than via CREATE EXTENSION \\echo Use \"CREATE EXTENSION pg_async\" to load this file. \\quit  CREATE FUNCTION pg_listen(channel pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_listen' LANGUAGE C; CREATE FUNCTION pg_listening_channels() RETURNS setof pg_catalog.text STRICT AS 'MODULE_PATHNAME', 'pg_async_listening_channels' LANGUAGE C; CREATE FUNCTION pg_notification_queue_usage() RETURNS pg_catalog.float8 STRICT AS 'MODULE_PATHNAME', 'pg_async_notification_queue_usage' LANGUAGE C; CREATE FUNCTION pg_notify(channel pg_catalog.text default null, payload pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_notify' LANGUAGE C; CREATE FUNCTION pg_unlisten_all() RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_unlisten_all' LANGUAGE C; CREATE FUNCTION pg_unlisten(channel pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_unlisten' LANGUAGE C;<\/code><\/pre>\n<\/div>\n<\/details>\n<p>\u0417\u0434\u0435\u0441\u044c \u043a \u0441\u0442\u0430\u043d\u0434\u0430\u0440\u0442\u043d\u044b\u043c <a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/functions-info\" rel=\"noopener noreferrer nofollow\">pg_listening_channels<\/a>, <a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/functions-info\" rel=\"noopener noreferrer nofollow\">pg_notification_queue_usage<\/a> \u0438 <a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/sql-notify\" rel=\"noopener noreferrer nofollow\">pg_notify<\/a> \u0434\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u044b \u043d\u043e\u0432\u044b\u0435 \u0443\u0434\u043e\u0431\u043d\u044b\u0435 \u0444\u0443\u043d\u043a\u0446\u0438\u0438 pg_listen, pg_unlisten \u0438 pg_unlisten_all, \u0434\u043e\u043f\u043e\u043b\u043d\u044f\u044e\u0449\u0438\u0435 \u0441\u043e\u043e\u0442\u0432\u0435\u0442\u0441\u0442\u0432\u0443\u044e\u0449\u0438\u0435 \u043a\u043e\u043c\u0430\u043d\u0434\u044b <a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/sql-listen\" rel=\"noopener noreferrer nofollow\">LISTEN<\/a>, <a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/sql-unlisten\" rel=\"noopener noreferrer nofollow\">UNLISTEN<\/a> \u0438 <a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/sql-unlisten\" rel=\"noopener noreferrer nofollow\">UNLISTEN *<\/a>.<\/p>\n<p>\u0414\u0435\u043b\u0430\u0435\u043c \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044e \u044d\u0442\u0438\u0445 \u0444\u0443\u043d\u043a\u0446\u0438\u0439, \u0432\u044b\u0437\u044b\u0432\u0430\u044f \u043d\u0430 \u0432\u0435\u0434\u0443\u0449\u0435\u043c \u043e\u0440\u0438\u0433\u0438\u043d\u0430\u043b\u044c\u043d\u044b\u0435 \u0444\u0443\u043d\u043a\u0446\u0438\u0438, \u0430 \u043d\u0430 \u0440\u0435\u043f\u043b\u0438\u043a\u0435 \u0444\u0443\u043d\u043a\u0446\u0438\u0438 \u0438\u0437 \u0438\u0437\u043c\u0435\u043d\u0451\u043d\u043d\u043e\u0433\u043e \u0441\u043a\u043e\u043f\u0438\u0440\u043e\u0432\u0430\u043d\u043d\u043e\u0433\u043e \u0444\u0430\u0439\u043b\u0430 <strong>async.c<\/strong>:<\/p>\n<details class=\"spoiler\">\n<summary>pg_async.c<\/summary>\n<div class=\"spoiler__content\">\n<pre><code class=\"cpp\">#define EXTENSION(function) Datum (function)(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(function); Datum (function)(PG_FUNCTION_ARGS)  EXTENSION(pg_async_listen) {     const char *channel = PG_ARGISNULL(0) ? \"\" : text_to_cstring(PG_GETARG_TEXT_PP(0));     !XactReadOnly ? Async_Listen(channel) : Async_Listen_My(channel);     PG_RETURN_VOID(); }  EXTENSION(pg_async_listening_channels) {     return !XactReadOnly ? pg_listening_channels(fcinfo) : pg_listening_channels_my(fcinfo); }  EXTENSION(pg_async_notification_queue_usage) {     return !XactReadOnly ? pg_notification_queue_usage(fcinfo) : pg_notification_queue_usage_my(fcinfo); }  EXTENSION(pg_async_notify) {     return !XactReadOnly ? pg_notify(fcinfo) : pg_notify_my(fcinfo); }  EXTENSION(pg_async_unlisten_all) {     !XactReadOnly ? Async_UnlistenAll() : Async_UnlistenAll_My();     PG_RETURN_VOID(); }  EXTENSION(pg_async_unlisten) {     const char *channel = PG_ARGISNULL(0) ? \"\" : text_to_cstring(PG_GETARG_TEXT_PP(0));     !XactReadOnly ? Async_Unlisten(channel) : Async_Unlisten_My(channel);     PG_RETURN_VOID(); } <\/code><\/pre>\n<\/div>\n<\/details>\n<p>\u0422\u0430\u043a\u0436\u0435, \u0440\u0435\u0433\u0438\u0441\u0442\u0440\u0438\u0440\u0443\u0435\u043c \u0445\u0443\u043a\u0438 \u043d\u0430 \u0432\u044b\u043f\u043e\u043b\u043d\u0435\u043d\u0438\u0435 \u043a\u043e\u043c\u0430\u043d\u0434, \u043d\u0430 \u0442\u0440\u0430\u043d\u0437\u0430\u043a\u0446\u0438\u0438 \u0438 \u0440\u0430\u0437\u0434\u0435\u043b\u044f\u0435\u043c\u0443\u044e \u043f\u0430\u043c\u044f\u0442\u044c:<\/p>\n<details class=\"spoiler\">\n<summary>pg_async.c<\/summary>\n<div class=\"spoiler__content\">\n<pre><code class=\"cpp\">static ProcessUtility_hook_type pg_async_ProcessUtility_hook_original = NULL; static shmem_startup_hook_type pg_async_shmem_startup_hook_original = NULL;  void _PG_init(void); void _PG_init(void) {     if (!process_shared_preload_libraries_in_progress) return;     pg_async_ProcessUtility_hook_original = ProcessUtility_hook;     ProcessUtility_hook = pg_async_ProcessUtility_hook;     pg_async_shmem_startup_hook_original = shmem_startup_hook;     shmem_startup_hook = pg_async_shmem_startup_hook;     RequestAddinShmemSpace(AsyncShmemSizeMy());     RegisterSubXactCallback(pg_async_SubXactCallback, NULL);     RegisterXactCallback(pg_async_XactCallback, NULL); }  void _PG_fini(void); void _PG_fini(void) {     ProcessUtility_hook = pg_async_ProcessUtility_hook_original;     shmem_startup_hook = pg_async_shmem_startup_hook_original;     UnregisterSubXactCallback(pg_async_SubXactCallback, NULL);     UnregisterXactCallback(pg_async_XactCallback, NULL); } <\/code><\/pre>\n<\/div>\n<\/details>\n<p>\u0412 \u0445\u0443\u043a\u0435 \u043d\u0430 \u0440\u0430\u0437\u0434\u0435\u043b\u044f\u0435\u043c\u0443\u044e \u043f\u0430\u043c\u044f\u0442\u044c \u0440\u0435\u0433\u0438\u0441\u0442\u0440\u0438\u0440\u0443\u0435\u043c \u0435\u0451 \u0438\u0437 \u0438\u0437\u043c\u0435\u043d\u0451\u043d\u043d\u043e\u0433\u043e \u0441\u043a\u043e\u043f\u0438\u0440\u043e\u0432\u0430\u043d\u043d\u043e\u0433\u043e \u0444\u0430\u0439\u043b\u0430 <strong>async.c<\/strong>:<\/p>\n<details class=\"spoiler\">\n<summary>pg_async.c<\/summary>\n<div class=\"spoiler__content\">\n<pre><code class=\"cpp\">static void pg_async_shmem_startup_hook(void) {     if (pg_async_shmem_startup_hook_original) pg_async_shmem_startup_hook_original();     LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);     AsyncShmemInitMy();     LWLockRelease(AddinShmemInitLock); } <\/code><\/pre>\n<\/div>\n<\/details>\n<p>\u0412 \u0445\u0443\u043a\u0435 \u043d\u0430 \u0442\u0440\u0430\u043d\u0437\u0430\u043a\u0446\u0438\u0438 \u043d\u0430 \u0440\u0435\u043f\u043b\u0438\u043a\u0435 \u0432\u044b\u0437\u044b\u0432\u0430\u0435\u043c \u0441\u043e\u043e\u0442\u0432\u0435\u0442\u0441\u0432\u0443\u044e\u0449\u0438\u0435 \u0444\u0443\u043d\u043a\u0446\u0438\u0438 \u0438\u0437 \u0438\u0437\u043c\u0435\u043d\u0451\u043d\u043d\u043e\u0433\u043e \u0441\u043a\u043e\u043f\u0438\u0440\u043e\u0432\u0430\u043d\u043d\u043e\u0433\u043e \u0444\u0430\u0439\u043b\u0430 <strong>async.c<\/strong>:<\/p>\n<details class=\"spoiler\">\n<summary>pg_async.c<\/summary>\n<div class=\"spoiler__content\">\n<pre><code class=\"cpp\">static void pg_async_XactCallback(XactEvent event, void *arg) {     if (!XactReadOnly) return;     switch (event) {         case XACT_EVENT_ABORT: AtAbort_Notify_My(); break;         case XACT_EVENT_COMMIT: AtCommit_Notify_My(); ProcessCompletedNotifiesMy(); break;         case XACT_EVENT_PRE_COMMIT: PreCommit_Notify_My(); break;         case XACT_EVENT_PREPARE: AtPrepare_Notify_My(); break;         default: break;     } } <\/code><\/pre>\n<\/div>\n<\/details>\n<p>\u0412 \u0445\u0443\u043a\u0435 \u043d\u0430 \u0432\u044b\u043f\u043e\u043b\u043d\u0435\u043d\u0438\u0435 \u043a\u043e\u043c\u0430\u043d\u0434 \u043d\u0430 \u0440\u0435\u043f\u043b\u0438\u043a\u0435 \u0434\u043b\u044f \u043a\u043e\u043c\u0430\u043d\u0434 LISTEN, UNLISTEN \u0438 NOTIFY \u0432\u044b\u0437\u044b\u0432\u0430\u0435\u043c \u0441\u043e\u043e\u0442\u0432\u0435\u0442\u0441\u0432\u0443\u044e\u0449\u0438\u0435 \u0444\u0443\u043d\u043a\u0446\u0438\u0438 \u0438\u0437 \u0438\u0437\u043c\u0435\u043d\u0451\u043d\u043d\u043e\u0433\u043e \u0441\u043a\u043e\u043f\u0438\u0440\u043e\u0432\u0430\u043d\u043d\u043e\u0433\u043e \u0444\u0430\u0439\u043b\u0430 <strong>async.c<\/strong>:<\/p>\n<details class=\"spoiler\">\n<summary>pg_async.c<\/summary>\n<div class=\"spoiler__content\">\n<pre><code class=\"cpp\">static void CheckRestrictedOperation(const char *cmdname) {     if (InSecurityRestrictedOperation()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg(\"cannot execute %s within security-restricted operation\", cmdname))); }  static void pg_async_ProcessUtility_hook(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc) {     Node *parsetree = pstmt-&gt;utilityStmt;     if (!XactReadOnly) return pg_async_ProcessUtility_hook_original ? pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc) : standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc);     check_stack_depth();     switch (nodeTag(parsetree)) {         case T_ListenStmt: {             ListenStmt *stmt = (ListenStmt *)parsetree;             CheckRestrictedOperation(\"LISTEN\");             Async_Listen_My(stmt-&gt;conditionname);         } break;         case T_NotifyStmt: {             NotifyStmt *stmt = (NotifyStmt *)parsetree;             Async_Notify_My(stmt-&gt;conditionname, stmt-&gt;payload);         } break;         case T_UnlistenStmt: {             UnlistenStmt *stmt = (UnlistenStmt *)parsetree;             CheckRestrictedOperation(\"UNLISTEN\");             stmt-&gt;conditionname ? Async_Unlisten_My(stmt-&gt;conditionname) : Async_UnlistenAll_My();         } break;         default: return pg_async_ProcessUtility_hook_original ? pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc) : standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc);     }     CommandCounterIncrement(); } <\/code><\/pre>\n<\/div>\n<\/details>\n<p>\u0412\u0441\u0451 \u044d\u0442\u043e \u043c\u043e\u0436\u043d\u043e \u043f\u043e\u0441\u043c\u043e\u0442\u0440\u0435\u0442\u044c \u0432 <a href=\"https:\/\/github.com\/RekGRpth\/pg_async\" rel=\"noopener noreferrer nofollow\">\u0440\u0435\u043f\u043e\u0437\u0438\u0442\u043e\u0440\u0438\u0438<\/a>.<\/p>\n<\/div>\n<p> \u0441\u0441\u044b\u043b\u043a\u0430 \u043d\u0430 \u043e\u0440\u0438\u0433\u0438\u043d\u0430\u043b \u0441\u0442\u0430\u0442\u044c\u0438 <a href=\"https:\/\/habr.com\/ru\/post\/550104\/\"> https:\/\/habr.com\/ru\/post\/550104\/<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"\n<div class=\"post__text post__text_v2\" id=\"post-content-body\">\n<p>\u0414\u043b\u044f \u043f\u0440\u0438\u0433\u043e\u0442\u043e\u0432\u043b\u0435\u043d\u0438\u044f \u0430\u0441\u0438\u043d\u0445\u0440\u043e\u043d\u043d\u044b\u0445 \u0443\u0432\u0435\u0434\u043e\u043c\u043b\u0435\u043d\u0438\u0439 <a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/sql-listen\" rel=\"noopener noreferrer nofollow\">listen<\/a>\/<a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/sql-notify\" rel=\"noopener noreferrer nofollow\">notify<\/a> \u0432 <a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/hot-standby\" rel=\"noopener noreferrer nofollow\">\u0440\u0435\u043f\u043b\u0438\u043a\u0435<\/a> \u043d\u0430\u043c \u043f\u043e\u043d\u0430\u0434\u043e\u0431\u0438\u0442\u0441\u044f <a href=\"https:\/\/www.postgresql.org\/\" rel=\"noopener noreferrer nofollow\">postgres<\/a>. \u041a\u0430\u043a \u0433\u043e\u0432\u043e\u0440\u0438\u0442\u0441\u044f \u0432 <a href=\"https:\/\/postgrespro.ru\/docs\/postgresql\/13\/hot-standby\" rel=\"noopener noreferrer nofollow\">\u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0430\u0446\u0438\u0438<\/a>:<\/p>\n<blockquote>\n<p>\u0422\u0440\u0430\u043d\u0437\u0430\u043a\u0446\u0438\u0438, \u0437\u0430\u043f\u0443\u0449\u0435\u043d\u043d\u044b\u0435 \u0432 \u0440\u0435\u0436\u0438\u043c\u0435 \u0433\u043e\u0440\u044f\u0447\u0435\u0433\u043e \u0440\u0435\u0437\u0435\u0440\u0432\u0430, \u043d\u0438\u043a\u043e\u0433\u0434\u0430 \u043d\u0435 \u043f\u043e\u043b\u0443\u0447\u0430\u044e\u0442 ID  \u0442\u0440\u0430\u043d\u0437\u0430\u043a\u0446\u0438\u0438 \u0438 \u043d\u0435 \u043c\u043e\u0433\u0443\u0442 \u0431\u044b\u0442\u044c \u0437\u0430\u043f\u0438\u0441\u0430\u043d\u044b \u0432 \u0436\u0443\u0440\u043d\u0430\u043b \u043f\u0440\u0435\u0434\u0437\u0430\u043f\u0438\u0441\u0438. \u041f\u043e\u044d\u0442\u043e\u043c\u0443 \u043f\u0440\u0438  \u043f\u043e\u043f\u044b\u0442\u043a\u0435 \u0432\u044b\u043f\u043e\u043b\u043d\u0438\u0442\u044c \u0441\u043b\u0435\u0434\u0443\u044e\u0449\u0438\u0435 \u0434\u0435\u0439\u0441\u0442\u0432\u0438\u044f \u0432\u043e\u0437\u043d\u0438\u043a\u043d\u0443\u0442 \u043e\u0448\u0438\u0431\u043a\u0438:<\/p>\n<p><code>LISTEN<\/code>, <code>NOTIFY<\/code><\/p>\n<\/blockquote>\n<p>\u041f\u043e\u044d\u0442\u043e\u043c\u0443 \u0431\u0435\u0440\u0451\u043c \u0444\u0430\u0439\u043b <strong>async.c <\/strong>\u0444\u0430\u0439\u043b \u0438\u0437 \u0438\u0441\u0445\u043e\u0434\u043d\u0438\u043a\u043e\u0432, \u043f\u0435\u0440\u0435\u0438\u043c\u0435\u043d\u043e\u0432\u044b\u0432\u0430\u0435\u043c \u0432 \u043d\u0451\u043c \u0432\u0441\u0435 \u043f\u0443\u0431\u043b\u0438\u0447\u043d\u044b\u0435 \u043c\u0435\u0442\u043e\u0434\u044b (\u043d\u0435 static-\u0444\u0443\u043d\u043a\u0446\u0438\u0438), \u0443\u0434\u0430\u043b\u044f\u0435\u043c \u0441\u0432\u044f\u0437\u044c \u0441 \u0442\u0440\u0430\u043d\u0437\u0430\u043a\u0446\u0438\u044f\u043c\u0438 \u0438 \u0434\u043e\u0431\u0430\u0432\u043b\u044f\u0435\u043c \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0443 \u0441\u0438\u0433\u043d\u0430\u043b\u0430 SIGUSR1, \u0447\u0442\u043e\u0431\u044b \u043f\u043e\u043b\u0443\u0447\u0438\u043b\u043e\u0441\u044c \u0442\u0430\u043a:<\/p>\n<details class=\"spoiler\">\n<summary>src\/backend\/commands\/async.c<\/summary>\n<div class=\"spoiler__content\">\n<pre><code class=\"diff\">diff --git a\/src\/backend\/commands\/async.c b\/src\/backend\/commands\/async.c index 5739d2b40f..9f62d4ca6b 100644 --- a\/src\/backend\/commands\/async.c +++ b\/src\/backend\/commands\/async.c @@ -1,3 +1,5 @@ +#include &lt;include.h&gt; +  \/*-------------------------------------------------------------------------   *   * async.c @@ -46,7 +48,7 @@   *\t  to. In case there is a match it delivers the notification event to its   *\t  frontend.  Non-matching events are simply skipped.   * - * 4. The NOTIFY statement (routine Async_Notify) stores the notification in + * 4. The NOTIFY statement (routine Async_Notify_My) stores the notification in   *\t  a backend-local list which will not be processed until transaction end.   *   *\t  Duplicate notifications from the same transaction are sent out as one @@ -56,7 +58,7 @@   *\t  that has been sent, it can easily add some unique string into the extra   *\t  payload parameter.   * - *\t  When the transaction is ready to commit, PreCommit_Notify() adds the + *\t  When the transaction is ready to commit, PreCommit_Notify_My() adds the   *\t  pending notifications to the head of the queue. The head pointer of the   *\t  queue always points to the next free position and a position is just a   *\t  page number and the offset in that page. This is done before marking the @@ -67,7 +69,7 @@   *\t  Once we have put all of the notifications into the queue, we return to   *\t  CommitTransaction() which will then do the actual transaction commit.   * - *\t  After commit we are called another time (AtCommit_Notify()). Here we + *\t  After commit we are called another time (AtCommit_Notify_My()). Here we   *\t  make the actual updates to the effective listen state (listenChannels).   *   *\t  Finally, after we are out of the transaction altogether, we check if @@ -171,7 +173,7 @@ typedef struct AsyncQueueEntry  {  \tint\t\t\tlength;\t\t\t\/* total allocated length of entry *\/  \tOid\t\t\tdboid;\t\t\t\/* sender's database OID *\/ -\tTransactionId xid;\t\t\t\/* sender's XID *\/ +\/\/\tTransactionId xid;\t\t\t\/* sender's XID *\/  \tint32\t\tsrcPid;\t\t\t\/* sender's PID *\/  \tchar\t\tdata[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];  } AsyncQueueEntry; @@ -414,14 +416,16 @@ typedef struct NotificationHash    static NotificationList *pendingNotifies = NULL;   +static pqsigfunc pg_async_signal_original = NULL; +  \/* - * Inbound notifications are initially processed by HandleNotifyInterrupt(), + * Inbound notifications are initially processed by HandleNotifyInterruptMy(),   * called from inside a signal handler. That just sets the   * notifyInterruptPending flag and sets the process - * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to + * latch. ProcessNotifyInterruptMy() will then be called whenever it's safe to   * actually deal with the interrupt.   *\/ -volatile sig_atomic_t notifyInterruptPending = false; +\/\/volatile sig_atomic_t notifyInterruptPending = false;    \/* True if we've registered an on_shmem_exit cleanup *\/  static bool unlistenExitRegistered = false; @@ -436,7 +440,7 @@ static bool backendHasSentNotifications = false;  static bool backendTryAdvanceTail = false;    \/* GUC parameter *\/ -bool\t\tTrace_notify = false; +\/\/bool\t\tTrace_notify = false;    \/* local function prototypes *\/  static int\tasyncQueuePageDiff(int p, int q); @@ -469,6 +473,12 @@ static uint32 notification_hash(const void *key, Size keysize);  static int\tnotification_match(const void *key1, const void *key2, Size keysize);  static void ClearPendingActionsAndNotifies(void);   +static void pg_async_signal(SIGNAL_ARGS) { +    HandleNotifyInterruptMy(); +    if (notifyInterruptPending) ProcessNotifyInterruptMy(); +    pg_async_signal_original(postgres_signal_arg); +} +  \/*   * Compute the difference between two queue page numbers (i.e., p - q),   * accounting for wraparound. @@ -509,11 +519,11 @@ asyncQueuePagePrecedes(int p, int q)   * Report space needed for our shared memory area   *\/  Size -AsyncShmemSize(void) +AsyncShmemSizeMy(void)  {  \tSize\t\tsize;   -\t\/* This had better match AsyncShmemInit *\/ +\t\/* This had better match AsyncShmemInitMy *\/  \tsize = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));  \tsize = add_size(size, offsetof(AsyncQueueControl, backend));   @@ -526,7 +536,7 @@ AsyncShmemSize(void)   * Initialize our shared memory area   *\/  void -AsyncShmemInit(void) +AsyncShmemInitMy(void)  {  \tbool\t\tfound;  \tSize\t\tsize; @@ -585,7 +595,7 @@ AsyncShmemInit(void)   *\t  SQL function to send a notification event   *\/  Datum -pg_notify(PG_FUNCTION_ARGS) +pg_notify_my(PG_FUNCTION_ARGS)  {  \tconst char *channel;  \tconst char *payload; @@ -601,16 +611,16 @@ pg_notify(PG_FUNCTION_ARGS)  \t\tpayload = text_to_cstring(PG_GETARG_TEXT_PP(1));    \t\/* For NOTIFY as a statement, this is checked in ProcessUtility *\/ -\tPreventCommandDuringRecovery(\"NOTIFY\"); +\/\/\tPreventCommandDuringRecovery(\"NOTIFY\");   -\tAsync_Notify(channel, payload); +\tAsync_Notify_My(channel, payload);    \tPG_RETURN_VOID();  }      \/* - * Async_Notify + * Async_Notify_My   *   *\t\tThis is executed by the SQL notify command.   * @@ -619,7 +629,7 @@ pg_notify(PG_FUNCTION_ARGS)   *\t\t^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^   *\/  void -Async_Notify(const char *channel, const char *payload) +Async_Notify_My(const char *channel, const char *payload)  {  \tint\t\t\tmy_level = GetCurrentTransactionNestLevel();  \tsize_t\t\tchannel_len; @@ -631,7 +641,7 @@ Async_Notify(const char *channel, const char *payload)  \t\telog(ERROR, \"cannot send notifications from a parallel worker\");    \tif (Trace_notify) -\t\telog(DEBUG1, \"Async_Notify(%s)\", channel); +\t\telog(DEBUG1, \"Async_Notify_My(%s)\", channel);    \tchannel_len = channel ? strlen(channel) : 0;  \tpayload_len = payload ? strlen(payload) : 0; @@ -679,7 +689,7 @@ Async_Notify(const char *channel, const char *payload)  \t\t\/*  \t\t * First notify event in current (sub)xact. Note that we allocate the  \t\t * NotificationList in TopTransactionContext; the nestingLevel might -\t\t * get changed later by AtSubCommit_Notify. +\t\t * get changed later by AtSubCommit_Notify_My.  \t\t *\/  \t\tnotifies = (NotificationList *)  \t\t\tMemoryContextAlloc(TopTransactionContext, @@ -725,7 +735,7 @@ queue_listen(ListenActionKind action, const char *channel)  \tint\t\t\tmy_level = GetCurrentTransactionNestLevel();    \t\/* -\t * Unlike Async_Notify, we don't try to collapse out duplicates. It would +\t * Unlike Async_Notify_My, we don't try to collapse out duplicates. It would  \t * be too complicated to ensure we get the right interactions of  \t * conflicting LISTEN\/UNLISTEN\/UNLISTEN_ALL, and it's unlikely that there  \t * would be any performance benefit anyway in sane applications. @@ -745,7 +755,7 @@ queue_listen(ListenActionKind action, const char *channel)  \t\t\/*  \t\t * First action in current sub(xact). Note that we allocate the  \t\t * ActionList in TopTransactionContext; the nestingLevel might get -\t\t * changed later by AtSubCommit_Notify. +\t\t * changed later by AtSubCommit_Notify_My.  \t\t *\/  \t\tactions = (ActionList *)  \t\t\tMemoryContextAlloc(TopTransactionContext, sizeof(ActionList)); @@ -761,29 +771,29 @@ queue_listen(ListenActionKind action, const char *channel)  }    \/* - * Async_Listen + * Async_Listen_My   *   *\t\tThis is executed by the SQL listen command.   *\/  void -Async_Listen(const char *channel) +Async_Listen_My(const char *channel)  {  \tif (Trace_notify) -\t\telog(DEBUG1, \"Async_Listen(%s,%d)\", channel, MyProcPid); +\t\telog(DEBUG1, \"Async_Listen_My(%s,%d)\", channel, MyProcPid);    \tqueue_listen(LISTEN_LISTEN, channel);  }    \/* - * Async_Unlisten + * Async_Unlisten_My   *   *\t\tThis is executed by the SQL unlisten command.   *\/  void -Async_Unlisten(const char *channel) +Async_Unlisten_My(const char *channel)  {  \tif (Trace_notify) -\t\telog(DEBUG1, \"Async_Unlisten(%s,%d)\", channel, MyProcPid); +\t\telog(DEBUG1, \"Async_Unlisten_My(%s,%d)\", channel, MyProcPid);    \t\/* If we couldn't possibly be listening, no need to queue anything *\/  \tif (pendingActions == NULL &amp;&amp; !unlistenExitRegistered) @@ -793,15 +803,15 @@ Async_Unlisten(const char *channel)  }    \/* - * Async_UnlistenAll + * Async_UnlistenAll_My   *   *\t\tThis is invoked by UNLISTEN * command, and also at backend exit.   *\/  void -Async_UnlistenAll(void) +Async_UnlistenAll_My(void)  {  \tif (Trace_notify) -\t\telog(DEBUG1, \"Async_UnlistenAll(%d)\", MyProcPid); +\t\telog(DEBUG1, \"Async_UnlistenAll_My(%d)\", MyProcPid);    \t\/* If we couldn't possibly be listening, no need to queue anything *\/  \tif (pendingActions == NULL &amp;&amp; !unlistenExitRegistered) @@ -818,7 +828,7 @@ Async_UnlistenAll(void)   * change within a transaction.   *\/  Datum -pg_listening_channels(PG_FUNCTION_ARGS) +pg_listening_channels_my(PG_FUNCTION_ARGS)  {  \tFuncCallContext *funcctx;   @@ -858,13 +868,13 @@ Async_UnlistenOnExit(int code, Datum arg)  }    \/* - * AtPrepare_Notify + * AtPrepare_Notify_My   *   *\t\tThis is called at the prepare phase of a two-phase   *\t\ttransaction.  Save the state for possible commit later.   *\/  void -AtPrepare_Notify(void) +AtPrepare_Notify_My(void)  {  \t\/* It's not allowed to have any pending LISTEN\/UNLISTEN\/NOTIFY actions *\/  \tif (pendingActions || pendingNotifies) @@ -874,7 +884,7 @@ AtPrepare_Notify(void)  }    \/* - * PreCommit_Notify + * PreCommit_Notify_My   *   *\t\tThis is called at transaction commit, before actually committing to   *\t\tclog. @@ -889,7 +899,7 @@ AtPrepare_Notify(void)   *\t\twe can still throw error if we run out of queue space.   *\/  void -PreCommit_Notify(void)<\/code><\/pre>\n<\/div>\n<\/details>\n<\/div>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[],"tags":[],"class_list":["post-320666","post","type-post","status-publish","format-standard","hentry"],"_links":{"self":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/320666","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=320666"}],"version-history":[{"count":0,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/320666\/revisions"}],"wp:attachment":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=320666"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=320666"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=320666"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}