Bus
RabbitMQ bus module built with appolo-rabbit
Installation#
Options#
| key | Description | Type | Default |
|---|---|---|---|
| id | injection id | string | busProvider |
| connection | AMQP connection string | string | null |
| autoListen | true to initialize busProvider automatically and start listening to events | boolean | true |
| handleEvents | true to register queue event handlers | boolean | true |
| exchange | name of the exchange, or exchange options | string or object | {} |
| queue | queue options | object | {} |
| requestQueue | request queue options | object | {} |
| replayQueue | reply queue options | object | {} |
| appendEnv | append the env name to queueName and exchangeName | boolean | true |
Exchange Options#
| key | Description | Type | Default |
|---|---|---|---|
| type | exchange type | string | topic |
| autoDelete | delete when consumer count goes to 0 | boolean | false |
| durable | survive broker restarts | boolean | true |
| persistent | persistent delivery, messages saved to disk | boolean | true |
| alternate | define an alternate exchange | string | |
| publishTimeout | timeout in milliseconds for publish calls to this exchange | number (max 2^32) | |
| replyTimeout | timeout in milliseconds to wait for a reply | number (max 2^32) | |
| limit | the number of unpublished messages to cache while waiting on connection | number (max 2^16) | |
| noConfirm | prevents rabbot from creating the exchange in confirm mode | boolean | false |
Queue Options#
| key | Description | Type | Default |
|---|---|---|---|
| autoDelete | delete when consumer count goes to 0 | boolean | false |
| durable | survive broker restarts | boolean | true |
| subscribe | auto-start the subscription | boolean | false |
| limit | max number of unacked messages allowed for consumer | number (max 2^16) | 1 |
| noAck | the server will remove messages from the queue as soon as they are delivered | boolean | false |
| noBatch | causes ack, nack & reject to take place immediately | boolean | false |
| noCacheKeys | disable cache of matched routing keys to prevent unbounded memory growth | boolean | false |
| queueLimit | max number of ready messages a queue can hold | number (max 2^32) | |
| messageTtl | time in ms before a message expires on the queue | number (max 2^32) | |
| expires | time in ms before a queue with 0 consumers expires | number (max 2^32) | |
The module is configured in config/modules/all.ts.
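As a rough illustration of how the options in the tables above fit together, the sketch below builds a plain options object. The `BusOptions` interface is a hypothetical local type that mirrors a subset of the tables; it is not imported from the module, and the connection string and option values are placeholders. How the object is passed to the module inside config/modules/all.ts is not shown here.

```typescript
// Hypothetical local type mirroring part of the option tables; not the module's own type.
interface BusOptions {
    id?: string;
    connection: string;
    autoListen?: boolean;
    handleEvents?: boolean;
    exchange?: string | { type?: string; durable?: boolean; autoDelete?: boolean };
    queue?: { durable?: boolean; autoDelete?: boolean; limit?: number; noAck?: boolean };
    appendEnv?: boolean;
}

// Placeholder values for illustration only.
const busOptions: BusOptions = {
    connection: "amqp://guest:guest@localhost:5672",
    exchange: { type: "topic", durable: true },
    queue: { durable: true, limit: 1 },
    appendEnv: true,
};
```

Options left out of the object would fall back to the defaults listed in the tables.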
Usage#
Publisher#
Inject BusProvider to publish messages.
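A minimal sketch of a publisher, assuming the provider is handed in through the constructor rather than through the framework's injection. `PublishCapable` is a local stand-in that copies only the `publish` signature documented under BusProvider, `RecordingBus` is a hypothetical in-memory stub so the example runs without a broker, and `user.created` is an example event name.

```typescript
// Local stand-in exposing only the publish signature documented under BusProvider.
interface PublishCapable {
    publish(type: string, data: any, expire?: number): Promise<void>;
}

// Hypothetical in-memory stub; it records messages instead of sending them.
class RecordingBus implements PublishCapable {
    sent: { type: string; data: any; expire: number | undefined }[] = [];

    async publish(type: string, data: any, expire?: number): Promise<void> {
        this.sent.push({ type, data, expire });
    }
}

// The publisher depends only on the publish capability it is given.
class UserPublisher {
    busProvider: PublishCapable;

    constructor(busProvider: PublishCapable) {
        this.busProvider = busProvider;
    }

    userCreated(userId: string): Promise<void> {
        // Example event name and expire value; neither is defined by the module.
        return this.busProvider.publish("user.created", { userId }, 60000);
    }
}
```

In an application the stub would be replaced by the injected BusProvider; the publisher class itself would not need to change.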
Handler#
If the handler does not call msg.ack() or msg.nack() itself, msg.ack() is called when the handler returns and msg.nack() is called if the handler throws an error.
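The fallback behaviour described above can be modelled as a wrapper around the handler. This is only a sketch of the semantics, not the module's internals: `FakeMessage` and `dispatch` are hypothetical names introduced for the example.

```typescript
// Hypothetical message stand-in that records how it was settled.
class FakeMessage<T> {
    body: T;
    settled: "ack" | "nack" | null = null;

    constructor(body: T) {
        this.body = body;
    }

    ack(): void {
        if (this.settled === null) this.settled = "ack";
    }

    nack(): void {
        if (this.settled === null) this.settled = "nack";
    }
}

// Runs the handler, then settles the message only if the handler did not.
async function dispatch<T>(
    msg: FakeMessage<T>,
    handler: (msg: FakeMessage<T>) => void | Promise<void>
): Promise<void> {
    try {
        await handler(msg);
        if (msg.settled === null) msg.ack(); // handler returned normally
    } catch (e) {
        if (msg.settled === null) msg.nack(); // handler threw or rejected
    }
}
```

A handler that settles the message explicitly keeps full control; the wrapper only fills in the missing call.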
Request#
We can await a response and set an expire timeout. If the timeout is reached, a timeout error is thrown.
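One way to picture the timeout behaviour is a promise that either resolves with the reply or rejects when a timer fires first. The sketch below assumes `expire` is given in milliseconds; `requestWithTimeout` and `RequestTimeoutError` are hypothetical names, and the error class the module actually throws is not documented here.

```typescript
// Hypothetical error type used only in this sketch.
class RequestTimeoutError extends Error {
    constructor(type: string, expire: number) {
        super(`request "${type}" timed out after ${expire}ms`);
        this.name = "RequestTimeoutError";
    }
}

// Resolves with the reply, or rejects once `expire` milliseconds have passed.
function requestWithTimeout<T>(type: string, pending: Promise<T>, expire: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
        const timer = setTimeout(() => reject(new RequestTimeoutError(type, expire)), expire);
        pending.then(
            (value) => { clearTimeout(timer); resolve(value); },
            (err) => { clearTimeout(timer); reject(err); }
        );
    });
}
```

Callers would wrap the awaited request in try/catch and treat the timeout like any other rejected promise.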
Reply#
Define a reply handler that answers requests.
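To show how a reply handler and a request relate, the following sketch routes a request to the handler registered for its event name, entirely in memory. `InMemoryBus`, its `reply` registration method, and the `math.double` event name are hypothetical; the module's own way of registering reply handlers is not reproduced here.

```typescript
// Hypothetical reply handler type: receives the request data and returns the answer.
type ReplyHandler = (data: any) => any | Promise<any>;

// In-memory stand-in for the broker: routes a request to the handler for its type.
class InMemoryBus {
    private handlers: { [type: string]: ReplyHandler } = {};

    reply(type: string, handler: ReplyHandler): void {
        this.handlers[type] = handler;
    }

    async request<T>(type: string, data: any): Promise<T> {
        const handler = this.handlers[type];
        if (!handler) throw new Error(`no reply handler for "${type}"`);
        return (await handler(data)) as T;
    }
}

const bus = new InMemoryBus();
// Example reply handler: answers with twice the requested value.
bus.reply("math.double", (data: { value: number }) => ({ result: data.value * 2 }));
```

The requester only knows the event name and the expected response type; which component answers is decided by the handler registration.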
IMessage#
Every handler and reply handler is called with a message object.
ack#
message.ack()#
Enqueues the message for acknowledgement.
nack#
message.nack()#
Enqueues the message for rejection. This will re-enqueue the message.
reject#
message.reject()#
Rejects the message without re-queueing it. Please use with caution and consider having a dead-letter-exchange assigned to the queue before using this feature.
reply#
message.reply( data:any )#
Acknowledges the message and sends the reply back to the requester.
replySuccess#
message.replySuccess( data:T )#
Replies to the message with the JSON object {success: true, data}.
replyError#
message.replyError( e: RequestError<T> )#
Replies to the message with the JSON object {success: false, message: e.message, data: e.data}.
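The two reply shapes described for replySuccess and replyError can be written down as a discriminated union, which lets the requester branch on `success`. This is a sketch only: `ReplyEnvelope`, `successEnvelope`, `errorEnvelope` and `RequestErrorSketch` are hypothetical names, and `RequestErrorSketch` is a local stand-in that merely assumes RequestError carries a `message` and a `data` field.

```typescript
// Envelope shapes taken from the descriptions of replySuccess and replyError.
type ReplyEnvelope<T, E = unknown> =
    | { success: true; data: T }
    | { success: false; message: string; data: E };

// Local stand-in for RequestError<T>: an error that carries extra data.
class RequestErrorSketch<E> extends Error {
    data: E;

    constructor(message: string, data: E) {
        super(message);
        this.data = data;
    }
}

// Builds the success shape: {success: true, data}.
function successEnvelope<T>(data: T): ReplyEnvelope<T, never> {
    return { success: true, data };
}

// Builds the error shape: {success: false, message, data}.
function errorEnvelope<E>(e: RequestErrorSketch<E>): ReplyEnvelope<never, E> {
    return { success: false, message: e.message, data: e.data };
}
```

Checking `success` first narrows the type, so `message` is only accessible on the error branch.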
BusProvider#
initialize#
initialize()#
Initializes busProvider and starts listening to events. Call it when autoListen is disabled.
publish#
publish(type: string, data: any, expire?: number): Promise<void>#
Publishes an event.
- type - event name
- data - any data
- expire - time until the message expires in the queue
request#
request<T>(type: string, data: any, expire?: number): Promise<T>#
Requests data by event name and returns a promise that resolves with the event response.
- type - event name
- data - any data
- expire - timeout until the request is rejected
close#
close<T>(): Promise<void>#
Closes the connection and cleans up all handlers.
getQueueMessagesCount#
getQueueMessagesCount(): Promise<number>#
Returns the number of pending events in the queue.