Getting started with the Graph framework

The Quine.Schemas.Graph and Quine.Graph assemblies consist mostly of abstract types that you must implement. This section uses the small graph below to illustrate how to implement these abstractions. In this graph, the source node generates Count integers, starting from 0 and incrementing by Increment (e.g., 0, 4, 8, 12 for Count = 4 and Increment = 4). The transform node subtracts the provided Constant from each value, and the drain node prints the results.

The complete sample is in Quine.Samples\GraphSample.cs.

Figure: Getting Started Sample Graph
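As a framework-free reference for what the sample should print, the computation the graph performs can be sketched with plain LINQ. ReferenceGraph, Source and Compute are hypothetical names used only for this illustration; they are not part of Quine and involve no nodes, ports or scheduling.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical, framework-free model of the sample graph (not part of Quine).
public static class ReferenceGraph
{
    // Source node: Count integers starting at 0, stepping by Increment.
    public static IEnumerable<int> Source(int count, int increment) =>
        Enumerable.Range(0, count).Select(i => i * increment);

    // Transform node: subtract Constant from each value; these are the values the drain prints.
    public static IEnumerable<int> Compute(int count, int increment, int constant) =>
        Source(count, increment).Select(x => x - constant);
}
```

For Count = 5, Increment = 3 and Constant = 4, Compute yields -4, -1, 2, 5, 8, the same values the drain node prints in the sample run.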

  Tip

Rely on the types provided by the library: if your code compiles, it is at least 80% correct. Run-time errors have two main causes: (1) invalid graph connectivity, and (2) errors while instantiating the nodes' implementing types.

This is a long sample with a lot of boilerplate, because we start from scratch with no predefined nodes or behaviors. With an existing library of behaviors, the code would be much shorter. Note that only a fraction of the available functionality is shown. Consult the reference documentation to learn about other features, such as interactivity, events, traces, and progress reporting.

Defining nodes and behaviors

  1. Define a concrete message class (complex workflows might need more than one) and concrete node state classes. A node's state parametrizes the node.

    C#
    [DataContract]
    class IntMessage : GraphMessage {
        // Register IntMessage as a known type for serialization.
        static IntMessage() {
            KnownTypes.Add(typeof(IntMessage));
        }
    
        [DataMember]
        public int Data;
    }
    
    [DataContract]
    class CustomSourceState : SourceNodeState<IntMessage>
    {
        public CustomSourceState(Type implementingClass) : base(implementingClass) { }
    
        [DataMember]
        public int Increment;
    
        [DataMember]
        public int Count;
    }
    
    [DataContract]
    class CustomTransformState : TransformNodeState<IntMessage, IntMessage>
    {
        public CustomTransformState(Type implementingClass) : base(implementingClass) { }
    
        [DataMember]
        public int Constant;
    }
    
    // Drain state MUST accept GraphMessage
    class CustomDrainState : TransformNodeState<GraphMessage>
    {
        public CustomDrainState(Type implementingClass) : base(implementingClass) { }
        // Empty, but the base is abstract.
    }

    The drain node must be able to accept any message that occurs in the graph, so its state must be defined as accepting the base class, GraphMessage.

    The classes also illustrate how to integrate with the serialization framework (attributes and a static constructor). The node state classes do not need a static constructor because their base classes are generic and can register the types automatically. The serialization parts can be omitted when not needed.

  2. Define the classes that implement node behavior. The graph framework is designed with asynchronous operation in mind, but this sample has no asynchronous work to do, so ProcessAsync simply returns Task.CompletedTask. Note that each node can access its state data (defined in the previous step) through the State member.

    C#
    class CustomSourceNode : SourceNode<CustomSourceState, IntMessage>
    {
        public CustomSourceNode(ILifetimeScope lifetimeScope, GraphShell owner, CustomSourceState state)
            : base(lifetimeScope, owner, state) { }
    
        protected override int ConcurrencyLimit => int.MaxValue;
    
        protected override async IAsyncEnumerable<IntMessage> GenerateAsync() {
            for (var i = 0; i < State.Count; ++i)
                yield return new() { Data = i * State.Increment };
        }
    }
    
    class CustomTransformNode : TransformNode<CustomTransformState, IntMessage, IntMessage>
    {
        public CustomTransformNode(ILifetimeScope lifetimeScope, GraphShell owner, CustomTransformState state)
            : base(lifetimeScope, owner, state) { }
    
        protected override int ConcurrencyLimit => int.MaxValue;
    
        // DO NOT modify the message in place.  In real-world workflows, the same message can be enqueued at many input ports.
        protected override Task ProcessAsync(IntMessage message) {
            Console.WriteLine($"{GetType().Name}{PathId} RECEIVED: {message.Data}");
            Output0.Enqueue(new() { Data = message.Data - State.Constant });
            return Task.CompletedTask;
        }
    }
    
    class CustomDrainNode : DrainNode<CustomDrainState>
    {
        public CustomDrainNode(ILifetimeScope lifetimeScope, GraphShell owner, CustomDrainState state)
            : base(lifetimeScope, owner, state) { }
    
        protected override int ConcurrencyLimit => int.MaxValue;
    
        // Base implementation is a no-op.  Here we know that only a single message type exists in the graph.
        protected override Task ProcessAsync(GraphMessage message) {
            var typed = (IntMessage)message;
            Console.WriteLine($"{GetType().Name}{PathId} RECEIVED: {typed.Data}");
            return Task.CompletedTask;
        }
    }

    ConcurrencyLimit is a framework feature that limits how many nodes of the same type (as in GetType()) can be simultaneously active in a lifetime scope. Depending on the scope from which the NodeConcurrencyLimiter instance is resolved, the limit may apply to a single graph, to the whole process, or to something in between.
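The idea behind a per-type concurrency limit can be illustrated with one SemaphoreSlim per concrete node type. This is a minimal sketch under stated assumptions, not the implementation of NodeConcurrencyLimiter; TypeConcurrencyLimiter and RunAsync are hypothetical names. Keeping a single limiter instance for the whole process corresponds to a process-wide limit, while creating one per graph run would limit only that run.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical per-type limiter illustrating the concept; not Quine's NodeConcurrencyLimiter.
public sealed class TypeConcurrencyLimiter
{
    // One semaphore per concrete node type (as in GetType()), created on first use.
    private readonly ConcurrentDictionary<Type, SemaphoreSlim> gates = new();

    // Waits until fewer than `limit` nodes of the same type are active, then runs `work`.
    public async Task RunAsync(object node, int limit, Func<Task> work)
    {
        var gate = gates.GetOrAdd(node.GetType(), _ => new SemaphoreSlim(limit, limit));
        await gate.WaitAsync();
        try
        {
            await work();
        }
        finally
        {
            // Always free the slot, even if the work throws.
            gate.Release();
        }
    }
}
```

Design note: the semaphore is created with the limit supplied by the first node of a given type, so all nodes of that type are expected to report the same limit, which holds when the limit is a per-class override such as ConcurrencyLimit.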

Building and running a graph

  1. Prepare an Autofac container (or scope) and register the node behavior types. Make sure to also register the framework types, as shown below.

    C#
    private GraphSample() {
        var cb = new ContainerBuilder();
    
        // Node behaviors.
        cb.RegisterType<CustomSourceNode>().AsSelf();
        cb.RegisterType<CustomTransformNode>().AsSelf();
        cb.RegisterType<CustomDrainNode>().AsSelf();
    
        // Always needed by the framework.  A scope MUST be created for each graph run.
        cb.RegisterType<GraphShell>().InstancePerLifetimeScope();
        cb.RegisterType<NodeConcurrencyLimiter>().AsSelf().SingleInstance();
    
        container = cb.Build();
    }

  2. Create the node states and connections. Note how the node states are customized with input from the command line (the count, increment, and constant fields).

    C#
    private GraphState BuildGraph() {
        var s = new CustomSourceState(typeof(CustomSourceNode)) {
            Increment = increment,
            Count = count
        };
        var t = new CustomTransformState(typeof(CustomTransformNode)) {
            Constant = constant
        };
        var d = new CustomDrainState(typeof(CustomDrainNode));
    
        s.Output0.Connect(t.Input0);
        t.Output0.Connect(d.Input0);
    
        var g = new GraphState();
        g.Nodes.AddRange([s, t, d]);
        return g;
    }

    The code might seem cumbersome, but users worked with it through a UI to create their own workflow templates. The templates were deserialized from the project database and customized through the UI before each run.

  3. The graph needs an "owner", which is an instance of ITreeIdentity. In QI, the owner was used to route events from concurrently running graphs to the correct "place" in the UI. For simplicity, we implement the interface on the sample class itself. QI had a separate "job manager" instance that provided a unique identity to each job.

    C#
    ITreeIdentity ITreeIdentity.Owner => null;
    TreePathId ITreeIdentity.PathId => default;
    int Schemas.Core.IIdentity<int>.Id => 0;

    The implementation above, which has no Owner and an Id of 0, is suitable for a "root" identity.

  4. Tie it all together by creating a DI scope for the run, instantiating GraphShell from the scope and running the graph.

    C#
    private async Task RunAsync()
    {
        // Create state and set the graph's ID within the parent.
        var graphState = BuildGraph();
        graphState.SetId(this, 1);
    
        // Create a scope for the graph run and register the state instance and "owner".
        // Remember that GraphShell MUST be registered with InstancePerLifetimeScope().
        using (var scope = container.BeginLifetimeScope(cb => {
            cb.RegisterInstance(graphState);
            cb.RegisterInstance(this).As<ITreeIdentity>();
        })) {
            var graphShell = scope.Resolve<GraphShell>();
            graphShell.Build(); // IMPORTANT!
            await graphShell.RunAsync();
            Console.WriteLine($"FINISHED, EXIT STATUS WAS: {graphShell.State.CompletionState}");
        }
    }

    Neither GraphState nor the node states may be reused after a completed run, because each run records a trace in the Trace member. As mentioned elsewhere, QI was a batch execution system.
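The rule that states must not be reused can be made explicit with a single-use guard. The sketch below is hypothetical: SingleUseRunState, RunDriver and their members are invented for illustration and are not Quine types. The state records a trace during its one permitted run and rejects a second BeginRun, which is why a fresh state is built for every run, as RunAsync does by calling BuildGraph().

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical single-use run state; illustrates why states are rebuilt per run. Not a Quine type.
public sealed class SingleUseRunState
{
    private int started; // 0 = fresh, 1 = already used by a run

    // Entries recorded during the single permitted run.
    public List<string> Trace { get; } = new();

    public void BeginRun()
    {
        // Atomically flip the flag; a second call (even from another thread) is rejected.
        if (Interlocked.Exchange(ref started, 1) != 0)
            throw new InvalidOperationException("State already used by a previous run; build a new one.");
        Trace.Add("run started");
    }
}

public static class RunDriver
{
    // Builds fresh state for every run instead of reusing an old one.
    public static SingleUseRunState RunOnce()
    {
        var state = new SingleUseRunState();
        state.BeginRun();
        state.Trace.Add("run finished");
        return state;
    }
}
```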

Compiling the Code

Build the solution in Visual Studio, then run the sample from the command line as shown below. The arguments are Count (5), Increment (3), and Constant (4); you should observe output similar to this:

Quine.Samples.exe GraphSample 5 3 4

CustomTransformNode{TreePathId`.0.1.3} RECEIVED: 0
CustomDrainNode{TreePathId`.0.1.4} RECEIVED: -4
CustomTransformNode{TreePathId`.0.1.3} RECEIVED: 3
CustomDrainNode{TreePathId`.0.1.4} RECEIVED: -1
CustomTransformNode{TreePathId`.0.1.3} RECEIVED: 6
CustomDrainNode{TreePathId`.0.1.4} RECEIVED: 2
CustomTransformNode{TreePathId`.0.1.3} RECEIVED: 9
CustomDrainNode{TreePathId`.0.1.4} RECEIVED: 5
CustomTransformNode{TreePathId`.0.1.3} RECEIVED: 12
CustomDrainNode{TreePathId`.0.1.4} RECEIVED: 8
FINISHED, EXIT STATUS WAS: Completed

Note that each node's path ID starts with .0.1: 0 is the ID of the parent, provided by the ITreeIdentity implementation, and 1 is the graph's ID, set by the call to SetId. This call also automatically assigns IDs to the graph elements (nodes and ports).
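The dotted IDs in the output can be modeled as a path of integer segments, where each level appends its own ID. PathSketch, Root, Child and AssignChildren are hypothetical names, not the TreePathId API, and the consecutive numbering in AssignChildren is an assumption made only for illustration; the text does not specify how SetId numbers nodes and ports.

```csharp
using System;
using System.Linq;

// Hypothetical dotted path ID modeled on the sample output (".0.1.3"); not Quine's TreePathId.
public readonly struct PathSketch
{
    private readonly int[] segments;

    private PathSketch(int[] segments) => this.segments = segments;

    // A path with a single segment, e.g. the owner's Id (0 in the sample).
    public static PathSketch Root(int id) => new(new[] { id });

    // Appends one level, e.g. the graph ID passed to SetId (1 in the sample).
    public PathSketch Child(int id) => new(Segments.Append(id).ToArray());

    // Gives `count` child elements consecutive IDs starting at `firstId` (numbering scheme assumed).
    public static PathSketch[] AssignChildren(PathSketch parent, int count, int firstId) =>
        Enumerable.Range(firstId, count).Select(id => parent.Child(id)).ToArray();

    // Formats as ".0.1.3"; a default-constructed path prints as an empty string.
    public override string ToString() => string.Concat(Segments.Select(s => "." + s));

    private int[] Segments => segments ?? Array.Empty<int>();
}
```

Under these assumptions, AssignChildren(PathSketch.Root(0).Child(1), 2, 3) produces .0.1.3 and .0.1.4, matching the transform and drain node IDs in the sample output.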

Robust Programming

When designing new nodes and behaviors, strive to make them reusable: they should not assume anything about their predecessors and successors.

Do NOT modify incoming messages in place, because the same message can be buffered at multiple input ports. ALWAYS create a new message instance to send to an output port.
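The following sketch, built on System.Threading.Channels rather than on Quine, shows why in-place modification is dangerous: the same message instance is written to two channels standing in for input ports. IntMsg and FanOutSketch are hypothetical names. Using an immutable record lets the compiler enforce the rule, and the `with` expression creates the new instance for the output; the sample's IntMessage has a mutable public field, so there the rule is a matter of discipline.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical immutable message; positional record properties are init-only.
public sealed record IntMsg(int Data);

public static class FanOutSketch
{
    // Enqueues the SAME instance on two "input ports", then lets two consumers read it.
    public static async Task<(int transformed, int untouched)> RunAsync(int value, int constant)
    {
        var portA = Channel.CreateUnbounded<IntMsg>();
        var portB = Channel.CreateUnbounded<IntMsg>();

        var shared = new IntMsg(value);
        await portA.Writer.WriteAsync(shared);
        await portB.Writer.WriteAsync(shared);
        portA.Writer.Complete();
        portB.Writer.Complete();

        // Consumer A derives a NEW message instead of mutating the one it received.
        var a = await portA.Reader.ReadAsync();
        var output = a with { Data = a.Data - constant };

        // Consumer B still observes the original payload.
        var b = await portB.Reader.ReadAsync();
        return (output.Data, b.Data);
    }
}
```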

Every received message is considered a "unit" of work, for which progress can be reported with RaiseProgressEvent. Errors that occur during processing should be signaled by throwing an exception; the framework catches it and sets CompletionState to Error when execution completes.
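A minimal sketch of this error-and-progress contract, assuming nothing about Quine's internals: each message is one unit of progress, failures are signaled only by throwing, and a wrapper playing the framework's role maps an escaped exception to an error outcome. RunOutcome, RecordingProgress and ErrorHandlingSketch are hypothetical; RunOutcome only mirrors the idea of CompletionState, and the standard IProgress<int> interface stands in for RaiseProgressEvent.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical outcome enum; mirrors the idea of CompletionState, not its definition.
public enum RunOutcome { Completed, Error }

// Synchronous progress sink, so reports are visible immediately (System.Progress<T> posts asynchronously).
public sealed class RecordingProgress : IProgress<int>
{
    public List<int> Reports { get; } = new();
    public void Report(int value) => Reports.Add(value);
}

public static class ErrorHandlingSketch
{
    // Treats each message as one unit: process it, then report progress. Exceptions escape.
    public static async Task ProcessAllAsync(IEnumerable<int> messages, Func<int, Task> process, IProgress<int> progress)
    {
        var done = 0;
        foreach (var m in messages)
        {
            await process(m);        // failures are signaled by throwing, not by return codes
            progress.Report(++done); // one more unit completed
        }
    }

    // Plays the framework's role: catches the exception and maps it to an outcome.
    public static async Task<RunOutcome> RunAsync(Func<Task> body)
    {
        try
        {
            await body();
            return RunOutcome.Completed;
        }
        catch (Exception)
        {
            return RunOutcome.Error;
        }
    }
}
```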

In some situations, a node must buffer all received messages before producing any output. To do this, you must also override MessageLoopAsync as follows:

C#
try {
    await base.MessageLoopAsync();
} catch (Quine.Graph.ChannelClosedException) {
    // Input is exhausted: process the buffered messages and write them to the outputs.
    throw;  // IMPORTANT! Re-throw so that the graph can terminate.
}

Re-throwing the exception is critical for graph termination. Also, be sure to catch Quine.Graph.ChannelClosedException: an exception with the same name also exists in the System.Threading.Channels namespace.
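The same buffer-then-emit shape can be sketched with System.Threading.Channels alone. This sketch deliberately catches System.Threading.Channels.ChannelClosedException, the BCL type mentioned above, because ChannelReader<T>.ReadAsync throws it once the channel has been completed and drained; in a Quine node you would catch Quine.Graph.ChannelClosedException instead. BufferingStage is a hypothetical name, and sorting stands in for whatever processing the buffered messages need.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical "buffer everything, then emit" stage built on the BCL channels (not on Quine).
public static class BufferingStage
{
    public static async Task RunAsync(ChannelReader<int> input, ChannelWriter<int> output)
    {
        var buffer = new List<int>();
        try
        {
            // ReadAsync throws ChannelClosedException once the writer completed and the channel is empty.
            while (true)
                buffer.Add(await input.ReadAsync());
        }
        catch (ChannelClosedException) // System.Threading.Channels version in this sketch
        {
            // All input has arrived: process the buffered messages (here: sort) and write the outputs.
            foreach (var value in buffer.OrderBy(v => v))
                await output.WriteAsync(value);
            output.Complete();
            throw; // propagate end-of-input to the caller, mirroring the re-throw requirement
        }
    }
}
```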