#include <signal.h>
#include "os.h"
#include "common.h"
#if defined(__cplusplus)
extern "C"
{
#endif
static int exitEarly_s = 0;
static int rxDone_s = 0;
static int rxTimeout_s = 0;
static int usePub_s = 1;
static int useSub_s = 1;
static int multiSend_s = 0;
static int binaryPayloadSize_s = 100;
static int sendPersistent_s = 0;
static unsigned int numRx_s = 0;
static unsigned int msgRate_s;
static unsigned int msgNum_s;
static char *publishTopic_ps;
{
common_handleError ( rc, "solClient_session_getRxStats()" );
return;
}
common_handleError ( rc, "solClient_session_getTxStats()" );
return;
}
printf ( "\n" "Tx msgs = %lld; Tx bytes = %lld\n"
"Rx msgs = %lld, Rx bytes = %lld\n"
"Avg bytes per read = %lld\n"
"Rx discard indications = %lld\n"
"Rx discards due to unrecognized parameter in header = %lld\n"
"Rx discards due to message too big = %lld\n"
"Tx would block = %lld\n" "Tx socket full = %lld\n" "\n",
common_handleError ( rc, "solClient_session_clearStats()" );
}
}
{
rxTimeout_s = 1;
}
{
numRx_s++;
if ( numRx_s >= msgNum_s ) {
rxDone_s = 1;
}
}
threadRetType pubThread ( void *session_p )
{
const unsigned int groupSize = 10;
unsigned int txCount = 0;
unsigned int groupCount = 0;
char *binary_p;
long long startTime;
long long targetTime;
long long currentTime;
long long timeDiff;
long long elapsedTime;
long long usPerGroup;
long double usPerMsg;
binary_p = ( char * ) malloc ( binaryPayloadSize_s );
if ( binary_p == NULL ) {
return DEFAULT_THREAD_RETURN_ARG;
}
memset ( binary_p, 0, binaryPayloadSize_s );
free (binary_p);
return DEFAULT_THREAD_RETURN_ARG;
}
usPerMsg = ( long double ) 1000000.0 / ( long double ) msgRate_s;
usPerGroup = ( long long ) ( usPerMsg * ( long double ) groupSize );
free (binary_p);
return DEFAULT_THREAD_RETURN_ARG;
}
free (binary_p);
return DEFAULT_THREAD_RETURN_ARG;
}
if ( sendPersistent_s ) {
free (binary_p);
return DEFAULT_THREAD_RETURN_ARG;
}
}
startTime = getTimeInUs ( );
targetTime = startTime + usPerGroup;
while ( ( txCount < msgNum_s ) && ( !exitEarly_s ) ) {
if ( errorInfo_p != NULL ) {
"solClient_session_sendMsg() failed (%s) subCode (%d:'%s'), error %s",
}
} else {
txCount++;
groupCount++;
if ( groupCount >= groupSize ) {
groupCount = 0;
currentTime = getTimeInUs ( );
timeDiff = targetTime - currentTime;
if ( timeDiff > 1000 ) {
sleepInUs ( ( int ) ( timeDiff + 500 ) );
} else if ( timeDiff < ( long long ) ( -10000 ) ) {
targetTime = currentTime;
}
targetTime += usPerGroup;
}
}
}
free (binary_p);
elapsedTime = getTimeInUs ( ) - startTime;
printf ( "\nSent %d msgs in %lld usec; rate of %lu messages/sec\n\n",
txCount, elapsedTime, ( long unsigned ) ( ( long double ) txCount /
( ( long double ) elapsedTime / ( long double ) 1000000.0 ) ) );
return DEFAULT_THREAD_RETURN_ARG;
}
#ifndef DOXYGEN_SHOULD_SKIP_THIS
#define GROUP_SIZE 10
#endif
threadRetType pubThreadSendMultiple ( void *session_p )
{
unsigned int txCount = 0;
char *binary_p;
unsigned int numToSend;
unsigned int numWritten;
long long startTime;
long long targetTime;
long long currentTime;
long long timeDiff;
long long elapsedTime;
long long usPerGroup;
long double usPerMsg;
int loop;
binary_p = ( char * ) malloc ( binaryPayloadSize_s );
if ( binary_p == NULL ) {
return DEFAULT_THREAD_RETURN_ARG;
}
memset ( binary_p, 0, binaryPayloadSize_s );
memset ( msgArray, 0, sizeof ( msgArray ) );
usPerMsg = ( long double ) 1000000.0 / ( long double ) msgRate_s;
usPerGroup = ( long long ) ( usPerMsg * ( long double ) GROUP_SIZE );
for ( loop = 0; loop < GROUP_SIZE; loop++ ) {
goto releaseMsg;
}
goto releaseMsg;
}
goto releaseMsg;
}
}
startTime = getTimeInUs ( );
targetTime = startTime + usPerGroup;
while ( ( txCount < msgNum_s ) && ( !exitEarly_s ) ) {
numToSend = msgNum_s - txCount;
if ( numToSend > GROUP_SIZE ) {
numToSend = GROUP_SIZE;
}
printf ( "Could not send multiple\n" );
break;
} else {
txCount += numToSend;
currentTime = getTimeInUs ( );
timeDiff = targetTime - currentTime;
if ( timeDiff > 1000 ) {
sleepInUs ( ( int ) ( timeDiff + 500 ) );
} else if ( timeDiff < ( long long ) ( -10000 ) ) {
targetTime = currentTime;
}
targetTime += usPerGroup;
}
}
elapsedTime = getTimeInUs ( ) - startTime;
printf ( "\nSent %d msgs in batches of %d in %lld usec; rate of %lu messages/sec\n\n",
txCount, GROUP_SIZE, elapsedTime, ( long unsigned ) ( ( long double ) txCount /
( ( long double ) elapsedTime / ( long double ) 1000000.0 ) ) );
releaseMsg:
for ( loop = 0; loop < GROUP_SIZE; loop++ ) {
if ( msgArray[loop] != NULL ) {
break;
}
}
}
return DEFAULT_THREAD_RETURN_ARG;
}
static void sigHandler ( int sigNum )
{
if ( exitEarly_s == 0 ) {
exitEarly_s = 1;
} else {
exit ( 0 );
}
}
int main ( int argc, char *argv[] )
{
char positionalParms[] =
"\tMSG_SIZE the size of the binary payload for published messages; default is 100 bytes\n"
"\tPUB_SUB_MODE (default 'b') is one of \n"
"\t\ts: for subscribers only\n"
"\t\tp[n]: for 'n' publishers only (default 1)\n"
"\t\tP[n]: for 'n' persistent publishers (default 1)\n"
"\t\tb[n]: for 'n' publishers (default 1) and 1 subscribers\n"
"\t\tB[n]: for 'n' persistent publishers (default 1) and 1 subscribers\n"
"\tTCP_NO_DELAY is one of\n"
"\t\ttrue\n"
"\t\tfalse (default)\n"
"\tMULTI_SEND_MODE is whether to use the solClient_session_sendMultipleMsg() function. \n"
"\t\tNOTE: messages sent in MULTI_SEND_MODE are always sent direct.\n"
"\t\ttrue\n" "\t\tfalse (default)\n";
const char *sessionProps[40];
char *noDelayVal_p = "0";
char *noDelay_p = "false";
char *multiSend_p = "false";
char subTopic[] = "level1/level2/level3/level4/>";
char *subTopic_p;
char pubTopic[] = "level1/level2/level3/level4/level5";
const char *pubSub_p;
long long startTime;
long long endTime;
int propIndex;
int numThread = 1;
long long userTime;
long long sysTime;
long long elapsedTime;
contextThreadInfo_t contextThreadInfo;
THREAD_HANDLE_T pubThreadHandle[100];
struct commonOptions commandOpts;
int loop;
signal ( SIGINT, sigHandler );
printf ( "\nperfTest.c (Copyright 2007-2024 Solace Corporation. All rights reserved.)\n" );
common_handleError ( rc, "solClient_initialize()" );
goto notInitialized;
}
common_printCCSMPversion ( );
common_initCommandOptions(&commandOpts,
( USER_PARAM_MASK ),
( HOST_PARAM_MASK |
DEST_PARAM_MASK |
PASS_PARAM_MASK |
NUM_MSGS_MASK |
MSG_RATE_MASK |
LOG_LEVEL_MASK |
USE_GSS_MASK |
ZIP_LEVEL_MASK));
if ( common_parseCommandOptions ( argc, argv, &commandOpts, positionalParms ) == 0 ) {
exit(1);
}
msgNum_s = commandOpts.numMsgsToSend;
msgRate_s = commandOpts.msgRate;
if ( commandOpts.destinationName[0] == ( char ) 0 ) {
publishTopic_ps = pubTopic;
subTopic_p = subTopic;
} else {
publishTopic_ps = commandOpts.destinationName;
subTopic_p = commandOpts.destinationName;
}
if ( optind < argc ) {
binaryPayloadSize_s = atoi ( argv[optind] );
}
pubSub_p = "Pub and Sub";
if ( ( optind + 1 ) < argc ) {
if ( *( argv[optind + 1] ) == 'p' ) {
useSub_s = 0;
pubSub_p = "Pub only";
} else if ( *( argv[optind + 1] ) == 'P' ) {
useSub_s = 0;
pubSub_p = "Pub only";
sendPersistent_s = 1;
} else if ( *( argv[optind + 1] ) == 's' ) {
usePub_s = 0;
pubSub_p = "Sub only";
} else if ( *( argv[optind + 1] ) == 'b' ) {
pubSub_p = "Pub and Sub";
} else if ( *( argv[optind + 1] ) == 'B' ) {
pubSub_p = "Pub and Sub";
sendPersistent_s = 1;
} else {
printf ( "Error: Unknown PUB_SUB_MODE value \"%s\"\n", argv[optind + 1] );
goto notInitialized;
}
if ( *( ( argv[optind + 1] ) + 1 ) != ( char ) 0 ) {
numThread = atoi ( ( argv[optind + 1] ) + 1 );
if ( numThread <= 0 ) {
printf ( "Error: PUB_SUB_MODE (%s) is not 'p', 's', or 'b' optionally followed by an integer\n",
argv[optind + 1] );
goto notInitialized;
}
if ( numThread > 100 ) {
printf ( "Warning: maximum 100 publisher threads supported, reducing %d to 100\n", numThread );
numThread = 100;
}
if ( usePub_s == 0 ) {
printf ( "Warning: %d publishers ignored in subscriber only mode\n", numThread );
}
}
}
if ( ( optind + 2 ) < argc ) {
if ( strcasecmp ( argv[optind + 2], "false" ) == 0 ) {
noDelayVal_p = "0";
noDelay_p = argv[optind + 2];
} else if ( strcasecmp ( argv[optind + 2], "true" ) == 0 ) {
noDelayVal_p = "1";
noDelay_p = argv[optind + 2];
} else {
printf ( "Error: Unknown TCP_NO_DELAY value \"%s\"\n", argv[optind + 2] );
goto notInitialized;
}
}
if ( ( optind + 3 ) < argc ) {
if ( strcasecmp ( argv[optind + 3], "false" ) == 0 ) {
multiSend_s = 0;
multiSend_p = argv[optind + 3];
} else if ( strcasecmp ( argv[optind + 3], "true" ) == 0 ) {
multiSend_s = 1;
multiSend_p = argv[optind + 3];
} else {
printf ( "Error: Unknown MULTI_SEND_MODE value \"%s\"\n", argv[optind + 3] );
goto notInitialized;
}
}
printf ( "APPLIANCE_IP: %s, APPLIANCE_USERNAME: %s, NUM_MSGS: %d, MSG_RATE: %d, MSG_SIZE: %d, PUB_SUB_MODE %d %s threads, TCP_NO_DELAY: %s, MULTI_SEND_MODE: %s\n",
commandOpts.targetHost, commandOpts.username, msgNum_s, msgRate_s, binaryPayloadSize_s, numThread, pubSub_p,
noDelay_p, multiSend_p );
common_handleError ( rc, "solClient_version_get()" );
goto notInitialized;
}
startTime = getTimeInUs ( );
if ( ( rc =
solClient_context_create ( NULL, &contextThreadInfo.context_p, &contextFuncInfo, sizeof ( contextFuncInfo ) ) )
common_handleError ( rc, "solClient_context_create()" );
goto initialized;
}
if ( !common_startContextThread ( &contextThreadInfo ) ) {
goto contextCreated;
}
propIndex = 0;
if ( commandOpts.targetHost[0] != (char) 0 ) {
sessionProps[propIndex++] = commandOpts.targetHost;
}
sessionProps[propIndex++] = commandOpts.username;
sessionProps[propIndex++] = commandOpts.password;
sessionProps[propIndex++] = noDelayVal_p;
sessionProps[propIndex++] = "3";
sessionProps[propIndex++] = "3";
if ( commandOpts.vpn[0] ) {
sessionProps[propIndex++] = commandOpts.vpn;
}
sessionProps[propIndex++] = ( commandOpts.enableCompression ) ? "9" : "0";
if ( commandOpts.useGSS ) {
}
sessionProps[propIndex] = NULL;
contextThreadInfo.context_p,
&session_p, &sessionFuncInfo, sizeof ( sessionFuncInfo ) ) )
common_handleError ( rc, "solClient_session_create()" );
goto contextThreadCreated;
}
if ( useSub_s ) {
0,
common_handleError ( rc, "solClient_session_topicSubscribeExt()" );
goto sessionConnected;
}
}
common_handleError ( rc, "solClient_session_connect()" );
goto sessionCreated;
}
if ( usePub_s ) {
for ( loop = 0; loop < numThread; loop++ ) {
if ( multiSend_s ) {
if ( ( pubThreadHandle[loop] = startThread ( pubThreadSendMultiple,
( void * ) session_p ) ) == _NULL_THREAD_ID ) {
goto sessionConnected;
}
} else {
if ( ( pubThreadHandle[loop] = startThread ( pubThread, ( void * ) session_p ) ) == _NULL_THREAD_ID ) {
goto sessionConnected;
}
}
}
for ( loop = 0; loop < numThread; loop++ ) {
waitOnThread ( pubThreadHandle[loop] );
}
}
if ( exitEarly_s ) {
goto sessionConnected;
}
if ( useSub_s ) {
if ( usePub_s ) {
printf ( "Waiting up to 1 second for subscriber to receive all messages...\n" );
1000, waitRxDoneCallbackFunc,
common_handleError ( rc, "solClient_context_startTimer()" );
goto sessionCreated;
}
} else {
printf ( "Waiting to receive %u message(s) or more ... \n", msgNum_s );
}
while ( !rxDone_s && !exitEarly_s && !rxTimeout_s ) {
sleepInUs ( 100000 );
}
if ( !rxTimeout_s && usePub_s ) {
common_handleError ( rc, "solClient_context_stopTimer()" );
goto sessionCreated;
}
}
}
endTime = getTimeInUs ( );
getUsageTime ( &userTime, &sysTime );
elapsedTime = endTime - startTime;
printf ( "\nElasped time: %lld us, user time: %lld us, sys time: %lld us\n"
"Percent CPU: %Lf\n", elapsedTime, userTime, sysTime,
( long double ) 100.0 *
( ( ( long double ) userTime + ( long double ) sysTime ) / ( long double ) elapsedTime ) );
printStats ( session_p );
sessionConnected:
common_handleError ( rc, "solClient_session_disconnect()" );
}
sessionCreated:
common_handleError ( rc, "solClient_session_destroy()" );
}
contextThreadCreated:
common_stopContextThread ( &contextThreadInfo );
contextCreated:
common_handleError ( rc, "solClient_context_destroy()" );
}
initialized:
common_handleError ( rc, "solClient_cleanup()" );
}
goto notInitialized;
notInitialized:
return 0;
}
#if defined(__cplusplus)
}
#endif