Using Http Webhook

When debatching a Json file with 5600 articles, I found that the looping constructs in Logic Apps (such as foreach or until) are very slow. I had to create a set of Json files, each containing a number of articles equal to a configurable batch size. As an alternative to looping in the Logic App, I decided to implement the loop in an Azure function, i.e. in custom C#.
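The batching idea itself can be sketched independently of Azure. The following is a minimal illustration, not the code used in this post: the class BatchSketch, the method SplitIntoBatches and the batch size of 500 are hypothetical names and values chosen for the example, and the articles are stand-in integers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchSketch
{
    // Splits a sequence into consecutive batches of at most batchSize items.
    public static List<List<T>> SplitIntoBatches<T>(IEnumerable<T> items, int batchSize)
    {
        if (batchSize < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be at least 1.");
        }

        var batches = new List<List<T>>();
        var current = new List<T>(batchSize);

        foreach (T item in items)
        {
            current.Add(item);
            if (current.Count == batchSize)
            {
                batches.Add(current);
                current = new List<T>(batchSize);
            }
        }

        // Flush the last, possibly smaller, batch.
        if (current.Count > 0)
        {
            batches.Add(current);
        }

        return batches;
    }

    public static void Main()
    {
        // 5600 stand-in article ids; 500 is a hypothetical batch size.
        List<int> articles = Enumerable.Range(1, 5600).ToList();
        List<List<int>> batches = SplitIntoBatches(articles, 500);

        // prints: 12 batches, last one holds 100 articles
        Console.WriteLine($"{batches.Count} batches, last one holds {batches[batches.Count - 1].Count} articles");
    }
}
```

With 5600 articles and a batch size of 500 this yields eleven full batches and one final batch of 100, so the downstream service is called twelve times instead of 5600 times.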

You can also enable the Split On setting on your queue trigger. This setting automates debatching when you have multiple queue messages. In this case Split On cannot be used, because there is only one queue message, and it is the content behind that single message that needs to be debatched.

As expected, looping in C# performs very well. One problem remained, however. When you call a function from a Logic App, you typically use the Function App connector, and that confronts you with a time-out interval of two minutes. In this case that was not a problem, but I wanted to create a generally usable debatcher. That is why I called an Azure function named TriggerDebatchEntities via a so-called Http Webhook. An Http Webhook allows you to call a function synchronously, pass a callbackUrl, and then wait for an asynchronous response. The asynchronous response can't be returned by the Http triggered function TriggerDebatchEntities itself, so I created a second, queue triggered function named DebatchEntities. DebatchEntities loops through the Json file, calls a REST service every time the batch size is reached and finally returns control to the Logic App by calling the callbackUrl. DebatchEntities is not limited to two minutes, but can run for up to ten minutes. That's a very long time in computing.
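On the Consumption plan, ten minutes is the maximum value of the functionTimeout setting in host.json, and the default is five minutes. A minimal sketch, assuming the function app runs on the Consumption plan and that the limit was raised this way (the host.json used for this post is not shown):

```json
{
  "functionTimeout": "00:10:00"
}
```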

Http Webhook in Logic App:


Note that callbackUrl is set to the expression @listCallbackUrl(). This Url is a deep link to the Webhook action. That means the Logic App continues after the Webhook action as soon as the asynchronous response is received.
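In code view, an Http Webhook action of this kind could look roughly like the fragment below. This is a sketch under assumptions, not the definition used in this post: the action name, the function app host name, the batchSize value and the blob references are placeholders, and a function key may have to be added to the uri depending on the authorization level of the function.

```json
{
  "actions": {
    "HTTP_Webhook": {
      "type": "HttpWebhook",
      "inputs": {
        "subscribe": {
          "method": "POST",
          "uri": "https://<your-function-app>.azurewebsites.net/api/TriggerDebatchEntities?batchSize=100",
          "body": {
            "callbackUrl": "@{listCallbackUrl()}",
            "blobContainer": "<container-name>",
            "blob": "<blob-name>"
          }
        },
        "unsubscribe": {}
      },
      "runAfter": {}
    }
  }
}
```

The body properties match what TriggerDebatchEntities reads from the request: callbackUrl, blobContainer and blob, with batchSize passed as a query parameter.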

Function TriggerDebatchEntities:

#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
using System;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using System.Text;
using Microsoft.Azure.WebJobs;
using System.Diagnostics;
using Microsoft.Azure.WebJobs.Host;

// Payload that is placed on the queue for DebatchEntities
public class EventMessage
{
    public int batchSize { get; set; }

    public string callbackUrl { get; set; }

    public string blobContainer { get; set; }

    public string blob { get; set; }
}

public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
{
    int batchSize = 1;
    string callbackUrl;
    string blobContainer;
    string blob;

    string storageConnectionString;

    try
    {
        log.Info("Entered HTTP trigger TriggerDebatchEntities");

        // Parse the optional batchSize query parameter (defaults to 1)
        string batchSizeValue = req.GetQueryNameValuePairs()
            .FirstOrDefault(q => string.Compare(q.Key, "batchSize", true) == 0)
            .Value;
        batchSize = batchSizeValue != null ? Int32.Parse(batchSizeValue) : 1;

        log.Info(String.Format("batchsize: {0}", batchSize));

        // Get request body
        dynamic data = await req.Content.ReadAsAsync<object>();
        callbackUrl = data?.callbackUrl;
        blobContainer = data?.blobContainer;
        blob = data?.blob;

        log.Info("Request fields read");

        if (callbackUrl == null)
        {
            return req.CreateResponse(HttpStatusCode.BadRequest, "Please pass a callbackUrl in the request body (required).");
        }

        if (blobContainer == null || blob == null)
        {
            return req.CreateResponse(HttpStatusCode.BadRequest, "Please pass a valid blob(container) reference.");
        }

        // Debatching is done in the main function (DebatchEntities), because the
        // message would otherwise be too large for the queue
        storageConnectionString = Environment.GetEnvironmentVariable("QueueStorage");
        CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageConnectionString);
        CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
        CloudQueue queue = queueClient.GetQueueReference("tempdebatchentities");
        queue.CreateIfNotExists();

        // Create a message and add it to the queue.
        string messageContents = PrepareMessage(batchSize, callbackUrl, blobContainer, blob, log);
        CloudQueueMessage message = new CloudQueueMessage(messageContents);
        queue.AddMessage(message);

        log.Info("Queue message sent");

        log.Info("HTTP trigger TriggerDebatchEntities finished");
        return req.CreateResponse(HttpStatusCode.OK, "Request is registered for asynchronous processing");
    }
    catch (Exception ex)
    {
        log.Error("TriggerDebatchEntities failed", ex);
        return req.CreateResponse(HttpStatusCode.InternalServerError, String.Format("Asynchronous processing not succeeded. Error: {0}", ex.Message));
    }
}

// Serializes the request values to the Json message that DebatchEntities reads from the queue
private static string PrepareMessage(int batchSize, string callbackUrl, string blobContainer, string blob, TraceWriter log)
{
    EventMessage msg = new EventMessage();
    msg.batchSize = batchSize;
    msg.callbackUrl = callbackUrl;
    msg.blobContainer = blobContainer;
    msg.blob = blob;

    string messageContents = JsonConvert.SerializeObject(msg, Newtonsoft.Json.Formatting.Indented);

    return messageContents;
}
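For illustration, the queue message produced by PrepareMessage has the shape below. The property names follow the EventMessage class. The values are placeholders, not data from the actual run.

```json
{
  "batchSize": 100,
  "callbackUrl": "<callback-url-from-listCallbackUrl>",
  "blobContainer": "<container-name>",
  "blob": "<blob-name>"
}
```

Only references to the blob travel through the queue. The Json file itself stays in blob storage, which keeps the message small.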

Function DebatchEntities:

#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"

using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using System.Text;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System.IO;

// Shared HttpClient, reused across function invocations
private static HttpClient tlxClient = new HttpClient();

public static void Run(string myQueueItem, TraceWriter log)
{
    int batchSize = 1;
    string callbackUrl = "";
    string blobContainer = "";
    string blob = "";

    log.Info("Entered Queue trigger DebatchEntities");

    string doWorkUrl = Environment.GetEnvironmentVariable("ServiceUrl");

    // Read input values from the queue message
    JObject input = JObject.Parse(myQueueItem);
    string propertyName;

    foreach (JProperty parsedProperty in input.Properties())
    {
        propertyName = parsedProperty.Name;
        if (propertyName.Equals("batchSize"))
        {
            batchSize = (int)parsedProperty.Value;
        }
        if (propertyName.Equals("callbackUrl"))
        {
            callbackUrl = (string)parsedProperty.Value;
        }
        if (propertyName.Equals("blobContainer"))
        {
            blobContainer = (string)parsedProperty.Value;
        }
        if (propertyName.Equals("blob"))
        {
            blob = (string)parsedProperty.Value;
        }
    }

    log.Info(String.Format("callback {0}", callbackUrl));
    log.Info(String.Format("blobContainer {0}", blobContainer));
    log.Info(String.Format("blob {0}", blob));

    // Read blob
    string storageConnectionString = Environment.GetEnvironmentVariable("BlobStorage");
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageConnectionString);

    CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
    CloudBlobContainer container = blobClient.GetContainerReference(blobContainer);

    // Get blob data
    CloudBlockBlob cloudBlob = container.GetBlockBlobReference(blob);
    string streamContent;
    using (MemoryStream stream = new MemoryStream())
    {
        cloudBlob.DownloadToStream(stream);
        stream.Position = 0;

        using (StreamReader streamReader = new StreamReader(stream))
        {
            streamContent = streamReader.ReadToEnd();
        }
    }

    int counter = 0;
    JArray jsonContent = JArray.Parse(streamContent);
    JArray output = new JArray();
    HttpResponseMessage response;

    foreach (JObject item in jsonContent)
    {
        if (counter < batchSize)
        {
            counter += 1;
            output.Add(item);
        }
        else
        {
            // Batch is full: send it and start a new batch with the current item
            response = tlxClient.PostAsync(doWorkUrl, new StringContent(output.ToString(), Encoding.UTF8, "application/json")).Result;
            log.Info("DoWork ServiceCall performed");

            output.Clear();
            output.Add(item);
            counter = 1;
        }
    }

    // Send the last, possibly incomplete, batch
    if (counter > 0)
    {
        response = tlxClient.PostAsync(doWorkUrl, new StringContent(output.ToString(), Encoding.UTF8, "application/json")).Result;
        log.Info("DoWork ServiceCall performed");
    }

    // Return control to the Logic App by calling the callback Url
    response = tlxClient.PostAsync(callbackUrl, new StringContent(myQueueItem, Encoding.UTF8, "application/json")).Result;

    log.Info(String.Format("Response from callback {0}.", response));

    log.Info(String.Format("Asynchronous processing performed. Callback to {0}.", callbackUrl));
}

Note that doWorkUrl is read from the ServiceUrl setting in AppSettings. In this case it points to a dummy service, the RequestBin service: http://requestbin.fullcontact.com/1iwzk791.

Please visit my blog post on durable functions as well.
