File

lib/shared/modules/ms-client/ms-client.ts

Description

Microservices client for dispatching messages between microservices.

Implements

MessageBus

Index

Methods

Constructor

constructor(logger: Logger, proxy: ClientProxy)
Parameters :
Name Type Optional
logger Logger No
proxy ClientProxy No

Methods

dispatch
dispatch(pattern: any, data: TInput, opts?: MsClientOptions)
Type parameters :
  • TResult
  • TInput

Dispatches a message with the given pattern and data.

Parameters :
Name Type Optional Default value Description
pattern any No
  • The message pattern.
data TInput No Object()
  • The message data.
opts MsClientOptions Yes
  • Optional configuration options for the client.
Returns : Promise<TResult>

A promise resolving to the result of the dispatched message.

emit
emit(pattern: any, data: TInput, opts?: MsClientOptions)
Type parameters :
  • TResult
  • TInput

Emits a message with the given pattern and data.

Parameters :
Name Type Optional Description
pattern any No
  • The message pattern.
data TInput No
  • The message data.
opts MsClientOptions Yes
  • Optional configuration options for the client.
Returns : Observable<T>

An observable of the result of the emitted message.

Private handleRequest
handleRequest(source: Observable, pattern: any, data: any, opts?: MsClientOptions)
Type parameters :
  • T

Handles the request by logging and managing errors and timeouts.

Parameters :
Name Type Optional Description
source Observable<T> No
  • The observable source.
pattern any No
  • The message pattern.
data any No
  • The message data.
opts MsClientOptions Yes
  • Optional configuration options for the client.
Returns : Observable<T>

An observable of the result.

send
send(pattern: any, data: TInput, opts?: MsClientOptions)
Type parameters :
  • TResult
  • TInput

Sends a message with the given pattern and data.

Parameters :
Name Type Optional Description
pattern any No
  • The message pattern.
data TInput No
  • The message data.
opts MsClientOptions Yes
  • Optional configuration options for the client.
Returns : Observable<T>

An observable of the result of the send message.

import { ClientProxy } from "@nestjs/microservices";
import { catchError, Observable, throwError, timeout } from "rxjs";
import { HttpException, HttpStatus, Inject, Logger } from "@nestjs/common";
import { ObjectUtils } from "../../utils/object.utils";
import { MessageBus, MsClientOptions } from "./ms-client.types";
import { LOGGER } from "../log/log.constants";
import { MS_EXCEPTION_ID } from "../../constants";
import { MsException } from "../../exceptions/ms.exception";
import inspect = ObjectUtils.inspect;

/**
 * Microservices client for dispatching messages between microservices.
 */
export class MsClient implements MessageBus {
  constructor(
    @Inject(LOGGER) private readonly logger: Logger,
    private readonly proxy: ClientProxy,
  ) {}

  /**
   * Dispatches a message with the given pattern and data.
   * @param pattern - The message pattern.
   * @param data - The message data.
   * @param opts - Optional configuration options for the client.
   * @returns A promise resolving to the result of the dispatched message.
   */
  dispatch<TResult = any, TInput = any>(
    pattern: any,
    data: TInput = Object(),
    opts?: MsClientOptions,
  ): Promise<TResult> {
    return new Promise<TResult>((resolve, reject) => {
      const source = this.proxy.send<TResult, TInput>(pattern, data);
      this.handleRequest(source, pattern, data, opts).subscribe({
        next: (result) => resolve(result),
        error: (error) => reject(error),
      });
    });
  }

  /**
   * Sends a message with the given pattern and data.
   * @param pattern - The message pattern.
   * @param data - The message data.
   * @param opts - Optional configuration options for the client.
   * @returns An observable of the result of the send message.
   */
  send<TResult = any, TInput = any>(
    pattern: any,
    data: TInput,
    opts?: MsClientOptions,
  ) {
    const source = this.proxy.send<TResult, TInput>(pattern, data);
    return this.handleRequest(source, pattern, data, opts);
  }

  /**
   * Emits a message with the given pattern and data.
   * @param pattern - The message pattern.
   * @param data - The message data.
   * @param opts - Optional configuration options for the client.
   * @returns An observable of the result of the emitted message.
   */
  emit<TResult = any, TInput = any>(
    pattern: any,
    data: TInput,
    opts?: MsClientOptions,
  ) {
    const source = this.proxy.emit<TResult, TInput>(pattern, data);
    return this.handleRequest(source, pattern, data, opts);
  }

  /**
   * Handles the request by logging and managing errors and timeouts.
   * @param source - The observable source.
   * @param pattern - The message pattern.
   * @param data - The message data.
   * @param opts - Optional configuration options for the client.
   * @returns An observable of the result.
   */
  private handleRequest<T>(
    source: Observable<T>,
    pattern: any,
    data: any,
    opts?: MsClientOptions,
  ): Observable<T> {
    this.logger.debug(`Sending request with pattern: ${inspect(pattern)}`);
    return source.pipe(
      timeout(opts?.timeout || parseInt(process.env.TRANSPORT_TIMEOUT)),
      catchError((error) => {
        if (error?.type === MS_EXCEPTION_ID) {
          const err = error as MsException;
          this.logger.error(
            `Microservice exception: ${err.message}`,
            err.stack,
          );
          throw new HttpException(err.message, err.code);
        }
        if (error.name === "TimeoutError") {
          this.logger.warn(`Request timeout for pattern: ${inspect(pattern)}`);
          throw new HttpException(
            "Request Timeout",
            HttpStatus.REQUEST_TIMEOUT,
          );
        }
        this.logger.error(
          `Unknown error for pattern: ${inspect(pattern)}`,
          error,
        );
        return throwError(error);
      }),
    );
  }
}

results matching ""

    No results matching ""