Publisher

Publisher is the component which publish message to all subscribers.

Register

Uses RegisterPublisher method of the MessageBus instance to register a publisher.

  • RegisterPublisher: Register a publisher without return value. No return value is collected while this publisher executing.

  • RegisterPublisher: Register a publisher with return value supported. Return value is transferred back to the publisher after executed.

Message name is required when registering publisher. The name is used for matching subscribers.

To modify the behaviour of the publisher, an instance of MessageBusPublisherOptions(MessageBusPublisherOptionsTParameter or MessageBusPublisherOptionsTParameter, TReturn) can be specified. Default behavior is used when no option supplied. See Option block for details.

An instance of PublisherTicket (PublisherTicketTParameter or PublisherTicketTParameter, TReturn) is returned after RegisterPublisher called, containing an id for identifying, an executor for running publisher and the instance of the option provided with registering if presents.

Publisher can be unregistered when no longer needed. Calls UnregisterPublisher method to unregister the publisher by the ticket or id specified.

Executor

There is a property named Executor in PublisherTicket which contains 4 methods to execute the sequence. For detailed executing sequence, refer to Executing.

With Return Value

For the publisher registered with return value, property Executor contains 4 methods:

When no acceptable return value is generated from any subscriber, the default value is returned if presents.

Without Return Value

For the publisher registered without return value, property Executor contains 4 methods:

Sync / Async

When executes by synchronous methods: Synchronous methods are run directly. Each asynchronous method is called with Task.Wait().

When executes by asynchronous methods: Synchronous methods are run directly. Each asynchronous method is called with await keyword.

C#
public async Task TestMethodAsync()
{
    using var bus = new MessageBus();
    var subscriber1000Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber1000,
        new MessageBusSubscriberOptions<int, int>(sequence: 0, resultCheckingCallback: i => i > 0));
    var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100Async,
        new MessageBusSubscriberOptions<int, int>(sequence: 1, resultCheckingCallback: i => i > 0));

    var publisherTicket = bus.RegisterPublisher<int, int>("ArgumentMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));

    var result1000 = publisherTicket.Executor.Execute(1500);
    var result100 = publisherTicket.Executor.Execute(300);
    var result5 = publisherTicket.Executor.Execute(80);
    var result1000Async = await publisherTicket.Executor.ExecuteAsync(1500);

    using var cancellationTokenSource = new CancellationTokenSource();
    cancellationTokenSource.Cancel();
    Exception e100Async = null!;
    try
    {
        var result100Async = await publisherTicket.Executor.ExecuteAsync(300, cancellationTokenSource.Token);
    }
    catch (Exception e)
    {
        e100Async = e;
    }

    var result5BAsync = await publisherTicket.Executor.ExecuteAsync(80);

    //after being executed: result1000 == 1000, result100 == 100, result5 == 5, e100Async is OperationCanceledException, result5BAsync == 5

    bus.UnregisterPublisher(publisherTicket);
    bus.UnregisterSubscriber(subscriber1000Ticket);
    bus.UnregisterSubscriber(subscriber100Ticket);
}

public int Subscriber1000(int argument)
{
    if (argument > 1000)
        return 1000;
    else
        return 0;
}

public async Task<int> Subscriber100Async(int argument, CancellationToken cancellationToken)
{
    await Task.Delay(100, cancellationToken);
    if (argument > 100)
        return 100;
    else
        return 0;
}

Execute Once

By creating publisher ticket and run from the executor, reusable information is saved for multiple calling but saving the costs time and memory. ExecuteOnce methods provide easier and more effective way to run the message which only need to be run once by design.

ExecuteOnce methods includes:

These methods are similar to ones defined in Executor with message name specified.

Option

To modify the behaviour of the publisher, specifies an instance of MessageBusPublisherOptions(MessageBusPublisherOptionsTParameter or MessageBusPublisherOptionsTParameter, TReturn) while registering.

IsAlwaysExecuteAll

When IsAlwaysExecuteAll is set, all subscribers linked with this message name should be executed regardless of the result of the subscribers those have been executed by this instance.

C#
public void TestMethod()
{
    using var bus = new MessageBus();
    var subscriberExact1Ticket = bus.RegisterSubscriber<string>("Hello", SubscriberExact1);
    var subscriberExact2Ticket = bus.RegisterSubscriber<string>("hello", SubscriberExact2);
    var subscriberIgnoreCaseTicket = bus.RegisterSubscriber<string>(new MessageNameMatchingWithStringComparison("HELLO", StringComparison.OrdinalIgnoreCase), SubscriberIgnoreCase);
    var subscriberAllTicket = bus.RegisterSubscriber<string>(new MessageNameMatchingAll(), SubscriberAll);
    var subscriberRegEx1Ticket = bus.RegisterSubscriber<string>(new MessageNameMatchingWithRegularExpression("[Hh]ello"), SubscriberRegEx1);
    var subscriberRegEx2Ticket = bus.RegisterSubscriber<string>(new MessageNameMatchingWithRegularExpression("hello"), SubscriberRegEx2);

    var text = "Hello World";

    var publisherTicket = bus.RegisterPublisher<string>("Hello", new MessageBusPublisherOptions<string>(true));
    publisherTicket.Executor.Execute(text);

    bus.UnregisterPublisher(publisherTicket);
    bus.UnregisterSubscriber(subscriberExact1Ticket);
    bus.UnregisterSubscriber(subscriberExact2Ticket);
    bus.UnregisterSubscriber(subscriberIgnoreCaseTicket);
    bus.UnregisterSubscriber(subscriberAllTicket);
    bus.UnregisterSubscriber(subscriberRegEx1Ticket);
    bus.UnregisterSubscriber(subscriberRegEx2Ticket);

    //after being executed: _exact1, _ignoreCase, _all and _regex1 are equal to text; _exact2 and _regex2 are not.
}

private string? _exact1, _exact2, _ignoreCase, _all, _regex1 , _regex2;

public void SubscriberExact1(string? argument)
{
    _exact1 = argument;
}

public void SubscriberExact2(string? argument)
{
    _exact2 = argument;
}

public void SubscriberIgnoreCase(string? argument)
{
    _ignoreCase = argument;
}

public void SubscriberAll(string? argument)
{
    _all = argument;
}

public void SubscriberRegEx1(string? argument)
{
    _regex1 = argument;
}

public void SubscriberRegEx2(string? argument)
{
    _regex2 = argument;
}

In the demo above, the publisher is registered with IsAlwaysExecuteAll.

ArgumentConvertingCallback

If ArgumentConvertingCallback is specified, the callback is invoked to convert the argument the when publisher executing. See executing sequence for details.

C#
public void TestMethod()
{
    using var bus = new MessageBus();
    var subscriber1000Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber1000,
        new MessageBusSubscriberOptions<int, int>(sequence: 0, resultCheckingCallback: i => i > 0));
    var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100,
        new MessageBusSubscriberOptions<int, int>(sequence: 1, resultCheckingCallback: i => i > 0));

    var publisherTicket = bus.RegisterPublisher<string, string>("ArgumentMatching",
        new MessageBusPublisherOptions<string, string>(
            defaultReturnValue: "5", 
            argumentConvertingCallback: s => int.Parse(s!),
            returnValueConvertingCallback: i => i?.ToString()));

    var result1000 = publisherTicket.Executor.Execute("1500");
    var result100 = publisherTicket.Executor.Execute("300");
    var result5 = publisherTicket.Executor.Execute("80");

    //after being executed: result1000 == 1000, result 100 == 100, result5 == 5


    bus.UnregisterPublisher(publisherTicket);
    bus.UnregisterSubscriber(subscriber1000Ticket);
    bus.UnregisterSubscriber(subscriber100Ticket);
}

public int Subscriber1000(int argument)
{
    if (argument > 1000)
        return 1000;
    else
        return 0;
}

public int Subscriber100(int argument)
{
    if (argument > 100)
        return 100;
    else
        return 0;
}

In the demo above, the publisher is registered with ArgumentConvertingCallback specified.

C#
var publisherTicket = bus.RegisterPublisher<string, string>("ArgumentMatching",
    new MessageBusPublisherOptions<string, string>(
        defaultReturnValue: "5", 
        argumentConvertingCallback: s => int.Parse(s!),
        returnValueConvertingCallback: i => i?.ToString()));

ReturnValueConvertingCallback

  Note

This option is available only for the publisher with return value.

If ReturnValueConvertingCallback is specified, the callback is invoked to convert the return value before publisher returning. See executing sequence for details.

C#
public void TestMethod()
{
    using var bus = new MessageBus();
    var subscriber1000Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber1000,
        new MessageBusSubscriberOptions<int, int>(sequence: 0, resultCheckingCallback: i => i > 0));
    var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100,
        new MessageBusSubscriberOptions<int, int>(sequence: 1, resultCheckingCallback: i => i > 0));

    var publisherTicket = bus.RegisterPublisher<string, string>("ArgumentMatching",
        new MessageBusPublisherOptions<string, string>(
            defaultReturnValue: "5", 
            argumentConvertingCallback: s => int.Parse(s!),
            returnValueConvertingCallback: i => i?.ToString()));

    var result1000 = publisherTicket.Executor.Execute("1500");
    var result100 = publisherTicket.Executor.Execute("300");
    var result5 = publisherTicket.Executor.Execute("80");

    //after being executed: result1000 == 1000, result 100 == 100, result5 == 5


    bus.UnregisterPublisher(publisherTicket);
    bus.UnregisterSubscriber(subscriber1000Ticket);
    bus.UnregisterSubscriber(subscriber100Ticket);
}

public int Subscriber1000(int argument)
{
    if (argument > 1000)
        return 1000;
    else
        return 0;
}

public int Subscriber100(int argument)
{
    if (argument > 100)
        return 100;
    else
        return 0;
}

In the demo above, the publisher is registered with ReturnValueConvertingCallback specified.

C#
var publisherTicket = bus.RegisterPublisher<string, string>("ArgumentMatching",
    new MessageBusPublisherOptions<string, string>(
        defaultReturnValue: "5", 
        argumentConvertingCallback: s => int.Parse(s!),
        returnValueConvertingCallback: i => i?.ToString()));

DefaultReturnValue

  Note

This option is available only for the publisher with return value.

When DefaultReturnValue is set, the value is returned when no subscriber executed with accepted value returns.

C#
public void TestMethod()
{
    using var bus = new MessageBus();

    var subscriberHaveReturnTicket = bus.RegisterSubscriber<int, int>("Return", HaveReturn,
        new MessageBusSubscriberOptions<int, int>(sequence: 0));
    var subscriberNoReturnNoFinalTicket = bus.RegisterSubscriber<int>(new MessageNameMatchingAll(), NoReturn,
        new MessageBusSubscriberOptions<int>(sequence: 1, isFinal: false));
    var subscriberNoReturnTicket = bus.RegisterSubscriber<int>("NoReturnButFinal", NoReturn,
        new MessageBusSubscriberOptions<int>(sequence: 2));

    var publisher1Ticket = bus.RegisterPublisher<int, int>("Return", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 100));
    var publisher2Ticket = bus.RegisterPublisher<int, int>("SomethingElse", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 100));
    var publisher3Ticket = bus.RegisterPublisher<int, int>("NoReturnButFinal", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 100));

    _haveReturnExecuted = false;
    _noReturnExecuted = false;
    var result11 = publisher1Ticket.Executor.Execute(10);
    //after being executed: _haveReturnExecuted == true; _noReturnExecuted == false;

    _haveReturnExecuted = false;
    _noReturnExecuted = false;
    var result100 = publisher2Ticket.Executor.ExecuteAndGetMessageInstance(0, out var messageInstance1);
    //after being executed: _haveReturnExecuted == false; _noReturnExecuted == true;

    _haveReturnExecuted = false;
    _noReturnExecuted = false;
    var resultDefault = publisher3Ticket.Executor.ExecuteAndGetMessageInstance(0, out var messageInstance2);
    //after being executed: _haveReturnExecuted == false; _noReturnExecuted == true;

    bus.UnregisterSubscriber(subscriberHaveReturnTicket);
    bus.UnregisterSubscriber(subscriberNoReturnNoFinalTicket);
    bus.UnregisterSubscriber(subscriberNoReturnTicket);
    bus.UnregisterPublisher(publisher1Ticket);
    bus.UnregisterPublisher(publisher2Ticket);
    bus.UnregisterPublisher(publisher3Ticket);
}

private bool _haveReturnExecuted, _noReturnExecuted;
public int HaveReturn(int something)
{
    _haveReturnExecuted = true;
    return something + 1;
}

public void NoReturn(int something)
{
    _noReturnExecuted = true;
}

In the demo above, the executing of publisher2Ticket gets the default return value specified.

  Important

If any subscriber returned with accepted result, no matter the value is null / Nothing / default value or not, the DefaultReturnValue is not used.

In the demo above, the executing of publisher3Ticket gets 0 as result, which is the default value of the type int / Integer.

See Also