AWS - boto3 - boto3.resource('sqs') - SQS
AWS SQS boto3.resource('sqs')
SQS allows you to queue and then process messages. This tutorial covers how to create a new queue, get and use an existing queue, push new messages onto the queue, and process messages from the queue by using Resources and Collections.
Queues are created with a name. You may also optionally set queue attributes, such as the number of seconds to wait before an item may be processed.
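SQS takes attribute values as strings, even for numeric settings such as `DelaySeconds`. The helper below is a minimal sketch of preparing that dictionary; the name `build_queue_attributes` is hypothetical and not part of boto3. It checks the delay against the 0 to 900 second range that SQS accepts and returns a dict that could be passed as the `Attributes` parameter of `create_queue`.

```python
def build_queue_attributes(delay_seconds=0):
    """Return an Attributes dict for create_queue (hypothetical helper)."""
    # SQS accepts a delivery delay between 0 and 900 seconds (15 minutes).
    if not 0 <= delay_seconds <= 900:
        raise ValueError('DelaySeconds must be between 0 and 900')
    # Attribute values are passed to the API as strings, not integers.
    return {'DelaySeconds': str(delay_seconds)}


attributes = build_queue_attributes(delay_seconds=5)
print(attributes)
# Assumed usage with an SQS service resource named sqs:
#   queue = sqs.create_queue(QueueName='test', Attributes=attributes)
```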
import boto3

sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName='test', Attributes={'DelaySeconds': '5'})
for q in sqs.queues.all():
    print(q.url)
# Get the queue. This returns an SQS.Queue instance
queue = sqs.get_queue_by_name(QueueName='test')
queue.url
queue.attributes['QueueArn'].split(':')[-1]
queue.attributes.get('DelaySeconds')
response = queue.send_message(MessageBody='world')
response = queue.send_message(MessageBody='boto3', MessageAttributes={'ABC': {'StringValue': 'abc', 'DataType': 'String'}})
response.get('MessageId')
response.get('MD5OfMessageBody')
response = queue.send_messages(Entries=[{'Id': '1', 'MessageBody': 'hello'}, {'Id': '2', 'MessageBody': 'world'}])
response.get('Failed')
for message in queue.receive_messages(MessageAttributeNames=['ABC']):
    message.message_attributes
    message.message_attributes.get('ABC').get('StringValue')
    message.delete()
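The `split(':')[-1]` expression on `QueueArn` works because an SQS ARN is a colon-separated string whose last field is the queue name. As a small illustration that runs without AWS access, the sketch below wraps that expression in a hypothetical helper, `queue_name_from_arn`, and applies it to an ARN with a made-up region and account ID.

```python
def queue_name_from_arn(arn):
    """Return the queue name, the last colon-separated field of an SQS ARN."""
    # SQS ARNs have the form arn:aws:sqs:<region>:<account-id>:<queue-name>
    return arn.split(':')[-1]


# Example ARN; the region and account ID are placeholders
example_arn = 'arn:aws:sqs:us-east-1:123456789012:test'
print(queue_name_from_arn(example_arn))  # prints: test
```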
# ------------------------- Creating a queue -------------------------
import boto3

# Get the SQS service resource
sqs = boto3.resource('sqs')
# Create the queue. This returns an SQS.Queue instance
queue = sqs.create_queue(QueueName='test', Attributes={'DelaySeconds': '5'})
# now access identifiers and attributes
print(queue.url)
print(queue.attributes.get('DelaySeconds'))
# The code above may raise an exception if you already have a queue named test with different attributes.
# ------------------------- Using an existing queue -------------------------
# Get the queue. This returns an SQS.Queue instance
queue = sqs.get_queue_by_name(QueueName='test')
# List all existing queues and print each queue's URL.
# A separate loop variable keeps `queue` pointing at the 'test' queue.
for q in sqs.queues.all():
    print(q.url)
# To get the name of a queue, use its ARN, which is available in the queue's attributes.
# The name is the last colon-separated field of the ARN.
queue_name = queue.attributes['QueueArn'].split(':')[-1]
print(queue_name)
# ------------------------- Sending messages -------------------------
# Send a new message, which is added to the end of the queue:
response = queue.send_message(MessageBody='world')
# The response is NOT a resource, but gives you a message ID and MD5
print(response.get('MessageId'))
print(response.get('MD5OfMessageBody'))
# You can also create messages with custom attributes:
queue.send_message(MessageBody='boto3', MessageAttributes={
    'Author': {
        'StringValue': 'Daniel',
        'DataType': 'String'
    }
})
# Messages can also be sent in batches. For example, sending the two messages described above in a single request would look like the following:
response = queue.send_messages(Entries=[
    {
        'Id': '1',
        'MessageBody': 'world'
    },
    {
        'Id': '2',
        'MessageBody': 'boto3',
        'MessageAttributes': {
            'Author': {
                'StringValue': 'Daniel',
                'DataType': 'String'
            }
        }
    }
])
# Print out any failures
print(response.get('Failed'))
# In this case, the response contains lists of Successful and Failed messages, so you can retry failures if needed.
# ------------------------- Processing messages -------------------------
# For reference, these are the two entries sent in the batch above:
# {
#     'Id': '1',
#     'MessageBody': 'world'
# },
# {
#     'Id': '2',
#     'MessageBody': 'boto3',
#     'MessageAttributes': {
#         'Author': {
#             'StringValue': 'Daniel',
#             'DataType': 'String'
#         }
#     }
# }
# Process messages by printing out body and optional author name
for message in queue.receive_messages(MessageAttributeNames=['Author']):
    # Get the custom author message attribute if it was set
    author_text = ''
    if message.message_attributes is not None:
        # Fall back to an empty dict so a missing 'Author' key does not raise
        author_name = message.message_attributes.get('Author', {}).get('StringValue')
        if author_name:
            author_text = ' ({0})'.format(author_name)

    # Print out the body and author (if set)
    print('Hello, {0}!{1}'.format(message.body, author_text))

    # Let the queue know that the message is processed
    message.delete()
# Given only the messages that were sent in a batch with SQS.Queue.send_messages() in the previous section, the above code will print out:
Hello, world!
Hello, boto3! (Daniel)
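The batch response from `send_messages()` lists failed entries under `Failed`, each identified by the `Id` given in the request, which is what makes retrying possible. The function below is a minimal sketch of one way to do that; `send_with_retry` and `max_attempts` are hypothetical names, not part of boto3. It assumes the caller passes at most 10 entries (the SQS limit for a single batch) and resends every failed entry without checking whether the failure was the sender's fault.

```python
def send_with_retry(queue, entries, max_attempts=3):
    """Send entries as one batch, resending failures (hypothetical helper).

    Returns the entries that still failed after max_attempts tries.
    """
    pending = list(entries)
    for _ in range(max_attempts):
        if not pending:
            break
        response = queue.send_messages(Entries=pending)
        # 'Failed' may be missing from the response when every entry succeeded.
        failed_ids = {failure['Id'] for failure in response.get('Failed', [])}
        # Keep only the entries reported as failed for the next attempt.
        pending = [entry for entry in pending if entry['Id'] in failed_ids]
    return pending


# Assumed usage with the queue from this tutorial:
#   leftover = send_with_retry(queue, [
#       {'Id': '1', 'MessageBody': 'world'},
#       {'Id': '2', 'MessageBody': 'boto3'},
#   ])
#   if leftover:
#       print('Could not send:', [entry['Id'] for entry in leftover])
```

A production version would probably wait between attempts and skip entries whose failure has `SenderFault` set, since resending those unchanged is unlikely to succeed.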
This post is licensed under CC BY 4.0 by the author.