#include "os.h"
#include "common.h"
/**
 * Strategy used to pick which browsed messages are treated as deleted.
 *
 * The numeric values are pinned explicitly because the enumerator is also
 * used as an index into a table of printable strategy names.
 */
typedef enum msgDeleteStrategy {
    DEL_NONE = 0,   /* delete nothing                                   */
    DEL_EVEN = 1,   /* delete messages whose sequence number is even    */
    DEL_ODD  = 2,   /* delete messages whose sequence number is odd     */
    DEL_ALL  = 3    /* delete every message                             */
} msgDeleteStrategy_t;
/**
 * State shared between the browsing routine and the flow message callback.
 *
 * The browsing routine zeroes the counters and sets the strategy before
 * browsing; the callback receives this structure through its user pointer
 * and updates the counters as messages arrive.
 */
typedef struct rxBrowseFlowCallbackInfo {
    solClient_uint32_t  msgCount;      /* messages received so far             */
    solClient_uint32_t  delCount;      /* messages selected for deletion       */
    msgDeleteStrategy_t delStrategy;   /* which messages the callback selects  */
} rxBrowseFlowCallbackInfo_t;
/*
 * Message callback for the browser flow (fragment).
 *
 * Counts each received message in the rxBrowseFlowCallbackInfo_t passed via
 * user_p, prints its message ID and sequence number, and counts it as deleted
 * when it matches the configured delete strategy.
 *
 * NOTE(review): this text is missing lines. The function name, return type
 * and leading parameters are not visible, and neither are the declarations of
 * rc, msgId and seqNum, the calls that fill them in, or the `if` statements
 * that the stray closing braces below belong to. Confirm against the complete
 * source before relying on these comments.
 */
void *user_p)
{
rxBrowseFlowCallbackInfo_t *callbackInfo_p;
/*
 * NOTE(review): no early return is visible after any of the three NULL
 * checks. If the full source also lacks one, callbackInfo_p is dereferenced
 * below even when user_p is NULL - verify.
 */
if (user_p == NULL) {
printf("Error: Got null user pointer");
}
/* The user pointer carries the per-flow counters and the delete strategy. */
callbackInfo_p = (rxBrowseFlowCallbackInfo_t *)user_p;
if (opaqueFlow_p == NULL) {
printf("Error: Got null opaque flow pointer");
}
if (msg_p == NULL) {
printf("Error: Got null message pointer");
}
/*
 * Error reporting for the message-ID and sequence-number lookups; the calls
 * themselves and their enclosing conditions are not visible here.
 */
common_handleError(rc, "solClient_msg_getMsgId()");
}
common_handleError(rc, "solClient_msg_getSequenceNumber()");
}
callbackInfo_p->msgCount++;
/* NOTE(review): "%lld" assumes msgId and seqNum are long long - confirm their declared types. */
printf("Received message on browser flow: MsgID =%lld; SeqNum=%lld).\n", msgId, seqNum);
/*
 * Apply the delete strategy: DEL_ALL selects every message, DEL_ODD those
 * with the low bit of seqNum set, DEL_EVEN those with it clear. DEL_NONE
 * matches no branch.
 * NOTE(review): only the log line and the counter update are visible in each
 * branch; the call that actually removes the message is presumably on a
 * missing line.
 */
if (callbackInfo_p->delStrategy == DEL_ALL) {
printf("Deleting message from the queue: MsgID =%lld; SeqNum=%lld).\n", msgId, seqNum);
callbackInfo_p->delCount++;
}
else if ((callbackInfo_p->delStrategy == DEL_ODD) && (seqNum &0x1)) {
printf("Deleting message from the queue: MsgID =%lld; SeqNum=%lld).\n", msgId, seqNum);
callbackInfo_p->delCount++;
}
else if ((callbackInfo_p->delStrategy == DEL_EVEN) && !(seqNum &0x1)) {
printf("Deleting message from the queue: MsgID =%lld; SeqNum=%lld).\n", msgId, seqNum);
callbackInfo_p->delCount++;
}
}
/*
 * Browses a queue and reports how many messages were received and how many
 * were selected for deletion under the given strategy.
 *
 * NOTE(review): this text is missing lines. The function name (presumably
 * browserFlow, judging by the calls in main with matching arguments), the
 * first parameter, the declarations of flow_p and flowFuncInfo, the
 * flow-creation call and the flow-destroy call are not visible. Confirm
 * against the complete source.
 *
 * Visible parameters:
 *   queueName    - queue name stored in the flow property list and printed.
 *   delStrategy  - delete strategy; must lie within DEL_NONE..DEL_ALL.
 *   browseWindow - formatted into a string flow property; also the step by
 *                  which sendStartCount advances in the polling loop.
 */
static void
char *queueName,
msgDeleteStrategy_t delStrategy,
solClient_uint32_t browseWindow)
{
/* Printable strategy names, indexed by the msgDeleteStrategy_t value. */
static char *delStrategyStr[] = {
"DEL_NONE",
"DEL_EVEN",
"DEL_ODD",
"DEL_ALL"
};
const char *flowProps[20];
int propIndex;
rxBrowseFlowCallbackInfo_t flowCallbackInfo;
solClient_uint32_t sendStartCount = browseWindow;
int loop;
char browseWindowStr[32];
solClient_uint32_t prevMsgCount;
/*
 * Reject out-of-range strategies before delStrategy is used to index
 * delStrategyStr.
 * NOTE(review): this jumps to cleanup, which tests flow_p; confirm flow_p is
 * initialised at its (not visible) declaration.
 */
if ((delStrategy < DEL_NONE) || (delStrategy > DEL_ALL)) {
printf("ERROR: Invalid delStrategy value '%d'\n", delStrategy);
goto cleanup;
}
/*
 * NOTE(review): "%d" is used with a solClient_uint32_t argument; confirm the
 * conversion specifier matches the underlying type.
 */
snprintf(browseWindowStr, sizeof(browseWindowStr), "%d", browseWindow);
/* Reset the state that the message callback updates. */
flowCallbackInfo.delStrategy = delStrategy;
flowCallbackInfo.msgCount = 0;
flowCallbackInfo.delCount = 0;
/*
 * Build the NULL-terminated flow property list.
 * NOTE(review): only two value entries are visible; the property-name
 * entries are presumably on lines missing from this text.
 */
propIndex = 0;
flowProps[propIndex++] = queueName;
flowProps[propIndex++] = browseWindowStr;
flowProps[propIndex] = NULL;
/*
 * NOTE(review): remains of the flow-creation call and its failure branch;
 * the call itself is not visible (the message names
 * solClient_session_createFlow()).
 */
session_p,
&flow_p,
&flowFuncInfo,
printf( "Error: solClient_session_createFlow() did not return SOLCLIENT_OK\n");
goto cleanup;
}
/* NOTE(review): 'loop' is assigned here but never read in the visible code. */
loop = 0;
printf("Browsing messages from queue '%s' with a removing message strategy '%s' , "
"Ctrl-C to stop.....\n",
queueName, delStrategyStr[delStrategy]);
/*
 * Poll until an iteration passes in which msgCount did not grow, or until
 * Ctrl-C is flagged through gotCtlC.
 */
do {
prevMsgCount = flowCallbackInfo.msgCount;
/*
 * When the received count reaches the current threshold, advance the
 * threshold by one browse window.
 * NOTE(review): any other action taken at this point is not visible.
 */
if (flowCallbackInfo.msgCount == sendStartCount) {
sendStartCount += browseWindow;
}
/* Pause before re-checking; presumably gives the callback time to receive more messages. */
sleepInSec(2);
if (gotCtlC) {
goto cleanup;
}
} while ( prevMsgCount< flowCallbackInfo.msgCount);
/* NOTE(review): "%d" is used with solClient_uint32_t counters - confirm the specifier. */
printf("Number of Received Messages: %d\n", flowCallbackInfo.msgCount);
printf("Number of Deleted Messages: %d\n", flowCallbackInfo.delCount);
cleanup:
/*
 * NOTE(review): flow teardown; the destroy call is not visible, only the
 * message printed when it fails.
 */
if (flow_p != NULL) {
printf("Error: could not destroy browser flow");
}
}
}
/*
 * Program entry point: parses the command line, publishes up to 30 messages
 * to a queue, then browses that queue three times with the DEL_NONE, DEL_ODD
 * and DEL_EVEN strategies.
 *
 * NOTE(review): this text is missing lines. Most API calls and the `if`
 * statements that open the error branches are not visible, and neither are
 * the declarations of rc, context_p, session_p, queueName, destination and
 * related locals. Only the error reporting and the goto targets remain.
 * Confirm against the complete source.
 *
 * Returns 0 on every visible path that reaches the end; exits with status 1
 * when command-line parsing fails.
 */
int main(int argc, char *argv[])
{
struct commonOptions commandOpts;
char binMsg[] = COMMON_ATTACHMENT_TEXT;
int publishCount = 0;
solClient_uint32_t browseWindow = 10;
printf("\nsimpleBrowserFlow.c (Copyright 2007-2024 Solace Corporation. All rights reserved.)\n");
initSigHandler();
/*
 * Two option masks are supplied: USER_PARAM_MASK in the first and the
 * host/password/log-level/GSS/zip-level flags in the second.
 * NOTE(review): presumably "required" and "optional" options respectively -
 * confirm against common_initCommandOptions().
 */
common_initCommandOptions(&commandOpts,
( USER_PARAM_MASK ),
( HOST_PARAM_MASK |
PASS_PARAM_MASK |
LOG_LEVEL_MASK |
USE_GSS_MASK |
ZIP_LEVEL_MASK));
/* A return value of 0 from the parser is treated as failure. */
if (common_parseCommandOptions(argc, argv, &commandOpts, NULL) == 0) {
exit(1);
}
/* Failure branch of API initialisation (the call itself is not visible). */
common_handleError(rc, "solClient_initialize()");
goto notInitialized;
}
common_printCCSMPversion();
/* NOTE(review): tail of a call that takes the configured log level; the call is not visible. */
commandOpts.logLevel);
/* Remains of the context-creation call and its failure branch. */
&context_p,
&contextFuncInfo,
common_handleError(rc, "solClient_context_create()");
goto cleanup;
}
/*
 * Create and connect the session.
 * NOTE(review): the remaining arguments and the comparison that closes this
 * condition are not visible.
 */
if ((rc = common_createAndConnectSession(context_p,
&session_p,
common_messageReceivePrintMsgCallback,
common_eventCallback,
NULL,
common_handleError(rc, "common_createAndConnectSession()");
goto cleanup;
}
/* Branch taken when guaranteed delivery is unavailable (its condition is not visible). */
printf("stopping as appliance doesn't have guaranteed delivery\n");
goto sessionConnected;
}
/*
 * NOTE(review): the copy length is the size of the source (COMMON_TESTQ),
 * not of the destination buffer; confirm queueName is at least that large.
 */
strncpy(queueName, COMMON_TESTQ, sizeof(COMMON_TESTQ));
printf("Publishing 30 messages to queue %s, Ctrl-C to stop.....\n", queueName);
publishCount = 0;
/*
 * Publish loop: runs until 30 messages have been counted or Ctrl-C is
 * flagged. Each failure branch below reports the error and jumps to
 * sessionConnected; the API calls and their `if` openers are not visible.
 */
while (!gotCtlC && publishCount < 30) {
common_handleError(rc, "solClient_msg_alloc()");
goto sessionConnected;
}
common_handleError(rc, "solClient_msg_setDeliveryMode()");
goto sessionConnected;
}
binMsg,
common_handleError(rc, "solClient_msg_setBinaryAttachmentPtr()");
goto sessionConnected;
}
/* NOTE(review): the next line looks like leftover comment text or a truncated statement - verify against the full source. */
destination.
dest = queueName;
&destination,
common_handleError(rc, "solClient_msg_setDestination()");
goto sessionConnected;
}
common_handleError(rc, "solClient_session_send");
goto sessionConnected;
}
common_handleError(rc, "solClient_msg_free()");
goto sessionConnected;
}
publishCount ++;
}
if (gotCtlC) {
printf("Got Ctrl-C, cleaning up\n");
goto sessionConnected;
}
/*
 * Browse the queue three times, checking for Ctrl-C between passes:
 * first without deleting, then selecting odd sequence numbers, then even.
 */
browserFlow(session_p, queueName, DEL_NONE, browseWindow);
if (gotCtlC) {
printf("Got Ctrl-C, cleaning up\n");
goto sessionConnected;
}
browserFlow(session_p, queueName, DEL_ODD, browseWindow);
if (gotCtlC) {
printf("Got Ctrl-C, cleaning up\n");
goto sessionConnected;
}
browserFlow(session_p, queueName, DEL_EVEN, browseWindow);
/*
 * Teardown labels in reverse order of setup: session disconnect, then
 * library cleanup. The calls themselves are not visible, only their error
 * reporting.
 */
sessionConnected:
common_handleError ( rc, "solClient_session_disconnect()" );
}
cleanup:
common_handleError(rc, "solClient_cleanup()");
}
notInitialized:
return 0;
}