Service Broker 101 Lesson 5: Stored procedures to handle messages

June 16, 2020

At this point we’ve set up our message types, our contracts, our services, and our queues. We’ve opened conversations and sent messages across them (but only initiator to target), and we’ve seen how to get those messages off the target queue. This lesson goes through the last step in the process: doing something with those messages.

Message handling from outside of the queue

This is the easier option, but it will probably cause you some issues down the line. This is where you let messages accumulate on the queue, and at set times run a stored procedure (or other statement) to do something with them. It might look something like this:

DECLARE @Messages AS TABLE
    (
          MessageType NVARCHAR(256) NOT NULL
        , MessageBody XML NULL
    );

RECEIVE
      message_type_name
    , CAST(message_body AS XML) AS XMLMessageBody
FROM dbo.TargetQueue
INTO @Messages;

INSERT INTO dbo.GoodMessages
    (
          MessageType
        , MessageBody
    )
SELECT
      Msg.MessageType
    , Msg.MessageBody
FROM @Messages AS Msg
WHERE 1 = 1
    AND Msg.MessageType IN
        (     'ServiceBrokerExample/Example1/MessageType/Outgoing'
            , 'ServiceBrokerExample/Example2/MessageType/Outgoing');

INSERT INTO dbo.ServiceBrokerErrors
    (
          MessageType
        , MessageBody
    )
SELECT
      Msg.MessageType
    , Msg.MessageBody
FROM @Messages AS Msg
WHERE 1 = 1
    AND Msg.MessageType LIKE 'ServiceBrokerExample/Example[0-9]/MessageType/Error';

This code retrieves every message from the queue and adds it to one of the two tables depending on the message type. That processes the messages and empties the queue, but what about all of the conversations that have been left open? We could capture the conversation_handle of each message and execute an END CONVERSATION statement for it, but because we are reading from the target queue that will only close the conversation from the target side. We could END CONVERSATION WITH CLEANUP, but that’s a brute force approach like KILLing a connection, and should really be a last resort. In any case, we don’t know for sure which of these conversations we do want to end. If we are doing something more complex, maybe we want to keep a conversation going for some time.
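To see how many conversations a batch approach like this leaves behind, one option is to query the sys.conversation_endpoints catalog view. This is only a minimal sketch; filtering out CLOSED endpoints is my choice for illustration, not something the approach above requires.

```sql
-- List conversation endpoints in the current database that are not yet closed.
SELECT
      CE.conversation_handle
    , CE.is_initiator   -- 1 = this endpoint started the conversation
    , CE.state_desc     -- for example CONVERSING or DISCONNECTED_INBOUND
    , CE.far_service    -- name of the service at the other end
FROM sys.conversation_endpoints AS CE
WHERE 1 = 1
    AND CE.state_desc <> 'CLOSED';
```

If this list keeps growing between runs, conversations are being opened faster than both sides are ending them.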

Message handling using stored procedures as queue handlers

As discussed in the last lesson, you can attach a stored procedure to a queue when creating or altering it using the following syntax:

ALTER QUEUE dbo.TargetQueue
WITH
    ACTIVATION
        (
              STATUS = ON
            , PROCEDURE_NAME = dbo.TargetQueueProcedure
            , MAX_QUEUE_READERS = 4
            , EXECUTE AS SELF
        );

This stored procedure is used by the queue to handle messages that arrive. When a message arrives, the queue will execute the procedure, and keep executing it until there are no more messages on the queue. Your stored procedure therefore needs to remove messages as it goes. The MAX_QUEUE_READERS setting lets you have multiple instances of the procedure executing at once, to clear down a queue faster or to keep up with a high volume of messages. You can turn activation on or off using STATUS: while it is set to OFF nothing will happen, but as soon as it is set to ON the procedure will start processing messages again. Finally, you need to specify which user the procedure will execute as. The options here are SELF (the current user, meaning the person who runs the CREATE or ALTER script), OWNER (the person who owns the queue), or a user name that the current user has IMPERSONATE permission for.
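As a rough illustration of those settings, the activation options can be changed individually with ALTER QUEUE. The sketch below assumes the dbo.TargetQueue queue from this series already has an activation procedure attached; the reader count of 8 is an arbitrary value picked for the example.

```sql
-- Pause activation: messages still arrive on the queue,
-- but the procedure is not started for them.
ALTER QUEUE dbo.TargetQueue
WITH ACTIVATION (STATUS = OFF);

-- Resume activation and allow up to 8 concurrent readers.
ALTER QUEUE dbo.TargetQueue
WITH ACTIVATION (STATUS = ON, MAX_QUEUE_READERS = 8);

-- See which activation procedures are running right now.
SELECT
      Tasks.spid
    , Tasks.procedure_name
    , Tasks.execute_as
FROM sys.dm_broker_activated_tasks AS Tasks;
```

The last query is handy for checking whether a higher MAX_QUEUE_READERS value is actually being used, since extra readers are only started when the existing ones are not keeping up.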

Below is an example of how you might set out a stored procedure you want to run as a queue handler:

DECLARE
      @ConversationHandle UNIQUEIDENTIFIER
    , @MessageType NVARCHAR(256)
    , @MessageBody XML
    , @ReceivedMessage BIT = 1;

WHILE @ReceivedMessage = 1
BEGIN
    SET @ReceivedMessage = 0;
    WAITFOR
        (
            RECEIVE TOP (1)
                  @ConversationHandle = conversation_handle
                , @MessageBody = message_body
                , @MessageType = message_type_name
                , @ReceivedMessage = 1
            FROM dbo.TargetQueue
        ), TIMEOUT 1000;

    IF @ReceivedMessage = 1
    BEGIN
        IF @MessageType IN
                ('ServiceBrokerExample/Example1/MessageType/Outgoing'
                , 'ServiceBrokerExample/Example2/MessageType/Outgoing')
            INSERT INTO dbo.GoodMessages
                (
                      MessageType
                    , MessageBody
                )
            VALUES (@MessageType, @MessageBody);
            
        IF @MessageType LIKE 'ServiceBrokerExample/Example[0-9]/MessageType/Error'
            INSERT INTO dbo.ServiceBrokerErrors
                (
                      MessageType
                    , MessageBody
                )
            VALUES (@MessageType, @MessageBody);
        
        END CONVERSATION @ConversationHandle;
    END
END

The WAITFOR ... TIMEOUT 1000 that we wrap around the RECEIVE TOP (1) does what you might expect, with one catch: the timeout is in milliseconds, so it waits up to 1 second to see if we can retrieve a message from the queue, and returns either at the 1 second mark or as soon as a message is received. The WHILE @ReceivedMessage = 1 loop ensures that the procedure keeps going as long as it finds messages, and ends as soon as it does not.
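A common alternative to the flag variable is to check @@ROWCOUNT straight after the WAITFOR. The sketch below is a standalone variation on the example above, not a replacement for it; the five second timeout is an arbitrary value chosen for illustration.

```sql
DECLARE
      @ConversationHandle UNIQUEIDENTIFIER
    , @MessageType NVARCHAR(256)
    , @MessageBody XML;

-- TIMEOUT is in milliseconds, so this waits up to five seconds.
WAITFOR
    (
        RECEIVE TOP (1)
              @ConversationHandle = conversation_handle
            , @MessageType = message_type_name
            , @MessageBody = CAST(message_body AS XML)
        FROM dbo.TargetQueue
    ), TIMEOUT 5000;

-- @@ROWCOUNT is 0 when the wait timed out without receiving a message.
IF @@ROWCOUNT = 0
    PRINT 'No message arrived before the timeout.';
ELSE
    PRINT 'Received a message of type ' + @MessageType;
```

The check has to be the very next statement after the WAITFOR, because any statement in between resets @@ROWCOUNT.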

Once the message has been received we do something with it, in this case adding it to different tables depending on the message type, and then close the conversation. We could also send messages back if we wished, using the same conversation, and keep the conversation open if we expect to receive more messages from our initiator queue in response. We could even open more conversations and send messages on to further queues.

The initiator queue will need a similar procedure attached to it. At its simplest, this should just look for the Microsoft “http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog” message type, and end the conversation from the initiator endpoint as well. It may also do other things, depending on how complex you want the conversation to be, but the most important thing is that every possible conversation path ends with both sides executing an END CONVERSATION eventually. Otherwise conversation endpoints will persist and cause performance issues down the line.
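A minimal initiator-side handler might look something like the sketch below. The names dbo.InitiatorQueue and dbo.InitiatorQueueProcedure are assumptions made for the example, so substitute whatever your initiator queue is called. I have also included the built-in Error message type alongside EndDialog, since a conversation that ends in an error needs closing on this side too.

```sql
-- Hypothetical names: dbo.InitiatorQueue and dbo.InitiatorQueueProcedure.
CREATE PROCEDURE dbo.InitiatorQueueProcedure
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE
          @ConversationHandle UNIQUEIDENTIFIER
        , @MessageType NVARCHAR(256);

    WHILE 1 = 1
    BEGIN
        WAITFOR
            (
                RECEIVE TOP (1)
                      @ConversationHandle = conversation_handle
                    , @MessageType = message_type_name
                FROM dbo.InitiatorQueue
            ), TIMEOUT 1000;

        -- Nothing arrived within one second, so let the procedure finish.
        IF @@ROWCOUNT = 0
            BREAK;

        -- The other side has ended the conversation (or it failed),
        -- so close this endpoint as well.
        IF @MessageType IN
                (     N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
                    , N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
        BEGIN
            END CONVERSATION @ConversationHandle;
        END
    END
END;
```

Any other message type is simply taken off the queue and ignored here; a real handler would probably want to log or act on those.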

POISON_MESSAGE_HANDLING

POISON_MESSAGE_HANDLING is another queue-level property you can set with a CREATE or ALTER QUEUE statement. A poison message in the Service Broker context is a message that causes the activation stored procedure to roll back the transaction when it executes. This rolls back the RECEIVE statement and puts the message back on the queue. The activation stored procedure will then activate again, attempt to process the same message, and roll back again. This can cause an infinite loop, and potentially block up the queue or at the very least consume resources.

One way round this is to not include transactions and rollbacks in your procedure. This might be ok depending on how you handle the error, but what if the error handling also fails? Also, what if the failure is a temporary connection problem or something similar? In that case you ideally do want to roll back and try again.
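To make the rollback behaviour concrete, here is a rough sketch of a transactional receive. It reuses the dbo.TargetQueue and dbo.GoodMessages objects from earlier in this lesson; the TRY/CATCH layout is one common pattern, not the only way to structure it.

```sql
DECLARE
      @ConversationHandle UNIQUEIDENTIFIER
    , @MessageType NVARCHAR(256)
    , @MessageBody XML;

BEGIN TRY
    BEGIN TRANSACTION;

    WAITFOR
        (
            RECEIVE TOP (1)
                  @ConversationHandle = conversation_handle
                , @MessageType = message_type_name
                , @MessageBody = CAST(message_body AS XML)
            FROM dbo.TargetQueue
        ), TIMEOUT 1000;

    IF @@ROWCOUNT > 0
    BEGIN
        INSERT INTO dbo.GoodMessages (MessageType, MessageBody)
        VALUES (@MessageType, @MessageBody);

        END CONVERSATION @ConversationHandle;
    END

    COMMIT TRANSACTION;
END TRY
BEGIN CATCH
    -- Rolling back undoes the RECEIVE too, so the message
    -- goes back on the queue and will be picked up again.
    IF XACT_STATE() <> 0
        ROLLBACK TRANSACTION;
    THROW;
END CATCH;
```

If the INSERT fails for a temporary reason, the retry is exactly what you want. If it fails every time for the same message, you have a poison message.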

The POISON_MESSAGE_HANDLING setting decides what the queue does if you are in this situation. The default is ON, in which case the queue will be disabled (STATUS = OFF) after 5 consecutive rollbacks. If you set it to OFF, the queue will keep on trying to execute the stored procedure forever, so if you are going to disable this you need to be confident your activation stored procedure can handle this scenario.
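For reference, the statements involved look roughly like this. The sketch assumes the dbo.TargetQueue queue used throughout this series.

```sql
-- Turn off automatic poison message handling for the queue.
ALTER QUEUE dbo.TargetQueue
WITH POISON_MESSAGE_HANDLING (STATUS = OFF);

-- Check whether the queue is currently enabled and how it is configured.
SELECT
      SQ.name
    , SQ.is_receive_enabled
    , SQ.is_activation_enabled
    , SQ.is_poison_message_handling_enabled
FROM sys.service_queues AS SQ
WHERE 1 = 1
    AND SQ.name = N'TargetQueue';

-- If poison message handling has disabled the queue, turn it back on
-- once the offending message has been dealt with.
ALTER QUEUE dbo.TargetQueue
WITH STATUS = ON;
```

Re-enabling the queue without removing or fixing the poison message will just disable it again after another five rollbacks.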

That’s all for now. Next time, the lesson will be a brief look at how you can extract data from the XML of a message.
