PubSub+ Messaging API For C  7.31.0.7
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
ex/ios/examples/PerfTestExample.m
/*
* Copyright 2007-2024 Solace Corporation. All rights reserved.
*/
#import "PerfTestExample.h"
@implementation PerfTestExample
@synthesize rxDone_s;
@synthesize rxTimeout_s;
@synthesize usePub_s;
@synthesize useSub_s;
@synthesize multiSend_s;
@synthesize binaryPayloadSize_s;
@synthesize numRx_s;
@synthesize msgRate_s;
@synthesize msgNum_s;
@synthesize publishTopic_ps;
- (id)initWithExampleInterface:(ExampleInterface *)exampleInterface {
self = [super initWithExampleInterface:exampleInterface];
// Set example name and description
self.name = @"PerfTest";
self.description = @"Performs various performance tests";
// Setup example parameters
[self.parameters addParameter:PARAMETER_DESTINATION_TOPIC];
[self.parameters addParameter:PARAMETER_NUM_MESSAGES];
[self.parameters addParameter:PARAMETER_MESSAGES_RATE];
[self.parameters addParameter:PARAMETER_MESSAGE_SIZE];
[self.parameters addParameter:PARAMETER_PUB_SUB_MODE];
[self.parameters addParameter:PARAMETER_PUB_SUB_QUANTITY];
[self.parameters addParameter:PARAMETER_TCP_NO_DELAY];
[self.parameters addParameter:PARAMETER_MULTI_SEND_MODE];
// Set default instance field values
self.rxDone_s = 0;
self.rxTimeout_s = 0;
self.usePub_s = 1;
self.useSub_s = 1;
self.multiSend_s = 0;
self.binaryPayloadSize_s = 100;
self.numRx_s = 0;
return self;
}
- (void)printStatsForSession:(solClient_opaqueSession_pt)session_p {
solClient_stats_t rxStats[SOLCLIENT_STATS_RX_NUM_STATS];
solClient_stats_t txStats[SOLCLIENT_STATS_TX_NUM_STATS];
if ((rc = solClient_session_getRxStats(session_p, rxStats,
[self handleErrorWithReturnCode:rc
errorString:"solClient_session_getRxStats()"];
return;
}
if ((rc = solClient_session_getTxStats(session_p, txStats,
[self handleErrorWithReturnCode:rc
errorString:"solClient_session_getTxStats()"];
return;
}
""
"Tx msgs = %lld; Tx bytes = %lld\n"
"Rx msgs = %lld, Rx bytes = %lld\n"
"Avg bytes per read = %lld\n"
"Rx discard indications = %lld\n"
"Rx discards due to unrecognized parameter in header = %lld\n"
"Rx discards due to message too big = %lld\n"
"Tx would block = %lld\n"
"Tx socket full = %lld\n",
? rxStats[SOLCLIENT_STATS_RX_DIRECT_BYTES] /
: 0LL,
// Clear statistics
if ((rc = solClient_session_clearStats(session_p)) != SOLCLIENT_OK) {
[self handleErrorWithReturnCode:rc
errorString:"solClient_session_clearStats()"];
}
}
- (void)waitRxDoneCallbackWithContext:
(solClient_opaqueContext_pt)opaqueContext_p
userData:(void *)user_p {
"Timed out waiting for message receive to finish");
// Flag to main loop to stop waiting
self.rxTimeout_s = 1;
}
static void waitRxDoneCallback(solClient_opaqueContext_pt opaqueContext_p,
void *user_p) {
// Use callback bridge to pass pointer to example to Objective C method
callbackBridge_t *bridge = (callbackBridge_t *)user_p;
PerfTestExample *example = (__bridge PerfTestExample *)bridge->example_p;
[example waitRxDoneCallbackWithContext:opaqueContext_p
userData:bridge->user_p];
}
messageReceiveCallbackWithSession:(solClient_opaqueSession_pt)session_p
message:(solClient_opaqueMsg_pt)msg_p
userData:(void *)user_p {
self.numRx_s++;
if (self.numRx_s >= self.msgNum_s) {
// Flag to main loop to stop waiting
self.rxDone_s = 1;
}
}
- (threadRetType)pubThread:(void *)session_p {
const unsigned int groupSize = 10;
unsigned int txCount = 0;
unsigned int groupCount = 0;
char *binary_p;
long long startTime;
long long targetTime;
long long currentTime;
long long timeDiff;
long long elapsedTime;
long long usPerGroup;
long double usPerMsg;
binary_p = (char *)malloc(self.binaryPayloadSize_s);
if (binary_p == NULL) {
solClient_log(SOLCLIENT_LOG_ERROR, "Could not malloc %d bytes",
self.binaryPayloadSize_s);
return DEFAULT_THREAD_RETURN_ARG;
}
memset(binary_p, 0, self.binaryPayloadSize_s);
solClient_log(SOLCLIENT_LOG_ERROR, "Could not allocate msg");
return DEFAULT_THREAD_RETURN_ARG;
}
usPerMsg = (long double)1000000.0 / (long double)self.msgRate_s;
usPerGroup = (long long)(usPerMsg * (long double)groupSize);
msg_p, binary_p, self.binaryPayloadSize_s) != SOLCLIENT_OK) {
"Could not set binary attachment in msg");
return DEFAULT_THREAD_RETURN_ARG;
}
if (solClient_msg_setTopicPtr(msg_p, self.publishTopic_ps) !=
solClient_log(SOLCLIENT_LOG_ERROR, "Could not set Topic in msg");
return DEFAULT_THREAD_RETURN_ARG;
}
startTime = [self getTimeInUs];
targetTime = startTime + usPerGroup;
while (txCount < self.msgNum_s && !self.requestCancel) {
sendRc = solClient_session_sendMsg(session_p, msg_p);
if (sendRc != SOLCLIENT_OK) {
errorInfo_p = solClient_getLastErrorInfo();
if (errorInfo_p != NULL) {
"solClient_session_sendMsg() failed (%s) "
"subCode (%d:'%s'), error %s",
errorInfo_p->subCode,
errorInfo_p->errorStr);
}
} else {
txCount++;
groupCount++;
if (groupCount >= groupSize) {
groupCount = 0;
currentTime = [self getTimeInUs];
timeDiff = targetTime - currentTime;
if (timeDiff > 1000) {
[NSThread sleepForTimeInterval:(timeDiff + 500) / 1000000];
} else if (timeDiff < (long long)(-10000)) {
// Fell too far behind; reset time base so we do not burst
// for too long
targetTime = currentTime;
}
targetTime += usPerGroup;
}
}
}
elapsedTime = [self getTimeInUs] - startTime;
"Sent %d msgs in %lld usec; rate of %lu messages/sec\n", txCount,
elapsedTime,
(long unsigned)((long double)txCount /
((long double)elapsedTime / (long double)1000000.0)));
return DEFAULT_THREAD_RETURN_ARG;
}
threadRetType pubThread(void *user_p) {
// Use callback bridge to pass pointer to example to Objective C method
callbackBridge_t *bridge = (callbackBridge_t *)user_p;
PerfTestExample *example = (__bridge PerfTestExample *)bridge->example_p;
void *session_p = bridge->user_p;
free(bridge);
return [example pubThread:session_p];
}
- (threadRetType)pubThreadSendMultiple:(void *)session_p {
unsigned int txCount = 0;
char *binary_p;
unsigned int numToSend;
unsigned int numWritten;
long long startTime;
long long targetTime;
long long currentTime;
long long timeDiff;
long long elapsedTime;
long long usPerGroup;
long double usPerMsg;
int loop;
solClient_opaqueMsg_pt msgArray[GROUP_SIZE];
binary_p = (char *)malloc(self.binaryPayloadSize_s);
if (binary_p == NULL) {
solClient_log(SOLCLIENT_LOG_ERROR, "Could not malloc %d bytes",
self.binaryPayloadSize_s);
return DEFAULT_THREAD_RETURN_ARG;
}
memset(binary_p, 0, self.binaryPayloadSize_s);
memset(msgArray, 0, sizeof(msgArray));
usPerMsg = (long double)1000000.0 / (long double)self.msgRate_s;
usPerGroup = (long long)(usPerMsg * (long double)GROUP_SIZE);
for (loop = 0; loop < GROUP_SIZE; loop++) {
if (solClient_msg_alloc(&msgArray[loop]) != SOLCLIENT_OK) {
solClient_log(SOLCLIENT_LOG_ERROR, "Could not allocate msg # %d",
loop);
goto releaseMsg;
}
if (solClient_msg_setBinaryAttachmentPtr(msgArray[loop], binary_p,
self.binaryPayloadSize_s) !=
"Could not set binary attachment in msg");
goto releaseMsg;
}
if (solClient_msg_setTopicPtr(msgArray[loop], self.publishTopic_ps) !=
solClient_log(SOLCLIENT_LOG_ERROR, "Could not set topic in msg");
goto releaseMsg;
}
}
startTime = [self getTimeInUs];
targetTime = startTime + usPerGroup;
while (txCount < self.msgNum_s && !self.requestCancel) {
numToSend = self.msgNum_s - txCount;
if (numToSend > GROUP_SIZE) {
numToSend = GROUP_SIZE;
}
sendRc = solClient_session_sendMultipleMsg(session_p, msgArray,
numToSend, &numWritten);
if (sendRc != SOLCLIENT_OK) {
solClient_log(SOLCLIENT_LOG_ERROR, "Could not send multiple\n");
break;
} else {
txCount += numToSend;
currentTime = [self getTimeInUs];
timeDiff = targetTime - currentTime;
if (timeDiff > 1000) {
[NSThread sleepForTimeInterval:(timeDiff + 500) / 1000000];
} else if (timeDiff < (long long)(-10000)) {
// Fell too far behind; reset time base so we do not burst for
// too long
targetTime = currentTime;
}
targetTime += usPerGroup;
}
}
elapsedTime = [self getTimeInUs] - startTime;
"Sent %d msgs in batches of %d in %lld usec; rate of %lu "
"messages/sec\n",
txCount, GROUP_SIZE, elapsedTime,
(long unsigned)((long double)txCount /
((long double)elapsedTime / (long double)1000000.0)));
releaseMsg:
for (loop = 0; loop < GROUP_SIZE; loop++) {
if (msgArray[loop] != NULL) {
if (solClient_msg_free(&msgArray[loop]) != SOLCLIENT_OK) {
solClient_log(SOLCLIENT_LOG_ERROR, "Could not release msg # %d",
loop);
break;
}
}
}
return DEFAULT_THREAD_RETURN_ARG;
}
threadRetType pubThreadSendMultiple(void *user_p) {
// Use callback bridge to pass pointer to example to Objective C method
callbackBridge_t *bridge = (callbackBridge_t *)user_p;
PerfTestExample *example = (__bridge PerfTestExample *)bridge->example_p;
void *session_p = bridge->user_p;
free(bridge);
return [example pubThreadSendMultiple:session_p];
}
- (void)run {
[super run];
const char *sessionProps[40];
char *noDelayVal_p = "0";
char *noDelay_p = "false";
char *multiSend_p = "false";
char subTopic[] = "level1/level2/level3/level4/>";
char *subTopic_p;
char pubTopic[] = "level1/level2/level3/level4/level5";
const char *pubSub_p;
long long startTime;
long long endTime;
int propIndex;
int numThread = 1;
long long userTime;
long long sysTime;
long long elapsedTime;
contextThreadInfo_t contextThreadInfo;
pthread_t pubThreadHandle[100];
int loop;
// Initialize the API; this must be done prior to first usage
[self handleErrorWithReturnCode:rc
errorString:"solClient_initialize()"];
goto notInitialized;
}
// Set up logging level and log example and API information
[self setLoggingLevel];
"PerfTestExample.m (Copyright 2007-2024 Solace Corporation. All rights reserved.)\n");
[self logCCSMPVersion];
self.msgNum_s =
(unsigned int)[self.parameters parameterWithId:PARAMETER_NUM_MESSAGES]
.value.integerValue;
self.msgRate_s =
(unsigned int)[self.parameters parameterWithId:PARAMETER_MESSAGES_RATE]
.value.integerValue;
if ([[self.parameters parameterWithId:PARAMETER_DESTINATION_TOPIC].value
cStringUsingEncoding:NSASCIIStringEncoding][0] == (char)0) {
self.publishTopic_ps = pubTopic;
subTopic_p = subTopic;
} else {
self.publishTopic_ps = (char *)
[[self.parameters parameterWithId:PARAMETER_DESTINATION_TOPIC].value
cStringUsingEncoding:NSASCIIStringEncoding];
subTopic_p = (char *)
[[self.parameters parameterWithId:PARAMETER_DESTINATION_TOPIC].value
cStringUsingEncoding:NSASCIIStringEncoding];
}
// Binary payload size option.
self.binaryPayloadSize_s =
(int)[self.parameters parameterWithId:PARAMETER_MESSAGE_SIZE]
.value.integerValue;
// Publisher only, subscriber only, or both publisher and subscriber option.
if ([[self.parameters parameterWithId:PARAMETER_PUB_SUB_MODE].value
isEqualToString:@"Subscriber only"]) {
self.usePub_s = 0; // sub only
self.useSub_s = 1;
pubSub_p = "Sub only";
} else if ([[self.parameters parameterWithId:PARAMETER_PUB_SUB_MODE].value
isEqualToString:@"Publishers only"]) {
self.useSub_s = 0; // pub only
self.usePub_s = 1;
pubSub_p = "Pub only";
} else if ([[self.parameters parameterWithId:PARAMETER_PUB_SUB_MODE].value
isEqualToString:@"Publishers and subscriber"]) {
self.usePub_s = 1;
self.useSub_s = 1;
pubSub_p = "Pub and Sub";
}
numThread =
(int)[self.parameters parameterWithId:PARAMETER_PUB_SUB_QUANTITY]
.value.integerValue;
if (numThread <= 0) {
"Error: PUB_SUB_MODE improperly initialized\n");
goto notInitialized;
return;
}
if (numThread > 100) {
"Warning: maximum 100 publisher threads supported, reducing %d to "
"100\n",
numThread);
numThread = 100;
}
if (self.usePub_s == 0) {
"Warning: %d publishers ignored in subscriber only mode\n",
numThread);
}
// TCP no delay option.
noDelay_p =
[self.parameters parameterWithId:PARAMETER_TCP_NO_DELAY].value.boolValue
? "true"
: "false";
noDelayVal_p =
[self.parameters parameterWithId:PARAMETER_TCP_NO_DELAY].value.boolValue
? "1"
: "0";
// Use multi-message send option.
self.multiSend_s =
[self.parameters parameterWithId:PARAMETER_MULTI_SEND_MODE]
.value.boolValue
? 1
: 0;
multiSend_p = [self.parameters parameterWithId:PARAMETER_MULTI_SEND_MODE]
.value.boolValue
? "true"
: "false";
"APPLIANCE_IP: %s, APPLIANCE_USERNAME: %s, NUM_MSGS: %d, MSG_RATE: "
"%d, MSG_SIZE: %d, PUB_SUB_MODE %d %s threads, TCP_NO_DELAY: %s, "
"MULTI_SEND_MODE: %s\n",
[[self.parameters parameterWithId:PARAMETER_HOST].value
cStringUsingEncoding:NSASCIIStringEncoding],
[[self.parameters parameterWithId:PARAMETER_USERNAME].value
cStringUsingEncoding:NSASCIIStringEncoding],
self.msgNum_s, self.msgRate_s, self.binaryPayloadSize_s, numThread,
pubSub_p, noDelay_p, multiSend_p);
// Print version information.
if ((rc = solClient_version_get(&versionInfo_p)) != SOLCLIENT_OK) {
[self handleErrorWithReturnCode:rc
errorString:"solClient_version_get()"];
goto notInitialized;
}
startTime = [self getTimeInUs];
// Create a Context to use for the Session.
solClient_log(SOLCLIENT_LOG_DEBUG, "creating solClient context");
NULL, &contextThreadInfo.context_p, &contextFuncInfo,
sizeof(contextFuncInfo))) != SOLCLIENT_OK) {
[self handleErrorWithReturnCode:rc
errorString:"solClient_context_create()"];
goto initialized;
}
// Start the Context thread.
solClient_log(SOLCLIENT_LOG_DEBUG, "starting solClient context thread");
if (![self startContextThread:&contextThreadInfo]) {
solClient_log(SOLCLIENT_LOG_ERROR, "startContextThread: failed");
goto contextCreated;
}
// Create Session for sending/receiving messages.
propIndex = 0;
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_HOST;
sessionProps[propIndex++] =
[[self.parameters parameterWithId:PARAMETER_HOST].value
cStringUsingEncoding:NSASCIIStringEncoding];
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_USERNAME;
sessionProps[propIndex++] =
[[self.parameters parameterWithId:PARAMETER_USERNAME].value
cStringUsingEncoding:NSASCIIStringEncoding];
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_PASSWORD;
sessionProps[propIndex++] =
[[self.parameters parameterWithId:PARAMETER_PASSWORD].value
cStringUsingEncoding:NSASCIIStringEncoding];
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_TCP_NODELAY;
sessionProps[propIndex++] = noDelayVal_p;
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_REAPPLY_SUBSCRIPTIONS;
sessionProps[propIndex++] = SOLCLIENT_PROP_ENABLE_VAL;
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_CONNECT_BLOCKING;
sessionProps[propIndex++] = SOLCLIENT_PROP_DISABLE_VAL;
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_CONNECT_RETRIES;
sessionProps[propIndex++] = "3";
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_RECONNECT_RETRIES;
sessionProps[propIndex++] = "3";
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_REAPPLY_SUBSCRIPTIONS;
sessionProps[propIndex++] = SOLCLIENT_PROP_ENABLE_VAL;
// If publishing to a appliance running SolOS-Topic Routing (TR), a Message
// VPN must be specified.
if ([[self.parameters parameterWithId:PARAMETER_VPN].value
cStringUsingEncoding:NSASCIIStringEncoding][0]) {
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_VPN_NAME;
sessionProps[propIndex++] =
[[self.parameters parameterWithId:PARAMETER_VPN].value
cStringUsingEncoding:NSASCIIStringEncoding];
}
// The certificate validation property is ignored on non-SSL sessions.
// For simple demo applications, disable it on SSL sessions (host
// string begins with tcps:) so a local trusted root and certificate
// store is not required. See the API users guide for documentation
// on how to setup a trusted root so the servers certificate returned
// on the secure connection can be verified if this is desired.
sessionProps[propIndex++] = SOLCLIENT_PROP_DISABLE_VAL;
sessionProps[propIndex] = NULL;
sessionFuncInfo.rxMsgInfo.callback_p = messageReceiveCallback;
sessionFuncInfo.rxMsgInfo.user_p = self.nullBridge_p;
sessionFuncInfo.eventInfo.callback_p = eventCallback;
sessionFuncInfo.eventInfo.user_p = self.nullBridge_p;
solClient_log(SOLCLIENT_LOG_DEBUG, "creating solClient session");
(char **)sessionProps, contextThreadInfo.context_p, &session_p,
&sessionFuncInfo, sizeof(sessionFuncInfo))) != SOLCLIENT_OK) {
[self handleErrorWithReturnCode:rc
errorString:"solClient_session_create()"];
goto contextThreadCreated;
}
// Connect the session
solClient_log(SOLCLIENT_LOG_DEBUG, "connecting solClient session");
[self handleErrorWithReturnCode:rc
errorString:"solClient_session_connect()"];
goto sessionCreated;
}
if (self.useSub_s) {
// Wait for confirmation that the subscription has been applied.
session_p, 0, subTopic_p)) != SOLCLIENT_OK)) {
[self handleErrorWithReturnCode:
rc errorString:"solClient_session_topicSubscribeExt()"];
goto sessionConnected;
}
}
if (self.usePub_s) {
for (loop = 0; loop < numThread; loop++) {
callbackBridge_t *bridge = malloc(sizeof(callbackBridge_t));
bridge->user_p = session_p;
bridge->example_p = (__bridge void *)self;
if (self.multiSend_s) {
if ((pubThreadHandle[loop] =
[self startThreadWithFP:pubThreadSendMultiple
argument:bridge]) == _NULL_THREAD_ID) {
"could not create publisher thread");
goto sessionConnected;
}
} else {
if ((pubThreadHandle[loop] =
[self startThreadWithFP:pubThread argument:bridge]) ==
_NULL_THREAD_ID) {
"could not create publisher thread");
goto sessionConnected;
}
}
}
for (loop = 0; loop < numThread; loop++) {
[self waitOnThread:pubThreadHandle[loop]];
}
}
if (self.requestCancel) {
goto sessionConnected;
}
if (self.useSub_s) {
if (self.usePub_s) {
// In pubsub mode
"Waiting up to 1 second for subscriber to receive all "
"messages...\n");
contextThreadInfo.context_p,
SOLCLIENT_CONTEXT_TIMER_ONE_SHOT, 1000, waitRxDoneCallback,
self.nullBridge_p, &timerId)) != SOLCLIENT_OK) {
[self
handleErrorWithReturnCode:rc
errorString:"solClient_context_startTimer()"];
goto sessionCreated;
}
} else {
// In sub mode only.
"Waiting to receive %u message(s) or more ... \n",
self.msgNum_s);
}
// Now wait for the message receive to finish receiving all messages
// if using a subscriber. Simple polling is used here, but some sort
// of thread synchronization object could be used.
while (!self.rxDone_s && !self.requestCancel && !self.rxTimeout_s) {
[NSThread sleepForTimeInterval:0.1]; // Check every 100 ms.
}
if (!self.rxTimeout_s && self.usePub_s) {
if ((rc = solClient_context_stopTimer(contextThreadInfo.context_p,
&timerId)) != SOLCLIENT_OK) {
[self
handleErrorWithReturnCode:rc
errorString:"solClient_context_stopTimer()"];
goto sessionCreated;
}
}
// Remove the subscription that was previously added.
subTopic_p)) != SOLCLIENT_OK)) {
[self handleErrorWithReturnCode:
rc errorString:"solClient_session_topicUnsubscribeExt()"];
}
}
endTime = [self getTimeInUs];
[self getUsageTimeWithUserTime:&userTime systemTime:&sysTime];
elapsedTime = endTime - startTime;
"Elasped time: %lld us, user time: %lld us, sys time: %lld us\n"
"Percent CPU: %Lf\n",
elapsedTime, userTime, sysTime,
(long double)100.0 * (((long double)userTime + (long double)sysTime) /
(long double)elapsedTime));
[self printStatsForSession:session_p];
sessionConnected:
// Disconnect the session
if ((rc = solClient_session_disconnect(session_p)) != SOLCLIENT_OK) {
[self handleErrorWithReturnCode:rc
errorString:"solClient_session_disconnect()"];
}
sessionCreated:
// Destroy the Session.
if ((rc = solClient_session_destroy(&session_p)) != SOLCLIENT_OK) {
[self handleErrorWithReturnCode:rc
errorString:"solClient_session_destroy()"];
}
contextThreadCreated:
// Stop the Context thread.
[self stopContextThread:&contextThreadInfo];
contextCreated:
// Destroy the Context.
if ((rc = solClient_context_destroy(&contextThreadInfo.context_p)) !=
[self handleErrorWithReturnCode:rc
errorString:"solClient_context_destroy()"];
}
initialized:
// Cleanup solclient
if ((rc = solClient_cleanup()) != SOLCLIENT_OK) {
[self handleErrorWithReturnCode:rc errorString:"solClient_cleanup()"];
}
goto notInitialized;
notInitialized:
[self cleanup];
}
@end