...

Source file src/solace.dev/go/messaging/pkg/solace/messaging_service.go

Documentation: solace.dev/go/messaging/pkg/solace

     1  // pubsubplus-go-client
     2  //
     3  // Copyright 2021-2024 Solace Corporation. All rights reserved.
     4  //
     5  // Licensed under the Apache License, Version 2.0 (the "License");
     6  // you may not use this file except in compliance with the License.
     7  // You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package solace
    18  
    19  import (
    20  	"time"
    21  
    22  	"solace.dev/go/messaging/pkg/solace/config"
    23  	"solace.dev/go/messaging/pkg/solace/metrics"
    24  )
    25  
    26  // MessagingService represents a broker that provides a messaging service.
    27  type MessagingService interface {
    28  
    29  	// Connect connects the messaging service.
    30  	// This function blocks until the connection attempt is completed.
    31  	// Returns nil if successful, otherwise an error containing failure details, which may be the following:
    32  	// - solace/errors.*PubSubPlusClientError - If a connection error occurs.
    33  	// - solace/errors.*IllegalStateError - If MessagingService has already been terminated.
    34  	Connect() error
    35  
    36  	// ConnectAsync connects the messaging service asynchronously.
    37  	// Returns a channel that receives an event when completed.
    38  	// Channel (chan) receives nil if successful, otherwise an error containing failure details.
    39  	// For more information, see MessagingService.Connect.
    40  	ConnectAsync() <-chan error
    41  
    42  	// ConnectAsyncWithCallback connects the messaging service asynchronously.
    43  	// When complete, the specified callback is called with nil if successful,
    44  	// otherwise an error if not successful. In both cases, the messaging service
    45  	// is passed as well.
    46  	ConnectAsyncWithCallback(callback func(MessagingService, error))
    47  
    48  	// CreateDirectMessagePublisherBuilder creates a DirectMessagePublisherBuilder
    49  	// that can be used to configure direct message publisher instances.
    50  	CreateDirectMessagePublisherBuilder() DirectMessagePublisherBuilder
    51  
    52  	// CreateDirectMessageReceiverBuilder creates a DirectMessageReceiverBuilder
    53  	// that can be used to configure direct message receiver instances.
    54  	CreateDirectMessageReceiverBuilder() DirectMessageReceiverBuilder
    55  
    56  	// CreatePersistentMessagePublisherBuilder creates a PersistentMessagePublisherBuilder
    57  	// that can be used to configure persistent message publisher instances.
    58  	CreatePersistentMessagePublisherBuilder() PersistentMessagePublisherBuilder
    59  
    60  	// CreatePersistentMessageReceiverBuilder creates a PersistentMessageReceiverBuilder
    61  	// that can be used to configure persistent message receiver instances.
    62  	CreatePersistentMessageReceiverBuilder() PersistentMessageReceiverBuilder
    63  
    64  	// MessageBuilder creates an OutboundMessageBuilder that can be
    65  	// used to build messages to send via a message publisher.
    66  	MessageBuilder() OutboundMessageBuilder
    67  
    68  	// RequestReply creates a RequestReplyMessagingService that inherits
    69  	// the configuration of this MessagingService instance.
    70  	RequestReply() RequestReplyMessagingService
    71  
    72  	// Disconnect disconnects the messaging service.
    73  	// The messaging service must be connected to disconnect.
    74  	// This function blocks until the disconnection attempt is completed.
    75  	// Returns nil if successful, otherwise an error containing failure details.
    76  	// A disconnected messaging service may not be reconnected.
    77  	// Returns solace/errors.*IllegalStateError if it is not yet connected.
    78  	Disconnect() error
    79  
    80  	// DisconnectAsync disconnects the messaging service asynchronously.
    81  	// Returns a channel (chan) that receives an event when completed.
    82  	// The channel receives nil if successful, otherwise an error containing the failure details.
    83  	// For more information, see MessagingService.Disconnect.
    84  	DisconnectAsync() <-chan error
    85  
    86  	// DisconnectAsyncWithCallback disconnects the messaging service asynchronously.
    87  	// When complete, the specified callback is called with nil if successful, otherwise
    88  	// an error if not successful.
    89  	DisconnectAsyncWithCallback(callback func(error))
    90  
    91  	// IsConnected determines if the messaging service is operational and if Connect was previously
    92  	// called successfully.
    93  	// Returns true if the messaging service is connected to a remote destination, otherwise false.
    94  	IsConnected() bool
    95  
    96  	// AddReconnectionListener adds a new reconnection listener to the messaging service.
    97  	// The reconnection listener is called when reconnection events occur.
    98  	// Returns an identifier that can be used to remove the listener using RemoveReconnectionListener.
    99  	AddReconnectionListener(listener ReconnectionListener) uint64
   100  
   101  	// AddReconnectionAttemptListener adds a listener to receive reconnection-attempt notifications.
   102  	// The reconnection-attempt listener is called when reconnection-attempt events occur.
   103  	// Returns an identifier that can be used to remove the listener using RemoveReconnectionAttemptListener.
   104  	AddReconnectionAttemptListener(listener ReconnectionAttemptListener) uint64
   105  
   106  	// RemoveReconnectionListener removes a listener from the messaging service with the specified identifier.
   107  	RemoveReconnectionListener(listenerID uint64)
   108  
   109  	// RemoveReconnectionAttemptListener removes a listener from the messaging service with the specified identifier.
   110  	RemoveReconnectionAttemptListener(listenerID uint64)
   111  
   112  	// AddServiceInterruptionListener adds a listener to receive non-recoverable, service-interruption events.
   113  	// Returns an identifier that can be used to remove the listener using RemoveServiceInterruptionListener.
   114  	AddServiceInterruptionListener(listener ServiceInterruptionListener) uint64
   115  
   116  	// RemoveServiceInterruptionListener removes the service-interruption listener that was registered
   117  	// under the specified identifier (as returned by AddServiceInterruptionListener).
   118  	RemoveServiceInterruptionListener(listenerID uint64)
   119  
   120  	// GetApplicationID retrieves the application identifier.
   121  	GetApplicationID() string
   122  
   123  	// Metrics returns the metrics for this MessagingService instance.
   124  	Metrics() metrics.APIMetrics
   125  
   126  	// Info returns the API Info for this MessagingService instance.
   127  	Info() metrics.APIInfo
   128  
   129  	// UpdateProperty updates the value of a modifiable service property once the service has been created.
   130  	// Modifiable service properties include:
   131  	//     - solace/config.AuthenticationPropertySchemeOAuth2AccessToken,
   132  	//       whose update will be applied during the next reconnection attempt.
   133  	//     - solace/config.AuthenticationPropertySchemeOAuth2OIDCIDToken,
   134  	//       whose update will be applied during the next reconnection attempt.
   135  	//
   136  	// Modification of a service property may occur instantly, or may occur during the next
   137  	// service reconnection.
   138  	// Modification of a service property during an ongoing service reconnection may apply
   139  	// to the next reconnection attempt.
   140  	// property (config.ServiceProperty): The name of the property to modify.
   141  	// value (interface{}): The new value of the property.
   142  	// Returns an error if the property cannot be updated, which may be one of the following:
   143  	// - solace/errors.*IllegalArgumentError: If the specified property cannot
   144  	//     be modified.
   145  	// - solace/errors.*IllegalStateError: If the specified property cannot
   146  	//     be modified in the current service state.
   147  	// - solace/errors.*NativeError: If other transport or communication-related errors occur.
   148  	UpdateProperty(property config.ServiceProperty, value interface{}) error
   149  }
   150  
   151  // RequestReplyMessagingService provides the builders used for request-reply messaging.
   152  type RequestReplyMessagingService interface {
   153  	// CreateRequestReplyMessagePublisherBuilder creates a new request-reply message publisher
   154  	// builder that can be used to configure request-reply publisher instances.
   155  	CreateRequestReplyMessagePublisherBuilder() RequestReplyMessagePublisherBuilder
   156  
   157  	// CreateRequestReplyMessageReceiverBuilder creates a new request-reply message receiver
   158  	// builder that can be used to configure request-reply receiver instances.
   159  	CreateRequestReplyMessageReceiverBuilder() RequestReplyMessageReceiverBuilder
   160  }
   161  
   162  // MessagingServiceBuilder is used to configure and build MessagingService instances.
   163  type MessagingServiceBuilder interface {
   164  
   165  	// Build creates a MessagingService based on the provided configuration.
   166  	// Returns the built MessagingService instance, otherwise nil if an error occurred.
   167  	// Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided.
   168  	Build() (messagingService MessagingService, err error)
   169  
   170  	// BuildWithApplicationID creates a MessagingService based on the provided configuration
   171  	// using the specified application identifier as the applicationID.
   172  	// Returns the created MessagingService instance, otherwise nil if an error occurred.
   173  	// Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided.
   174  	BuildWithApplicationID(applicationID string) (messagingService MessagingService, err error)
   175  
   176  	// FromConfigurationProvider sets the configuration based on the specified configuration provider.
   177  	// The following are built-in configuration providers:
   178  	// - ServicePropertyMap - This can be used to set a ServiceProperty to a value programmatically.
   179  	//
   180  	// The ServicePropertiesConfigurationProvider interface can also be implemented by a type
   181  	// to have it act as a configuration factory by implementing the following:
   182  	//
   183  	//   func (t MyType) GetConfiguration() ServicePropertyMap {...}
   184  	//
   185  	// Any properties provided by the configuration provider are layered over top of any
   186  	// previously set properties, including those set by specifying various strategies.
   187  	FromConfigurationProvider(provider config.ServicePropertiesConfigurationProvider) MessagingServiceBuilder
   188  
   189  	// WithAuthenticationStrategy configures the resulting messaging service
   190  	// with the specified authentication configuration.
   191  	WithAuthenticationStrategy(authenticationStrategy config.AuthenticationStrategy) MessagingServiceBuilder
   192  
   193  	// WithConnectionRetryStrategy configures the resulting messaging service
   194  	// with the specified connection retry strategy.
   195  	WithConnectionRetryStrategy(retryStrategy config.RetryStrategy) MessagingServiceBuilder
   196  
   197  	// WithMessageCompression configures the resulting messaging service
   198  	// with the specified compression factor. The builder attempts to use
   199  	// the specified compression level with the provided host and port. It fails
   200  	// to build if an attempt is made to use compression on a non-secured and
   201  	// non-compressed port.
   202  	WithMessageCompression(compressionFactor int) MessagingServiceBuilder
   203  
   204  	// WithReconnectionRetryStrategy configures the resulting messaging service
   205  	// with the specified reconnection strategy.
   206  	WithReconnectionRetryStrategy(retryStrategy config.RetryStrategy) MessagingServiceBuilder
   207  
   208  	// WithTransportSecurityStrategy configures the resulting messaging service
   209  	// with the specified transport security strategy.
   210  	WithTransportSecurityStrategy(transportSecurityStrategy config.TransportSecurityStrategy) MessagingServiceBuilder
   211  }
   212  
   213  // ReconnectionListener is a handler that can be registered to a MessagingService using AddReconnectionListener.
   214  // It is called after a session has been disconnected and subsequently reconnected.
   215  type ReconnectionListener func(event ServiceEvent)
   216  
   217  // ReconnectionAttemptListener is a handler that can be registered to a MessagingService using AddReconnectionAttemptListener.
   218  // It is called when a session is disconnected and reconnection attempts have begun.
   219  type ReconnectionAttemptListener func(event ServiceEvent)
   220  
   221  // ServiceInterruptionListener is a handler that can be registered to a MessagingService using AddServiceInterruptionListener.
   222  // It is called when a session is disconnected and the connection is unrecoverable.
   223  type ServiceInterruptionListener func(event ServiceEvent)
   224  
   225  // ServiceEvent represents a messaging service event that applications can listen for; it is the argument passed to the listener handler types.
   226  type ServiceEvent interface {
   227  	// GetTimestamp retrieves the timestamp of the event.
   228  	GetTimestamp() time.Time
   229  	// GetBrokerURI retrieves the URI of the broker.
   230  	GetBrokerURI() string
   231  	// GetMessage retrieves the message contents.
   232  	GetMessage() string
   233  	// GetCause retrieves the cause of the client error.
   234  	GetCause() error
   235  }
   236