File

lib/shared/modules/messages-broker/messages-broker.service.ts

Description

A service that implements the MessagesBroker interface for message brokering using Redis.

Index

Properties
Methods

Constructor

constructor(pubClient: Redis)
Parameters :
Name Type Optional
pubClient Redis No

Methods

Async emit
emit(chanel: string, data: T)
Type parameters :
  • T

Emits a message to a specific channel.

Parameters :
Name Type Optional Description
chanel string No
  • The channel to emit the message to.
data T No
  • The data to emit.
Returns : any
Private safeDeSerialize
safeDeSerialize(data: string)

Safely deserializes a JSON string to an object.

Parameters :
Name Type Optional Description
data string No
  • The JSON string to deserialize.
Returns : any

The deserialized object.

Private safeSerialize
safeSerialize(data: T)
Type parameters :
  • T

Safely serializes data to a JSON string.

Parameters :
Name Type Optional Description
data T No
  • The data to serialize.
Returns : any

The serialized JSON string.

subscribe
subscribe(chanel: string, handler: (data: T) => void)
Type parameters :
  • T

Subscribes to a specific channel with a handler function.

Parameters :
Name Type Optional Description
chanel string No
  • The channel to subscribe to.
handler function No
  • The handler function to handle the data.
Returns : void
unsubscribe
unsubscribe(chanel: string)

Unsubscribes from a specific channel.

Parameters :
Name Type Optional Description
chanel string No
  • The channel to unsubscribe from.
Returns : void

Properties

Private Readonly subClient
Type : Redis
Private Readonly subscribers
Type : Map<string | void>
Default value : new Map()
import { Inject, Injectable } from "@nestjs/common";
import Redis from "ioredis";
import { MessagesBroker } from "./messages-broker.types";
import { REDIS_CLIENT } from "../cache/cache.constants";

/**
 * A service that implements the MessagesBroker interface for message brokering using Redis.
 */
@Injectable()
export class MessagesBrokerService implements MessagesBroker {
  private readonly subClient: Redis;
  private readonly subscribers: Map<string, (data: unknown) => void> =
    new Map();

  constructor(@Inject(REDIS_CLIENT) private readonly pubClient: Redis) {
    this.subClient = this.pubClient.duplicate();
    this.subClient.on("message", (chanel, data) => {
      if (!this.subscribers.has(chanel)) {
        return;
      }
      const fun = this.subscribers.get(chanel);
      fun(this.safeDeSerialize(data));
    });
  }

  /**
   * Emits a message to a specific channel.
   * @param chanel - The channel to emit the message to.
   * @param data - The data to emit.
   */
  async emit<T = unknown>(chanel: string, data: T) {
    await this.pubClient.publish(chanel, this.safeSerialize(data));
  }

  /**
   * Subscribes to a specific channel with a handler function.
   * @param chanel - The channel to subscribe to.
   * @param handler - The handler function to handle the data.
   */
  subscribe<T = unknown>(chanel: string, handler: (data: T) => void) {
    this.subClient.subscribe(chanel);
    this.subscribers.set(chanel, handler);
  }

  /**
   * Unsubscribes from a specific channel.
   * @param chanel - The channel to unsubscribe from.
   */
  unsubscribe(chanel: string) {
    this.subClient.unsubscribe(chanel);
    this.subscribers.delete(chanel);
  }

  /**
   * Safely serializes data to a JSON string.
   * @param data - The data to serialize.
   * @returns The serialized JSON string.
   */
  private safeSerialize<T = unknown>(data: T) {
    try {
      return JSON.stringify(data);
    } catch (error) {
      return data.toString();
    }
  }

  /**
   * Safely deserializes a JSON string to an object.
   * @param data - The JSON string to deserialize.
   * @returns The deserialized object.
   */
  private safeDeSerialize(data: string) {
    try {
      return JSON.parse(data);
    } catch (error) {
      return data;
    }
  }
}

results matching ""

    No results matching ""