1 // pubsubplus-go-client 2 // 3 // Copyright 2021-2025 Solace Corporation. All rights reserved. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 package solace 18 19 import ( 20 "time" 21 22 "solace.dev/go/messaging/pkg/solace/config" 23 "solace.dev/go/messaging/pkg/solace/message" 24 "solace.dev/go/messaging/pkg/solace/resource" 25 ) 26 27 // PersistentMessageReceiver allows for receiving persistent message (guaranteed messages). 28 type PersistentMessageReceiver interface { 29 MessageReceiver // Include all functionality of MessageReceiver. 30 31 // Ack acknowledges that a message was received. 32 // This method is equivalent to calling the settle method with 33 // the ACCEPTED outcome like this: PersistentMessageReceiver.Settle(inboundMessage, config.PersistentReceiverAcceptedOutcome) 34 Ack(message message.InboundMessage) error 35 36 // Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as 37 // indicated by the MessageSettlementOutcome argument. To use the negative outcomes FAILED and REJECTED, 38 // the receiver has to have been preconfigured via its builder to support these settlement outcomes. 39 // Attempts to settle a message on an auto-acking receiver is ignored for ACCEPTED 40 // (albeit it counts as a manual ACCEPTED in the stats), raises error for FAILED and REJECTED. 41 // this returns an error object with the reason for the error if it was not possible to settle the message. 42 Settle(message message.InboundMessage, outcome config.MessageSettlementOutcome) error 43 44 // StartAsyncCallback starts the PersistentMessageReceiver asynchronously. 45 // Calls the callback when started with an error if one occurred, otherwise nil 46 // if successful. 47 StartAsyncCallback(callback func(PersistentMessageReceiver, error)) 48 49 // TerminateAsyncCallback terminates the PersistentMessageReceiver asynchronously. 50 // Calls the callback when terminated with nil if successful, otherwise an error if 51 // one occurred. If gracePeriod is less than 0, the function waits indefinitely. 52 TerminateAsyncCallback(gracePeriod time.Duration, callback func(error)) 53 54 // ReceiveAsync registers a callback to be called when new messages 55 // are received. Returns an error if one occurred while registering the callback. 56 // If a callback is already registered, it is replaced by the specified 57 // callback. 58 ReceiveAsync(callback MessageHandler) error 59 60 // ReceiveMessage receives a message synchronously from the receiver. 61 // Returns an error if the receiver is not started or already terminated. 62 // This function waits until the specified timeout to receive a message or waits 63 // forever if timeout value is negative. If a timeout occurs, a solace.TimeoutError 64 // is returned. 65 ReceiveMessage(timeout time.Duration) (message.InboundMessage, error) 66 67 // Pause pauses the receiver's message delivery to asynchronous message handlers. 68 // Pausing an already paused receiver has no effect. 69 // Returns an IllegalStateError if the receiver has not started or has already terminated. 70 Pause() error 71 72 // Resume unpause the receiver's message delivery to asynchronous message handlers. 73 // Resume a receiver that is not paused has no effect. 74 // Returns an IllegalStateError if the receiver has not started or has already terminated. 75 Resume() error 76 77 // ReceiverInfo returns a runtime accessor for the receiver information such as the remote 78 // resource to which it connects. 79 // Returns an IllegalStateError if the receiver has not started or has already terminated. 80 ReceiverInfo() (info PersistentReceiverInfo, err error) 81 } 82 83 // PersistentMessageReceiverBuilder is used for configuration of PersistentMessageReceiver. 84 type PersistentMessageReceiverBuilder interface { 85 // Build creates a PersistentMessageReceiver with the specified properties. 86 // Returns solace/errors.*IllegalArgumentError if the queue is nil. 87 // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. 88 Build(queue *resource.Queue) (receiver PersistentMessageReceiver, err error) 89 90 // WithActivationPassivationSupport sets the listener to receiver broker notifications 91 // about state changes for the resulting receiver. This change can happen if there are 92 // multiple instances of the same receiver for high availability and activity is exchanged. 93 // This change is handled by the broker. 94 WithActivationPassivationSupport(listener ReceiverStateChangeListener) PersistentMessageReceiverBuilder 95 96 // WithMessageAutoAcknowledgement enables automatic acknowledgement on all receiver methods. 97 // Auto Acknowledgement will be performed after an acknowledgement after the receive callback 98 // is called when using ReceiveAsync, or after the message is passed to the application when 99 // using ReceiveMessage. In cases where underlying network connectivity fails, automatic 100 // acknowledgement processing is not guaranteed. 101 WithMessageAutoAcknowledgement() PersistentMessageReceiverBuilder 102 103 // WithMessageClientAcknowledgement disables automatic acknowledgement on all receiver methods 104 // and instead enables support for client acknowledgement for both synchronous and asynchronous 105 // message delivery functions. New persistent receiver builders default to client acknowledgement. 106 WithMessageClientAcknowledgement() PersistentMessageReceiverBuilder 107 108 // WithMessageSelector sets the message selector to the specified string. 109 // If an empty string is provided, the filter is cleared. 110 WithMessageSelector(filterSelectorExpression string) PersistentMessageReceiverBuilder 111 112 // WithMissingResourcesCreationStrategy sets the missing resource creation strategy 113 // defining what actions the API may take when missing resources are detected. 114 WithMissingResourcesCreationStrategy(strategy config.MissingResourcesCreationStrategy) PersistentMessageReceiverBuilder 115 116 // WithMessageReplay enables support for message replay using a specific replay 117 // strategy. Once started, the receiver replays using the specified strategy. 118 // Valid strategies include config.ReplayStrategyAllMessages(), 119 // config.ReplayStrategyTimeBased and config.ReplicationGroupMessageIDReplayStrategy. 120 WithMessageReplay(strategy config.ReplayStrategy) PersistentMessageReceiverBuilder 121 122 // WithSubscriptions sets a list of TopicSubscriptions to subscribe 123 // to when starting the receiver. Accepts *resource.TopicSubscription subscriptions. 124 WithSubscriptions(topics ...resource.Subscription) PersistentMessageReceiverBuilder 125 126 // WithRequiredMessageOutcomeSupport configures the types of settlements the receiver can use. 127 // Any combination of PersistentReceiverAcceptedOutcome, PersistentReceiverFailedOutcome, and 128 // PersistentReceiverRejectedOutcome; the order is irrelevant. 129 // Attempting to Settle() a message later with an Outcome not listed here may result in an error. 130 WithRequiredMessageOutcomeSupport(messageSettlementOutcomes ...config.MessageSettlementOutcome) PersistentMessageReceiverBuilder 131 132 // FromConfigurationProvider configures the persistent receiver with the specified properties. 133 // The built-in ReceiverPropertiesConfigurationProvider implementations include: 134 // ReceiverPropertyMap, a map of ReceiverProperty keys to values 135 FromConfigurationProvider(provider config.ReceiverPropertiesConfigurationProvider) PersistentMessageReceiverBuilder 136 } 137 138 // ReceiverState represents the various states the receiver can be in, such as Active or Passive. 139 // This property is controlled by the remote broker. 140 type ReceiverState byte 141 142 const ( 143 // ReceiverActive is the state in which the receiver receives messages from a broker. 144 ReceiverActive ReceiverState = iota 145 // ReceiverPassive is the state in which the receiver would not receive messages from a broker. 146 // Often, this is because another instance of the receiver became active, so this instance became 147 // passive. 148 ReceiverPassive 149 ) 150 151 // ReceiverStateChangeListener is a listener that can be registered on a receiver to be 152 // notified of changes in receiver state by the remote broker. 153 type ReceiverStateChangeListener func(oldState ReceiverState, newState ReceiverState, timestamp time.Time) 154 155 // PersistentReceiverInfo provides information about the receiver at runtime. 156 type PersistentReceiverInfo interface { 157 // GetResourceInfo returns the remote endpoint (resource) information for the receiver. 158 GetResourceInfo() ResourceInfo 159 } 160 161 // ResourceInfo provides information about a resouce at runtime. 162 type ResourceInfo interface { 163 // GetName returns the name of the resource. 164 GetName() string 165 // IsDurable returns true is a resource is durable, otherwise false. 166 IsDurable() bool 167 } 168