Request-Reply Messaging in the Go API

Request-reply messaging is a method of data transmission where applications simulate separate point-to-point channels: one for requests, and another for replies. In request-reply messaging, each request sent from a message requestor requires a reply from a message replier. When a message replier consumes a request message, it sends a reply back to the requestor. This messaging pattern is useful when each message sent between components in your applications requires a reply, for example when performing authentication or financial transactions.

The PubSub+ Messaging APIs publish request messages with a unique, automatically generated ReplyTo destination topic in the message header field. This ReplyTo topic serves as the return address that the reply should be sent to. Because the ReplyTo topic destination is handled by the PubSub+ Messaging APIs, it allows users to perform request-reply operations without worrying about registering appropriate topic subscriptions to receive replies.

Request-reply messaging can only be used with direct messages in the PubSub+ Go API.

To use the request-reply messaging pattern with the Go API, follow these steps:

  1. Create a RequestReplyMessagePublisher
  2. Sending a Request
  3. Create a RequestReplyMessageReceiver
  4. Receiving Requests and Sending Replies

Create a RequestReplyMessagePublisher

To send message requests, create a MessagingService instance (see Messaging Service for instructions). After you create a MessagingService instance and connect it to the event broker, use the RequestReply() function to build a RequestReplyMessagePublisher instance:

requestReplyPublisher, builderErr := messagingService.RequestReply().CreateRequestReplyMessagePublisherBuilder().Build()
if builderErr != nil {
    panic(builderErr)
}

startErr := requestReplyPublisher.Start()
if startErr != nil {
    panic(startErr)
}

Next create an OutboundMessage instance. This is the request that your publisher sends to the receiver instance. For information on creating an OutboundMessage instance see Configuring and Creating Outbound Messages.

Sending a Request

When you send a request, it can be either synchronous or asynchronous. A synchronous request blocks your application until a reply is received. An asynchronous request allows your application to send multiple requests before any replies are received.

Sending a Synchronous Request

The PubSub+ Go API provides synchronous request-reply messaging, which blocks each request until a reply is received. This is useful for synchronous, point to point communication where the order of events is important, for example when processing financial transactions. To send a synchronous request, use your RequestReplyMessagePublisher to call the PublishAwaitResponse() function. The PublishAwaitResponse() function takes the following parameters:

  • requestMessage—the OutboundMessage request to send
  • requestDestination—The Topic destination for request messages
  • replyTimeout—a time.Duration value representing the maximum time to wait for a response message
  • properties—(Optional) A MessagePropertyMap that contains additional message properties (see Configuring and Creating Outbound Messages). Set this parameter to nil if you do not have additional message properties to set.

For more information, see the PubSub+ Messaging API for Go reference.

The following example shows how to send a message request synchronously and assign the reply to an InboundMessage instance:

replyMsg, publishErr := requestReplyPublisher.PublishAwaitResponse(message, topic, 5*time.Second, nil)

if publishErr == nil { 
    fmt.Printf("The reply inbound payload: %s\n", replyMsg.GetPayloadAsString())
} else if terr, ok := publishErr.(*solace.TimeoutError); ok { 
    // No reply received, can implement resiliency or retry mechanisms here
    fmt.Printf("The reply timed out with %s with msg number : %d\n", terr, msgSeqNum)
} else {
    panic(publishErr)
}

Sending an Asynchronous Request

The PubSub+ Go API provides asynchronous request-reply messaging, which allows your application to send multiple requests before a reply is received. This is useful for asynchronous communication where the order of events is not important. To send an asynchronous request, use your RequestReplyMessagePublisher to call the Publish() function. The Publish() function takes the following parameters:

  • requestMessage—the OutboundMessage request to send
  • replyMessageHandler—an instance of RequestReplyMessagePublisher.ReplyMessageHandler, a callback handler to process the reply message when it arrives or when a timeout occurs.
  • requestDestination—the Topic destination for request messages
  • replyTimeout—a time.Duration value representing the maximum time to wait for a response message
  • properties—(Optional) A MessagePropertyMap that contains additional message properties (see Configuring and Creating Outbound Messages). Set this parameter to nil if you do not have additional message properties to set.
  • userContext—(Optional) A userContext interface to be made available during response message processing (see User Contexts). Set this parameter to nil if you do not have context to set.

For more information, see the PubSub+ Messaging API for Go reference.

The following shows an example implementation of a ReplyMessageHandler, which sends a message request asynchronously and assigns the reply to an InboundMessage instance:

func ReplyHandler(message message.InboundMessage, userContext interface{}, err error) {
    if err == nil {
        fmt.Printf("The reply inbound payload: %s\n", message.GetPayloadAsString())
    } else if terr, ok := err.(*solace.TimeoutError); ok {
        // No reply received, can implement resiliency or retry mechanisms here
        fmt.Printf("The reply timed out with %s with user context : %s\n", terr, userContext)
    } else {
        panic(err)
    }
}

// ...

publishErr := requestReplyPublisher.Publish(message, ReplyHandler, topic, 5*time.Second, nil, nil)
if publishErr != nil {
    panic(publishErr)
}

Create a RequestReplyMessageReceiver

To send message replies, create a MessagingService instance (see Messaging Service for instructions). After you create a MessagingService instance and connect it to the event broker, use the RequestReply() function to build a RequestReplyMessageReceiver instance:

requestReplyReceiver, builderErr := messagingService.RequestReply().CreateRequestReplyMessageReceiverBuilder().Build(topicSubscription)
if builderErr != nil {
    panic(builderErr)
}

startErr := requestReplyReceiver.Start()
if startErr != nil {
    panic(startErr)
}

Next create an OutboundMessage instance. This is the reply that your receiver sends to the requestor instance. For information on creating an OutboundMessage instance see Configuring and Creating Outbound Messages.

Receiving Requests and Sending Replies

Your RequestReplyMessageReceiver can receive a request synchronously or asynchronously as an InboundMessage instance.

Receiving a Request Synchronously and Sending a Reply

The PubSub+ Go API provides synchronous request-reply messaging, which blocks your application until the ReceiveMessage() function returns. This is useful for synchronous, point to point communication where the order of events is important, for example when processing financial transactions. To receive a synchronous request, use your RequestReplyMessageReceiver instance to call the ReceiveMessage() function. The ReceiveMessage() function receives the request message and the replier instance from the RequestReplyMessageReceiver. The function takes the following parameter:

  • Timeout—a time.Duration value representing the time the function waits for a request message. The function waits forever if this value is negative.

For more information, see the PubSub+ Messaging API for Go reference.

The following shows an example implementation of a RequestMessageHandler, which receives a request message synchronously, assigns the reply to an OutboundMessage instance and sends it with the Reply() function:

message, replier, receiveErr := requestReplyReceiver.ReceiveMessage(5 * time.Second)
if receiveErr != nil { receive pull was not successful
    if terr, ok := receiveErr.(*solace.TimeoutError); ok { // A timeout occurred and no request message was received
        // good location for implementing resiliency or retry mechanisms.
        fmt.Printf("request message pull from the receiver timed out with %s with last msg number : %d\n", terr, msgSeqNum)
        continue
    } else { 
        fmt.Println("Receiver error while trying to pull request message. Error: ", receiveErr)
    }
}

if replier != nil { // the replier is only set when received message is request message that can be replied to
    // build reply message
    replyMsg, replyMsgBuildErr := messageBuilder.BuildWithStringPayload(messageBody + "\nReply from: " + message.GetPayloadAsString())
    if replyMsgBuildErr != nil {
        panic(replyMsgBuildErr)
    }
    // send reply msg
    replyErr := replier.Reply(replyMsg)
    if replyErr != nil {
        fmt.Println("error on send reply. Error: ", replyErr)
    }
} else {
    // messages received on the topic subscription without a repliable destination will return a nil replier
    fmt.Printf("Received message: %d on topic %s that was not a request message\n", msgSeqNum, topicSubscription.GetName())
}

Receiving a Request Asynchronously and Sending a Reply

The PubSub+ Go API provides asynchronous request-reply messaging, which allows your application to receive multiple message requests asynchronously with the ReceiveAsync() function. This is useful for point to point communication where the order of events is not important. To receive asynchronous requests, use a RequestReplyMessageReceiver instance to call the ReceiveAsync() function. The ReceiveAsync() function takes the following parameter:

  • requestMessageHandler—an instance of RequestReplyMessageReceiver.RequestMessageHandler, a callback handler to process incoming request messages and the replier instances. This callback allows the ReceiveAsync() function to receive both an inboundMessage (the request) and an instance of RequestReplyMessageReceiver.Replier. The replier instance allows your RequestReplyMessageReceiver to send a reply back to the requestor.

For more information, see the PubSub+ Messaging API for Go reference.

The following shows an example implementation of a RequestMessageHandler, which receives a request message asynchronously, assigns the reply to an OutboundMessage instance and sends it with the Reply() function:

requestMessageHandler := func(message message.InboundMessage, replier solace.Replier) {

    if replier == nil { // the replier is only set when received message is a request message that can be replied to
        // messages received on the topic subscription without a reply destination will return a nil replier
        fmt.Printf("Received message: %d on topic %s that was not a request message\n", msgSeqNum, topicSubscription.GetName())
        return
    }
    replyMsg, replyMsgBuildErr := messageBuilder.BuildWithStringPayload(messageBody + "\nReply from: " + message.GetPayloadAsString())
    if replyMsgBuildErr != nil {
        panic(replyMsgBuildErr)
    }
    replyErr := replier.Reply(replyMsg)
    if replyErr != nil {
        fmt.Println("Error on send reply: ", replyErr)
    }
}

requestReplyReceiver.ReceiveAsync(requestMessageHandler)