Acknowledging Messages Received by Clients

The messaging APIs provide acknowledgments to the PubSub+ event broker for the guaranteed messages that clients receive through a Flow. The figure below shows how the guaranteed messages that an application receives through a Flow are acknowledged. Client applications can also send a negative acknowledgment (Nack) through the Flow for malformed messages or messages that could not be processed.

Acknowledging Received Guaranteed Messages

API Acknowledgments

A Guaranteed Message window size limits the number of messages that the API can receive before it must return an acknowledgment to the event broker to indicate that it received the messages in the window. After the API sends this acknowledgment, the Guaranteed Message window reopens so that further messages can be sent to the API.

An application can adjust the windowed acknowledgments by changing the default acknowledgment timer and threshold parameters set through the Flow properties (refer to Important Flow (Message Consumer) Properties). Changing these defaults is not usually required and will change the performance characteristics of a Flow.
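
The windowing behavior described above can be sketched with a small model. This is only an illustration, not the API's implementation: the `AckWindow` class and its `windowSize` and `thresholdPercent` parameters are hypothetical names chosen for the sketch, and the acknowledgment timer is represented only by the `sendAck()` method that a timer would call.

```javascript
// Toy model of a windowed acknowledgment scheme (not the PubSub+ API itself).
// AckWindow, windowSize, and thresholdPercent are hypothetical names.
class AckWindow {
  constructor(windowSize, thresholdPercent) {
    this.windowSize = windowSize;
    // Number of received messages that triggers an acknowledgment.
    this.threshold = Math.max(1, Math.floor((windowSize * thresholdPercent) / 100));
    this.unacked = 0;  // messages received since the last acknowledgment
    this.acksSent = 0; // acknowledgments returned to the sender
  }

  // The sender may transmit only while the window has room.
  canSend() {
    return this.unacked < this.windowSize;
  }

  // Called for each message that arrives; returns true if an ack was sent.
  onMessage() {
    if (!this.canSend()) {
      throw new Error("window is closed; sender must wait for an acknowledgment");
    }
    this.unacked += 1;
    if (this.unacked >= this.threshold) {
      this.sendAck();
      return true;
    }
    return false;
  }

  // An acknowledgment timer would also call this when it expires.
  sendAck() {
    this.unacked = 0;
    this.acksSent += 1;
  }
}

const win = new AckWindow(10, 60); // window of 10, acknowledge at 60% full
let acks = 0;
for (let i = 0; i < 12; i++) {
  if (win.onMessage()) acks += 1;
}
console.log(`acks sent after 12 messages: ${acks}`);
```

In this model, a lower threshold reopens the window sooner at the cost of more acknowledgments, which is one way to picture why changing the defaults changes the performance characteristics of a Flow.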

Application Acknowledgments

One of the two following application acknowledgment modes can be used for acknowledging a message:

  • Auto-acknowledgment
  • Client acknowledgment

The acknowledgment mode is set through one of the Flow properties listed below. By default, the auto-acknowledgment mode is used.

The following table shows how to perform acknowledgment for the different APIs:

PubSub+ Messaging API and the property to use:

  • JCSMP: ConsumerFlowProperties.setAckMode(String ackMode). Possible values are:
    • JCSMPProperties.SUPPORTED_MESSAGE_ACK_AUTO
    • JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT
  • Java RTO: FlowHandle.PROPERTIES.ACKMODE. Possible values in SolEnum.AckMode are:
    • AckMode.AUTO
    • AckMode.CLIENT
  • C: SOLCLIENT_FLOW_PROP_ACKMODE. The possible values are:
    • SOLCLIENT_FLOW_PROP_ACKMODE_CLIENT
    • SOLCLIENT_FLOW_PROP_ACKMODE_AUTO (Default)
  • .NET: FlowProperties.AckMode. Possible values are:
    • AutoAck
    • ClientAck
  • JavaScript and Node.js: solace.MessageConsumerProperties.acknowledgeMode. Possible values are:
    • solace.MessageConsumerAcknowledgeMode.AUTO
    • solace.MessageConsumerAcknowledgeMode.CLIENT

Auto-Acknowledgment Mode

When the auto-acknowledgment mode is used, the API automatically generates application-level acknowledgments.

For JCSMP, acknowledgments are sent at different times depending on whether the message is received asynchronously or synchronously:

  • when received asynchronously, the acknowledgment is sent after the message callback completes.
  • when received synchronously, the acknowledgment is sent when the message is removed from the API's internal queue during the receive() method. This means that the acknowledgment has already been sent by the time receive() returns control to the application.
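
The difference in timing can be pictured with a toy model. The sketch below does not use JCSMP. `AutoAckFlow`, `deliverAsync()`, and the event log are hypothetical stand-ins that only record the order in which processing and acknowledgment happen in each case.

```javascript
// Toy model of auto-acknowledgment timing (hypothetical classes, not JCSMP).
class AutoAckFlow {
  constructor() {
    this.queue = [];  // stands in for the API's internal queue
    this.events = []; // ordered log of what happened
  }

  enqueue(id) {
    this.queue.push(id);
  }

  // Asynchronous delivery: the ack follows the completed callback.
  deliverAsync(callback) {
    const id = this.queue.shift();
    if (id === undefined) return;
    callback(id);
    this.events.push(`ack ${id}`);
  }

  // Synchronous delivery: the ack is sent inside receive(), so it has
  // already gone out when the caller gets the message back.
  receive() {
    const id = this.queue.shift();
    if (id === undefined) return null;
    this.events.push(`ack ${id}`);
    return id;
  }
}

const flow = new AutoAckFlow();
flow.enqueue("m1");
flow.enqueue("m2");

// m1 is delivered asynchronously, m2 is fetched synchronously.
flow.deliverAsync((id) => flow.events.push(`processed ${id}`));
const received = flow.receive();
flow.events.push(`processed ${received}`);

console.log(flow.events.join(", "));
// prints "processed m1, ack m1, ack m2, processed m2"
```

In this model, a failure while processing m2 would happen after its acknowledgment has already gone out. That is the situation the synchronous case above asks you to keep in mind.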

Client Acknowledgment Mode

When the client acknowledgment mode is used, the client must explicitly send an acknowledgment for the message ID of each message received. Optionally, you can use negative acknowledgments as well. For more information, see Negative Acknowledgments for Specific Messages.

To explicitly send a client acknowledgment, call one of the methods or functions listed below.

Avoid allowing the number of outstanding unacknowledged messages to become excessively large (for example, 10,000 or more) because the egress message rate from the event broker can start to decline.

To send an Explicit Client Acknowledgment

PubSub+ Messaging API and the call to use:

  • JCSMP: XMLMessage.ackMessage()
  • Java RTO: FlowHandle.ack(...). See the PubSub+ Messaging API for JRTO for information
  • C: solClient_flow_sendAck(...)
  • .NET: IFlow.Ack(...)
  • JavaScript and Node.js: solace.Message.acknowledge(). Acknowledgment is called on the message object.

When using the client acknowledgment mode with the C API, the maximum number of messages that the API can deliver to the application through the Flow without receiving client acknowledgments can be set through the SOLCLIENT_FLOW_PROP_MAX_UNACKED_MESSAGES Flow property. By default, this property has a value of -1, which specifies that the maximum number of unacknowledged messages that can be delivered is not restricted by the API. (To change the maximum number of unacknowledged messages that may be received through an existing Flow, call solClient_flow_setMaxUnacked()).
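
As a rough illustration of what a limit on unacknowledged messages does, the following sketch models delivery that pauses once the limit is reached and resumes when the application acknowledges a message. It is not the C API. `GatedFlow`, `offer()`, and `pump()` are hypothetical names, and only the meaning of -1 (no limit enforced) is taken from the description above.

```javascript
// Toy model of a delivery limit on unacknowledged messages (not the C API).
// maxUnacked mirrors the idea of SOLCLIENT_FLOW_PROP_MAX_UNACKED_MESSAGES:
// -1 means the limit is not enforced.
class GatedFlow {
  constructor(maxUnacked = -1) {
    this.maxUnacked = maxUnacked;
    this.pending = [];        // messages waiting to be delivered
    this.unacked = new Set(); // delivered but not yet acknowledged
    this.delivered = [];      // delivery order, for inspection
  }

  // Changing the limit on an existing flow may release more messages.
  setMaxUnacked(maxUnacked) {
    this.maxUnacked = maxUnacked;
    this.pump();
  }

  offer(id) {
    this.pending.push(id);
    this.pump();
  }

  ack(id) {
    if (this.unacked.delete(id)) this.pump();
  }

  // Deliver pending messages while the limit allows it.
  pump() {
    while (
      this.pending.length > 0 &&
      (this.maxUnacked < 0 || this.unacked.size < this.maxUnacked)
    ) {
      const id = this.pending.shift();
      this.unacked.add(id);
      this.delivered.push(id);
    }
  }
}

const gated = new GatedFlow(2);
[1, 2, 3, 4].forEach((id) => gated.offer(id));
console.log(gated.delivered.join(",")); // only messages 1 and 2 so far

gated.ack(1);
console.log(gated.delivered.join(",")); // message 3 is released after the ack
```

Calling `gated.setMaxUnacked(-1)` in this model removes the limit and releases the remaining message, which loosely corresponds to changing the limit on an existing Flow.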

Negative Acknowledgments for Specific Messages

You can use negative acknowledgments (Nacks) if you have configured your applications for client acknowledgments (see Client Acknowledgment Mode). When you use Nacks, you can send a settlement outcome to let the event broker know the result from processing a guaranteed message that was received. Based on the settlement outcome, the event broker knows how to handle the message on its queue. You can use the following settlement outcomes:

  • ACCEPTED—This Ack notifies the event broker that your client application successfully processed the guaranteed message. When the event broker receives this outcome it removes the message from its queue.
    • Calling a settlement function or method with an outcome of ACCEPTED is the same as calling an Ack function or method in Client Acknowledgment Mode.
  • FAILED—This Nack notifies the event broker that your client application did not process the message. When the event broker receives this Nack it attempts to redeliver the message while adhering to delivery count limits.
  • REJECTED—This Nack notifies the event broker that your client application could process the message but it was not accepted (for example, failed validation). When the event broker receives this Nack it removes the message from its queue and then moves the message to the Dead Message Queue (DMQ) if it is configured.
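
The three outcomes can be summarized with a small queue model. This is a sketch under stated assumptions, not broker code: `ToyQueue` and `maxDeliveryCount` are hypothetical names, and the sketch assumes that a FAILED message that has used up its delivery attempts is handled like a REJECTED one, and that a removed message is discarded when no DMQ is configured.

```javascript
// Toy model of how a queue might react to each settlement outcome.
// Not broker code: maxDeliveryCount, the DMQ fallback for FAILED messages
// that exhaust their deliveries, and "discarded" are assumptions.
class ToyQueue {
  constructor({ maxDeliveryCount = 3, dmqEnabled = true } = {}) {
    this.maxDeliveryCount = maxDeliveryCount;
    this.dmqEnabled = dmqEnabled;
    this.messages = new Map(); // message id -> delivery count
    this.dmq = [];
  }

  publish(id) {
    this.messages.set(id, 0);
  }

  // Hand a message to the consumer and count the delivery attempt.
  deliver(id) {
    this.messages.set(id, this.messages.get(id) + 1);
  }

  settle(id, outcome) {
    if (!this.messages.has(id)) throw new Error(`unknown message ${id}`);
    switch (outcome) {
      case "ACCEPTED": // processed: remove from the queue
        this.messages.delete(id);
        return "removed";
      case "FAILED": // not processed: keep for redelivery, within the limit
        if (this.messages.get(id) < this.maxDeliveryCount) return "redeliver";
        this.messages.delete(id);
        return this.moveToDmq(id);
      case "REJECTED": // processed but not accepted: remove, DMQ if configured
        this.messages.delete(id);
        return this.moveToDmq(id);
      default:
        throw new Error(`unknown outcome ${outcome}`);
    }
  }

  moveToDmq(id) {
    if (!this.dmqEnabled) return "discarded";
    this.dmq.push(id);
    return "dead-lettered";
  }
}

const queue = new ToyQueue({ maxDeliveryCount: 2 });
["a", "b", "c"].forEach((id) => queue.publish(id));

const results = [];
queue.deliver("a");
results.push(queue.settle("a", "ACCEPTED"));
queue.deliver("b");
results.push(queue.settle("b", "REJECTED"));
queue.deliver("c");
results.push(queue.settle("c", "FAILED")); // first attempt, can be redelivered
queue.deliver("c");
results.push(queue.settle("c", "FAILED")); // limit reached in this sketch
console.log(results.join(", "));
```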

Before you can use Nacks, you must add the FAILED, REJECTED, or both outcomes as Nack types when you create the Flow to prepare the Flow to work with negative acknowledgments. You do not need to add the ACCEPTED outcome because it is always available. If you try to use an outcome that has not been added, you get an error of Required Settlement Outcome Not Supported.
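
The rule that an outcome must be added at Flow creation before it can be used might be modeled as follows. `ToyFlow` is a hypothetical stand-in and not one of the messaging APIs. Only the error text is taken from the description above.

```javascript
// Toy model of a Flow that only accepts the outcomes it was created with.
// ToyFlow is hypothetical; the error text comes from the description above.
class ToyFlow {
  constructor(requiredOutcomes = []) {
    // ACCEPTED is always available; FAILED and REJECTED must be requested.
    this.supported = new Set(["ACCEPTED", ...requiredOutcomes]);
    this.settled = new Map(); // message id -> outcome
  }

  settle(msgId, outcome) {
    if (!this.supported.has(outcome)) {
      throw new Error("Required Settlement Outcome Not Supported");
    }
    this.settled.set(msgId, outcome);
  }
}

const failedOnly = new ToyFlow(["FAILED"]);
failedOnly.settle(1, "ACCEPTED"); // always allowed
failedOnly.settle(2, "FAILED");   // allowed because it was added at creation

let rejectedError = null;
try {
  failedOnly.settle(3, "REJECTED"); // never added to this Flow
} catch (err) {
  rejectedError = err.message;
}
console.log(rejectedError);
```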

  • Nacks can be lost during transit (for example, due to unexpected networking issues). Take this into account in the message-handling logic when you develop your application.
  • Nacks are supported on event brokers 10.2.1 and later. If an event broker does not support Nacks, an InvalidOperationException occurs during the Flow bind request when an outcome is specified.

PubSub+ Messaging API Calls

JCSMP

Enable and prepare the Flow to use negative acknowledgments:

ConsumerFlowProperties.addRequiredSettlementOutcomes(Outcome outcometoadd)

You need to add one or both of:

  • Outcome.FAILED
  • Outcome.REJECTED

Outcome.ACCEPTED does not need to be set and is always supported.

You can Ack or Nack using:

XMLMessage.settle(settlement_outcome)

Possible values to use:

  • Outcome.ACCEPTED
  • Outcome.FAILED
  • Outcome.REJECTED

Java RTO

To enable and prepare the Flow to use negative acknowledgments, set REQUIRED_OUTCOME_FAILED, REQUIRED_OUTCOME_REJECTED, or both to ENABLE:

int flowProps = 0;
String[] flowProperties = new String[10];
flowProperties[flowProps++] = FlowHandle.PROPERTIES.REQUIRED_OUTCOME_FAILED;
flowProperties[flowProps++] = SolEnum.BooleanValue.ENABLE;
flowProperties[flowProps++] = FlowHandle.PROPERTIES.REQUIRED_OUTCOME_REJECTED;
flowProperties[flowProps++] = SolEnum.BooleanValue.ENABLE;

To acknowledge a message, you can send an Ack or Nack using:

flowHandle.settle(msgId, messageOutcome);

Possible values to use:

  • MessageOutcome.ACCEPTED
  • MessageOutcome.FAILED
  • MessageOutcome.REJECTED

C

Enable and prepare the Flow to use negative acknowledgments:

  • SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_FAILED
  • SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_REJECTED
  • To enable a Nack, set the property to SOLCLIENT_PROP_ENABLE_VAL

To acknowledge a message, you can send an Ack or Nack using:

solClient_flow_settleMsg (solClient_opaqueFlow_pt,solClient_msgId_t,solClient_msgOutcome_t)

For solClient_msgOutcome_t, you can use the following enumerated values:

  • SOLCLIENT_OUTCOME_ACCEPTED
  • SOLCLIENT_OUTCOME_FAILED
  • SOLCLIENT_OUTCOME_REJECTED

.NET

Enable and set the following properties to true in the IFlow interface to use negative acknowledgments:

  • IFlow.RequiredOutcomeFailed = true
  • IFlow.RequiredOutcomeRejected = true

You can send an Ack or Nack using:

  • IFlow.Settle(long ADMessageId, MessageOutcome outcome)

For the MessageOutcome, you use one of the following values:

  • MessageOutcome.Accepted
  • MessageOutcome.Rejected
  • MessageOutcome.Failed

JavaScript and Node.js

When you create a Flow, set the following property on your message consumer to use negative acknowledgments:

requiredSettlementOutcomes: [MessageOutcome.FAILED, MessageOutcome.REJECTED]

MessageOutcome.ACCEPTED is not required to be set with requiredSettlementOutcomes and is always supported.

You can Ack or Nack using:

Message.settle(messageOutcome)

For the messageOutcome, you use one of the following values:

  • MessageOutcome.ACCEPTED
  • MessageOutcome.REJECTED
  • MessageOutcome.FAILED

Using Nacks with the Java RTO API

The following sample code shows how to send Nacks using the PubSub+ Messaging API for Java RTO:

// session must have SolEnum.CapabilityName.AD_APP_ACK_FAILED capability in order for negative MessageOutcomes to be supported
// MessageOutcome.ACCEPTED is always supported
 
FlowHandle flowHandle = Solclient.Allocator.newFlowHandle();
 
// Setting Flow Properties
int flowProps = 0;
String[] flowProperties = new String[10];
flowProperties[flowProps++] = FlowHandle.PROPERTIES.ACKMODE;
flowProperties[flowProps++] = SolEnum.AckMode.CLIENT;
flowProperties[flowProps++] = FlowHandle.PROPERTIES.REQUIRED_OUTCOME_FAILED;
flowProperties[flowProps++] = SolEnum.BooleanValue.ENABLE;
flowProperties[flowProps++] = FlowHandle.PROPERTIES.REQUIRED_OUTCOME_REJECTED;
flowProperties[flowProps++] = SolEnum.BooleanValue.ENABLE;
 
sessionHandle.createFlowForHandle(flowHandle, flowProperties, queue, null,
    new MessageCallbackSample() {
        @Override
        public void onMessage(Handle handle) {
            FlowHandle flowHandle = (FlowHandle) handle;
            MessageHandle msgHandle = flowHandle.getRxMessage();
            long msgId = msgHandle.getGuaranteedMessageId();

            /* An example try-catch block for processing and settling a message. */
            try {
                processMessage(msgHandle);
                flowHandle.settle(msgId, MessageOutcome.ACCEPTED);
            } catch (Exception e) {
                flowHandle.settle(msgId, MessageOutcome.FAILED);
            }
        }
    }, flowEventCallback);

Using Nacks with the JCSMP API

The following sample code shows how to send Nacks using the PubSub+ Messaging API for JCSMP:

...
...
// A session has already been created with JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT
final ConsumerFlowProperties cfp = new ConsumerFlowProperties().setEndpoint(q);

// Add the settlement outcomes - Outcome.ACCEPTED does not need to be added because it is always included
// The consumer can add multiple outcomes, for example cfp.addRequiredSettlementOutcomes(Outcome.FAILED, Outcome.REJECTED);
cfp.addRequiredSettlementOutcomes(Outcome.FAILED);

final FlowReceiver flowReceiver = session.createFlow(null, cfp);
BytesXMLMessage msg = flowReceiver.receive(1000);
try {
    processMessage(msg);
    msg.settle(Outcome.ACCEPTED); // same as msg.ackMessage();
} catch (Exception e) {
    msg.settle(Outcome.FAILED); // Failed to process message, settle as FAILED
}
...
...

Using Nacks with the C API

For the PubSub+ Messaging API for C to use Nacks, you must set SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_FAILED, SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_REJECTED, or both properties to SOLCLIENT_PROP_ENABLE_VAL.

The following sample code shows how to send Nacks using a fictional processMessage() function to process the guaranteed message, which provides a status of processing so that you can send the settlement outcome using the solClient_flow_settleMsg() function: 

/* This code snippet presumes that you already have a
   session and endpoint (queue) */
/* Message Count for messages that were accepted by the client */
static int msgCount = 0;
static int msgCountSuccess = 0;
static int msgCountFailed = 0;
static int msgCountRejected = 0;

/* Callback to handle messages */
	
static solClient_rxMsgCallback_returnCode_t flowMessageReceiveCallback ( solClient_opaqueFlow_pt flow_p, 
                                                                         solClient_opaqueMsg_pt msg_p,
                                                                         void *user_p )
{
  solClient_msgId_t msgId;
  int status=0;
  /* Process the message. */
  printf ( "Received message:\n" );
  //Some function to process the message and return a status
  status = processMessage(msg_p);
  solClient_msg_getMsgId (msg_p, &msgId); 
  msgCount++;

 /* If the status was good */
 if ( status  == 0 ) {
	printf ( "Message passed validation\n");
	solClient_flow_settleMsg(flow_p, msgId, SOLCLIENT_OUTCOME_ACCEPTED);
       msgCountSuccess++;
 }
 else if (status ==  1 ) {
	printf ( "Message failed validation\n");
	solClient_flow_settleMsg(flow_p, msgId, SOLCLIENT_OUTCOME_REJECTED);
       msgCountRejected++;
 }
 else {
    printf ( "Failed to process message.\n");
    solClient_flow_settleMsg(flow_p, msgId, SOLCLIENT_OUTCOME_FAILED);
    msgCountFailed++;
 }
 /* Tell the API that the callback is finished with the message */
 return SOLCLIENT_CALLBACK_OK;
}
...
...

int main (int argc, char *argv[])
{
   /* The code snippet presumes a session has been created */
   /* Configure the Flow function information */
   flowFuncInfo.rxMsgInfo.callback_p = flowMessageReceiveCallback;
   flowFuncInfo.eventInfo.callback_p = flowEventCallback;
   
   if (!solClient_session_isCapable(session_p, SOLCLIENT_SESSION_CAPABILITY_AD_APP_ACK_FAILED)) {
      exit(1);
    }
    /* Add both of these properties to use FAILED and REJECTED as outcomes,
       which prepares the Flow to use negative acknowledgments. */
    props[propIndex++] = SOLCLIENT_FLOW_PROP_ACKMODE;
    props[propIndex++] = SOLCLIENT_FLOW_PROP_ACKMODE_CLIENT;
    props[propIndex++] = SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_FAILED;    
    props[propIndex++] = SOLCLIENT_PROP_ENABLE_VAL;
    props[propIndex++] = SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_REJECTED;    
    props[propIndex++] = SOLCLIENT_PROP_ENABLE_VAL;
    props[propIndex++] = NULL;

    ...
    ...
     solClientRc = solClient_session_createFlow(props,
                                                session_p,
			                         &flow_p,
			                         &flowFuncInfo,
			                         sizeof(flowFuncInfo));


  /** Wait for messages **/
  printf ( "Waiting for messages......\n" );
  fflush ( stdout );
  while ( msgCount < 1 ) {
     SLEEP ( 1 );
  }
...
...
}				

Using Nacks with the .NET API

For the PubSub+ .NET (C#) API, you must set RequiredOutcomeFailed, RequiredOutcomeRejected, or both properties to true to use Nacks. The following sample code shows how to send Nacks using a fictional messageProcessingObject.process() method, which determines the settlement outcome that is sent with the IFlow.Settle() method:

// This code snippet presumes that you already have a session and endpoint (queue)
// Use an object initializer to set the properties required to use Nacks.
FlowProperties flowProps = new FlowProperties()
{
   AckMode = MessageAckMode.ClientAck,
   RequiredOutcomeFailed = true,
   RequiredOutcomeRejected = true
};
 
// Declare the flow first so that the message handler can refer to it
IFlow flow = null;
flow = session.CreateFlow(flowProps, queue, null,
    new EventHandler<MessageEventArgs>(delegate(object source, MessageEventArgs args)
    {
        // Process the message once and settle it based on the result
        // returned by the fictional messageProcessingObject
        int result = messageProcessingObject.process(args.Message);
        if (result == 1)
        {
            // Accept the message
            flow.Settle(args.Message.ADMessageId, MessageOutcome.Accepted);
        }
        else if (result == 3)
        {
            // Reject the message
            flow.Settle(args.Message.ADMessageId, MessageOutcome.Rejected);
        }
        else
        {
            // Failed to process the message
            flow.Settle(args.Message.ADMessageId, MessageOutcome.Failed);
        }
        args.Message.Dispose();
    }),
    new EventHandler<FlowEventArgs>(delegate(object source, FlowEventArgs args)
    {
        // Got an event
    }));
...
...
}				

Using Nacks with the JavaScript API

The following sample code shows how to send Nacks using the PubSub+ Messaging API for JavaScript:

try {
  // Create a message consumer with support for NACKs
  const messageConsumer = session.createMessageConsumer({
     // solace.MessageConsumerProperties
     queueDescriptor: { name: 'QueueName', type: solace.QueueType.QUEUE, durable: true },
     acknowledgeMode: solace.MessageConsumerAcknowledgeMode.CLIENT, // Enabling Client ack
     // session is created with SUPPORTED_MESSAGE_ACK_CLIENT to support negative acknowledgment outcomes
     // MessageOutcome.ACCEPTED is not required to be set and always supported
     requiredSettlementOutcomes: [solace.MessageOutcome.FAILED, solace.MessageOutcome.REJECTED], // set the settlement outcomes for the flow. Flow Type will be 0x03
  });
 
  // Define message consumer event listeners
  messageConsumer.on(solace.MessageConsumerEventName.UP, function () {
      // consumer is up
  });
 
  messageConsumer.on(solace.MessageConsumerEventName.CONNECT_FAILED_ERROR, function (error) {
      // Error: the message consumer could not bind to queue
  });
 
  messageConsumer.on(solace.MessageConsumerEventName.DOWN, function () {
      // The message consumer is down
  });
 
  messageConsumer.on(solace.MessageConsumerEventName.DOWN_ERROR, function (details) {
      console.log('Received "DOWN_ERROR" event - details: ' + details);
  });
 
  // Define message received event listener
  messageConsumer.on(solace.MessageConsumerEventName.MESSAGE, function (message) {
      console.log('Received message: "' + message.getBinaryAttachment() + '",' +
          ' details:\n' + message.dump());
      // use the settle method to Ack or Nack the message from the event broker
      message.settle(solace.MessageOutcome.ACCEPTED); // same as message.acknowledge();

      // messages can also be settled with the FAILED and REJECTED outcomes on supported broker sessions
      // message.settle(solace.MessageOutcome.FAILED);
      // message.settle(solace.MessageOutcome.REJECTED);
  });
 
  // Connect the message consumer
  messageConsumer.connect();
} catch (error) {
    // error occurred while creating the message consumer
    console.log(error.toString());
}
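
The sample above always settles with ACCEPTED. One possible way to choose among the three outcomes is sketched below. It runs without a broker connection: `processAndSettle()`, `ValidationError`, `OUTCOMES`, and `makeMessage()` are hypothetical helpers, and the only thing assumed about a message is that it has a settle(outcome) method, as in the sample. In a real consumer, you would pass solace.MessageOutcome in place of the `OUTCOMES` stand-in.

```javascript
// Hypothetical error type used to signal a message that can never be accepted.
class ValidationError extends Error {}

// Runs processFn and settles the message according to how it finished.
// The outcomes object stands in for solace.MessageOutcome.
function processAndSettle(message, processFn, outcomes) {
  let outcome = outcomes.ACCEPTED;
  try {
    processFn(message);
  } catch (err) {
    // Validation problems are rejected; anything else is left for redelivery.
    outcome = err instanceof ValidationError ? outcomes.REJECTED : outcomes.FAILED;
  }
  message.settle(outcome);
  return outcome;
}

// Stand-ins so the sketch runs without a broker connection.
const OUTCOMES = { ACCEPTED: "ACCEPTED", FAILED: "FAILED", REJECTED: "REJECTED" };
const makeMessage = (body) => ({
  body,
  settledWith: null,
  settle(outcome) {
    this.settledWith = outcome;
  },
});

// Example processing function: rejects empty payloads, fails on "retry".
const handler = (msg) => {
  if (msg.body === "") throw new ValidationError("empty payload");
  if (msg.body === "retry") throw new Error("downstream unavailable");
};

const settledOutcomes = ["order-1", "", "retry"].map((body) =>
  processAndSettle(makeMessage(body), handler, OUTCOMES)
);
console.log(settledOutcomes.join(", "));
// prints "ACCEPTED, REJECTED, FAILED"
```

Settling outside the try block keeps an error thrown by settle() itself from being mistaken for a processing failure.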