Для приготовления асинхронных уведомлений listen/notify в реплике нам понадобится postgres. Как говорится в документации:
Транзакции, запущенные в режиме горячего резерва, никогда не получают ID транзакции и не могут быть записаны в журнал предзаписи. Поэтому при попытке выполнить следующие действия возникнут ошибки:
LISTEN,NOTIFY
Поэтому берём файл async.c файл из исходников, переименовываем в нём все публичные методы (не static-функции), удаляем связь с транзакциями и добавляем обработку сигнала SIGUSR1, чтобы получилось так:
src/backend/commands/async.c
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 5739d2b40f..9f62d4ca6b 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -1,3 +1,5 @@ +#include <include.h> + /*------------------------------------------------------------------------- * * async.c @@ -46,7 +48,7 @@ * to. In case there is a match it delivers the notification event to its * frontend. Non-matching events are simply skipped. * - * 4. The NOTIFY statement (routine Async_Notify) stores the notification in + * 4. The NOTIFY statement (routine Async_Notify_My) stores the notification in * a backend-local list which will not be processed until transaction end. * * Duplicate notifications from the same transaction are sent out as one @@ -56,7 +58,7 @@ * that has been sent, it can easily add some unique string into the extra * payload parameter. * - * When the transaction is ready to commit, PreCommit_Notify() adds the + * When the transaction is ready to commit, PreCommit_Notify_My() adds the * pending notifications to the head of the queue. The head pointer of the * queue always points to the next free position and a position is just a * page number and the offset in that page. This is done before marking the @@ -67,7 +69,7 @@ * Once we have put all of the notifications into the queue, we return to * CommitTransaction() which will then do the actual transaction commit. * - * After commit we are called another time (AtCommit_Notify()). Here we + * After commit we are called another time (AtCommit_Notify_My()). Here we * make the actual updates to the effective listen state (listenChannels). * * Finally, after we are out of the transaction altogether, we check if @@ -171,7 +173,7 @@ typedef struct AsyncQueueEntry { int length; /* total allocated length of entry */ Oid dboid; /* sender's database OID */ - TransactionId xid; /* sender's XID */ +// TransactionId xid; /* sender's XID */ int32 srcPid; /* sender's PID */ char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; } AsyncQueueEntry; @@ -414,14 +416,16 @@ typedef struct NotificationHash static NotificationList *pendingNotifies = NULL; +static pqsigfunc pg_async_signal_original = NULL; + /* - * Inbound notifications are initially processed by HandleNotifyInterrupt(), + * Inbound notifications are initially processed by HandleNotifyInterruptMy(), * called from inside a signal handler. That just sets the * notifyInterruptPending flag and sets the process - * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to + * latch. ProcessNotifyInterruptMy() will then be called whenever it's safe to * actually deal with the interrupt. */ -volatile sig_atomic_t notifyInterruptPending = false; +//volatile sig_atomic_t notifyInterruptPending = false; /* True if we've registered an on_shmem_exit cleanup */ static bool unlistenExitRegistered = false; @@ -436,7 +440,7 @@ static bool backendHasSentNotifications = false; static bool backendTryAdvanceTail = false; /* GUC parameter */ -bool Trace_notify = false; +//bool Trace_notify = false; /* local function prototypes */ static int asyncQueuePageDiff(int p, int q); @@ -469,6 +473,12 @@ static uint32 notification_hash(const void *key, Size keysize); static int notification_match(const void *key1, const void *key2, Size keysize); static void ClearPendingActionsAndNotifies(void); +static void pg_async_signal(SIGNAL_ARGS) { + HandleNotifyInterruptMy(); + if (notifyInterruptPending) ProcessNotifyInterruptMy(); + pg_async_signal_original(postgres_signal_arg); +} + /* * Compute the difference between two queue page numbers (i.e., p - q), * accounting for wraparound. @@ -509,11 +519,11 @@ asyncQueuePagePrecedes(int p, int q) * Report space needed for our shared memory area */ Size -AsyncShmemSize(void) +AsyncShmemSizeMy(void) { Size size; - /* This had better match AsyncShmemInit */ + /* This had better match AsyncShmemInitMy */ size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); size = add_size(size, offsetof(AsyncQueueControl, backend)); @@ -526,7 +536,7 @@ AsyncShmemSize(void) * Initialize our shared memory area */ void -AsyncShmemInit(void) +AsyncShmemInitMy(void) { bool found; Size size; @@ -585,7 +595,7 @@ AsyncShmemInit(void) * SQL function to send a notification event */ Datum -pg_notify(PG_FUNCTION_ARGS) +pg_notify_my(PG_FUNCTION_ARGS) { const char *channel; const char *payload; @@ -601,16 +611,16 @@ pg_notify(PG_FUNCTION_ARGS) payload = text_to_cstring(PG_GETARG_TEXT_PP(1)); /* For NOTIFY as a statement, this is checked in ProcessUtility */ - PreventCommandDuringRecovery("NOTIFY"); +// PreventCommandDuringRecovery("NOTIFY"); - Async_Notify(channel, payload); + Async_Notify_My(channel, payload); PG_RETURN_VOID(); } /* - * Async_Notify + * Async_Notify_My * * This is executed by the SQL notify command. * @@ -619,7 +629,7 @@ pg_notify(PG_FUNCTION_ARGS) * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ */ void -Async_Notify(const char *channel, const char *payload) +Async_Notify_My(const char *channel, const char *payload) { int my_level = GetCurrentTransactionNestLevel(); size_t channel_len; @@ -631,7 +641,7 @@ Async_Notify(const char *channel, const char *payload) elog(ERROR, "cannot send notifications from a parallel worker"); if (Trace_notify) - elog(DEBUG1, "Async_Notify(%s)", channel); + elog(DEBUG1, "Async_Notify_My(%s)", channel); channel_len = channel ? strlen(channel) : 0; payload_len = payload ? strlen(payload) : 0; @@ -679,7 +689,7 @@ Async_Notify(const char *channel, const char *payload) /* * First notify event in current (sub)xact. Note that we allocate the * NotificationList in TopTransactionContext; the nestingLevel might - * get changed later by AtSubCommit_Notify. + * get changed later by AtSubCommit_Notify_My. */ notifies = (NotificationList *) MemoryContextAlloc(TopTransactionContext, @@ -725,7 +735,7 @@ queue_listen(ListenActionKind action, const char *channel) int my_level = GetCurrentTransactionNestLevel(); /* - * Unlike Async_Notify, we don't try to collapse out duplicates. It would + * Unlike Async_Notify_My, we don't try to collapse out duplicates. It would * be too complicated to ensure we get the right interactions of * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there * would be any performance benefit anyway in sane applications. @@ -745,7 +755,7 @@ queue_listen(ListenActionKind action, const char *channel) /* * First action in current sub(xact). Note that we allocate the * ActionList in TopTransactionContext; the nestingLevel might get - * changed later by AtSubCommit_Notify. + * changed later by AtSubCommit_Notify_My. */ actions = (ActionList *) MemoryContextAlloc(TopTransactionContext, sizeof(ActionList)); @@ -761,29 +771,29 @@ queue_listen(ListenActionKind action, const char *channel) } /* - * Async_Listen + * Async_Listen_My * * This is executed by the SQL listen command. */ void -Async_Listen(const char *channel) +Async_Listen_My(const char *channel) { if (Trace_notify) - elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid); + elog(DEBUG1, "Async_Listen_My(%s,%d)", channel, MyProcPid); queue_listen(LISTEN_LISTEN, channel); } /* - * Async_Unlisten + * Async_Unlisten_My * * This is executed by the SQL unlisten command. */ void -Async_Unlisten(const char *channel) +Async_Unlisten_My(const char *channel) { if (Trace_notify) - elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid); + elog(DEBUG1, "Async_Unlisten_My(%s,%d)", channel, MyProcPid); /* If we couldn't possibly be listening, no need to queue anything */ if (pendingActions == NULL && !unlistenExitRegistered) @@ -793,15 +803,15 @@ Async_Unlisten(const char *channel) } /* - * Async_UnlistenAll + * Async_UnlistenAll_My * * This is invoked by UNLISTEN * command, and also at backend exit. */ void -Async_UnlistenAll(void) +Async_UnlistenAll_My(void) { if (Trace_notify) - elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid); + elog(DEBUG1, "Async_UnlistenAll_My(%d)", MyProcPid); /* If we couldn't possibly be listening, no need to queue anything */ if (pendingActions == NULL && !unlistenExitRegistered) @@ -818,7 +828,7 @@ Async_UnlistenAll(void) * change within a transaction. */ Datum -pg_listening_channels(PG_FUNCTION_ARGS) +pg_listening_channels_my(PG_FUNCTION_ARGS) { FuncCallContext *funcctx; @@ -858,13 +868,13 @@ Async_UnlistenOnExit(int code, Datum arg) } /* - * AtPrepare_Notify + * AtPrepare_Notify_My * * This is called at the prepare phase of a two-phase * transaction. Save the state for possible commit later. */ void -AtPrepare_Notify(void) +AtPrepare_Notify_My(void) { /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */ if (pendingActions || pendingNotifies) @@ -874,7 +884,7 @@ AtPrepare_Notify(void) } /* - * PreCommit_Notify + * PreCommit_Notify_My * * This is called at transaction commit, before actually committing to * clog. @@ -889,7 +899,7 @@ AtPrepare_Notify(void) * we can still throw error if we run out of queue space. */ void -PreCommit_Notify(void) +PreCommit_Notify_My(void) { ListCell *p; @@ -897,7 +907,7 @@ PreCommit_Notify(void) return; /* no relevant statements in this xact */ if (Trace_notify) - elog(DEBUG1, "PreCommit_Notify"); + elog(DEBUG1, "PreCommit_Notify_My"); /* Preflight for any pending listen/unlisten actions */ if (pendingActions != NULL) @@ -932,7 +942,7 @@ PreCommit_Notify(void) * so cheap if we don't, and we'd prefer not to do that work while * holding NotifyQueueLock. */ - (void) GetCurrentTransactionId(); +// (void) GetCurrentTransactionId(); /* * Serialize writers by acquiring a special lock that we hold till @@ -951,7 +961,7 @@ PreCommit_Notify(void) * used by the flatfiles mechanism.) */ LockSharedObject(DatabaseRelationId, InvalidOid, 0, - AccessExclusiveLock); + RowExclusiveLock); /* Now push the notifications into the queue */ backendHasSentNotifications = true; @@ -984,14 +994,14 @@ PreCommit_Notify(void) } /* - * AtCommit_Notify + * AtCommit_Notify_My * * This is called at transaction commit, after committing to clog. * * Update listenChannels and clear transaction-local state. */ void -AtCommit_Notify(void) +AtCommit_Notify_My(void) { ListCell *p; @@ -1003,7 +1013,7 @@ AtCommit_Notify(void) return; if (Trace_notify) - elog(DEBUG1, "AtCommit_Notify"); + elog(DEBUG1, "AtCommit_Notify_My"); /* Perform any pending listen/unlisten actions */ if (pendingActions != NULL) @@ -1036,7 +1046,7 @@ AtCommit_Notify(void) } /* - * Exec_ListenPreCommit --- subroutine for PreCommit_Notify + * Exec_ListenPreCommit --- subroutine for PreCommit_Notify_My * * This function must make sure we are ready to catch any incoming messages. */ @@ -1131,7 +1141,7 @@ Exec_ListenPreCommit(void) } /* - * Exec_ListenCommit --- subroutine for AtCommit_Notify + * Exec_ListenCommit --- subroutine for AtCommit_Notify_My * * Add the channel to the list of channels we are listening on. */ @@ -1155,10 +1165,12 @@ Exec_ListenCommit(const char *channel) oldcontext = MemoryContextSwitchTo(TopMemoryContext); listenChannels = lappend(listenChannels, pstrdup(channel)); MemoryContextSwitchTo(oldcontext); + + if (!pg_async_signal_original) pg_async_signal_original = pqsignal(SIGUSR1, pg_async_signal); } /* - * Exec_UnlistenCommit --- subroutine for AtCommit_Notify + * Exec_UnlistenCommit --- subroutine for AtCommit_Notify_My * * Remove the specified channel name from listenChannels. */ @@ -1186,10 +1198,15 @@ Exec_UnlistenCommit(const char *channel) * We do not complain about unlistening something not being listened; * should we? */ + + if (!list_length(listenChannels) && pg_async_signal_original) { + pqsignal(SIGUSR1, pg_async_signal_original); + pg_async_signal_original = NULL; + } } /* - * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify + * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify_My * * Unlisten on all channels for this backend. */ @@ -1201,10 +1218,15 @@ Exec_UnlistenAllCommit(void) list_free_deep(listenChannels); listenChannels = NIL; + + if (pg_async_signal_original) { + pqsignal(SIGUSR1, pg_async_signal_original); + pg_async_signal_original = NULL; + } } /* - * ProcessCompletedNotifies --- send out signals and self-notifies + * ProcessCompletedNotifiesMy --- send out signals and self-notifies * * This is called from postgres.c just before going idle at the completion * of a transaction. If we issued any notifications in the just-completed @@ -1213,10 +1235,10 @@ Exec_UnlistenAllCommit(void) * Also, if we filled enough queue pages with new notifies, try to advance * the queue tail pointer. * - * The reason that this is not done in AtCommit_Notify is that there is + * The reason that this is not done in AtCommit_Notify_My is that there is * a nonzero chance of errors here (for example, encoding conversion errors * while trying to format messages to our frontend). An error during - * AtCommit_Notify would be a PANIC condition. The timing is also arranged + * AtCommit_Notify_My would be a PANIC condition. The timing is also arranged * to ensure that a transaction's self-notifies are delivered to the frontend * before it gets the terminating ReadyForQuery message. * @@ -1227,8 +1249,9 @@ Exec_UnlistenAllCommit(void) * NOTE: we are outside of any transaction here. */ void -ProcessCompletedNotifies(void) +ProcessCompletedNotifiesMy(void) { + bool idle = !IsTransactionOrTransactionBlock(); MemoryContext caller_context; /* Nothing to do if we didn't send any notifications */ @@ -1249,12 +1272,13 @@ ProcessCompletedNotifies(void) caller_context = CurrentMemoryContext; if (Trace_notify) - elog(DEBUG1, "ProcessCompletedNotifies"); + elog(DEBUG1, "ProcessCompletedNotifiesMy"); /* * We must run asyncQueueReadAllNotifications inside a transaction, else * bad things happen if it gets an error. */ + if (idle) StartTransactionCommand(); /* Send signals to other backends */ @@ -1275,6 +1299,7 @@ ProcessCompletedNotifies(void) asyncQueueAdvanceTail(); } + if (idle) CommitTransactionCommand(); MemoryContextSwitchTo(caller_context); @@ -1431,7 +1456,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) entryLength = QUEUEALIGN(entryLength); qe->length = entryLength; qe->dboid = MyDatabaseId; - qe->xid = GetCurrentTransactionId(); +// qe->xid = GetCurrentTransactionId(); qe->srcPid = MyProcPid; memcpy(qe->data, n->data, channellen + payloadlen + 2); } @@ -1567,7 +1592,7 @@ asyncQueueAddEntries(ListCell *nextNotify) * occupied. */ Datum -pg_notification_queue_usage(PG_FUNCTION_ARGS) +pg_notification_queue_usage_my(PG_FUNCTION_ARGS) { double usage; @@ -1749,7 +1774,7 @@ SignalBackends(void) } /* - * AtAbort_Notify + * AtAbort_Notify_My * * This is called at transaction abort. * @@ -1757,10 +1782,10 @@ SignalBackends(void) * executed if the transaction got committed. */ void -AtAbort_Notify(void) +AtAbort_Notify_My(void) { /* - * If we LISTEN but then roll back the transaction after PreCommit_Notify, + * If we LISTEN but then roll back the transaction after PreCommit_Notify_My, * we have registered as a listener but have not made any entry in * listenChannels. In that case, deregister again. */ @@ -1772,12 +1797,12 @@ AtAbort_Notify(void) } /* - * AtSubCommit_Notify() --- Take care of subtransaction commit. + * AtSubCommit_Notify_My() --- Take care of subtransaction commit. * * Reassign all items in the pending lists to the parent transaction. */ void -AtSubCommit_Notify(void) +AtSubCommit_Notify_My(void) { int my_level = GetCurrentTransactionNestLevel(); @@ -1844,10 +1869,10 @@ AtSubCommit_Notify(void) } /* - * AtSubAbort_Notify() --- Take care of subtransaction abort. + * AtSubAbort_Notify_My() --- Take care of subtransaction abort. */ void -AtSubAbort_Notify(void) +AtSubAbort_Notify_My(void) { int my_level = GetCurrentTransactionNestLevel(); @@ -1882,15 +1907,15 @@ AtSubAbort_Notify(void) } /* - * HandleNotifyInterrupt + * HandleNotifyInterruptMy * * Signal handler portion of interrupt handling. Let the backend know * that there's a pending notify interrupt. If we're currently reading * from the client, this will interrupt the read and - * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt(). + * ProcessClientReadInterrupt() will call ProcessNotifyInterruptMy(). */ void -HandleNotifyInterrupt(void) +HandleNotifyInterruptMy(void) { /* * Note: this is called by a SIGNAL HANDLER. You must be very wary what @@ -1905,18 +1930,18 @@ HandleNotifyInterrupt(void) } /* - * ProcessNotifyInterrupt + * ProcessNotifyInterruptMy * * This is called if we see notifyInterruptPending set, just before * transmitting ReadyForQuery at the end of a frontend command, and * also if a notify signal occurs while reading from the frontend. - * HandleNotifyInterrupt() will cause the read to be interrupted + * HandleNotifyInterruptMy() will cause the read to be interrupted * via the process's latch, and this routine will get called. * If we are truly idle (ie, *not* inside a transaction block), * process the incoming notifies. */ void -ProcessNotifyInterrupt(void) +ProcessNotifyInterruptMy(void) { if (IsTransactionOrTransactionBlock()) return; /* not really idle */ @@ -1999,7 +2024,7 @@ asyncQueueReadAllNotifications(void) * before we see them. *---------- */ - snapshot = RegisterSnapshot(GetLatestSnapshot()); +// snapshot = RegisterSnapshot(GetLatestSnapshot()); /* * It is possible that we fail while trying to send a message to our @@ -2078,7 +2103,7 @@ asyncQueueReadAllNotifications(void) PG_END_TRY(); /* Done with snapshot */ - UnregisterSnapshot(snapshot); +// UnregisterSnapshot(snapshot); } /* @@ -2126,6 +2151,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, /* Ignore messages destined for other databases */ if (qe->dboid == MyDatabaseId) { +#if 0 if (XidInMVCCSnapshot(qe->xid, snapshot)) { /* @@ -2153,6 +2179,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, } else if (TransactionIdDidCommit(qe->xid)) { +#endif /* qe->data is the null-terminated channel name */ char *channel = qe->data; @@ -2161,8 +2188,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, /* payload follows channel name */ char *payload = qe->data + strlen(channel) + 1; - NotifyMyFrontEnd(channel, payload, qe->srcPid); + NotifyMyFrontEndMy(channel, payload, qe->srcPid); } +#if 0 } else { @@ -2171,6 +2199,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, * ignore its notifications. */ } +#endif } /* Loop back if we're not at end of page */ @@ -2271,6 +2300,7 @@ static void ProcessIncomingNotify(void) { /* We *must* reset the flag */ + bool idle = !IsTransactionOrTransactionBlock(); notifyInterruptPending = false; /* Do nothing else if we aren't actively listening */ @@ -2286,10 +2316,12 @@ ProcessIncomingNotify(void) * We must run asyncQueueReadAllNotifications inside a transaction, else * bad things happen if it gets an error. */ + if (idle) StartTransactionCommand(); asyncQueueReadAllNotifications(); + if (idle) CommitTransactionCommand(); /* @@ -2307,7 +2339,7 @@ ProcessIncomingNotify(void) * Send NOTIFY message to my front end. */ void -NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) +NotifyMyFrontEndMy(const char *channel, const char *payload, int32 srcPid) { if (whereToSendOutput == DestRemote) {
Теперь делаем расширение для функций
pg_async—1.0.sql
-- complain if script is sourced in psql, rather than via CREATE EXTENSION \echo Use "CREATE EXTENSION pg_async" to load this file. \quit CREATE FUNCTION pg_listen(channel pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_listen' LANGUAGE C; CREATE FUNCTION pg_listening_channels() RETURNS setof pg_catalog.text STRICT AS 'MODULE_PATHNAME', 'pg_async_listening_channels' LANGUAGE C; CREATE FUNCTION pg_notification_queue_usage() RETURNS pg_catalog.float8 STRICT AS 'MODULE_PATHNAME', 'pg_async_notification_queue_usage' LANGUAGE C; CREATE FUNCTION pg_notify(channel pg_catalog.text default null, payload pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_notify' LANGUAGE C; CREATE FUNCTION pg_unlisten_all() RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_unlisten_all' LANGUAGE C; CREATE FUNCTION pg_unlisten(channel pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_unlisten' LANGUAGE C;
Здесь к стандартным pg_listening_channels, pg_notification_queue_usage и pg_notify добавлены новые удобные функции pg_listen, pg_unlisten и pg_unlisten_all, дополняющие соответствующие команды LISTEN, UNLISTEN и UNLISTEN *.
Делаем реализацию этих функций, вызывая на ведущем оригинальные функции, а на реплике функции из изменённого скопированного файла async.c:
pg_async.c
#define EXTENSION(function) Datum (function)(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(function); Datum (function)(PG_FUNCTION_ARGS) EXTENSION(pg_async_listen) { const char *channel = PG_ARGISNULL(0) ? "" : text_to_cstring(PG_GETARG_TEXT_PP(0)); !XactReadOnly ? Async_Listen(channel) : Async_Listen_My(channel); PG_RETURN_VOID(); } EXTENSION(pg_async_listening_channels) { return !XactReadOnly ? pg_listening_channels(fcinfo) : pg_listening_channels_my(fcinfo); } EXTENSION(pg_async_notification_queue_usage) { return !XactReadOnly ? pg_notification_queue_usage(fcinfo) : pg_notification_queue_usage_my(fcinfo); } EXTENSION(pg_async_notify) { return !XactReadOnly ? pg_notify(fcinfo) : pg_notify_my(fcinfo); } EXTENSION(pg_async_unlisten_all) { !XactReadOnly ? Async_UnlistenAll() : Async_UnlistenAll_My(); PG_RETURN_VOID(); } EXTENSION(pg_async_unlisten) { const char *channel = PG_ARGISNULL(0) ? "" : text_to_cstring(PG_GETARG_TEXT_PP(0)); !XactReadOnly ? Async_Unlisten(channel) : Async_Unlisten_My(channel); PG_RETURN_VOID(); }
Также, регистрируем хуки на выполнение команд, на транзакции и разделяемую память:
pg_async.c
static ProcessUtility_hook_type pg_async_ProcessUtility_hook_original = NULL; static shmem_startup_hook_type pg_async_shmem_startup_hook_original = NULL; void _PG_init(void); void _PG_init(void) { if (!process_shared_preload_libraries_in_progress) return; pg_async_ProcessUtility_hook_original = ProcessUtility_hook; ProcessUtility_hook = pg_async_ProcessUtility_hook; pg_async_shmem_startup_hook_original = shmem_startup_hook; shmem_startup_hook = pg_async_shmem_startup_hook; RequestAddinShmemSpace(AsyncShmemSizeMy()); RegisterSubXactCallback(pg_async_SubXactCallback, NULL); RegisterXactCallback(pg_async_XactCallback, NULL); } void _PG_fini(void); void _PG_fini(void) { ProcessUtility_hook = pg_async_ProcessUtility_hook_original; shmem_startup_hook = pg_async_shmem_startup_hook_original; UnregisterSubXactCallback(pg_async_SubXactCallback, NULL); UnregisterXactCallback(pg_async_XactCallback, NULL); }
В хуке на разделяемую память регистрируем её из изменённого скопированного файла async.c:
pg_async.c
static void pg_async_shmem_startup_hook(void) { if (pg_async_shmem_startup_hook_original) pg_async_shmem_startup_hook_original(); LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); AsyncShmemInitMy(); LWLockRelease(AddinShmemInitLock); }
В хуке на транзакции на реплике вызываем соответсвующие функции из изменённого скопированного файла async.c:
pg_async.c
static void pg_async_XactCallback(XactEvent event, void *arg) { if (!XactReadOnly) return; switch (event) { case XACT_EVENT_ABORT: AtAbort_Notify_My(); break; case XACT_EVENT_COMMIT: AtCommit_Notify_My(); ProcessCompletedNotifiesMy(); break; case XACT_EVENT_PRE_COMMIT: PreCommit_Notify_My(); break; case XACT_EVENT_PREPARE: AtPrepare_Notify_My(); break; default: break; } }
В хуке на выполнение команд на реплике для команд LISTEN, UNLISTEN и NOTIFY вызываем соответсвующие функции из изменённого скопированного файла async.c:
pg_async.c
static void CheckRestrictedOperation(const char *cmdname) { if (InSecurityRestrictedOperation()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("cannot execute %s within security-restricted operation", cmdname))); } static void pg_async_ProcessUtility_hook(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc) { Node *parsetree = pstmt->utilityStmt; if (!XactReadOnly) return pg_async_ProcessUtility_hook_original ? pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc) : standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc); check_stack_depth(); switch (nodeTag(parsetree)) { case T_ListenStmt: { ListenStmt *stmt = (ListenStmt *)parsetree; CheckRestrictedOperation("LISTEN"); Async_Listen_My(stmt->conditionname); } break; case T_NotifyStmt: { NotifyStmt *stmt = (NotifyStmt *)parsetree; Async_Notify_My(stmt->conditionname, stmt->payload); } break; case T_UnlistenStmt: { UnlistenStmt *stmt = (UnlistenStmt *)parsetree; CheckRestrictedOperation("UNLISTEN"); stmt->conditionname ? Async_Unlisten_My(stmt->conditionname) : Async_UnlistenAll_My(); } break; default: return pg_async_ProcessUtility_hook_original ? pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc) : standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc); } CommandCounterIncrement(); }
Всё это можно посмотреть в репозитории.
ссылка на оригинал статьи https://habr.com/ru/post/550104/
Добавить комментарий