1 // pubsubplus-go-client 2 // 3 // Copyright 2021-2024 Solace Corporation. All rights reserved. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 package solace 18 19 import ( 20 "time" 21 22 "solace.dev/go/messaging/pkg/solace/config" 23 "solace.dev/go/messaging/pkg/solace/message" 24 "solace.dev/go/messaging/pkg/solace/resource" 25 ) 26 27 // PersistentMessagePublisher allows for the publishing of persistent messages (guaranteed messages). 28 type PersistentMessagePublisher interface { 29 MessagePublisher 30 MessagePublisherHealthCheck 31 32 // StartAsyncCallback starts the PersistentMessagePublisher asynchronously. 33 // Calls the callback when started with an error if one occurred, otherwise nil 34 // when successful. 35 StartAsyncCallback(callback func(PersistentMessagePublisher, error)) 36 37 // SetPublishReceiptListener sets the listener to receive delivery receipts. 38 // PublishReceipt events are triggered once the API receives an acknowledgement 39 // when a message is received from the event broker. 40 // This should be set before the publisher is started to avoid dropping acknowledgements. 41 // The listener does not receive events from PublishAwaitAcknowledgement calls. 42 SetMessagePublishReceiptListener(listener MessagePublishReceiptListener) 43 44 // TerminateAsyncCallback terminates the PersistentMessagePublisher asynchronously. 45 // Calls the callback when terminated with nil if successful, otherwise an error if 46 // one occurred. When gracePeriod is a value less than 0, the function waits indefinitely. 47 TerminateAsyncCallback(gracePeriod time.Duration, callback func(error)) 48 49 // PublishBytes sends a message of type byte array to the specified destination. 50 // Returns an error if one occurred while attempting to publish or if the publisher 51 // is not started/terminated. Possible errors include: 52 // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. 53 // - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O 54 // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners 55 // are called. 56 PublishBytes(message []byte, destination *resource.Topic) error 57 58 // PublishString sends a message of type string to the specified destination. 59 // Possible errors include: 60 // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. 61 // - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O 62 // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners 63 // are called. 64 PublishString(message string, destination *resource.Topic) error 65 66 // Publish sends the specified message of type OutboundMessage built by a 67 // OutboundMessageBuilder to the specified destination. 68 // Optionally, you can provide properties in the form of OutboundMessageProperties to override 69 // any properties set on OutboundMessage. The properties argument can be nil to 70 // not set any properties. 71 // Optionally, provide a context that is available in the PublishReceiptListener 72 // registered with SetMessagePublishReceiptListener as GetUserContext. 73 // The context argument can be nil to not set a context. Possible errors include: 74 // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. 75 // - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O 76 // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners 77 // are called. 78 Publish(message message.OutboundMessage, destination *resource.Topic, properties config.MessagePropertiesConfigurationProvider, context interface{}) error 79 80 // PublishAwaitAcknowledgement sends the specified message of type OutboundMessage 81 // and awaits a publish acknowledgement. 82 // Optionally, you can provide properties in the form of OutboundMessageProperties to override 83 // any properties set on OutboundMessage. The properties argument can be nil to 84 // not set any properties. 85 // If the specified timeout argument is less than 0, the function waits indefinitely. 86 // Possible errors include: 87 // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. 88 // - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O 89 // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners 90 // are called. 91 PublishAwaitAcknowledgement(message message.OutboundMessage, destination *resource.Topic, timeout time.Duration, properties config.MessagePropertiesConfigurationProvider) error 92 } 93 94 // MessagePublishReceiptListener is a listener that can be registered for the delivery receipt events. 95 type MessagePublishReceiptListener func(PublishReceipt) 96 97 // PublishReceipt is the receipt for delivery of a persistent message. 98 type PublishReceipt interface { 99 // GetUserContext retrieves the context associated with the publish, if provided. 100 // Returns nil if no user context is set. 101 GetUserContext() interface{} 102 103 // GetTimeStamp retrieves the time. The time indicates when the event occurred, specifically the time when the 104 // acknowledgement was received by the API from the event broker, or if present, when the GetError error 105 // occurred. 106 GetTimeStamp() time.Time 107 108 // GetMessage returns an OutboundMessage that was successfully published. 109 GetMessage() message.OutboundMessage 110 111 // GetError retrieves an error if one occurred, which usually indicates a publish attempt failed. 112 // GetError returns nil on a successful publish, otherwise an error if a failure occurred 113 // while delivering the message. 114 GetError() error 115 116 // IsPersisted returns true if the event broker confirmed that the message was successfully received and persisted, 117 // otherwise false. 118 IsPersisted() bool 119 } 120 121 // PersistentMessagePublisherBuilder allows for configuration of persistent message publisher instances. 122 type PersistentMessagePublisherBuilder interface { 123 // Build returns a new PersistentMessagePublisher based on the configured properties. 124 // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. 125 Build() (messagePublisher PersistentMessagePublisher, err error) 126 // OnBackPressureReject sets the publisher back pressure strategy to reject 127 // where the publish attempts are rejected once the bufferSize (the number of messages), is reached. 128 // If bufferSize is 0, an error is thrown when the transport is full when attempting to publish. 129 // A buffer of the given size will be statically allocated when the publisher is built. 130 // Valid bufferSize is greater than or equal to 0. 131 OnBackPressureReject(bufferSize uint) PersistentMessagePublisherBuilder 132 // OnBackPressureWait sets the publisher back pressure strategy to wait where publish 133 // attempts block until there is space in the buffer of size bufferSize (the number of messages). 134 // A buffer of the given size will be statically allocated when the publisher is built. 135 // Valid bufferSize is greater than or equal to 1. 136 OnBackPressureWait(bufferSize uint) PersistentMessagePublisherBuilder 137 // FromConfigurationProvider configures the persistent publisher with the given properties. 138 // Built in PublisherPropertiesConfigurationProvider implementations include: 139 // - PublisherPropertyMap - A map of PublisherProperty keys to values. 140 FromConfigurationProvider(provider config.PublisherPropertiesConfigurationProvider) PersistentMessagePublisherBuilder 141 } 142