1 package solace 2 3 import ( 4 "time" 5 6 "solace.dev/go/messaging/pkg/solace/config" 7 "solace.dev/go/messaging/pkg/solace/message" 8 "solace.dev/go/messaging/pkg/solace/resource" 9 ) 10 11 // Replier allows for received request-reply messages to be replied to. The 12 // destination of these messages is automatically determined by the 13 // InboundMessage passed to a RequestMessageHandler. 14 type Replier interface { 15 // Reply publishes a reply or response message. 16 Reply(message message.OutboundMessage) error 17 } 18 19 // RequestMessageHandler is a callback called when a message is received. 20 // It is passed the request message as well as a replier allowing for the 21 // publishing of a reply message. The replier argument may be nil indicating 22 // that a NON-Request-Reply message has been received on the topic subscription 23 // given when building the RequestReplyMessageReceiver instance. 24 type RequestMessageHandler func(message message.InboundMessage, replier Replier) 25 26 // RequestReplyMessageReceiver allows receiving of request-reply messages 27 // with handling for sending reply messages. 28 type RequestReplyMessageReceiver interface { 29 MessageReceiver 30 31 // StartAsyncCallback will start the message receiver asynchronously. 32 // Before this function is called, the service is considered 33 // off-duty. To operate normally, this function must be called on 34 // the RequestReplyMessageReceiver instance. This function is idempotent. 35 // Returns immediately and will call the callback function when ready 36 // passing the started RequestReplyMessageReceiver instance, or nil and 37 // an error if one occurred. Subsequent calls will register additional 38 // callbacks that will be called immediately if already started. 39 StartAsyncCallback(callback func(RequestReplyMessageReceiver, error)) 40 41 // TerminateAsyncCallback will terminate the message receiver asynchronously. 42 // This function is idempotent. The only way to resume operation 43 // after this function is called is to create a new instance. 44 // Any attempt to call this function renders the instance 45 // permanently terminated, even if this function completes. 46 // A graceful shutdown will be attempted within the grace period. 47 // A grace period of 0 implies a non-graceful shutdown that ignores 48 // unfinished tasks or in-flight messages. 49 // Returns immediately and registers a callback that will receive an 50 // error if one occurred or nil if successfully and gracefully terminated. 51 // If gracePeriod is less than 0, the function will wait indefinitely. 52 TerminateAsyncCallback(gracePeriod time.Duration, callback func(error)) 53 54 // ReceiveAsync registers an asynchronous message handler. The given 55 // messageHandler will handle an ordered sequence of inbound request messages. 56 // This function is mutually exclusive to ReceiveMessage. 57 // Returns an error one occurred while registering the callback. 58 // If a callback is already registered, it will be replaced by the given 59 // callback. 60 ReceiveAsync(messageHandler RequestMessageHandler) error 61 62 // ReceiveMessage receives a message and replier synchronously from the receiver. 63 // Returns a nil replier if the message can not be replied to. 64 // Returns an error if the receiver is not started or already terminated. 65 // This function waits until the specified timeout to receive a message or waits 66 // forever if timeout value is negative. If a timeout occurs, a solace.TimeoutError 67 // is returned. 68 ReceiveMessage(timeout time.Duration) (message.InboundMessage, Replier, error) 69 } 70 71 // RequestReplyMessageReceiverBuilder allows for configuration of RequestReplyMessageReceiver instances 72 type RequestReplyMessageReceiverBuilder interface { 73 // Build will build a new RequestReplyMessageReceiver with the given properties. 74 // The message receiver will subscribe to the specified topic subscription. 75 // Accepts TopicSubscription instances as Subscriptions. See solace.TopicSubscriptionOf. 76 // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. 77 Build(requestTopicSubscription resource.Subscription) (messageReceiver RequestReplyMessageReceiver, err error) 78 // BuildWithSharedSubscription will build a new RequestReplyMessageReceiver with 79 // the given properties using a shared topic subscription and the shared name. 80 BuildWithSharedSubscription(requestTopicSubscription resource.Subscription, shareName *resource.ShareName) (messageReceiver RequestReplyMessageReceiver, err error) 81 // OnBackPressureDropLatest configures the receiver with the specified buffer size. If the buffer 82 // is full and a message arrives, the incoming message is discarded. 83 // A buffer of the given size will be statically allocated when the receiver is built. 84 // The bufferCapacity must be greater than or equal to 1. 85 OnBackPressureDropLatest(bufferCapacity uint) RequestReplyMessageReceiverBuilder 86 // OnBackPressureDropOldest configures the receiver with the specified buffer size, bufferCapacity. If the buffer 87 // is full and a message arrives, the oldest message in the buffer is discarded. 88 // A buffer of the given size will be statically allocated when the receiver is built. 89 // The value of bufferCapacity must be greater than or equal to 1. 90 OnBackPressureDropOldest(bufferCapacity uint) RequestReplyMessageReceiverBuilder 91 // FromConfigurationProvider will configure the request reply receiver with the given properties. 92 // Built in ReceiverPropertiesConfigurationProvider implementations include: 93 // ReceiverPropertyMap, a map of ReceiverProperty keys to values 94 FromConfigurationProvider(provider config.ReceiverPropertiesConfigurationProvider) RequestReplyMessageReceiverBuilder 95 } 96