...

Source file src/solace.dev/go/messaging/pkg/solace/messaging_service.go

Documentation: solace.dev/go/messaging/pkg/solace

     1  // solace-messaging-go-client
     2  //
     3  // Copyright 2021-2026 Solace Corporation. All rights reserved.
     4  //
     5  // Licensed under the Apache License, Version 2.0 (the "License");
     6  // you may not use this file except in compliance with the License.
     7  // You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package solace
    18  
    19  import (
    20  	"time"
    21  
    22  	"solace.dev/go/messaging/pkg/solace/config"
    23  	"solace.dev/go/messaging/pkg/solace/metrics"
    24  )
    25  
    26  // MessagingService represents a broker that provides a messaging service.
    27  type MessagingService interface {
    28  
    29  	// Connect connects the messaging service.
    30  	// This function blocks until the connection attempt is completed.
    31  	// Returns nil if successful, otherwise an error containing failure details, which may be the following:
    32  	// - solace/errors.*PubSubPlusClientError - If a connection error occurs.
    33  	// - solace/errors.*IllegalStateError - If MessagingService has already been terminated.
    34  	//
    35  	// If Connect returns an error, the MessagingService instance is terminated and
    36  	// no longer available for network operations such as connect or reconnect.
    37  	// A new MessagingService instance must be created for any subsequent connection attempts.
    38  	Connect() error
    39  
    40  	// ConnectAsync connects the messaging service asynchronously.
    41  	// Returns a channel that receives an event when completed.
    42  	// Channel (chan) receives nil if successful, otherwise an error containing failure details.
    43  	// For more information, see MessagingService.Connect.
    44  	ConnectAsync() <-chan error
    45  
    46  	// ConnectAsyncWithCallback connects the messaging service asynchronously.
    47  	// When complete, the specified callback is called with nil if successful,
    48  	// otherwise an error if not successful. In both cases, the messaging service
    49  	// is passed as well.
    50  	ConnectAsyncWithCallback(callback func(MessagingService, error))
    51  
    52  	// CreateDirectMessagePublisherBuilder creates a DirectMessagePublisherBuilder
    53  	// that can be used to configure direct message publisher instances.
    54  	CreateDirectMessagePublisherBuilder() DirectMessagePublisherBuilder
    55  
    56  	// CreateDirectMessageReceiverBuilder creates a DirectMessageReceiverBuilder
    57  	// that can be used to configure direct message receiver instances.
    58  	CreateDirectMessageReceiverBuilder() DirectMessageReceiverBuilder
    59  
    60  	// CreatePersistentMessagePublisherBuilder creates a PersistentMessagePublisherBuilder
    61  	// that can be used to configure persistent message publisher instances.
    62  	CreatePersistentMessagePublisherBuilder() PersistentMessagePublisherBuilder
    63  
    64  	// CreatePersistentMessageReceiverBuilder creates a PersistentMessageReceiverBuilder
    65  	// that can be used to configure persistent message receiver instances.
    66  	CreatePersistentMessageReceiverBuilder() PersistentMessageReceiverBuilder
    67  
    68  	// MessageBuilder creates an OutboundMessageBuilder that can be
    69  	// used to build messages to send via a message publisher.
    70  	MessageBuilder() OutboundMessageBuilder
    71  
    72  	// EndpointProvisioner is used to provision and deprovision endpoints on the broker.
    73  	EndpointProvisioner() EndpointProvisioner
    74  
    75  	// RequestReply creates a RequestReplyMessagingService that inherits
    76  	// the configuration of this MessagingService instance.
    77  	RequestReply() RequestReplyMessagingService
    78  
    79  	// Disconnect disconnects the messaging service.
    80  	// The messaging service must be connected to disconnect.
    81  	// This function blocks until the disconnection attempt is completed.
    82  	// Returns nil if successful, otherwise an error containing failure details.
    83  	// A disconnected messaging service may not be reconnected.
    84  	// Returns solace/errors.*IllegalStateError if it is not yet connected.
    85  	Disconnect() error
    86  
    87  	// DisconnectAsync disconnects the messaging service asynchronously.
    88  	// Returns a channel (chan) that receives an event when completed.
    89  	// The channel receives nil if successful, otherwise an error containing the failure details.
    90  	// For more information, see MessagingService.Disconnect.
    91  	DisconnectAsync() <-chan error
    92  
    93  	// DisconnectAsyncWithCallback disconnects the messaging service asynchronously.
    94  	// When complete, the specified callback is called with nil if successful, otherwise
    95  	// an error if not successful.
    96  	DisconnectAsyncWithCallback(callback func(error))
    97  
    98  	// IsConnected determines if the messaging service is operational and if Connect was previously
    99  	// called successfully.
   100  	// Returns true if the messaging service is connected to a remote destination, otherwise false.
   101  	IsConnected() bool
   102  
   103  	// AddReconnectionListener adds a new reconnection listener to the messaging service.
   104  	// The reconnection listener is called when reconnection events occur.
   105  	// Returns an identifier that can be used to remove the listener using RemoveReconnectionListener.
   106  	AddReconnectionListener(listener ReconnectionListener) uint64
   107  
   108  	// AddReconnectionAttemptListener adds a listener to receive reconnection-attempt notifications.
   109  	// The reconnection-attempt listener is called when reconnection-attempt events occur.
   110  	// Returns an identifier that can be used to remove the listener using RemoveReconnectionAttemptListener.
   111  	AddReconnectionAttemptListener(listener ReconnectionAttemptListener) uint64
   112  
   113  	// RemoveReconnectionListener removes a listener from the messaging service with the specified identifier.
   114  	RemoveReconnectionListener(listenerID uint64)
   115  
   116  	// RemoveReconnectionAttemptListener removes a listener from the messaging service with the specified identifier.
   117  	RemoveReconnectionAttemptListener(listenerID uint64)
   118  
   119  	// AddServiceInterruptionListener adds a listener to receive non-recoverable, service-interruption events.
   120  	// Returns an identifier that can be used to remove the listener using RemoveServiceInterruptionListener.
   121  	AddServiceInterruptionListener(listener ServiceInterruptionListener) uint64
   122  
   123  	// RemoveServiceInterruptionListener removes the listener for non-recoverable,
   124  	// service-interruption events that was registered with the specified identifier.
   125  	RemoveServiceInterruptionListener(listenerID uint64)
   126  
   127  	// GetApplicationID retrieves the application identifier.
   128  	GetApplicationID() string
   129  
   130  	// Metrics returns the metrics for this MessagingService instance.
   131  	Metrics() metrics.APIMetrics
   132  
   133  	// Info returns the API Info for this MessagingService instance.
   134  	Info() metrics.APIInfo
   135  
   136  	// UpdateProperty updates the value of a modifiable service property once the service has been created.
   137  	// Modifiable service properties include:
   138  	//     - solace/config.AuthenticationPropertySchemeOAuth2AccessToken,
   139  	//       whose update will be applied during the next reconnection attempt.
   140  	//     - solace/config.AuthenticationPropertySchemeOAuth2OIDCIDToken,
   141  	//       whose update will be applied during the next reconnection attempt.
   142  	//
   143  	// Modification of a service property may occur instantly, or may occur during the next
   144  	// service reconnection.
   145  	// Modification of a service property during an ongoing service reconnection may apply
   146  	// to the next reconnection attempt.
   147  	// property (ServiceProperty): The name of the property to modify.
   148  	// value (interface{}): The new value of the property.
   149  	//
   150  	// - solace/errors.*IllegalArgumentError: If the specified property cannot
   151  	//     be modified.
   152  	// - solace/errors.*IllegalStateError: If the specified property cannot
   153  	//     be modified in the current service state.
   154  	// - solace/errors.*NativeError: If other transport or communication related errors occur.
   155  	UpdateProperty(property config.ServiceProperty, value interface{}) error
   156  }
   157  
   158  // RequestReplyMessagingService allows access to request-reply behaviour; it is obtained from MessagingService.RequestReply.
   159  type RequestReplyMessagingService interface {
   160  	// CreateRequestReplyMessagePublisherBuilder creates a new request-reply message publisher
   161  	// builder that can be used to configure request-reply publisher instances.
   162  	CreateRequestReplyMessagePublisherBuilder() RequestReplyMessagePublisherBuilder
   163  
   164  	// CreateRequestReplyMessageReceiverBuilder creates a new request-reply message receiver
   165  	// builder that can be used to configure request-reply receiver instances.
   166  	CreateRequestReplyMessageReceiverBuilder() RequestReplyMessageReceiverBuilder
   167  }
   168  
   169  // MessagingServiceBuilder is used to configure and build MessagingService instances.
   170  type MessagingServiceBuilder interface {
   171  
   172  	// Build creates a MessagingService based on the provided configuration.
   173  	// Returns the built MessagingService instance, otherwise nil if an error occurred.
   174  	// Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided.
   175  	Build() (messagingService MessagingService, err error)
   176  
   177  	// BuildWithApplicationID creates a MessagingService based on the provided configuration
   178  	// using the specified application identifier as the applicationID.
   179  	// Returns the created MessagingService instance, otherwise nil if an error occurred.
   180  	// Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided.
   181  	BuildWithApplicationID(applicationID string) (messagingService MessagingService, err error)
   182  
   183  	// FromConfigurationProvider sets the configuration based on the specified configuration provider.
   184  	// The following are built-in configuration providers:
   185  	// - ServicePropertyMap - This can be used to set a ServiceProperty to a value programmatically.
   186  	//
   187  	// The ServicePropertiesConfigurationProvider interface can also be implemented by a type
   188  	// to have it act as a configuration factory by implementing the following:
   189  	//
   190  	//   func (t MyType) GetConfiguration() ServicePropertyMap {...}
   191  	//
   192  	// Any properties provided by the configuration provider are layered over top of any
   193  	// previously set properties, including those set by specifying various strategies.
   194  	FromConfigurationProvider(provider config.ServicePropertiesConfigurationProvider) MessagingServiceBuilder
   195  
   196  	// WithAuthenticationStrategy configures the resulting messaging service
   197  	// with the specified authentication configuration.
   198  	WithAuthenticationStrategy(authenticationStrategy config.AuthenticationStrategy) MessagingServiceBuilder
   199  
   200  	// WithConnectionRetryStrategy configures the resulting messaging service
   201  	// with the specified connection retry strategy.
   202  	WithConnectionRetryStrategy(retryStrategy config.RetryStrategy) MessagingServiceBuilder
   203  
   204  	// WithMessageCompression configures the resulting messaging service
   205  	// with the specified compression factor. The builder attempts to use
   206  	// the specified compression-level with the provided host and port. It fails
   207  	// to build if an attempt is made to use compression on a non-secured and
   208  	// non-compressed port.
   209  	WithMessageCompression(compressionFactor int) MessagingServiceBuilder
   210  
   211  	// WithReconnectionRetryStrategy configures the resulting messaging service
   212  	// with the specified reconnection strategy.
   213  	WithReconnectionRetryStrategy(retryStrategy config.RetryStrategy) MessagingServiceBuilder
   214  
   215  	// WithTransportSecurityStrategy configures the resulting messaging service
   216  	// with the specified transport security strategy.
   217  	WithTransportSecurityStrategy(transportSecurityStrategy config.TransportSecurityStrategy) MessagingServiceBuilder
   218  
   219  	// WithProvisionTimeoutMs configures the timeout for provision and deprovision operations, in milliseconds. NOTE(review): timeout is typed as time.Duration although the name says Ms; confirm how the unit is interpreted.
   220  	WithProvisionTimeoutMs(timeout time.Duration) MessagingServiceBuilder
   221  }
   222  
   223  // ReconnectionListener is a handler that can be registered to a MessagingService using AddReconnectionListener.
   224  // It is called when a session was disconnected and subsequently reconnected.
   225  type ReconnectionListener func(event ServiceEvent)
   226  
   227  // ReconnectionAttemptListener is a handler that can be registered to a MessagingService using AddReconnectionAttemptListener.
   228  // It is called when a session is disconnected and reconnection attempts have begun.
   229  type ReconnectionAttemptListener func(event ServiceEvent)
   230  
   231  // ServiceInterruptionListener is a handler that can be registered to a MessagingService using AddServiceInterruptionListener.
   232  // It is called when a session is disconnected and the connection is unrecoverable.
   233  type ServiceInterruptionListener func(event ServiceEvent)
   234  
   235  // ServiceEvent represents a messaging service event passed to the reconnection, reconnection-attempt and service-interruption listeners.
   236  type ServiceEvent interface {
   237  	// GetTimestamp retrieves the timestamp of the event.
   238  	GetTimestamp() time.Time
   239  	// GetBrokerURI retrieves the URI of the broker.
   240  	GetBrokerURI() string
   241  	// GetMessage retrieves the message contents.
   242  	GetMessage() string
   243  	// GetCause retrieves the cause of the client error.
   244  	GetCause() error
   245  }
   246