289 lines
10 KiB
TypeScript
289 lines
10 KiB
TypeScript
import { expect } from 'chai';
|
|
import { anyString, anything, instance, mock, verify, when } from 'ts-mockito';
|
|
|
|
import { SqsClient } from '../../src/utils/sqs_client';
|
|
import { SqsConsumer, SqsRetryableError } from '../../src/utils/sqs_consumer';
|
|
|
|
describe('SqsConsumer', () => {
|
|
describe('consumeOnceAsync', () => {
|
|
describe('beforeHandle', () => {
|
|
it('should not call handleMessage if beforeHandle returns false', async () => {
    // Given: a message is available on the (stubbed) queue, so the only thing
    // that can keep handleMessage from running is the beforeHandle hook.
    const sqsClientMock = mock(SqsClient);
    when(sqsClientMock.receiveMessageAsync()).thenResolve({
        Body: '0xdeadbeef',
    });
    // Hand the consumer the ts-mockito instance; the mock object itself is only
    // meant for stubbing and verification.
    const sqsClientInstance = instance(sqsClientMock);

    let isHandleCalled = false;
    const beforeHandle = async () => false;
    const handleMessage = async () => {
        isHandleCalled = true;
    };

    const consumer = new SqsConsumer({
        workerIndex: 0,
        workerAddress: 'id',
        sqsClient: sqsClientInstance,
        handleMessage,
        beforeHandle,
    });

    // When
    await consumer.consumeOnceAsync();

    // Then
    expect(isHandleCalled).to.equal(false);
});
|
|
|
|
it('should not call handleMessage if beforeHandle throws an error', async () => {
    // Given: a message is available on the (stubbed) queue, so the only thing
    // that can keep handleMessage from running is the rejecting beforeHandle hook.
    const sqsClientMock = mock(SqsClient);
    when(sqsClientMock.receiveMessageAsync()).thenResolve({
        Body: '0xdeadbeef',
    });
    // Hand the consumer the ts-mockito instance; the mock object itself is only
    // meant for stubbing and verification.
    const sqsClientInstance = instance(sqsClientMock);

    let isHandleCalled = false;
    const beforeHandle = async () => Promise.reject('error!');
    const handleMessage = async () => {
        isHandleCalled = true;
    };

    const consumer = new SqsConsumer({
        workerIndex: 0,
        workerAddress: 'id',
        sqsClient: sqsClientInstance,
        handleMessage,
        beforeHandle,
    });

    // When: the rejection from beforeHandle must not escape consumeOnceAsync,
    // otherwise this await fails the test.
    await consumer.consumeOnceAsync();

    // Then
    expect(isHandleCalled).to.equal(false);
});
|
|
|
|
it('should call handleMessage if no beforeHandle', async () => {
    // Given: the queue yields one message and no beforeHandle hook is configured
    const queuedMessage = { Body: '0xdeadbeef' };
    const clientMock = mock(SqsClient);
    when(clientMock.receiveMessageAsync()).thenResolve(queuedMessage);
    const client = instance(clientMock);

    let handlerRan = false;
    const consumer = new SqsConsumer({
        workerIndex: 0,
        workerAddress: 'id',
        sqsClient: client,
        handleMessage: async () => {
            handlerRan = true;
        },
    });

    // When
    await consumer.consumeOnceAsync();

    // Then
    expect(handlerRan).to.equal(true);
});
|
|
|
|
it('should call handleMessage if beforeHandle returns true', async () => {
    // Given: the queue yields one message and the beforeHandle hook resolves to true
    const queuedMessage = { Body: '0xdeadbeef' };
    const clientMock = mock(SqsClient);
    when(clientMock.receiveMessageAsync()).thenResolve(queuedMessage);
    const client = instance(clientMock);

    let handlerRan = false;
    const consumer = new SqsConsumer({
        workerIndex: 0,
        workerAddress: 'id',
        sqsClient: client,
        beforeHandle: async () => true,
        handleMessage: async () => {
            handlerRan = true;
        },
    });

    // When
    await consumer.consumeOnceAsync();

    // Then
    expect(handlerRan).to.equal(true);
});
|
|
});
|
|
|
|
describe('handleMessage', () => {
|
|
it('should not be called if no message is recieved', async () => {
    // Given: the stubbed client resolves to null, i.e. there is nothing to consume
    const clientMock = mock(SqsClient);
    when(clientMock.receiveMessageAsync()).thenResolve(null);
    const client = instance(clientMock);

    let handlerRan = false;
    const consumer = new SqsConsumer({
        workerIndex: 0,
        workerAddress: 'id',
        sqsClient: client,
        handleMessage: async () => {
            handlerRan = true;
        },
    });

    // When
    await consumer.consumeOnceAsync();

    // Then
    expect(handlerRan).to.equal(false);
});
|
|
|
|
it('should call changeMessageVisibility if a SqsRetryableError is encountered (triggers a retry)', async () => {
    // Given: a message with a receipt handle, and a handler that fails with a retryable error
    const queuedMessage = {
        Body: '0xdeadbeef',
        ReceiptHandle: '1',
    };
    const clientMock = mock(SqsClient);
    when(clientMock.receiveMessageAsync()).thenResolve(queuedMessage);
    const client = instance(clientMock);

    const consumer = new SqsConsumer({
        workerIndex: 0,
        workerAddress: 'id',
        sqsClient: client,
        handleMessage: async () => {
            throw new SqsRetryableError('error');
        },
    });

    // When
    await consumer.consumeOnceAsync();

    // Then: visibility was changed exactly once, with a timeout argument of 0
    verify(clientMock.changeMessageVisibilityAsync(anyString(), 0)).once();
});
|
|
|
|
it('should not call changeMessageVisibility if a non SqsRetryableError is encountered', async () => {
    // Given: a message with a receipt handle, and a handler that fails with a plain Error
    const queuedMessage = {
        Body: '0xdeadbeef',
        ReceiptHandle: '1',
    };
    const clientMock = mock(SqsClient);
    when(clientMock.receiveMessageAsync()).thenResolve(queuedMessage);
    const client = instance(clientMock);

    const consumer = new SqsConsumer({
        workerIndex: 0,
        workerAddress: 'id',
        sqsClient: client,
        handleMessage: async () => {
            throw new Error('error');
        },
    });

    // When
    await consumer.consumeOnceAsync();

    // Then: no visibility change was requested, whatever the arguments
    verify(clientMock.changeMessageVisibilityAsync(anything(), anything())).never();
});
|
|
|
|
it('should call deleteMessageAsync if message is successfully handled', async () => {
    // Given: a message with a receipt handle, and a handler that completes normally
    const queuedMessage = {
        Body: '0xdeadbeef',
        ReceiptHandle: '1',
    };
    const clientMock = mock(SqsClient);
    when(clientMock.receiveMessageAsync()).thenResolve(queuedMessage);
    const client = instance(clientMock);

    let handlerRan = false;
    const consumer = new SqsConsumer({
        workerIndex: 0,
        workerAddress: 'id',
        sqsClient: client,
        handleMessage: async () => {
            handlerRan = true;
        },
    });

    // When
    await consumer.consumeOnceAsync();

    // Then: the handler ran and the message was deleted exactly once
    expect(handlerRan).to.equal(true);
    verify(clientMock.deleteMessageAsync(anyString())).once();
});
|
|
});
|
|
|
|
describe('afterHandle', () => {
|
|
it('should be called once everything is successful', async () => {
    // Given: a message with a receipt handle and a handler that succeeds
    const sqsClientMock = mock(SqsClient);
    when(sqsClientMock.receiveMessageAsync()).thenResolve({
        Body: '0xdeadbeef',
        ReceiptHandle: '1',
    });
    const sqsClientInstance = instance(sqsClientMock);

    let isAfterCalled = false;
    const afterHandle = async () => {
        isAfterCalled = true;
    };
    // Expression-bodied no-op: resolves without doing anything, and because it
    // has no empty block body it does not need a no-empty-function suppression.
    const handleMessage = async () => undefined;

    const consumer = new SqsConsumer({
        workerIndex: 0,
        workerAddress: 'id',
        sqsClient: sqsClientInstance,
        handleMessage,
        afterHandle,
    });

    // When
    await consumer.consumeOnceAsync();

    // Then
    expect(isAfterCalled).to.equal(true);
});
|
|
|
|
it('should be passed an error if a non-retryable error was encountered', async () => {
    // Given: a message with a receipt handle, and a handler that throws a plain Error
    const queuedMessage = {
        Body: '0xdeadbeef',
        ReceiptHandle: '1',
    };
    const clientMock = mock(SqsClient);
    when(clientMock.receiveMessageAsync()).thenResolve(queuedMessage);
    const client = instance(clientMock);

    let sawError = false;
    const handleMessage = async () => {
        throw new Error();
    };
    // Latches to true the first time the hook's second argument is truthy
    const afterHandle = async (_, error) => {
        if (!error) {
            return;
        }
        sawError = true;
    };

    const consumer = new SqsConsumer({
        workerIndex: 0,
        workerAddress: 'id',
        sqsClient: client,
        handleMessage,
        afterHandle,
    });

    // When
    await consumer.consumeOnceAsync();

    // Then
    expect(sawError).to.equal(true);
});
|
|
});
|
|
});
|
|
});
|