
AWS - boto3 - boto3.resource('sqs') - SQS


AWS SQS boto3.resource('sqs')

SQS allows you to queue and then process messages. This tutorial covers how to create a new queue, get and use an existing queue, push new messages onto the queue, and process messages from the queue by using Resources and Collections.

Queues are created with a name. You may also optionally set queue attributes, such as the number of seconds to wait before an item may be processed.

import boto3

sqs = boto3.resource('sqs')

# Create a queue. This returns an SQS.Queue instance
queue = sqs.create_queue(QueueName='test', Attributes={'DelaySeconds': '5'})

# List all existing queues
for q in sqs.queues.all():
    print(q.url)

# Get the queue. This returns an SQS.Queue instance
queue = sqs.get_queue_by_name(QueueName='test')

queue.url
queue.attributes['QueueArn'].split(':')[-1]    # queue name
queue.attributes.get('DelaySeconds')

# Single send: the response carries MessageId and MD5OfMessageBody
response = queue.send_message(MessageBody='world')
response = queue.send_message(MessageBody='boto3', MessageAttributes={'ABC': {'StringValue': 'abc', 'DataType': 'String'}})
response.get('MessageId')
response.get('MD5OfMessageBody')

# Batch send: the response carries Successful and Failed lists
response = queue.send_messages(Entries=[{'Id': '1', 'MessageBody': 'hello'}, {'Id': '2', 'MessageBody': 'world'}])
response.get('Failed')

# Receive, inspect, and delete messages
for message in queue.receive_messages(MessageAttributeNames=['ABC']):
    if message.message_attributes is not None:
        message.message_attributes.get('ABC').get('StringValue')
    message.delete()

# ------------------------- Creating a queue -------------------------
import boto3

# Get the SQS service resource
sqs = boto3.resource('sqs')

# Create the queue. This returns an SQS.Queue instance
queue = sqs.create_queue(QueueName='test', Attributes={'DelaySeconds': '5'})

# Now access identifiers and attributes
print(queue.url)
print(queue.attributes.get('DelaySeconds'))
# The code above may throw an exception if you already have a queue named test with different attributes.
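
One way to sidestep that exception is to look the queue up first and create it only when it is missing. The following is a minimal sketch rather than part of the original tutorial: `get_or_create_queue` is a hypothetical helper name, and it assumes the `QueueDoesNotExist` exception class that the SQS client exposes through `sqs.meta.client.exceptions`.

```python
def get_or_create_queue(sqs, name, attributes=None):
    """Return the queue called `name`, creating it only if it is missing.

    `sqs` is expected to be a boto3 SQS service resource, for example
    boto3.resource('sqs'). Hypothetical helper, not part of boto3.
    """
    try:
        # Look up the existing queue first
        return sqs.get_queue_by_name(QueueName=name)
    except sqs.meta.client.exceptions.QueueDoesNotExist:
        # No such queue yet: create it with the requested attributes
        return sqs.create_queue(QueueName=name, Attributes=attributes or {})
```

With a real resource this would be called as `get_or_create_queue(boto3.resource('sqs'), 'test', {'DelaySeconds': '5'})`. Note that an existing queue is returned as-is, so its attributes may differ from the ones passed in.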




# ------------------------- Using an existing queue -------------------------
# Get the queue. This returns an SQS.Queue instance
queue = sqs.get_queue_by_name(QueueName='test')


# List all existing queues and print each queue's URL
for q in sqs.queues.all():
    print(q.url)


# To get the name of a queue, use its ARN, which is available in the queue's attributes.
# The name is the last colon-separated field of the ARN.
queue_name = queue.attributes['QueueArn'].split(':')[-1]
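
The ARN split above can be wrapped in a small function so the intent is visible at the call site. This is only an illustrative sketch: `queue_name_from_arn` is a hypothetical name, and the ARN and account ID below are made up.

```python
def queue_name_from_arn(arn):
    """Return the queue name, which is the last colon-separated ARN field."""
    # SQS ARNs look like arn:aws:sqs:<region>:<account-id>:<queue-name>
    return arn.rsplit(':', 1)[-1]


# Made-up ARN for illustration only
example_arn = 'arn:aws:sqs:us-east-1:123456789012:test'
print(queue_name_from_arn(example_arn))  # prints: test
```

With a real queue, the call would be `queue_name_from_arn(queue.attributes['QueueArn'])`.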




# ------------------------- Sending messages -------------------------

# Sending a message adds it to the end of the queue:
response = queue.send_message(MessageBody='world')


# The response is NOT a resource, but gives you a message ID and MD5
print(response.get('MessageId'))
print(response.get('MD5OfMessageBody'))



# You can also create messages with custom attributes:
queue.send_message(MessageBody='boto3', MessageAttributes={
    'Author': {
        'StringValue': 'Daniel',
        'DataType': 'String'
    }
})
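
When every custom attribute is a plain string, the nested `MessageAttributes` structure can be built from a flat dictionary. The helper below is a sketch with a hypothetical name, `string_attributes`, and it only covers the `String` data type used in this post.

```python
def string_attributes(values):
    """Convert {'name': 'text'} into MessageAttributes entries of type String."""
    return {
        name: {'StringValue': str(value), 'DataType': 'String'}
        for name, value in values.items()
    }


print(string_attributes({'Author': 'Daniel'}))
# prints: {'Author': {'StringValue': 'Daniel', 'DataType': 'String'}}
```

The result can be passed directly, as in `queue.send_message(MessageBody='boto3', MessageAttributes=string_attributes({'Author': 'Daniel'}))`.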


# Messages can also be sent in batches. For example, sending the two messages described above in a single request would look like the following:
response = queue.send_messages(Entries=[
    {
        'Id': '1',
        'MessageBody': 'world'
    },
    {
        'Id': '2',
        'MessageBody': 'boto3',
        'MessageAttributes': {
            'Author': {
                'StringValue': 'Daniel',
                'DataType': 'String'
            }
        }
    }
])

# Print out any failures
print(response.get('Failed'))
# In this case, the response contains lists of Successful and Failed messages, so you can retry failures if needed.
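
Retrying failures could look something like the sketch below. It also splits the input into groups of 10, since SQS accepts at most 10 entries per batch request. `chunked` and `send_in_batches` are hypothetical helper names, and the retry policy (a fixed number of immediate attempts) is an assumption chosen for brevity.

```python
def chunked(items, size=10):
    """Yield consecutive slices of `items` holding at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def send_in_batches(queue, entries, max_attempts=3):
    """Send `entries` in batches of up to 10 and retry the ones that fail.

    `queue` is expected to be an SQS.Queue resource. Returns the entries that
    still failed after `max_attempts` tries. Hypothetical helper.
    """
    still_failed = []
    for batch in chunked(entries, 10):
        pending = batch
        for _ in range(max_attempts):
            response = queue.send_messages(Entries=pending)
            # Each failure record carries the 'Id' of the entry it refers to
            failed_ids = {item['Id'] for item in response.get('Failed', [])}
            pending = [entry for entry in pending if entry['Id'] in failed_ids]
            if not pending:
                break
        still_failed.extend(pending)
    return still_failed
```

Each failure record also carries a `SenderFault` flag. Entries that failed because of a sender fault are unlikely to succeed on retry, so a fuller version would probably skip them.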






# ------------------------- Processing messages -------------------------
# The loop below assumes the queue holds the two entries sent in the batch above:
# {'Id': '1', 'MessageBody': 'world'}
# {'Id': '2', 'MessageBody': 'boto3', 'MessageAttributes': {'Author': {'StringValue': 'Daniel', 'DataType': 'String'}}}


# Process messages by printing out body and optional author name
for message in queue.receive_messages(MessageAttributeNames=['Author']):
    # Get the custom author message attribute if it was set
    author_text = ''
    if message.message_attributes is not None:
        author_name = message.message_attributes.get('Author').get('StringValue')
        if author_name:
            author_text = ' ({0})'.format(author_name)

    # Print out the body and author (if set)
    print('Hello, {0}!{1}'.format(message.body, author_text))

    # Let the queue know that the message is processed
    message.delete()


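# Given only the messages that were sent in a batch with SQS.Queue.send_messages() in the previous section, the code above prints the lines below once both messages have been received.
# A single receive_messages() call returns at most one message unless MaxNumberOfMessages is set, so this can take more than one call.
# Hello, world!
# Hello, boto3! (Daniel)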
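
Because one `receive_messages` call returns only a limited number of messages, processing a whole queue usually means polling in a loop. The sketch below shows one way to do that with the `MaxNumberOfMessages` and `WaitTimeSeconds` parameters. `drain_queue` is a hypothetical helper, and stopping after a single empty poll is an assumption that suits a short script better than a long-running worker.

```python
def drain_queue(queue, handle, max_empty_polls=1):
    """Receive, handle, and delete messages until the queue looks empty.

    `queue` is expected to be an SQS.Queue resource and `handle` a callable
    taking one message. Returns the number of messages processed.
    Hypothetical helper.
    """
    processed = 0
    empty_polls = 0
    while empty_polls < max_empty_polls:
        messages = queue.receive_messages(
            MessageAttributeNames=['Author'],
            MaxNumberOfMessages=10,  # SQS allows 1 to 10 per request
            WaitTimeSeconds=5,       # long polling, up to 20 seconds
        )
        if not messages:
            empty_polls += 1
            continue
        empty_polls = 0
        for message in messages:
            handle(message)
            # Delete only after the handler returned without raising
            message.delete()
            processed += 1
    return processed
```

For example, `drain_queue(queue, lambda m: print(m.body))` would print each message body and then delete the message.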
This post is licensed under CC BY 4.0 by the author.