File

lib/common/process/process-manager.service.ts

Description

Service for managing processes.

Index

Properties
Methods

Constructor

constructor(lockExec: WarlockFn, logger: Logger, broker: MessagesBroker, processUnitRep: Repository<ProcessUnitEntity>, processLogRep: Repository<ProcessLogEntity>)
Parameters :
Name Type Optional
lockExec WarlockFn No
logger Logger No
broker MessagesBroker No
processUnitRep Repository<ProcessUnitEntity> No
processLogRep Repository<ProcessLogEntity> No

Methods

Async createLogInstance
createLogInstance(processCode: string)

Creates a log instance for a process.

Parameters :
Name Type Optional Description
processCode string No
  • The code of the process.
Returns : Promise<ProcessLogEntity>

The created ProcessLogEntity instance.

getLastLogsByProcess
getLastLogsByProcess(processCode: string, limit: number)

Gets the last logs of a process by its code.

Parameters :
Name Type Optional Default value Description
processCode string No
  • The code of the process.
limit number No 3
  • The maximum number of logs to retrieve (default is 3).
Returns : any

An array of ProcessLogEntity instances.

getProcessData
getProcessData(code: string, force: boolean)

Gets process data by its code.

Parameters :
Name Type Optional Default value Description
code string No
  • The code of the process.
force boolean No false
  • Whether to force getting the process data regardless of its enabled status.
Returns : any

The ProcessUnitEntity instance.

getProcessLogById
getProcessLogById(id: number)

Gets a process log by its ID.

Parameters :
Name Type Optional Description
id number No
  • The ID of the process log.
Returns : any

The ProcessLogEntity instance.

Async getProcessUnitStatus
getProcessUnitStatus(code: string)

Gets the status of a process unit.

Parameters :
Name Type Optional Description
code string No
  • The code of the process unit.
Returns : unknown

The current status of the process unit.

Async init
init()

Initializes the process manager service, resets process statuses, and registers processes.

Returns : any
Private Async resetAllProcessStatuses
resetAllProcessStatuses()

Resets the statuses of all enabled processes to "Ready".

Returns : any
Async setProcessUnitStatus
setProcessUnitStatus(code: string, status: Process.Status)

Sets the status of a process unit.

Parameters :
Name Type Optional Description
code string No
  • The code of the process unit.
status Process.Status No
  • The new status to set.
Returns : unknown
Async startProcess
startProcess(code: string)

Starts a process by its code.

Parameters :
Name Type Optional Description
code string No
  • The code of the process to start.
Returns : any
Async stopProcess
stopProcess(code: string)

Stops a process by its code.

Parameters :
Name Type Optional Description
code string No
  • The code of the process to stop.
Returns : any
Async toggleProcess
toggleProcess(code: string)

Toggles the enabled status of a process.

Parameters :
Name Type Optional Description
code string No
  • The code of the process to toggle.
Returns : any
updateLogInstance
updateLogInstance(logInstance: ProcessLogEntity)

Updates a log instance.

Parameters :
Name Type Optional Description
logInstance ProcessLogEntity No
  • The ProcessLogEntity instance to update.
Returns : any

The updated ProcessLogEntity instance.

Properties

Private Static pmInitStatus
Type : boolean
import {
  Inject,
  Injectable,
  InternalServerErrorException,
  Logger,
} from "@nestjs/common";
import { InjectRepository } from "@nestjs/typeorm";
import { Repository } from "typeorm";
import { ProcessUnitEntity } from "./entity/process.unit.entity";
import { Process } from "./process.constants";
import { ProcessLogEntity } from "./entity/process.log.entity";
import { WARLOCK } from "../../shared/modules/warlock/warlock.constants";
import { WarlockFn } from "../../shared/modules/warlock/warlock.types";
import { LOGGER } from "../../shared/modules/log/log.constants";
import { MESSAGES_BROKER } from "../../shared/modules/messages-broker/messages-broker.constants";
import { MessagesBroker } from "../../shared/modules/messages-broker/messages-broker.types";
import Status = Process.Status;
import Command = Process.Command;
import hasProcessInstance = Process.hasProcessInstance;

/**
 * Service for managing processes.
 *
 * Reads and updates process units and their logs through the injected
 * repositories, and emits process commands (register, unregister, start,
 * stop) through the injected messages broker.
 */
@Injectable()
export class ProcessManagerService {
  /**
   * Guard flag for {@link init}. Static, so it is shared by every instance
   * of the service created in the same runtime.
   */
  private static pmInitStatus: boolean;

  constructor(
    @Inject(WARLOCK) private readonly lockExec: WarlockFn,
    @Inject(LOGGER) private readonly logger: Logger,
    @Inject(MESSAGES_BROKER) private readonly broker: MessagesBroker,
    @InjectRepository(ProcessUnitEntity)
    private readonly processUnitRep: Repository<ProcessUnitEntity>,
    @InjectRepository(ProcessLogEntity)
    private readonly processLogRep: Repository<ProcessLogEntity>,
  ) {}

  /**
   * Initializes the process manager service, resets process statuses, and registers processes.
   *
   * Only the first call does any work; later calls log a warning and return.
   * Enabled processes without a cron-tab are skipped with a warning.
   */
  async init(): Promise<void> {
    if (ProcessManagerService.pmInitStatus) {
      this.logger.warn("Autostart processes has been executed");
      return;
    }
    // Claim the flag BEFORE the first await: otherwise two overlapping init()
    // calls would both pass the guard above and register every process twice.
    ProcessManagerService.pmInitStatus = true;
    try {
      await this.resetAllProcessStatuses();
    } catch (error) {
      // The reset did not happen, so release the flag to allow a retry.
      ProcessManagerService.pmInitStatus = false;
      throw error;
    }
    this.logger.log("Init process manager");
    const processList = await this.processUnitRep.find({
      where: { enabled: true },
    });
    for (const processData of processList) {
      if (!processData.cronTab?.length) {
        this.logger.warn(
          `Process ${processData.code} hasn't cron-tab, skip job registration`,
        );
        continue;
      }
      this.broker.emit(Command.Register, processData);
    }
  }

  /**
   * Starts a process by its code.
   * @param code - The code of the process to start.
   * @throws InternalServerErrorException when no process unit with this code is stored.
   */
  async startProcess(code: string): Promise<void> {
    const processData = await this.requireProcessData(code);
    this.broker.emit(Command.Start, processData);
  }

  /**
   * Stops a process by its code.
   * @param code - The code of the process to stop.
   * @throws InternalServerErrorException when no process unit with this code is stored.
   */
  async stopProcess(code: string): Promise<void> {
    const processData = await this.requireProcessData(code);
    this.broker.emit(Command.Stop, processData);
  }

  /**
   * Toggles the enabled status of a process.
   *
   * Persists the flipped flag, then emits `Register` when the process became
   * enabled or `Unregister` when it became disabled.
   * @param code - The code of the process to toggle.
   * @throws InternalServerErrorException when the code has no process instance
   * or no stored process unit.
   */
  async toggleProcess(code: string): Promise<void> {
    if (!hasProcessInstance(code)) {
      throw new InternalServerErrorException(`Process ${code} not exists`);
    }
    const processData = await this.processUnitRep.findOne({ where: { code } });
    if (!processData) {
      // An instance can be known in code while its row is missing from the
      // repository; fail with the same error instead of a TypeError below.
      throw new InternalServerErrorException(`Process ${code} not exists`);
    }
    processData.enabled = !processData.enabled;
    await this.processUnitRep.save(processData);
    if (processData.enabled) {
      this.broker.emit(Command.Register, processData);
    } else {
      this.broker.emit(Command.Unregister, processData);
    }
  }

  /**
   * Sets the status of a process unit.
   * @param code - The code of the process unit.
   * @param status - The new status to set.
   * @returns The saved process unit.
   * @throws InternalServerErrorException when no process unit with this code is stored.
   */
  async setProcessUnitStatus(code: string, status: Process.Status) {
    const processData = await this.requireProcessData(code);
    processData.status = status;
    return this.processUnitRep.save(processData);
  }

  /**
   * Gets the status of a process unit.
   * @param code - The code of the process unit.
   * @returns The current status of the process unit.
   * @throws InternalServerErrorException when no process unit with this code is stored.
   */
  async getProcessUnitStatus(code: string) {
    const processData = await this.requireProcessData(code);
    return processData.status;
  }

  /**
   * Creates a log instance for a process.
   * @param processCode - The code of the process.
   * @returns The created ProcessLogEntity instance.
   */
  async createLogInstance(processCode: string) {
    // Looked up WITHOUT `force`, so only an enabled process is found.
    // NOTE(review): for an unknown or disabled code the lookup yields no
    // entity and the log is saved without a process — confirm this is intended.
    const process = await this.getProcessData(processCode);
    return this.processLogRep.save({
      process,
      content: "",
    } as ProcessLogEntity);
  }

  /**
   * Updates a log instance.
   * @param logInstance - The ProcessLogEntity instance to update.
   * @returns The updated ProcessLogEntity instance.
   */
  updateLogInstance(logInstance: ProcessLogEntity) {
    return this.processLogRep.save(logInstance);
  }

  /**
   * Gets process data by its code.
   * @param code - The code of the process.
   * @param force - Whether to force getting the process data regardless of its enabled status.
   * @returns The ProcessUnitEntity instance, or an empty result when nothing matches.
   */
  getProcessData(code: string, force = false) {
    // Without `force` the lookup is additionally restricted to enabled units.
    return this.processUnitRep.findOne({
      where: force ? { code } : { code, enabled: true },
    });
  }

  /**
   * Gets a process log by its ID.
   * @param id - The ID of the process log.
   * @returns The ProcessLogEntity instance with its `process` relation loaded.
   */
  getProcessLogById(id: number) {
    return this.processLogRep.findOne({
      where: { id },
      relations: ["process"],
    });
  }

  /**
   * Gets the last logs of a process by its code.
   * @param processCode - The code of the process.
   * @param limit - The maximum number of logs to retrieve (default is 3).
   * @returns An array of ProcessLogEntity instances, most recently updated first.
   */
  getLastLogsByProcess(processCode: string, limit = 3) {
    return this.processLogRep.find({
      where: { process: { code: processCode } },
      take: limit,
      order: { tsUpdated: "DESC" },
    });
  }

  /**
   * Resets the statuses of all enabled processes to "Ready".
   */
  private async resetAllProcessStatuses(): Promise<void> {
    const entities = await this.processUnitRep.find({
      where: { enabled: true },
    });
    if (!entities.length) {
      return;
    }
    // Mutate the entities already loaded and save them in one call rather
    // than re-fetching each one by code before saving.
    for (const processData of entities) {
      processData.status = Status.Ready;
    }
    await this.processUnitRep.save(entities);
  }

  /**
   * Loads a process unit by code regardless of its enabled flag and fails
   * when it does not exist.
   * @param code - The code of the process unit.
   * @returns The stored process unit.
   * @throws InternalServerErrorException when no process unit with this code is stored.
   */
  private async requireProcessData(code: string) {
    const processData = await this.getProcessData(code, true);
    if (!processData) {
      throw new InternalServerErrorException(
        `Process ${code} hasn't options-data`,
      );
    }
    return processData;
  }
}

results matching ""

    No results matching ""