Nestjs Queues 初探

NestJs 中的 Queues 使用。
每个例子都有对应的代码。

快速上手

完整代码:

1
2
3
git clone git@github.com:sertceps/nestjs-queues-demo.git
git switch message
# switch 不起作用可以使用 git checkout message

安装依赖

新建 nest 项目,并安装 bull 依赖:

1
npm install --save @nestjs/bull bull

此外还需自行安装 Redis

如果有 docket 环境可以使用下面简易 docker-compose.yaml

1
2
3
4
5
6
7
8
version: "3"
services:
redis:
image: "redis:alpine"
container_name: redis
restart: always
ports:
- "6379:6379"

目录结构

src 目录新建文件夹 message-queue,并分别新建 message.producer.tsmessage.consumer.ts

1
2
3
4
5
6
7
8
src
├── app.controller.ts
├── app.module.ts
├── app.service.ts
├── main.ts
├── message-queue
│ ├── message.producer.ts
│ └── message.consumer.ts

配置相关

AppModule 中导入 BullModule,并书写如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
}),

BullModule.registerQueue({ name: 'message-queue' })
],
// ....
})
export class AppModule {}

BullModule.forRoot()

forRoot() 方法用于注册一个 bull 包的配置对象,这个对象会被在应用中注册的所有队列(queue)使用。

配置对象由以下属性组成:

  • limiter: RateLimiter - 控制队列任务处理的速度。查看 RateLimiter 获取更多信息。可选配置。
  • redis: RedisOpts - 配置 Redis 连接。 点 RedisOpts 获取更多信息。可选配置。
  • prefix: string - 所有队列键名的前缀。可选配置。
  • defaultJobOptions: JobOpts - 用于控制新任务的默认配置项。查看 JobOpts 获取更多信息。可选配置。
  • settings: AdvancedSettings - 高级队列配置设置。这些通常不应修改。查看 AdvancedSettings 获取更多信息。可选配置。

所有的选项都是可选的,它们提供了队列行为的精细控制。这些选项会被直接传入 Bull Queue 构造函数。

这里阅读更多。

BullModule.register()

registerQueue 用于实例化并注册队列。如果需要注册多个队列,使用逗号分隔的多个对象即可:

1
2
3
4
imports: [
//...
BullModule.registerQueue({ name: 'queue-1' },{name: 'queue-2'}),
]

可以为某个特定队列覆盖全局配置:

1
2
3
4
5
6
7
8
9
imports: [
//...
BullModule.registerQueue({
name: 'queue-x',
redis: {
port: 6389
}
}),
]

每个队列通过 name 属性保证唯一性。

name 不仅是依赖注入的 token(注入 controllers 或 providers),而且消费者(consumer)和监听器(listener)也是通过 name 与队列进(queue)行关联(name 作为装饰器参数)。

生产者 (Producer)

生产者将任务添加到队列。

回到代码,在 message.producer.ts 中书写如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';

@Injectable()
export class MessageProducer {
constructor(@InjectQueue('message-queue') private queue: Queue) {}

async sendMessage(message: string) {
await this.queue.add('message-job', {
message
});
}
}

@Injectable() 表明,生产者是Nest 中典型的 services(Nest provider)。

@InjectQueue() 通过 name 识别队列,即上文提到的,每个队列通过 name 属性保证唯一性,是依赖注入的 token。

Queue.add()

Queue.add() 用于添加任务,其参数是对象格式:

1
2
3
4
5
await this.queue.add('job-name',{
key1: value1,
key2: value2,
// ...
})

注意到上面的代码,提供了一个 job-name 参数。这种具有唯一名称的任务,称为具名任务(named jobs)

每一个具名任务,在消费者(consumer)中都要有一一对应的处理器(processor),专门用于处理具名任务。否则会报错。

Queue.add() 中,可以为任务传入一些选项,比如:

  • 使用 delay 属性来延迟任务开始的时间:
1
2
3
4
5
6
const job = await this.queue.add(
{
foo: 'bar',
},
{ delay: 3000 }, // 3 秒延时
);
  • 使用 lifo 属性将任务队列设置成 LIFO(后进先出):
1
const job = await this.audioQueue.add(  {    foo: 'bar',  },  { lifo: true },);
  • 使用 priority 属性提高某个任务优先级:
1
const job = await this.queue.add(  {    foo: 'bar',  },  { priority: 2 },);

完整的选项如下:

  • priority: number - 可选的优先级。范围从 1(优先级最高)到 MAX_INT(优先级最低)。注意,使用优先级会对性能造成轻微影响,应该谨慎使用。
  • delay: number - 直到任务可以被处理前等待的时间(毫秒)。注意,为了精确延时,服务器和客户端时钟应该同步。
  • attempts: number - 直到任务完成尝试任务的总次数。
  • repeat: RepeatOpts -通过 cron 指定进行重复任务。查看 RepeatOpts.
  • backoff: number | BackoffOpts - 如果失败自动尝试的回退设置。 查看 BackoffOpts.
  • lifo: boolean -如果为真,将任务添加到队列最右,而非最左(默认false)。
  • timeout: number - 超时时间(毫秒)。
  • jobId: number | string - 自定义 job Id。默认情况下,job ID 是一个唯一整数。如果自定义,需要保证其唯一性。尝试添加一个 job ID 已经存在的任务不会成功。
  • removeOnComplete: boolean | number - 如果为 true,成功完成的任务会被移除。数字指定要保留的任务数。默认行为是保存任务在已完成的集合中。
  • removeOnFail: boolean | number - 如果为 true, 在所有尝试后移除失败的任务。数字指定要保留的任务数。默认行为是保存任务在已失败的集合中。
  • stackTraceLimit: number -限制在栈追踪记录的栈追踪线数量。

消费者(Consumer)

消费者是一个类。这个类定义了一些方法,用于处理进入队列的任务,或是监听队列事件。

message.consumer.ts 中书写如下代码:

1
2
3
4
5
6
7
8
9
10
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('message-queue')
export class MessageConsumer {
@Process('message-job')
consume(job: Job<unknown>) {
console.log(job.data);
}
}

@Processor() 表明,这是一个消费者类,其参数是对应的队列 name。这里是我们在 AppModule 注册的 "message-queue"

@Process()

在类的内部,使用@Process() 装饰器声明任务处理器。

被装饰的方法,比如上面的 consume(), 在资源空闲且队列中仍有任务的时候被调用。

注意,类装饰器是 @Processor(),方法装饰器是@Process()

@Process('message-job') 传入了任务的 name,说明这是用来处理具名任务的对应处理器。

如果不是具名任务,那么会被统一的处理器处理,比如生产者中有如下两个任务:

1
2
3
4
5
6
7
8
9
10
11
async testMessage(message: string) {    
await this.queue.add({
message
});
}

async testMessage2(score: number) {
await this.queue.add({
score
});
}

上面两个任务,会统一被消费者中的 testConsume()处理器处理:

1
2
3
@Process()  testConsume(job: Job<unknown>) {      
// ...
}

注意,Queue.add()@Process() 都没有传入 name 参数。

没有 name 属性的 @Process() 只能定义一个。

使用队列

app.controller.ts 中书写如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import { Controller, Get, Query } from '@nestjs/common';
import { AppService } from './app.service';
import { MessageProducer } from './message-queue/message.producer';

@Controller()
export class AppController {
constructor(
private readonly appService: AppService,
private readonly messageProducer: MessageProducer
) {}

// ...

@Get('messages')
async getInvokeMessage(@Query('message') message: string) {
this.messageProducer.sendMessage(message);
return message;
}
}

这里注入了 MessageProducer,需要在 AppleModule 中引入:

1
2
3
4
@Module({  
// ...
providers: [AppService, MessageProducer, MessageConsumer]
})

AppModule 完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { MessageConsumer } from './message-queue/message.consumer';
import { MessageProducer } from './message-queue/message.producer';

@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379
}
}),
BullModule.registerQueue({ name: 'message-queue' })
],
controllers: [AppController],
providers: [AppService, MessageProducer, MessageConsumer]
})
export class AppModule {}

此时访问 /messages?message=test,会在终端输出:

1
{  message: 'test'}

实践

上面的例子,实际中不会那么做。

Nest 官网对 Queue 的描述如下:

队列是一种强大的设计模式,帮助我们应对常见的应用伸缩及性能挑战。一些 Queue 可能解决的问题示例如下:

  • 削峰处理。用户可能在任意时间产生资源密集型任务,可以将这些任务放入队列,而不是同步地执行它们。之后可以使用工作进程以可控方式从队列中取出任务。当应用消耗增加,可以通过添加新的 Queue 消费者来提高后台任务处理能力。
  • 将可能阻塞 Node.js 事件循环的单个任务分散。比如一个用户请求了 CPU 密集型任务,比如音频转码,你可以将这个任务派给其它进程,让面向用户的进程保持响应。
  • 为不同服务提供可靠通信。比如,可以在一个进程或服务中将任务加入队列,在另一个进程或服务中消费它们。来自其他进程或服务的任务生命周期中一旦产生完成、错误或其他状态改变,你都可以收到通知(通过监听状态事件)。当队列生产者或者消费者失败,它们的状态会被保留,任务处理会跟随 node 重启而自动重启。

下面我们使用 mongoose 连接 MongoDB,模拟实际场景。

需要自行安装 MongoDB。

有 docker 环境可以使用以下简易 docker-compose.yaml:

1
2
3
4
5
6
7
version: "3"
services:
mongodb:
image: mongo
network_mode: host
container_name: mongodb
restart: always

主要逻辑

nest-queues-demo src 目录结构如下:

1
git switch main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
src/
├── app.controller.ts
├── app.module.ts
├── app.service.ts
├── logger
│ ├── logger.service.ts
│ ├── queues
│ │ ├── logger.consumer.ts
│ │ └── logger.producer.ts
│ └── schemas
│ └── logger.schema.ts
├── main.ts
└── user
├── schemas
│ └── user.schema.ts
├── user.controller.ts
├── user.module.ts
└── user.service.ts

新增了 userlogger

有请求到 UserController 时,将请求日志写入 MongoDB,并将用户请求结果返回。

这里用户并不需要日志的结果,可以将日志活动放入队列。

Log 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
interface Log {
level: string;

method: string;

url: string;

request_message: string;

response_code: number;

response_message: string;
}

logger.producer.ts 关键代码:

1
2
3
4
5
constructor(@InjectQueue('logger-queue') private loggerQueue: Queue) {}

async createLog(log: Log) {
await this.loggerQueue.add('logger-job', log);
}

logger.consumer.ts 关键代码:

1
2
3
4
5
6
7
8
@Process('logger-job')
async createLog(job: Job<Log>) {
const log: Log = job.data;
// loggerService.create 用于将 log 写入数据库
await this.loggerService.create(log);
// 写入完成后打印信息
console.log('success' + job.id);
}

user.controller.ts 关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Post()
async create(@Body() body: any) {
const user = await this.userService.create(body);

const log: Log = {
level: 'info',
method: 'Post',
url: '/users',
request_message: JSON.stringify(body),
response_code: 200,
response_message: JSON.stringify({ id: user._id })
};
// 此处调用了生产者中的方法
this.loggerProducer.createLog(log);

return { id: user._id };
}

要在 user.module.ts 中对队列及生产者、消费者进行注册:

1
2
3
4
5
6
7
8
9
@Module({
imports: [
// ...
BullModule.registerQueue({ name: 'logger-queue' })
],
controllers: [UserController],
providers: [UserService, LoggerService, LoggerProducer, LoggerConsumer]
})
export class UserModule {}

速率控制

上文提到过,BullModule.forRoot() 全局配置中,可以使用 limiter: RateLimiter 对队列速率做一个控制。也可以单独在 BullModule.registerQueue() 中配置。

RateLimiter 接口类型如下:

1
2
3
4
5
6
7
8
9
10
interface RateLimiter {
/** 任务数处理最大值 */
max: number;
/** 每次时间间隔(毫秒) */
duration: number;
/** 任务被限速时,它们是留在等候队列中,还是移动到延迟队列 ,默认为 false*/
bounceBack?: boolean | undefined;
/** 从数据对象传入 Queue.add() 中,带有特定键名的分组任务 ex. "network.handle" */
groupKey?: string | undefined;
}

我们在 registerQueue() 中进行配置:

1
2
3
4
@Module({
// ...
BullModule.registerQueue({ name: 'logger-queue', limiter: { max: 1, duration: 1000 } })
})

此处设置每次处理一个任务,每个任务间隔一秒(1000 毫秒)。

使用 ab 对 /users 接口进行测试,并观察 console 结果。

1
ab -n 500 -c 10 -p post.json -T 'application/json' http://localhost:3000/users

post.json 内容:

1
2
3
4
5
{
"name":"张三",
"age": 18,
"gender":"male"
}

可以观察到,ab 测试很快跑完,而应用的 console 每隔一秒输出一条信息,说明配置生效。

注意 :新的速率配置不会对队列中留存的任务生效。

比如 Ctr + c 停止刚刚的应用,将速率提高后再重启。如果队列中有留存的任务,此时再次测试,新加入队列的任务会以新配置的速率处理,而留存的任务处理速度不会变更。

应用间共享队列

队列可以在使用相同凭证连接到相同 Redis 的模块和进程间共享。

这里我们将原来的应用拆分,将消费者独立出去。

使用 nest-cli 新建应用:

1
nest new nest-queues-another

nest-queues-another 目录结构:

1
git checkout nest-queues-another
1
2
3
4
5
6
7
8
9
10
11
12
src/
├── app.controller.spec.ts
├── app.controller.ts
├── app.module.ts
├── app.service.ts
├── logger
│ ├── logger.service.ts
│ ├── queues
│ │ └── logger.consumer.ts
│ └── schemas
│ └── logger.schema.ts
└── main.ts

新的 nest-queues-demo 目录结构:

1
git checkout new-nest-queues-demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
src/
├── app.controller.ts
├── app.module.ts
├── app.service.ts
├── logger
│ └── queues
│ └── logger.producer.ts
├── main.ts
└── user
├── schemas
│ └── user.schema.ts
├── user.controller.ts
├── user.module.ts
└── user.service.ts

将生产者留在原来的应用中,而消费者放入新的应用。

只要 Redis 及队列名称相同即可。

1
2
3
4
5
6
7
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379
}
}),
BullModule.registerQueue({ name: 'logger-queue' })

注意修改相关依赖代码。

重复消费

这里来测试一下会不会出现重复消费的情况。

使用 pm2 开启 4 个消费者所在应用,开启 2 个生产者所在应用。

清空 usersloggers 文档,使用 ab 命令,发送 10000 个请求,完成后分别检查这两个文档的集合数目。

1
ab -n 10000 -c 500 -p post.json -T 'application/json' http://localhost:3000/users

多次测试,没有发现重复消费的现象。

不过,在测试过程中重启消费者应用,则出现了重复消费或丢数据现象。

其他

官方文档

事件监听器

当队列及任务状态变更时,Bull 生成了一系列有用的事件。Nest 提供了一系列允许订阅核心标准事件的装饰器。这些装饰器在 @nestjs/bull 中被导出。

事件监听器必须在一个消费者类中声明(在一个 @Processor() 装饰器装饰的类中)。

要监听一个事件,需要使用下方表格中的装饰器来声明一个事件处理器。比如,为了监听当一个在 audio 队列中的任务进入激活状态触发的事件,使用以下构造:

1
2
3
4
5
6
7
8
9
10
import {Processor, Process} from '@nestjs/bull'
import {Job} from 'bull'

@Processor('audio')
export class AudioConsumer {
@OnQueueActive()
onActive(job: Job){
console.log(`${job.id}...${job.name}...${job.data}`)
}
}

由于 Bull 运行于一个分布式(多节点)的环境中,其定义了一个本地事件的概念。这一概念认为事件可能完全由单个进程触发,也有可能由在多个进程间共享的队列触发。在本地进程中的一个队列上触发的行为或状态变动会产生一个本地事件。换句话说,当事件生产者和消费者对单个线程来说都是本地的,那么所有队列上发生的事件都是本地的。

当一个队列在多个进程间共享,可能会遇到全局事件。对需要在某个进程收到来自另一进程事件消息的监听器来说,它必须注册为一个全局事件。

当对应事件触发,事件处理器会被调用。

详细的事件监听器和处理方法见官方表格

当监听全局事件,方法签名可能和本地的有些许不同。特别是,任何在本地收到 job 对象的方法签名,在全局下改为收到一个 jobId (number) 。在此种情况下,为了获取实际的 job 对象引用,需要使用 Queue.getJob() 方法。这个方法需要 async/await。例如:

1
2
3
4
5
@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any){
const job = await this.immediateQueue.getJob(jobId);
console.log(job.id, result);
}

提示:使用 Queue 中的 getJob(),必须先注入,并在模块中注册。

除了使用专门的事件监听器装饰器,也可以使用通用的 @OnQueueEvent() ,配合 BullQueueEnvents 或者 BullGlobalQueueEvents这里查看更多信息。

队列管理

队列的 API 允许我们执行管理功能,类似暂停、继续、获取不同状态的任务数等等。可以在这里查看完整队列 API。可以直接在 Queue 对象上调用任意这些方法,示例如下:

1
2
await audioQueue.pause()
await audioQueue.resume()

详见这里

独立进程

任务处理器也可以在单独(forked)进程运行(source)。一些优点如下:

  • 进程处于沙盒中,如果崩溃不会影响到工作者线程。
  • 可以运行阻塞代码而不会影响到队列(任务不会停顿)。
  • 更好地利用多核 CPU。
  • 更少的 redis 连接。

app.module.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { join } from 'path';

@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
processors: [join(__dirname, 'processor.js')],
}),
],
})
export class AppModule {}

注意,因为函数在 fork 进程中执行,依赖注入无法进行。这意味着处理器函数(processor)需要包含所有需要的外部依赖实例。

processor.ts

1
2
3
4
5
6
import { Job, DoneCallback } from 'bull';

export default function (job: Job, cb: DoneCallback) {
console.log(`[${process.pid}] ${JSON.stringify(job.data)}`);
cb(null, 'It works');
}

异步配置

使用 forRootAsync() 异步地传入 bull 选项。类似的,如果想异步传入队列选项,使用 registerQueueAsync()

使用工厂函数的一种方式:

1
2
3
4
5
6
7
8
BullModule.forRootAsync({
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
});

这里工厂函数行为和其他 asynchronous provider 类似(比如是异步的,并且可以通过 Inject 进行依赖注入)。

1
2
3
4
5
6
7
8
9
10
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
redis: {
host: configService.get('QUEUE_HOST'),
port: +configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
});

你也可以选择使用 useClass 语法:

1
2
3
BullModule.forRootAsync({
useClass: BullConfigService,
});

详见这里

官方样例

A working example is available here.

作者

月海

发布于

2021-07-29

更新于

2022-12-06

许可协议

评论