...

Source file src/solace.dev/go/messaging/pkg/solace/persistent_message_receiver.go

Documentation: solace.dev/go/messaging/pkg/solace

     1  // pubsubplus-go-client
     2  //
     3  // Copyright 2021-2025 Solace Corporation. All rights reserved.
     4  //
     5  // Licensed under the Apache License, Version 2.0 (the "License");
     6  // you may not use this file except in compliance with the License.
     7  // You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package solace
    18  
    19  import (
    20  	"time"
    21  
    22  	"solace.dev/go/messaging/pkg/solace/config"
    23  	"solace.dev/go/messaging/pkg/solace/message"
    24  	"solace.dev/go/messaging/pkg/solace/resource"
    25  )
    26  
    27  // PersistentMessageReceiver allows for receiving persistent message (guaranteed messages).
    28  type PersistentMessageReceiver interface {
    29  	MessageReceiver // Include all functionality of MessageReceiver.
    30  
    31  	// Ack acknowledges that a  message was received.
    32  	// This method is equivalent to calling the settle method with
    33  	// the ACCEPTED outcome like this: PersistentMessageReceiver.Settle(inboundMessage, config.PersistentReceiverAcceptedOutcome)
    34  	Ack(message message.InboundMessage) error
    35  
    36  	// Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as
    37  	// indicated by the MessageSettlementOutcome argument. To use the negative outcomes FAILED and REJECTED,
    38  	// the receiver has to have been preconfigured via its builder to support these settlement outcomes.
    39  	// Attempts to settle a message on an auto-acking receiver is ignored for ACCEPTED
    40  	// (albeit it counts as a manual ACCEPTED in the stats), raises error for FAILED and REJECTED.
    41  	// this returns an error object with the reason for the error if it was not possible to settle the message.
    42  	Settle(message message.InboundMessage, outcome config.MessageSettlementOutcome) error
    43  
    44  	// StartAsyncCallback starts the PersistentMessageReceiver asynchronously.
    45  	// Calls the callback when started with an error if one occurred, otherwise nil
    46  	// if successful.
    47  	StartAsyncCallback(callback func(PersistentMessageReceiver, error))
    48  
    49  	// TerminateAsyncCallback terminates the PersistentMessageReceiver asynchronously.
    50  	// Calls the callback when terminated with nil if successful, otherwise an error if
    51  	// one occurred. If gracePeriod is less than 0, the function waits indefinitely.
    52  	TerminateAsyncCallback(gracePeriod time.Duration, callback func(error))
    53  
    54  	// ReceiveAsync registers a callback to be called when new messages
    55  	// are received. Returns an error if one occurred while registering the callback.
    56  	// If a callback is already registered, it is replaced by the specified
    57  	// callback.
    58  	ReceiveAsync(callback MessageHandler) error
    59  
    60  	// ReceiveMessage receives a message synchronously from the receiver.
    61  	// Returns an error if the receiver is not started or already terminated.
    62  	// This function waits until the specified timeout to receive a message or waits
    63  	// forever if timeout value is negative. If a timeout occurs, a solace.TimeoutError
    64  	// is returned.
    65  	ReceiveMessage(timeout time.Duration) (message.InboundMessage, error)
    66  
    67  	// Pause pauses the receiver's message delivery to asynchronous message handlers.
    68  	// Pausing an already paused receiver has no effect.
    69  	// Returns an IllegalStateError if the receiver has not started or has already terminated.
    70  	Pause() error
    71  
    72  	// Resume unpause the receiver's message delivery to asynchronous message handlers.
    73  	// Resume a receiver that is not paused has no effect.
    74  	// Returns an IllegalStateError if the receiver has not started or has already terminated.
    75  	Resume() error
    76  
    77  	// ReceiverInfo returns a runtime accessor for the receiver information such as the remote
    78  	// resource to which it connects.
    79  	// Returns an IllegalStateError if the receiver has not started or has already terminated.
    80  	ReceiverInfo() (info PersistentReceiverInfo, err error)
    81  }
    82  
    83  // PersistentMessageReceiverBuilder is used for configuration of PersistentMessageReceiver.
    84  type PersistentMessageReceiverBuilder interface {
    85  	// Build creates a  PersistentMessageReceiver with the specified properties.
    86  	// Returns solace/errors.*IllegalArgumentError if the queue is nil.
    87  	// Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided.
    88  	Build(queue *resource.Queue) (receiver PersistentMessageReceiver, err error)
    89  
    90  	// WithActivationPassivationSupport sets the listener to receiver broker notifications
    91  	// about state changes for the resulting receiver. This change can happen if there are
    92  	// multiple instances of the same receiver for high availability and activity is exchanged.
    93  	// This change is handled by the broker.
    94  	WithActivationPassivationSupport(listener ReceiverStateChangeListener) PersistentMessageReceiverBuilder
    95  
    96  	// WithMessageAutoAcknowledgement enables automatic acknowledgement on all receiver methods.
    97  	// Auto Acknowledgement will be performed after an acknowledgement after the receive callback
    98  	// is called when using ReceiveAsync, or after the message is passed to the application when
    99  	// using ReceiveMessage. In cases where underlying network connectivity fails, automatic
   100  	// acknowledgement processing is not guaranteed.
   101  	WithMessageAutoAcknowledgement() PersistentMessageReceiverBuilder
   102  
   103  	// WithMessageClientAcknowledgement disables automatic acknowledgement on all receiver methods
   104  	// and instead enables support for client acknowledgement for both synchronous and asynchronous
   105  	// message delivery functions. New persistent receiver builders default to client acknowledgement.
   106  	WithMessageClientAcknowledgement() PersistentMessageReceiverBuilder
   107  
   108  	// WithMessageSelector sets the message selector to the specified string.
   109  	// If an empty string is provided, the filter is cleared.
   110  	WithMessageSelector(filterSelectorExpression string) PersistentMessageReceiverBuilder
   111  
   112  	// WithMissingResourcesCreationStrategy sets the missing resource creation strategy
   113  	// defining what actions the API may take when missing resources are detected.
   114  	WithMissingResourcesCreationStrategy(strategy config.MissingResourcesCreationStrategy) PersistentMessageReceiverBuilder
   115  
   116  	// WithMessageReplay enables support for message replay using a specific replay
   117  	// strategy. Once started, the receiver replays using the specified strategy.
   118  	// Valid strategies include config.ReplayStrategyAllMessages(),
   119  	// config.ReplayStrategyTimeBased and config.ReplicationGroupMessageIDReplayStrategy.
   120  	WithMessageReplay(strategy config.ReplayStrategy) PersistentMessageReceiverBuilder
   121  
   122  	// WithSubscriptions sets a list of TopicSubscriptions to subscribe
   123  	// to when starting the receiver. Accepts *resource.TopicSubscription subscriptions.
   124  	WithSubscriptions(topics ...resource.Subscription) PersistentMessageReceiverBuilder
   125  
   126  	// WithRequiredMessageOutcomeSupport configures the types of settlements the receiver can use.
   127  	// Any combination of PersistentReceiverAcceptedOutcome, PersistentReceiverFailedOutcome, and
   128  	// PersistentReceiverRejectedOutcome; the order is irrelevant.
   129  	// Attempting to Settle() a message later with an Outcome not listed here may result in an error.
   130  	WithRequiredMessageOutcomeSupport(messageSettlementOutcomes ...config.MessageSettlementOutcome) PersistentMessageReceiverBuilder
   131  
   132  	// FromConfigurationProvider configures the persistent receiver with the specified properties.
   133  	// The built-in ReceiverPropertiesConfigurationProvider implementations include:
   134  	//   ReceiverPropertyMap, a map of ReceiverProperty keys to values
   135  	FromConfigurationProvider(provider config.ReceiverPropertiesConfigurationProvider) PersistentMessageReceiverBuilder
   136  }
   137  
   138  // ReceiverState represents the various states the receiver can be in, such as Active or Passive.
   139  // This property is controlled by the remote broker.
   140  type ReceiverState byte
   141  
   142  const (
   143  	// ReceiverActive is the state in which the receiver receives messages from a broker.
   144  	ReceiverActive ReceiverState = iota
   145  	// ReceiverPassive is the state in which the receiver would not receive messages from a broker.
   146  	// Often, this is because another instance of the receiver became active, so this instance became
   147  	// passive.
   148  	ReceiverPassive
   149  )
   150  
   151  // ReceiverStateChangeListener is a listener that can be registered on a receiver to be
   152  // notified of changes in receiver state by the remote broker.
   153  type ReceiverStateChangeListener func(oldState ReceiverState, newState ReceiverState, timestamp time.Time)
   154  
   155  // PersistentReceiverInfo provides information about the receiver at runtime.
   156  type PersistentReceiverInfo interface {
   157  	// GetResourceInfo returns the remote endpoint (resource) information for the receiver.
   158  	GetResourceInfo() ResourceInfo
   159  }
   160  
   161  // ResourceInfo provides information about a resouce at runtime.
   162  type ResourceInfo interface {
   163  	// GetName returns the name of the resource.
   164  	GetName() string
   165  	// IsDurable returns true is a resource is durable, otherwise false.
   166  	IsDurable() bool
   167  }
   168