Configuring Connection Details

This section provides instructions for configuring the connection details required to establish communication between your PubSub+ event broker and your third-party system.

The Spring Cloud Stream Binder for PubSub+ uses Spring Boot Auto-Configuration for the Solace Java API to configure its session. In the application.yml, this typically is configured as follows:

solace:
  java:
    host: tcp://localhost:55555
    msg-vpn: default
    client-username: default
    client-password: default    

For more information and options to configure the PubSub+ session, see Spring Boot Auto-Configuration for the Solace Java API.

Preventing Message Loss when Publishing to Topic-to-Queue Mappings

If the connector is publishing to a topic that is subscribed to by a queue, messages may be lost if they are rejected (for example, if queue ingress is shut down).

To prevent message loss, configure the reject-msg-to-sender-on-discard option with the including-when-shutdown flag.

SFTP Connection Details

The Spring Cloud Stream standard properties for the Connector for SFTP are as follows.

Config Option Type Valid Values Description

spring.cloud.stream.bindings.<bindingName>.destination

String

Required format:

sftp://<host>:<port>/<startDirectoryName>/

For example:

sftp://localhost:22/orders-data/

The source or target SFTP server URI and the directory path. The destination must include the start directory name.

spring.cloud.stream.bindings.<bindingName>.binder

String

camel

The SFTP consumer/producer is implemented using the Apache Camel SFTP component.

This property must be set to camel.

SFTP Checkpoint Store Configuration Options

The Connector for SFTP stores the current progress of file processing in a checkpoint store backed by a Last Value Queue (LVQ) on a Solace event broker:

  • For an SFTP Consumer, the checkpoint store contains information about the files that have been processed and the files that are currently being processed.

  • For an SFTP Producer, the checkpoint store contains information about the number of events written to the file and the file size (for file rotation purposes).

The following table lists the configuration options for the SFTP checkpoint store.

Configuration Option Type Valid Values Default Value Description

camel-binder.checkpoint.lvqName

String

Any

None

Required.

The name of the LVQ (spool size 0) to be used for checkpointing. The queue must exist on the same event broker and Message VPN as the target queue.

If the LVQ is deleted (administratively) or a message from the LVQ is deleted or consumed by another consumer, the Connector for SFTP will not be able to resume from the last checkpoint. In addition, the LVQ should not be shared by multiple instances of the Connector for SFTP.

camel-binder.checkpoint.autoProvisionLvq

boolean

(true | false)

false

Optional.

Set to true to auto-provision the LVQ if it does not exist.

SFTP Source Configuration Options

When configured as a source connector, the Connector for SFTP reads delimited, JSON, and XML files from the SFTP server, splits file data based on configuration into individual events, and sends them to the PubSub+ event broker.

The configuration properties for the SFTP source are divided into two categories: common properties (SFTP connection or file filtering etc.) and file type specific properties.

Common Configuration Options for SFTP Source

This section details the SFTP source configuration properties that are for common for all file types. For more information, see the Apache Camel SFTP component documentation.

All properties must be prefixed with spring.cloud.stream.camel.bindings.<bindingName>.consumer.endpoint.query-parameters.

Example configuration:

spring:
  cloud:
    stream:
      camel:
        bindings:
          input-0:
            consumer:
              endpoint:
                query-parameters:
                  noop: true
                  delete: false
                  includeExt: csv
                  #other properties
Configuration Option Type Valid Values Default Value Description

username

String

 

None

The username to use to log in.

password

String

None

The password to use to log in, if you are using password authentication. If it is not set, the private key file must be set.

privateKeyFile

String

None

The private key file for the SFTP endpoint to use for private key verification.

privateKeyPassphrase

String

None

The private key file passphrase for the SFTP endpoint to use for private key verification.

passiveMode

boolean

(true | false)

false

Indicates whether to use passive mode connections. The default (false) is active mode connections.

knownHostsFile

String

None

The known_hosts file for the SFTP endpoint to use for host key verification.

You can create a known_hosts file using the following command:

ssh-keyscan -H <host> -P <port>  >> known_hosts.

For example:

 ssh-keyscan -H myftpserver -p 22  >> known_hosts

strictHostKeyChecking

String

(yes | no)

no

Indicates whether to use strict host key checking. The default (no) is not to use strict host key checking.

autoCreateKnownHostsFile

boolean

(true | false)

false

Indicates whether the known_hosts file should be created automatically if it does not exist. The default (false) is not to create the file automatically.

ciphers

String

None

A comma-separated list of ciphers to use, listed in order of preference. Possible cipher names are defined by JCraft Java Secure Channel (JSch). Some examples include: aes128-ctr, aes128-cbc, 3des-ctr, 3des-cbc, blowfish-cbc, aes192-cbc, and aes256-cbc.

keyExchangeProtocols

String

None

A comma-separated list of key exchange protocols to use, listed in order of preference. Possible cipher names are defined by JCraft JSch. Some examples include: diffie-hellman-group-exchange-sha1, diffie-hellman-group1-sha1, diffie-hellman-group14-sha1, diffie-hellman-group-exchange-sha256, ecdh-sha2-nistp256, ecdh-sha2-nistp384, and ecdh-sha2-nistp521.

publicKeyAcceptedAlgorithms

String

None

A comma-separated list of accepted public key algorithms. Some examples include: ssh-dss, ssh-rsa, ecdsa-sha2-nistp256, ecdsa-sha2-nistp384, and ecdsa-sha2-nistp521.

serverHostKeys

String

None

A comma-separated list of algorithms supported for the server host key. Some examples include: ssh-dss, ssh-rsa, ecdsa-sha2-nistp256, ecdsa-sha2-nistp384, and ecdsa-sha2-nistp521.

connectTimeout

int

> 0

10000

The time in milliseconds to wait for a connection to be established.

soTimeout

int

> 0

300000

The socket timeout, in milliseconds.

maximumReconnectAttempts

int

>= 0

None

The maximum number of times to retry to establish an initial connection to the remote FTP server. Use 0 to disable this behavior.

reconnectDelay

long

> 0

1000

The delay in milliseconds to wait before retrying the connection.

serverAliveCountMax

int

> 0

1

The maximum number of keep-alive messages to send without receiving any messages back from the server. If this threshold is reached while keep-alive messages are being sent, the connection is closed.

serverAliveInterval

int

>= 0

0

The interval (in milliseconds) between keep-alive messages. If zero is specified, no keep-alive messages are sent.

binary

boolean

(true | false)

false

The file transfer mode, either binary or ASCII. The default (false) is ASCII.

charset

String

A valid charset value.

None

The character encoding of the file contents, for example utf-8.

noop

boolean

(true | false)

false

If true, the file is not deleted or moved in any way.

delete

boolean

(true | false)

false

If true, the file is deleted after it is processed successfully.

move

String

Any

None

An expression (such as Simple Language) used to dynamically set the filename when moving it after processing.

To move files into a .done subdirectory: .done

To move files into a .processed subdirectory and rename them: .processed/$#{file:name}.done

To move files into a .processed subdirectory and rename file with current timestamp: .processed/$#{date:now:yyyyMMdd}/$#{file:name.noext}-$#{date:now:yyyyMMddHHmmss}.done

moveFailed

String

Any

None

Expression (such as Simple Language) used to dynamically set the filename when moving it after processing.

To move failed files into an .error subdirectory: .error

To move failed files into an .error subdirectory and rename file: .error/$#{file:name}.error

To move failed files into an .error subdirectory and rename them with the current timestamp: .error/$#{date:now:yyyyMMdd}/$#{file:name.noext}-$#{date:now:yyyyMMddHHmmss}.error

sortBy

String

A File Language expression.

None

Built-in sort using File Language. Supports nested sorts: for example, you can sort by filename and then by modified date.

Sort by filename: $#{file:name}

Sort by file modified time: $#{file:modified}

recursive

boolean

(true | false)

false

If the specified location is a directory, the connector looks for files in all the subdirectories as well.

maxDepth

int

> 0

2147483647

The maximum depth to traverse when recursively processing a directory.

minDepth

int

> 0

None

The minimum depth to start processing when recursively processing a directory. Using minDepth=1 means the base directory. Using minDepth=2 means the first subdirectory.

antExclude

String

Any

None

Ant-style filter exclusion. If both antInclude and antExclude are used, antExclude takes precedence. Multiple exclusions may be specified in comma-delimited format.

antInclude

String

Any

None

Ant-style filter inclusion. Multiple inclusions may be specified in comma-delimited format.

antFilterCaseSensitive

boolean

(true | false)

true

Indicates whether the ant* filters are case-sensitive. The default (true) is case-sensitive.

exclude

String

A valid regular expression

None

Excludes files where the filename matches the regex pattern (matching is case in-sensitive).

include

String

A valid regular expression

None

Includes files where the filename matches the regex pattern (matching is case-insensitive).

includeExt

String

A file extension

None

Is used to include files matching file extension name (case-insensitive).

For example to include .csv files, use includeExt=csv. Multiple extensions can be separated by comma; for example to include .csv and .dat files, use includeExt=csv,dat.

excludeExt

String

A file extension

None

Is used to exclude files matching file extension name (case-insensitive).

For example to exclude .bak files, use excludeExt=bak. Multiple extensions can be separated by comma; for example to exclude .bak and .dat files, use excludeExt=bak,dat.

filterFile, filterDirectory

String

Any

None

The source SFTP server URI and the directory path.

delay

long

Any

5000

The amount of time that must elapse before the next poll.

initialDelay

long

Any

1000

The amount of time that must elapse before the first poll starts.

timeUnit

String

Enum values: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS

MILLISECONDS

The time unit for the initialDelay and delay options.

stopOnException

boolean

(true | false)

false

Indicates whether to stop processing the current file if an exception is encountered.

When this option is set to false (the default), the connector logs an exception and continues processing the next event in the current file.

When this option is set to true, the connector stops processing the current file and continues to the next file if any. If moveFailed is set, the file is moved to the error directory.

serverMessageLoggingLevel

String

Enum values: TRACE, DEBUG, INFO, WARN, ERROR, OFF

DEBUG

Logging levels control how much information is saved to the log file. During troubleshooting, it may be useful to raise the logging level to increase the amount of information the logs receive from the FTP server.

maxFileSize

Long

>= 0

2097152

(that is, 2 MB)

The maximum size, in bytes, of the files to be consumed. This option helps to prevent Out Of Memory (OOM) errors on systems with a small amount of available memory.

File Type-Specific Properties

The following sections describe the properties that are specific to each file type.

All properties must be prefixed with spring.cloud.stream.camel.bindings.<bindingName>.consumer.endpoint.query-parameters.

Configuration Option Type Valid Values Default Value Description

fileType

String

Enum values: delimited, json, xml

delimited

Required. The file processor to be used to handle the files. Default is delimited.

Delimited File Configuration Properties

The spring.cloud.stream.camel.bindings.<bindingName>.consumer.endpoint.query-parameters.fileType should be set to delimited.

Configuraton Option Type Valid Values Default Value Description

eventDelimiter

String

\n (newline)

The events in the files are separated by this character.

If the specified eventDelimiter is not found within the file, the connector processes the entire file as a single event.

The following example configuration uses eventDelimiter: xyzxyzxyzxyz (an unlikely delimiter) to intentionally keep the file intact without splitting. Setting payloadFormat: text preserves the raw content without JSON conversion, while ignoreHeaderLine: false ensures the first line is treated as data rather than a header:

eventDelimiter: xyzxyzxyzxyz
ignoreHeaderLine: false
paramDelimiter: bar
payloadFormat: text

The maximum payload sizes for software event brokers are 30 MB for Enterprise Edition and 10 MB for Standard Edition.

If the file size is larger than 30 MB, the connector throws an error.

ignoreHeaderLine

boolean

(true | false)

false

Indicates whether the first line of the file should be ignored as a header.

  • When set to false, the first line of the file is processed as a data line.

  • When set to true, the first line of the file is not processed as a data line. Also, if set to true and paramNames is not set, the first line of the file is used as the paramNames option to convert the data to json.

paramDelimiter

String

, (comma)

The delimiter to use to separate the parameters in the header line or paramNames configuration property.

paramNames

String

None

The list of parameter names separated by the paramDelimiter. The order of the names should match the order of the parameters in the file.

If not set, and ignoreHeaderLine is true, first line of the file is used as the paramNames.

payloadFormat

String

Enum Values: json, text

json

The format of the payload.

If json, the delimited data is converted to JSON.

If text, the delimited data is kept as is.

JSON File Configuration Properties

The spring.cloud.stream.camel.bindings.<bindingName>.consumer.endpoint.query-parameters.fileType option should be set to json.

Configuration Option Type Valid Values Default Value Description

jsonPath

String

A valid JSON path expression

$

(root node)

The JSON path expression to extract, split, or filter the data from the JSON file.

The default value of $ means the entire JSON file content will be processed as a single event. For a JSON file larger than 30 MB, this value results in an exception because the maximum size of a Solace message is 30 MB. We recommend splitting the JSON file content into multiple events by configuring the JSON path expression.

XML File Configuration Properties

The spring.cloud.stream.camel.bindings.<bindingName>.consumer.endpoint.query-parameters.fileType should be set to xml.

Configuration Option Type Valid Values Default Value Description

xPath

String

a valid XPath path expression

/

(root node)

The XPath expression to extract, split, or filter the data from the XML file.

The default value of / means the entire XML file content will be processed as a single event. For XML file larger than 30MB, it would result in exception as max Solace message size is 30MB. It is recommended to split the XML file content into multiple events by configuring the XPath expression.

For example, for an employees.xml file with following content, the XPath expression to extract employee nodes would be //employee:

<employees>
    <employee>
        <name>John</name>
        <age>30</age>
    </employee>
    <employee>
        <name>Smith</name>
        <age>40</age>
    </employee>
</employees>
                        

SFTP Target Configuration Options

The following table lists the SFTP target-specific properties. For more information, see the Apache Camel SFTP Component documentation.

All properties must be prefixed with spring.cloud.stream.camel.bindings.<output-x>.producer.endpoint.query-parameters.

Example configuration:

spring:
  cloud:
    stream:
      camel:
        bindings:
          output-0:
            producer:
              endpoint:
                query-parameters:
                  username: test
                  password: test
                  fileName: output.csv
                  #other properties

Connecting to Multiple Systems

This connector does not support multiple binder system configurations. The multiple binder systems configuration may be supported in future releases.