Skip to content

ProSnippets StreamLayers

uma2526 edited this page Nov 10, 2021 · 2 revisions
Language:              C#  
Subject:               StreamLayers  
Contributor:           ArcGIS Pro SDK Team <[email protected]>  
Organization:          esri, http://www.esri.com  
Date:                  1/10/2020  
ArcGIS Pro:            2.4  
Visual Studio:         2017, 2019  
.NET Target Framework: 4.6.1  

Create Stream Layer

Create Stream Layer with URI

//Must be on the QueuedTask
var url = "https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
var createParam = new FeatureLayerCreationParams(new Uri(url))
{
  IsVisible = false //turned off by default
};
var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(createParam, map);

//or use "original" create layer (will be visible by default)
Uri uri = new Uri(url);
streamLayer = LayerFactory.Instance.CreateLayer(uri, map) as StreamLayer;
streamLayer.SetVisibility(false);//turn off

Create a stream layer with a definition query

//Must be on the QueuedTask
var url = "https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
var lyrCreateParam = new FeatureLayerCreationParams(new Uri(url))
{
  IsVisible = true,
  DefinitionFilter = new CIMDefinitionFilter()
  {
    DefinitionExpression = "RWY = '29L'",
    Name = "Runway"
  }
};

var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(lyrCreateParam, map);

Create a stream layer with a simple renderer

var url = @"https://geoeventsample1.esri.com:6443/arcgis/rest/services/LABus/StreamServer";
var uri = new Uri(url, UriKind.Absolute);
//Must be on QueuedTask!
var createParams = new FeatureLayerCreationParams(uri)
{
  RendererDefinition = new SimpleRendererDefinition()
  {
    SymbolTemplate = SymbolFactory.Instance.ConstructPointSymbol(
                        ColorFactory.Instance.BlueRGB,
                        12,
                 SimpleMarkerStyle.Pushpin).MakeSymbolReference()
  }
};
var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(
                    createParams, map);

Setting a unique value renderer for latest observations

var url = @"https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
var uri = new Uri(url, UriKind.Absolute);
//Must be on QueuedTask!
var createParams = new FeatureLayerCreationParams(uri)
{
  IsVisible = false
};
var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(
                    createParams, map);
//Define the unique values by hand
var uvr = new CIMUniqueValueRenderer()
{
  Fields = new string[] { "ACTYPE" },
  UseDefaultSymbol = true,
  DefaultLabel = "Others",
  DefaultSymbol = SymbolFactory.Instance.ConstructPointSymbol(
              CIMColor.CreateRGBColor(185, 185, 185), 8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
};

var classes = new List<CIMUniqueValueClass>();
//add in classes - one for ACTYPE of 727, one for DC 9
classes.Add(
  new CIMUniqueValueClass() {
        Values = new CIMUniqueValue[] {
              new CIMUniqueValue() { FieldValues = new string[] { "B727" } } },
        Visible = true,
        Label = "Boeing 727",
        Symbol = SymbolFactory.Instance.ConstructPointSymbol(
              ColorFactory.Instance.RedRGB, 10, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
});
classes.Add(
  new CIMUniqueValueClass()
  {
    Values = new CIMUniqueValue[] {
              new CIMUniqueValue() { FieldValues = new string[] { "DC9" } } },
    Visible = true,
    Label = "DC 9",
    Symbol = SymbolFactory.Instance.ConstructPointSymbol(
              ColorFactory.Instance.GreenRGB, 10, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
  });
//add the classes to a group
var groups = new List<CIMUniqueValueGroup>()
{
  new CIMUniqueValueGroup() {
     Classes = classes.ToArray()
  }
};
//add the groups to the renderer
uvr.Groups = groups.ToArray();
//Apply the renderer (for current observations)
streamLayer.SetRenderer(uvr);
streamLayer.SetVisibility(true);//turn on the layer

Stream Layer settings and properties

Find all Stream Layers that are Track Aware

var trackAwareLayers = MapView.Active.Map.GetLayersAsFlattenedList()
                           .OfType<StreamLayer>().Where(sl => sl.IsTrackAware)?.ToList();

Determine the Stream Layer type

//spatial or non-spatial?
if (streamLayer.TrackType == TrackType.AttributeOnly)
{
  //this is a non-spatial stream layer
}
else
{
  //this must be a spatial stream layer
}

Check the Stream Layer connection state

if (!streamLayer.IsStreamingConnectionOpen)
  //Must be on QueuedTask!
  streamLayer.StartStreaming();

Start and stop streaming

//Must be on QueuedTask!
//Start...
streamLayer.StartStreaming();
//Stop...
streamLayer.StopStreaming();

Delete all current and previous observations

//Must be on QueuedTask!
//Must be called on the feature class
using(var rfc = streamLayer.GetFeatureClass())
  rfc.Truncate();

Get the Track Id Field

if (streamLayer.IsTrackAware)
{
  var trackField = streamLayer.TrackIdFieldName;
  //TODO use the field name
}

Get The Track Type

var trackType = streamLayer.TrackType;
switch(trackType)
{
  //TODO deal with tracktype
  case TrackType.None:
  case TrackType.AttributeOnly:
  case TrackType.Spatial:
    break;
}

Set the Maximum Count of Previous Observations to be Stored in Memory

//Must be on QueuedTask
//Set Expiration Method and Max Expiration Count
if (streamLayer.GetExpirationMethod() != FeatureExpirationMethod.MaximumFeatureCount)
  streamLayer.SetExpirationMethod(FeatureExpirationMethod.MaximumFeatureCount);
streamLayer.SetExpirationMaxCount(15);
//FYI
if (streamLayer.IsTrackAware)
{
  //MaxCount is per track! otherwise for the entire layer
}

Set the Maximum Age of Previous Observations to be Stored in Memory

//Must be on QueuedTask
//Set Expiration Method and Max Expiration Age
if (streamLayer.GetExpirationMethod() != FeatureExpirationMethod.MaximumFeatureAge)
  streamLayer.SetExpirationMethod(FeatureExpirationMethod.MaximumFeatureAge);
//set to 12 hours (max is 24 hours)
streamLayer.SetExpirationMaxAge(new TimeSpan(12,0,0));

//FYI
if (streamLayer.IsTrackAware)
{
  //MaxAge is per track! otherwise for the entire layer
}

Set Various Stream Layer properties via the CIM

//The layer must be track aware and spatial
if (streamLayer.TrackType != TrackType.Spatial)
  return;
//Must be on QueuedTask
//get the CIM Definition
var def = streamLayer.GetDefinition() as CIMFeatureLayer;
//set the number of previous observations, 
def.PreviousObservationsCount = (int)streamLayer.GetExpirationMaxCount() - 1;
//set show previous observations and track lines to true
def.ShowPreviousObservations = true;
def.ShowTracks = true;
//commit the changes
streamLayer.SetDefinition(def);

Rendering

Defining a unique value renderer definition

var uvrDef = new UniqueValueRendererDefinition()
{
  ValueFields = new string[] { "ACTYPE" },
  SymbolTemplate = SymbolFactory.Instance.ConstructPointSymbol(
    ColorFactory.Instance.RedRGB, 10, SimpleMarkerStyle.Hexagon)
      .MakeSymbolReference(),
  ValuesLimit = 5
};
//Note: CreateRenderer can only create value classes based on
//the current events it has received
streamLayer.SetRenderer(streamLayer.CreateRenderer(uvrDef));

Setting a unique value renderer for latest observations

//Define the classes by hand to avoid using CreateRenderer(...)
CIMUniqueValueClass uvcB727 = new CIMUniqueValueClass()
{
  Values = new CIMUniqueValue[] { new CIMUniqueValue() { FieldValues = new string[] { "B727" } } },
  Visible = true,
  Label = "Boeing 727",
  Symbol = SymbolFactory.Instance.ConstructPointSymbol(CIMColor.CreateRGBColor(255, 0, 0), 8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
};

CIMUniqueValueClass uvcD9 = new CIMUniqueValueClass()
{
  Values = new CIMUniqueValue[] { new CIMUniqueValue() { FieldValues = new string[] { "DC9" } } },
  Visible = true,
  Label = "DC 9",
  Symbol = SymbolFactory.Instance.ConstructPointSymbol(CIMColor.CreateRGBColor(0, 255, 0), 8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
};
//Assign the classes to a group
CIMUniqueValueGroup uvGrp = new CIMUniqueValueGroup()
{
  Classes = new CIMUniqueValueClass[] { uvcB727, uvcD9 }
};
//assign the group to the renderer
var UVrndr = new CIMUniqueValueRenderer()
{
  Fields = new string[] { "ACTYPE" },
  Groups = new CIMUniqueValueGroup[] { uvGrp },
  UseDefaultSymbol = true,
  DefaultLabel = "Others",
  DefaultSymbol = SymbolFactory.Instance.ConstructPointSymbol(
    CIMColor.CreateRGBColor(185, 185, 185), 8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
};
//set the renderer. Depending on the current events recieved, the
//layer may or may not have events for each of the specified
//unique value classes
streamLayer.SetRenderer(UVrndr);

Setting a unique value renderer for previous observations

//The layer must be track aware and spatial
if (streamLayer.TrackType != TrackType.Spatial)
  return;
//Must be on QueuedTask!
//Define unique value classes same as we do for current observations
//or use "CreateRenderer(...)" to assign them automatically
CIMUniqueValueClass uvcB727Prev = new CIMUniqueValueClass()
{
  Values = new CIMUniqueValue[] { new CIMUniqueValue() {
    FieldValues = new string[] { "B727" } } },
  Visible = true,
  Label = "Boeing 727",
  Symbol = SymbolFactory.Instance.ConstructPointSymbol(
    CIMColor.CreateRGBColor(255, 0, 0), 4, SimpleMarkerStyle.Hexagon)
    .MakeSymbolReference()
};

CIMUniqueValueClass uvcD9Prev = new CIMUniqueValueClass()
{
  Values = new CIMUniqueValue[] { new CIMUniqueValue() {
    FieldValues = new string[] { "DC9" } } },
  Visible = true,
  Label = "DC 9",
  Symbol = SymbolFactory.Instance.ConstructPointSymbol(
    CIMColor.CreateRGBColor(0, 255, 0), 4, SimpleMarkerStyle.Hexagon)
    .MakeSymbolReference()
};

CIMUniqueValueGroup uvGrpPrev = new CIMUniqueValueGroup()
{
  Classes = new CIMUniqueValueClass[] { uvcB727Prev, uvcD9Prev }
};

var UVrndrPrev = new CIMUniqueValueRenderer()
{
  Fields = new string[] { "ACTYPE" },
  Groups = new CIMUniqueValueGroup[] { uvGrpPrev },
  UseDefaultSymbol = true,
  DefaultLabel = "Others",
  DefaultSymbol = SymbolFactory.Instance.ConstructPointSymbol(
    CIMColor.CreateRGBColor(185, 185, 185), 4, SimpleMarkerStyle.Hexagon)
    .MakeSymbolReference()
};

Setting a simple renderer to draw track lines

//The layer must be track aware and spatial
if (streamLayer.TrackType != TrackType.Spatial)
  return;
//Must be on QueuedTask!
//Note: only a simple renderer with solid line symbol is supported for track 
//line renderer
var trackRenderer = new SimpleRendererDefinition()
{
  SymbolTemplate = SymbolFactory.Instance.ConstructLineSymbol(
      ColorFactory.Instance.BlueRGB, 2, SimpleLineStyle.Solid)
        .MakeSymbolReference()
};
streamLayer.SetRenderer(
     streamLayer.CreateRenderer(trackRenderer), 
       FeatureRendererTarget.TrackLines);

Check Previous Observation and Track Line Visibility

//The layer must be track aware and spatial for these settings
//to have an effect
if (streamLayer.TrackType != TrackType.Spatial)
  return;
//Must be on QueuedTask
if (!streamLayer.AreTrackLinesVisible)
  streamLayer.SetTrackLinesVisibility(true);
if (!streamLayer.ArePreviousObservationsVisible)
  streamLayer.SetPreviousObservationsVisibility(true);

Make Track Lines and Previous Observations Visible

//The layer must be track aware and spatial for these settings
//to have an effect
if (streamLayer.TrackType != TrackType.Spatial)
  return;
//Must be on QueuedTask
//Note: Setting PreviousObservationsCount larger than the 
//"SetExpirationMaxCount()" has no effect
streamLayer.SetPreviousObservationsCount(6);
if (!streamLayer.AreTrackLinesVisible)
  streamLayer.SetTrackLinesVisibility(true);
if (!streamLayer.ArePreviousObservationsVisible)
  streamLayer.SetPreviousObservationsVisibility(true);

Retrieve the current observation renderer

//Must be on QueuedTask!
var renderer = streamLayer.GetRenderer();

Retrieve the previous observation renderer

//The layer must be track aware and spatial
if (streamLayer.TrackType != TrackType.Spatial)
  return;
//Must be on QueuedTask!
var prev_renderer = streamLayer.GetRenderer(
    FeatureRendererTarget.PreviousObservations);

Retrieve the track lines renderer

//The layer must be track aware and spatial
if (streamLayer.TrackType != TrackType.Spatial)
  return;
//Must be on QueuedTask!
var track_renderer = streamLayer.GetRenderer(
    FeatureRendererTarget.TrackLines);

Subscribe and SearchAndSubscribe

Search And Subscribe for Streaming Data

await QueuedTask.Run(async () =>
{
  //query filter can be null to search and retrieve all rows
  //true means recycling cursor
  using (var rc = streamLayer.SearchAndSubscribe(qfilter, true))
  {
    //waiting for new features to be streamed
    //default is no cancellation
    while (await rc.WaitForRowsAsync())
    {
      while (rc.MoveNext())
      {
        //determine the origin of the row event
        switch (rc.Current.GetRowSource())
        {
          case RealtimeRowSource.PreExisting:
            //pre-existing row at the time of subscribe
            continue;
          case RealtimeRowSource.EventInsert:
            //row was inserted after subscribe
            continue;
          case RealtimeRowSource.EventDelete:
            //row was deleted after subscribe
            continue;
        }
      }
    }
  }//row cursor is disposed. row cursor is unsubscribed

  //....or....
  //Use the feature class instead of the layer
  using(var rfc = streamLayer.GetFeatureClass())
  {
    //non-recycling cursor - 2nd param "false"
    using(var rc = rfc.SearchAndSubscribe(qfilter, false))
    {
      //waiting for new features to be streamed
      //default is no cancellation
      while (await rc.WaitForRowsAsync())
      {
        //etc
      }
    }
  }
});

Search And Subscribe With Cancellation

await QueuedTask.Run(async () =>
{
  //Recycling cursor - 2nd param "true"
  //or streamLayer.Subscribe(qfilter, true) to just subscribe
  using (var rc = streamLayer.SearchAndSubscribe(qfilter, true))
  {
    //auto-cancel after 20 seconds
    var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20));
    //catch TaskCanceledException
    try
    {
      while (await rc.WaitForRowsAsync(cancel.Token))
      {
        //check for row events
        while (rc.MoveNext())
        {
          //etc
        }
      }
    }
    catch (TaskCanceledException tce)
    {
      //Handle cancellation as needed
    }
    cancel.Dispose();
  }
});

Explicitly Cancel WaitForRowsAsync

//somewhere in our code we create a CancellationTokenSource
var cancel = new CancellationTokenSource();
//...

//call cancel on the CancellationTokenSource anywhere in
//the add-in, assuming the CancellationTokenSource is in scope
if (SomeConditionForCancel)
  cancel.Cancel();//<-- will cancel the token

//Within QueuedTask we are subscribed! streamLayer.Subscribe() or SearchAndSubscribe()
try
{
  //TaskCanceledException will be thrown when the token is cancelled
  while (await rc.WaitForRowsAsync(cancel.Token))
  {
    //check for row events
    while (rc.MoveNext())
    {
      //etc
    }
  }
}
catch (TaskCanceledException tce)
{
  //Handle cancellation as needed
}
cancel.Dispose();

Realtime FeatureClass

Connect to a real-time feature class from a real-time datastore

var url = "https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
     
await QueuedTask.Run(() =>
{
  var realtimeServiceConProp = new RealtimeServiceConnectionProperties(
                                   new Uri(url),
                                   RealtimeDatastoreType.StreamService
                                );
  using (var realtimeDatastore = new RealtimeDatastore(realtimeServiceConProp))
  {
    //A Realtime data store only contains **one** Realtime feature class (or table)
    var name = realtimeDatastore.GetTableNames().First();
    using (var realtimeFeatureClass = realtimeDatastore.OpenTable(name) as RealtimeFeatureClass)
    {
      //feature class, by default, is not streaming (opposite of the stream layer)
      realtimeFeatureClass.StartStreaming();
      //TODO use the feature class
      //...
    }
  }

});

Check the Realtime Feature Class is Track Aware

using (var rfc = streamLayer.GetFeatureClass())
using (var rfc_def = rfc.GetDefinition())
{
  if (rfc_def.HasTrackIDField())
  {
    //Track aware
  }
}

Get the Track Id Field from the Realtime Feature class

//Must be on QueuedTask
using (var rfc = streamLayer.GetFeatureClass())
using (var rfc_def = rfc.GetDefinition())
{
  if (rfc_def.HasTrackIDField())
  {
    var fld_name = rfc_def.GetTrackIDField();

  }
}

Subscribe to Streaming Data

//Note: with feature class we can also use a System Task to subscribe and
//process rows
await QueuedTask.Run(async () =>
{
  // or var rfc = realtimeDatastore.OpenTable(name) as RealtimeFeatureClass
  using (var rfc = streamLayer.GetFeatureClass())
  {
    //non-recycling cursor - 2nd param "false"
    //subscribe, pre-existing rows are not searched
    using (var rc = rfc.Subscribe(qfilter, false))
    {
      SpatialQueryFilter spatialFilter = new SpatialQueryFilter();
      //waiting for new features to be streamed
      //default is no cancellation
      while (await rc.WaitForRowsAsync())
      {
        while(rc.MoveNext())
        {
          switch (rc.Current.GetRowSource())
          {
            case RealtimeRowSource.EventInsert:
              //getting geometry from new events as they arrive
              Polygon poly = ((RealtimeFeature)rc.Current).GetShape() as Polygon;

              //using the geometry to select features from another feature layer
              spatialFilter.FilterGeometry = poly;//project poly if needed...
              countyFeatureLayer.Select(spatialFilter);
              continue;
            default:
              continue;
          }
        }
      }
    }//row cursor is disposed. row cursor is unsubscribed
  }
});

Search Existing Data and Subscribe for Streaming Data

//Note we can use System Task with the Realtime feature class
//for subscribe
await System.Threading.Tasks.Task.Run(async () =>
// or use ... QueuedTask.Run()
{
  using (var rfc = streamLayer.GetFeatureClass())
  {
    //non-recycling cursor - 2nd param "false"
    using (var rc = rfc.SearchAndSubscribe(qfilter, false))
    {
      //waiting for new features to be streamed
      //default is no cancellation
      while (await rc.WaitForRowsAsync())
      {
        //pre-existing rows will be retrieved that were searched
        while (rc.MoveNext())
        {
          var row = rc.Current;
          var row_source = row.GetRowSource();
          switch (row_source)
          {
            case RealtimeRowSource.EventDelete:
              //TODO - handle deletes
              break;
            case RealtimeRowSource.EventInsert:
              //TODO handle inserts
              break;
            case RealtimeRowSource.PreExisting:
              //TODO handle pre-existing rows
              break;
          }
        }
      }
    }//row cursor is disposed. row cursor is unsubscribed
  }
});

Search And Subscribe With Cancellation

await System.Threading.Tasks.Task.Run(async () =>
// or use ... QueuedTask.Run()
{
  using (var rfc = streamLayer.GetFeatureClass())
  {
    //Recycling cursor - 2nd param "true"
    using (var rc = rfc.SearchAndSubscribe(qfilter, true))
    {
      //auto-cancel after 20 seconds
      var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20));
      //catch TaskCanceledException
      try
      {
        while (await rc.WaitForRowsAsync(cancel.Token))
        {
          //check for row events
          while (rc.MoveNext())
          {
            //etc
          }
        }
      }
      catch(TaskCanceledException tce)
      {
        //Handle cancellation as needed
      }
      cancel.Dispose();
    }
  }
});

Developing with ArcGIS Pro

    Migration


Framework

    Add-ins

    Configurations

    Customization

    Styling


Arcade


Content


CoreHost


DataReviewer


Editing


Geodatabase

    3D Analyst Data

    Plugin Datasources

    Topology

    Linear Referencing

    Object Model Diagram


Geometry

    Relational Operations


Geoprocessing


Knowledge Graph


Layout

    Reports

    Presentations


Map Authoring

    3D Analyst

    CIM

    Graphics

    Scene

    Stream

    Voxel


Map Exploration

    Map Tools


Networks

    Network Diagrams


Parcel Fabric


Raster


Sharing


Tasks


Workflow Manager


Reference

Clone this wiki locally