1 // solace-messaging-go-client 2 // 3 // Copyright 2024-2026 Solace Corporation. All rights reserved. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 package solace 18 19 import ( 20 "time" 21 22 "solace.dev/go/messaging/pkg/solace/config" 23 "solace.dev/go/messaging/pkg/solace/message" 24 "solace.dev/go/messaging/pkg/solace/resource" 25 ) 26 27 // RequestReplyMessagePublisher allows for publishing of request-reply messages 28 // with handling for reply messages. 29 type RequestReplyMessagePublisher interface { 30 MessagePublisher 31 MessagePublisherHealthCheck 32 33 // StartAsyncCallback will start the RequestReplyMessagePublisher asynchronously. 34 // Before this function is called, the service is considered 35 // off-duty. To operate normally, this function must be called on 36 // the RequestReplyMessageReceiver instance. This function is idempotent. 37 // Returns immediately and will call the callback function when ready 38 // passing the started RequestReplyMessageReceiver instance, or nil and 39 // an error if one occurred. Subsequent calls will register additional 40 // callbacks that will be called immediately if already started. 41 StartAsyncCallback(callback func(RequestReplyMessagePublisher, error)) 42 43 // TerminateAsyncCallback will terminate the message publisher asynchronously. 44 // This function is idempotent. The only way to resume operation 45 // after this function is called is to create a new instance. 46 // Any attempt to call this function renders the instance 47 // permanently terminated, even if this function completes. 48 // A graceful shutdown will be attempted within the grace period. 49 // A grace period of 0 implies a non-graceful shutdown that ignores 50 // unfinished tasks or in-flight messages. 51 // Returns immediately and registers a callback that will receive an 52 // error if one occurred or nil if successfully and gracefully terminated. 53 // If gracePeriod is less than 0, the function will wait indefinitely. 54 TerminateAsyncCallback(gracePeriod time.Duration, callback func(error)) 55 56 // PublishBytes sends a request for a reply of type byte array to the specified destination. 57 // The API will handle correlation of messages so no additional work is required. 58 // Takes a requestMessage to send, a replyMessageHandler function to handle the 59 // response, a requestsDestination to deliver the requestMessage to, a replyTimeout 60 // indicating the maximum wait time for a response message and an optional 61 // userContext object given to the replyMessageHandler (may be nil). 62 // Returns an error if one occurred. If replyTimeout is less than 0, the function 63 // will wait indefinitely. Possible errors include: 64 // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. 65 // - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O 66 // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners 67 // are called. 68 PublishBytes(message []byte, replyMessageHandler ReplyMessageHandler, destination *resource.Topic, replyTimeout time.Duration, userContext interface{}) error 69 70 // PublishString sends a request for a reply of type string to the specified destination. 71 // The API will handle correlation of messages so no additional work is required. 72 // Takes a requestMessage to send, a replyMessageHandler function to handle the 73 // response, a requestsDestination to deliver the requestMessage to, a replyTimeout 74 // indicating the maximum wait time for a response message and an optional 75 // userContext object given to the replyMessageHandler (may be nil). 76 // Returns an error if one occurred. If replyTimeout is less than 0, the function 77 // will wait indefinitely. Possible errors include: 78 // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. 79 // - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O 80 // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners 81 // are called. 82 PublishString(message string, replyMessageHandler ReplyMessageHandler, destination *resource.Topic, replyTimeout time.Duration, userContext interface{}) error 83 84 // Publish sends a request for a reply non-blocking with optional user context. 85 // The API will handle correlation of messages so no additional work is required. 86 // Takes a requestMessage to send, a replyMessageHandler function to handle the 87 // response, a requestsDestination to deliver the requestMessage to, a replyTimeout 88 // indicating the maximum wait time for a response message and an optional 89 // userContext object given to the replyMessageHandler (may be nil). 90 // Returns an error if one occurred. If replyTimeout is less than 0, the function 91 // will wait indefinitely. Possible errors include: 92 // - solace/errors.*PubSubPlusClientError if the message could not be sent and all retry attempts failed. 93 // - solace/errors.*PublisherOverflowError if publishing messages faster than publisher's I/O 94 // capabilities allow. When publishing can be resumed, registered PublisherReadinessListeners 95 // will be called. 96 Publish(requestMessage message.OutboundMessage, replyMessageHandler ReplyMessageHandler, 97 requestsDestination *resource.Topic, replyTimeout time.Duration, 98 properties config.MessagePropertiesConfigurationProvider, userContext interface{}) error 99 100 // PublishAwaitResponse will send a request for a reply blocking until a response is 101 // received. The API will handle correlation of messages so no additional work is required. 102 // Takes a requestMessage to send, a requestDestination to deliver the requestMessage to, 103 // and a replyTimeout indicating the maximum wait time for a response message. 104 // Will return the response and an error if one occurred. If replyTimeout is less than 0, 105 // the function will wait indefinitely. Possible errors include: 106 // - solace/errors.*PubSubPlusClientError if the message could not be sent and all retry attempts failed. 107 // - solace/errors.*PublisherOverflowError if publishing messages faster than publisher's I/O 108 // capabilities allow. When publishing can be resumed, registered PublisherReadinessListeners 109 // will be called. 110 PublishAwaitResponse(requestMessage message.OutboundMessage, requestDestination *resource.Topic, 111 replyTimeout time.Duration, properties config.MessagePropertiesConfigurationProvider) (message.InboundMessage, error) 112 } 113 114 // ReplyMessageHandler is a callback to handle a reply message. The function will be 115 // called with a message received or nil, the user context if it was set when calling 116 // RequestReplyMessagePublisher.Publish, and an error if one was thrown. 117 type ReplyMessageHandler func(message message.InboundMessage, userContext interface{}, err error) 118 119 // RequestReplyMessagePublisherBuilder allows for configuration of request reply message publisher instances 120 type RequestReplyMessagePublisherBuilder interface { 121 // Build will build a new RequestReplyMessagePublisher instance based on the configured properties. 122 // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. 123 Build() (messagePublisher RequestReplyMessagePublisher, err error) 124 // OnBackPressureReject will set the publisher backpressure strategy to reject 125 // where publish attempts will be rejected once the bufferSize, in number of messages, is reached. 126 // If bufferSize is 0, an error will be thrown when the transport is full when publishing. 127 // Valid bufferSize is >= 0. 128 OnBackPressureReject(bufferSize uint) RequestReplyMessagePublisherBuilder 129 // OnBackPressureWait will set the publisher backpressure strategy to wait where publish 130 // attempts will block until there is space in the buffer of size bufferSize in number of messages. 131 // Valid bufferSize is >= 1. 132 OnBackPressureWait(bufferSize uint) RequestReplyMessagePublisherBuilder 133 // FromConfigurationProvider will configure the persistent publisher with the given properties. 134 // Built in PublisherPropertiesConfigurationProvider implementations include: 135 // PublisherPropertyMap, a map of PublisherProperty keys to values 136 FromConfigurationProvider(provider config.PublisherPropertiesConfigurationProvider) RequestReplyMessagePublisherBuilder 137 } 138