...

Source file src/solace.dev/go/messaging/pkg/solace/persistent_message_receiver.go

Documentation: solace.dev/go/messaging/pkg/solace

     1  // pubsubplus-go-client
     2  //
     3  // Copyright 2021-2024 Solace Corporation. All rights reserved.
     4  //
     5  // Licensed under the Apache License, Version 2.0 (the "License");
     6  // you may not use this file except in compliance with the License.
     7  // You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package solace
    18  
    19  import (
    20  	"time"
    21  
    22  	"solace.dev/go/messaging/pkg/solace/config"
    23  	"solace.dev/go/messaging/pkg/solace/message"
    24  	"solace.dev/go/messaging/pkg/solace/resource"
    25  )
    26  
    27  // PersistentMessageReceiver allows for receiving persistent messages (guaranteed messages).
    28  type PersistentMessageReceiver interface {
    29  	MessageReceiver // Include all functionality of MessageReceiver.
    30  
    31  	// Ack acknowledges that a message was received.
    32  	Ack(message message.InboundMessage) error
    33  
    34  	// StartAsyncCallback starts the PersistentMessageReceiver asynchronously.
    35  	// Calls the callback when started with an error if one occurred, otherwise nil
    36  	// if successful.
    37  	StartAsyncCallback(callback func(PersistentMessageReceiver, error))
    38  
    39  	// TerminateAsyncCallback terminates the PersistentMessageReceiver asynchronously.
    40  	// Calls the callback when terminated with nil if successful, otherwise an error if
    41  	// one occurred. If gracePeriod is less than 0, the function waits indefinitely.
    42  	TerminateAsyncCallback(gracePeriod time.Duration, callback func(error))
    43  
    44  	// ReceiveAsync registers a callback to be called when new messages
    45  	// are received. Returns an error if one occurred while registering the callback.
    46  	// If a callback is already registered, it is replaced by the specified
    47  	// callback.
    48  	ReceiveAsync(callback MessageHandler) error
    49  
    50  	// ReceiveMessage receives a message synchronously from the receiver.
    51  	// Returns an error if the receiver is not started or already terminated.
    52  	// This function waits until the specified timeout to receive a message or waits
    53  	// forever if the timeout value is negative. If a timeout occurs, a solace.TimeoutError
    54  	// is returned.
    55  	ReceiveMessage(timeout time.Duration) (message.InboundMessage, error)
    56  
    57  	// Pause pauses the receiver's message delivery to asynchronous message handlers.
    58  	// Pausing an already paused receiver has no effect.
    59  	// Returns an IllegalStateError if the receiver has not started or has already terminated.
    60  	Pause() error
    61  
    62  	// Resume unpauses the receiver's message delivery to asynchronous message handlers.
    63  	// Resuming a receiver that is not paused has no effect.
    64  	// Returns an IllegalStateError if the receiver has not started or has already terminated.
    65  	Resume() error
    66  
    67  	// ReceiverInfo returns a runtime accessor for the receiver information such as the remote
    68  	// resource to which it connects.
    69  	// Returns an IllegalStateError if the receiver has not started or has already terminated.
    70  	ReceiverInfo() (info PersistentReceiverInfo, err error)
    71  }
    72  
    73  // PersistentMessageReceiverBuilder is used for configuration of PersistentMessageReceiver.
    74  type PersistentMessageReceiverBuilder interface {
    75  	// Build creates a PersistentMessageReceiver with the specified properties.
    76  	// Returns solace/errors.*IllegalArgumentError if the queue is nil.
    77  	// Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided.
    78  	Build(queue *resource.Queue) (receiver PersistentMessageReceiver, err error)
    79  
    80  	// WithActivationPassivationSupport sets the listener to receive broker notifications
    81  	// about state changes for the resulting receiver. This change can happen if there are
    82  	// multiple instances of the same receiver for high availability and activity is exchanged.
    83  	// This change is handled by the broker.
    84  	WithActivationPassivationSupport(listener ReceiverStateChangeListener) PersistentMessageReceiverBuilder
    85  
    86  	// WithMessageAutoAcknowledgement enables automatic acknowledgement on all receiver methods.
    87  	// Automatic acknowledgement will be performed after the receive callback
    88  	// is called when using ReceiveAsync, or after the message is passed to the application when
    89  	// using ReceiveMessage. In cases where underlying network connectivity fails, automatic
    90  	// acknowledgement processing is not guaranteed.
    91  	WithMessageAutoAcknowledgement() PersistentMessageReceiverBuilder
    92  
    93  	// WithMessageClientAcknowledgement disables automatic acknowledgement on all receiver methods
    94  	// and instead enables support for client acknowledgement for both synchronous and asynchronous
    95  	// message delivery functions. New persistent receiver builders default to client acknowledgement.
    96  	WithMessageClientAcknowledgement() PersistentMessageReceiverBuilder
    97  
    98  	// WithMessageSelector sets the message selector to the specified string.
    99  	// If an empty string is provided, the filter is cleared.
   100  	WithMessageSelector(filterSelectorExpression string) PersistentMessageReceiverBuilder
   101  
   102  	// WithMissingResourcesCreationStrategy sets the missing resource creation strategy
   103  	// defining what actions the API may take when missing resources are detected.
   104  	WithMissingResourcesCreationStrategy(strategy config.MissingResourcesCreationStrategy) PersistentMessageReceiverBuilder
   105  
   106  	// WithMessageReplay enables support for message replay using a specific replay
   107  	// strategy. Once started, the receiver replays using the specified strategy.
   108  	// Valid strategies include config.ReplayStrategyAllMessages(),
   109  	// config.ReplayStrategyTimeBased and config.ReplicationGroupMessageIDReplayStrategy.
   110  	WithMessageReplay(strategy config.ReplayStrategy) PersistentMessageReceiverBuilder
   111  
   112  	// WithSubscriptions sets a list of TopicSubscriptions to subscribe
   113  	// to when starting the receiver. Accepts *resource.TopicSubscription subscriptions.
   114  	WithSubscriptions(topics ...resource.Subscription) PersistentMessageReceiverBuilder
   115  
   116  	// FromConfigurationProvider configures the persistent receiver with the specified properties.
   117  	// The built-in ReceiverPropertiesConfigurationProvider implementations include:
   118  	//   ReceiverPropertyMap, a map of ReceiverProperty keys to values
   119  	FromConfigurationProvider(provider config.ReceiverPropertiesConfigurationProvider) PersistentMessageReceiverBuilder
   120  }
   121  
   122  // ReceiverState represents the states a receiver can be in, such as ReceiverActive or ReceiverPassive.
   123  // This property is controlled by the remote broker.
   124  type ReceiverState byte
   125  
   126  const (
   127  	// ReceiverActive is the state in which the receiver receives messages from a broker; it is the zero value of ReceiverState.
   128  	ReceiverActive ReceiverState = iota
   129  	// ReceiverPassive is the state in which the receiver does not receive messages from a broker.
   130  	// Often, this is because another instance of the receiver became active, so this instance became
   131  	// passive.
   132  	ReceiverPassive
   133  )
   134  
   135  // ReceiverStateChangeListener is a listener that can be registered on a receiver to be
   136  // notified of changes in receiver state by the remote broker. It is passed the old state, the new state and a timestamp.
   137  type ReceiverStateChangeListener func(oldState ReceiverState, newState ReceiverState, timestamp time.Time)
   138  
   139  // PersistentReceiverInfo provides information about a persistent receiver at runtime.
   140  type PersistentReceiverInfo interface {
   141  	// GetResourceInfo returns the remote endpoint (resource) information for the receiver as a ResourceInfo.
   142  	GetResourceInfo() ResourceInfo
   143  }
   144  
   145  // ResourceInfo provides information about a resource at runtime.
   146  type ResourceInfo interface {
   147  	// GetName returns the name of the resource.
   148  	GetName() string
   149  	// IsDurable returns true if the resource is durable, otherwise false.
   150  	IsDurable() bool
   151  }
   152