Persisting Cloud Events to Cosmos DB in Azure

If you start down the path of implementing Event Sourcing, you’ll most likely come across https://cloudevents.io/ which has the tag line “A specification for describing event data in a common way”. This project seems to be well supported (take a look at the contributors list) and has language projections for a number of different languages. It also has some out of the box support for working with various event systems, such as Azure Event Grid. In this post we’re going to look at how you can simply save and retrieve a CloudEvent (the implementation of the Cloud Events spec) to Azure Cosmos DB.

Getting Started

We’ll start with a basic ASP.NET Core Web API project. Given the .NET 5 release is just around the corner, we’re going to pick the .NET 5.0 (Preview) option. We’re also going to enable the OpenAPI support – this is awesome as it not only gives us OpenAPI (aka next gen of Swagger) documentation for our API, it also gives us a neat test page that we’ll be using to send CloudEvents to our API.

We’re going to remove the default WeatherForecastController and the associated WeatherForecast class. Then we’re going to add a new controller, CloudEventsController, with a single method, UploadCloudEvent, which will be invoked when a POST is made to the /cloudevents endpoint (the [Route("[controller]")] attribute derives the route from the controller name, so there is no /api prefix unless you add one).

[ApiController]
[Route("[controller]")]
public class CloudEventsController : ControllerBase
{
    private readonly ILogger<CloudEventsController> _logger;

    public CloudEventsController(ILogger<CloudEventsController> logger)
    {
        _logger = logger;
    }

    [HttpPost]
    public async Task<ActionResult<string>> UploadCloudEvent(
        [FromBody] CloudEvent cloudEvent)
    {
        return Ok("TBD");
    }
}

At this point, if we attempt to build the project, we’ll get a build error because the CloudEvent class isn’t recognised. Luckily, Visual Studio can help us with this by recommending an appropriate NuGet package, CloudNative.CloudEvents.

We can build and run the project at this point and attempt to send a CloudEvent, as JSON, using the test UI.

Unfortunately what we get back is an error:

System.NotSupportedException: Deserialization of types without a parameterless constructor, a singular parameterized constructor, or a parameterized constructor annotated with 'JsonConstructorAttribute' is not supported. Type 'CloudNative.CloudEvents.CloudEvent'. Path: $ | LineNumber: 0 | BytePositionInLine: 1. ---> System.NotSupportedException: Deserialization of types without a parameterless constructor, a singular parameterized constructor, or a parameterized constructor annotated with 'JsonConstructorAttribute' is not supported. Type 'CloudNative.CloudEvents.CloudEvent'.

Luckily, the C# implementation for CloudEvents also has an ASP.NET Core support package, CloudNative.CloudEvents.AspNetCore. This package includes a JSON input formatter, CloudEventJsonInputFormatter, which can be registered in ConfigureServices in order to deserialise CloudEvent objects from JSON.

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers(opts =>
    {
        opts.InputFormatters.Insert(0,new CloudEventJsonInputFormatter());
    });

    services.AddSwaggerGen(c =>
    {
        c.SwaggerDoc("v1", new OpenApiInfo { Title = "CloudEventsSample", Version = "v1" });
    });
}

At this point we’re pretty close to being able to send CloudEvents to our API. However, if we attempt to POST a CloudEvent, we’ll get the following error:

{ "type": "https://tools.ietf.org/html/rfc7231#section-6.5.1", "title": "One or more validation errors occurred.", "status": 400, "traceId": "00-67ef18df9db33e49bb9aeb0b0c717e32-87b5edd8ba995c4d-00", "errors": { "DataContentType.Name": [ "The Name field is required." ] } }

For those who’ve worked with ASP.NET model validation, this type of error will look fairly familiar. Essentially, it’s failing because we’re not setting the Name property on the DataContentType element. What’s confusing is that neither the CloudEvent class, nor any of its dependent types, has been attributed with any validation attributes (e.g. Required).

What we’re hitting is ASP.NET Core’s implicit validation: non-nullable reference type properties are treated as if they were marked Required when nullable annotations are in effect, so a property can fail validation without carrying any attribute. This is a default that’s going to cause an inordinate amount of frustration, both on new projects and on existing projects that are upgrading. Unfortunately, the ship has sailed on this one, so the best we can do is to code around it. Luckily, there’s a simple fix: set the following option when calling AddControllers in your ConfigureServices method.

services.AddControllers(options => options.SuppressImplicitRequiredAttributeForNonNullableReferenceTypes = true);
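Since both the input formatter and the validation option are set on the same MvcOptions object, they can live in a single AddControllers call. The following is a minimal sketch of that, not the only way to wire it up. The AddCloudEventControllers method name is hypothetical, and the sketch assumes the 1.x CloudNative.CloudEvents.AspNetCore package, where CloudEventJsonInputFormatter is assumed to sit in the CloudNative.CloudEvents namespace.

```csharp
using CloudNative.CloudEvents; // assumed namespace of CloudEventJsonInputFormatter (1.x SDK)
using Microsoft.Extensions.DependencyInjection;

public static class CloudEventsMvcSetup
{
    // Hypothetical helper: registers controllers with both CloudEvents tweaks applied.
    public static IMvcBuilder AddCloudEventControllers(this IServiceCollection services)
    {
        return services.AddControllers(opts =>
        {
            // Put the CloudEvents formatter first so it sees CloudEvent bodies
            // before the default JSON formatter does.
            opts.InputFormatters.Insert(0, new CloudEventJsonInputFormatter());

            // Stop non-nullable reference type properties being treated as [Required].
            opts.SuppressImplicitRequiredAttributeForNonNullableReferenceTypes = true;
        });
    }
}
```

With this in place, ConfigureServices would call services.AddCloudEventControllers() instead of calling AddControllers directly.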

At this point you can run the project and use the test interface to submit a CloudEvent as JSON. Everything appears to work and the test interface returns the string “TBD”, as the code suggests. However, if you set a breakpoint in the UploadCloudEvent method and inspect the cloudEvent object, you’ll see that none of the properties have the correct value.

Turns out that if you use the default options in the test UI, most of the properties of the CloudEvent are not set. However, if you select application/cloudevents+json from the content type dropdown, the CloudEvent object is correctly deserialised.
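The same content type matters if you call the API from code rather than from the test UI. Here is a rough sketch using HttpClient that sets the Content-Type header to application/cloudevents+json explicitly. The CloudEventPoster class and its method names are hypothetical, and the endpoint you pass in is whatever address your own project is listening on.

```csharp
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;

public static class CloudEventPoster
{
    // Wraps the JSON in a body whose Content-Type marks it as a structured CloudEvent.
    public static HttpContent BuildStructuredContent(string cloudEventJson)
    {
        var content = new ByteArrayContent(Encoding.UTF8.GetBytes(cloudEventJson));
        content.Headers.ContentType = new MediaTypeHeaderValue("application/cloudevents+json");
        return content;
    }

    // endpoint is an assumption, e.g. the URL of the CloudEventsController in your project.
    public static Task<HttpResponseMessage> PostAsync(
        HttpClient client, string endpoint, string cloudEventJson)
    {
        return client.PostAsync(endpoint, BuildStructuredContent(cloudEventJson));
    }
}
```

Sending the same JSON as plain application/json would reproduce the behaviour described above, where most of the CloudEvent properties are left unset.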

The CloudEvent has a number of standard properties, and then a nested Data property, which is where your application-specific data will reside. You’ll also notice that there’s an Extensions property, which we’ll come back to later but is useful for capturing additional metadata about the event.

Saving To Azure Cosmos DB

Every application has its own requirements for how to process and store events. In our case, we opted to save CloudEvents to Azure Cosmos DB. In this post I’m not providing advice as to whether this is appropriate for your application; I’m simply going to walk through how to save CloudEvent objects to, and retrieve them from, Azure Cosmos DB.

I’m not going to walk through the process of setting up Cosmos DB, since there are some great tutorials in the documentation for the product. What we are going to do is define a few constants that we’ll need.

// Placeholder values: substitute the details of your own Cosmos DB account.
private const string EndpointUrl = "https://yourcosmosdb-westus.documents.azure.com:443/";
private const string AuthorizationKey = "Ut5NuthyYUBXlL0bxBY.........";
private const string DatabaseId = "cloudeventssample";
private const string ContainerId = "cloudevents";
private const string PartitionKeyPath = "/type";

The EndpointUrl and AuthorizationKey come from your instance of Cosmos DB. The DatabaseId, ContainerId and PartitionKeyPath are all specific to your application. Note, however, that the PartitionKeyPath has to match a property on the CloudEvent object. For the moment we’re going to use the Type property on the CloudEvent. However, we’ll see later how we can use an extension to define a partitionKey property that is more suited to this purpose.

We can then add code to the UploadCloudEvent to save the CloudEvent to a Cosmos DB container.

[HttpPost]
public async Task<ActionResult<string>> UploadCloudEvent(
    [FromBody] CloudEvent cloudEvent)
{
    var cosmosClient = new CosmosClient(EndpointUrl, AuthorizationKey);
    var databaseReq = await cosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId);
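    // Interpolate the value: passing a string as a second argument would bind to
    // the Debug.WriteLine(message, category) overload rather than format the text.
    Debug.WriteLine($"Created Database: {databaseReq.Database.Id}");
    var containerReq = await cosmosClient.GetDatabase(DatabaseId).CreateContainerIfNotExistsAsync(ContainerId, PartitionKeyPath);
    Debug.WriteLine($"Created Container: {containerReq.Container.Id}");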

    var container = containerReq.Container;

    var response = await container.CreateItemAsync(cloudEvent);

    return Ok($"Created {response.StatusCode}");
}

As you’ve probably predicted by now, this fails – nothing worth doing is easy! This time it fails with:

Microsoft.Azure.Cosmos.CosmosException : Response status code does not indicate success: BadRequest (400); Substatus: 0; ActivityId: 70c79590-fdf2-4bf6-80a1-cd4af23774c5; Reason: (Message: {"Errors":["The input content is invalid because the required properties - 'id; ' - are missing"]} ActivityId: 70c79590-fdf2-4bf6-80a1-cd4af23774c5, Request URI: /apps/548fba61-21b5-4fac-a478-8b7b5a1b3640/services/81adfd47-798e-4614-9c70-64cacf31b2e7/partitions/c9dabeda-0b5b-4ab3-8156-d4c63069599d/replicas/132485574048358760p/, RequestStats: Please see CosmosDiagnostics, SDK: Windows/10.0.19042 cosmos-netstandard-sdk/3.14.0);

This is weird, considering the CloudEvent class does have an Id property and the instance passed into the UploadCloudEvent method does have a non-null string value for Id. This points to an issue with the serialization. Cosmos DB requires every item to have an id property, in lowercase. By default the Cosmos DB client writes property names as they are declared, so the item is sent with an Id property rather than the id property the service is looking for.

The fix for this is to change the serialization options for the CosmosClient, as follows:

var options = new CosmosClientOptions
{
    SerializerOptions = new CosmosSerializationOptions { PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase }
};
var cosmosClient = new CosmosClient(EndpointUrl, AuthorizationKey, options);

Just when you thought there couldn’t possibly be anything more to saving a CloudEvent to Cosmos DB, attempting to upload a CloudEvent now generates the following error:

Newtonsoft.Json.JsonSerializationException: Unable to find a constructor to use for type CloudNative.CloudEvents.CloudEvent. A class should either have a default constructor, one constructor with arguments or a constructor marked with the JsonConstructor attribute. Path 'dataContentType', line 1, position 19.

I’m sure there are other ways to resolve this issue, but I went the route of handling the serialization and deserialization of the CloudEvent myself. The CloudEvents SDK already has helper methods that can do this, but they rely on reading from and writing to a stream. Luckily, the Cosmos DB Container also has overloads that take a stream for the contents of the item being created.

The following CreateCloudEventAsync method follows a similar structure to the CreateItemAsync method, accepting the same parameters. However, the implementation uses a MemoryStream to hold the serialized CloudEvent before calling the CreateItemStreamAsync method on the Container. The serialization is done by the JsonEventFormatter, which is part of the CloudEvents SDK.

public static class ContainerHelpers
{
    public static async Task<ResponseMessage> CreateCloudEventAsync(
        this Container container, CloudEvent item, 
        ItemRequestOptions requestOptions = null, 
        CancellationToken cancellationToken = default)
    {
        var formatter = new JsonEventFormatter();
        var bytes = formatter.EncodeStructuredEvent(item, out _);
        using (var ms = new MemoryStream(bytes))
        {
            var createResponse = await container.CreateItemStreamAsync(ms, new PartitionKey(item.Type), requestOptions, cancellationToken);
            return createResponse;
        }
    }
}

That completes the process of saving a CloudEvent to CosmosDB.

Reading CloudEvents from CosmosDB

Unfortunately, for the same reason that we weren’t able to use the CreateItemAsync method to save a CloudEvent, we’re also prevented from using the ReadItemAsync method. Instead we have to use the ReadItemStreamAsync method and then again use the JsonEventFormatter to deserialize the CloudEvent from the returned stream.

We’ll add the ReadCloudEventAsync extension method to the ContainerHelpers class shown earlier.

public static async Task<CloudEvent> ReadCloudEventAsync(
    this Container container, 
    string id, string partitionKey, 
    ItemRequestOptions requestOptions = null, 
    CancellationToken cancellationToken = default)
{
    var responseMessage = await container.ReadItemStreamAsync(id, new PartitionKey(partitionKey), requestOptions, cancellationToken);
    if (responseMessage.StatusCode == HttpStatusCode.NotFound) throw new CosmosException("Missing", responseMessage.StatusCode, 0, null, 0);
    var formatter = new JsonEventFormatter();
    var cloudEvent = await formatter.DecodeStructuredEventAsync(responseMessage.Content, null);
    return cloudEvent;
}

At the beginning of the UploadCloudEvent method, we’ll call the ReadCloudEventAsync method to see if the CloudEvent already exists, and only attempt to create it if it doesn’t.

try
{
    var ce = await container.ReadCloudEventAsync(cloudEvent.Id, cloudEvent.Type);

    return Ok($"Item already exists {ce.Id}");
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
    var response = await container.CreateCloudEventAsync(cloudEvent);
    return Ok($"Created {response.StatusCode}");
}

CloudEvent Extensions

Earlier I mentioned that we were using the Type property on the CloudEvent class as the partition key. For a bunch of reasons I don’t think this is a good idea. Luckily, the CloudEvents SDK supports extending the CloudEvent class, not through inheritance (which always causes issues with serialization) but through extensions which can be added to the CloudEvent. In fact, there are a number of extensions already provided by the SDK. One of these is the PartitioningExtension, which makes it possible to add a partitionKey property to the CloudEvent JSON object, for example:

{
"specversion" : "1.0",
"type" : "com.github.pull.create",
"source" : "https://github.com/cloudevents/spec/pull",
"subject" : "123",
"id" : "A234-1234-1234",
"time" : "2018-04-05T17:31:00Z",
"partitionKey" : "defect-123",
"datacontenttype" : "text/xml",
"data" : ""
}

To get this to work in our Web API project, there are a couple of things we need to do:

PartitionKeyPath

We’ll need to change the PartitionKeyPath to be /partitionKey instead of /type. Unfortunately this means you’ll need to recreate the container (or just create a new one).

private string ContainerId = "paritionedcloudevents";
private string PartitionKeyPath = "/partitionKey";

PartitionKey for Reading and Writing

Instead of passing cloudEvent.Type into the ReadCloudEventAsync method, we now need to extract the value from the PartitioningExtension.

var ce = await container.ReadCloudEventAsync(cloudEvent.Id, cloudEvent.Extension<PartitioningExtension>().PartitioningKeyValue);

Also, in the CreateCloudEventAsync method, we need to change how the PartitionKey is created:

var createResponse = await container.CreateItemStreamAsync(ms, new PartitionKey(item.Extension<PartitioningExtension>().PartitioningKeyValue), requestOptions, cancellationToken);
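One thing to watch is that the ReadCloudEventAsync helper shown earlier passes null for the extensions when decoding, so a CloudEvent read back from Cosmos DB would presumably not carry a PartitioningExtension. Below is a sketch of a variant that supplies the extension to the formatter. The PartitionedContainerHelpers class and ReadPartitionedCloudEventAsync method are hypothetical names, and the sketch assumes the same 1.x CloudEvents SDK calls used elsewhere in this post.

```csharp
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using CloudNative.CloudEvents;
using CloudNative.CloudEvents.Extensions;
using Microsoft.Azure.Cosmos;

public static class PartitionedContainerHelpers
{
    // Hypothetical variant of ReadCloudEventAsync that decodes with a PartitioningExtension.
    public static async Task<CloudEvent> ReadPartitionedCloudEventAsync(
        this Container container,
        string id, string partitionKey,
        ItemRequestOptions requestOptions = null,
        CancellationToken cancellationToken = default)
    {
        using (var responseMessage = await container.ReadItemStreamAsync(
            id, new PartitionKey(partitionKey), requestOptions, cancellationToken))
        {
            // Same not-found handling as the earlier helper.
            if (responseMessage.StatusCode == HttpStatusCode.NotFound)
            {
                throw new CosmosException("Missing", responseMessage.StatusCode, 0, null, 0);
            }

            // A fresh extension per read, so each CloudEvent gets its own instance.
            var extensions = new ICloudEventExtension[] { new PartitioningExtension() };
            var formatter = new JsonEventFormatter();
            return await formatter.DecodeStructuredEventAsync(responseMessage.Content, extensions);
        }
    }
}
```

Whether you need this depends on what you do with the event after reading it; if you only check for existence, the original helper is enough.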

Use PartitioningExtension in CloudEventJsonInputFormatter

The one place where it’s not easy to add in the PartitioningExtension is with the CloudEventJsonInputFormatter, which is used to deserialize the CloudEvent that’s being POSTed to our API. Currently, if the posted JSON includes the partitionKey property, it will be ignored and no extensions will be added to the generated CloudEvent object.

Unfortunately the CloudEventJsonInputFormatter that comes as part of the CloudEvents SDK provides no mechanism to include extensions as part of the deserialization process. Luckily, this is also quite easy to fix.

We’ll start by taking a copy of the CloudEventJsonInputFormatter class and adding it to our project. To avoid naming conflicts, we’ll rename our class to CloudEventWithExtensionsJsonInputFormatter.

Then we need to add support for instantiating the necessary extensions as part of the ReadRequestBodyAsync method. Here we’re simply going to use a function callback, as this seemed an easy way to generate the extensions each time the method is called. Do NOT simply create a single instance of the extension, otherwise all CloudEvent objects will end up with the same extension. The changes from the original class are the cloudExtensionsFactory field, the constructor parameter that sets it, and the call to ReadCloudEventAsync, which now passes in the extensions returned by the factory.

public class CloudEventWithExtensionsJsonInputFormatter : TextInputFormatter
{
    // Called once per request so that every CloudEvent gets its own extension instances.
    private readonly Func<ICloudEventExtension[]> cloudExtensionsFactory;
    public CloudEventWithExtensionsJsonInputFormatter(Func<ICloudEventExtension[]> extensionsFactory)
    {
        cloudExtensionsFactory = extensionsFactory;

        SupportedMediaTypes.Add(MediaTypeHeaderValue.Parse("application/json"));
        SupportedMediaTypes.Add(MediaTypeHeaderValue.Parse("application/cloudevents+json"));
        SupportedEncodings.Add(Encoding.UTF8);
        SupportedEncodings.Add(Encoding.Unicode);
    }
    public override async Task<InputFormatterResult> ReadRequestBodyAsync(InputFormatterContext context, Encoding encoding)
    {
        if (context == null)
        {
            throw new ArgumentNullException(nameof(context));
        }
        if (encoding == null)
        {
            throw new ArgumentNullException(nameof(encoding));
        }
        var request = context.HttpContext.Request;
        try
        {
            var cloudEvent = await request.ReadCloudEventAsync(cloudExtensionsFactory?.Invoke());
            return await InputFormatterResult.SuccessAsync(cloudEvent);
        }
        catch (Exception)
        {
            return await InputFormatterResult.FailureAsync();
        }
    }
    protected override bool CanReadType(Type type)
    {
        if (type == typeof(CloudEvent))
        {
            return base.CanReadType(type);
        }
        return false;
    }
}

Now we just need to update our Startup to register the CloudEventWithExtensionsJsonInputFormatter class, with the appropriate callback that returns a PartitioningExtension instance.

opts.InputFormatters.Insert(0,new CloudEventWithExtensionsJsonInputFormatter(()=>new[] { new PartitioningExtension() }));
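To illustrate why the formatter takes a factory rather than a single extension instance, here is a small sketch that uses a stand-in type instead of the real SDK classes. FakeExtension and ExtensionFactoryDemo are hypothetical and exist only for this illustration; the point is just that a shared mutable object ends up holding whatever the most recent request wrote to it.

```csharp
using System;

// Hypothetical stand-in for a mutable extension such as PartitioningExtension.
public sealed class FakeExtension
{
    public string PartitionKey { get; set; }
}

public static class ExtensionFactoryDemo
{
    // Simulates two requests, each asking the factory for an extension
    // and writing its own partition key into it.
    public static (string First, string Second) HandleTwoRequests(Func<FakeExtension> factory)
    {
        var first = factory();
        first.PartitionKey = "defect-123";

        var second = factory();
        second.PartitionKey = "defect-456";

        // Read the keys back after both requests have been handled.
        return (first.PartitionKey, second.PartitionKey);
    }
}
```

Calling HandleTwoRequests(() => new FakeExtension()) keeps the two keys separate, whereas capturing one instance and returning it from the lambda leaves both results holding the second key.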

After all that, we’re good to go. When we POST a CloudEvent with the partitionKey property, we can see that it’s included in the PartitioningExtension.

Hopefully this has given you enough information to work with CloudEvents and CosmosDB.

If there’s anything you get stuck on, or to provide feedback, please don’t hesitate to leave a comment.
