/**
* Sqs adapter. This is an adapter for AWS SQS Client.
*/
export class SqsAdapter {
constructor(
private readonly logger: Logger,
private readonly sqs: SQS,
private readonly autoCompressionService: AutoCompressionService,
private readonly sqsUrl: string
) {}
public async receiveMessage(options: { limit?: number } = {}): Promise<SQS.Message[]> {
try {
const data = await this.sqs.receiveMessage({
QueueUrl: this.sqsUrl,
MaxNumberOfMessages: options.limit
}).promise();
const messages = await Promise.all(
data.Messages.map(async message => Object.assign({}, message, {
Body: await this.resolveMessageBody(message.Body)
}))
);
this.logger.log(SqsAdapter.name, this.receiveMessage.name, '', data);
return messages;
} catch (err) {
this.logger.error(SqsAdapter.name, this.receiveMessage.name, err);
throw err;
}
}
public async sendMessage(body: Object|string, compressionType: CompressionType = CompressionType.None): Promise<SQS.SendMessageResult> {
try {
const data = await this.sqs.sendMessage({
QueueUrl: this.sqsUrl,
MessageBody: await this.autoCompressionService.compress(body, compressionType)
}).promise();
this.logger.log(SqsAdapter.name, this.sendMessage.name, '', data);
return data;
} catch (err) {
this.logger.error(SqsAdapter.name, this.sendMessage.name, err);
throw err;
}
}
public async sendMessageBatch(items: Object[]|string[], compressionType: CompressionType = CompressionType.None): Promise<SQS.SendMessageBatchResult> {
const entries = await Promise.all(
items.map(async (entry, i) => ({
Id: i.toString(),
MessageBody: await this.autoCompressionService.compress(entry, compressionType)
}))
);
try {
const data = await this.sqs.sendMessageBatch({
QueueUrl: this.sqsUrl,
Entries: entries
}).promise();
this.logger.log(SqsAdapter.name, this.sendMessageBatch.name, '', data);
return data;
} catch (err) {
this.logger.error(SqsAdapter.name, this.sendMessageBatch.name, err);
throw err;
}
}
public async deleteMessage(receiptHandle: string): Promise<object> {
try {
const data = await this.sqs.deleteMessage({
QueueUrl: this.sqsUrl,
ReceiptHandle: receiptHandle
}).promise();
this.logger.log(SqsAdapter.name, this.deleteMessage.name, '', data);
return data;
} catch (err) {
this.logger.error(SqsAdapter.name, this.deleteMessage.name, err);
throw err;
}
}
public async deleteMessageBatch(receiptHandles: string[]): Promise<SQS.DeleteMessageBatchResult> {
try {
const data = await this.sqs.deleteMessageBatch({
QueueUrl: this.sqsUrl,
Entries: receiptHandles.map((receiptHandle, i) => ({
Id: i.toString(),
ReceiptHandle: receiptHandle
}))
}).promise();
this.logger.log(SqsAdapter.name, this.deleteMessageBatch.name, '', data);
return data;
} catch (err) {
this.logger.error(SqsAdapter.name, this.deleteMessageBatch.name, err);
throw err;
}
}
public async resolveMessageBody(body: string): Promise<string> {
return await this.autoCompressionService.decompress(body);
}
}