lib/shared/modules/ms-client/ms-client.ts
Microservices client for dispatching messages between microservices.
Methods |
|
constructor(logger: Logger, proxy: ClientProxy)
|
|||||||||
Defined in lib/shared/modules/ms-client/ms-client.ts:30
|
|||||||||
Parameters :
|
dispatch | ||||||||||||||||||||
dispatch(pattern: any, data: TInput, opts?: MsClientOptions)
|
||||||||||||||||||||
Defined in lib/shared/modules/ms-client/ms-client.ts:43
|
||||||||||||||||||||
Type parameters :
|
||||||||||||||||||||
Dispatches a message with the given pattern and data.
Parameters :
Returns :
Promise<TResult>
A promise resolving to the result of the dispatched message. |
emit | ||||||||||||||||
emit(pattern: any, data: TInput, opts?: MsClientOptions)
|
||||||||||||||||
Defined in lib/shared/modules/ms-client/ms-client.ts:80
|
||||||||||||||||
Type parameters :
|
||||||||||||||||
Emits a message with the given pattern and data.
Parameters :
Returns :
Observable<T>
An observable of the result of the emitted message. |
Private handleRequest | ||||||||||||||||||||
handleRequest(source: Observable
|
||||||||||||||||||||
Defined in lib/shared/modules/ms-client/ms-client.ts:97
|
||||||||||||||||||||
Type parameters :
|
||||||||||||||||||||
Handles the request by logging and managing errors and timeouts.
Parameters :
Returns :
Observable<T>
An observable of the result. |
send | ||||||||||||||||
send(pattern: any, data: TInput, opts?: MsClientOptions)
|
||||||||||||||||
Defined in lib/shared/modules/ms-client/ms-client.ts:64
|
||||||||||||||||
Type parameters :
|
||||||||||||||||
Sends a message with the given pattern and data.
Parameters :
Returns :
Observable<T>
An observable of the result of the send message. |
import { ClientProxy } from "@nestjs/microservices";
import { catchError, Observable, throwError, timeout } from "rxjs";
import { HttpException, HttpStatus, Inject, Logger } from "@nestjs/common";
import { ObjectUtils } from "../../utils/object.utils";
import { MessageBus, MsClientOptions } from "./ms-client.types";
import { LOGGER } from "../log/log.constants";
import { MS_EXCEPTION_ID } from "../../constants";
import { MsException } from "../../exceptions/ms.exception";
import inspect = ObjectUtils.inspect;
/**
* Microservices client for dispatching messages between microservices.
*/
export class MsClient implements MessageBus {
constructor(
@Inject(LOGGER) private readonly logger: Logger,
private readonly proxy: ClientProxy,
) {}
/**
* Dispatches a message with the given pattern and data.
* @param pattern - The message pattern.
* @param data - The message data.
* @param opts - Optional configuration options for the client.
* @returns A promise resolving to the result of the dispatched message.
*/
dispatch<TResult = any, TInput = any>(
pattern: any,
data: TInput = Object(),
opts?: MsClientOptions,
): Promise<TResult> {
return new Promise<TResult>((resolve, reject) => {
const source = this.proxy.send<TResult, TInput>(pattern, data);
this.handleRequest(source, pattern, data, opts).subscribe({
next: (result) => resolve(result),
error: (error) => reject(error),
});
});
}
/**
* Sends a message with the given pattern and data.
* @param pattern - The message pattern.
* @param data - The message data.
* @param opts - Optional configuration options for the client.
* @returns An observable of the result of the send message.
*/
send<TResult = any, TInput = any>(
pattern: any,
data: TInput,
opts?: MsClientOptions,
) {
const source = this.proxy.send<TResult, TInput>(pattern, data);
return this.handleRequest(source, pattern, data, opts);
}
/**
* Emits a message with the given pattern and data.
* @param pattern - The message pattern.
* @param data - The message data.
* @param opts - Optional configuration options for the client.
* @returns An observable of the result of the emitted message.
*/
emit<TResult = any, TInput = any>(
pattern: any,
data: TInput,
opts?: MsClientOptions,
) {
const source = this.proxy.emit<TResult, TInput>(pattern, data);
return this.handleRequest(source, pattern, data, opts);
}
/**
* Handles the request by logging and managing errors and timeouts.
* @param source - The observable source.
* @param pattern - The message pattern.
* @param data - The message data.
* @param opts - Optional configuration options for the client.
* @returns An observable of the result.
*/
private handleRequest<T>(
source: Observable<T>,
pattern: any,
data: any,
opts?: MsClientOptions,
): Observable<T> {
this.logger.debug(`Sending request with pattern: ${inspect(pattern)}`);
return source.pipe(
timeout(opts?.timeout || parseInt(process.env.TRANSPORT_TIMEOUT)),
catchError((error) => {
if (error?.type === MS_EXCEPTION_ID) {
const err = error as MsException;
this.logger.error(
`Microservice exception: ${err.message}`,
err.stack,
);
throw new HttpException(err.message, err.code);
}
if (error.name === "TimeoutError") {
this.logger.warn(`Request timeout for pattern: ${inspect(pattern)}`);
throw new HttpException(
"Request Timeout",
HttpStatus.REQUEST_TIMEOUT,
);
}
this.logger.error(
`Unknown error for pattern: ${inspect(pattern)}`,
error,
);
return throwError(error);
}),
);
}
}