...

Source file src/solace.dev/go/messaging/pkg/solace/request_reply_message_publisher.go

Documentation: solace.dev/go/messaging/pkg/solace

     1  package solace
     2  
     3  import (
     4  	"time"
     5  
     6  	"solace.dev/go/messaging/pkg/solace/config"
     7  	"solace.dev/go/messaging/pkg/solace/message"
     8  	"solace.dev/go/messaging/pkg/solace/resource"
     9  )
    10  
    11  // RequestReplyMessagePublisher allows for publishing of request-reply messages
    12  // with handling for reply messages.
    13  type RequestReplyMessagePublisher interface {
    14  	MessagePublisher
    15  	MessagePublisherHealthCheck
    16  
    17  	// StartAsyncCallback will start the RequestReplyMessagePublisher asynchronously.
    18  	// Before this function is called, the service is considered
    19  	// off-duty. To operate normally, this function must be called on
    20  	// the RequestReplyMessagePublisher instance. This function is idempotent.
    21  	// Returns immediately and will call the callback function when ready
    22  	// passing the started RequestReplyMessagePublisher instance, or nil and
    23  	// an error if one occurred. Subsequent calls will register additional
    24  	// callbacks that will be called immediately if already started.
    25  	StartAsyncCallback(callback func(RequestReplyMessagePublisher, error))
    26  
    27  	// TerminateAsyncCallback will terminate the message publisher asynchronously.
    28  	// This function is idempotent. The only way to resume operation
    29  	// after this function is called is to create a new instance.
    30  	// Any attempt to call this function renders the instance
    31  	// permanently terminated, even if this function completes.
    32  	// A graceful shutdown will be attempted within the grace period.
    33  	// A gracePeriod of 0 implies a non-graceful shutdown that ignores
    34  	// unfinished tasks or in-flight messages.
    35  	// Returns immediately and registers a callback that will receive an
    36  	// error if one occurred or nil if successfully and gracefully terminated.
    37  	// If gracePeriod is less than 0, the function will wait indefinitely.
    38  	TerminateAsyncCallback(gracePeriod time.Duration, callback func(error))
    39  
    40  	// PublishBytes sends a request message given as a byte array to the specified destination.
    41  	// The API will handle correlation of messages so no additional work is required.
    42  	// Takes a message to send, a replyMessageHandler function to handle the
    43  	// response, a destination to deliver the message to, a replyTimeout
    44  	// indicating the maximum wait time for a response message and an optional
    45  	// userContext object given to the replyMessageHandler (may be nil).
    46  	// Returns an error if one occurred. If replyTimeout is less than 0, the function
    47  	// will wait indefinitely. Possible errors include:
    48  	// - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
    49  	// - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
    50  	//   capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
    51  	//   are called.
    52  	PublishBytes(message []byte, replyMessageHandler ReplyMessageHandler, destination *resource.Topic, replyTimeout time.Duration, userContext interface{}) error
    53  
    54  	// PublishString sends a request message given as a string to the specified destination.
    55  	// The API will handle correlation of messages so no additional work is required.
    56  	// Takes a message to send, a replyMessageHandler function to handle the
    57  	// response, a destination to deliver the message to, a replyTimeout
    58  	// indicating the maximum wait time for a response message and an optional
    59  	// userContext object given to the replyMessageHandler (may be nil).
    60  	// Returns an error if one occurred. If replyTimeout is less than 0, the function
    61  	// will wait indefinitely. Possible errors include:
    62  	// - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
    63  	// - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
    64  	//   capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
    65  	//   are called.
    66  	PublishString(message string, replyMessageHandler ReplyMessageHandler, destination *resource.Topic, replyTimeout time.Duration, userContext interface{}) error
    67  
    68  	// Publish sends a request for a reply non-blocking with optional user context.
    69  	// The API will handle correlation of messages so no additional work is required.
    70  	// Takes a requestMessage to send, a replyMessageHandler function to handle the
    71  	// response, a requestsDestination to deliver the requestMessage to, a replyTimeout
    72  	// indicating the maximum wait time for a response message, a properties provider
    73  	// (NOTE(review): presumably applied to the request; confirm) and an optional userContext
    74  	// object given to the replyMessageHandler (may be nil). Returns an error if one occurred.
    75  	// If replyTimeout is less than 0, the function will wait indefinitely. Possible errors include:
    76  	// - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
    77  	// - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
    78  	//   capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
    79  	//   are called.
    80  	Publish(requestMessage message.OutboundMessage, replyMessageHandler ReplyMessageHandler,
    81  		requestsDestination *resource.Topic, replyTimeout time.Duration,
    82  		properties config.MessagePropertiesConfigurationProvider, userContext interface{}) error
    83  
    84  	// PublishAwaitResponse will send a request for a reply blocking until a response is
    85  	// received. The API will handle correlation of messages so no additional work is required.
    86  	// Takes a requestMessage to send, a requestDestination to deliver it to, a replyTimeout
    87  	// indicating the maximum wait time for a response message, and properties (NOTE(review):
    88  	// presumably applied to the request; confirm). Will return the response and an error if one
    89  	// occurred. If replyTimeout is less than 0, the function will wait indefinitely. Possible errors include:
    90  	// - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
    91  	// - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
    92  	//   capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
    93  	//   are called.
    94  	PublishAwaitResponse(requestMessage message.OutboundMessage, requestDestination *resource.Topic,
    95  		replyTimeout time.Duration, properties config.MessagePropertiesConfigurationProvider) (message.InboundMessage, error)
    96  }
    97  
    98  // ReplyMessageHandler is a callback to handle a reply message. The function will be
    99  // called with a message received or nil, the user context if it was set when calling
   100  // Publish, PublishBytes or PublishString on a RequestReplyMessagePublisher, and an error if one occurred.
   101  type ReplyMessageHandler func(message message.InboundMessage, userContext interface{}, err error)
   102  
   103  // RequestReplyMessagePublisherBuilder allows for configuration of request-reply message publisher instances.
   104  type RequestReplyMessagePublisherBuilder interface {
   105  	// Build will build a new RequestReplyMessagePublisher instance based on the configured properties.
   106  	// Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided.
   107  	Build() (messagePublisher RequestReplyMessagePublisher, err error)
   108  	// OnBackPressureReject will set the publisher backpressure strategy to reject
   109  	// where publish attempts will be rejected once the bufferSize, in number of messages, is reached.
   110  	// If bufferSize is 0, publishing will fail with an error when the transport is full.
   111  	// Valid bufferSize is >= 0.
   112  	OnBackPressureReject(bufferSize uint) RequestReplyMessagePublisherBuilder
   113  	// OnBackPressureWait will set the publisher backpressure strategy to wait where publish
   114  	// attempts will block until there is space in the buffer of size bufferSize in number of messages.
   115  	// Valid bufferSize is >= 1.
   116  	OnBackPressureWait(bufferSize uint) RequestReplyMessagePublisherBuilder
   117  	// FromConfigurationProvider will configure the request-reply publisher with the given properties.
   118  	// Built in PublisherPropertiesConfigurationProvider implementations include:
   119  	//   PublisherPropertyMap, a map of PublisherProperty keys to values
   120  	FromConfigurationProvider(provider config.PublisherPropertiesConfigurationProvider) RequestReplyMessagePublisherBuilder
   121  }
   122