#include "os.h"
#include "common.h"
static void
{
common_handleError ( rc, "solClient_msg_alloc()" );
return;
}
common_handleError ( rc, "solClient_msg_setDeliveryMode()" );
goto freeMessage;
}
destination.
dest = COMMON_MY_SAMPLE_TOPIC;
common_handleError ( rc, "solClient_msg_setDestination()" );
goto freeMessage;
}
common_handleError ( rc, "solClient_msg_setTimeToLive()" );
goto freeMessage;
}
common_handleError ( rc, "solClient_msg_setDMQEligible()" );
goto freeMessage;
}
common_handleError ( rc, "solClient_session_sendMsg()" );
goto freeMessage;
}
freeMessage:
common_handleError ( rc, "solClient_msg_free()" );
}
}
int
main ( int argc, char *argv[] )
{
struct commonOptions commandOpts;
char *user_p = "Session Callback";
int flowRxMsgCounter;
int dmqRxMsgCounter;
const char *props[40];
int propIndex;
int i;
printf ( "\nmessageTTLAndDeadMessageQueue.c (Copyright 2010-2024 Solace Corporation. All rights reserved.)\n" );
initSigHandler ( );
common_initCommandOptions(&commandOpts,
( USER_PARAM_MASK ),
( HOST_PARAM_MASK |
PASS_PARAM_MASK |
LOG_LEVEL_MASK |
USE_GSS_MASK |
ZIP_LEVEL_MASK));
if ( common_parseCommandOptions ( argc, argv, &commandOpts, NULL ) == 0 ) {
exit(1);
}
common_handleError ( rc, "solClient_initialize()" );
goto notInitialized;
}
common_printCCSMPversion ( );
&context_p, &contextFuncInfo,
sizeof ( contextFuncInfo ) ) ) !=
SOLCLIENT_OK ) {
common_handleError ( rc, "solClient_context_create()" );
goto cleanup;
}
if ( ( rc = common_createAndConnectSession ( context_p,
&session_p,
common_messageReceiveCallback,
common_eventCallback, user_p, &commandOpts ) ) !=
SOLCLIENT_OK ) {
common_handleError ( rc, "common_createAndConnectSession()" );
goto cleanup;
}
goto sessionConnected;
}
if ( ( rc = common_createQueue ( session_p, COMMON_TESTQ ) ) !=
SOLCLIENT_OK ) {
common_handleError ( rc, "common_createQueue()" );
goto sessionConnected;
}
if ( ( rc = common_createQueue ( session_p, COMMON_DMQ_NAME ) ) !=
SOLCLIENT_OK ) {
common_handleError ( rc, "common_createQueue()" );
goto sessionConnected;
}
"Adding subscription %s to queue %s through Session.", COMMON_MY_SAMPLE_TOPIC, COMMON_TESTQ );
propIndex = 0;
props[propIndex++] = COMMON_TESTQ;
props[propIndex++] = NULL;
session_p,
common_handleError ( rc, "solClient_session_endpointTopicSubscribe()" );
goto sessionConnected;
}
printf ( "Publishing three messages without TTL and DMQ\n" );
for ( i = 0; i < 3; i++ ) {
publishMessageWithTTL ( session_p, 0, FALSE );
}
printf ( "Publshing message with TTL=3000 ms and DMQ Eligible=FALSE\n" );
publishMessageWithTTL ( session_p, 3000, FALSE );
printf ( "Publshing message with TTL=3000 ms and DMQ Eligible=TRUE\n" );
publishMessageWithTTL ( session_p, 3000, TRUE );
flowRxMsgCounter = 0;
propIndex = 0;
props[propIndex++] = COMMON_TESTQ;
props[propIndex] = NULL;
session_p, &flow_p, &flowFuncInfo,
sizeof ( flowFuncInfo ) ) ) !=
SOLCLIENT_OK ) {
common_handleError ( rc, "solClient_session_createFlow() did not return SOLCLIENT_OK" );
goto sessionConnected;
}
dmqRxMsgCounter = 0;
propIndex = 0;
props[propIndex++] = COMMON_DMQ_NAME;
props[propIndex] = NULL;
session_p,
&dmqFlow_p, &flowFuncInfo,
sizeof ( flowFuncInfo ) ) ) !=
SOLCLIENT_OK ) {
common_handleError ( rc, "solClient_session_createFlow() did not return SOLCLIENT_OK" );
goto sessionConnected;
}
i = 0;
while ( ( flowRxMsgCounter != 5 ) && ( i < 8 ) ) {
sleepInUs ( 250 );
i++;
}
if ( flowRxMsgCounter != 5 ) {
goto sessionConnected;
}
if ( dmqRxMsgCounter != 0 ) {
goto sessionConnected;
}
printf ( "All sent messages received\n" );
common_handleError ( rc, "solClient_flow_stop()" );
goto sessionConnected;
}
flowRxMsgCounter = 0;
printf ( "Resend 5 messages\n" );
for ( i = 0; i < 3; i++ ) {
publishMessageWithTTL ( session_p, 0, FALSE );
}
publishMessageWithTTL ( session_p, 3000, FALSE );
publishMessageWithTTL ( session_p, 3000, TRUE );
printf ( "Wait five seconds to allow messages to expire\n" );
sleepInSec ( 5 );
if ( dmqRxMsgCounter != 1 ) {
goto sessionConnected;
}
common_handleError ( rc, "solClient_flow_start()" );
goto sessionConnected;
}
i = 0;
while ( ( flowRxMsgCounter != 3 ) && ( i < 8 ) ) {
sleepInUs ( 250 );
i++;
}
if ( flowRxMsgCounter != 3 ) {
goto sessionConnected;
}
printf ( "Three messages with no TTL received and one message received on Dead Message Queue as expected\n" );
sessionConnected:
if ( flow_p != NULL ) {
common_handleError ( rc, "solClient_flow_destroy()" );
}
}
if ( dmqFlow_p != NULL ) {
common_handleError ( rc, "solClient_flow_destroy()" );
}
}
if ( ( rc = common_deleteQueue ( session_p, COMMON_TESTQ ) ) !=
SOLCLIENT_OK ) {
common_handleError ( rc, "common_deleteQueue()" );
}
if ( ( rc = common_deleteQueue ( session_p, COMMON_DMQ_NAME ) ) !=
SOLCLIENT_OK ) {
common_handleError ( rc, "common_deleteQueue()" );
}
common_handleError ( rc, "solClient_session_disconnect()" );
}
cleanup:
common_handleError ( rc, "solClient_cleanup()" );
}
notInitialized:
return 0;
}