Important
MessageBus has different behaviour in this than many other publishing subscription model frameworks by default.
Publisher is the component (delegate) which to be run when publisher requests.
Uses RegisterSubscriber method of the MessageBus instance to register a subscriber.
RegisterSubscriber method requires 3 arguments.
Message name matcher: Choose from a string for matching exactly, or an instance of derived class of MessageNameMatcherBase. There are 3 built-in matcher classes:
MessageNameMatchingWithStringComparison: Matching using the comparer specified by StringComparison.
MessageNameMatchingAll: Matching all publishers.
MessageNameMatchingWithRegularExpression: Matching based on regular expression specified.
See Matching Method for details.
A method callback: The method is run when publisher requests.
The method can be in synchronous or asynchronous model.
Synchronous model:
The 1st parameter: The method must have one parameter for receiving argument passed from publisher.
The 2nd parameter: The method can have an optional parameter in MessageInstance type to receive the information of the current executing. See Message Instance for details.
Return: The method can have return type or not.
Asynchronous model:
The 1st parameter: The method must have one parameter for receiving argument passed from publisher.
The 2nd parameter: The method can have an optional parameter in MessageInstance type to receive the information of the current executing. See Message Instance for details.
The last parameter: The method must have one parameter in System.Threading.CancellationToken type.
Return: The method can have return type as Task or a generic of Task (Task<T> / Task(Of T)).
To modify the behaviour of the subscriber, an instance of MessageBusSubscriberOptions(MessageBusSubscriberOptionsTParameter or MessageBusSubscriberOptionsTParameter, TReturn) can be specified. Default behavior is used when no option supplied. See Option block for details.
An instance of SubscriberTicket (SubscriberTicketTParameter, TDelegate or SubscriberTicketTParameter, TReturn, TDelegate) is returned after RegisterSubscriber called, containing an id for identity, parameter and return types, name matcher, whether the callback is in asynchronous mode, the callback and the instance of the option provided with registering if presents.
Subscriber can be unregistered when no longer needed. Calls UnregisterSubscriber method to unregister the subscriber by the ticket or id specified.
By default, the executing is stopped after one subscriber is called every time when publisher requests unless the IsAlwaysExecuteAll is set as the option while the publisher registering. This behaviour can be modified by setting MessageBusSubscriberOptions. Setting it makes MessageBus behave like most publishing subscription model frameworks.
For the subscriber without return value, if the IsFinal is set to false / False, the result (void) from this subscriber is not treated as accepted. For the subscriber with return value, the ResultCheckingCallback could mark the return value not accepted as well. Therefore the next subscriber is called next.
For all subscribers marked by IsAlwaysExecution are always executed no matter the result is generated by previous ones or not.
When the accepted result is generated by previous subscribers, all return value from the followings which need to be executed is ignored. Therefore, no ResultCheckingCallback or ReturnValueConvertingCallback is invoked.
To modify the behaviour of the publisher, specifies an instance of MessageBusSubscriberOptions(MessageBusSubscriberOptionsTParameter or MessageBusSubscriberOptionsTParameter, TReturn) while registering.
Subscribers are ordered by sequence set by Sequence. The default value is 0. Sequences with lower sequence number run earlier. The executing order of sequences with the same sequence number is indeterminate.
public void TestMethod()
{
using var bus = new MessageBus();
var subscriber1000Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber1000,
new MessageBusSubscriberOptions<int, int>(sequence: 0, resultCheckingCallback: i => i > 0));
var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100,
new MessageBusSubscriberOptions<int, int>(sequence: 1, resultCheckingCallback: i => i > 0));
var publisherTicket = bus.RegisterPublisher<int, int>("ArgumentMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));
var result1000 = publisherTicket.Executor.Execute(1500);
var result100 = publisherTicket.Executor.Execute(300);
var result5 = publisherTicket.Executor.Execute(80);
//after being executed: result1000 == 1000, result 100 == 100, result5 == 5
bus.UnregisterPublisher(publisherTicket);
bus.UnregisterSubscriber(subscriber1000Ticket);
bus.UnregisterSubscriber(subscriber100Ticket);
}
public int Subscriber1000(int argument)
{
if (argument > 1000)
return 1000;
else
return 0;
}
public int Subscriber100(int argument)
{
if (argument > 100)
return 100;
else
return 0;
}
Public Sub TestMethod()
Using bus As New MessageBus
Dim subscriber1000Ticket = bus.RegisterSubscriber(Of Integer, Integer)("ArgumentMatching", AddressOf Subscriber1000, New MessageBusSubscriberOptions(Of Integer, Integer)(sequence:=0, resultCheckingCallback:=Function(i) i > 0))
Dim subscriber100Ticket = bus.RegisterSubscriber(Of Integer, Integer)("ArgumentMatching", AddressOf Subscriber100, New MessageBusSubscriberOptions(Of Integer, Integer)(sequence:=1, resultCheckingCallback:=Function(i) i > 0))
Dim publisherTicket = bus.RegisterPublisher(Of Integer, Integer)("ArgumentMatching", New MessageBusPublisherOptions(Of Integer, Integer)(defaultReturnValue:=5))
Dim result1000 = publisherTicket.Executor.Execute(1500)
Dim result100 = publisherTicket.Executor.Execute(300)
Dim result5 = publisherTicket.Executor.Execute(80)
'after being executed: result1000 = 1000, result100 = 100, result5 = 5
bus.UnregisterPublisher(publisherTicket)
bus.UnregisterSubscriber(subscriber1000Ticket)
bus.UnregisterSubscriber(subscriber100Ticket)
End Using
End Sub
Public Function Subscriber1000(argument As Integer) As Integer
If argument > 1000 Then
Return 1000
Else
Return 0
End If
End Function
Public Function Subscriber100(argument As Integer) As Integer
If argument > 100 Then
Return 100
Else
Return 0
End If
End Function
Subscribers set with IsAlwaysExecution are always executed no matter the result is generated by previous ones or not. When the accepted result is generated by previous subscribers, all return value from the followings which need to be executed is ignored. Therefore, no ResultCheckingCallback or ReturnValueConvertingCallback is invoked.
public void TestMethod()
{
using var bus = new MessageBus();
var subscriber1000Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber1000,
new MessageBusSubscriberOptions<int, int>(sequence: 0, resultCheckingCallback: i => i > 0));
var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100,
new MessageBusSubscriberOptions<int, int>(sequence: 2, resultCheckingCallback: i => i > 0));
var subscriberForceTicket = bus.RegisterSubscriber<int>("ArgumentMatching", SubscriberForce,
new MessageBusSubscriberOptions<int>(sequence: 1, isAlwaysExecution: true, isFinal: false));
var publisherTicket = bus.RegisterPublisher<int, int>("ArgumentMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));
_forceExecuted = false;
var result1000 = publisherTicket.Executor.Execute(1500);
//after being executed: _forceExecuted == true, result1000 == 1000
_forceExecuted = false;
var result100 = publisherTicket.Executor.Execute(300);
//after being executed: _forceExecuted == true, result100 == 100
_forceExecuted = false;
var result5 = publisherTicket.Executor.Execute(80);
//after being executed: _forceExecuted == true, result5 == 5
bus.UnregisterPublisher(publisherTicket);
bus.UnregisterSubscriber(subscriber1000Ticket);
bus.UnregisterSubscriber(subscriber100Ticket);
bus.UnregisterSubscriber(subscriberForceTicket);
}
public int Subscriber1000(int argument)
{
if (argument > 1000)
return 1000;
else
return 0;
}
public int Subscriber100(int argument)
{
if (argument > 100)
return 100;
else
return 0;
}
private bool _forceExecuted;
public void SubscriberForce(int argument)
{
_forceExecuted = true;
}
Sub TestMethod()
Using bus As New MessageBus
Dim subscriber1000Ticket = bus.RegisterSubscriber(Of Integer, Integer)("ArgumentMatching", AddressOf Subscriber1000, New MessageBusSubscriberOptions(Of Integer, Integer)(sequence:=0, resultCheckingCallback:=Function(i) i > 0))
Dim subscriber100Ticket = bus.RegisterSubscriber(Of Integer, Integer)("ArgumentMatching", AddressOf Subscriber100, New MessageBusSubscriberOptions(Of Integer, Integer)(sequence:=2, resultCheckingCallback:=Function(i) i > 0))
Dim subscriberForceTicket = bus.RegisterSubscriber(Of Integer)("ArgumentMatching", AddressOf SubscriberForce, New MessageBusSubscriberOptions(Of Integer)(sequence:=1, isAlwaysExecution:=True, isFinal:=False))
Dim publisherTicket = bus.RegisterPublisher(Of Integer, Integer)("ArgumentMatching", New MessageBusPublisherOptions(Of Integer, Integer)(defaultReturnValue:=5))
_forceExecuted = False
Dim result1000 = publisherTicket.Executor.Execute(1500)
'after being executed: _forceExecuted = True, result1000 = 1000
_forceExecuted = False
Dim result100 = publisherTicket.Executor.Execute(300)
'after being executed: _forceExecuted = True, result100 = 100
_forceExecuted = False
Dim result5 = publisherTicket.Executor.Execute(80)
'after being executed: _forceExecuted = True, result5 = 5
bus.UnregisterPublisher(publisherTicket)
bus.UnregisterSubscriber(subscriber1000Ticket)
bus.UnregisterSubscriber(subscriber100Ticket)
bus.UnregisterSubscriber(subscriberForceTicket)
End Using
End Sub
Public Function Subscriber1000(argument As Integer) As Integer
If argument > 1000 Then
Return 1000
Else
Return 0
End If
End Function
Public Function Subscriber100(argument As Integer) As Integer
If argument > 100 Then
Return 100
Else
Return 0
End If
End Function
Private _forceExecuted As Boolean
Public Sub SubscriberForce(argument As Integer)
_forceExecuted = True
End Sub
In the demo above, the subscriber registered as subscriberForceTicket is marked to be run always.
var subscriberForceTicket = bus.RegisterSubscriber<int>("ArgumentMatching", SubscriberForce,
new MessageBusSubscriberOptions<int>(sequence: 1, isAlwaysExecution: true, isFinal: false));
Dim subscriberForceTicket = bus.RegisterSubscriber(Of Integer)("ArgumentMatching", AddressOf SubscriberForce, New MessageBusSubscriberOptions(Of Integer)(sequence:=1, isAlwaysExecution:=True, isFinal:=False))
When the IsFinal is set to false / False, the return value (void) from this saubscriber is not treated as acceptable.
If ConditionCheckingCallback is specified, the callback is invoked to check whether this subscriber should be executed in this instance with the argument specified.
public void TestMethod()
{
using var bus = new MessageBus();
var subscriber1000Ticket = bus.RegisterSubscriber<int, int>("ConditionMatching", Subscriber1000,
new MessageBusSubscriberOptions<int, int>(sequence: 0, conditionCheckingCallback: i => i is int and > 1000));
var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ConditionMatching", Subscriber100,
new MessageBusSubscriberOptions<int, int>(sequence: 1, conditionCheckingCallback: i => i is int and > 100));
var publisherTicket = bus.RegisterPublisher<int, int>("ConditionMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));
var result1000 = publisherTicket.Executor.Execute(1500);
var result100 = publisherTicket.Executor.Execute(300);
var result5 = publisherTicket.Executor.Execute(80);
//after being executed: result1000 == 1000, result 100 == 100, result5 == 5
bus.UnregisterPublisher(publisherTicket);
bus.UnregisterSubscriber(subscriber1000Ticket);
bus.UnregisterSubscriber(subscriber100Ticket);
}
public int Subscriber1000(int argument)
{
return 1000;
}
public int Subscriber100(int argument)
{
return 100;
}
Public Sub TestMethod()
Using bus As New MessageBus
Dim subscriber1000Ticket = bus.RegisterSubscriber(Of Integer, Integer)("ConditionMatching", AddressOf Subscriber1000, New MessageBusSubscriberOptions(Of Integer, Integer)(sequence:=0, conditionCheckingCallback:=Function(i) i > 1000))
Dim subscriber100Ticket = bus.RegisterSubscriber(Of Integer, Integer)("ConditionMatching", AddressOf Subscriber100, New MessageBusSubscriberOptions(Of Integer, Integer)(sequence:=1, conditionCheckingCallback:=Function(i) i > 100))
Dim publisherTicket = bus.RegisterPublisher(Of Integer, Integer)("ConditionMatching", New MessageBusPublisherOptions(Of Integer, Integer)(defaultReturnValue:=5))
Dim result1000 = publisherTicket.Executor.Execute(1500)
Dim result100 = publisherTicket.Executor.Execute(300)
Dim result5 = publisherTicket.Executor.Execute(80)
'after being executed: result1000 = 1000, result100 = 100, result5 = 5
bus.UnregisterPublisher(publisherTicket)
bus.UnregisterSubscriber(subscriber1000Ticket)
bus.UnregisterSubscriber(subscriber100Ticket)
End Using
End Sub
Public Function Subscriber1000(argument As Integer) As Integer
Return 1000
End Function
Public Function Subscriber100(argument As Integer) As Integer
Return 100
End Function
If ArgumentConvertingCallback is specified, the callback is invoked to convert the argument the before subscriber executing.
public void TestMethod()
{
using var bus = new MessageBus();
var subscriber1000Ticket = bus.RegisterSubscriber<string, string>("ArgumentMatching", Subscriber1000,
new MessageBusSubscriberOptions<string, string>(
sequence: 0,
resultCheckingCallback: i => i != null,
argumentConvertingCallback: i => i?.ToString(),
returnValueConvertingCallback: s => int.Parse(s!)));
var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100,
new MessageBusSubscriberOptions<int, int>(
sequence: 1,
resultCheckingCallback: i => i > 0));
var publisherTicket = bus.RegisterPublisher<int, int>("ArgumentMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));
var result1000 = publisherTicket.Executor.Execute(1000);
var result100 = publisherTicket.Executor.Execute(300);
var result5 = publisherTicket.Executor.Execute(80);
//after being executed: result1000 == 1000, result 100 == 100, result5 == 5
bus.UnregisterPublisher(publisherTicket);
bus.UnregisterSubscriber(subscriber1000Ticket);
bus.UnregisterSubscriber(subscriber100Ticket);
}
public string? Subscriber1000(string? argument)
{
if (argument == "1000")
return "1000";
else
return null;
}
public int Subscriber100(int argument)
{
if (argument > 100)
return 100;
else
return 0;
}
Public Sub TestMethod()
Using bus As New MessageBus
Dim subscriber1000Ticket = bus.RegisterSubscriber(Of String, String)("ArgumentMatching", AddressOf Subscriber1000, New MessageBusSubscriberOptions(Of String, String)(sequence:=0, resultCheckingCallback:=Function(i) i IsNot Nothing, argumentConvertingCallback:=Function(i) i?.ToString(), returnValueConvertingCallback:=Function(s) Integer.Parse(s)))
Dim subscriber100Ticket = bus.RegisterSubscriber(Of Integer, Integer)("ArgumentMatching", AddressOf Subscriber100, New MessageBusSubscriberOptions(Of Integer, Integer)(sequence:=1, resultCheckingCallback:=Function(i) i > 0))
Dim publisherTicket = bus.RegisterPublisher(Of Integer, Integer)("ArgumentMatching", New MessageBusPublisherOptions(Of Integer, Integer)(defaultReturnValue:=5))
Dim result1000 = publisherTicket.Executor.Execute(1000)
Dim result100 = publisherTicket.Executor.Execute(300)
Dim result5 = publisherTicket.Executor.Execute(80)
'after being executed: result1000 = 1000, result100 = 100, result5 = 5
bus.UnregisterPublisher(publisherTicket)
bus.UnregisterSubscriber(subscriber1000Ticket)
bus.UnregisterSubscriber(subscriber100Ticket)
End Using
End Sub
Public Shared Function Subscriber1000(argument As String) As String
If argument = "1000" Then
Return "1000"
Else
Return Nothing
End If
End Function
Public Shared Function Subscriber100(argument As Integer) As Integer
If argument > 100 Then
Return 100
Else
Return 0
End If
End Function
In the demo above, the subscriber registered as subscriber1000Ticket is set with this conversion function.
var subscriber1000Ticket = bus.RegisterSubscriber<string, string>("ArgumentMatching", Subscriber1000,
new MessageBusSubscriberOptions<string, string>(
sequence: 0,
resultCheckingCallback: i => i != null,
argumentConvertingCallback: i => i?.ToString(),
returnValueConvertingCallback: s => int.Parse(s!)));
Dim subscriber1000Ticket = bus.RegisterSubscriber(Of String, String)("ArgumentMatching", AddressOf Subscriber1000, New MessageBusSubscriberOptions(Of String, String)(sequence:=0, resultCheckingCallback:=Function(i) i IsNot Nothing, argumentConvertingCallback:=Function(i) i?.ToString(), returnValueConvertingCallback:=Function(s) Integer.Parse(s)))
If ResultCheckingCallback is specified, the callback is invoked to check whether message should be returned instead of executing subsequent subscribers. See Accepted Result for details.
public void TestMethod()
{
using var bus = new MessageBus();
var subscriber1000Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber1000,
new MessageBusSubscriberOptions<int, int>(sequence: 0, resultCheckingCallback: i => i > 0));
var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100,
new MessageBusSubscriberOptions<int, int>(sequence: 1, resultCheckingCallback: i => i > 0));
var publisherTicket = bus.RegisterPublisher<int, int>("ArgumentMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));
var result1000 = publisherTicket.Executor.Execute(1500);
var result100 = publisherTicket.Executor.Execute(300);
var result5 = publisherTicket.Executor.Execute(80);
//after being executed: result1000 == 1000, result 100 == 100, result5 == 5
bus.UnregisterPublisher(publisherTicket);
bus.UnregisterSubscriber(subscriber1000Ticket);
bus.UnregisterSubscriber(subscriber100Ticket);
}
public int Subscriber1000(int argument)
{
if (argument > 1000)
return 1000;
else
return 0;
}
public int Subscriber100(int argument)
{
if (argument > 100)
return 100;
else
return 0;
}
Public Sub TestMethod()
Using bus As New MessageBus
Dim subscriber1000Ticket = bus.RegisterSubscriber(Of Integer, Integer)("ArgumentMatching", AddressOf Subscriber1000, New MessageBusSubscriberOptions(Of Integer, Integer)(sequence:=0, resultCheckingCallback:=Function(i) i > 0))
Dim subscriber100Ticket = bus.RegisterSubscriber(Of Integer, Integer)("ArgumentMatching", AddressOf Subscriber100, New MessageBusSubscriberOptions(Of Integer, Integer)(sequence:=1, resultCheckingCallback:=Function(i) i > 0))
Dim publisherTicket = bus.RegisterPublisher(Of Integer, Integer)("ArgumentMatching", New MessageBusPublisherOptions(Of Integer, Integer)(defaultReturnValue:=5))
Dim result1000 = publisherTicket.Executor.Execute(1500)
Dim result100 = publisherTicket.Executor.Execute(300)
Dim result5 = publisherTicket.Executor.Execute(80)
'after being executed: result1000 = 1000, result100 = 100, result5 = 5
bus.UnregisterPublisher(publisherTicket)
bus.UnregisterSubscriber(subscriber1000Ticket)
bus.UnregisterSubscriber(subscriber100Ticket)
End Using
End Sub
Public Function Subscriber1000(argument As Integer) As Integer
If argument > 1000 Then
Return 1000
Else
Return 0
End If
End Function
Public Function Subscriber100(argument As Integer) As Integer
If argument > 100 Then
Return 100
Else
Return 0
End If
End Function
In the demo above, the subscribers registered as subscriber1000Ticket and subscriber100Ticket is set with a checker callback to check whether the return value is larger than 0. Therefore, when these methods return 0, the return value is not treated as accepatable and the following subscribers continue to be executed.
If ReturnValueConvertingCallback is specified, the callback is invoked to convert the return value before returning to publisher.
public void TestMethod()
{
using var bus = new MessageBus();
var subscriber1000Ticket = bus.RegisterSubscriber<string, string>("ArgumentMatching", Subscriber1000,
new MessageBusSubscriberOptions<string, string>(
sequence: 0,
resultCheckingCallback: i => i != null,
argumentConvertingCallback: i => i?.ToString(),
returnValueConvertingCallback: s => int.Parse(s!)));
var subscriber100Ticket = bus.RegisterSubscriber<int, int>("ArgumentMatching", Subscriber100,
new MessageBusSubscriberOptions<int, int>(
sequence: 1,
resultCheckingCallback: i => i > 0));
var publisherTicket = bus.RegisterPublisher<int, int>("ArgumentMatching", new MessageBusPublisherOptions<int, int>(defaultReturnValue: 5));
var result1000 = publisherTicket.Executor.Execute(1000);
var result100 = publisherTicket.Executor.Execute(300);
var result5 = publisherTicket.Executor.Execute(80);
//after being executed: result1000 == 1000, result 100 == 100, result5 == 5
bus.UnregisterPublisher(publisherTicket);
bus.UnregisterSubscriber(subscriber1000Ticket);
bus.UnregisterSubscriber(subscriber100Ticket);
}
public string? Subscriber1000(string? argument)
{
if (argument == "1000")
return "1000";
else
return null;
}
public int Subscriber100(int argument)
{
if (argument > 100)
return 100;
else
return 0;
}
Public Sub TestMethod()
Using bus As New MessageBus
Dim subscriber1000Ticket = bus.RegisterSubscriber(Of String, String)("ArgumentMatching", AddressOf Subscriber1000, New MessageBusSubscriberOptions(Of String, String)(sequence:=0, resultCheckingCallback:=Function(i) i IsNot Nothing, argumentConvertingCallback:=Function(i) i?.ToString(), returnValueConvertingCallback:=Function(s) Integer.Parse(s)))
Dim subscriber100Ticket = bus.RegisterSubscriber(Of Integer, Integer)("ArgumentMatching", AddressOf Subscriber100, New MessageBusSubscriberOptions(Of Integer, Integer)(sequence:=1, resultCheckingCallback:=Function(i) i > 0))
Dim publisherTicket = bus.RegisterPublisher(Of Integer, Integer)("ArgumentMatching", New MessageBusPublisherOptions(Of Integer, Integer)(defaultReturnValue:=5))
Dim result1000 = publisherTicket.Executor.Execute(1000)
Dim result100 = publisherTicket.Executor.Execute(300)
Dim result5 = publisherTicket.Executor.Execute(80)
'after being executed: result1000 = 1000, result100 = 100, result5 = 5
bus.UnregisterPublisher(publisherTicket)
bus.UnregisterSubscriber(subscriber1000Ticket)
bus.UnregisterSubscriber(subscriber100Ticket)
End Using
End Sub
Public Shared Function Subscriber1000(argument As String) As String
If argument = "1000" Then
Return "1000"
Else
Return Nothing
End If
End Function
Public Shared Function Subscriber100(argument As Integer) As Integer
If argument > 100 Then
Return 100
Else
Return 0
End If
End Function
In the demo above, the subscriber registered as subscriber1000Ticket is set with this conversion function.
var subscriber1000Ticket = bus.RegisterSubscriber<string, string>("ArgumentMatching", Subscriber1000,
new MessageBusSubscriberOptions<string, string>(
sequence: 0,
resultCheckingCallback: i => i != null,
argumentConvertingCallback: i => i?.ToString(),
returnValueConvertingCallback: s => int.Parse(s!)));
Dim subscriber1000Ticket = bus.RegisterSubscriber(Of String, String)("ArgumentMatching", AddressOf Subscriber1000, New MessageBusSubscriberOptions(Of String, String)(sequence:=0, resultCheckingCallback:=Function(i) i IsNot Nothing, argumentConvertingCallback:=Function(i) i?.ToString(), returnValueConvertingCallback:=Function(s) Integer.Parse(s)))