Thinking about the data I was using, I tried to think of a typical way in which you’d want to query the information relating to the outbreak of an infectious disease. Here’s what I came up with – let’s suppose that the disease is airborne and spread by droplets. In that case, you can define the geographic extent of the area that has been affected by creating the convex hull around all the locations that have had reported incidents (if you’re in a location and places to the west, east, south, and north of you have all had confirmed cases, I’d say you lie in an infected area even if you haven’t directly had any cases at your location). What’s more, let’s say that, after an outbreak has been confirmed at a location, it takes 30 days before that area can be declared “all clear” and free of the infection.
Bear in mind that I’m not an epidemiologist and I have no idea if these assumptions are anything like accurate, but they’ll work for me in this example. So, in order to define (and plot on a map) the areas of the world affected by the disease at any point in time, I’d want to determine all those points that lie within the convex hull formed from the locations of any incidents that had occurred in the preceding 30 days. This seemed like a reasonable case for a spatio-temporal query, so let’s just pass over the medical accuracy or not of the assumptions…
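To make that rule concrete outside StreamInsight, here’s a minimal C# sketch of it under a few loudly stated assumptions: coordinates are treated as planar (X = longitude, Y = latitude) rather than on the ellipsoid as SqlGeography does, an incident counts as “live” on a given day if it was reported on that day or in the 29 days before it, and the hull is computed with Andrew’s monotone chain algorithm rather than SQL Server’s STConvexHull(). All the names (AffectedAreaSketch, LiveIncidents, ConvexHull, IsInsideOrOnHull) are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch, independent of StreamInsight and SqlServer.Types.
// Coordinates are treated as planar (X = longitude, Y = latitude); SqlGeography
// computes the hull on the ellipsoid instead, so results differ for large areas.
public static class AffectedAreaSketch
{
    // Assumption from the text: an area stays "infected" for 30 days after a case.
    public static readonly TimeSpan ContagiousPeriod = TimeSpan.FromDays(30);

    // Incidents still live on the given day: reported on that day or
    // within the 29 days before it.
    public static List<(double X, double Y)> LiveIncidents(
        IEnumerable<(DateTime Date, double X, double Y)> incidents, DateTime day)
    {
        return incidents
            .Where(i => i.Date <= day && day - i.Date < ContagiousPeriod)
            .Select(i => (i.X, i.Y))
            .ToList();
    }

    // Andrew's monotone chain: hull vertices in counter-clockwise order,
    // first vertex not repeated, collinear boundary points dropped.
    public static List<(double X, double Y)> ConvexHull(IEnumerable<(double X, double Y)> points)
    {
        var pts = points.Distinct().OrderBy(p => p.X).ThenBy(p => p.Y).ToList();
        if (pts.Count < 3) return pts;

        var hull = new List<(double X, double Y)>();
        // Pass 0 builds the lower chain, pass 1 (over the reversed points) the upper chain
        for (int pass = 0; pass < 2; pass++)
        {
            int chainStart = hull.Count;
            foreach (var p in pts)
            {
                // Pop while the last two points and p do not make a left turn
                while (hull.Count >= chainStart + 2 &&
                       Cross(hull[hull.Count - 2], hull[hull.Count - 1], p) <= 0)
                {
                    hull.RemoveAt(hull.Count - 1);
                }
                hull.Add(p);
            }
            // The last point of each chain is the first point of the other one
            hull.RemoveAt(hull.Count - 1);
            pts.Reverse();
        }
        return hull;
    }

    // True if p lies inside or on the boundary of a counter-clockwise convex polygon.
    public static bool IsInsideOrOnHull(IReadOnlyList<(double X, double Y)> hull, (double X, double Y) p)
    {
        if (hull.Count < 3) return false; // sketch simplification: no area, nothing inside
        for (int i = 0; i < hull.Count; i++)
        {
            var a = hull[i];
            var b = hull[(i + 1) % hull.Count];
            if (Cross(a, b, p) < 0) return false; // p is to the right of edge a -> b
        }
        return true;
    }

    // z-component of (b - a) x (c - a); positive when a -> b -> c turns left.
    private static double Cross((double X, double Y) a, (double X, double Y) b, (double X, double Y) c)
        => (b.X - a.X) * (c.Y - a.Y) - (b.Y - a.Y) * (c.X - a.X);
}
```

For example, with incidents at the four corners of a 2-by-2 square plus one at its centre, the hull is just the four corners, a location at the centre counts as affected, and a location at (3, 1) does not.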
Defining the Hopping Window

The basic query I’d set up last time just selected all the events from the input stream, as follows:

```csharp
var query = from e in inputStream select e;
```

This time, rather than just select each individual event, I wanted to create a hopping window – a window that selects only those events that occur within a particular, sliding timeframe. Then I’d want to perform an aggregate of the payload fields of the events contained within that window.

I’d already decided that the extent of my window would cover a period of 30 days (to capture all those occurrences that were still contagious at any given point in time), and I wanted to examine the cases that fell within that window on a day-by-day basis – i.e. my window would “hop” forward by one day each time. There was no point defining a hop size of less than a day, since my input data was only recorded at date-level granularity.

Both the window size and the hop size are defined as TimeSpan parameters to a HoppingWindow acting on an input stream. So, I changed the first part of my query to look like this:

```csharp
var query = from e in inputStream.HoppingWindow(
                TimeSpan.FromDays(30), // Window size
                TimeSpan.FromDays(1))  // Hop size
            ... // Do something with the events that fall in this window
```

Next, I needed to create the aggregate function to act upon those events in the window.

Creating a Custom StreamInsight Aggregate Function

To create the convex hull of the set of points contained within a particular window, I needed to create an aggregate function that derives from the CepAggregate base class. There’s an MSDN article at http://msdn.microsoft.com/en-us/library/ee842720.aspx that gives an example, although it fails to mention where you’d actually find that class (it’s in Microsoft.ComplexEventProcessing.Extensibility, incidentally). In my case, I wanted a function that would act upon an input set of strings (remember that my payload contained the location of each outbreak as a WKT Point), and return a string (the WKT of a Polygon created from the convex hull of those points). Here’s the function I created:

```csharp
using System.Collections.Generic;
using Microsoft.ComplexEventProcessing.Extensibility;
using Microsoft.SqlServer.Types;

// Input: the WKT Point of each event in the window; output: WKT of their convex hull
public class ConvexHull : CepAggregate<string, string>
{
    public override string GenerateOutput(IEnumerable<string> eventData)
    {
        // First, create a MultiPoint of all Points in the current window
        var gb = new SqlGeographyBuilder();
        gb.SetSrid(4326);
        gb.BeginGeography(OpenGisGeographyType.MultiPoint);
        foreach (var d in eventData)
        {
            // Create a geography instance from the WKT of each event
            SqlGeography point = SqlGeography.Parse(d);
            gb.BeginGeography(OpenGisGeographyType.Point);
            // Geography figures take (latitude, longitude); SqlGeography has Lat/Long, not STX/STY
            gb.BeginFigure(point.Lat.Value, point.Long.Value);
            gb.EndFigure();
            gb.EndGeography();
        }
        gb.EndGeography();

        // Now, create the Convex Hull of that MultiPoint
        SqlGeography convexhull = gb.ConstructedGeography.STConvexHull();

        // Return the WKT of the Convex Hull
        return convexhull.ToString();
    }
}
```

Notice that, as with the location data in the payload of each event, I’m using the WKT text format for all inputs and outputs of my spatial functions. This incurs a little extra cost in parsing the input into each function, but it makes the workflow that much easier to debug. I’m also using the STConvexHull() method of the SqlGeography datatype, which is newly introduced in the Microsoft.SqlServer.Types.dll library that ships with SQL Server Denali.

Before you can actually call this function from a StreamInsight query, you need to register a LINQ extension method that wraps the aggregate in such a way that it can be called from LINQ. Here’s my LINQ extension wrapper:

```csharp
using System;
using System.Linq.Expressions;
using Microsoft.ComplexEventProcessing.Extensibility;
using Microsoft.ComplexEventProcessing.Linq;

public static class UDAExtensionMethods
{
    // Maps CHull() in a LINQ query onto the ConvexHull UDA; the body is never executed
    [CepUserDefinedAggregate(typeof(ConvexHull))]
    public static string CHull<TPayload>(this CepWindow<TPayload> window,
                                         Expression<Func<TPayload, string>> map)
    {
        throw CepUtility.DoNotCall();
    }
}
```

To calculate the convex hull around the points in each window, I call the CHull aggregate method (remember to use the name of the LINQ wrapper, not the UDA itself), passing it the WKT field from each event’s payload. My modified query now looked like this:

```csharp
var query = from x in inputStream.HoppingWindow(
                TimeSpan.FromDays(30), // Window size
                TimeSpan.FromDays(1))  // Hop size
            select x.CHull(p => p.WKT);
```

Now, re-running my project gave the output on the console window as shown below – for each day covered by the extent of the data (still shown in mm/dd/yyyy format because of the OutputTracer output adaptor I’m using – grrrr), the results show the Polygon formed from the convex hull of any points that occurred in the preceding 30 days.
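To see what HoppingWindow(TimeSpan.FromDays(30), TimeSpan.FromDays(1)) hands to the CHull aggregate, here’s a plain-LINQ sketch that enumerates the windows over dated point events. It is an illustration of the windowing idea, not StreamInsight’s implementation, and it assumes half-open windows [start, start + size), window starts aligned to multiples of the hop (so midnight for a one-day hop), and that windows containing no events are simply skipped. HoppingWindowSketch and Windows are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical plain-LINQ illustration of hopping-window membership for
// point events; not StreamInsight's implementation.
public static class HoppingWindowSketch
{
    // Returns every non-empty window [Start, End) of length `size`, with window
    // starts aligned to multiples of `hop` (midnight, for a one-day hop).
    public static List<(DateTime Start, DateTime End, List<T> Items)> Windows<T>(
        IEnumerable<T> events, Func<T, DateTime> timeOf, TimeSpan size, TimeSpan hop)
    {
        var all = events.ToList();
        var windows = new List<(DateTime Start, DateTime End, List<T> Items)>();
        if (all.Count == 0) return windows;

        DateTime first = all.Min(timeOf);
        DateTime last = all.Max(timeOf);

        // Earliest aligned start whose window still contains `first`:
        // the smallest multiple of `hop` strictly greater than first - size.
        long earliestTicks = (first - size).Ticks / hop.Ticks * hop.Ticks + hop.Ticks;

        for (var start = new DateTime(earliestTicks); start <= last; start += hop)
        {
            DateTime end = start + size;
            // Materialise immediately so the lambda sees this iteration's window
            var items = all.Where(e => timeOf(e) >= start && timeOf(e) < end).ToList();
            if (items.Count > 0)
                windows.Add((start, end, items));
        }
        return windows;
    }
}
```

Each event lands in 30 consecutive windows, which is how a case stays “live” for 30 days. With two incidents on 1 January and 15 January 2011, this sketch yields 44 non-empty windows (starting 3 December 2010 through 15 January 2011), and the 16 windows starting between 17 December and 1 January contain both incidents, so their hull would be built from both points.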