...

Source file src/solace.dev/go/messaging/pkg/solace/messaging_service.go

Documentation: solace.dev/go/messaging/pkg/solace

     1  // pubsubplus-go-client
     2  //
     3  // Copyright 2021-2025 Solace Corporation. All rights reserved.
     4  //
     5  // Licensed under the Apache License, Version 2.0 (the "License");
     6  // you may not use this file except in compliance with the License.
     7  // You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package solace
    18  
    19  import (
    20  	"time"
    21  
    22  	"solace.dev/go/messaging/pkg/solace/config"
    23  	"solace.dev/go/messaging/pkg/solace/metrics"
    24  )
    25  
    26  // MessagingService represents a broker that provides a messaging service.
    27  type MessagingService interface {
    28  
    29  	// Connect connects the messaging service.
    30  	// This function blocks until the connection attempt is completed.
    31  	// Returns nil if successful, otherwise an error containing failure details, which may be the following:
    32  	// - solace/errors.*PubSubPlusClientError - If a connection error occurs.
    33  	// - solace/errors.*IllegalStateError - If MessagingService has already been terminated.
    34  	Connect() error
    35  
    36  	// ConnectAsync connects the messaging service asynchronously.
    37  	// Returns a channel that receives an event when completed.
    38  	// Channel (chan) receives nil if successful, otherwise an error containing failure details.
    39  	// For more information, see MessagingService.Connect.
    40  	ConnectAsync() <-chan error
    41  
    42  	// ConnectAsyncWithCallback connects the messaging service asynchonously.
    43  	// When complete, the specified callback is called with nil if successful,
    44  	// otherwise an error if not successful. In both cases, the messaging service
    45  	// is passed as well.
    46  	ConnectAsyncWithCallback(callback func(MessagingService, error))
    47  
    48  	// CreateDirectMessagePublisherBuilder creates a DirectMessagePublisherBuilder
    49  	// that can be used to configure direct message publisher instances.
    50  	CreateDirectMessagePublisherBuilder() DirectMessagePublisherBuilder
    51  
    52  	// CreateDirectMessageReceiverBuilder creates a DirectMessageReceiverBuilder
    53  	// that can be used to configure direct message receiver instances.
    54  	CreateDirectMessageReceiverBuilder() DirectMessageReceiverBuilder
    55  
    56  	// CreatePersistentMessagePublisherBuilder creates a PersistentMessagePublisherBuilder
    57  	// that can be used to configure persistent message publisher instances.
    58  	CreatePersistentMessagePublisherBuilder() PersistentMessagePublisherBuilder
    59  
    60  	// CreatePersistentMessageReceiverBuilder creates a PersistentMessageReceiverBuilder
    61  	// that can be used to configure persistent message receiver instances.
    62  	CreatePersistentMessageReceiverBuilder() PersistentMessageReceiverBuilder
    63  
    64  	// MessageBuilder creates an OutboundMessageBuilder that can be
    65  	// used to build messages to send via a message publisher.
    66  	MessageBuilder() OutboundMessageBuilder
    67  
    68  	// EndpointProvisioner is used to provision and deprovision endpoints on the broker.
    69  	EndpointProvisioner() EndpointProvisioner
    70  
    71  	// RequestReply creates a RequestReplyMessagingService that inherits
    72  	// the configuration of this MessagingService instance.
    73  	RequestReply() RequestReplyMessagingService
    74  
    75  	// Disconnect disconnects the messaging service.
    76  	// The messaging service must be connected to disconnect.
    77  	// This function blocks until the disconnection attempt is completed.
    78  	// Returns nil if successful, otherwise an error containing failure details.
    79  	// A disconnected messaging service may not be reconnected.
    80  	// Returns solace/errors.*IllegalStateError if it is not yet connected.
    81  	Disconnect() error
    82  
    83  	// DisconnectAsync disconnects the messaging service asynchronously.
    84  	// Returns a channel (chan) that receives an event when completed.
    85  	// The channel receives nil if successful, otherwise an error containing the failure details
    86  	// For more information, see MessagingService.Disconnect.
    87  	DisconnectAsync() <-chan error
    88  
    89  	// DisconnectAsyncWithCallback disconnects the messaging service asynchronously.
    90  	// When complete, the specified callback is called with nil if successful, otherwise
    91  	// an error if not successful.
    92  	DisconnectAsyncWithCallback(callback func(error))
    93  
    94  	// IsConnected determines if the messaging service is operational and if Connect was previously
    95  	// called successfully.
    96  	// Returns true if the messaging service is connected to a remote destination, otherwise false.
    97  	IsConnected() bool
    98  
    99  	// AddReconnectionListener adds a new reconnection listener to the messaging service.
   100  	// The reconnection listener is called when reconnection events occur.
   101  	// Returns an identifier that can be used to remove the listener using RemoveReconnectionListener.
   102  	AddReconnectionListener(listener ReconnectionListener) uint64
   103  
   104  	// AddReconnectionAttemptListener adds a listener to receive reconnection-attempt notifications.
   105  	// The reconnection listener is called when reconnection-attempt events occur.
   106  	// Returns an identifier that can be used to remove the listener using RemoveReconnectionAttemptListener.
   107  	AddReconnectionAttemptListener(listener ReconnectionAttemptListener) uint64
   108  
   109  	// RemoveReconnectionListener removes a listener from the messaging service with the specified identifier.
   110  	RemoveReconnectionListener(listenerID uint64)
   111  
   112  	// RemoveReconnectionAttemptListener removes a listener from the messaging service with the specified identifier.
   113  	RemoveReconnectionAttemptListener(listenerID uint64)
   114  
   115  	// AddServiceInterruptionListener adds a listener to receive non-recoverable, service-interruption events.
   116  	// Returns an identifier othat can be used to remove the listener using RemoveServiceInterruptionListener.
   117  	AddServiceInterruptionListener(listener ServiceInterruptionListener) uint64
   118  
   119  	// RemoveServiceInterruptionListener removes a service listener to receive non-recoverable,
   120  	// service-interruption events with the specified identifier.
   121  	RemoveServiceInterruptionListener(listenerID uint64)
   122  
   123  	// GetApplicationID retrieves the application identifier.
   124  	GetApplicationID() string
   125  
   126  	// Metrics returns the metrics for this MessagingService instance.
   127  	Metrics() metrics.APIMetrics
   128  
   129  	// Info returns the API Info for this MessagingService instance.
   130  	Info() metrics.APIInfo
   131  
   132  	// Updates the value of a modifiable service property once the service has been created.
   133  	// Modifiable service properties include:
   134  	//     - solace/config.AuthenticationPropertySchemeOAuth2AccessToken,
   135  	//       whose update will be applied during the next reconnection attempt.
   136  	//     - solace/config.AuthenticationPropertySchemeOAuth2OIDCIDToken,
   137  	//       whose update will be applied during the next reconnection attempt.
   138  	//
   139  	// Modification of a service property may occur instantly, or may occur during the next
   140  	// service reconnection.
   141  	// Modification of a service property during an ongoing service reconnection may apply
   142  	// to the next reconnection attempt.
   143  	// property (ServiceProperty): The name of the property to modify.
   144  	// value (interface{}): The new value of the property.
   145  	//
   146  	// - solace/errors.*IllegalArgumentError: If the specified property cannot
   147  	// -   be modified.
   148  	// - solace/errors.*IllegalStateError: If the specified property cannot
   149  	//     be modified in the current service state.
   150  	// - solace/errors.*NativeError: If other transport or communication related errors occur.
   151  	UpdateProperty(property config.ServiceProperty, value interface{}) error
   152  }
   153  
   154  // RequestReplyMessagingService allows access to request reply behaviour.
   155  type RequestReplyMessagingService interface {
   156  	// CreateRequestReplyMessagePublisherBuilder creates a new request reply message publisher
   157  	// builder that can be used to configure request reply publisher instances.
   158  	CreateRequestReplyMessagePublisherBuilder() RequestReplyMessagePublisherBuilder
   159  
   160  	// CreateRequestReplyMessageReceiverBuilder creates a new request reply message receiver
   161  	// builder that can be used to configure request reply receiver instances.
   162  	CreateRequestReplyMessageReceiverBuilder() RequestReplyMessageReceiverBuilder
   163  }
   164  
   165  // MessagingServiceBuilder is used to configure and build MessagingService instances.
   166  type MessagingServiceBuilder interface {
   167  
   168  	// Build creates MessagingService based on the provided configuration.
   169  	// Returns the built MessagingService instance, otherwise nil if an error occurred.
   170  	// Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided.
   171  	Build() (messagingService MessagingService, err error)
   172  
   173  	// BuildWithApplicationID creates MessagingService based on the provided configuration
   174  	// using the specified  application identifier as the applicationID.
   175  	// Returns the created MessagingService instance, otherwise nil if an error occurred.
   176  	// Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided.
   177  	BuildWithApplicationID(applicationID string) (messagingService MessagingService, err error)
   178  
   179  	// FromConfigurationProvider sets the configuration based on the specified configuration provider.
   180  	// The following are built in configuration providers:
   181  	// - ServicePropertyMap - This can be used to set a ServiceProperty to a value programatically.
   182  	//
   183  	// The ServicePropertiesConfigurationProvider interface can also be implemented by a type
   184  	// to have it act as a configuration factory by implementing the following:
   185  	//
   186  	//   func (type MyType) GetConfiguration() ServicePropertyMap {...}
   187  	//
   188  	// Any properties provided by the configuration provider are layered over top of any
   189  	// previously set properties, including those set by specifying various strategies.
   190  	FromConfigurationProvider(provider config.ServicePropertiesConfigurationProvider) MessagingServiceBuilder
   191  
   192  	// WithAuthenticationStrategy configures the resulting messaging service
   193  	// with the specified authentication configuration
   194  	WithAuthenticationStrategy(authenticationStrategy config.AuthenticationStrategy) MessagingServiceBuilder
   195  
   196  	// WithRetryStrategy configures the resulting messaging service
   197  	// with the specified retry strategy
   198  	WithConnectionRetryStrategy(retryStrategy config.RetryStrategy) MessagingServiceBuilder
   199  
   200  	// WithMessageCompression configures the resulting messaging service
   201  	// with the specified compression factor. The builder attempts to use
   202  	// the specified compression-level with the provided host and port. It fails
   203  	// to build if an an atempt is made to use compression on a non-secured and
   204  	// non-compressed port.
   205  	WithMessageCompression(compressionFactor int) MessagingServiceBuilder
   206  
   207  	// WithReconnectionRetryStrategy configures the resulting messaging service
   208  	// with the specified  reconnection strategy.
   209  	WithReconnectionRetryStrategy(retryStrategy config.RetryStrategy) MessagingServiceBuilder
   210  
   211  	// WithTransportSecurityStrategy configures the resulting messaging service
   212  	// with the specified transport security strategy.
   213  	WithTransportSecurityStrategy(transportSecurityStrategy config.TransportSecurityStrategy) MessagingServiceBuilder
   214  
   215  	// WithProvisionTimeoutMs configures the timeout for provision and deprovision operations, in milliseconds.
   216  	WithProvisionTimeoutMs(timeout time.Duration) MessagingServiceBuilder
   217  }
   218  
   219  // ReconnectionListener is a handler that can be registered to a MessagingService.
   220  // It is called when a session was disconnected and subsequently reconnected.
   221  type ReconnectionListener func(event ServiceEvent)
   222  
   223  // ReconnectionAttemptListener is a handler that can be registered to a MessagingService.
   224  // It is called when a session is disconnected and reconnection attempts have begun.
   225  type ReconnectionAttemptListener func(event ServiceEvent)
   226  
   227  // ServiceInterruptionListener is a handler that can be registered to a MessagingService.
   228  // It is called when a session is disconncted and the connection is unrecoverable.
   229  type ServiceInterruptionListener func(event ServiceEvent)
   230  
   231  // ServiceEvent interface represents a messaging service event that applications can listen for.
   232  type ServiceEvent interface {
   233  	// GetTimestamp retrieves the timestamp of the event.
   234  	GetTimestamp() time.Time
   235  	// GetBrokerURI retrieves the URI of the broker.
   236  	GetBrokerURI() string
   237  	// GetMessage retrieves the message contents.
   238  	GetMessage() string
   239  	// GetCause retrieves the cause of the client error.
   240  	GetCause() error
   241  }
   242