Subscriber

A subscriber is the component (delegate) that is run when a publisher makes a request.

Register

Use the RegisterSubscriber method of the MessageBus instance to register a subscriber.

The RegisterSubscriber method takes 3 arguments.

  1. Message name matcher: Either a string for exact matching, or an instance of a class derived from MessageNameMatcherBase. There are 3 built-in matcher classes.

    See Matching Method for details.

  2. A method callback: The method that is run when a publisher makes a request.

    The method can be synchronous or asynchronous.

    • Synchronous model:

      • The 1st parameter: The method must have one parameter that receives the argument passed from the publisher.

      • The 2nd parameter: The method can have an optional parameter of type MessageInstance that receives information about the current execution. See Message Instance for details.

      • Return: The method may or may not have a return value.

    • Asynchronous model:

      • The 1st parameter: The method must have one parameter that receives the argument passed from the publisher.

      • The 2nd parameter: The method can have an optional parameter of type MessageInstance that receives information about the current execution. See Message Instance for details.

      • The last parameter: The method must have one parameter of type System.Threading.CancellationToken.

      • Return: The method returns Task or a generic Task (Task<T> / Task(Of T)).

  3. Options: To modify the behaviour of the subscriber, an instance of MessageBusSubscriberOptions (MessageBusSubscriberOptions<TParameter> or MessageBusSubscriberOptions<TParameter, TReturn>) can be specified. The default behaviour is used when no options are supplied. See the Option section for details.
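
The callback shapes described above can be sketched as plain C# methods. The class and method names below are hypothetical and are not part of MessageBus. The optional MessageInstance parameter is left out so that the sketch compiles without the library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical handlers for illustration only; none of these names come from MessageBus.
public static class CallbackShapes
{
    // Synchronous model, no return value: one parameter receives the argument.
    public static void LogLength(string argument)
    {
        Console.WriteLine(argument.Length);
    }

    // Synchronous model with a return value.
    public static int Length(string argument)
    {
        return argument.Length;
    }

    // Asynchronous model, no return value: CancellationToken is the last parameter, returns Task.
    public static Task LogLengthAsync(string argument, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        Console.WriteLine(argument.Length);
        return Task.CompletedTask;
    }

    // Asynchronous model with a return value: returns Task<T>.
    public static async Task<int> LengthAsync(string argument, CancellationToken cancellationToken)
    {
        await Task.Delay(1, cancellationToken);
        return argument.Length;
    }
}
```

Any of these methods has a shape that matches one of the two models; which RegisterSubscriber overload accepts each of them is not shown here.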

RegisterSubscriber returns an instance of SubscriberTicket (SubscriberTicket<TParameter, TDelegate> or SubscriberTicket<TParameter, TReturn, TDelegate>). The ticket contains an identifying id, the parameter and return types, the name matcher, whether the callback is asynchronous, the callback itself, and the options instance provided at registration, if any.

A subscriber can be unregistered when it is no longer needed. Call the UnregisterSubscriber method with the ticket or the id to unregister the subscriber.

Accepted Result

  Important

By default, MessageBus behaves differently in this respect from many other publish-subscribe frameworks.

By default, each time a publisher makes a request, execution stops after one subscriber has been called, unless IsAlwaysExecuteAll is set in the options when the publisher is registered. Setting IsAlwaysExecuteAll makes MessageBus behave like most publish-subscribe frameworks. The default behaviour can also be modified per subscriber by setting MessageBusSubscriberOptions.

For a subscriber without a return value, if IsFinal is set to false / False, the result (void) from this subscriber is not treated as accepted. For a subscriber with a return value, the ResultCheckingCallback can likewise mark the return value as not accepted. In either case, the next subscriber is then called.

Subscribers marked with IsAlwaysExecution are always executed, whether or not a previous subscriber has already produced an accepted result.

Once a previous subscriber has produced an accepted result, the return values of all later subscribers that still need to be executed are ignored. Therefore, neither ResultCheckingCallback nor ReturnValueConvertingCallback is invoked for them.
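
The rules above can be illustrated with a small stand-alone model. This is only a sketch for reasoning about the behaviour, not the MessageBus implementation: the ModelSubscriber and AcceptedResultModel types are hypothetical, and the model assumes that a return value counts as accepted when no checker is supplied.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model type; ResultCheck stands in for ResultCheckingCallback,
// AlwaysExecute stands in for IsAlwaysExecution.
public sealed record ModelSubscriber(
    int Sequence,
    Func<int, int> Callback,
    Func<int, bool>? ResultCheck = null,
    bool AlwaysExecute = false);

public static class AcceptedResultModel
{
    // Runs the subscribers in sequence order and returns the first accepted
    // result, or defaultValue when no subscriber produces one.
    public static int Execute(IEnumerable<ModelSubscriber> subscribers, int argument, int defaultValue)
    {
        var accepted = false;
        var result = defaultValue;

        foreach (var subscriber in subscribers.OrderBy(s => s.Sequence))
        {
            // After an accepted result, only always-execute subscribers still run.
            if (accepted && !subscriber.AlwaysExecute)
                continue;

            var value = subscriber.Callback(argument);

            // Their return value is ignored and the checker is not invoked.
            if (accepted)
                continue;

            // Assumption: without a checker, any return value is accepted.
            if (subscriber.ResultCheck == null || subscriber.ResultCheck(value))
            {
                accepted = true;
                result = value;
            }
        }

        return result;
    }
}
```

For example, take two subscribers that return 1000 for arguments above 1000 and 100 for arguments above 100 (0 otherwise), each checked with r => r > 0. This model then returns 1000 for the argument 1500, 100 for 300, and the default value for 80.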

Option

To modify the behaviour of the subscriber, specify an instance of MessageBusSubscriberOptions (MessageBusSubscriberOptions<TParameter> or MessageBusSubscriberOptions<TParameter, TReturn>) when registering.

Sequence

Subscribers are ordered by the value of Sequence. The default value is 0. Subscribers with a lower sequence number run earlier. The execution order of subscribers with the same sequence number is indeterminate.

C#
public void TestMethod()
{
    using var bus = new MessageBus();
    var subscriber1000Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber1000,
        new MessageBusSubscriberOptions<int, int>(sequence: 0, resultCheckingCallback: i => i > 0));
    var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100,
        new MessageBusSubscriberOptions<int, int>(sequence: 1, resultCheckingCallback: i => i > 0));

    var publisherTicket = bus.RegisterPublisher<int, int>("ArgumentMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));

    var result1000 = publisherTicket.Executor.Execute(1500);
    var result100 = publisherTicket.Executor.Execute(300);
    var result5 = publisherTicket.Executor.Execute(80);

    //after being executed: result1000 == 1000, result100 == 100, result5 == 5

    bus.UnregisterPublisher(publisherTicket);
    bus.UnregisterSubscriber(subscriber1000Ticket);
    bus.UnregisterSubscriber(subscriber100Ticket);
}

public int Subscriber1000(int argument)
{
    if (argument > 1000)
        return 1000;
    else
        return 0;
}

public int Subscriber100(int argument)
{
    if (argument > 100)
        return 100;
    else
        return 0;
}

IsAlwaysExecution

Subscribers with IsAlwaysExecution set are always executed, whether or not a previous subscriber has already produced an accepted result. Once an accepted result exists, the return values of all later subscribers that still need to be executed are ignored. Therefore, neither ResultCheckingCallback nor ReturnValueConvertingCallback is invoked for them.

C#
public void TestMethod()
{
    using var bus = new MessageBus();
    var subscriber1000Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber1000,
        new MessageBusSubscriberOptions<int, int>(sequence: 0, resultCheckingCallback: i => i > 0));
    var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100,
        new MessageBusSubscriberOptions<int, int>(sequence: 2, resultCheckingCallback: i => i > 0));
    var subscriberForceTicket = bus.RegisterSubscriber<int>("ArgumentMatching", SubscriberForce,
        new MessageBusSubscriberOptions<int>(sequence: 1, isAlwaysExecution: true, isFinal: false));

    var publisherTicket = bus.RegisterPublisher<int, int>("ArgumentMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));

    _forceExecuted = false;
    var result1000 = publisherTicket.Executor.Execute(1500);
    //after being executed: _forceExecuted == true, result1000 == 1000

    _forceExecuted = false;
    var result100 = publisherTicket.Executor.Execute(300);
    //after being executed: _forceExecuted == true, result100 == 100

    _forceExecuted = false;
    var result5 = publisherTicket.Executor.Execute(80);
    //after being executed: _forceExecuted == true, result5 == 5

    bus.UnregisterPublisher(publisherTicket);
    bus.UnregisterSubscriber(subscriber1000Ticket);
    bus.UnregisterSubscriber(subscriber100Ticket);
    bus.UnregisterSubscriber(subscriberForceTicket);
}

public int Subscriber1000(int argument)
{
    if (argument > 1000)
        return 1000;
    else
        return 0;
}

public int Subscriber100(int argument)
{
    if (argument > 100)
        return 100;
    else
        return 0;
}

private bool _forceExecuted;
public void SubscriberForce(int argument)
{
    _forceExecuted = true;
}

In the demo above, the subscriber registered as subscriberForceTicket is marked to always run.

C#
var subscriberForceTicket = bus.RegisterSubscriber<int>("ArgumentMatching", SubscriberForce,
    new MessageBusSubscriberOptions<int>(sequence: 1, isAlwaysExecution: true, isFinal: false));

IsFinal

  Note

This option is available only for the subscriber without return value.

When IsFinal is set to false / False, the result (void) from this subscriber is not treated as accepted, so the next subscriber is called.
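
The effect of IsFinal can be sketched with a stand-alone model. It is not the MessageBus implementation: the VoidSubscriber and IsFinalModel types are hypothetical, the model assumes that IsFinal defaults to true, and it leaves IsAlwaysExecution out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of a subscriber without a return value.
// Assumption: IsFinal defaults to true, so the (void) result is accepted.
public sealed record VoidSubscriber(int Sequence, Action<int> Callback, bool IsFinal = true);

public static class IsFinalModel
{
    // Runs subscribers in sequence order and returns how many were executed.
    public static int Execute(IEnumerable<VoidSubscriber> subscribers, int argument)
    {
        var executed = 0;
        foreach (var subscriber in subscribers.OrderBy(s => s.Sequence))
        {
            subscriber.Callback(argument);
            executed++;

            // A final subscriber produces an accepted result, which ends the run.
            if (subscriber.IsFinal)
                break;
        }
        return executed;
    }
}
```

In this model, a non-final subscriber at sequence 0 followed by final subscribers at sequences 1 and 2 results in two executions: the first does not end the run, the second does.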

ConditionCheckingCallback

If ConditionCheckingCallback is specified, the callback is invoked with the argument to check whether this subscriber should be executed for the current request.

C#
public void TestMethod()
{
    using var bus = new MessageBus();
    var subscriber1000Ticket = bus.RegisterSubscriber<int, int>("ConditionMatching", Subscriber1000,
        new MessageBusSubscriberOptions<int, int>(sequence: 0, conditionCheckingCallback: i => i is int and > 1000));
    var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ConditionMatching", Subscriber100,
        new MessageBusSubscriberOptions<int, int>(sequence: 1, conditionCheckingCallback: i => i is int and > 100));

    var publisherTicket = bus.RegisterPublisher<int, int>("ConditionMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));

    var result1000 = publisherTicket.Executor.Execute(1500);
    var result100 = publisherTicket.Executor.Execute(300);
    var result5 = publisherTicket.Executor.Execute(80);

    //after being executed: result1000 == 1000, result100 == 100, result5 == 5

    bus.UnregisterPublisher(publisherTicket);
    bus.UnregisterSubscriber(subscriber1000Ticket);
    bus.UnregisterSubscriber(subscriber100Ticket);
}

public int Subscriber1000(int argument)
{
    return 1000;
}

public int Subscriber100(int argument)
{
    return 100;
}

ArgumentConvertingCallback

If ArgumentConvertingCallback is specified, the callback is invoked to convert the argument before the subscriber is executed.

C#
public void TestMethod()
{
    using var bus = new MessageBus();
    var subscriber1000Ticket = bus.RegisterSubscriber<string, string>("ArgumentMatching", Subscriber1000,
        new MessageBusSubscriberOptions<string, string>(
            sequence: 0,
            resultCheckingCallback: i => i != null,
            argumentConvertingCallback: i => i?.ToString(),
            returnValueConvertingCallback: s => int.Parse(s!)));
    var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100,
        new MessageBusSubscriberOptions<int, int>(
            sequence: 1,
            resultCheckingCallback: i => i > 0));

    var publisherTicket = bus.RegisterPublisher<int, int>("ArgumentMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));

    var result1000 = publisherTicket.Executor.Execute(1000);
    var result100 = publisherTicket.Executor.Execute(300);
    var result5 = publisherTicket.Executor.Execute(80);

    //after being executed: result1000 == 1000, result100 == 100, result5 == 5

    bus.UnregisterPublisher(publisherTicket);
    bus.UnregisterSubscriber(subscriber1000Ticket);
    bus.UnregisterSubscriber(subscriber100Ticket);
}

public string? Subscriber1000(string? argument)
{
    if (argument == "1000")
        return "1000";
    else
        return null;
}

public int Subscriber100(int argument)
{
    if (argument > 100)
        return 100;
    else
        return 0;
}

In the demo above, the subscriber registered as subscriber1000Ticket is configured with this conversion callback.

C#
var subscriber1000Ticket = bus.RegisterSubscriber<string, string>("ArgumentMatching", Subscriber1000,
    new MessageBusSubscriberOptions<string, string>(
        sequence: 0,
        resultCheckingCallback: i => i != null,
        argumentConvertingCallback: i => i?.ToString(),
        returnValueConvertingCallback: s => int.Parse(s!)));

ResultCheckingCallback

  Note

This option is available only for the subscriber with return value.

If ResultCheckingCallback is specified, the callback is invoked to check whether the return value should be accepted and returned to the publisher instead of executing subsequent subscribers. See Accepted Result for details.

C#
public void TestMethod()
{
    using var bus = new MessageBus();
    var subscriber1000Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber1000,
        new MessageBusSubscriberOptions<int, int>(sequence: 0, resultCheckingCallback: i => i > 0));
    var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100,
        new MessageBusSubscriberOptions<int, int>(sequence: 1, resultCheckingCallback: i => i > 0));

    var publisherTicket = bus.RegisterPublisher<int, int>("ArgumentMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));

    var result1000 = publisherTicket.Executor.Execute(1500);
    var result100 = publisherTicket.Executor.Execute(300);
    var result5 = publisherTicket.Executor.Execute(80);

    //after being executed: result1000 == 1000, result100 == 100, result5 == 5

    bus.UnregisterPublisher(publisherTicket);
    bus.UnregisterSubscriber(subscriber1000Ticket);
    bus.UnregisterSubscriber(subscriber100Ticket);
}

public int Subscriber1000(int argument)
{
    if (argument > 1000)
        return 1000;
    else
        return 0;
}

public int Subscriber100(int argument)
{
    if (argument > 100)
        return 100;
    else
        return 0;
}

In the demo above, the subscribers registered as subscriber1000Ticket and subscriber100Ticket are each set with a checking callback that tests whether the return value is larger than 0. Therefore, when these methods return 0, the return value is not treated as accepted and the following subscribers continue to be executed.

ReturnValueConvertingCallback

  Note

This option is available only for the subscriber with return value.

If ReturnValueConvertingCallback is specified, the callback is invoked to convert the return value before it is returned to the publisher.

C#
public void TestMethod()
{
    using var bus = new MessageBus();
    var subscriber1000Ticket = bus.RegisterSubscriber<string, string>("ArgumentMatching", Subscriber1000,
        new MessageBusSubscriberOptions<string, string>(
            sequence: 0,
            resultCheckingCallback: i => i != null,
            argumentConvertingCallback: i => i?.ToString(),
            returnValueConvertingCallback: s => int.Parse(s!)));
    var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100,
        new MessageBusSubscriberOptions<int, int>(
            sequence: 1,
            resultCheckingCallback: i => i > 0));

    var publisherTicket = bus.RegisterPublisher<int, int>("ArgumentMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));

    var result1000 = publisherTicket.Executor.Execute(1000);
    var result100 = publisherTicket.Executor.Execute(300);
    var result5 = publisherTicket.Executor.Execute(80);

    //after being executed: result1000 == 1000, result100 == 100, result5 == 5

    bus.UnregisterPublisher(publisherTicket);
    bus.UnregisterSubscriber(subscriber1000Ticket);
    bus.UnregisterSubscriber(subscriber100Ticket);
}

public string? Subscriber1000(string? argument)
{
    if (argument == "1000")
        return "1000";
    else
        return null;
}

public int Subscriber100(int argument)
{
    if (argument > 100)
        return 100;
    else
        return 0;
}

In the demo above, the subscriber registered as subscriber1000Ticket is configured with this conversion callback.

C#
var subscriber1000Ticket = bus.RegisterSubscriber<string, string>("ArgumentMatching", Subscriber1000,
    new MessageBusSubscriberOptions<string, string>(
        sequence: 0,
        resultCheckingCallback: i => i != null,
        argumentConvertingCallback: i => i?.ToString(),
        returnValueConvertingCallback: s => int.Parse(s!)));
