...

Source file src/solace.dev/go/messaging/pkg/solace/request_reply_message_publisher.go

Documentation: solace.dev/go/messaging/pkg/solace

     1  // solace-messaging-go-client
     2  //
     3  // Copyright 2024-2026 Solace Corporation. All rights reserved.
     4  //
     5  // Licensed under the Apache License, Version 2.0 (the "License");
     6  // you may not use this file except in compliance with the License.
     7  // You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package solace
    18  
    19  import (
    20  	"time"
    21  
    22  	"solace.dev/go/messaging/pkg/solace/config"
    23  	"solace.dev/go/messaging/pkg/solace/message"
    24  	"solace.dev/go/messaging/pkg/solace/resource"
    25  )
    26  
    27  // RequestReplyMessagePublisher allows for publishing of request-reply messages
    28  // with handling for reply messages.
    29  type RequestReplyMessagePublisher interface {
    30  	MessagePublisher
    31  	MessagePublisherHealthCheck
    32  
    33  	// StartAsyncCallback will start the RequestReplyMessagePublisher asynchronously.
    34  	// Before this function is called, the service is considered
    35  	// off-duty. To operate normally, this function must be called on
    36  	// the RequestReplyMessagePublisher instance. This function is idempotent.
    37  	// Returns immediately and will call the callback function when ready,
    38  	// passing the started RequestReplyMessagePublisher instance, or nil and
    39  	// an error if one occurred. Subsequent calls will register additional
    40  	// callbacks that will be called immediately if already started.
    41  	StartAsyncCallback(callback func(RequestReplyMessagePublisher, error))
    42  
    43  	// TerminateAsyncCallback will terminate the message publisher asynchronously.
    44  	// This function is idempotent. The only way to resume operation
    45  	// after this function is called is to create a new instance.
    46  	// Any attempt to call this function renders the instance
    47  	// permanently terminated, even if this function completes.
    48  	// A graceful shutdown will be attempted within the grace period.
    49  	// A grace period of 0 implies a non-graceful shutdown that ignores
    50  	// unfinished tasks or in-flight messages.
    51  	// Returns immediately and registers a callback that will receive an
    52  	// error if one occurred or nil if terminated successfully and gracefully.
    53  	// If gracePeriod is less than 0, the function will wait indefinitely.
    54  	TerminateAsyncCallback(gracePeriod time.Duration, callback func(error))
    55  
    56  	// PublishBytes sends a request, given as a byte array, to the specified destination.
    57  	// The API will handle correlation of messages so no additional work is required.
    58  	// Takes a message to send as the request, a replyMessageHandler function to handle
    59  	// the response, a destination to deliver the request to, a replyTimeout
    60  	// indicating the maximum wait time for a response message and an optional
    61  	// userContext object given to the replyMessageHandler (may be nil).
    62  	// Returns an error if one occurred. If replyTimeout is less than 0, the function
    63  	// will wait indefinitely. Possible errors include:
    64  	// - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
    65  	// - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
    66  	//   capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
    67  	//   are called.
    68  	PublishBytes(message []byte, replyMessageHandler ReplyMessageHandler, destination *resource.Topic, replyTimeout time.Duration, userContext interface{}) error
    69  
    70  	// PublishString sends a request, given as a string, to the specified destination.
    71  	// The API will handle correlation of messages so no additional work is required.
    72  	// Takes a message to send as the request, a replyMessageHandler function to handle
    73  	// the response, a destination to deliver the request to, a replyTimeout
    74  	// indicating the maximum wait time for a response message and an optional
    75  	// userContext object given to the replyMessageHandler (may be nil).
    76  	// Returns an error if one occurred. If replyTimeout is less than 0, the function
    77  	// will wait indefinitely. Possible errors include:
    78  	// - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
    79  	// - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
    80  	//   capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
    81  	//   are called.
    82  	PublishString(message string, replyMessageHandler ReplyMessageHandler, destination *resource.Topic, replyTimeout time.Duration, userContext interface{}) error
    83  
    84  	// Publish sends a request for a reply without blocking, with an optional user context.
    85  	// The API will handle correlation of messages so no additional work is required.
    86  	// Takes a requestMessage to send, a replyMessageHandler function to handle the
    87  	// response, a requestsDestination to deliver the requestMessage to, a replyTimeout
    88  	// indicating the maximum wait time for a response message, properties (a
    89  	// config.MessagePropertiesConfigurationProvider) and an optional userContext object given to the replyMessageHandler (may be nil).
    90  	// Returns an error if one occurred. If replyTimeout is less than 0, the function
    91  	// will wait indefinitely. Possible errors include:
    92  	// - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
    93  	// - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
    94  	//   capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
    95  	//   are called.
    96  	Publish(requestMessage message.OutboundMessage, replyMessageHandler ReplyMessageHandler,
    97  		requestsDestination *resource.Topic, replyTimeout time.Duration,
    98  		properties config.MessagePropertiesConfigurationProvider, userContext interface{}) error
    99  
   100  	// PublishAwaitResponse will send a request for a reply, blocking until a response is
   101  	// received. The API will handle correlation of messages so no additional work is required.
   102  	// Takes a requestMessage to send, a requestDestination to deliver the requestMessage to,
   103  	// a replyTimeout indicating the maximum wait time for a response message, and properties
   104  	// (a config.MessagePropertiesConfigurationProvider). Returns the response and an error if one
   105  	// occurred. If replyTimeout is less than 0, the function will wait indefinitely. Possible errors include:
   106  	// - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
   107  	// - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
   108  	//   capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
   109  	//   are called.
   110  	PublishAwaitResponse(requestMessage message.OutboundMessage, requestDestination *resource.Topic,
   111  		replyTimeout time.Duration, properties config.MessagePropertiesConfigurationProvider) (message.InboundMessage, error)
   112  }
   113  
   114  // ReplyMessageHandler is a callback to handle a reply message. The function will be
   115  // called with the message received or nil, the user context if it was set when calling
   116  // RequestReplyMessagePublisher.Publish, and an error if one occurred.
   117  type ReplyMessageHandler func(message message.InboundMessage, userContext interface{}, err error)
   118  
   119  // RequestReplyMessagePublisherBuilder allows for configuration of request reply message publisher instances.
   120  type RequestReplyMessagePublisherBuilder interface {
   121  	// Build will build a new RequestReplyMessagePublisher instance based on the configured properties.
   122  	// Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided.
   123  	Build() (messagePublisher RequestReplyMessagePublisher, err error)
   124  	// OnBackPressureReject will set the publisher backpressure strategy to reject
   125  	// where publish attempts will be rejected once the bufferSize, in number of messages, is reached.
   126  	// If bufferSize is 0, publishing will fail with an error when the transport is full.
   127  	// Valid bufferSize is >= 0.
   128  	OnBackPressureReject(bufferSize uint) RequestReplyMessagePublisherBuilder
   129  	// OnBackPressureWait will set the publisher backpressure strategy to wait where publish
   130  	// attempts will block until there is space in the buffer of size bufferSize in number of messages.
   131  	// Valid bufferSize is >= 1.
   132  	OnBackPressureWait(bufferSize uint) RequestReplyMessagePublisherBuilder
   133  	// FromConfigurationProvider will configure the request reply publisher with the given properties.
   134  	// Built-in PublisherPropertiesConfigurationProvider implementations include:
   135  	//   PublisherPropertyMap, a map of PublisherProperty keys to values
   136  	FromConfigurationProvider(provider config.PublisherPropertiesConfigurationProvider) RequestReplyMessagePublisherBuilder
   137  }
   138