Files
protocol/apps-node/rfq-api/test/utils/sqs_consumer_test.ts

289 lines
10 KiB
TypeScript

import { expect } from 'chai';
import { anyString, anything, instance, mock, verify, when } from 'ts-mockito';
import { SqsClient } from '../../src/utils/sqs_client';
import { SqsConsumer, SqsRetryableError } from '../../src/utils/sqs_consumer';
describe('SqsConsumer', () => {
describe('consumeOnceAsync', () => {
describe('beforeHandle', () => {
it('should not call handleMessage if beforeHandle returns false', async () => {
// Given
const sqsClientMock = mock(SqsClient);
let isHandleCalled = false;
const beforeHandle = async () => false;
const handleMessage = async () => {
isHandleCalled = true;
};
const consumer = new SqsConsumer({
workerIndex: 0,
workerAddress: 'id',
sqsClient: sqsClientMock,
handleMessage,
beforeHandle,
});
// When
await consumer.consumeOnceAsync();
// Then
expect(isHandleCalled).to.equal(false);
});
it('should not call handleMessage if beforeHandle throws an error', async () => {
// Given
const sqsClientMock = mock(SqsClient);
let isHandleCalled = false;
const beforeHandle = async () => Promise.reject('error!');
const handleMessage = async () => {
isHandleCalled = true;
};
const consumer = new SqsConsumer({
workerIndex: 0,
workerAddress: 'id',
sqsClient: sqsClientMock,
handleMessage,
beforeHandle,
});
// When
await consumer.consumeOnceAsync();
// Then
expect(isHandleCalled).to.equal(false);
});
it('should call handleMessage if no beforeHandle', async () => {
// Given
const sqsClientMock = mock(SqsClient);
when(sqsClientMock.receiveMessageAsync()).thenResolve({
Body: '0xdeadbeef',
});
const sqsClientInstance = instance(sqsClientMock);
let isHandleCalled = false;
const handleMessage = async () => {
isHandleCalled = true;
};
const consumer = new SqsConsumer({
workerIndex: 0,
workerAddress: 'id',
sqsClient: sqsClientInstance,
handleMessage,
});
// When
await consumer.consumeOnceAsync();
// Then
expect(isHandleCalled).to.equal(true);
});
it('should call handleMessage if beforeHandle returns true', async () => {
// Given
const sqsClientMock = mock(SqsClient);
when(sqsClientMock.receiveMessageAsync()).thenResolve({
Body: '0xdeadbeef',
});
const sqsClientInstance = instance(sqsClientMock);
const beforeHandle = async () => true;
let isHandleCalled = false;
const handleMessage = async () => {
isHandleCalled = true;
};
const consumer = new SqsConsumer({
workerIndex: 0,
workerAddress: 'id',
sqsClient: sqsClientInstance,
beforeHandle,
handleMessage,
});
// When
await consumer.consumeOnceAsync();
// Then
expect(isHandleCalled).to.equal(true);
});
});
describe('handleMessage', () => {
it('should not be called if no message is recieved', async () => {
// Given
const sqsClientMock = mock(SqsClient);
when(sqsClientMock.receiveMessageAsync()).thenResolve(null);
const sqsClientInstance = instance(sqsClientMock);
let isHandleCalled = false;
const handleMessage = async () => {
isHandleCalled = true;
};
const consumer = new SqsConsumer({
workerIndex: 0,
workerAddress: 'id',
sqsClient: sqsClientInstance,
handleMessage,
});
// When
await consumer.consumeOnceAsync();
// Then
expect(isHandleCalled).to.equal(false);
});
it('should call changeMessageVisibility if a SqsRetryableError is encountered (triggers a retry)', async () => {
// Given
const sqsClientMock = mock(SqsClient);
when(sqsClientMock.receiveMessageAsync()).thenResolve({
Body: '0xdeadbeef',
ReceiptHandle: '1',
});
const sqsClientInstance = instance(sqsClientMock);
const handleMessage = async () => {
throw new SqsRetryableError('error');
};
const consumer = new SqsConsumer({
workerIndex: 0,
workerAddress: 'id',
sqsClient: sqsClientInstance,
handleMessage,
});
// When
await consumer.consumeOnceAsync();
// Then
verify(sqsClientMock.changeMessageVisibilityAsync(anyString(), 0)).once();
});
it('should not call changeMessageVisibility if a non SqsRetryableError is encountered', async () => {
// Given
const sqsClientMock = mock(SqsClient);
when(sqsClientMock.receiveMessageAsync()).thenResolve({
Body: '0xdeadbeef',
ReceiptHandle: '1',
});
const sqsClientInstance = instance(sqsClientMock);
const handleMessage = async () => {
throw new Error('error');
};
const consumer = new SqsConsumer({
workerIndex: 0,
workerAddress: 'id',
sqsClient: sqsClientInstance,
handleMessage,
});
// When
await consumer.consumeOnceAsync();
// Then
verify(sqsClientMock.changeMessageVisibilityAsync(anything(), anything())).never();
});
it('should call deleteMessageAsync if message is successfully handled', async () => {
// Given
const sqsClientMock = mock(SqsClient);
when(sqsClientMock.receiveMessageAsync()).thenResolve({
Body: '0xdeadbeef',
ReceiptHandle: '1',
});
const sqsClientInstance = instance(sqsClientMock);
let isHandleCalled = false;
const handleMessage = async () => {
isHandleCalled = true;
};
const consumer = new SqsConsumer({
workerIndex: 0,
workerAddress: 'id',
sqsClient: sqsClientInstance,
handleMessage,
});
// When
await consumer.consumeOnceAsync();
// Then
expect(isHandleCalled).to.equal(true);
verify(sqsClientMock.deleteMessageAsync(anyString())).once();
});
});
describe('afterHandle', () => {
it('should be called once everything is successful', async () => {
// Given
const sqsClientMock = mock(SqsClient);
when(sqsClientMock.receiveMessageAsync()).thenResolve({
Body: '0xdeadbeef',
ReceiptHandle: '1',
});
const sqsClientInstance = instance(sqsClientMock);
let isAfterCalled = false;
const afterHandle = async () => {
isAfterCalled = true;
};
const consumer = new SqsConsumer({
workerIndex: 0,
workerAddress: 'id',
sqsClient: sqsClientInstance,
// $eslint-fix-me https://github.com/rhinodavid/eslint-fix-me
// eslint-disable-next-line @typescript-eslint/no-empty-function
handleMessage: async () => {},
afterHandle,
});
// When
await consumer.consumeOnceAsync();
// Then
expect(isAfterCalled).to.equal(true);
});
it('should be passed an error if a non-retryable error was encountered', async () => {
// Given
const sqsClientMock = mock(SqsClient);
when(sqsClientMock.receiveMessageAsync()).thenResolve({
Body: '0xdeadbeef',
ReceiptHandle: '1',
});
const sqsClientInstance = instance(sqsClientMock);
let isAfterCalledWithError = false;
const consumer = new SqsConsumer({
workerIndex: 0,
workerAddress: 'id',
sqsClient: sqsClientInstance,
handleMessage: async () => {
throw new Error();
},
afterHandle: async (_, error) => {
if (error) {
isAfterCalledWithError = true;
}
},
});
// When
await consumer.consumeOnceAsync();
// Then
expect(isAfterCalledWithError).to.equal(true);
});
});
});
});