1 // pubsubplus-go-client 2 // 3 // Copyright 2021-2024 Solace Corporation. All rights reserved. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 package solace 18 19 import ( 20 "time" 21 22 "solace.dev/go/messaging/pkg/solace/config" 23 "solace.dev/go/messaging/pkg/solace/message" 24 "solace.dev/go/messaging/pkg/solace/resource" 25 ) 26 27 // PersistentMessageReceiver allows for receiving persistent message (guaranteed messages). 28 type PersistentMessageReceiver interface { 29 MessageReceiver // Include all functionality of MessageReceiver. 30 31 // Ack acknowledges that a message was received. 32 Ack(message message.InboundMessage) error 33 34 // StartAsyncCallback starts the PersistentMessageReceiver asynchronously. 35 // Calls the callback when started with an error if one occurred, otherwise nil 36 // if successful. 37 StartAsyncCallback(callback func(PersistentMessageReceiver, error)) 38 39 // TerminateAsyncCallback terminates the PersistentMessageReceiver asynchronously. 40 // Calls the callback when terminated with nil if successful, otherwise an error if 41 // one occurred. If gracePeriod is less than 0, the function waits indefinitely. 42 TerminateAsyncCallback(gracePeriod time.Duration, callback func(error)) 43 44 // ReceiveAsync registers a callback to be called when new messages 45 // are received. Returns an error if one occurred while registering the callback. 46 // If a callback is already registered, it is replaced by the specified 47 // callback. 48 ReceiveAsync(callback MessageHandler) error 49 50 // ReceiveMessage receives a message synchronously from the receiver. 51 // Returns an error if the receiver is not started or already terminated. 52 // This function waits until the specified timeout to receive a message or waits 53 // forever if timeout value is negative. If a timeout occurs, a solace.TimeoutError 54 // is returned. 55 ReceiveMessage(timeout time.Duration) (message.InboundMessage, error) 56 57 // Pause pauses the receiver's message delivery to asynchronous message handlers. 58 // Pausing an already paused receiver has no effect. 59 // Returns an IllegalStateError if the receiver has not started or has already terminated. 60 Pause() error 61 62 // Resume unpause the receiver's message delivery to asynchronous message handlers. 63 // Resume a receiver that is not paused has no effect. 64 // Returns an IllegalStateError if the receiver has not started or has already terminated. 65 Resume() error 66 67 // ReceiverInfo returns a runtime accessor for the receiver information such as the remote 68 // resource to which it connects. 69 // Returns an IllegalStateError if the receiver has not started or has already terminated. 70 ReceiverInfo() (info PersistentReceiverInfo, err error) 71 } 72 73 // PersistentMessageReceiverBuilder is used for configuration of PersistentMessageReceiver. 74 type PersistentMessageReceiverBuilder interface { 75 // Build creates a PersistentMessageReceiver with the specified properties. 76 // Returns solace/errors.*IllegalArgumentError if the queue is nil. 77 // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. 78 Build(queue *resource.Queue) (receiver PersistentMessageReceiver, err error) 79 80 // WithActivationPassivationSupport sets the listener to receiver broker notifications 81 // about state changes for the resulting receiver. This change can happen if there are 82 // multiple instances of the same receiver for high availability and activity is exchanged. 83 // This change is handled by the broker. 84 WithActivationPassivationSupport(listener ReceiverStateChangeListener) PersistentMessageReceiverBuilder 85 86 // WithMessageAutoAcknowledgement enables automatic acknowledgement on all receiver methods. 87 // Auto Acknowledgement will be performed after an acknowledgement after the receive callback 88 // is called when using ReceiveAsync, or after the message is passed to the application when 89 // using ReceiveMessage. In cases where underlying network connectivity fails, automatic 90 // acknowledgement processing is not guaranteed. 91 WithMessageAutoAcknowledgement() PersistentMessageReceiverBuilder 92 93 // WithMessageClientAcknowledgement disables automatic acknowledgement on all receiver methods 94 // and instead enables support for client acknowledgement for both synchronous and asynchronous 95 // message delivery functions. New persistent receiver builders default to client acknowledgement. 96 WithMessageClientAcknowledgement() PersistentMessageReceiverBuilder 97 98 // WithMessageSelector sets the message selector to the specified string. 99 // If an empty string is provided, the filter is cleared. 100 WithMessageSelector(filterSelectorExpression string) PersistentMessageReceiverBuilder 101 102 // WithMissingResourcesCreationStrategy sets the missing resource creation strategy 103 // defining what actions the API may take when missing resources are detected. 104 WithMissingResourcesCreationStrategy(strategy config.MissingResourcesCreationStrategy) PersistentMessageReceiverBuilder 105 106 // WithMessageReplay enables support for message replay using a specific replay 107 // strategy. Once started, the receiver replays using the specified strategy. 108 // Valid strategies include config.ReplayStrategyAllMessages(), 109 // config.ReplayStrategyTimeBased and config.ReplicationGroupMessageIDReplayStrategy. 110 WithMessageReplay(strategy config.ReplayStrategy) PersistentMessageReceiverBuilder 111 112 // WithSubscriptions sets a list of TopicSubscriptions to subscribe 113 // to when starting the receiver. Accepts *resource.TopicSubscription subscriptions. 114 WithSubscriptions(topics ...resource.Subscription) PersistentMessageReceiverBuilder 115 116 // FromConfigurationProvider configures the persistent receiver with the specified properties. 117 // The built-in ReceiverPropertiesConfigurationProvider implementations include: 118 // ReceiverPropertyMap, a map of ReceiverProperty keys to values 119 FromConfigurationProvider(provider config.ReceiverPropertiesConfigurationProvider) PersistentMessageReceiverBuilder 120 } 121 122 // ReceiverState represents the various states the receiver can be in, such as Active or Passive. 123 // This property is controlled by the remote broker. 124 type ReceiverState byte 125 126 const ( 127 // ReceiverActive is the state in which the receiver receives messages from a broker. 128 ReceiverActive ReceiverState = iota 129 // ReceiverPassive is the state in which the receiver would not receive messages from a broker. 130 // Often, this is because another instance of the receiver became active, so this instance became 131 // passive. 132 ReceiverPassive 133 ) 134 135 // ReceiverStateChangeListener is a listener that can be registered on a receiver to be 136 // notified of changes in receiver state by the remote broker. 137 type ReceiverStateChangeListener func(oldState ReceiverState, newState ReceiverState, timestamp time.Time) 138 139 // PersistentReceiverInfo provides information about the receiver at runtime. 140 type PersistentReceiverInfo interface { 141 // GetResourceInfo returns the remote endpoint (resource) information for the receiver. 142 GetResourceInfo() ResourceInfo 143 } 144 145 // ResourceInfo provides information about a resouce at runtime. 146 type ResourceInfo interface { 147 // GetName returns the name of the resource. 148 GetName() string 149 // IsDurable returns true is a resource is durable, otherwise false. 150 IsDurable() bool 151 } 152