Acknowledging Messages Received by Clients
The messaging APIs provide acknowledgments to the PubSub+ event broker for the guaranteed messages that clients receive through a flow. The figure below shows the process of how the guaranteed messages that an application receives through a flow are acknowledged. During this flow, client applications can also send a negative acknowledgment (NACK) for malformed messages or messages that could not be processed.
Acknowledging Received Guaranteed Messages
API Acknowledgments
A Guaranteed message window size limits the number of messages that the API can receive before it must return an acknowledgment to the event broker to indicate that it received the messages in the window. After the API sends this acknowledgment, the Guaranteed message window reopens so that further messages can be sent to the API.
An application can adjust the windowed acknowledgments by changing the default acknowledgment timer and threshold parameters set through the flow properties. For more information, see Important Flow (Message Consumer) Properties). Changing these defaults is not usually required and will change the performance characteristics of a flow.
Application Acknowledgments
One of the two following application acknowledgment modes can be used for acknowledging a message:
- Auto-acknowledgment
- Client acknowledgment
The acknowledgment mode to use is set through one of the flow properties listed below. By default, the auto-acknowledgment mode is used.
The following table shows how to perform acknowledgment for the different APIs:
PubSub+ Messaging API | Use |
---|---|
Possible values are:
|
|
Possible values in
|
|
|
|
Possible values are:
|
|
JavaScript and Node.js |
Possible values are:
|
Auto-Acknowledgment Mode
When the auto-acknowledgment mode is used, the API automatically generates application-level acknowledgments.
For JCSMP, acknowledgments are sent at different times depending on whether the message is received asynchronously or synchronously:
- when received asynchronously, the acknowledgment is sent after the message callback completes.
- when received synchronously, the acknowledgment is sent after the message is removed from the API's internal queue during the
receive()
method. It's important to realize that the acknowledgment has been sent before control is returned to the application (that is after thereceive()
method completes).
Client Acknowledgment Mode
When the client acknowledgment mode is used the client must explicitly send an acknowledgment for the message ID of each message received. Optionally, you can use negative acknowledgments as well. For more information, see Negative Acknowledgments for Specific Messages.
To explicitly send a client acknowledgment, call one of the methods or functions listed below.
Avoid allowing the number of outstanding unacknowledged messages to become excessively large (for example, 10,000 or more) because the egress message rate from the event broker can start to decline.
PubSub+ Messaging API | Use |
---|---|
|
|
See thePubSub+ Messaging API for JRTO for information |
|
|
|
|
|
JavaScript and Node.js |
Acknowledgment is called on the message object. |
When using the client acknowledgment mode with the C API, the maximum number of messages that the API can deliver to the application through the flow without receiving client acknowledgments can be set through the SOLCLIENT_FLOW_PROP_MAX_UNACKED_MESSAGES
flow property. By default, this property has a value of -1, which specifies that the maximum number of unacknowledged messages that can be delivered is not restricted by the API. (To change the maximum number of unacknowledged messages that may be received through an existing flow, call solClient_flow_setMaxUnacked()
).
Negative Acknowledgments for Specific Messages
You can use negative acknowledgments (NACKs) if you have configured your applications for client acknowledgments (see Client Acknowledgment Mode). When you use NACKs, you can send a settlement outcome to let the event broker know the result from processing a guaranteed message that was received. Based on the settlement outcome, the event broker knows how to handle the message on its queue. You can use the following settlement outcomes:
- ACCEPTED—This ACK notifies the event broker that your client application successfully processed the guaranteed message. When the event broker receives this outcome it removes the message from its queue.
- When you call a settlement function/methods with an outcome of ACCEPTED, it is the same as calling an ACK function/methods in Client Acknowledgment Mode.
- FAILED—This NACK notifies the event broker that your client application did not process the message. When the event broker receives this NACK it attempts to redeliver the message while adhering to delivery count limits.
- REJECTED—This NACK notifies the event broker that your client application could process the message but it was not accepted (for example, failed validation). When the event broker receives this NACK it removes the message from its queue and then moves the message to the dead message queue (DMQ) if it is configured.
Before you can use NACKs, you must add the FAILED, REJECTED, or both outcomes as NACK types when you create the flow to prepare the flow to work with negative acknowledgments. You do not need to add the ACCEPTED outcome because it is always available. If you try to use an outcome that has not been added, you get an error of Required Settlement Outcome Not Supported
.
- NACKs can be lost during transit (for example due to unexpected networking issues). Consider this fact as part of the logic for handling messages when you develop your application.
- NACKs are supported on event brokers 10.2.1 and later. If an event broker does not support NACKs, an
InvalidOperationException
occurs during the flow bind request when an outcome is specified.
PubSub+ Messaging API | Calls |
Enable and prepare the flow to use negative acknowledgments:
You need to add one or both of:
You can ACK or NACK using:
Possible values to use:
|
|
To enable and prepare the flow to use negative acknowledgments, set int flowProps = 0; String[] flowProperties = new String[10]; flowProperties[flowProps++] = FlowHandle.PROPERTIES.REQUIRED_OUTCOME_FAILED; flowProperties[flowProps++] = SolEnum.BooleanValue.ENABLE; flowProperties[flowProps++] = FlowHandle.PROPERTIES.REQUIRED_OUTCOME_REJECTED; flowProperties[flowProps++] = SolEnum.BooleanValue.ENABLE; To acknowledge a message, you can send an ACK or NACK using:
Possible values to use:
|
|
Enable and prepare the flow to use negative acknowledgments:
To acknowledge a message, you can send an ACK or NACK using:
For
|
|
Enable and set the following properties to
You can send an ACK or NACK using:
For the
|
|
JavaScript and Node.js |
When you create a flow, enable and set the following properties on your message consumer to use negative acknowledgments:
You can send an ACK or NACK using:
For the
|
Using NACKs with the Java RTO API
The following sample code shows how to send NACKs using the PubSub+ messaging API for Java RTO:
// Session must have SolEnum.CapabilityName.AD_APP_ACK_FAILED capability in order for negative MessageOutcomes to be supported // MessageOutcome.ACCEPTED is always supported FlowHandle flowHandle = Solclient.Allocator.newFlowHandle(); // Setting Flow Properties int flowProps = 0; String[] flowProperties = new String[10]; flowProperties[flowProps++] = FlowHandle.PROPERTIES.ACKMODE; flowProperties[flowProps++] = SolEnum.AckMode.CLIENT; flowProperties[flowProps++] = FlowHandle.PROPERTIES.REQUIRED_OUTCOME_FAILED; flowProperties[flowProps++] = SolEnum.BooleanValue.ENABLE; flowProperties[flowProps++] = FlowHandle.PROPERTIES.REQUIRED_OUTCOME_REJECTED; flowProperties[flowProps++] = SolEnum.BooleanValue.ENABLE; sessionHandle.createFlowForHandle(flowHandle, flowProperties, queue, null, new MessageCallbackSample() { @Override public void onMessage(Handle handle) { FlowHandle flowHandle = (FlowHandle) handle; MessageHandle msgHandle = flowHandle.getRxMessage(); long msgId = msgHandle.getGuaranteedMessageId(); /* An example try-catch block for processing and settling a message. */ try { processMessage(msgHandle); flowHandle.settle(msgId, MessageOutcome.ACCEPTED); } catch (Exception e){ flowHandle.settle(msgId, MessageOutcome.FAILED); } } }, flowEventCallback);
Using NACKs with the JCSMP API
The following sample code shows how to send NACKs using the PubSub+ messaging API for JCSMP:
... ... // A session has already been created with JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT final ConsumerFlowProperties cfp = new ConsumerFlowProperties().setEndpoint(q); // Add the settlement outcomes - Outcome.ACCEPTED does not need to be added because it is always included // The consumer can add multiple outcomes, for example cfp.addRequiredSettlementOutcomes(Outcome.FAILED, Outcome.REJECTED); cfp.addRequiredSettlementOutcomes(Outcome.FAILED); final FlowReceiver flowReceiver= session.createFlow(null, cfp); BytesXMLMessage msg = flowReceiver.receive(1000); try { processMessage(msg); msg.settle(Outcome.ACCEPTED);// same as msg.ackMessage(); catch (Exception e){ msg.settle(Outcome.FAILED); // Failed to process message, settle as FAILED } } ... ...
Using NACKs with the C API
For the PubSub+ messaging API for to use NACKs you must set SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_FAILED
, SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_REJECTED
, or both properties to SOLCLIENT_PROP_ENABLE_VAL
.
The following sample code shows how to send NACKs using a fictional processMessage()
function to process the guaranteed message, which provides a status of processing so that you can send the settlement outcome using the solClient_flow_settleMsg()
function:
/* This code snippet presumes that you have a session and endpoint (queue)already */ /* Message count for messages that were accepted by the client */ static int msgCount = 0; static int msgCountSuccess = 0; static int msgCountFailed = 0; static int msgCountRejected = 0; /* Callback to handle messages */ static solClient_rxMsgCallback_returnCode_t flowMessageReceiveCallback ( solClient_opaqueFlow_pt flow_p, solClient_opaqueMsg_pt msg_p, void *user_p ) { solClient_msgId_t msgId; int status=0; /* Process the message. */ printf ( "Received message:\n" ); //Some function to process the message and return a status status = processMessage(msg_p); solClient_msg_getMsgId (msg_p, &msgId); msgCount ++ /* If the status was good) if ( status == 0 ) { printf ( "Message passed validation\n"); solClient_flow_settleMsg(flow_p, msgId, SOLCLIENT_OUTCOME_ACCEPTED); msgCountSuccess++; } else if (status == 1 ) { printf ( "Message failed validation\n"); solClient_flow_settleMsg(flow_p, msgId, SOLCLIENT_OUTCOME_REJECTED); msgCountRejected++; } else { printf ( "Message failed to process message.\n"); solClient_flow_settleMsg(flow_p, msgId, SOLCLIENT_OUTCOME_FAILED); msgCountFailed++; } } ... ... int main (in argc, char *argv[]) { /* The code snippet presumes a session has been created */ /* Configure the flow function information */ flowFuncInfo.rxMsgInfo.callback_p = flowMessageReceiveCallback; flowFuncInfo.eventInfo.callback_p = flowEventCallback; if (!solClient_session_isCapable(session_p, SOLCLIENT_SESSION_CAPABILITY_AD_APP_ACK_FAILED)) { exit(1); } /* We must add both these properties if want use FAIL and REJECT as outcomes which prepares the flow to use negative acknowledgment. */ props[propIndex++] = SOLCLIENT_FLOW_PROP_ACKMODE; props[propIndex++] = SOLCLIENT_FLOW_PROP_ACKMODE_CLIENT; props[propIndex++] = SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_FAILED; props[propIndex++] = SOLCLIENT_PROP_ENABLE_VAL; props[propIndex++] = SOLCLIENT_FLOW_PROP_REQUIRED_OUTCOME_REJECTED; props[propIndex++] = SOLCLIENT_PROP_ENABLE_VAL; props[propIndex++] = NULL; ... ... solClientRc = solClient_session_createFlow(props, session_p, &flow_p, &flowFuncInfo, sizeof(flowFuncInfo)); /** Wait for messages **/ printf ( "Waiting for messages......\n" ); fflush ( stdout ); while ( msgCount < 1 ) { SLEEP ( 1 ); } ... ... }
Using NACKs with the .NET API
For the PubSub+ .NET (C#) API, you must set RequiredOutcomeFailed
, RequiredOutcomeRejected
, or both properties to true
to use NACKs. The following sample code shows how to send NACKs using a fictional messageProcessingObject.process()
method, which determines the settement outcome that is sent with the IFlow.Settle()
method:
// This code snippet presumes that you have a session and endpoint (queue)already // Use object instantiation to set the required properties to use NACKs. FlowProperties flowProps = new FlowProperties() { AckMode = MessageAckMode.ClientAck, RequiredOutcomeFailed = true, RequiredOutcomeRejected = true }; IFlow flow = session.CreateFlow(flowProps, queue, null, new EventHandler<MessageEventArgs> (delegate(object source, MessageEventArgs args){ // Got a message and do processing and based on result fictional messageProcessingObject if (messageProcessingObject.process(args.Message) == 1) { // Accept the message flow.Settle(args.Message.ADMessageId, MessageOutcome.Accepted); } else if ((messageProcessingObject.process(args.Message) == 3) { // Reject the message flow.Settle(args.Message.ADMessageId, MessageOutcome.Rejected); } else { // Failed to process the message flow.Settle(args.Message.ADMessageId, MessageOutcome.Failed); args.Message.Dispose(); }), new EventHandler<FlowEventArgs> (delegate(object source, FlowEventArgs args) { // Got an event })); ... ... }
Using NACKs with the Javascript API
The following sample code shows how to send NACKs using the PubSub+ messaging API for Javascript:
try { // Create a message consumer with support for NACKs const messageConsumer = session.createMessageConsumer({ // solace.MessageConsumerProperties queueDescriptor: { name: 'QueueName', type: solace.QueueType.QUEUE, durable: true }, acknowledgeMode: solace.MessageConsumerAcknowledgeMode.CLIENT, // Enabling Client ack // session is created with SUPPORTED_MESSAGE_ACK_CLIENT to support negative acknowledgment outcomes // MessageOutcome.ACCEPTED is not required to be set and always supported requiredSettlementOutcomes: [MessageOutcome.FAILED, MessageOutcome.REJECTED], // set the settlement outcome for the flow. flow Type will be 0x03 }); // Define message consumer event listeners messageConsumer.on(solace.MessageConsumerEventName.UP, function () { // consumer is up }); messageConsumer.on(solace.MessageConsumerEventName.CONNECT_FAILED_ERROR, function (error) { // Error: the message consumer could not bind to queue }); messageConsumer.on(solace.MessageConsumerEventName.DOWN, function () { // The message consumer is down }); messageConsumer.on(solace.MessageConsumerEventName.DOWN_ERROR, function (details) { console.log('Received "DOWN_ERROR" event - details: ' + details); }); // Define message received event listener messageConsumer.on(solace.MessageConsumerEventName.MESSAGE, function (message) { console.log('Received message: "' + message.getBinaryAttachment() + '",' + ' details:\n' + message.dump()); // use the new settle method to ACK/NACK the message from the router message.settle(solace.MessageOutcome.ACCEPTED);// same as message.acknowledge(); // messages can also be settled with the FAILED and REJECTED outcomes on supported broker sessions // message.settle(solace.MessageOutcome.FAILED);// same as message.acknowledge(); // message.settle(solace.MessageOutcome.REJECTED);// same as message.acknowledge(); }); // Connect the message consumer messageConsumer.connect(); } catch (error) { // error occurred while creating the message consumer console.log(error.toString()); }