1 // pubsubplus-go-client 2 // 3 // Copyright 2021-2025 Solace Corporation. All rights reserved. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 // Package resource contains types and factory functions for various 18 // broker resources such as topics and queues. 19 package resource 20 21 import "fmt" 22 23 // Destination represents a message destination on a broker. 24 // Some examples of destinations include queues and topics. 25 // Destination implementations can be retrieved by the various helper functions, 26 // such as TopicOf, QueueDurableExclusive, and TopicSubscriptionOf. 27 type Destination interface { 28 // GetName retrieves the name of the destination 29 GetName() string 30 } 31 32 // Subscription represents the valid subscriptions that can be specified to receivers. 33 // Valid subscriptions include *resource.TopicSubscription. 34 type Subscription interface { 35 Destination 36 // GetSubscriptionType will return the type of the subscription as a string 37 GetSubscriptionType() string 38 } 39 40 // Topic is an implementation of destination representing a topic 41 // that can be published to. 42 type Topic struct { 43 name string 44 } 45 46 // TopicOf creates a new topic with the specified name. 47 // Topic name must not be empty. 48 func TopicOf(expression string) *Topic { 49 return &Topic{expression} 50 } 51 52 // GetName returns the name of the topic. Implements the Destination interface. 53 func (t *Topic) GetName() string { 54 return t.name 55 } 56 57 // String implements fmt.Stringer 58 func (t *Topic) String() string { 59 return fmt.Sprintf("Topic: %s", t.GetName()) 60 } 61 62 // ShareName is an interface for identifiers that are associated 63 // with a shared subscription. 64 // See https://docs.solace.com/PubSub-Basics/Direct-Messages.htm#Shared in the Solace documentation. 65 type ShareName struct { 66 name string 67 } 68 69 // ShareNameOf returns a new share name with the provided name. 70 // Valid share names are not empty and do not contain special 71 // characters '>' or '*'. Returns a new ShareName with the given string. 72 func ShareNameOf(name string) *ShareName { 73 return &ShareName{name} 74 } 75 76 // GetName returns the share name. Implements the Destination interface. 77 func (sn *ShareName) GetName() string { 78 return sn.name 79 } 80 81 // String implements fmt.Stringer 82 func (sn *ShareName) String() string { 83 return fmt.Sprintf("ShareName: %s", sn.GetName()) 84 } 85 86 // TopicSubscription is a subscription to a topic often used for receivers. 87 type TopicSubscription struct { 88 topic string 89 } 90 91 // GetName returns the topic subscription expression. Implements the Destination interface. 92 func (t *TopicSubscription) GetName() string { 93 return t.topic 94 } 95 96 // GetSubscriptionType returns the type of the topic subscription as a string 97 func (t *TopicSubscription) GetSubscriptionType() string { 98 return fmt.Sprintf("%T", t) 99 } 100 101 // String implements fmt.Stringer 102 func (t *TopicSubscription) String() string { 103 return fmt.Sprintf("TopicSubscription: %s", t.GetName()) 104 } 105 106 // TopicSubscriptionOf creates a TopicSubscription of the specified topic string. 107 func TopicSubscriptionOf(topic string) *TopicSubscription { 108 return &TopicSubscription{topic} 109 } 110 111 // Queue represents a queue used for guaranteed messaging receivers. 112 type Queue struct { 113 name string 114 exclusivelyAccessible, durable bool 115 } 116 117 // GetName returns the name of the queue. Implements the Destination interface. 118 func (q *Queue) GetName() string { 119 return q.name 120 } 121 122 // IsExclusivelyAccessible determines if Queue supports exclusive or shared-access mode. 123 // Returns true if the Queue can serve only one consumer at any one time, false if the 124 // Queue can serve multiple consumers with each consumer serviced in a round-robin fashion. 125 func (q *Queue) IsExclusivelyAccessible() bool { 126 return q.exclusivelyAccessible 127 } 128 129 // IsDurable determines if the Queue is durable. Durable queues are privisioned objects on 130 // the broker that have a lifespan that is independent of any one client session. 131 func (q *Queue) IsDurable() bool { 132 return q.durable 133 } 134 135 func (q *Queue) String() string { 136 return fmt.Sprintf("Queue: %s, exclusive: %t, durable: %t", q.GetName(), q.IsExclusivelyAccessible(), q.IsDurable()) 137 } 138 139 // QueueDurableExclusive creates a new durable, exclusive queue with the specified name. 140 func QueueDurableExclusive(queueName string) *Queue { 141 return &Queue{ 142 name: queueName, 143 exclusivelyAccessible: true, 144 durable: true, 145 } 146 } 147 148 // QueueDurableNonExclusive creates a durable, non-exclusive queue with the specified name. 149 func QueueDurableNonExclusive(queueName string) *Queue { 150 return &Queue{ 151 name: queueName, 152 exclusivelyAccessible: false, 153 durable: true, 154 } 155 } 156 157 // QueueNonDurableExclusive creates an exclusive, non-durable queue with the specified name. 158 func QueueNonDurableExclusive(queueName string) *Queue { 159 return &Queue{ 160 name: queueName, 161 exclusivelyAccessible: true, 162 durable: false, 163 } 164 } 165 166 // QueueNonDurableExclusiveAnonymous creates an anonymous, exclusive, and non-durable queue. 167 func QueueNonDurableExclusiveAnonymous() *Queue { 168 return &Queue{ 169 name: "", 170 exclusivelyAccessible: true, 171 durable: false, 172 } 173 } 174 175 // CachedMessageSubscriptionStrategy indicates how the API should pass received cached and live messages to the application. Refer to each 176 // variant for details on what behaviour they configure. 177 type CachedMessageSubscriptionStrategy int 178 179 const ( 180 // CacheRequestStrategyAsAvailable provides a configuration for receiving a concurrent mix of both live and cached messages on the given TopicSubscription. 181 CacheRequestStrategyAsAvailable CachedMessageSubscriptionStrategy = iota 182 183 // CacheRequestStrategyLiveCancelsCached provides a configuration for initially passing received cached messages to the application and as soon as live 184 // messages are received, passing those instead and passing no more cached messages. 185 CacheRequestStrategyLiveCancelsCached 186 187 // CacheRequestStrategyCachedFirst provides a configuration for passing only cached messages to the application, before passing the received live messages. 188 // The live messages passed to the application thereof this configuration can be received as early as when the cache request is sent 189 // by the API, and are enqueued until the cache response is received and its associated cached messages, if available, are passed to 190 // the application. 191 CacheRequestStrategyCachedFirst 192 193 // CachedOnly provides a configuration for passing only cached messages and no live messages to the application. 194 // 195 // Note: Cache requests configured using CacheRequestStrategyCachedOnly are limited to be used with subscribers 196 // without live data subscriptions. When used with matching live data subscriptions, cached messages will be 197 // delivered for both the cache outcome and live subscription leading to duplicate message delivery. When needing 198 // cache data when live data subscriptions are already present, it is recommended to use other 199 // CachedMessageSubscriptionStrategy types such as CacheRequestStrategyLiveCancelsCached or 200 // CacheRequestStrategyAsAvailable. 201 CacheRequestStrategyCachedOnly 202 ) 203 204 // CachedMessageSubscriptionRequest provides an interface through which cache request configurations can be constructed. These 205 // configurations can then be passed to a call to a [solace.dev/go/messaging/pkg/solace.ReceiverCacheRequests] interface method to request cached data. Refer to each of the below 206 // factory methods for details on what configuration they provide. 207 type CachedMessageSubscriptionRequest interface { 208 209 // GetName retrieves the name of the topic subscription. 210 GetName() string 211 212 // GetCacheName retrieves the name of the cache. 213 GetCacheName() string 214 215 // GetCacheAccessTimeout retrieves the timeout for the cache request. 216 GetCacheAccessTimeout() int32 217 218 // GetMaxCachedMessages retrieves the max number of cached messages to be retrieved in a request. 219 GetMaxCachedMessages() int32 220 221 // GetCachedMessageAge retrieves the max age of cached messages to be retrieved in a request. 222 GetCachedMessageAge() int32 223 224 // GetCachedMessageSubscriptionRequestStrategy retrieves the configured type of subscription strategy. 225 GetCachedMessageSubscriptionRequestStrategy() *CachedMessageSubscriptionStrategy 226 } 227 228 type cachedMessageSubscriptionRequest struct { 229 cacheName string 230 subscription *TopicSubscription 231 cacheAccessTimeout int32 232 maxCachedMessages int32 233 cachedMessageAge int32 234 cachedMessageSubscriptionStrategy *CachedMessageSubscriptionStrategy 235 } 236 237 // GetName retrieves the name of the topic subscription. 238 func (request *cachedMessageSubscriptionRequest) GetName() string { 239 if request.subscription == nil { 240 return "" // if topic subscription is nil, return an empty string 241 } 242 return request.subscription.GetName() 243 } 244 245 // GetCacheName retrieves the name of the cache. 246 func (request *cachedMessageSubscriptionRequest) GetCacheName() string { 247 return request.cacheName 248 } 249 250 // GetCacheAccessTimeout retrieves the timeout for the cache request. 251 func (request *cachedMessageSubscriptionRequest) GetCacheAccessTimeout() int32 { 252 return request.cacheAccessTimeout 253 } 254 255 // GetMaxCachedMessages retrieves the max number of cached messages to be retrieved in a request. 256 func (request *cachedMessageSubscriptionRequest) GetMaxCachedMessages() int32 { 257 return request.maxCachedMessages 258 } 259 260 // GetCachedMessageAge retrieves the max age of cached messages to be retrieved in a request. 261 func (request *cachedMessageSubscriptionRequest) GetCachedMessageAge() int32 { 262 return request.cachedMessageAge 263 } 264 265 // GetCachedMessageSubscriptionRequestStrategy retrieves the configured type of subscription strategy. 266 func (request *cachedMessageSubscriptionRequest) GetCachedMessageSubscriptionRequestStrategy() *CachedMessageSubscriptionStrategy { 267 return request.cachedMessageSubscriptionStrategy 268 } 269 270 // NewCachedMessageSubscriptionRequest returns a CachedMessageSubscriptionRequest that can be used to configure a 271 // cache request. The cachedMessageSubscriptionStrategy indicates how the API should pass received cached/live 272 // messages to the application after a cache request has been sent. Refer to 273 // [solace.dev/go/messaging/pkg/solace/resource.CachedMessageSubscriptionStrategy] for details on what behaviour 274 // each strategy configures. 275 // - cacheName: The name of the cache to retrieve messages from. 276 // - subscription: What topic the cache request should match against. 277 // - cacheAccessTimeout: How long in milliseconds a cache request is permitted to take before it is internally 278 // cancelled. The valid range for this timeout is between 3000 and signed int 32 max. This value specifies a 279 // timer for the internal requests that occur between this API and a PubSub+ cache. A single call to a 280 // [solace.dev/go/messaging/pkg/solace.ReceiverCacheRequests] interface method can lead to one or more of these internal requests. As long 281 // as each of these internal requests complete before the specified time-out, the timeout value is satisfied. 282 // - maxCachedMessages: The max number of messages expected to be returned as a part of a 283 // cache response. The range of this paramater is between 0 and signed int32 max, with 0 indicating that there 284 // should be no restrictions on the number of messages received as a part of a cache request. 285 // - cachedMessageAge: the max age in seconds of the messages to be retrieved from a cache. 286 // The range of this parameter is between 0 and signed int 32 max, with 0 indicating that there should be no 287 // restrictions on the age of messages to be retrieved. 288 // 289 // The construction of NewCachedMessageSubscriptionRequest does not validate these parameter values. Instead, they are validated 290 // when the cache request is sent after a call to a [solace.dev/go/messaging/pkg/solace.ReceiverCacheRequests] interface method. 291 func NewCachedMessageSubscriptionRequest(cachedMessageSubscriptionStrategy CachedMessageSubscriptionStrategy, 292 cacheName string, 293 subscription *TopicSubscription, 294 cacheAccessTimeout int32, 295 maxCachedMessages int32, 296 cachedMessageAge int32) CachedMessageSubscriptionRequest { 297 // map the cachedMessageSubscriptionStrategy 298 var cachedMsgSubStrategy *CachedMessageSubscriptionStrategy = nil 299 switch cachedMessageSubscriptionStrategy { 300 case CacheRequestStrategyAsAvailable: 301 fallthrough 302 case CacheRequestStrategyCachedFirst: 303 fallthrough 304 case CacheRequestStrategyCachedOnly: 305 fallthrough 306 case CacheRequestStrategyLiveCancelsCached: 307 // these are valid 308 cachedMsgSubStrategy = &cachedMessageSubscriptionStrategy 309 default: 310 cachedMsgSubStrategy = nil 311 } 312 // return back a valid cache message subscription request if everything checks out 313 return &cachedMessageSubscriptionRequest{ 314 cacheName: cacheName, 315 subscription: subscription, 316 cacheAccessTimeout: cacheAccessTimeout, 317 maxCachedMessages: maxCachedMessages, 318 cachedMessageAge: cachedMessageAge, 319 cachedMessageSubscriptionStrategy: cachedMsgSubStrategy, 320 } 321 } 322