...

Source file src/solace.dev/go/messaging/pkg/solace/resource/destination.go

Documentation: solace.dev/go/messaging/pkg/solace/resource

     1  // pubsubplus-go-client
     2  //
     3  // Copyright 2021-2025 Solace Corporation. All rights reserved.
     4  //
     5  // Licensed under the Apache License, Version 2.0 (the "License");
     6  // you may not use this file except in compliance with the License.
     7  // You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  // Package resource contains types and factory functions for various
    18  // broker resources such as topics and queues.
    19  package resource
    20  
    21  import "fmt"
    22  
    23  // Destination represents a message destination on a broker.
    24  // Some examples of destinations include queues and topics.
    25  // Destination implementations can be retrieved by the various helper functions,
    26  // such as TopicOf, QueueDurableExclusive, and TopicSubscriptionOf.
    27  type Destination interface {
    28  	// GetName retrieves the name of the destination
    29  	GetName() string
    30  }
    31  
    32  // Subscription represents the valid subscriptions that can be specified to receivers.
    33  // Valid subscriptions include *resource.TopicSubscription.
    34  type Subscription interface {
    35  	Destination
    36  	// GetSubscriptionType will return the type of the subscription as a string
    37  	GetSubscriptionType() string
    38  }
    39  
    40  // Topic is an implementation of destination representing a topic
    41  // that can be published to.
    42  type Topic struct {
    43  	name string
    44  }
    45  
    46  // TopicOf creates a new topic with the specified name.
    47  // Topic name must not be empty.
    48  func TopicOf(expression string) *Topic {
    49  	return &Topic{expression}
    50  }
    51  
    52  // GetName returns the name of the topic. Implements the Destination interface.
    53  func (t *Topic) GetName() string {
    54  	return t.name
    55  }
    56  
    57  // String implements fmt.Stringer
    58  func (t *Topic) String() string {
    59  	return fmt.Sprintf("Topic: %s", t.GetName())
    60  }
    61  
    62  // ShareName is an interface for identifiers that are associated
    63  // with a shared subscription.
    64  // See https://docs.solace.com/PubSub-Basics/Direct-Messages.htm#Shared in the Solace documentation.
    65  type ShareName struct {
    66  	name string
    67  }
    68  
    69  // ShareNameOf returns a new share name with the provided name.
    70  // Valid share names are not empty and do not contain special
    71  // characters '>' or '*'. Returns a new ShareName with the given string.
    72  func ShareNameOf(name string) *ShareName {
    73  	return &ShareName{name}
    74  }
    75  
    76  // GetName returns the share name. Implements the Destination interface.
    77  func (sn *ShareName) GetName() string {
    78  	return sn.name
    79  }
    80  
    81  // String implements fmt.Stringer
    82  func (sn *ShareName) String() string {
    83  	return fmt.Sprintf("ShareName: %s", sn.GetName())
    84  }
    85  
    86  // TopicSubscription is a subscription to a topic often used for receivers.
    87  type TopicSubscription struct {
    88  	topic string
    89  }
    90  
    91  // GetName returns the topic subscription expression. Implements the Destination interface.
    92  func (t *TopicSubscription) GetName() string {
    93  	return t.topic
    94  }
    95  
    96  // GetSubscriptionType returns the type of the topic subscription as a string
    97  func (t *TopicSubscription) GetSubscriptionType() string {
    98  	return fmt.Sprintf("%T", t)
    99  }
   100  
   101  // String implements fmt.Stringer
   102  func (t *TopicSubscription) String() string {
   103  	return fmt.Sprintf("TopicSubscription: %s", t.GetName())
   104  }
   105  
   106  // TopicSubscriptionOf creates a TopicSubscription of the specified topic string.
   107  func TopicSubscriptionOf(topic string) *TopicSubscription {
   108  	return &TopicSubscription{topic}
   109  }
   110  
   111  // Queue represents a queue used for guaranteed messaging receivers.
   112  type Queue struct {
   113  	name                           string
   114  	exclusivelyAccessible, durable bool
   115  }
   116  
   117  // GetName returns the name of the queue. Implements the Destination interface.
   118  func (q *Queue) GetName() string {
   119  	return q.name
   120  }
   121  
   122  // IsExclusivelyAccessible determines if Queue supports exclusive or shared-access mode.
   123  // Returns true if the Queue can serve only one consumer at any one time, false if the
   124  // Queue can serve multiple consumers with each consumer serviced in a round-robin fashion.
   125  func (q *Queue) IsExclusivelyAccessible() bool {
   126  	return q.exclusivelyAccessible
   127  }
   128  
   129  // IsDurable determines if the Queue is durable. Durable queues are privisioned objects on
   130  // the broker that have a lifespan that is independent of any one client session.
   131  func (q *Queue) IsDurable() bool {
   132  	return q.durable
   133  }
   134  
   135  func (q *Queue) String() string {
   136  	return fmt.Sprintf("Queue: %s, exclusive: %t, durable: %t", q.GetName(), q.IsExclusivelyAccessible(), q.IsDurable())
   137  }
   138  
   139  // QueueDurableExclusive creates a new durable, exclusive queue with the specified name.
   140  func QueueDurableExclusive(queueName string) *Queue {
   141  	return &Queue{
   142  		name:                  queueName,
   143  		exclusivelyAccessible: true,
   144  		durable:               true,
   145  	}
   146  }
   147  
   148  // QueueDurableNonExclusive creates a durable, non-exclusive queue with the specified name.
   149  func QueueDurableNonExclusive(queueName string) *Queue {
   150  	return &Queue{
   151  		name:                  queueName,
   152  		exclusivelyAccessible: false,
   153  		durable:               true,
   154  	}
   155  }
   156  
   157  // QueueNonDurableExclusive creates an exclusive, non-durable queue with the specified name.
   158  func QueueNonDurableExclusive(queueName string) *Queue {
   159  	return &Queue{
   160  		name:                  queueName,
   161  		exclusivelyAccessible: true,
   162  		durable:               false,
   163  	}
   164  }
   165  
   166  // QueueNonDurableExclusiveAnonymous creates an anonymous, exclusive, and non-durable queue.
   167  func QueueNonDurableExclusiveAnonymous() *Queue {
   168  	return &Queue{
   169  		name:                  "",
   170  		exclusivelyAccessible: true,
   171  		durable:               false,
   172  	}
   173  }
   174  
   175  // CachedMessageSubscriptionStrategy indicates how the API should pass received cached and live messages to the application. Refer to each
   176  // variant for details on what behaviour they configure.
   177  type CachedMessageSubscriptionStrategy int
   178  
   179  const (
   180  	// CacheRequestStrategyAsAvailable provides a configuration for receiving a concurrent mix of both live and cached messages on the given TopicSubscription.
   181  	CacheRequestStrategyAsAvailable CachedMessageSubscriptionStrategy = iota
   182  
   183  	// CacheRequestStrategyLiveCancelsCached provides a configuration for initially passing received cached messages to the application and as soon as live
   184  	// messages are received, passing those instead and passing no more cached messages.
   185  	CacheRequestStrategyLiveCancelsCached
   186  
   187  	// CacheRequestStrategyCachedFirst provides a configuration for passing only cached messages to the application, before passing the received live messages.
   188  	// The live messages passed to the application thereof this configuration can be received as early as when the cache request is sent
   189  	// by the API, and are enqueued until the cache response is received and its associated cached messages, if available, are passed to
   190  	// the application.
   191  	CacheRequestStrategyCachedFirst
   192  
   193  	// CachedOnly provides a configuration for passing only cached messages and no live messages to the application.
   194  	//
   195  	// Note: Cache requests configured using CacheRequestStrategyCachedOnly are limited to be used with subscribers
   196  	// without live data subscriptions. When used with matching live data subscriptions, cached messages will be
   197  	// delivered for both the cache outcome and live subscription leading to duplicate message delivery. When needing
   198  	// cache data when live data subscriptions are already present, it is recommended to use other
   199  	// CachedMessageSubscriptionStrategy types such as CacheRequestStrategyLiveCancelsCached or
   200  	// CacheRequestStrategyAsAvailable.
   201  	CacheRequestStrategyCachedOnly
   202  )
   203  
   204  // CachedMessageSubscriptionRequest provides an interface through which cache request configurations can be constructed. These
   205  // configurations can then be passed to a call to a [solace.dev/go/messaging/pkg/solace.ReceiverCacheRequests] interface method to request cached data. Refer to each of the below
   206  // factory methods for details on what configuration they provide.
   207  type CachedMessageSubscriptionRequest interface {
   208  
   209  	// GetName retrieves the name of the topic subscription.
   210  	GetName() string
   211  
   212  	// GetCacheName retrieves the name of the cache.
   213  	GetCacheName() string
   214  
   215  	// GetCacheAccessTimeout retrieves the timeout for the cache request.
   216  	GetCacheAccessTimeout() int32
   217  
   218  	// GetMaxCachedMessages retrieves the max number of cached messages to be retrieved in a request.
   219  	GetMaxCachedMessages() int32
   220  
   221  	// GetCachedMessageAge retrieves the max age of cached messages to be retrieved in a request.
   222  	GetCachedMessageAge() int32
   223  
   224  	// GetCachedMessageSubscriptionRequestStrategy retrieves the configured type of subscription strategy.
   225  	GetCachedMessageSubscriptionRequestStrategy() *CachedMessageSubscriptionStrategy
   226  }
   227  
   228  type cachedMessageSubscriptionRequest struct {
   229  	cacheName                         string
   230  	subscription                      *TopicSubscription
   231  	cacheAccessTimeout                int32
   232  	maxCachedMessages                 int32
   233  	cachedMessageAge                  int32
   234  	cachedMessageSubscriptionStrategy *CachedMessageSubscriptionStrategy
   235  }
   236  
   237  // GetName retrieves the name of the topic subscription.
   238  func (request *cachedMessageSubscriptionRequest) GetName() string {
   239  	if request.subscription == nil {
   240  		return "" // if topic subscription is nil, return an empty string
   241  	}
   242  	return request.subscription.GetName()
   243  }
   244  
   245  // GetCacheName retrieves the name of the cache.
   246  func (request *cachedMessageSubscriptionRequest) GetCacheName() string {
   247  	return request.cacheName
   248  }
   249  
   250  // GetCacheAccessTimeout retrieves the timeout for the cache request.
   251  func (request *cachedMessageSubscriptionRequest) GetCacheAccessTimeout() int32 {
   252  	return request.cacheAccessTimeout
   253  }
   254  
   255  // GetMaxCachedMessages retrieves the max number of cached messages to be retrieved in a request.
   256  func (request *cachedMessageSubscriptionRequest) GetMaxCachedMessages() int32 {
   257  	return request.maxCachedMessages
   258  }
   259  
   260  // GetCachedMessageAge retrieves the max age of cached messages to be retrieved in a request.
   261  func (request *cachedMessageSubscriptionRequest) GetCachedMessageAge() int32 {
   262  	return request.cachedMessageAge
   263  }
   264  
   265  // GetCachedMessageSubscriptionRequestStrategy retrieves the configured type of subscription strategy.
   266  func (request *cachedMessageSubscriptionRequest) GetCachedMessageSubscriptionRequestStrategy() *CachedMessageSubscriptionStrategy {
   267  	return request.cachedMessageSubscriptionStrategy
   268  }
   269  
   270  // NewCachedMessageSubscriptionRequest returns a CachedMessageSubscriptionRequest that can be used to configure a
   271  // cache request. The cachedMessageSubscriptionStrategy indicates how the API should pass received cached/live
   272  // messages to the application after a cache request has been sent. Refer to
   273  // [solace.dev/go/messaging/pkg/solace/resource.CachedMessageSubscriptionStrategy] for details on what behaviour
   274  // each strategy configures.
   275  //   - cacheName: The name of the cache to retrieve messages from.
   276  //   - subscription: What topic the cache request should match against.
   277  //   - cacheAccessTimeout: How long in milliseconds a cache request is permitted to take before it is internally
   278  //     cancelled. The valid range for this timeout is between 3000 and signed int 32 max. This value specifies a
   279  //     timer for the internal requests that occur between this API and a PubSub+ cache. A single call to a
   280  //     [solace.dev/go/messaging/pkg/solace.ReceiverCacheRequests] interface method can lead to one or more of these internal requests. As long
   281  //     as each of these internal requests complete before the specified time-out, the timeout value is satisfied.
   282  //   - maxCachedMessages: The max number of messages expected to be returned as a part of a
   283  //     cache response. The range of this paramater is between 0 and signed int32 max, with 0 indicating that there
   284  //     should be no restrictions on the number of messages received as a part of a cache request.
   285  //   - cachedMessageAge: the max age in seconds of the messages to be retrieved from a cache.
   286  //     The range of this parameter is between 0 and signed int 32 max, with 0 indicating that there should be no
   287  //     restrictions on the age of messages to be retrieved.
   288  //
   289  // The construction of NewCachedMessageSubscriptionRequest does not validate these parameter values. Instead, they are validated
   290  // when the cache request is sent after a call to a [solace.dev/go/messaging/pkg/solace.ReceiverCacheRequests] interface method.
   291  func NewCachedMessageSubscriptionRequest(cachedMessageSubscriptionStrategy CachedMessageSubscriptionStrategy,
   292  	cacheName string,
   293  	subscription *TopicSubscription,
   294  	cacheAccessTimeout int32,
   295  	maxCachedMessages int32,
   296  	cachedMessageAge int32) CachedMessageSubscriptionRequest {
   297  	// map the cachedMessageSubscriptionStrategy
   298  	var cachedMsgSubStrategy *CachedMessageSubscriptionStrategy = nil
   299  	switch cachedMessageSubscriptionStrategy {
   300  	case CacheRequestStrategyAsAvailable:
   301  		fallthrough
   302  	case CacheRequestStrategyCachedFirst:
   303  		fallthrough
   304  	case CacheRequestStrategyCachedOnly:
   305  		fallthrough
   306  	case CacheRequestStrategyLiveCancelsCached:
   307  		// these are valid
   308  		cachedMsgSubStrategy = &cachedMessageSubscriptionStrategy
   309  	default:
   310  		cachedMsgSubStrategy = nil
   311  	}
   312  	// return back a valid cache message subscription request if everything checks out
   313  	return &cachedMessageSubscriptionRequest{
   314  		cacheName:                         cacheName,
   315  		subscription:                      subscription,
   316  		cacheAccessTimeout:                cacheAccessTimeout,
   317  		maxCachedMessages:                 maxCachedMessages,
   318  		cachedMessageAge:                  cachedMessageAge,
   319  		cachedMessageSubscriptionStrategy: cachedMsgSubStrategy,
   320  	}
   321  }
   322