Creating Flows in the PubSub+ JCSMP API
To receive Guaranteed messages, client must create a consumer flow within a session, and bind that flow to an endpoint on a PubSub+ event broker that messages are published or attracted to. One or more flows can be created in a session.
A flow is an API object that allows a client to receive Guaranteed messages from an endpoint. A maximum of 1,000 Flows can be created in a Session.
To create a flow in a connected session, call the appropriate create flow method or function listed and provide the following:
- Flow properties
When a Flow is created, Flow properties must be provided. The table Important Flow (Message Consumer) Properties lists many important Flow properties that are common to the messaging APIs. For a complete list of required Flow properties, valid syntax and parameters, and default values, refer to the PubSub+ Messaging APIs for the appropriate messaging API.
- Topic subscription
The Topic subscription to use when binding to a Topic Endpoint. This argument is only required for the Java RTO and .NET APIs. For the Java, JavaScript, Node.js, and C APIs, the Topic subscription is provided in the Flow properties.
- message handlers
For each of the messaging APIs, a callback for handling received messages on the Flow must be provided:
- For JCSMP, pass in an
XMLMessageListener
callback interface to receive messages asynchronously. If noXMLMessageListener
is set (that is, the message listener tonull
), messages are received synchronously.
- For JCSMP, pass in an
- Flow event handlers
If the Active Flow Indication property is enabled, you must also provide a Flow event callback, event listener, or handler so that events indicating whether a client has an active flow to the exclusive queue, can be handled. Refer to Active Flow Indication.
- endpoint properties when binding to a temporary endpoint
For a list of the endpoint properties and the default values used, refer to the PubSub+ Messaging APIs for the appropriate messaging API.
Consumer Flows are not used to receive Direct messages because Direct messages are received directly through the Session interface.
To receive messages on a logical flow, a client application must first acquire a FlowReceiver
instance. Then to start receiving messages on the flow, call FlowReceiver.start()
. To stop receiving messages on the flow, call the stop()
method.
The FlowReceiver
is in an opened state until the FlowReceiver.close()
method is called.
To Create Guaranteed Message Flows:
-
call
JCSMPSession.createFlow(...)
to acquire aFlowReceiver
instance. -
Once a
FlowReceiver
is acquired, callstart()
to start receiving messages from the underlying connection by using either a specified asynchronousXMLMessageListener
callback interface or synchronousreceive(...)
calls. -
When using a synchronous receive mode, after
start()
is invoked for theFlowReceiver
, use the following methods to receive messages:receive()
—Receives the next available message, and waits until one is available.receive(int timeoutInMillis)
—Receives the next available message. If no message is available, the method blocks until a set amount of time expires.receiveNoWait()
—Receives the next available message.
Property | Description |
---|---|
Active Flow Indication |
When enabled, this property enables active flow events to be generated to indicate to a client bound to an exclusive queue whether it has an active flow (that is, a flow in which messages are being delivered). For more information, refer to Active Flow Indication. |
Message Acknowledgment Mode |
Sets whether the API automatically generates an application-level acknowledgment for each received message (the default), or if the client must explicitly acknowledge each received message. When the client provides an acknowledgment, the corresponding spooled message can be removed from the endpoint on the event broker. This parameter does not affect the Guaranteed Message window. For more information on message acknowledgment modes, refer to Acknowledging Messages Received by Clients in the PubSub+ JCSMP API. |
Message Acknowledgment Threshold |
The threshold for sending a windowed message acknowledgment (set as a percentage of the window size). This affects the flow-control window acknowledgment. The API sends a transport acknowledgment every N messages. N is calculated as this percentage of the flow window size if the endpoint's max-delivered-unacked-msgs-per-flow setting at bind time is greater than or equal to the transport window size. Otherwise, N is calculated as this percentage of the endpoint's max-delivered-unacked-msgs-per-flow setting at bind time. This threshold does not control application-level message acknowledgments. |
Message Acknowledgment Timer |
The maximum amount of time (in milliseconds) that can pass before an acknowledgment for Guaranteed messages received through the Flow must be sent. This parameter is used in a situation where Guaranteed messages have been received, but the message acknowledgment window threshold has not been met. |
Endpoint |
The endpoint to bind to. |
Topic Subscription |
If binding to a Topic Endpoint, the Topic subscription to set on the endpoint. |
Selector |
An optional SQL-92 selector to use for the selection of messages for delivery (refer to Using Selectors in the PubSub+ JCSMP API). |
Guaranteed Message Window Size |
The maximum number of messages that can be received through the Flow before the API must send an acknowledgment to the event broker that the messages were received. For more information, refer to Acknowledging Messages Received by Clients in the PubSub+ JCSMP API. The Guaranteed Message Window Size should not exceed the max-delivered-unacked-msgs-per-flow value that is set for a queue provisioned on the event broker, otherwise messages delivered to the API will not be acknowledged until the Message Acknowledgment Time value is exceeded. |
Start State |
Whether a Flow should be in a started state when it is created. In started state, a Flow can begin to receive messages immediately. |
No Local |
When enabled, messages published on the Session cannot be received through in an active Flow created in the same Session, even if the subscription matches the Topic of the published message. For more information, refer to No Local Delivery. |
Reconnect Retry Count |
The maximum number of Flow reconnect attempts to make after an error response with reason Replay Started or Service Unavailable is received. For more information, refer to Flow Reconnect. |
Reconnect Retry Interval |
The wait time between Flow reconnect attempts. For more information refer to Flow Reconnect. |
Related Samples
For an example of how to configure flow properties and bind to a flow, refer to the SimpleFlowToQueue
and SimpleFlowToTopic
samples in the Solace Developer Hub.
Active Flow Indication
If a queue has an exclusive access type (refer to Defining Endpoint Properties in the PubSub+ JCSMP API), multiple clients can bind to the queue, but only one client at a time can actively receive messages from it. Therefore, when a client creates a flow and binds to an exclusive queue, the flow might not be active for the client if other clients are bound to the queue.
If the active flow Indication property is enabled, a flow active event is returned to the client when its bound flow becomes the active flow. The client also receives a flow inactive event whenever it loses an active flow (for example, if the flow disconnects).
The client must pass in a FlowEventHandler
to use to handle the flow active and flow inactive events.
The active flow Indication property is ignored when a flow binds to a non-exclusive queue, unless that queue is partitioned. In the case of a partitioned queue, clients logically bind to the parent (non-exclusive) queue, but the event broker actually binds them to a partition (exclusive) queue. Because of this, the active flow Indication can be used with partitioned queues.
No Local Delivery
It is possible for a client to publish Non‑Persistent or Persistent messages to a topic, and if the same client has a matching topic subscription, it can receive them on an active flow on the same session. To prevent a client from receiving any Non‑Persistent or Persistent messages that it has published, the No Local property can be used for the flow that the client uses to bind an exclusive queue (that is, a queue that only permits a single bound client to receive messages) or a topic endpoint, and that flow must be the active flow for the endpoint. (The No Local property is ignored when the flow binds to a non‑exclusive queue.) For more information on access types, refer to refer to Defining Endpoint Properties in the PubSub+ JCSMP API.
- The No Local property only prevents messages published by a client from being spooled for that same client if it has established an active Flow at the same time. If the client does not have an active Flow established, messages published to the Topic that it subscribes to can still be spooled.
- The No Local property can also be enabled for a Session. When enabled for a Session, the No Local property only affects the delivery of Direct messages to the client on the same Session that the messages were published on. It does not affect the delivery of Non-Persistent or Persistent messages. (For more information, refer to Enabling No Local Delivery for a Session in the PubSub+ JCSMP API.)
When the No Local property is enabled only for the Session or only for the Flow, it is possible for the event broker to change the delivery mode of a published message so that it could still be delivered to its publishing client. For example, if a client publishes a message to Topic “A”, the message cannot be delivered as a Direct message to that client when No Local is enabled for the Session, but if No Local is not enabled for a Flow bound to a Topic Endpoint with the Topic subscription “A”, the message could be delivered to that client on the Flow as a Non-Persistent message.
For information on how messages’ delivery modes can be automatically modified when a Topic match occurs, refer to Topic Matching and Message Delivery Modes.
To enable no local subscriptions for a flow, use ConsumerFlowProperties.setNoLocal(boolean noLocal)
.
Flow Reconnect
When flow reconnect is enabled, all APIs automatically attempt to reconnect a Flow when an error response is received as long as the reason given in the error response string is one of the following:
- Replay Started
- Service Unavailable—Indicates that either the queue or topic endpoint was shutdown or deleted, or Assured Delivery services on the broker have been disabled.
In addition, when one of these errors is reported they are accompanied by a RECONNECTING
event as defined in each API. If the reconnect is successful, the API generates a RECONNECTED
event. If the reconnect fails, then the appropriate DOWN
error event occurs.
This behavior applies only to established flows. If an initial bind is rejected by the message broker for any reason, then it is reported as a bind failed event and no retry is attempted.
Flow Reconnect is supported and enabled by default in the PubSub+ JCSMP API, versions 10.7.0 and later.
While enabled, all APIs attempt to reconnect flows every three seconds indefinitely.
You can configure the maximum number of flow reconnect attempts, as well as the interval between attempts by providing the appropriate properties when the flow is created.
To disable flow reconnect, set the reconnect retry count to 0.
To set the reconnect retry count for a flow, use ConsumerFlowProperties.setReconnectTries
.
To set the reconnect retry interval for a flow, use ConsumerFlowProperties.setReconnectRetryIntervalInMsecs
.