...

Source file src/solace.dev/go/messaging/pkg/solace/persistent_message_publisher.go

Documentation: solace.dev/go/messaging/pkg/solace

     1  // pubsubplus-go-client
     2  //
     3  // Copyright 2021-2024 Solace Corporation. All rights reserved.
     4  //
     5  // Licensed under the Apache License, Version 2.0 (the "License");
     6  // you may not use this file except in compliance with the License.
     7  // You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package solace
    18  
    19  import (
    20  	"time"
    21  
    22  	"solace.dev/go/messaging/pkg/solace/config"
    23  	"solace.dev/go/messaging/pkg/solace/message"
    24  	"solace.dev/go/messaging/pkg/solace/resource"
    25  )
    26  
    27  // PersistentMessagePublisher allows for the publishing of persistent messages (guaranteed messages).
    28  type PersistentMessagePublisher interface {
    29  	MessagePublisher
    30  	MessagePublisherHealthCheck
    31  
    32  	// StartAsyncCallback starts the PersistentMessagePublisher asynchronously.
    33  	// The callback is called when the publisher has started: its error argument is nil
    34  	// on success, otherwise the error that occurred.
    35  	StartAsyncCallback(callback func(PersistentMessagePublisher, error))
    36  
    37  	// SetMessagePublishReceiptListener sets the listener to receive delivery receipts.
    38  	// PublishReceipt events are triggered once the API receives an acknowledgement
    39  	// from the event broker for a published message.
    40  	// This should be set before the publisher is started to avoid dropping acknowledgements.
    41  	// The listener does not receive events from PublishAwaitAcknowledgement calls.
    42  	SetMessagePublishReceiptListener(listener MessagePublishReceiptListener)
    43  
    44  	// TerminateAsyncCallback terminates the PersistentMessagePublisher asynchronously.
    45  	// Calls the callback when terminated with nil if successful, otherwise an error if
    46  	// one occurred. When gracePeriod is a value less than 0, the function waits indefinitely.
    47  	TerminateAsyncCallback(gracePeriod time.Duration, callback func(error))
    48  
    49  	// PublishBytes sends a message of type byte array to the specified destination.
    50  	// Returns an error if one occurred while attempting to publish or if the publisher
    51  	// is not started/terminated. Possible errors include:
    52  	// - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
    53  	// - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
    54  	//   capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
    55  	//   are called.
    56  	PublishBytes(message []byte, destination *resource.Topic) error
    57  
    58  	// PublishString sends a message of type string to the specified destination.
    59  	// Possible errors include:
    60  	// - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
    61  	// - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
    62  	//   capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
    63  	//   are called.
    64  	PublishString(message string, destination *resource.Topic) error
    65  
    66  	// Publish sends the specified message of type OutboundMessage built by an
    67  	// OutboundMessageBuilder to the specified destination.
    68  	// Optionally, you can provide properties in the form of OutboundMessageProperties to override
    69  	// any properties set on OutboundMessage. The properties argument can be nil to
    70  	// not set any properties.
    71  	// Optionally, provide a context that is available to the MessagePublishReceiptListener
    72  	// registered with SetMessagePublishReceiptListener through PublishReceipt.GetUserContext.
    73  	// The context argument can be nil to not set a context. Possible errors include:
    74  	// - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
    75  	// - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
    76  	//   capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
    77  	//   are called.
    78  	Publish(message message.OutboundMessage, destination *resource.Topic, properties config.MessagePropertiesConfigurationProvider, context interface{}) error
    79  
    80  	// PublishAwaitAcknowledgement sends the specified message of type OutboundMessage
    81  	// and awaits a publish acknowledgement.
    82  	// Optionally, you can provide properties in the form of OutboundMessageProperties to override
    83  	// any properties set on OutboundMessage. The properties argument can be nil to
    84  	// not set any properties.
    85  	// If the specified timeout argument is less than 0, the function waits indefinitely.
    86  	// Possible errors include:
    87  	// - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed.
    88  	// - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O
    89  	//   capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners
    90  	//   are called.
    91  	PublishAwaitAcknowledgement(message message.OutboundMessage, destination *resource.Topic, timeout time.Duration, properties config.MessagePropertiesConfigurationProvider) error
    92  }
    93  
    94  // MessagePublishReceiptListener is a listener function, taking a PublishReceipt, that can be registered for delivery receipt events using PersistentMessagePublisher.SetMessagePublishReceiptListener.
    95  type MessagePublishReceiptListener func(PublishReceipt)
    96  
    97  // PublishReceipt is the receipt for delivery of a persistent message.
    98  type PublishReceipt interface {
    99  	// GetUserContext retrieves the context associated with the publish, if provided.
   100  	// Returns nil if no user context is set.
   101  	GetUserContext() interface{}
   102  
   103  	// GetTimeStamp retrieves the time at which the event occurred: specifically, the time when the
   104  	// acknowledgement was received by the API from the event broker or, if an error is present,
   105  	// the time when the error returned by GetError occurred.
   106  	GetTimeStamp() time.Time
   107  
   108  	// GetMessage returns an OutboundMessage that was successfully published.
   109  	GetMessage() message.OutboundMessage
   110  
   111  	// GetError retrieves an error if one occurred, which usually indicates a publish attempt failed.
   112  	// GetError returns nil on a successful publish, otherwise an error if a failure occurred
   113  	// while delivering the message.
   114  	GetError() error
   115  
   116  	// IsPersisted returns true if the event broker confirmed that the message was successfully received and persisted,
   117  	// otherwise false.
   118  	IsPersisted() bool
   119  }
   120  
   121  // PersistentMessagePublisherBuilder allows for configuration of persistent message publisher instances.
   122  type PersistentMessagePublisherBuilder interface {
   123  	// Build returns a new PersistentMessagePublisher based on the configured properties.
   124  	// Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided.
   125  	Build() (messagePublisher PersistentMessagePublisher, err error)
   126  	// OnBackPressureReject sets the publisher back pressure strategy to reject,
   127  	// where publish attempts are rejected once the bufferSize (the number of messages) is reached.
   128  	// If bufferSize is 0, an error is returned when the transport is full when attempting to publish.
   129  	// A buffer of the given size will be statically allocated when the publisher is built.
   130  	// Valid bufferSize is greater than or equal to 0.
   131  	OnBackPressureReject(bufferSize uint) PersistentMessagePublisherBuilder
   132  	// OnBackPressureWait sets the publisher back pressure strategy to wait, where publish
   133  	// attempts block until there is space in the buffer of size bufferSize (the number of messages).
   134  	// A buffer of the given size will be statically allocated when the publisher is built.
   135  	// Valid bufferSize is greater than or equal to 1.
   136  	OnBackPressureWait(bufferSize uint) PersistentMessagePublisherBuilder
   137  	// FromConfigurationProvider configures the persistent publisher with the given properties.
   138  	// Built-in PublisherPropertiesConfigurationProvider implementations include:
   139  	// - PublisherPropertyMap - A map of PublisherProperty keys to values.
   140  	FromConfigurationProvider(provider config.PublisherPropertiesConfigurationProvider) PersistentMessagePublisherBuilder
   141  }
   142