
Scheduled messaging via Azure Service Bus

There are several options for queueing in Azure. In our project we chose between Azure Event Grid and Azure Service Bus. Azure Event Grid is used to exchange messages between Logic Apps. Azure Service Bus is used as the interface for external systems and for more complex messaging scenarios. Without going into full detail about choosing between these two messaging solutions, I would like to share the following takeaways:

  • Event Grid pushes messages to subscribers, whereas with Service Bus you have to do some form of polling. Polling Service Bus is quite expensive. That’s why we sometimes opt for a combined solution in which an Event Grid subscription is created on new messages arriving on a Service Bus. That way, messages are picked up as soon as they arrive, without the need for polling. I will explain this solution in a separate post.
  • When we want a queuing solution for decoupling, we have to use Service Bus, not Event Grid. With a Service Bus trigger, messages queue up while the logic app is down, so no messages get lost. In an Event Grid scenario, Event Grid cannot deliver the event while the subscriber is down. It retries for a limited time only, after which the event is dead-lettered (or dropped if no dead-letter destination is configured).
  • For advanced messaging scenarios we also need Service Bus. Let’s say you want to retain message ordering. In that case you have to use Azure Service Bus with a session per entity. For example, to keep messages for relations in order, you could create a session per relationId so that all messages for a specific relation are processed in the correct order. Another example is a sophisticated retry mechanism, which is the focus of this article. Essentially we need a mechanism to schedule message publication according to a configurable retry interval, using the C# method ScheduleMessageAsync. There’s no ready-made action for scheduled Service Bus messaging in Azure, which is why we have to use a custom function.

using Microsoft.Azure.ServiceBus;
using ...;

namespace FunctionApp.ServiceBusResend
{
    public static class ServiceBusResend
    {
        [FunctionName("ServiceBusResend")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            try
            {
                log.LogInformation("ServiceBusResend trigger function processed a request.");

                bool resend = true;
                req.Headers["Content-Type"] = "application/json";
                string requestBody = await new StreamReader(req.Body).ReadToEndAsync();

                var m = JsonConvert.DeserializeObject<Body>(requestBody);

                // Get retry settings (parsed culture-invariantly so "0.02" means 0.02 on every host)
                var shortParkHours = Double.Parse(Environment.GetEnvironmentVariable("ShortParkHours"), System.Globalization.CultureInfo.InvariantCulture);
                var longParkHours = Double.Parse(Environment.GetEnvironmentVariable("LongParkHours"), System.Globalization.CultureInfo.InvariantCulture);
                var shortParkRetry = Int32.Parse(Environment.GetEnvironmentVariable("ShortParkRetry"));
                var longParkRetry = Int32.Parse(Environment.GetEnvironmentVariable("LongParkRetry"));

                var client = new TopicClient(Environment.GetEnvironmentVariable("ServiceBusConnection"), Environment.GetEnvironmentVariable("ServiceBusTopic"));
                // Construct output message from input message: ContentData is Base64,
                // so the decoded bytes can be used as the message body directly
                var message = new Message(Convert.FromBase64String(m.ContentData));
                var scheduledEnqueueTimeUtc = DateTime.UtcNow;

                if (m.Properties.ShortParkRetry == null)
                {
                    m.Properties.ShortParkRetry = "0";
                    m.Properties.LongParkRetry = "0";
                }

                // Set ScheduledEnqueueTime
                if (Int32.Parse(m.Properties.ShortParkRetry) < shortParkRetry) // Do short park
                {
                    m.Properties.ShortParkRetry = $"{Convert.ToInt32(m.Properties.ShortParkRetry) + 1}";
                    scheduledEnqueueTimeUtc = scheduledEnqueueTimeUtc.AddHours(shortParkHours);
                }
                else if (Int32.Parse(m.Properties.LongParkRetry) < longParkRetry) // Do long park
                {
                    m.Properties.LongParkRetry = $"{Convert.ToInt32(m.Properties.LongParkRetry) + 1}";
                    scheduledEnqueueTimeUtc = scheduledEnqueueTimeUtc.AddHours(longParkHours);
                }
                else // Stop resending
                {
                    resend = false;
                    await DeleteMessageInTable(m.Relatie);
                }

                if (resend)
                {
                    // Set Service Bus message (user) properties
                    message.UserProperties.Add("ShortParkRetry", m.Properties.ShortParkRetry);
                    message.UserProperties.Add("LongParkRetry", m.Properties.LongParkRetry);
                    await client.ScheduleMessageAsync(message, scheduledEnqueueTimeUtc);
                    await SaveMessageInTable(m.Relatie, m.ContentData, scheduledEnqueueTimeUtc);
                }

                // Release the connection held by this per-request client
                await client.CloseAsync();

                return (ActionResult)new OkObjectResult(resend);
            }
            catch (Exception ex)
            {
                log.LogError(ex, "ServiceBusResend failed.");
                return (ActionResult)new BadRequestObjectResult(ex.Message);
            }
        }

        // Input message definition
        private class Body
        {
            public string Relatie { get; set; }
            public string ContentData { get; set; }
            public Properties Properties { get; set; }
        }

        private class Properties
        {
            public string ShortParkRetry { get; set; }
            public string LongParkRetry { get; set; }
            public string DiagnosticId { get; set; }
            public string DeliveryCount { get; set; }
            public string EnqueuedSequenceNumber { get; set; }
            public DateTime? EnqueuedTimeUtc { get; set; }
            public DateTime? ExpiresAtUtc { get; set; }
            public DateTime? LockedUntilUtc { get; set; }
            public string LockToken { get; set; }
            public string MessageId { get; set; }
            public DateTime? ScheduledEnqueueTimeUtc { get; set; }
            public string SequenceNumber { get; set; }
            public string Size { get; set; }
            public string State { get; set; }
            public string TimeToLive { get; set; }
        }

        // Writes (or overwrites) the parked message for this relation, including its next enqueue time
        private static async Task SaveMessageInTable(string relatie, string content, DateTimeOffset scheduledEnqueueTimeUtc)
        {
            var key = Environment.GetEnvironmentVariable("ParkStorageTableKey");
            var sa = Environment.GetEnvironmentVariable("ParkStorageaccount");
            var cred = new StorageCredentials(sa, key);
            var account = new CloudStorageAccount(cred, true);
            var client = account.CreateCloudTableClient();
            var table = client.GetTableReference("Parked");
            // Await the table calls so the function does not return before they complete
            await table.CreateIfNotExistsAsync();
            var entity = new DynamicTableEntity(partitionKey: "Intermediair", rowKey: relatie);
            entity.Properties.Add("scheduled", new EntityProperty(scheduledEnqueueTimeUtc));
            entity.Properties.Add("message", new EntityProperty(Encoding.UTF8.GetString(Convert.FromBase64String(content))));
            var insertOperation = TableOperation.InsertOrReplace(entity);
            await table.ExecuteAsync(insertOperation);
        }

        // Removes the parked message for this relation once retrying has stopped
        private static async Task DeleteMessageInTable(string relatie)
        {
            var key = Environment.GetEnvironmentVariable("ParkStorageTableKey");
            var sa = Environment.GetEnvironmentVariable("ParkStorageaccount");
            var cred = new StorageCredentials(sa, key);
            var account = new CloudStorageAccount(cred, true);
            var client = account.CreateCloudTableClient();
            var table = client.GetTableReference("Parked");

            try
            {
                // Without the await, a StorageException would never reach the catch block below
                await table.ExecuteAsync(TableOperation.Delete(new TableEntity(partitionKey: "Intermediair", rowKey: relatie) { ETag = "*" }));
            }
            catch (StorageException e)
            {
                // A row that is already gone is fine; anything else is a real error
                if (e.RequestInformation.HttpStatusCode != (int)HttpStatusCode.NotFound)
                    throw;
            }
        }

    }
}

First of all, note that we pass in a message with a relation number, a body and a set of properties. The properties include two retry counters, ShortParkRetry and LongParkRetry, which track how often the message has already been parked. The matching retry intervals come from the app settings ShortParkHours and LongParkHours: the short park can be one hour and the long park one day. The intervals are parsed as doubles, so we can specify a fraction of an hour as a retry interval. That’s handy for testing, where you don’t want to wait for an hour or a day. You could for instance use the values 0.02 and 0.05.
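
To make the input shape concrete, here is a minimal sketch of how a caller might build the request body. The field names follow the Body and Properties classes in the listing; the ParkRequest helper and any sample values are hypothetical, and System.Text.Json is used only to keep the sketch free of extra packages (the function itself deserializes with JsonConvert).

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical helper for illustration; not part of the function above.
public static class ParkRequest
{
    // Builds the JSON body the function expects: the payload is Base64-encoded
    // into ContentData and the retry counters travel as strings in Properties.
    public static string Build(string relatie, string payload, int shortParkRetry, int longParkRetry)
    {
        var body = new Dictionary<string, object>
        {
            ["Relatie"] = relatie,
            ["ContentData"] = Convert.ToBase64String(Encoding.UTF8.GetBytes(payload)),
            ["Properties"] = new Dictionary<string, string>
            {
                ["ShortParkRetry"] = shortParkRetry.ToString(),
                ["LongParkRetry"] = longParkRetry.ToString()
            }
        };
        return JsonSerializer.Serialize(body);
    }

    // Reverses the encoding, mirroring how the function decodes ContentData.
    public static string DecodeContent(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var contentData = doc.RootElement.GetProperty("ContentData").GetString();
        return Encoding.UTF8.GetString(Convert.FromBase64String(contentData));
    }
}
```

A first park request would pass 0 for both counters (or omit the properties, in which case the function defaults them to "0"); each later request carries the counters the function stored on the scheduled message.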

When we want to schedule a message, we call the ScheduleMessageAsync method with scheduledEnqueueTimeUtc as a parameter. Service Bus accepts the message right away, but it only becomes available to receivers at scheduledEnqueueTimeUtc.

To see which messages are in retry, we write each message, together with its next enqueue time, to an Azure Storage table named Parked. This is not a requirement; it just gives operators some visibility while messages are waiting to be published to the Service Bus. When the retries are exhausted (e.g. after two short parks and two long parks), the message is removed from the Azure Storage table again.
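
The order of short and long parks can be sketched as a small pure function. This mirrors the branching in the function above but is not part of it: the ParkSchedule class and its parameter names are hypothetical, and the sketch only lists the delays, it does not talk to Service Bus.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of the function above.
public static class ParkSchedule
{
    // Returns the delay (in hours) applied on each successive resend request:
    // short parks first, then long parks. When the list ends, resending stops.
    public static List<double> Delays(int shortMax, int longMax, double shortHours, double longHours)
    {
        var delays = new List<double>();
        int shortDone = 0, longDone = 0;
        while (true)
        {
            if (shortDone < shortMax) { shortDone++; delays.Add(shortHours); }
            else if (longDone < longMax) { longDone++; delays.Add(longHours); }
            else break; // both budgets used up: the message is not rescheduled
        }
        return delays;
    }
}
```

With two short parks of one hour and two long parks of 24 hours, the delays are 1, 1, 24 and 24 hours. If every attempt fails right after delivery, the last attempt happens roughly 50 hours after the first park.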