Service bus is a refined version of the queue service. We can create topics for PUB/SUB.
Firstly we need to create a namespace, under this namespace we can create queues or topics.
The Service bus explorer can be used to send/receive/peek message on the service bus. If we read the message then the message is removed from the queue, whereas with peek we can see the message without removing it. To get to the service bus explorer , create a queue in the service bus namespace
An enterprise service bus implements a communication system between mutually interacting software applications in a service-oriented architecture.
QUEUE PROPERTIES
TIME to Live
We can set a common time to live or each message in the queue, this can be done by configuring the TTL at the queue level. Each message can have its own TTL which will override the Queue TTL.
LOCK Duration:
When we set the lock duration on a message what happens is that once a receiver receives the messages no other receivers or subscribers will have access to that message till the lock duration time expires.
DEAD Letter queue:
We set a time to live for every message, once this time is over and no one has received that message, it is moved from the primary queue to the dead letter queue.
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview
https://aws.amazon.com/message-queue/benefits/
Better Performance
Message queues enable asynchronous communication, which means that the endpoints that are producing and consuming messages interact with the queue, not each other. Producers can add requests to the queue without waiting for them to be processed. Consumers process messages only when they are available. No component in the system is ever stalled waiting for another, optimizing data flow.Increased Reliability
Queues make your data persistent, and reduce the errors that happen when different parts of your system go offline. By separating different components with message queues, you create more fault tolerance. If one part of the system is ever unreachable, the other can still continue to interact with the queue. The queue itself can also be mirrored for even more availability.Load Leveling
If one servers processing speed is less than the amount of requests it receives then we can add those request to a queue. The queue acts as a buffer.High Availability
Even if the receiving system fails we still have those messages in the queue.
https://app.pluralsight.com/player?course=azure-service-bus-in-depth&author=alan-smith&name=5c6ee412-4854-4f4c-aeca-0ff1cd386b47&clip=5&mode=live
Go to shared access policies to get the connection string. Create a policy. We use these keys to connect to the service bus queue.
Remove the Entity path from the connection string when using QueueClient
Sending And Receiving messages
In receive and delete , we cant abandon or defer the message execution. There is a chance we might loose our data. If the receiving application crashes while processing the message the message is lost as it is delete once its read form the queue.
Peak Lock: The receiver is responsible for message completion.
The first operation the receiving application can perform is to complete the message. This signals that the receiving application has successfully processed the message, and the message can then be removed from the Q.
The receiving application decides it cannot process the message. For some reason, it has the option to abandon message processing. In this case, the message will become visible on the cue again and could be received another time by receiving application.
If the receiving application would like to process the message at a later point in time, the message can be deferred. The message will remain on the Q and could be received again by the receiving application by specifying its unique sequence number now that the sequence number is required to receive that message and it's not possible to receive the message without that sequence number.
If the receiving application decides that there's an error processing the message such as malformed or corrupt data in the message hey can explicitly dead letter the message.
This means that the message will be moved on to a dead letter sub Q within the messaging entity and could be received and analyzed by a separate application when the message is received in the peak lot received mode, there will be a lock time out if the receiving application doesn't perform. All of these four options within the lock timeout duration, the lock time at will expire and the message will become visible in the queue again. We sometimes refer to the code that generates the mechanism for receiving and processing messages as a message.
MESSAGE PROPERTIES
MessageID: Used to identify a message. When we defer a message it stays in the Q, we can use the messageID to read the message from the queue. Uniquely identifies a message.
Receive and Delete
queueClient = new QueueClient(connectionString, queueName,ReceiveMode.ReceiveAndDelete);
Once this is set remove the
This way we can implement recieve and delete in our code, else we can set the peek mode.
POINT TO POINT
Sender
IQueueClient queueClient = new QueueClient(connectionString, queueName);
for (int i = 0; i < 10; i++)
{
//Serialize to json String
Person person = new Person { Name=$"Anish {i}", StudentId=i };
string json = JsonConvert.SerializeObject(person);
//Convert data to byte array and Send message to queue
await queueClient.SendAsync(new Message(Encoding.UTF8.GetBytes(json)));
}
//Close the connection
await queueClient.CloseAsync();
Receiver
var queueClient = new QueueClient(connectionString, queueName);
// Configure the MessageHandler Options in terms of exception handling, number of concurrent messages to deliver etc.
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
// Maximum number of Concurrent calls to the callback `ProcessMessagesAsync`, set to 1 for simplicity.
// Set it according to how many messages the application wants to process in parallel.
// Number of threads that will act on the queue, so 1 thread will
// serve all the requests
//for more than 1 thread the order of the request will not be preserved. So if order is imp then use 1 thread
MaxConcurrentCalls = 1,
// we have to manually complete the messages that were processed successfully
// Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
// False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
AutoComplete = false
};
// Register the function that will process messages
queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
_____________________________________________
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
// Process the message
string msg = Encoding.UTF8.GetString(message.Body);
Person person = JsonConvert.DeserializeObject<Person>(msg);
Console.WriteLine($"Name: {person.Name}, Address: {person.Address}");
// Complete the message so that it is not received again.
// This can be done only if the queueClient is created in ReceiveMode.PeekLock mode (which is default).
// SInce we have set AutoComplete = false, we need to manually complete the message
await queueClient.CompleteAsync(message.SystemProperties.LockToken);
// Note: Use the cancellationToken passed as necessary to determine if the queueClient has already been closed.
// If queueClient has already been Closed, you may chose to not call CompleteAsync() or AbandonAsync() etc. calls
// to avoid unnecessary exceptions.
}
__________________________________
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
TOPICS
Under the service bus namespace create a topic and inside the topic we can add subscribers. So whenever we send a message it will be broadcasted and sent to both subscribers
To Get the connection string go to the topic-> Shared access policies.
//use ManagementClient
var manager = new Manager(ServiceBusConnectionString);
await manager.CreateTopic(OrdersTopicPath);
//Subscription without a filter, rule 1=1 gets created , can receive all messages.
await manager.CreateSubscription(OrdersTopicPath, "AllOrders");
PUBLISH SUBSCRIBE
We create a topic and create subscriptions under that topic. So any message sent from the topic will be sent to all subscribers.
For queues we create a queueclient and for topics we create a topicclient. NUGET:
Microsoft.Azure.ServiceBus
To get a connectionstring we need to go to shared access policies and generate a policy, this will give us 2 connection strings.
If we use point to point messaging for publish subscribe then the sender has the responsibility to route each message to each queue, also managing these queues can be hectic.
'So we use pub Sub and create topics and subscriptions. This was we delegate the routing and other responsibilities to the messaging system and not the sender. Also we can add filtering logic, this will help by allow only filtered users to receive the messages. We create 1 topic and we can have multiple subscriptions in it.
Types of filters in PUB SUB
When we create a subscription, we can specify the path of the Topic and the subscription name. If we dont specify a rule when we create a subscription then a default rule is created 1=1 which is always true, hence the subscribers will receive every message that is broadcasted.
The last filter says that the subscribers will only receive messages that have Loyalty =true and Region="USA". These fields need to be set in the header. We cant use the message body properties. But we can promote them to the header. So they can be used inside these filter expressions.
//SubscriptionWithSqlFilter, create SubscriptionDescription and ruledescription
await manager.CreateSubscriptionWithSqlFilter(OrdersTopicPath, subscriptionName: "UsaOrders", sqlExpression: "region = 'USA'");
await manager.CreateSubscriptionWithSqlFilter(OrdersTopicPath, "EuOrders", "region = 'EU'");
await manager.CreateSubscriptionWithSqlFilter(OrdersTopicPath, "LargeOrders", "items > 30");
await manager.CreateSubscriptionWithSqlFilter(OrdersTopicPath, "HighValueOrders", "value > 500");
await manager.CreateSubscriptionWithSqlFilter(OrdersTopicPath, "LoyaltyCardOrders", "loyalty = true AND region = 'USA'");
Check if Queue topic or subscription Exists- Use ManagementClient
using Microsoft.Azure.ServiceBus.Management; --Allows management function
By adding the values from the body as UserProperties we can use them inside filter statements. These filters can be applied when using SQL filters, when using correlation filters only the == can be used. Correlation is faster than SQL, but the needs are different for using them.
DEAD LETTERING
Messages are moved to the dead letter queue when the message exceeds its TTL . Maxdelivery count etc.
var queue = QueueClient.CreateFromConnectionString(connectionString, QueueClient.FormatDeadLetterPath(queueName));
var messageList = new List<BrokeredMessage>();
BrokeredMessage message;
do
{
message = await queue.PeekAsync();
if (message != null)
{
messageList.Add(message);
}
} while (message != null);
return messageList;
DUPLICATE DETECTION
We need to enable duplicate detection on the queue. Once this is done servicebus will expect a messageid on each message and any messages with the same id will be rejected.
We can set the time limit for which a message can be checked for a duplicate .
So a message with messageid =1 will be checked for 1 min, after a minute if we get a message with the sameid it will be accepted.
USING WIRE TAP (PENDING)
CORRELATION AND SESSION ID
Case 1: if App1 want to send a message to app2 and needs a reponse message from the app from a different queue, then app1 can set a correlationid for the first message and send it to the queue. Now App2 can read this message and push to Q2 another message with the same correlationid . Another approach is where App2 can set the correlationid of the second message to the messageID of the first message.
we can use message correlation for his grouping sequences of related messages. Together we'll use a correlation identify, which will have the same value for all of the messages in a particular sequence of messages. Then we can use correlation in a receiving application to group those messages together.
The sender creates a queue with sessionrequired =true, then creates a message with sessionid =123, this is received by the receiver which initializes the session with the value 123 and starts listening for messages with session id 123. After a timeout or certain interval the receiver can close the message session.
Unlike using correlation id, we use sessions to create a session for a particular amount of time. This session is identified using a sessionid set in the message header.
https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions
When Sessions are enabled on a queue or a subscription, the client applications can no longer send/receive regular messages. All messages must be sent as part of a session (by setting the session id) and received by receiving the session.
When the MessageSession object is accepted and while it's held by a client, that client holds an exclusive lock on all messages with that session's SessionId that exist in the queue or subscription, and also on all messages with that SessionId that still arrive while the session is held.
The lock is released when Close or CloseAsync are called, or when the lock expires in cases in which the application is unable to do the close operation. The session lock should be treated like an exclusive lock on a file, meaning that the application should close the session as soon as it no longer needs it and/or doesn't expect any further messages.
When multiple concurrent receivers pull from the queue, the messages belonging to a particular session are dispatched to the specific receiver that currently holds the lock for that session. With that operation, an interleaved message stream in one queue or subscription is cleanly de-multiplexed to different receivers and those receivers can also live on different client machines, since the lock management happens service-side, inside Service Bus.
REQUEST RESPONSE WITH SESSION ID
Normally we have 2 queues, one sends a message and the other replies with a response. We can have multiple senders and one queue that replies. This can be done using sessionIDs
The sender sends the message with a reply to sessionId set to a number . Ex: 1234. We dont need a session now so we set require session= false. then the sender waits for a message with the sessionid =1234. Once this is received we setup a session. this session will thus receive only messages with that particular session id. The server will send back a response by setting the RequireSession true. thus the sender will create a session.
(Step no: 3)The sender creates a sessionclient and not the queueclient when it waits for the first response from the server. So it only receives the messages with the same sessionid it had sent to the server in the replytosessionid field in the request message.
The server has 2 queues , 1 for request it receives from the sender and the other one for response that it sends out. The reponse queue has requiresession=true in its queuedescription set (while creating the queue). We receive the message in the request queue on the server and then set a sessionid that was in the received message and push it to the response queue on the server. since this has requiresession set and we have the receiving queue configured with a session client , this message is directly received
Comments
Post a Comment