03/27/2025 9:10 PM

Building a Real-Time Notification System in Go with PostgreSQL

4 minutes

Go
PostgreSQL
Websockets
Pub/Sub

In Finly, we implemented real-time notifications using PGNotify as a lightweight Pub/Sub system. This powers our notification center, ensuring it updates automatically when new entries arrive and triggers a toast notification for immediate feedback. In this post, I’ll walk through how we set it up in Go and provided our client with a GraphQL subscription.

Ravi

Ravi

CTO

Introduction: Keeping Users in the Loop with Real-Time Notifications

With the recent launch of mentions in Finly's rich text editor, allowing users to mention teammates in task descriptions and comments, we wanted to add notifications to keep users informed. Our goal is to make Finly the all-in-one app for financial advisors, so users shouldn't rely on emails to stay updated on their workspace. Therefore, in addition to a notification center, we aimed to implement real-time notifications using a toast component.

Screenshot 2025-03-28 102857.png

Since Finly's backend is a scalable Go application, we introduced a Pub/Sub system to distribute notifications across multiple instances. To avoid the complexities of adding another infrastructure component like RabbitMQ, we chose PGNotify for its simplicity. It provides the essential functionality we need for notifications, sending messages to channels based on trigger events, such as when a notification is created.

Database Triggers: Pushing Notifications on Create

The first step in creating real-time notifications with PGNotify is to implement a trigger that activates whenever a new row is created. Since notifications are simply another entity in our system, we can include the creation of this trigger function in our migrations. This will push to the notifications, notification-user-id, and notification-workspace-id channels.

add_notifications.sql
1-- Create enum type "notification_type"
2CREATE TYPE "notification_type" AS ENUM ('mention');
3-- Create "notifications" table
4CREATE TABLE "notifications" (
5  "id" uuid NOT NULL DEFAULT gen_random_uuid(),
6  "created_at" timestamptz NULL,
7  "updated_at" timestamptz NULL,
8  "deleted_at" timestamptz NULL,
9  "user_id" uuid NULL,
10  "workspace_id" uuid NULL,
11  "type" text NULL,
12  "task_id" uuid NULL,
13  "task_comment_id" uuid NULL,
14  PRIMARY KEY ("id"),
15  CONSTRAINT "fk_notifications_task" FOREIGN KEY ("task_id") REFERENCES "tasks" ("id") ON UPDATE NO ACTION ON DELETE NO ACTION,
16  CONSTRAINT "fk_notifications_task_comment" FOREIGN KEY ("task_comment_id") REFERENCES "task_comments" ("id") ON UPDATE NO ACTION ON DELETE NO ACTION,
17  CONSTRAINT "fk_notifications_user" FOREIGN KEY ("user_id") REFERENCES "users" ("id") ON UPDATE NO ACTION ON DELETE NO ACTION
18  CONSTRAINT "fk_notifications_workspace" FOREIGN KEY ("workspace_id") REFERENCES "workspaces" ("id") ON UPDATE NO ACTION ON DELETE NO ACTION
19);
20-- Add function "notify_after_notification_insert"
21CREATE OR REPLACE FUNCTION notify_after_notification_insert()
22RETURNS TRIGGER AS
23$BODY$
24  BEGIN
25    PERFORM pg_notify('notification', row_to_json(NEW)::text);
26    PERFORM pg_notify('notification-' || NEW.user_id, row_to_json(NEW)::text);
27    PERFORM pg_notify('notification-' || NEW.workspace_id, row_to_json(NEW)::text);
28    RETURN NEW;
29  END;
30$BODY$
31LANGUAGE plpgsql;
32-- Add trigger "trigger_notify_after_notification_insert"
33CREATE OR REPLACE TRIGGER trigger_notify_after_notification_insert
34AFTER INSERT ON notifications
35FOR EACH ROW EXECUTE PROCEDURE notify_after_notification_insert();

Listening for Notifications in Go with PGX

Now that our database will push messages to those channels, we need to listen to the channel using PGX. We can achieve this with a simple Exec() call, such as running LISTEN notification-user-id, which will initiate the listener.

listener.go
1// Listen issues a LISTEN command for the supplied topic.
2func (l *listener) Listen(ctx context.Context, topic string) error {
3	var (
4		logger = l.logger.With(
5			zap.String("topic", topic),
6		)
7	)
8
9	l.mu.Lock()
10	defer l.mu.Unlock()
11
12	if _, err := l.conn.Exec(ctx, "LISTEN \""+topic+"\""); err != nil {
13		logger.Error("Error listening to channel", zap.Error(err))
14
15		return err
16	}
17
18	return nil
19}

After starting the listener, we need to wait for new notifications. PGX supports this with the WaitForNotification() method, which returns a *pgconn.Notification that includes the notification channel and payload.

listener.go
1// WaitForNotification blocks until receiving a notification and returns it. The
2// pgx driver should maintain a buffer of notifications, so as long as Listen
3// has been called, repeatedly calling WaitForNotification should yield all
4// notifications.
5func (l *listener) WaitForNotification(ctx context.Context) (*Notification, error) {
6	l.mu.Lock()
7	defer l.mu.Unlock()
8
9	pgn, err := l.conn.WaitForNotification(ctx)
10
11	if err != nil {
12		l.logger.Error("Error retrieving notification", zap.Error(err))
13
14		return nil, err
15	}
16
17	n := Notification{
18		Channel: pgn.Channel,
19		Payload: []byte(pgn.Payload),
20	}
21
22	return &n, nil
23}

We could simply run the LISTEN in a goroutine and then use a for-loop to continuously wait for new notifications and push them to a channel. However, we decided to implement the listener/notifier interfaces inspired by Jon Brown’s blog post "Go and Postgres Listen/Notify or: How I Learned to Stop Worrying and Love PubSub." This approach allows us to listen to the same channel with one listener, reusing connections to avoid exceeding our database’s connection limits. Additionally, the interface could enable us to switch to a different implementation, such as RabbitMQ, in the future if we need a different type of Pub/Sub system without rewriting our GraphQL resolvers.

1// Notification represents a published message
2type Notification struct {
3	Channel string `json:"channel"`
4	Payload []byte `json:"payload"`
5}
6
7// Listener interface connects to the database and allows callers to listen to a
8// particular topic by issuing a LISTEN command. WaitForNotification blocks
9// until receiving a notification or until the supplied context expires. The
10// default implementation is tightly coupled to pgx (following River's
11// implementation), but callers may implement their own listeners for any
12// backend they'd like.
13type Listener interface {
14	Close(ctx context.Context) error
15	Connect(ctx context.Context) error
16	Listen(ctx context.Context, topic string) error
17	Ping(ctx context.Context) error
18	Unlisten(ctx context.Context, topic string) error
19	WaitForNotification(ctx context.Context) (*Notification, error)
20}

With these interfaces, we have the implementation-specific listener that initiates the connection to Postgres and listens on all subscribed channels. The Notifier executes new LISTEN commands and UNLISTEN if all subscriptions are canceled. The Subscription contains the actual notification channel, an established channel to wait for a successful connection, and an Unsubscribe() method. The Notifier will run in a loop, continuously listening for new notifications using Uber Fx’s lifecycle methods to register the start function at application startup.

notifier.go
1func NewNotifier(l *zap.Logger, li Listener, lc fx.Lifecycle) Notifier {
2	logger := l.Named("pgxNotifier")
3
4	notifier := &notifier{
5		mu:                        sync.RWMutex{},
6		logger:                    logger,
7		listener:                  li,
8		subscriptions:             make(map[string][]*subscription),
9		channelChanges:            []channelChange{},
10		waitForNotificationCancel: context.CancelFunc(func() {}),
11	}
12
13	lc.Append(fx.StartHook(func() {
14		go func() {
15			for {
16				if err := notifier.Run(context.Background()); err != nil {
17					logger.Error("error running notifier", zap.Error(err))
18				}
19			}
20		}()
21	}))
22
23	return notifier
24}

We also need to ensure our listener connects to the database, grabbing a connection from the pool and reserving it for notifications. We can achieve this with an fx.StartStopHook.

listener.go
1// NewListener return a Listener that draws a connection from the supplied Pool. This
2// is somewhat discouraged
3func NewListener(db *gorm.DB, l *zap.Logger, lc fx.Lifecycle) Listener {
4	logger := l.Named("pgxListener")
5
6	listener := &listener{
7		mu:     sync.Mutex{},
8		db:     db,
9		logger: logger,
10	}
11
12	lc.Append(fx.StartStopHook(listener.Connect, listener.Close))
13
14	return listener
15}

Real-Time Delivery: WebSockets & GraphQL Subscriptions

The final piece of the puzzle on the backend is exposing the notifications as a subscription, allowing clients to be notified when a notification is added. GQLGen supports GraphQL subscriptions over WebSockets and SSE. For simplicity, we chose WebSockets but plan to support SSE in the future.

Adding a subscription is as simple as declaring it in our schema files.

notification.graphqls
1type Notification {
2  id: ID!
3  type: NotificationType!
4  task: Task @goField(forceResolver: true)
5  taskComment: TaskComment @goField(forceResolver: true)
6  read: Boolean!
7  createdAt: Time!
8}
9
10extend type Subscription {
11  notification: Notification!
12}

Running go tool gqlgen generate generates a new resolver file, providing a scaffolded subscription resolver method that expects us to return a channel for new notifications. In this resolver, we listen to the user notification channel, unmarshal the JSON payload into our Notification model, and push it to the subscription channel for delivery to the client.

notification.resolvers.go
1// Notification is the resolver for the notification field.
2func (r *subscriptionResolver) Notification(ctx context.Context) (<-chan *models.Notification, error) {
3	ch := make(chan *models.Notification)
4
5	user, err := r.authService.GetCurrentUser(ctx)
6
7	if err != nil {
8		return nil, err
9	}
10
11	subscription := r.notifier.Listen("notification-" + user.ID.String())
12
13	go func() {
14		defer close(ch)
15		defer subscription.Unsubscribe(ctx)
16
17		<-subscription.EstablishedC()
18
19		for {
20			select {
21			case <-ctx.Done():
22				subscription.Unsubscribe(ctx)
23				r.logger.Debug("Subscription closed")
24				return
25			case payload := <-subscription.NotificationC():
26				var notification models.Notification
27				json.Unmarshal(payload, &notification)
28				ch <- &notification
29			}
30		}
31	}()
32
33	return ch, nil
34}

WebSocket Authentication: Passing User Context

To read the current user from the context, our auth middleware must support WebSocket transport. When a WebSocket connection is established, the client can pass an initPayload, which our server processes using a WebSocketInit function to set the Go context (ctx).

middleware.go
1var jwtCtxKey = &contextKey{"jwt"}
2
3type contextKey struct {
4	name string
5}
6
7func WebSocketInit(as *Service, logger *zap.Logger) func(ctx context.Context, initPayload transport.InitPayload) (context.Context, *transport.InitPayload, error) {
8	return func(ctx context.Context, initPayload transport.InitPayload) (context.Context, *transport.InitPayload, error) {
9		authToken, ok := initPayload["authToken"].(string)
10		if !ok || authToken == "" {
11			return ctx, &initPayload, nil
12		}
13
14		jwtToken, err := as.ParseAccessToken(authToken)
15		if err != nil || !jwtToken.Valid {
16			if err != nil {
17				logger.Error("Error parsing token", zap.Error(err))
18			}
19
20			return ctx, &initPayload, errors.New("Invalid auth token in payload")
21		}
22
23		ctxNew := context.WithValue(ctx, jwtCtxKey, jwtToken)
24
25		return ctxNew, &initPayload, nil
26	}
27}

With all that in place, we're ready to integrate the subscriptions into the frontend.

Bringing It to Life: Displaying Notifications in the UI

Since we're using Apollo Client with React on the frontend, adding support for subscriptions over WebSockets is straightforward. Apollo provides GraphQLWsLink, which we can configure with custom connectionParams to pass the authToken and workspaceId in the initPayload.

apollo-client.tsx
1const wsLink = (
2  baseUrl: string,
3  getSessionRef?: MutableRefObject<() => Promise<Session | null>>,
4  workspaceId?: string,
5) =>
6  new GraphQLWsLink(
7    createClient({
8      url: new URL(
9        "/graphql",
10        baseUrl.replace("https", "wss").replace("http", "ws"),
11      ).toString(),
12      connectionParams: async () => {
13        const authToken = await getSessionRef
14          ?.current()
15          .then((session) => session?.accessToken);
16
17        return {
18          workspaceId,
19          authToken,
20        };
21      },
22    }),
23  );

Next, using Apollo's split link, we ensure that GraphQLWsLink is used only for subscriptions, while all other operations continue to use the HTTP link (which we've augmented with upload support).

For more details on configuring WebSockets in Apollo, refer to their WebSocket setup documentation.

Then, we initialize the notification center with existing notifications using a standard query.

index.tsx
1const getNotifications = graphql(`
2  query GetNotifications(
3    $filter: NotificationFilterInput
4    $orderBy: [NotificationOrderByInput!]
5  ) {
6    notifications(filter: $filter, orderBy: $orderBy) {
7      edges {
8        node {
9          ...NotificationCardFields
10        }
11      }
12      pageInfo {
13        ...PageInfoFields
14      }
15    }
16  }
17`);
18
19export const Notifications = ({
20  variant,
21  color,
22}: {
23  variant: "sm" | "md" | "xl";
24  color?: "primary";
25}) => {
26  const { workspaceId } = useWorkspaces();
27
28  const theme = useMantineTheme();
29
30  const { data, subscribeToMore } = useQuery(getNotifications, {
31    variables: {
32      filter: { workspaceId: { eq: workspaceId } },
33      orderBy: [
34        {
35          field: NotificationOrderField.CreatedAt,
36          direction: OrderDirection.Desc,
37        },
38      ],
39    },
40  });
41}

In Finly, we display this as a sleek notification drawer, providing consultants with a clear overview of all important updates in one place.

postspark_export_2025-03-28_11-19-47.png

To handle real-time updates, we use Apollo’s subscribeToMore method to start a subscription and update the query data by prepending new notifications to the list. Additionally, we display a toast notification using Mantine’s notification system for immediate user feedback.

index.tsx
1useEffect(() => {
2  const unsubscribe = subscribeToMore({
3    document: graphql(`
4      subscription SubscribeToNotifications {
5        notification {
6          ...NotificationCardFields
7        }
8      }
9    `),
10    updateQuery: (prev, { subscriptionData }) => {
11      if (!subscriptionData.data) return prev;
12
13      const newNotification = subscriptionData.data.notification;
14
15      notifications.show({
16        title: getNotificationTitle(newNotification),
17        message: getNotificationDescription(newNotification),
18        icon: <NotificationIcon type={newNotification.type} />,
19      });
20
21      return Object.assign({}, prev, {
22        notifications: {
23          edges: [{ node: newNotification }, ...prev.notifications.edges],
24        },
25      });
26    },
27  });

And that's all! With PGNotify and GraphQL subscriptions it's become trivial to setup real-time communication in our apps, providing rich user experiences and quick feedback loops for consultants and backoffice employees to stay on top of their tasks and collaborate more effectively.

Finly — Building a Real-Time Notification System in Go with PostgreSQL