Java源码示例:com.amazonaws.services.sqs.AmazonSQSAsync
示例1
/**
 * Expects a {@link MessageDeliveryException} from a timed send when the future
 * returned by the asynchronous SQS call fails with an {@link ExecutionException}.
 */
@Test
@SuppressWarnings("unchecked")
void sendMessage_withExecutionExceptionWhileSendingAsyncMessage_throwMessageDeliveryException()
        throws Exception {
    // Arrange: the async send hands back a future whose timed get() fails
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    Future<SendMessageResult> failingFuture = mock(Future.class);
    ExecutionException sendFailure = new ExecutionException(new Exception());
    when(failingFuture.get(1000, TimeUnit.MILLISECONDS)).thenThrow(sendFailure);
    when(amazonSqs.sendMessageAsync(any(SendMessageRequest.class)))
            .thenReturn(failingFuture);
    QueueMessageChannel channel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act + Assert: the message is built and sent inside the callable under test
    assertThatThrownBy(
            () -> channel.send(MessageBuilder.withPayload("Hello").build(), 1000))
                    .isInstanceOf(MessageDeliveryException.class);
}
示例2
/**
 * Sends through a template configured with a destination resolver that
 * upper-cases the destination name, and expects the upper-cased name to be the
 * queue URL of the request handed to the SQS client.
 */
@Test
void send_withCustomDestinationResolveAndDestination_usesDestination() {
    // Arrange
    AmazonSQSAsync amazonSqs = createAmazonSqs();
    DestinationResolver<String> upperCasingResolver = name -> name
            .toUpperCase(Locale.ENGLISH);
    QueueMessagingTemplate template = new QueueMessagingTemplate(amazonSqs,
            upperCasingResolver, null);
    Message<String> stringMessage = MessageBuilder.withPayload("message content")
            .build();

    // Act
    template.send("myqueue", stringMessage);

    // Assert
    ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor
            .forClass(SendMessageRequest.class);
    verify(amazonSqs).sendMessage(requestCaptor.capture());
    SendMessageRequest sentRequest = requestCaptor.getValue();
    assertThat(sentRequest.getQueueUrl()).isEqualTo("MYQUEUE");
}
示例3
/**
 * Expects a timed send to return {@code true} and to touch only
 * {@code sendMessageAsync} on the SQS client when the returned future yields a
 * result within the timeout.
 */
@Test
@SuppressWarnings("unchecked")
void sendMessage_withTimeout_sendsMessageAsyncAndReturnsTrueOnceFutureCompleted()
        throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    Future<SendMessageResult> completedFuture = mock(Future.class);
    SendMessageResult sendResult = new SendMessageResult();
    when(completedFuture.get(1000, TimeUnit.MILLISECONDS)).thenReturn(sendResult);
    when(amazonSqs.sendMessageAsync(any(SendMessageRequest.class)))
            .thenReturn(completedFuture);
    QueueMessageChannel channel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    boolean sent = channel.send(MessageBuilder.withPayload("Hello").build(), 1000);

    // Assert
    assertThat(sent).isTrue();
    verify(amazonSqs, only()).sendMessageAsync(any(SendMessageRequest.class));
}
示例4
/**
 * Stubs the SQS client to answer a zero-wait, single-message receive request
 * with one message and expects {@code receive()} to expose its body as payload.
 */
@Test
void receiveMessage_withoutTimeout_returnsTextMessage() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    ReceiveMessageResult stubbedResult = new ReceiveMessageResult()
            .withMessages(Collections
                    .singleton(new com.amazonaws.services.sqs.model.Message()
                            .withBody("content")));
    when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(stubbedResult);
    PollableChannel channel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    Message<?> received = channel.receive();

    // Assert
    assertThat(received).isNotNull();
    assertThat(received.getPayload()).isEqualTo("content");
}
示例5
/**
 * Prepares the queue message handler factory and creates a handler from it.
 * <p>
 * The given client is applied only when the factory has none yet, and the
 * Jackson converter is applied only when the factory has no converters and a
 * converter is available. Bean factory and object mapper are always set.
 *
 * @param amazonSqs client to use when the factory has no client configured
 * @return the handler created by the factory
 */
private QueueMessageHandler getMessageHandler(AmazonSQSAsync amazonSqs) {
    if (this.queueMessageHandlerFactory.getAmazonSqs() == null) {
        this.queueMessageHandlerFactory.setAmazonSqs(amazonSqs);
    }

    boolean noConvertersConfigured = CollectionUtils
            .isEmpty(this.queueMessageHandlerFactory.getMessageConverters());
    if (noConvertersConfigured) {
        if (this.mappingJackson2MessageConverter != null) {
            this.queueMessageHandlerFactory.setMessageConverters(
                    Arrays.asList(this.mappingJackson2MessageConverter));
        }
    }

    this.queueMessageHandlerFactory.setBeanFactory(this.beanFactory);
    this.queueMessageHandlerFactory.setObjectMapper(this.objectMapper);

    return this.queueMessageHandlerFactory.createQueueMessageHandler();
}
示例6
/**
 * Sends a message that carries no delay header and expects the captured request
 * to have no delay seconds and no delay header among its message attributes.
 */
@Test
void sendMessage_withoutDelayHeader_shouldNotSetDelayOnSendMessageRequestAndNotSetHeaderAsMessageAttribute()
        throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor
            .forClass(SendMessageRequest.class);
    when(amazonSqs.sendMessage(requestCaptor.capture()))
            .thenReturn(new SendMessageResult());
    QueueMessageChannel channel = new QueueMessageChannel(amazonSqs, "http://testQueue");
    Message<String> message = MessageBuilder.withPayload("Hello").build();

    // Act
    channel.send(message);

    // Assert
    SendMessageRequest sentRequest = requestCaptor.getValue();
    assertThat(sentRequest.getDelaySeconds()).isNull();
    boolean hasDelayAttribute = sentRequest.getMessageAttributes()
            .containsKey(SqsMessageHeaders.SQS_DELAY_HEADER);
    assertThat(hasDelayAttribute).isFalse();
}
示例7
/**
 * Starts a container that was not given a task executor, destroys it, and
 * expects the thread pool behind the container's task executor to be terminated.
 */
@Test
void doDestroy_whenContainerCallsDestroy_DestroysDefaultTaskExecutor()
        throws Exception {
    // Arrange
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(sqs);
    QueueMessageHandler messageHandler = mock(QueueMessageHandler.class);
    container.setMessageHandler(messageHandler);
    container.afterPropertiesSet();
    container.start();

    // Act
    container.destroy();

    // Assert
    ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) container
            .getTaskExecutor();
    boolean terminated = taskExecutor.getThreadPoolExecutor().isTerminated();
    assertThat(terminated).isTrue();
}
示例8
/**
 * Sends a message whose content-type header is a {@link MimeType} and expects
 * the captured request to carry a content-type message attribute whose string
 * value equals {@code mimeType.toString()}.
 */
@Test
void sendMessage_withMimeTypeHeader_shouldPassItAsMessageAttribute()
        throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs,
            "http://testQueue");
    // The JDK constant avoids a by-name charset lookup; fully qualified so no
    // additional import is required.
    MimeType mimeType = new MimeType("test", "plain",
            java.nio.charset.StandardCharsets.UTF_8);
    Message<String> message = MessageBuilder.withPayload("Hello")
            .setHeader(MessageHeaders.CONTENT_TYPE, mimeType).build();
    ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor
            .forClass(SendMessageRequest.class);
    when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture()))
            .thenReturn(new SendMessageResult());

    // Act
    boolean sent = messageChannel.send(message);

    // Assert
    assertThat(sent).isTrue();
    assertThat(sendMessageRequestArgumentCaptor.getValue().getMessageAttributes()
            .get(MessageHeaders.CONTENT_TYPE).getStringValue())
                    .isEqualTo(mimeType.toString());
}
示例9
/**
 * Stubs the receive request with a wait time of 2 to return no messages and
 * expects {@code receive(2)} to return {@code null}.
 */
@Test
void receiveMessage_withSpecifiedTimeout_returnsNull() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(2).withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    ReceiveMessageResult emptyResult = new ReceiveMessageResult()
            .withMessages(Collections.emptyList());
    when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(emptyResult);
    PollableChannel channel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    Message<?> received = channel.receive(2);

    // Assert
    assertThat(received).isNull();
}
示例10
/**
 * Stubs the receive request with a wait time of 0 to return no messages and
 * expects {@code receive(0)} to return {@code null}.
 */
@Test
void receiveMessage_withoutDefaultTimeout_returnsNull() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    ReceiveMessageResult emptyResult = new ReceiveMessageResult()
            .withMessages(Collections.emptyList());
    when(amazonSqs.receiveMessage(expectedRequest)).thenReturn(emptyResult);
    PollableChannel channel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    Message<?> received = channel.receive(0);

    // Assert
    assertThat(received).isNull();
}
示例11
/**
 * Walks a stub container through start and stop and expects it to report
 * running / not running accordingly while still being active after the stop.
 */
@Test
void testIsActive() throws Exception {
    AbstractMessageListenerContainer listenerContainer = new StubAbstractMessageListenerContainer();
    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    listenerContainer.setAmazonSqs(sqs);
    listenerContainer.setMessageHandler(mock(QueueMessageHandler.class));
    listenerContainer.afterPropertiesSet();

    // Stubbed after afterPropertiesSet() and before start(), as in the original flow.
    GetQueueUrlRequest urlRequest = new GetQueueUrlRequest().withQueueName("testQueue");
    GetQueueUrlResult urlResult = new GetQueueUrlResult()
            .withQueueUrl("http://testQueue.amazonaws.com");
    when(sqs.getQueueUrl(urlRequest)).thenReturn(urlResult);

    listenerContainer.start();
    assertThat(listenerContainer.isRunning()).isTrue();

    listenerContainer.stop();
    assertThat(listenerContainer.isRunning()).isFalse();

    // A stopped container can still be active and restarted later
    // (e.g. when it was only paused for a while).
    assertThat(listenerContainer.isActive()).isTrue();
}
示例12
/**
 * Creates a mocked SQS client for the template tests.
 * <p>
 * Any queue URL lookup answers {@code https://queue-url.com}, and any receive
 * request answers a result holding one message with the body
 * {@code My message}.
 *
 * @return the stubbed mock client
 */
private AmazonSQSAsync createAmazonSqs() {
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);

    GetQueueUrlResult urlResult = new GetQueueUrlResult()
            .withQueueUrl("https://queue-url.com");
    when(amazonSqs.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(urlResult);

    ReceiveMessageResult receiveResult = new ReceiveMessageResult().withMessages(
            new com.amazonaws.services.sqs.model.Message().withBody("My message"));
    when(amazonSqs.receiveMessage(any(ReceiveMessageRequest.class)))
            .thenReturn(receiveResult);

    return amazonSqs;
}
示例13
/**
 * Sends a message carrying the group-id header and expects the captured request
 * to have that message group id while not listing the header among its message
 * attributes.
 */
@Test
void sendMessage_withGroupIdHeader_shouldSetGroupIdOnSendMessageRequestAndNotSetItAsHeaderAsMessageAttribute()
        throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor
            .forClass(SendMessageRequest.class);
    when(amazonSqs.sendMessage(requestCaptor.capture()))
            .thenReturn(new SendMessageResult());
    QueueMessageChannel channel = new QueueMessageChannel(amazonSqs, "http://testQueue");
    Message<String> message = MessageBuilder.withPayload("Hello")
            .setHeader(SqsMessageHeaders.SQS_GROUP_ID_HEADER, "id-5").build();

    // Act
    channel.send(message);

    // Assert
    SendMessageRequest sentRequest = requestCaptor.getValue();
    assertThat(sentRequest.getMessageGroupId()).isEqualTo("id-5");
    boolean hasGroupIdAttribute = sentRequest.getMessageAttributes()
            .containsKey(SqsMessageHeaders.SQS_GROUP_ID_HEADER);
    assertThat(hasGroupIdAttribute).isFalse();
}
示例14
/**
 * Sends a plain text message and expects a single {@code sendMessage} call on
 * the client, the payload as the request's message body, and {@code true} as
 * the send result.
 */
@Test
void sendMessage_validTextMessage_returnsTrue() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor
            .forClass(SendMessageRequest.class);
    when(amazonSqs.sendMessage(requestCaptor.capture()))
            .thenReturn(new SendMessageResult());
    MessageChannel channel = new QueueMessageChannel(amazonSqs, "http://testQueue");
    Message<String> textMessage = MessageBuilder.withPayload("message content")
            .build();

    // Act
    boolean sent = channel.send(textMessage);

    // Assert
    verify(amazonSqs, only()).sendMessage(any(SendMessageRequest.class));
    SendMessageRequest sentRequest = requestCaptor.getValue();
    assertThat(sentRequest.getMessageBody()).isEqualTo("message content");
    assertThat(sent).isTrue();
}
示例15
/**
 * Makes the SQS client fail every send with an {@link AmazonServiceException}
 * and expects the channel to surface it as a {@link MessagingException} whose
 * message contains the original error text.
 */
@Test
void sendMessage_serviceThrowsError_throwsMessagingException() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    Message<String> stringMessage = MessageBuilder.withPayload("message content")
            .build();
    MessageChannel messageChannel = new QueueMessageChannel(amazonSqs,
            "http://testQueue");
    // The matcher is passed directly as the stubbed method's argument. Nesting it
    // inside a SendMessageRequest builder call only worked because Mockito applied
    // the recorded matcher to the whole argument, so the request literal never
    // took part in matching.
    when(amazonSqs.sendMessage(isNotNull()))
            .thenThrow(new AmazonServiceException("wanted error"));

    // Act + Assert
    assertThatThrownBy(() -> messageChannel.send(stringMessage))
            .isInstanceOf(MessagingException.class)
            .hasMessageContaining("wanted error");
}
示例16
/**
 * Expects a timed send to return {@code false} when waiting on the future of
 * the asynchronous SQS call ends in a {@link TimeoutException}.
 */
@Test
@SuppressWarnings("unchecked")
void sendMessage_withSendMessageAsyncTakingMoreTimeThanSpecifiedTimeout_returnsFalse()
        throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    Future<SendMessageResult> slowFuture = mock(Future.class);
    TimeoutException timeout = new TimeoutException();
    when(slowFuture.get(1000, TimeUnit.MILLISECONDS)).thenThrow(timeout);
    when(amazonSqs.sendMessageAsync(any(SendMessageRequest.class)))
            .thenReturn(slowFuture);
    QueueMessageChannel channel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    boolean sent = channel.send(MessageBuilder.withPayload("Hello").build(), 1000);

    // Assert
    assertThat(sent).isFalse();
}
示例17
/**
 * Builds the SQS async client bean from the credentials provider and the SQS
 * endpoint configuration supplied by {@code localStack}.
 *
 * @return the configured client
 */
@Bean
public AmazonSQSAsync amazonSQS() {
    AmazonSQSAsyncClientBuilder builder = AmazonSQSAsyncClientBuilder.standard();
    builder.withCredentials(localStack.getDefaultCredentialsProvider());
    builder.withEndpointConfiguration(localStack.getEndpointConfiguration(SQS));
    return builder.build();
}
示例18
/**
 * Expects {@code receive()} on a template created without a default destination
 * to fail with an {@link IllegalStateException}.
 */
@Test
void receive_withoutDefaultDestination_throwsAnException() {
    AmazonSQSAsync amazonSqs = createAmazonSqs();
    QueueMessagingTemplate template = new QueueMessagingTemplate(amazonSqs);

    assertThatThrownBy(() -> template.receive())
            .isInstanceOf(IllegalStateException.class);
}
示例19
/**
 * Returns the SQS async client, building it on first use from
 * {@code credentialsProvider} and {@code endpointConfiguration} and storing it
 * in {@code asyncClient} for later calls.
 *
 * @return the value of {@code asyncClient} after the lazy-initialization check
 */
AmazonSQSAsync get() {
// First check without the lock: once a client is assigned, callers skip synchronization.
if (asyncClient == null) {
synchronized (this) {
// Re-check under the lock: another thread may have assigned the field while
// this one was waiting for the monitor.
// NOTE(review): this double-checked pattern is only safe if asyncClient is
// declared volatile - the field declaration is not visible here; confirm.
if (asyncClient == null) {
asyncClient = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider)
.withEndpointConfiguration(endpointConfiguration).build();
}
}
}
return asyncClient;
}
示例20
/**
 * Builds an SQS async client from the stage configuration.
 * <p>
 * A concrete region is applied by its id; when the region is
 * {@code AwsRegion.OTHER} the configured endpoint is used instead, with no
 * signing region. Credentials and client configuration are applied either way.
 *
 * @return the newly built client
 */
private AmazonSQSAsync buildAsyncClient() {
    final AmazonSQSAsyncClientBuilder clientBuilder = AmazonSQSAsyncClientBuilder.standard();

    if (conf.region != AwsRegion.OTHER) {
        clientBuilder.withRegion(conf.region.getId());
    } else {
        clientBuilder.withEndpointConfiguration(
                new AwsClientBuilder.EndpointConfiguration(conf.endpoint, null));
    }

    clientBuilder.setCredentials(credentials);
    clientBuilder.setClientConfiguration(clientConfiguration);

    return clientBuilder.build();
}
示例21
/**
 * Receives from an explicitly named destination and expects the body of the
 * stubbed SQS message converted to a {@link String}.
 */
@Test
void receiveAndConvert_withDestination_usesDestinationAndConvertsMessage() {
    AmazonSQSAsync amazonSqs = createAmazonSqs();
    QueueMessagingTemplate template = new QueueMessagingTemplate(amazonSqs);

    String converted = template.receiveAndConvert("my-queue", String.class);

    assertThat(converted).isEqualTo("My message");
}
示例22
/**
 * Sends a message carrying the deduplication-id header and expects the captured
 * request to have that deduplication id while not listing the header among its
 * message attributes.
 */
@Test
void sendMessage_withDeduplicationIdHeader_shouldSetDeduplicationIdOnSendMessageRequestAndNotSetItAsHeaderAsMessageAttribute()
        throws Exception {
    // @checkstyle:on
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor
            .forClass(SendMessageRequest.class);
    when(amazonSqs.sendMessage(requestCaptor.capture()))
            .thenReturn(new SendMessageResult());
    QueueMessageChannel channel = new QueueMessageChannel(amazonSqs, "http://testQueue");
    Message<String> message = MessageBuilder.withPayload("Hello")
            .setHeader(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, "id-5").build();

    // Act
    channel.send(message);

    // Assert
    SendMessageRequest sentRequest = requestCaptor.getValue();
    assertThat(sentRequest.getMessageDeduplicationId()).isEqualTo("id-5");
    boolean hasDeduplicationAttribute = sentRequest.getMessageAttributes()
            .containsKey(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER);
    assertThat(hasDeduplicationAttribute).isFalse();
}
示例23
/**
 * Creates a messaging template on the given client and resource id resolver
 * with {@code JsonQueue} as its default destination name.
 *
 * @param amazonSqs client the template sends and receives with
 * @param resourceIdResolver resolver handed to the template
 * @return the configured template
 */
@Bean
public QueueMessagingTemplate defaultQueueMessagingTemplate(
        AmazonSQSAsync amazonSqs, ResourceIdResolver resourceIdResolver) {
    QueueMessagingTemplate template = new QueueMessagingTemplate(amazonSqs,
            resourceIdResolver);

    template.setDefaultDestinationName("JsonQueue");

    return template;
}
示例24
/**
 * Creates a messaging template on the given client and resource id resolver
 * with {@code StreamQueue} as its default destination name and an
 * {@link ObjectMessageConverter} as its message converter.
 *
 * @param amazonSqs client the template sends and receives with
 * @param resourceIdResolver resolver handed to the template
 * @return the configured template
 */
@Bean
public QueueMessagingTemplate queueMessagingTemplateWithCustomConverter(
        AmazonSQSAsync amazonSqs, ResourceIdResolver resourceIdResolver) {
    QueueMessagingTemplate template = new QueueMessagingTemplate(amazonSqs,
            resourceIdResolver);

    template.setDefaultDestinationName("StreamQueue");
    ObjectMessageConverter converter = new ObjectMessageConverter();
    template.setMessageConverter(converter);

    return template;
}
示例25
/**
 * Creates a messaging template on the given client and resource id resolver
 * with {@code JsonQueue} as its default destination name.
 *
 * @param amazonSqs client the template sends and receives with
 * @param resourceIdResolver resolver handed to the template
 * @return the configured template
 */
@Bean
public QueueMessagingTemplate defaultQueueMessagingTemplate(
        AmazonSQSAsync amazonSqs, ResourceIdResolver resourceIdResolver) {
    QueueMessagingTemplate template = new QueueMessagingTemplate(amazonSqs,
            resourceIdResolver);

    template.setDefaultDestinationName("JsonQueue");

    return template;
}
示例26
/**
 * Creates a messaging template on the given client and resource id resolver
 * with {@code StreamQueue} as its default destination name and an
 * {@link ObjectMessageConverter} as its message converter.
 *
 * @param amazonSqs client the template sends and receives with
 * @param resourceIdResolver resolver handed to the template
 * @return the configured template
 */
@Bean
public QueueMessagingTemplate queueMessagingTemplateWithCustomConverter(
        AmazonSQSAsync amazonSqs, ResourceIdResolver resourceIdResolver) {
    QueueMessagingTemplate template = new QueueMessagingTemplate(amazonSqs,
            resourceIdResolver);

    template.setDefaultDestinationName("StreamQueue");
    ObjectMessageConverter converter = new ObjectMessageConverter();
    template.setMessageConverter(converter);

    return template;
}
示例27
/**
 * Exposes the queue message handler bean: the one already configured on the
 * listener container factory when present, otherwise one created through
 * {@code getMessageHandler}.
 *
 * @param amazonSqs client passed on when a handler has to be created
 * @return the handler to register
 */
@Bean
public QueueMessageHandler queueMessageHandler(AmazonSQSAsync amazonSqs) {
    // Guard clause: nothing configured on the factory, so create a handler.
    if (this.simpleMessageListenerContainerFactory.getQueueMessageHandler() == null) {
        return getMessageHandler(amazonSqs);
    }
    return this.simpleMessageListenerContainerFactory.getQueueMessageHandler();
}
示例28
/**
 * Receives an SQS message whose id attribute is a string-typed UUID and expects
 * the id header of the resulting message to be a {@link UUID} equal to the
 * original value.
 */
@Test
void receiveMessage_withIdOfTypeString_IdShouldBeConvertedToUuid() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    UUID uuid = UUID.randomUUID();
    MessageAttributeValue idAttribute = new MessageAttributeValue()
            .withDataType(MessageAttributeDataTypes.STRING)
            .withStringValue(uuid.toString());
    when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0).withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All"))).thenReturn(new ReceiveMessageResult()
                    .withMessages(new com.amazonaws.services.sqs.model.Message()
                            .withBody("Hello")
                            .withMessageAttributes(Collections
                                    .singletonMap(MessageHeaders.ID, idAttribute))));
    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs,
            "http://testQueue");

    // Act
    Message<?> receivedMessage = messageChannel.receive();

    // Assert
    Object idMessageHeader = receivedMessage.getHeaders().get(MessageHeaders.ID);
    // isInstanceOf reports the actual type on failure, unlike asserting on the
    // boolean result of Class.isInstance.
    assertThat(idMessageHeader).isInstanceOf(UUID.class);
    assertThat(idMessageHeader).isEqualTo(uuid);
}
示例29
/**
 * Stubs {@code receiveMessage} on the given client for the request built here
 * (all attributes, all message attributes, up to 10 messages, wait time 20).
 * The first matching call answers a result with one message; subsequent calls
 * answer an empty result.
 *
 * @param sqs mock client to stub
 * @param queueUrl queue URL of the expected receive request
 * @param messageContent body of the message in the first result
 * @param receiptHandle receipt handle of the message in the first result
 */
private static void mockReceiveMessage(AmazonSQSAsync sqs, String queueUrl,
        String messageContent, String receiptHandle) {
    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest(queueUrl)
            .withAttributeNames("All").withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10).withWaitTimeSeconds(20);

    Message queuedMessage = new Message().withBody(messageContent)
            .withReceiptHandle(receiptHandle);
    ReceiveMessageResult firstResult = new ReceiveMessageResult()
            .withMessages(queuedMessage);
    ReceiveMessageResult emptyResult = new ReceiveMessageResult();

    when(sqs.receiveMessage(expectedRequest)).thenReturn(firstResult, emptyResult);
}
示例30
/**
 * Sends a message with a {@link ByteBuffer} header and expects the captured
 * request to carry it as a message attribute with the same binary value and the
 * binary data type.
 */
@Test
void sendMessage_withBinaryMessageHeader_shouldBeSentAsBinaryMessageAttribute()
        throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs,
            "http://testQueue");
    // Explicit charset keeps the header bytes independent of the platform default;
    // fully qualified so no additional import is required.
    ByteBuffer headerValue = ByteBuffer.wrap(
            "My binary data!".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    String headerName = "MyHeader";
    Message<String> message = MessageBuilder.withPayload("Hello")
            .setHeader(headerName, headerValue).build();
    ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor
            .forClass(SendMessageRequest.class);
    when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture()))
            .thenReturn(new SendMessageResult());

    // Act
    boolean sent = messageChannel.send(message);

    // Assert
    assertThat(sent).isTrue();
    assertThat(sendMessageRequestArgumentCaptor.getValue().getMessageAttributes()
            .get(headerName).getBinaryValue()).isEqualTo(headerValue);
    assertThat(sendMessageRequestArgumentCaptor.getValue().getMessageAttributes()
            .get(headerName).getDataType())
                    .isEqualTo(MessageAttributeDataTypes.BINARY);
}