package solace

import (
	"time"

	"solace.dev/go/messaging/pkg/solace/config"
	"solace.dev/go/messaging/pkg/solace/message"
	"solace.dev/go/messaging/pkg/solace/resource"
)

// RequestReplyMessagePublisher allows for publishing of request-reply messages
// with handling for reply messages.
type RequestReplyMessagePublisher interface {
	MessagePublisher
	MessagePublisherHealthCheck

	// StartAsyncCallback will start the RequestReplyMessagePublisher asynchronously.
	// Before this function is called, the service is considered
	// off-duty. To operate normally, this function must be called on
	// the RequestReplyMessagePublisher instance. This function is idempotent.
	// Returns immediately and will call the callback function when ready,
	// passing the started RequestReplyMessagePublisher instance, or nil and
	// an error if one occurred. Subsequent calls will register additional
	// callbacks that will be called immediately if already started.
	StartAsyncCallback(callback func(RequestReplyMessagePublisher, error))

	// TerminateAsyncCallback will terminate the message publisher asynchronously.
	// This function is idempotent. The only way to resume operation
	// after this function is called is to create a new instance.
	// Any attempt to call this function renders the instance
	// permanently terminated, even if this function completes.
	// A graceful shutdown will be attempted within the grace period.
	// A grace period of 0 implies a non-graceful shutdown that ignores
	// unfinished tasks or in-flight messages.
	// Returns immediately and registers a callback that will receive an
	// error if one occurred or nil if successfully and gracefully terminated.
	// If gracePeriod is less than 0, the function will wait indefinitely.
	TerminateAsyncCallback(gracePeriod time.Duration, callback func(error))

	// PublishBytes sends a request whose payload is the given byte array to the
	// specified destination and registers a handler for the reply.
	// The API will handle correlation of messages so no additional work is required.
	// Takes a message to send, a replyMessageHandler function to handle the
	// response, a destination to deliver the message to, a replyTimeout
	// indicating the maximum wait time for a response message and an optional
	// userContext object given to the replyMessageHandler (may be nil).
	// Returns an error if one occurred. If replyTimeout is less than 0, the function
	// will wait indefinitely. Possible errors include:
	//   - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
	//   - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
	//     capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
	//     are called.
	PublishBytes(message []byte, replyMessageHandler ReplyMessageHandler, destination *resource.Topic, replyTimeout time.Duration, userContext interface{}) error

	// PublishString sends a request whose payload is the given string to the
	// specified destination and registers a handler for the reply.
	// The API will handle correlation of messages so no additional work is required.
	// Takes a message to send, a replyMessageHandler function to handle the
	// response, a destination to deliver the message to, a replyTimeout
	// indicating the maximum wait time for a response message and an optional
	// userContext object given to the replyMessageHandler (may be nil).
	// Returns an error if one occurred. If replyTimeout is less than 0, the function
	// will wait indefinitely. Possible errors include:
	//   - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
	//   - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
	//     capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
	//     are called.
	PublishString(message string, replyMessageHandler ReplyMessageHandler, destination *resource.Topic, replyTimeout time.Duration, userContext interface{}) error

	// Publish sends a request for a reply non-blocking with optional user context.
	// The API will handle correlation of messages so no additional work is required.
	// Takes a requestMessage to send, a replyMessageHandler function to handle the
	// response, a requestsDestination to deliver the requestMessage to, a replyTimeout
	// indicating the maximum wait time for a response message, a properties
	// configuration provider for the message, and an optional
	// userContext object given to the replyMessageHandler (may be nil).
	// NOTE(review): properties is presumably optional (nil allowed) and applied to
	// the requestMessage before sending — confirm against the implementation.
	// Returns an error if one occurred. If replyTimeout is less than 0, the function
	// will wait indefinitely. Possible errors include:
	//   - solace/errors.*PubSubPlusClientError if the message could not be sent and all retry attempts failed.
	//   - solace/errors.*PublisherOverflowError if publishing messages faster than publisher's I/O
	//     capabilities allow. When publishing can be resumed, registered PublisherReadinessListeners
	//     will be called.
	Publish(requestMessage message.OutboundMessage, replyMessageHandler ReplyMessageHandler,
		requestsDestination *resource.Topic, replyTimeout time.Duration,
		properties config.MessagePropertiesConfigurationProvider, userContext interface{}) error

	// PublishAwaitResponse will send a request for a reply, blocking until a response is
	// received. The API will handle correlation of messages so no additional work is required.
	// Takes a requestMessage to send, a requestDestination to deliver the requestMessage to,
	// a replyTimeout indicating the maximum wait time for a response message, and a
	// properties configuration provider for the message.
	// NOTE(review): properties is presumably optional (nil allowed) — confirm against
	// the implementation.
	// Will return the response and an error if one occurred. If replyTimeout is less than 0,
	// the function will wait indefinitely. Possible errors include:
	//   - solace/errors.*PubSubPlusClientError if the message could not be sent and all retry attempts failed.
	//   - solace/errors.*PublisherOverflowError if publishing messages faster than publisher's I/O
	//     capabilities allow. When publishing can be resumed, registered PublisherReadinessListeners
	//     will be called.
	PublishAwaitResponse(requestMessage message.OutboundMessage, requestDestination *resource.Topic,
		replyTimeout time.Duration, properties config.MessagePropertiesConfigurationProvider) (message.InboundMessage, error)
}

// ReplyMessageHandler is a callback to handle a reply message. The function will be
// called with a message received or nil, the user context if it was set when calling
// RequestReplyMessagePublisher.Publish, and an error if one occurred.
type ReplyMessageHandler func(message message.InboundMessage, userContext interface{}, err error)

// RequestReplyMessagePublisherBuilder allows for configuration of request-reply message publisher instances.
type RequestReplyMessagePublisherBuilder interface {
	// Build will build a new RequestReplyMessagePublisher instance based on the configured properties.
	// Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided.
	Build() (messagePublisher RequestReplyMessagePublisher, err error)
	// OnBackPressureReject will set the publisher backpressure strategy to reject
	// where publish attempts will be rejected once the bufferSize, in number of messages, is reached.
	// If bufferSize is 0, publishing fails with an error when the transport is full.
	// Valid bufferSize is >= 0.
	OnBackPressureReject(bufferSize uint) RequestReplyMessagePublisherBuilder
	// OnBackPressureWait will set the publisher backpressure strategy to wait where publish
	// attempts will block until there is space in the buffer of size bufferSize in number of messages.
	// Valid bufferSize is >= 1.
	OnBackPressureWait(bufferSize uint) RequestReplyMessagePublisherBuilder
	// FromConfigurationProvider will configure the request-reply publisher with the given properties.
	// Built in PublisherPropertiesConfigurationProvider implementations include:
	//   - PublisherPropertyMap, a map of PublisherProperty keys to values
	FromConfigurationProvider(provider config.PublisherPropertiesConfigurationProvider) RequestReplyMessagePublisherBuilder
}