#include "os.h"
#include "common.h"
{
printf ( "Received message on flow. (Message ID: %lld).\n", msgId );
} else {
printf ( "Received message on flow.\n" );
}
common_handleError ( rc, "solClient_msg_dump()" );
}
printf ( "Acknowledging message: %lld.\n", msgId );
common_handleError ( rc, "solClient_flow_sendAck()" );
}
} else {
common_handleError ( rc, "solClient_msg_getMsgId()" );
}
(*((int*)user_p))++;
}
int
main ( int argc, char *argv[] )
{
struct commonOptions commandOpts;
const char *flowProps[100];
int propIndex;
char binMsg[] = COMMON_ATTACHMENT_TEXT;
int publishCount = 0;
int receivedCount= 0;
UINT64 publishStartTime = 0;
char replayStartLocationStr[250];
printf ( "\nmessageReplay.c (Copyright 2007-2024 Solace Corporation. All rights reserved.)\n" );
initSigHandler ( );
common_initCommandOptions(&commandOpts,
( USER_PARAM_MASK |
DEST_PARAM_MASK ),
( HOST_PARAM_MASK |
PASS_PARAM_MASK |
LOG_LEVEL_MASK |
USE_GSS_MASK |
REPLAY_START_MASK |
ZIP_LEVEL_MASK ) );
if ( common_parseCommandOptions ( argc, argv, &commandOpts, NULL ) == 0 ) {
exit(1);
}
common_handleError ( rc, "solClient_initialize()" );
goto notInitialized;
}
common_printCCSMPversion ( );
&context_p, &contextFuncInfo,
sizeof ( contextFuncInfo ) ) ) !=
SOLCLIENT_OK ) {
common_handleError ( rc, "solClient_context_create()" );
goto cleanup;
}
if ( ( rc = common_createAndConnectSession ( context_p,
&session_p,
common_messageReceivePrintMsgCallback,
common_eventCallback, NULL, &commandOpts ) ) !=
SOLCLIENT_OK ) {
common_handleError ( rc, "common_createAndConnectSession()" );
goto cleanup;
}
propIndex = 0;
strncpy(queueName, commandOpts.destinationName, sizeof(commandOpts.destinationName));
flowProps[propIndex++] = queueName;
flowProps[propIndex] = NULL;
session_p, &flow_p, &flowFuncInfo,
sizeof ( flowFuncInfo ) ) ) !=
SOLCLIENT_OK ) {
"solClient_session_createFlow() did not return SOLCLIENT_OK " "after session create. rc = %d ", rc );
goto sessionConnected;
}
printf ( "Publishing 10 messages to queue %s, Ctrl-C to stop.....\n", queueName );
publishCount = 0;
destination.
dest = queueName;
publishStartTime = getTimeInUs() / 1000000;
if ( commandOpts.replayStartLocation[0] == 0 ) {
snprintf(replayStartLocationStr, sizeof(replayStartLocationStr), "Date:%lld", publishStartTime);
} else {
snprintf(replayStartLocationStr, sizeof(replayStartLocationStr), "%s", commandOpts.replayStartLocation );
}
sleepInSec ( 1 );
while ( !gotCtlC && publishCount < 10 ) {
common_handleError ( rc, "solClient_msg_alloc()" );
goto cleanupFlow;
}
common_handleError ( rc, "solClient_msg_setDeliveryMode()" );
goto cleanupFlow;
}
common_handleError ( rc, "solClient_msg_setBinaryAttachmentPtr()" );
goto cleanupFlow;
}
common_handleError ( rc, "solClient_msg_setDestination()" );
goto cleanupFlow;
}
common_handleError ( rc, "solClient_session_send" );
goto cleanupFlow;
}
common_handleError ( rc, "solClient_msg_free()" );
goto cleanupFlow;
}
publishCount++;
sleepInSec ( 1 );
}
if ( gotCtlC ) {
printf ( "Got Ctrl-C, cleaning up\n" );
goto cleanupFlow;
}
common_handleError ( rc, "solClient_flow_destroy()" );
}
flowProps[propIndex++] = replayStartLocationStr;
flowProps[propIndex] = NULL;
printf("Start Flow Message Replay with replay start location '%s'\n", replayStartLocationStr);
session_p, &flow_p, &flowFuncInfo,
sizeof ( flowFuncInfo ) ) ) !=
SOLCLIENT_OK ) {
"solClient_session_createFlow() did not return SOLCLIENT_OK " "for flow replay. rc = %d ", rc );
goto sessionConnected;
}
while ( !gotCtlC && receivedCount < 2 * publishCount ) {
sleepInSec ( 1 );
}
if ( gotCtlC ) {
printf ( "Got Ctrl-C, cleaning up\n" );
goto cleanupFlow;
}
cleanupFlow:
common_handleError ( rc, "solClient_flow_destroy()" );
}
sessionConnected:
common_handleError ( rc, "solClient_session_disconnect()" );
}
cleanup:
common_handleError ( rc, "solClient_cleanup()" );
}
notInitialized:
return 0;
}