Configuring Connection Details

This section provides instructions for configuring the connection details required to establish communication between your PubSub+ event broker and your third-party system.

PubSub+ Connection Details

The Spring Cloud Stream Binder for PubSub+ uses Spring Boot Auto-Configuration for the Solace Java API to configure its session. In application.yml, this is typically configured as follows:

solace:
  java:
    host: tcp://localhost:55555
    msg-vpn: default
    client-username: default
    client-password: default

For more information and options to configure the PubSub+ session, see Spring Boot Auto-Configuration for the Solace Java API.

Preventing Message Loss when Publishing to Topic-to-Queue Mappings

If the connector is publishing to a topic that is subscribed to by a queue, messages may be lost if they are rejected (for example, if queue ingress is shut down).

To prevent message loss, configure the reject-msg-to-sender-on-discard option with the including-when-shutdown flag.

PostgreSQL Connection Details

To manually configure the PostgreSQL connection details, set the following properties in application.yml:

spring:
  cloud:
    stream:
      pgcdc:
        bindings:
          input-0:
            consumer:
              endpoint:
                query-parameters:
                  databaseHostname: <hostname>
                  databasePort: <port>
                  databaseDbname: <database name>
                  databaseUser: <database user/role name>
                  <authentication parameter>: <authentication parameter value>
                  <authentication parameter>: <authentication parameter value>
                  ...

Where:

  • databaseHostname is the hostname of the PostgreSQL database server.

  • databasePort is the connection port of the PostgreSQL database server.

  • databaseDbname is the name of the PostgreSQL database to connect to.

  • databaseUser is the PostgreSQL user to log in as. The PostgreSQL user determines the user role and permissions for the database.

  • <authentication parameter> is a parameter required for authenticating with the PostgreSQL database server. Choose one of:

Basic Authentication Parameters

Field Type Description
databaseUser String The username to use for logging in to PostgreSQL.
databasePassword String The password to use for authenticating with the PostgreSQL database.
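
As an illustration, a basic-authentication configuration might look like the following sketch. The hostname, database name, user, and environment variable name are hypothetical placeholders rather than values from this guide; 5432 is the PostgreSQL default port and may differ in your environment.

```yaml
spring:
  cloud:
    stream:
      pgcdc:
        bindings:
          input-0:
            consumer:
              endpoint:
                query-parameters:
                  databaseHostname: postgres.example.com   # hypothetical host
                  databasePort: 5432                        # PostgreSQL default port (assumption)
                  databaseDbname: inventory                 # hypothetical database name
                  databaseUser: cdc_user                    # hypothetical user/role
                  databasePassword: ${PGCDC_DB_PASSWORD}    # hypothetical environment variable
```

Resolving the password from an environment variable through a Spring property placeholder keeps the secret out of application.yml.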

Client Certificate Authentication Parameters

Field Type Description
databaseSslmode String

The SSL mode determines the security level for client-server communications. It controls whether SSL/TLS encryption is used and how rigorously the server's identity is verified. Valid values are:

  • disable to use an unencrypted connection

  • allow to try to use an unencrypted connection first and, failing that, a secure (encrypted) connection

  • prefer to try to use a secure (encrypted) connection first and, failing that, an unencrypted connection

  • require to use a secure (encrypted) connection, and fail if one cannot be established

  • verify-ca like require, but additionally verify the server TLS certificate against the configured Certificate Authority (CA) certificates, or fail if no valid matching CA certificates are found

  • verify-full like verify-ca, but additionally verify that the server certificate matches the host to which the connection is attempted

For more information, see the Protection Provided in Different Modes section of the PostgreSQL documentation.

databaseSslcert String The path to the SSL certificate for the client, for example, local/ssl/client.crt.
databaseSslkey String The path to the file containing the SSL private key for the client, for example, local/ssl/client.key.
databaseSslpassword String The password to access the client private key from the file specified by databaseSslkey.
databaseSslrootcert String The path to the database SSL root certificate against which the server is validated, for example, local/ssl/root.crt.
databaseUser String The username to use for logging in to PostgreSQL.
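
A client-certificate configuration could be sketched as follows. The file paths reuse the examples from the table above; the hostname, port, database name, user, and environment variable name are hypothetical, and verify-full is only one of the valid SSL modes.

```yaml
spring:
  cloud:
    stream:
      pgcdc:
        bindings:
          input-0:
            consumer:
              endpoint:
                query-parameters:
                  databaseHostname: postgres.example.com          # hypothetical host
                  databasePort: 5432                               # PostgreSQL default port (assumption)
                  databaseDbname: inventory                        # hypothetical database name
                  databaseUser: cdc_user                           # hypothetical user/role
                  databaseSslmode: verify-full                     # strictest mode listed above
                  databaseSslcert: local/ssl/client.crt            # client certificate
                  databaseSslkey: local/ssl/client.key             # client private key
                  databaseSslpassword: ${PGCDC_SSL_KEY_PASSWORD}   # hypothetical environment variable
                  databaseSslrootcert: local/ssl/root.crt          # CA certificate used to validate the server
```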

Checkpoint Store Configuration

The Connector for PostgreSQL CDC stores the information about the current progress of processing files in a checkpoint store backed by a Last Value Queue (LVQ) on a PubSub+ event broker. The checkpoint store contains information about the files that have been processed and the files that are currently being processed.

The following table lists the configuration options for the Checkpoint Store.

Config Option: solace.pgcdc.checkpoint.lvqName
Type: String
Description: Required. The name of the event broker Last Value Queue (spool size 0) to be used for checkpointing. The queue must exist on the same event broker and Message VPN as the target queue. If the LVQ is deleted (administratively) or a message from the LVQ is deleted or consumed by another consumer, the Connector for PostgreSQL CDC will not be able to resume from the last checkpoint. In addition, the LVQ should not be shared by multiple instances of the Connector for PostgreSQL CDC.

Config Option: solace.pgcdc.checkpoint.autoProvisionLvq
Type: boolean
Valid Values: true, false
Default Value: false
Description: Optional. Set to true to auto-provision the LVQ queue if it does not exist.
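
For example, the checkpoint store options could be set in application.yml as in the sketch below. The queue name is a hypothetical placeholder; choose a name that is used by only one connector instance.

```yaml
solace:
  pgcdc:
    checkpoint:
      lvqName: pgcdc-checkpoint-lvq   # hypothetical queue name; must not be shared between connector instances
      autoProvisionLvq: true          # create the LVQ if it does not already exist
```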

PostgreSQL CDC Binder Configuration Options

The following properties are available at the PostgreSQL CDC binder level.

Config Option: spring.cloud.stream.bindings.input-<workflow-id>.destination
Type: String
Valid Values: Format: schema.tableName (for example, public.products)
Description: The PostgreSQL table from which to consume change events. The destination must include the schema and the name of the table.

Config Option: spring.cloud.stream.bindings.input-<workflow-id>.binder
Type: String
Valid Values: pgcdc
Description: This property must be set to pgcdc.
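
A minimal sketch of these two binder-level properties for a workflow with ID 0, using the public.products table from the example above:

```yaml
spring:
  cloud:
    stream:
      bindings:
        input-0:
          destination: public.products   # schema.tableName of the source table
          binder: pgcdc                   # must be set to pgcdc
```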

PostgreSQL CDC Consumer Configuration Options

The following table lists the Debezium PostgreSQL source-specific properties. For more information about these configuration options, see the Apache Camel Debezium PostgreSQL component documentation.

All properties must be prefixed with spring.cloud.stream.pgcdc.bindings.input-<workflow-id>.consumer.endpoint.query-parameters.

Example configuration:

spring:
  cloud:
    stream:
      pgcdc:
        bindings:
          input-0:
            consumer:
              endpoint:
                query-parameters:
                  # <connection parameters>
                  pluginName: pgoutput
                  slotName: debezium
                  tableIncludeList: public.inventory
                  schemaIncludeList: public
                  topicPrefix: cdc
                  snapshotMode: initial

Config Option: databaseDbname
Type: String
Description: The name of the PostgreSQL database.

Config Option: slotName
Type: String
Description: The name of the PostgreSQL logical decoding slot. A logical decoding slot is a persistent object that maintains a stream of database changes from a plugin. The Connector for PostgreSQL CDC consumes the change stream from the specified slot. For more information, consult your PostgreSQL server administrator.

Config Option: topicPrefix
Type: String
Default Value: The binding name (for example, input-0)
Description: Topic prefix that identifies and provides a namespace for the particular database server/cluster that is capturing changes.

Config Option: snapshotMode
Type: String
Valid Values: no_data | always | initial | initial_only | never | custom
Default Value: no_data
Description: The criteria for running a snapshot upon startup of the connector.

Config Option: pluginName
Type: String
Valid Values: pgoutput | decoderbufs
Default Value: pgoutput
Description: The name of the PostgreSQL logical decoding plugin installed on the server. The logical decoding plugin converts write-ahead log (WAL) entries into a stream of changes. For more information, consult your PostgreSQL server administrator.

Config Option: databaseUser
Type: String
Description: The user with which to authenticate to the PostgreSQL database.

Config Option: databasePassword
Type: String
Description: The password to use for authentication.

Config Option: databaseSslmode
Type: String
Description: The SSL mode to use when authenticating with the PostgreSQL database.

Config Option: databaseSslcert
Type: String
Description: The SSL certificate for the client.

Config Option: databaseSslkey
Type: String
Description: The SSL private key for the client.

Config Option: databaseSslpassword
Type: String
Description: The database SSL password to authenticate with the PostgreSQL database.

Config Option: databaseSslrootcert
Type: String
Description: The database SSL root certificate against which the server is validated.

Config Option: skippedOperations
Type: String
Default Value: t
Description: A comma-separated list of operations to skip during streaming. Valid options are c for inserts/creates, u for updates, d for deletes, t for truncates, and none to indicate that nothing is skipped.

Config Option: publicationAutocreateMode
Type: String
Valid Values: disabled | all_tables | filtered
Default Value: disabled
Description: Specifies when a publication is to be created. Applies only when streaming changes using pgoutput.

Config Option: tombstonesOnDelete
Type: boolean
Default Value: false
Description: Indicates whether delete operations should be represented by a delete event and a subsequent tombstone event.

Config Option: tableIncludeList
Description: This parameter is ignored.

Config Option: schemaIncludeList
Description: This parameter is ignored.

Config Option: offsetStorageTopic
Description: This parameter is ignored.

Config Option: offsetStoragePartitions
Description: This parameter is ignored.

Config Option: offsetStorageReplicationFactor
Description: This parameter is ignored.
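
As a further sketch, the streaming-related options above could be combined as follows. The values are simply picked from the listed valid values for illustration and are not recommendations; how filtered behaves on your server is not described here and should be confirmed with your PostgreSQL administrator.

```yaml
spring:
  cloud:
    stream:
      pgcdc:
        bindings:
          input-0:
            consumer:
              endpoint:
                query-parameters:
                  # <connection parameters>
                  pluginName: pgoutput                  # publicationAutocreateMode applies only with pgoutput
                  publicationAutocreateMode: filtered   # one of disabled | all_tables | filtered
                  skippedOperations: t                  # the default: skip truncates only
                  tombstonesOnDelete: false             # the default: no tombstone event after a delete
```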

PostgreSQL CDC Micro-Integration Processing Options

The following section describes the properties specific to the Connector for PostgreSQL CDC. All properties are prefixed with spring.cloud.stream.camel.bindings.<input-x>.consumer.endpoint.query-parameters.

Config Option: payloadSchemaEnabled
Type: boolean
Valid Values: true or false
Default Value: false
Description: Indicates whether to include the schema information in the final payload.

Config Option: skipDeleteEvents
Type: boolean
Valid Values: true or false
Default Value: true
Description: Indicates whether to skip delete events. When true, this setting is equivalent to skippedOperations=t,d; when false, it is equivalent to skippedOperations=t.
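
The sketch below sets both processing options for a binding named input-0 (a hypothetical binding name), using the property prefix exactly as stated in this section. Both values differ from the defaults purely for illustration.

```yaml
spring:
  cloud:
    stream:
      camel:
        bindings:
          input-0:
            consumer:
              endpoint:
                query-parameters:
                  payloadSchemaEnabled: true   # include schema information in the payload
                  skipDeleteEvents: false      # deliver delete events; equivalent to skippedOperations=t
```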

PostgreSQL CDC Injected Headers

The following headers are injected into each message. The examples illustrate the header values for an update to the PostgreSQL source table.

Header Name: cdc_operation
Type: String
Description: The operation type: c for create (or insert), u for update, d for delete, or r for read.

Header Name: cdc_db_name
Type: String
Description: The name of the database.

Header Name: cdc_schema
Type: String
Description: The name of the schema. For example, public.

Header Name: cdc_table_name
Type: String
Description: The name of the table. For example, products.

Header Name: cdc_lsn
Type: Long
Description: The log sequence number.

Header Name: cdc_txId
Type: Long
Description: The transaction ID of the database transaction.

Header Name: cdc_key
Type: String
Description: The key of the event. For example, {"id": 5}.

Header Name: cdc_before
Type: String
Description: The state of the row before the event occurred. For example, {"id": 5, "key1": "value1", "key2": "value2"}.

Header Name: cdc_timestamp
Type: Long
Description: The time at which the connector processed the event.
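
To make the header set concrete, the following is an illustrative rendering, written as YAML only for readability, of the headers on a message produced by an update to public.products. It is not the wire format. Values shown in angle brackets are placeholders, because this guide gives no example values for them.

```yaml
cdc_operation: u                                               # update
cdc_db_name: <database name>
cdc_schema: public
cdc_table_name: products
cdc_lsn: <log sequence number>
cdc_txId: <transaction ID>
cdc_key: '{"id": 5}'
cdc_before: '{"id": 5, "key1": "value1", "key2": "value2"}'   # row state before the update
cdc_timestamp: <processing time>
```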