Creating Flows

To receive Guaranteed messages, a client must create a consumer Flow within a Session and bind that Flow to an endpoint on a Solace PubSub+ event broker that messages are published or attracted to. One or more Flows can be created in a Session.

A Flow is an API object that allows a client to receive Guaranteed messages from an endpoint. A maximum of 1,000 Flows can be created in a Session.

Note:  The JavaScript and Node.js APIs use the concept of a MessageConsumer to represent a Flow.

To create a Flow in a connected Session, call the appropriate create flow method or function listed in To Create Guaranteed Message Flows and provide the following:

  • Flow properties

    When a Flow is created, Flow properties must be provided. The table Important Flow (Message Consumer) Properties lists many important Flow properties that are common to the messaging APIs. For a complete list of required Flow properties, valid syntax and parameters, and default values, refer to the APIs & Protocols for the appropriate messaging API.

    Note:  The JavaScript and Node.js APIs use solace.MessageConsumerProperties for Flow properties.

  • endpoint to bind to

    This argument is only required for the Java RTO and .NET APIs. For the Java, JavaScript, Node.js, and C APIs, the endpoint to bind to is provided in the Flow properties.

  • Topic subscription

    The Topic subscription to use when binding to a Topic Endpoint. This argument is only required for the Java RTO and .NET APIs. For the Java, JavaScript, Node.js, and C APIs, the Topic subscription is provided in the Flow properties.

  • message handlers

    For each of the messaging APIs, a callback for handling received messages on the Flow must be provided:

    • For the Java API, pass in an XMLMessageListener callback interface to receive messages asynchronously. If no XMLMessageListener is set (that is, the message listener is set to null), messages are received synchronously.
    • For the Java RTO API, pass in a MessageCallback.
    • For the C API, pass in a pointer to a structure that provides information on callback functions for received messages and events.
    • For the .NET API, pass in a messageEventHandler.
    • For the JavaScript and Node.js APIs, define a listener to the solace.MessageConsumerEventName.MESSAGE event.
  • Flow event handlers

    If the Active Flow Indication property is enabled, you must also provide a Flow event callback, event listener, or handler so that events indicating whether a client has an active Flow to the exclusive queue can be handled. Refer to Active Flow Indication.

  • endpoint properties when binding to a temporary endpoint

    For a list of the endpoint properties and the default values used, refer to the APIs & Protocols for the appropriate messaging API.

Note:  Consumer Flows are not used to receive Direct messages because Direct messages are received directly through the Session interface.

Java API

To receive messages on a logical Flow, a client application must first acquire a FlowReceiver instance. Then, to start receiving messages on the Flow, call FlowReceiver.start(). To stop receiving messages on the Flow, call FlowReceiver.stop().

The FlowReceiver is in an opened state until the FlowReceiver.close() method is called.

To Create Guaranteed Message Flows

Language

Call

Java

JCSMPSession.createFlow(...) to acquire a FlowReceiver instance.

Once a FlowReceiver is acquired, call start() to start receiving messages from the underlying connection by using either a specified asynchronous XMLMessageListener callback interface or synchronous receive(...) calls.

When using a synchronous receive mode, after start() is invoked for the FlowReceiver, use the following methods to receive messages:

  • receive(): Receives the next available message, blocking until one is available.

  • receive(int timeoutInMillis): Receives the next available message. If no message is available, the method blocks until the specified timeout (in milliseconds) expires.

  • receiveNoWait(): Receives the next available message without blocking. If no message is available, the method returns immediately.

Java RTO

SessionHandle.createFlowForHandle(...)

C

solClient_session_createFlow(...)

.NET

ISession.createFlow(...)

JavaScript and Node.js

solace.Session.createMessageConsumer(...)
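The three synchronous receive variants listed for the Java API differ only in how long they wait. The sketch below is a stand-in built on java.util.concurrent.BlockingQueue, not the Solace FlowReceiver itself; the class SyncReceiveSketch, its deliver() hook, and the choice to return null when nothing is available are assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative model only: mirrors the waiting behavior of the three
// synchronous receive variants. It does not use or reproduce the Solace API.
class SyncReceiveSketch {
    private final BlockingQueue<String> delivered = new LinkedBlockingQueue<>();

    // Hypothetical hook that simulates the event broker delivering a message.
    void deliver(String message) {
        delivered.add(message);
    }

    // Like receive(): waits until a message is available.
    String receive() {
        try {
            return delivered.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null; // assumption: report "no message" when interrupted
        }
    }

    // Like receive(int timeoutInMillis): waits at most the given time.
    String receive(int timeoutInMillis) {
        try {
            return delivered.poll(timeoutInMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Like receiveNoWait(): never waits.
    String receiveNoWait() {
        return delivered.poll();
    }

    public static void main(String[] args) {
        SyncReceiveSketch flow = new SyncReceiveSketch();
        System.out.println(flow.receiveNoWait()); // nothing delivered yet: null
        flow.deliver("order-1");
        System.out.println(flow.receive());       // already available: order-1
        System.out.println(flow.receive(50));     // times out after 50 ms: null
    }
}
```

In a real application the choice is usually between the blocking form on a dedicated thread and the timed form inside a loop that also checks a shutdown flag.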

Important Flow (Message Consumer) Properties

Property

Description

Active Flow Indication

When enabled, active flow events are generated to indicate to a client bound to an exclusive queue whether it has an active flow (that is, a flow in which messages are being delivered).

For more information, refer to Active Flow Indication.

Message Acknowledgment Mode

Sets whether the API automatically generates an application-level acknowledgment for each received message (the default), or if the client must explicitly acknowledge each received message.

When the client provides an acknowledgment, the corresponding spooled message can be removed from the endpoint on the event broker.

This parameter does not affect the Guaranteed Message window.

For more information on message acknowledgment modes, refer to Acknowledging Messages Received by Clients.

Message Acknowledgment Threshold

The threshold for sending a windowed message acknowledgment (set as a percentage of the window size).

This affects the flow-control window acknowledgment. The API sends a transport acknowledgment every N messages. N is calculated as this percentage of the flow window size if the endpoint's max-delivered-unacked-msgs-per-flow setting at bind time is greater than or equal to the transport window size. Otherwise, N is calculated as this percentage of the endpoint's max-delivered-unacked-msgs-per-flow setting at bind time. This threshold does not control application-level message acknowledgments.

Message Acknowledgment Timer

The maximum amount of time (in milliseconds) that can pass before an acknowledgment for Guaranteed messages received through the Flow must be sent.

This parameter is used in a situation where Guaranteed messages have been received, but the message acknowledgment window threshold has not been met.

Endpoint (Java, Java RTO, C, .NET APIs)

The endpoint to bind to.

Queue Descriptor (JavaScript and Node.js APIs)

Defines the queue from which to consume (bind to):

  • type (queue or topic endpoint)

  • name (optional for topic endpoints)

  • durability (optional, durable if not specified)

Queue Properties (JavaScript and Node.js APIs)

The properties of the remote queue.

  • For non-durable queues and topic endpoints, these properties define the queue that is created. For possible values, see the online API documentation.

  • For durable queues, these must be unset on consumer creation. The values will be populated after the queue is connected.

Topic Subscription

If binding to a Topic Endpoint, the Topic subscription to set on the endpoint.

Selector

An optional SQL-92 selector to use for the selection of messages for delivery (refer to Using Selectors).

Guaranteed Message Window Size

The maximum number of messages that can be received through the Flow before the API must send an acknowledgment to the event broker that the messages were received. For more information, refer to Acknowledging Messages Received by Clients.

Note:  The Guaranteed Message Window Size should not exceed the max-delivered-unacked-msgs-per-flow value that is set for a queue provisioned on the event broker. Otherwise, messages delivered to the API are not acknowledged until the Message Acknowledgment Timer value is exceeded.

Start State

Whether a Flow should be in a started state when it is created. In started state, a Flow can begin to receive messages immediately.

No Local

When enabled, messages published on the Session cannot be received through an active Flow created in the same Session, even if the subscription matches the Topic of the published message. For more information, refer to No Local Delivery.

Reconnect Retry Count

The maximum number of Flow reconnect attempts to make after an error response with reason Replay Started or Service Unavailable is received. For more information, refer to Flow Reconnect.

Reconnect Retry Interval

The wait time between Flow reconnect attempts. For more information, refer to Flow Reconnect.
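The rule given for the Message Acknowledgment Threshold (how N is derived from the window size and the endpoint's max-delivered-unacked-msgs-per-flow setting) can be worked through with a small calculation. This is a sketch of the arithmetic only, not API code: the class and method names are hypothetical, the input values are made up, and the rounding (round down, never below one message) is an assumption because the text does not state it.

```java
// Illustrative arithmetic for the transport acknowledgment interval N.
class AckThresholdSketch {
    // windowSize:          Guaranteed Message Window Size of the Flow
    // maxDeliveredUnacked: endpoint's max-delivered-unacked-msgs-per-flow at bind time
    // thresholdPercent:    Message Acknowledgment Threshold, as a percentage
    static int messagesPerTransportAck(int windowSize, int maxDeliveredUnacked, int thresholdPercent) {
        // Use the window size unless the endpoint limit is smaller than it.
        int basis = (maxDeliveredUnacked >= windowSize) ? windowSize : maxDeliveredUnacked;
        // Assumption: integer division rounds down; acknowledge at least every message.
        return Math.max(1, basis * thresholdPercent / 100);
    }

    public static void main(String[] args) {
        // Endpoint limit is larger than the window: N is 60% of the window.
        System.out.println(messagesPerTransportAck(255, 10000, 60)); // 153
        // Endpoint limit is smaller than the window: N is 60% of the limit.
        System.out.println(messagesPerTransportAck(255, 100, 60));   // 60
    }
}
```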

Related Samples

For an example of how to configure Flow properties and bind to a Flow, refer to the SimpleFlowToQueue and SimpleFlowToTopic samples for the appropriate messaging APIs.

For the JavaScript and Node.js APIs, refer to the QueueConsumer and DTEConsumer samples.

Active Flow Indication

If a queue has an exclusive access type (refer to Defining Endpoint Properties), multiple clients can bind to the queue, but only one client at a time can actively receive messages from it. Therefore, when a client creates a Flow and binds to an exclusive queue, the flow might not be active for the client if other clients are bound to the queue.

If the Active Flow Indication Flow property is enabled, a Flow active event is returned to the client when its bound flow becomes the active flow. The client also receives a Flow inactive event whenever it loses an active flow (for example, if the flow disconnects).

The client must provide the following when the flow is created:

  • When using the Java or Java RTO APIs, pass in a FlowEventHandler to use to handle the Flow active and Flow inactive events.
  • When using the C API, pass in a pointer to the functInfo_p structure to provide information on the callback function to use for received Flow active and Flow inactive events.
  • When using the .NET API, pass in a flowEventHandler to use to handle the Flow active and Flow inactive events.

Note:  The Active Flow Indication property is ignored when a Flow binds to a non‑exclusive Queue.
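To make the sequence of Flow active and Flow inactive events concrete, the following toy model tracks which bound Flow is active on an exclusive queue. It is a simulation written for this explanation, not broker or API code: the class ExclusiveQueueSketch, the event strings, and the rule that the next Flow in bind order takes over are all assumptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of an exclusive queue: many Flows may bind, one is active.
class ExclusiveQueueSketch {
    private final Deque<String> boundFlows = new ArrayDeque<>();
    final List<String> events = new ArrayList<>(); // recorded as "flow:EVENT"

    void bind(String flow) {
        boundFlows.addLast(flow);
        // The first Flow to bind becomes the active Flow immediately.
        if (boundFlows.size() == 1) {
            events.add(flow + ":FLOW_ACTIVE");
        }
    }

    void unbind(String flow) {
        boolean wasActive = flow.equals(boundFlows.peekFirst());
        if (!boundFlows.remove(flow)) {
            return; // the Flow was not bound
        }
        if (wasActive) {
            events.add(flow + ":FLOW_INACTIVE");
            // Assumption: the next Flow in bind order takes over.
            String next = boundFlows.peekFirst();
            if (next != null) {
                events.add(next + ":FLOW_ACTIVE");
            }
        }
    }

    public static void main(String[] args) {
        ExclusiveQueueSketch queue = new ExclusiveQueueSketch();
        queue.bind("A");   // A becomes active
        queue.bind("B");   // B is bound but not active, so no event
        queue.unbind("A"); // A loses the active Flow, B gains it
        System.out.println(queue.events);
        // [A:FLOW_ACTIVE, A:FLOW_INACTIVE, B:FLOW_ACTIVE]
    }
}
```

A client that enables Active Flow Indication would use events like these to decide when to start or pause work that should run only on the active consumer.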

No Local Delivery

A client can publish Non‑Persistent or Persistent messages to a Topic and, if the same client has a matching Topic subscription, receive them on an active Flow on the same Session. To prevent a client from receiving any Non‑Persistent or Persistent messages that it has published, enable the No Local property for the Flow that the client uses to bind to an exclusive Queue (that is, a Queue that permits only one bound client at a time to receive messages) or a Topic Endpoint. That Flow must be the active Flow for the endpoint. (The No Local property is ignored when the Flow binds to a non‑exclusive Queue.) For more information on access types, refer to Defining Endpoint Properties.

  • The No Local property only prevents messages published by a client from being spooled for that same client if it has established an active Flow at the same time. If the client does not have an active Flow established, messages published to the Topic that it subscribes to can still be spooled.
  • The No Local property can also be enabled for a Session. When enabled for a Session, the No Local property only affects the delivery of Direct messages to the client on the same Session that the messages were published on. It does not affect the delivery of Non-Persistent or Persistent messages. (For more information, refer to Enabling No Local Delivery for a Session.)

When the No Local property is enabled only for the Session or only for the Flow, it is possible for the event broker to change the delivery mode of a published message so that it could still be delivered to its publishing client. For example, if a client publishes a message to Topic “A”, the message cannot be delivered as a Direct message to that client when No Local is enabled for the Session, but if No Local is not enabled for a Flow bound to a Topic Endpoint with the Topic subscription “A”, the message could be delivered to that client on the Flow as a Non-Persistent message.
For information on how messages’ delivery modes can be automatically modified when a Topic match occurs, refer to Topic Matching & Message Delivery Modes.

To Enable No Local Subscriptions for a Flow

Language                  Property
Java                      ConsumerFlowProperties.setNoLocal(boolean noLocal)
Java RTO                  FlowHandle.PROPERTIES.NO_LOCAL
C                         SOLCLIENT_FLOW_PROP_NO_LOCAL
.NET                      FlowProperties.NoLocal
JavaScript and Node.js    solace.MessageConsumerProperties.noLocal

Flow Reconnect

When Flow Reconnect is enabled, all APIs automatically attempt to reconnect a Flow when an error response is received as long as the reason given in the error response string is one of the following:

  • Replay Started
  • Service Unavailable: Indicates that the queue or topic endpoint was shut down or deleted, or that Assured Delivery services on the broker have been disabled.

In addition, when one of these errors is reported, it is accompanied by a RECONNECTING event as defined in each API. If the reconnect is successful, the API generates a RECONNECTED event. If the reconnect fails, the appropriate DOWN error event occurs.

Note:  This behavior applies only to established Flows. If an initial bind is rejected by the event broker for any reason, it is reported as a bind failed event and no retry is attempted.

Flow Reconnect is supported and enabled by default in the following API versions:

  • Java API 10.7.0+
  • Java RTO API 7.12.0+
  • C API 7.13.0+
  • .NET API 10.8.0+
  • JavaScript and Node.js API 10.4.0+

By default, when Flow Reconnect is enabled, all APIs attempt to reconnect Flows every three seconds indefinitely.

You can configure the maximum number of Flow reconnect attempts, as well as the interval between attempts, by providing the appropriate properties when the Flow is created (see the tables below).

To disable Flow Reconnect, set the reconnect retry count to 0.
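The interaction between the retry count, the retry interval, and the RECONNECTING, RECONNECTED, and DOWN events can be sketched as a simple loop. This is a simulation of the behavior described in this section, not the APIs' internal implementation: the class FlowReconnectSketch, the rebind callback, the treatment of a negative count as "retry indefinitely", and reporting DOWN directly when the count is 0 are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Simulated reaction to a "Replay Started" or "Service Unavailable" error.
class FlowReconnectSketch {
    // maxTries:   0 disables reconnect; negative means unlimited (assumption)
    // intervalMs: wait time between attempts
    // rebind:     hypothetical callback that returns true when the bind succeeds
    static List<String> onFlowError(int maxTries, long intervalMs, BooleanSupplier rebind) {
        List<String> events = new ArrayList<>();
        if (maxTries == 0) {
            events.add("DOWN"); // reconnect disabled, so the Flow goes down
            return events;
        }
        events.add("RECONNECTING");
        for (int attempt = 1; maxTries < 0 || attempt <= maxTries; attempt++) {
            // Wait between attempts, but not before the first one.
            if (attempt > 1 && !pause(intervalMs)) {
                break;
            }
            if (rebind.getAsBoolean()) {
                events.add("RECONNECTED");
                return events;
            }
        }
        events.add("DOWN"); // attempts exhausted or the wait was interrupted
        return events;
    }

    private static boolean pause(long ms) {
        try {
            Thread.sleep(ms);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Second attempt succeeds.
        System.out.println(onFlowError(3, 10, () -> ++calls[0] >= 2)); // [RECONNECTING, RECONNECTED]
        // Every attempt fails.
        System.out.println(onFlowError(2, 10, () -> false));           // [RECONNECTING, DOWN]
        // Retry count 0: no attempt is made.
        System.out.println(onFlowError(0, 10, () -> true));            // [DOWN]
    }
}
```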

To Set the Reconnect Retry Count for a Flow

Language                  Property
Java                      ConsumerFlowProperties.setReconnectTries
Java RTO                  FlowHandle.PROPERTIES.MAX_RECONNECT_TRIES
C                         SOLCLIENT_FLOW_PROP_MAX_RECONNECT_TRIES
.NET                      FlowProperties.ReconnectTries
JavaScript and Node.js    solace.MessageConsumerProperties.reconnectAttempts

To Set the Reconnect Retry Interval for a Flow

Language                  Property
Java                      ConsumerFlowProperties.setReconnectRetryIntervalInMsecs
Java RTO                  FlowHandle.PROPERTIES.RECONNECT_RETRY_INTERVAL_MS
C                         SOLCLIENT_FLOW_PROP_RECONNECT_RETRY_INTERVAL_MS
.NET                      FlowProperties.ReconnectRetryInterval
JavaScript and Node.js    solace.MessageConsumerProperties.reconnectIntervalInMsecs