Leveraging AMQP Messaging with Solace Schema Registry: Serializing and Deserializing Messages in Java

The SERDES Collection provides Java libraries for efficient serialization and deserialization of structured message payloads over AMQP 1.0. These libraries (Solace Avro SERDES for Java, Solace JSON Schema SERDES for Java, and Generic SERDES for Java) enable applications to convert complex data structures into compact, transmittable formats and reconstruct them on receipt. SERDES transmits schema information through AMQP application properties rather than embedding it in the message payload, preserving schema identifiers during transmission and supporting both binary and text message formats. The examples on this page use Apache Qpid JMS 2.0; however, the concepts apply to any AMQP 1.0 client library with appropriate adjustments.

Using the SERDES Collection with Solace Schema Registry

In modern messaging systems, managing data formats across distributed applications can present challenges. As you scale your applications, differences in how message producers and consumers serialize and interpret data can lead to issues with data consistency and version compatibility. Without a shared understanding of the message format, mismatches such as missing fields, unexpected data types, or incompatible versions can result in runtime errors. To prevent these problems and streamline data exchange in Solace messaging environments, we recommend you use Solace Schema Registry with the SERDES Collection.

What is a schema?

A schema defines the structure, data types, and validation rules for messages exchanged between producers and consumers, ensuring that they agree on the format of the data being exchanged. In formats like Avro or JSON Schema, a schema explicitly describes each field's name, its type, and whether it is required. This enables compatibility checks, validation, and tooling support, and ensures consistent data interpretation across services, especially when schemas are managed in a schema registry.

What is a schema registry?

A schema registry is a centralized service that stores and manages schemas used for serializing and deserializing structured data. It ensures that applications exchanging messages follow a consistent data format, which prevents compatibility issues between message producers and consumers. By using Solace Schema Registry, you can enforce data validation, track schema versions, and support schema evolution in a way that maintains compatibility with applications using older schema versions.

How does everything work together?

When integrated with the SERDES Collection, your AMQP applications can automatically retrieve and apply the correct schema from Solace Schema Registry during message serialization and deserialization. When a message producer sends a message, the serializer encodes the data according to a registered schema and records the schema identifier in SERDES headers, which are attached to the message as AMQP application properties. On the message consumer side, the application extracts the schema identifier from the application properties, retrieves the corresponding schema from the registry, and uses it to parse the message correctly. Because the identifier travels in headers, the schema can be resolved without modifying the message payload. This interaction enables dynamic schema resolution, eliminates the need to hardcode schemas in applications, and simplifies the management of structured data formats like Avro and JSON Schema in your AMQP messaging applications.
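
The flow described above can be illustrated with a minimal sketch that needs no broker or registry. It uses only JDK classes: one Map stands in for Solace Schema Registry and another stands in for the AMQP application properties. The header name example.schema-id and the class and method names are hypothetical placeholders chosen for illustration; they are not the names used by the SERDES libraries.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: in-memory stand-ins for the registry and for AMQP application properties.
class HeaderBasedResolutionSketch {

    // Hypothetical header name; the real SERDES header names are defined by the library.
    static final String SCHEMA_ID_HEADER = "example.schema-id";

    // Stand-in for the registry: schema identifier -> schema definition.
    static final Map<String, String> REGISTRY = new HashMap<>();

    // Producer side: the payload is left untouched; the schema identifier travels in the properties.
    static byte[] send(String json, String schemaId, Map<String, Object> applicationProperties) {
        applicationProperties.put(SCHEMA_ID_HEADER, schemaId);
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: read the identifier from the properties, then look up the schema.
    static String resolveSchema(Map<String, Object> applicationProperties) {
        Object schemaId = applicationProperties.get(SCHEMA_ID_HEADER);
        if (schemaId == null) {
            throw new IllegalStateException("No schema identifier in application properties");
        }
        String schema = REGISTRY.get(schemaId.toString());
        if (schema == null) {
            throw new IllegalStateException("Unknown schema identifier: " + schemaId);
        }
        return schema;
    }

    public static void main(String[] args) {
        REGISTRY.put("42", "{\"type\":\"object\",\"required\":[\"name\"]}");
        Map<String, Object> properties = new HashMap<>();
        byte[] payload = send("{\"name\":\"Ada\"}", "42", properties);
        System.out.println("payload: " + new String(payload, StandardCharsets.UTF_8));
        System.out.println("resolved schema: " + resolveSchema(properties));
    }
}
```

The point of the sketch is the separation of concerns: the payload bytes never carry the identifier, so a consumer that ignores the headers still sees an unmodified payload.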

Steps for Using the SERDES Collection with Solace Schema Registry

  1. Set up a Solace Schema Registry instance, then define and register your schemas.
  2. Choose a serialization format. Solace currently supports Avro and JSON Schema.
  3. Configure your SERDES objects (Avro or JSON Schema).
    • Both JSON Schema and Avro share a common SERDES configuration API for registry access, resolution, and caching.
  4. Use a serializer to serialize outbound messages and a deserializer to deserialize inbound messages.

For more information, see Solace Schema Registry.

Deploying with Maven

To use the SERDES Collection with your AMQP applications, you need to include the appropriate Maven dependencies in your project. The SERDES libraries are distributed as separate artifacts that you can include based on your serialization format requirements.

The following sections show how to configure your Maven project with the dependencies needed for schema registry integration. The examples use the Apache Qpid JMS 2.0 implementation of AMQP. If you use a different AMQP client library, include the appropriate dependencies for that implementation instead of, or in addition to, the Qpid JMS client shown here.

SERDES BOM Dependencies (Recommended)

The SERDES Bill of Materials (BOM) provides a centralized way to manage compatible versions of all Solace Schema Registry SERDES components. Using the BOM ensures that all SERDES dependencies are aligned with tested and compatible versions. This simplifies dependency management and reduces version conflicts in your applications.

The BOM includes version management for:

  • Avro SERDES components
  • JSON Schema SERDES components
  • Common SERDES libraries
  • Compatible versions of underlying dependencies

To use the SERDES BOM, add it to your Maven pom.xml file in the dependencyManagement section:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.solace</groupId>
            <artifactId>solace-schema-registry-serdes-bom</artifactId>
            <version>1.0.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

After importing the BOM, you can add SERDES dependencies without specifying versions:

<dependencies>
    <!-- Avro SERDES (version managed by BOM) -->
    <dependency>
        <groupId>com.solace</groupId>
        <artifactId>solace-schema-registry-avro-serde</artifactId>
    </dependency>
    
    <!-- JSON Schema SERDES (version managed by BOM) -->
    <dependency>
        <groupId>com.solace</groupId>
        <artifactId>solace-schema-registry-jsonschema-serde</artifactId>
    </dependency>
</dependencies>

AMQP Client Dependencies (Apache Qpid JMS Example)

For AMQP messaging with SERDES using Apache Qpid JMS 2.0, you need to include the appropriate client library. This client provides Jakarta JMS 2.0 API support and AMQP 1.0 protocol implementation. The client must be configured to disable JMS property name validation to allow schema registry headers, which contain characters not permitted by default JMS property naming rules. Other AMQP client libraries would have their own dependency requirements and configuration approaches.

Add the following dependency to your Maven pom.xml file:

<dependencies>
    <!-- Apache Qpid JMS 2.0 Client -->
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-jms-client</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>

Important considerations for AMQP client configuration:

  • Apache Qpid JMS 2.0 uses Jakarta JMS APIs (not javax.jms). Ensure your imports use jakarta.jms.* packages.
  • You must disable JMS property name validation on the connection factory to allow schema registry headers containing dots (.) and hyphens (-).
  • SERDES headers are transmitted as AMQP application properties, which are mapped to JMS message properties by the client.

Avro Serializer Dependencies

The Avro serializer provides a specific implementation for Apache Avro serialization and deserialization. This dependency includes the AvroSerializer, AvroDeserializer, and Avro-specific configuration properties, and also brings in the required common SERDES components used across all schema formats.

Add the following dependency to your Maven pom.xml file:

<dependencies>
    <dependency>
        <groupId>com.solace</groupId>
        <artifactId>solace-schema-registry-avro-serde</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>

For the latest version information and additional details about the Avro SERDES artifact, see the Maven Central Repository.

Important considerations when adding Maven dependencies:

  • Always use the latest compatible versions of the SERDES libraries for optimal performance and security.
  • The Avro SERDES dependency automatically includes the necessary Apache Avro libraries as transitive dependencies.

JSON Schema Serializer Dependencies

The JSON Schema serializer provides a specific implementation for JSON Schema serialization and deserialization. This dependency includes the JsonSchemaSerializer, JsonSchemaDeserializer, and JSON Schema-specific configuration properties, and also brings in the required common SERDES components used across all schema formats.

Add the following dependency to your Maven pom.xml file:

<dependencies>
    <dependency>
        <groupId>com.solace</groupId>
        <artifactId>solace-schema-registry-jsonschema-serde</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>

For the latest version information and additional details about the JSON Schema SERDES artifact, see the Maven Central Repository.

Important considerations when adding Maven dependencies:

  • Always use the latest compatible versions of the SERDES libraries for optimal performance and security.
  • The JSON Schema SERDES dependency automatically includes the necessary JSON Schema validation libraries as transitive dependencies.

Using SERDES with AMQP Messaging

AMQP messaging provides a standards-based approach to message publishing and consumption that integrates with the SERDES Collection. When using SERDES with AMQP messaging, schema information is transmitted through AMQP application properties rather than being embedded in the message payload, enabling efficient schema resolution and cross-protocol compatibility with REST and Solace Messaging API applications.

General AMQP Concepts with SERDES

When implementing SERDES with any AMQP 1.0 client, there are several important protocol-level considerations:

  • Application Properties—SERDES headers are transmitted as AMQP application properties, which ensures schema identifiers and metadata are preserved during message transmission. This is a standard feature of the AMQP 1.0 protocol that all compliant clients support.
  • Message Type Support—AMQP messaging supports both binary and text message formats for SERDES operations. However, the SERDES deserializer only operates on binary payloads. Applications must extract or convert the message content into a binary format—whether from a TextMessage or a BytesMessage—before passing it to the deserializer. For an example of this, see Consuming JSON Schema Messages over AMQP (Apache Qpid JMS Example).
  • Cross-Protocol Compatibility—AMQP applications can seamlessly exchange messages with REST and Solace JCSMP API applications through the Solace broker, which automatically translates message formats and preserves SERDES headers across protocols. For more information on cross-protocol message translation, see AMQP.
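
Because the deserializer operates on binary payloads, the conversion step described under Message Type Support can be sketched as a small helper. This is a minimal sketch that assumes the body has already been extracted from the message (for example with the standard JMS 2.0 call message.getBody(String.class) or message.getBody(byte[].class)); the class and method names are hypothetical and are not part of the SERDES libraries.

```java
import java.nio.charset.StandardCharsets;

// Illustrative helper: normalizes an extracted message body to the bytes a deserializer expects.
class PayloadBytes {

    // Accepts a String (text message body) or a byte[] (binary message body).
    static byte[] toBytes(Object body) {
        if (body instanceof byte[]) {
            return (byte[]) body;
        }
        if (body instanceof String) {
            // Text bodies are converted using UTF-8.
            return ((String) body).getBytes(StandardCharsets.UTF_8);
        }
        String type = (body == null) ? "null" : body.getClass().getName();
        throw new IllegalArgumentException("Unsupported body type: " + type);
    }

    public static void main(String[] args) {
        byte[] fromText = toBytes("{\"name\":\"Ada\"}");
        byte[] fromBinary = toBytes(new byte[] {1, 2, 3});
        System.out.println("text body bytes: " + fromText.length);
        System.out.println("binary body bytes: " + fromBinary.length);
    }
}
```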

Apache Qpid JMS Implementation Considerations

When using Apache Qpid JMS 2.0 as your AMQP client implementation, there are additional specific considerations:

  • JMS Property Name Validation—Apache Qpid JMS enforces strict property name validation by default, which prevents schema registry headers containing dots (.) and hyphens (-) from being transmitted. You must disable property name validation on the connection factory using setValidatePropertyNames(false) to allow SERDES headers to be transmitted properly. Without this setting, SERDES headers will not be properly transmitted and schema resolution will fail.
  • JMS Message Types—When using Apache Qpid JMS, you'll work with JMS message types like BytesMessage and TextMessage. For AMQP with Qpid JMS, the deserializer.deserialize() method can handle both message types after extracting the payload bytes. BytesMessage is recommended for optimal performance and compatibility.
  • Jakarta JMS APIs—Apache Qpid JMS 2.0 uses Jakarta JMS APIs (jakarta.jms.*) rather than the legacy javax APIs. Ensure your imports and dependencies are updated accordingly.
  • Application Properties Mapping—With Apache Qpid JMS, SERDES headers are automatically mapped between AMQP application properties and JMS message properties. For more information on how AMQP application properties map to SMF user properties, see AMQP.
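
To see why header names containing dots or hyphens are rejected when validation is enabled, the following sketch approximates the JMS rule that property names must be valid message selector identifiers (Java identifier characters, excluding a set of reserved words). It is an approximation written for illustration, not the validation code that Apache Qpid JMS actually runs, and the header name example.schema-id is a hypothetical placeholder.

```java
import java.util.Locale;
import java.util.Set;

// Approximates the JMS identifier rule for property names; not the Qpid JMS implementation.
class JmsPropertyNameCheck {

    // Reserved words in the JMS message selector syntax (case-insensitive).
    private static final Set<String> RESERVED = Set.of(
            "NULL", "TRUE", "FALSE", "NOT", "AND", "OR", "BETWEEN", "LIKE", "IN", "IS", "ESCAPE");

    static boolean isValidJmsIdentifier(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        if (RESERVED.contains(name.toUpperCase(Locale.ROOT))) {
            return false;
        }
        if (!Character.isJavaIdentifierStart(name.charAt(0))) {
            return false;
        }
        for (int i = 1; i < name.length(); i++) {
            // '.' and '-' are not Java identifier characters, so they fail here.
            if (!Character.isJavaIdentifierPart(name.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("schemaId valid? " + isValidJmsIdentifier("schemaId"));
        System.out.println("example.schema-id valid? " + isValidJmsIdentifier("example.schema-id"));
    }
}
```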

AMQP Connection Configuration (Apache Qpid JMS Example)

When implementing AMQP applications that use SERDES with Apache Qpid JMS, you need to configure the connection factory properly to ensure SERDES headers are transmitted correctly. Other AMQP client libraries would have their own connection configuration approaches, but would still need to ensure that application properties containing schema information are properly handled:

  • Connection Factory Configuration—Create a JmsConnectionFactory with the appropriate broker connection details and disable property name validation.
  • Client ID Configuration—Set a unique client ID on the JMS context when using durable subscriptions or other features that require client identification.
  • Session Configuration—Configure JMS sessions with appropriate acknowledgment modes and transaction settings based on your application requirements.

Example connection factory configuration:

// Create connection factory with property validation disabled
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(username, password, brokerUrl);
connectionFactory.setValidatePropertyNames(false);

// Create JMS context and set client ID if needed
try (JMSContext context = connectionFactory.createContext()) {
    context.setClientID("MyAMQPClient");
    
    // Create topic and configure producers/consumers
    Topic topic = context.createTopic("solace/samples/json");
}

Cross-Protocol Message Flow with SERDES

SERDES supports cross-protocol messaging between AMQP and other messaging protocols. The following examples illustrate the primary cross-protocol flows:

Example 1: AMQP Publisher to REST Consumer

  1. Publisher Side—The AMQP client serializes the message payload using the appropriate SERDES serializer, which adds schema identification headers as AMQP application properties.
  2. Message Transmission—The serialized payload and SERDES headers are transmitted via AMQP 1.0 to the event broker.
  3. Consumer Side—The REST consumer application receives the HTTP request via a REST Delivery Point (RDP), extracts the SERDES headers from the HTTP headers (mapped from AMQP application properties), and uses deserializer.deserialize() to reconstruct the message payload.

Example 2: REST Publisher to AMQP Consumer

  1. Publisher Side—The REST client serializes the message payload using the appropriate SERDES serializer, which adds schema identification headers to the HTTP request.
  2. Message Transmission—The serialized payload and SERDES headers are transmitted via HTTP POST to the event broker REST endpoint.
  3. Consumer Side—The AMQP consumer receives the message, extracts the SERDES headers from the AMQP application properties, retrieves the message payload bytes, and uses deserializer.deserialize() to reconstruct the message payload.

Example 3: AMQP Publisher to Solace Message Format (SMF) Consumer

  1. Publisher Side—The AMQP client serializes the message payload using the appropriate SERDES serializer, which adds schema identification headers as application properties.
  2. Message Transmission—The serialized payload and SERDES headers are transmitted via AMQP 1.0 to the event broker.
  3. Consumer Side—A Solace Messaging API application receives the message, extracts the SERDES headers from the SMF user properties (mapped from AMQP application properties), and uses SerdeMessage.deserialize() to reconstruct the message payload.

Cross-Protocol Message Translation

When messages are published via AMQP and consumed by REST or Solace JCSMP API applications, the Solace broker performs automatic message translation and header mapping. This translation preserves SERDES schema information and determines the message format that consumers receive.

Message Type Translation Rules

The broker translates AMQP messages to other protocol message types according to the following rules. For detailed information on protocol metadata and payload encoding interactions, see Protocol Metadata and Payload Encoding Interactions:

  • AMQP to REST Translation—AMQP application properties are mapped to HTTP headers with the solace-user-property- prefix, preserving SERDES schema information for REST consumers.
  • AMQP to Solace Message Format (SMF) Translation—AMQP application properties are mapped to Solace Message Format (SMF) user properties, allowing Solace Messaging API consumers to access SERDES headers.
  • Message Payload Preservation—Message payloads are preserved during protocol translation, ensuring that serialized data remains intact regardless of the consuming protocol.
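
The AMQP to REST naming rule above can be sketched with plain maps. This is an illustration of the solace-user-property- prefix only, under the simplifying assumption that all property values are strings; the property name example.schema-id and the class and method names are hypothetical, and the sketch does not model any other header handling the broker may perform.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Illustrates the header naming rule only; values are simplified to strings.
class RestHeaderMappingSketch {

    static final String PREFIX = "solace-user-property-";

    // AMQP application properties -> HTTP header names as a REST consumer would see them.
    static Map<String, String> toHttpHeaders(Map<String, String> applicationProperties) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : applicationProperties.entrySet()) {
            headers.put(PREFIX + entry.getKey(), entry.getValue());
        }
        return headers;
    }

    // REST consumer side: keep only prefixed headers and strip the prefix.
    static Map<String, String> fromHttpHeaders(Map<String, String> httpHeaders) {
        Map<String, String> properties = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : httpHeaders.entrySet()) {
            String name = entry.getKey();
            // HTTP header names are case-insensitive, so compare in lower case.
            if (name.toLowerCase(Locale.ROOT).startsWith(PREFIX)) {
                properties.put(name.substring(PREFIX.length()), entry.getValue());
            }
        }
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("example.schema-id", "42");
        Map<String, String> headers = toHttpHeaders(properties);
        headers.put("Content-Type", "application/json");
        System.out.println("HTTP headers: " + headers);
        System.out.println("recovered properties: " + fromHttpHeaders(headers));
    }
}
```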

SERDES Compatibility with Message Types

AMQP messaging with Apache Qpid JMS 2.0 supports both BytesMessage and TextMessage for SERDES operations. The message type is determined by the AMQP message body format and content type as described in AMQP:

  • BytesMessage Support—Traditional support for binary message payloads, where the SERDES deserializer processes the byte array directly. This is the recommended approach for optimal performance and compatibility.
  • TextMessage Support—Support for text message payloads, where the application converts the UTF-8 text content to bytes before processing. This enables flexibility in message handling and cross-protocol scenarios.

This cross-protocol compatibility allows AMQP applications to work seamlessly with REST publishers and Solace Messaging API consumers without requiring changes to the application code. Schema validation and message deserialization work consistently across all supported protocols.

Important considerations for cross-protocol messaging:

  • Always disable JMS property name validation on the Apache Qpid JMS connection factory to ensure SERDES headers are transmitted properly.
  • SERDES headers are preserved during message translation only when using proper AMQP application properties mapping.
  • Message translation is performed automatically by the broker and does not require any configuration changes.
  • Both BytesMessage and TextMessage objects are supported for deserialization, but BytesMessage is recommended for optimal performance and cross-protocol compatibility.

Common SERDES Configuration

The common SERDES configuration provides the foundation for all serialization and deserialization operations in the SERDES Collection, regardless of the format being used (Avro or JSON Schema). This section covers the configuration options that enable your applications to interact with Solace Schema Registry, manage schema resolution, and implement schema selection strategies.

The following Java imports are required for implementing serialization and deserialization using the SERDES Collection:

import com.solace.serdes.Serializer;
import com.solace.serdes.Deserializer;
import com.solace.serdes.common.resolver.config.SchemaResolverProperties;
import java.util.HashMap;
import java.util.Map;

All SERDES properties are set in a HashMap object with the put() method:

Map<String, Object> config = new HashMap<>();
// set all your configuration parameters with config.put()

Authentication

The Generic SERDES for Java supports multiple authentication methods for connecting to Solace Schema Registry. This section covers how to configure authentication credentials for your SERDES objects, including basic username and password authentication, and how to implement them in both secure and non-secure connection contexts. You specify the authentication settings when you configure your schema registry client, which allows you to control access permissions and maintain the security of your schemas.

The following properties configure how your application connects to Solace Schema Registry:

  • SchemaResolverProperties.REGISTRY_URL—The URL of Solace Schema Registry where schemas are stored. This property is required in all serializer and deserializer configurations. The registry URL must point to the Registry REST API endpoint. The default path for this endpoint is /apis/registry/v3.
  • SchemaResolverProperties.AUTH_USERNAME—The username to use to authenticate with Solace Schema Registry.
  • SchemaResolverProperties.AUTH_PASSWORD—The password to use to authenticate with Solace Schema Registry.
  • SchemaResolverProperties.TRUST_STORE_PATH—The file path to the truststore containing SSL/TLS certificates for secure connections.
  • SchemaResolverProperties.TRUST_STORE_PASSWORD—The password required to access the truststore file.
  • SchemaResolverProperties.VALIDATE_CERTIFICATE—A flag that determines whether SSL certificate validation is enabled (true) or disabled (false).

    We recommend that you never disable VALIDATE_CERTIFICATE in production environments because it creates a security vulnerability.

The following code snippets show you how to set these properties based on which authentication option you use:

  • Authentication, plain text connection—Used in restricted environments where network access is controlled but authentication is still required.
    config.put(SchemaResolverProperties.REGISTRY_URL, "http://localhost:8081/apis/registry/v3");
    
    // Set authentication credentials
    config.put(SchemaResolverProperties.AUTH_USERNAME, "myUsername");
    config.put(SchemaResolverProperties.AUTH_PASSWORD, "myPassword");
  • Authentication, secure connection—The recommended production setup. This setup ensures both data encryption and access control for secure schema management.
    // Use HTTPS and non-localhost hostname for secure communication with the schema registry
    config.put(SchemaResolverProperties.REGISTRY_URL, "https://{your_hostname}:443/apis/registry/v3");
    
    // Authentication credentials
    config.put(SchemaResolverProperties.AUTH_USERNAME, "myUsername");
    config.put(SchemaResolverProperties.AUTH_PASSWORD, "myPassword");
    
    // Configure TLS properties for the secure connection: the truststore path and password
    config.put(SchemaResolverProperties.TRUST_STORE_PATH, "path/to/truststore");
    config.put(SchemaResolverProperties.TRUST_STORE_PASSWORD, "truststore_password");
    
    config.put(SchemaResolverProperties.VALIDATE_CERTIFICATE, true);
  • Advanced authentication—Used when you configure Schema Registry with an external identity provider such as Microsoft Entra ID.
    config.put(SchemaResolverProperties.REGISTRY_URL, "https://{your_hostname}:443/apis/registry/v3");
    
    // For OAuth authentication, set your client ID as AUTH_USERNAME and your client secret as AUTH_PASSWORD:
    config.put(SchemaResolverProperties.AUTH_USERNAME, "your-client-id");
    config.put(SchemaResolverProperties.AUTH_PASSWORD, "your-client-secret");

    When using advanced authentication flows, you configure the same AUTH_USERNAME and AUTH_PASSWORD properties. In this case, AUTH_USERNAME maps to your OAuth Client ID, and AUTH_PASSWORD maps to your OAuth Client Secret from your OAuth configuration. For more information about configuring external identity providers with Schema Registry, see Deploying and Configuring Solace Schema Registry with Docker or Podman.
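
Before passing the configuration to a serializer or deserializer, it can help to sanity-check the registry URL. The following is a minimal sketch using only java.net.URI; it checks that the URL ends with the default API path /apis/registry/v3 mentioned above and reports whether it uses HTTPS. The class and method names, and the host registry.example.com, are hypothetical. A registry deployed under a non-default path would need a different check.

```java
import java.net.URI;

// Illustrative pre-flight checks for the value used as SchemaResolverProperties.REGISTRY_URL.
class RegistryUrlCheck {

    // Default path of the Registry REST API endpoint, as stated in this section.
    static final String DEFAULT_API_PATH = "/apis/registry/v3";

    // True when the URL path ends with the default API path (a trailing slash is tolerated).
    static boolean pointsAtRegistryApi(String url) {
        String path = URI.create(url).getPath();
        if (path == null) {
            return false;
        }
        if (path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }
        return path.endsWith(DEFAULT_API_PATH);
    }

    // True when the URL uses HTTPS, which the secure configurations above rely on.
    static boolean isEncrypted(String url) {
        return "https".equalsIgnoreCase(URI.create(url).getScheme());
    }

    public static void main(String[] args) {
        String url = "https://registry.example.com:443/apis/registry/v3";
        System.out.println("points at registry API: " + pointsAtRegistryApi(url));
        System.out.println("encrypted: " + isEncrypted(url));
    }
}
```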

Artifact Resolver Strategies (Serialization Only)

Artifact resolver strategies define how a schema or artifact is dynamically resolved at runtime for serialization operations performed by the SERDES Collection. These strategies determine which schema to fetch from Solace Schema Registry based on metadata like Solace topics, destination IDs, and record IDs. Artifact resolver strategies are only applied during serialization, because the serializer is responsible for selecting or creating the artifact ID to register or reference a schema. Deserializers simply use the schema ID carried in the SERDES headers of the incoming message to retrieve the correct schema, without needing to determine how that ID was formed.

Some important terms:

  • record—A structured data object, for example an Avro or JSON Schema message, that is serialized or deserialized using a schema from Solace Schema Registry.
  • artifact—A named and versioned container in Solace Schema Registry that holds one or more schema versions. Each artifact is identified by an artifactId and optionally grouped using a groupId.
  • ArtifactReference—A pointer to an existing artifact in Solace Schema Registry. When the artifact contains a schema, this reference can be used to locate and apply it during serialization or deserialization.
  • artifactId—A unique identifier for an artifact in Solace Schema Registry, which typically contains a single schema.
  • groupId—A logical grouping mechanism for artifacts in Solace Schema Registry. It allows organizing related schemas, for example all schemas for a specific application. If no groupId is specified, the value is default.

In cases where multiple topics should resolve to the same schema, Solace recommends you use the Solace Topic ID Strategy with Profiles. This approach provides flexible topic-to-schema mappings using wildcard expressions and custom artifact references.

The following strategies for retrieving schemas are available:

Destination ID Strategy

The DestinationIdStrategy automatically determines which schema to use during serialization. It extracts the destination name from the record metadata to use as the artifactId, while applying a default value as the groupId to construct the complete ArtifactReference. This approach eliminates the need for explicit schema specification on a per message basis because it creates a direct mapping between message destinations and schema artifacts stored in the registry. For successful implementation, Solace Schema Registry must contain schemas with artifactIds that correspond to the destination names used in your application's messaging infrastructure. The strategy is valuable in systems where different message types are routed to different destinations, as it reduces configuration overhead and decouples serialization logic from schema-specific details.

In summary, the DestinationIdStrategy:

  • Uses the destination name defined by the parameter passed into the serialize() method call. In REST and AMQP implementations, this is typically an endpoint name or path specified directly by the user. In Solace JCSMP API implementations, this destination is automatically extracted from the message topic via the serialize() method.
  • Uses this destination name as the artifactId in the ArtifactReference.
  • Uses a default value as the groupId in the ArtifactReference.
  • Uses the constructed ArtifactReference to locate the correct schema in Solace Schema Registry.

This strategy requires that schemas are registered in Solace Schema Registry with artifactIds that match the destination names used in your application.

Here's how to configure a serializer to use the DestinationIdStrategy:

config.put(SchemaResolverProperties.ARTIFACT_RESOLVER_STRATEGY, DestinationIdStrategy.class);
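
The mapping that the DestinationIdStrategy performs, as summarized above, can be sketched in a few lines. The ArtifactRef class here is a hypothetical stand-in for the library's ArtifactReference, written only to make the sketch self-contained, and the group name default is taken from the description of groupId earlier in this section.

```java
// Illustrative only: ArtifactRef is a stand-in, not the SERDES ArtifactReference class.
class DestinationIdSketch {

    static final class ArtifactRef {
        final String groupId;
        final String artifactId;

        ArtifactRef(String groupId, String artifactId) {
            this.groupId = groupId;
            this.artifactId = artifactId;
        }

        @Override
        public String toString() {
            return groupId + "/" + artifactId;
        }
    }

    // The destination name becomes the artifactId; the groupId is the default group.
    static ArtifactRef resolve(String destinationName) {
        if (destinationName == null || destinationName.isEmpty()) {
            throw new IllegalArgumentException("A destination name is required");
        }
        return new ArtifactRef("default", destinationName);
    }

    public static void main(String[] args) {
        // A schema would need to be registered under this artifactId for resolution to succeed.
        System.out.println(resolve("solace/samples/json"));
    }
}
```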

Solace Topic ID Strategy

The SolaceTopicIdStrategy automatically determines which schema to use during serialization by mapping the destination name directly to the ArtifactReference, setting the destination string as the artifactId. This approach simplifies schema resolution by creating a direct correlation between Solace topic destinations and schema artifacts stored in the registry. For successful implementation, Solace Schema Registry must contain schemas with artifactIds that correspond to the topic names used in your Solace messaging infrastructure.

In summary, the SolaceTopicIdStrategy:

  • Uses the destination name defined by the parameter passed into the serialize() method call. In REST and AMQP implementations, this is typically an endpoint name or path specified directly by the user. In Solace JCSMP API implementations, this destination is automatically extracted from the message topic via the serialize() method.
  • Maps this destination name directly as the artifactId in the ArtifactReference.
  • Uses the constructed ArtifactReference to locate the correct schema in Solace Schema Registry.

This strategy requires that schemas are registered in Solace Schema Registry with artifactIds that match the topic names used in your Solace messaging application.

Here's how to configure a serializer to use the SolaceTopicIdStrategy without a profile:

config.put(SchemaResolverProperties.ARTIFACT_RESOLVER_STRATEGY, SolaceTopicIdStrategy.class);

Solace Topic ID Strategy with Profiles

For more advanced mapping scenarios, the SolaceTopicIdStrategy can be used with a SolaceTopicProfile to provide flexible topic-to-schema mappings. This approach allows for wildcard topic expressions and custom artifact references, giving you greater control over schema resolution.

Using profiles with SolaceTopicIdStrategy provides several benefits:

  • Support for wildcard topic expressions to match multiple topics with a single mapping. You can use * for single-level wildcards and > for multi-level wildcards. For more information about wildcards, see Wildcard Characters in Topic Subscriptions.
  • The ability to map different topic patterns to specific schemas.
  • Control over groupId, artifactId, and versioning for schema selection.

The SolaceTopicProfile can be configured with different types of mappings:

Topic Expression Only Mapping

This method maps a Solace topic expression directly to an artifactId that matches the topic expression itself. The groupId is left as default. Use this method when your schema registry follows a convention where schemas are registered with an artifactId that matches your topic hierarchy.

The following code snippet creates three topic mapping patterns, exact, single-level wildcard, and multi-level wildcard, and adds them to a SolaceTopicProfile container:

// Create a new instance of SolaceTopicProfile
SolaceTopicProfile profile = new SolaceTopicProfile();

// Create mappings with topic expressions
SolaceTopicArtifactMapping mapping1 = SolaceTopicArtifactMapping.create("solace/samples");
SolaceTopicArtifactMapping mapping2 = SolaceTopicArtifactMapping.create("solace/*/sample");
SolaceTopicArtifactMapping mapping3 = SolaceTopicArtifactMapping.create("solace/>");

// Add mappings to profile
profile.add(mapping1);
profile.add(mapping2);
profile.add(mapping3);

// Add the profile to your serializer configuration, enabling the SolaceTopicIdStrategy to use your topic-to-schema mapping
config.put(SchemaResolverProperties.STRATEGY_TOPIC_PROFILE, profile);
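
The wildcard behavior described above can be illustrated without the SERDES library. The following sketch is a simplified, stand-alone matcher, not the SolaceTopicProfile implementation: it treats * as a whole-level or trailing-prefix wildcard and > as a final-level wildcard that matches one or more levels. The class and method names are hypothetical, and the first-match-in-insertion-order rule in resolveArtifactId is an assumption made for this example; this page does not state how overlapping mappings are ordered.

```java
import java.util.List;
import java.util.Optional;

/** Illustrative only: not part of the SERDES Collection API. */
final class TopicExpressionSketch {

    /** Returns true if the concrete topic matches the topic expression. */
    static boolean matches(String expression, String topic) {
        String[] exp = expression.split("/", -1);
        String[] lvl = topic.split("/", -1);
        for (int i = 0; i < exp.length; i++) {
            boolean last = (i == exp.length - 1);
            if (last && exp[i].equals(">")) {
                // '>' must stand for at least one remaining level
                return lvl.length > i;
            }
            if (i >= lvl.length) {
                return false; // topic has fewer levels than the expression
            }
            if (exp[i].endsWith("*")) {
                // '*' alone matches any single level; "abc*" matches a level prefix
                String prefix = exp[i].substring(0, exp[i].length() - 1);
                if (!lvl[i].startsWith(prefix)) {
                    return false;
                }
            } else if (!exp[i].equals(lvl[i])) {
                return false;
            }
        }
        return exp.length == lvl.length;
    }

    /**
     * Topic-expression-only mapping: the matching expression doubles as the artifactId.
     * Assumption: the first matching expression in insertion order wins.
     */
    static Optional<String> resolveArtifactId(List<String> expressions, String topic) {
        return expressions.stream().filter(e -> matches(e, topic)).findFirst();
    }
}
```

With the three expressions from the snippet above, this sketch resolves solace/eu/sample to the artifactId solace/*/sample and solace/eu/west to solace/>, while a topic outside the solace/ hierarchy resolves to nothing.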

Topic Expression with Custom Artifact ID

This method maps a Solace topic expression to a specific, custom artifactId. The groupId is left as "default". Use this method when multiple topics should use the same schema, but your topic names do not match your schema registry naming conventions.

The following code snippet creates three topic-to-schema mappings that associate different Solace topic patterns with specific custom artifact IDs (User, NewUser, and OldUser) and adds them to a SolaceTopicProfile container:

// Create a new instance of SolaceTopicProfile
SolaceTopicProfile profile = new SolaceTopicProfile();

// Create mappings with topic expressions and custom artifact IDs
SolaceTopicArtifactMapping mapping1 = SolaceTopicArtifactMapping.create("solace/samples", "User");
SolaceTopicArtifactMapping mapping2 = SolaceTopicArtifactMapping.create("solace/*/sample", "NewUser");
SolaceTopicArtifactMapping mapping3 = SolaceTopicArtifactMapping.create("solace/>", "OldUser");

// Add mappings to profile
profile.add(mapping1);
profile.add(mapping2);
profile.add(mapping3);

// Add the profile to your serializer configuration, enabling the SolaceTopicIdStrategy to use your topic-to-schema mapping
config.put(SchemaResolverProperties.STRATEGY_TOPIC_PROFILE, profile);

Topic Expression with Full ArtifactReference

This method maps a Solace topic expression to a complete ArtifactReference with a custom groupId, artifactId, and optional version for exact schema selection. Use this method when you need complete control over schema resolution, especially when you have multiple schema groups or when specific versions must be used.

The following code snippet creates two artifact references, one with version specification and one without, and maps them to specific topic patterns:

// Create a new instance of SolaceTopicProfile
SolaceTopicProfile profile = new SolaceTopicProfile();

// Create artifact reference
ArtifactReferenceBuilder builder = new ArtifactReferenceBuilder();
ArtifactReference reference = builder
        .groupId("com.solace.samples.serdes.avro.schema")
        .artifactId("User")
        .build();

// Create mapping with topic expression and artifact reference
SolaceTopicArtifactMapping mapping = SolaceTopicArtifactMapping.create("solace/samples", reference);

// Create mapping with version-specific reference
ArtifactReference versionedReference = builder
        .groupId("com.solace.samples.serdes.avro.schema")
        .artifactId("User")
        .version("0.0.1")
        .build();

SolaceTopicArtifactMapping versionedMapping = SolaceTopicArtifactMapping.create("solace/>", versionedReference);

// Add mappings to profile
profile.add(mapping);
profile.add(versionedMapping);

// Add the profile to your serializer configuration, enabling the SolaceTopicIdStrategy to use your topic-to-schema mapping
config.put(SchemaResolverProperties.STRATEGY_TOPIC_PROFILE, profile);

Schema Resolution Options

Schema resolution determines how your application retrieves, caches, and manages schemas when it interacts with Solace Schema Registry. This section covers the available configuration options, including schema caching options and schema lookup options.

Schema Caching Options

To reduce the overhead of repeated schema registry lookups, clients using the SERDES Collection can configure schema caching options. These settings control how long schemas are stored locally and how different types of lookups interact with the schema registry cache.

  • SchemaResolverProperties.CACHE_TTL_MS—Determines how long schema artifacts remain valid in the cache before they must be fetched again from the registry on the next relevant lookup. The default value is 30000 ms (30 seconds). Longer TTLs improve performance by reducing registry calls but risk using outdated schemas; shorter TTLs keep schemas fresher but increase registry load. A TTL of zero disables caching entirely, so every request requires a registry fetch. You can configure this property in several ways:
    // Example 1: Set cache TTL using a Long value
    config.put(SchemaResolverProperties.CACHE_TTL_MS, 5000L);
    
    // Example 2: Set cache TTL using a String value
    config.put(SchemaResolverProperties.CACHE_TTL_MS, "5000");
    
    // Example 3: Set cache TTL using a Duration object
    config.put(SchemaResolverProperties.CACHE_TTL_MS, Duration.ofSeconds(5));
    
    // Example 4: Disable caching completely
    config.put(SchemaResolverProperties.CACHE_TTL_MS, 0L);
  • SchemaResolverProperties.USE_CACHED_ON_ERROR—Controls whether to use cached schemas when schema registry lookup errors occur. When enabled, schema resolution uses cached schemas instead of throwing exceptions after retry attempts are exhausted, which improves resilience during registry outages. When disabled (the default), the API throws an exception when registry lookup errors occur. The following code snippet shows how to configure this property:
    // Use cached schemas when registry lookups fail (resilient mode)
    config.put(SchemaResolverProperties.USE_CACHED_ON_ERROR, true);
    
    // Throw exceptions when registry lookups fail (strict mode, default value)
    config.put(SchemaResolverProperties.USE_CACHED_ON_ERROR, false);
  • SchemaResolverProperties.CACHE_LATEST (Serializers Only)—Controls whether schema lookups that specify latest, or that do not include an explicit version (no-version), create additional cache entries. When enabled (the default), subsequent latest or no-version lookups can be resolved from the cache without contacting the registry. When disabled, only the resolved version is cached, so every latest or no-version lookup must query the registry to determine the current latest version.
    // Example 1: Enable caching of 'latest' version lookups (default behavior)
    config.put(SchemaResolverProperties.CACHE_LATEST, true);
    
    // Example 2: Disable caching of 'latest' version lookups
    // When disabled, only the resolved version is cached, requiring subsequent
    // latest/no-version lookups to be fetched from the registry
    config.put(SchemaResolverProperties.CACHE_LATEST, false);
    • When an explicit schema version is specified, the CACHE_LATEST setting has no effect on schema lookup results.
    • CACHE_LATEST only affects caching for serialization operations. It does not apply to schema references, which are always resolved through direct registry lookups, bypassing the cache.
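
To make the interaction between CACHE_TTL_MS and USE_CACHED_ON_ERROR concrete, the following is a minimal sketch of a TTL cache with a stale-entry fallback. It is a stand-alone illustration, not the resolver's actual implementation: the class name, the String stand-in for a schema, the injected clock, and the registryLookup function are all hypothetical. It also assumes that a zero TTL stores nothing, so there is no cached value to fall back to in that case.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Illustrative only: a stand-in for the resolver's cache, not the SERDES implementation. */
final class TtlSchemaCacheSketch {

    private static final class Entry {
        final String schema;
        final long fetchedAtMs;

        Entry(String schema, long fetchedAtMs) {
            this.schema = schema;
            this.fetchedAtMs = fetchedAtMs;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long ttlMs;
    private final boolean useCachedOnError;
    private final LongSupplier clockMs;

    TtlSchemaCacheSketch(long ttlMs, boolean useCachedOnError, LongSupplier clockMs) {
        this.ttlMs = ttlMs;
        this.useCachedOnError = useCachedOnError;
        this.clockMs = clockMs;
    }

    String get(String key, Function<String, String> registryLookup) {
        long now = clockMs.getAsLong();
        Entry cached = entries.get(key);
        if (cached != null && now - cached.fetchedAtMs < ttlMs) {
            return cached.schema; // still fresh: no registry call
        }
        try {
            String schema = registryLookup.apply(key);
            if (ttlMs > 0) {
                // Assumption: a zero TTL means nothing is stored at all
                entries.put(key, new Entry(schema, now));
            }
            return schema;
        } catch (RuntimeException e) {
            if (useCachedOnError && cached != null) {
                return cached.schema; // resilient mode: serve the expired entry
            }
            throw e; // strict mode, or nothing cached yet
        }
    }
}
```

In this sketch, an entry fetched at time 0 with a 5000 ms TTL is served from the cache until 4999 ms and refetched from 5000 ms onward. If that refetch fails, the resilient configuration returns the expired entry, whereas the strict configuration rethrows the lookup error.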

Schema Lookup Options

When you use the SERDES Collection with Solace Schema Registry, the serializer must determine which schema to use when writing data. You can configure schema lookup options to control how the appropriate schema is resolved from the registry. The following schema lookup options are available:

  • SchemaResolverProperties.FIND_LATEST_ARTIFACT—A boolean flag that determines whether your serializer should attempt to locate the most recent artifact in the registry for the given groupId and artifactId combination configured in the configuration property map. The default value is false.
    config.put(SchemaResolverProperties.FIND_LATEST_ARTIFACT, true);
  • SchemaResolverProperties.EXPLICIT_ARTIFACT_VERSION—A string value that represents the artifact version used for querying an artifact in the registry. This property overrides the version returned by the ArtifactReferenceResolverStrategy. If this property and FIND_LATEST_ARTIFACT are both set, this property takes precedence. The default value is null.
    config.put(SchemaResolverProperties.EXPLICIT_ARTIFACT_VERSION, "1.0.0");
  • SchemaResolverProperties.EXPLICIT_ARTIFACT_ARTIFACT_ID—A string value that represents the artifactId used for querying an artifact in the registry. This property overrides the artifactId returned by the ArtifactReferenceResolverStrategy. The default value is null.
    config.put(SchemaResolverProperties.EXPLICIT_ARTIFACT_ARTIFACT_ID, "my-schema");
  • SchemaResolverProperties.EXPLICIT_ARTIFACT_GROUP_ID—A string value that represents the groupId used for querying an artifact in the registry. This property overrides the groupId returned by the ArtifactReferenceResolverStrategy. The default value is null.
    config.put(SchemaResolverProperties.EXPLICIT_ARTIFACT_GROUP_ID, "com.example");
  • SchemaResolverProperties.REQUEST_ATTEMPTS—Specifies the number of attempts to make when communicating with the schema registry before giving up. Valid values are any number between 1 and Long.MAX_VALUE. When used with USE_CACHED_ON_ERROR, this property specifies the number of attempts before falling back to the last cached value. The default value is 3.
    config.put(SchemaResolverProperties.REQUEST_ATTEMPTS, 5);
  • SchemaResolverProperties.REQUEST_ATTEMPT_BACKOFF_MS—Specifies the backoff time in milliseconds between retry attempts when communicating with the schema registry. Valid values are any number between 0 and Long.MAX_VALUE. You can set this value as a Long, a String, or a Duration object. The default value is 500 milliseconds.
    // Use a long value
    config.put(SchemaResolverProperties.REQUEST_ATTEMPT_BACKOFF_MS, 500L);
    
    // Use a string
    config.put(SchemaResolverProperties.REQUEST_ATTEMPT_BACKOFF_MS, "500");
    
    // Use a Duration object
    config.put(SchemaResolverProperties.REQUEST_ATTEMPT_BACKOFF_MS, Duration.ofMillis(500));
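
The way REQUEST_ATTEMPTS and REQUEST_ATTEMPT_BACKOFF_MS work together can be pictured as a fixed-delay retry loop. The sketch below is an illustration under that assumption, not the library's code: the class and method names are hypothetical, the request is modeled as a plain Supplier, and a constant delay between attempts is assumed because this page does not describe any other backoff curve.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Illustrative only: a fixed-backoff retry loop, not the SERDES implementation. */
final class RegistryRetrySketch {

    /**
     * Calls the request up to 'attempts' times, sleeping 'backoff' between failed attempts.
     * Rethrows the last failure once all attempts are used.
     */
    static <T> T callWithRetry(Supplier<T> request, int attempts, Duration backoff) {
        if (attempts < 1) {
            throw new IllegalArgumentException("attempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= attempts; attempt++) {
            try {
                return request.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and retry if attempts remain
            }
            if (attempt < attempts) {
                try {
                    Thread.sleep(backoff.toMillis());
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
        throw last;
    }
}
```

With the default values described above (3 attempts, 500 ms backoff), a lookup modeled this way would wait roughly one second in total before giving up, at which point a resolver configured with USE_CACHED_ON_ERROR could fall back to its cached schema.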

Auto-Registration Configuration

Auto-registration allows serializers to automatically register schemas in Solace Schema Registry when they don't exist during serialization operations. This feature simplifies schema management by eliminating the need to manually register schemas before using them in your applications. When auto-registration is enabled, the serializer will attempt to register the schema if it cannot find a matching schema in the registry for the given artifact reference.

Auto-registration is particularly useful in development environments where schemas are frequently updated, or in scenarios where you want to streamline the deployment process by allowing applications to self-register their schemas. However, in production environments, you may want to disable auto-registration to maintain strict control over schema evolution and prevent unauthorized schema modifications.

  • SchemaResolverProperties.AUTO_REGISTER_ARTIFACT—A boolean flag that controls whether schemas should be automatically registered when they don't exist in the registry during serialization. When set to true, the serializer will attempt to register the schema if it's not found in the registry. When set to false, the default, the serializer will throw an exception if the schema is not found. This property only affects serialization operations.
    // Enable auto-registration of schemas. This property is false by default.
    config.put(SchemaResolverProperties.AUTO_REGISTER_ARTIFACT, true);  
    
  • SchemaResolverProperties.AUTO_REGISTER_ARTIFACT_IF_EXISTS—Controls the behavior when auto-registering artifacts that might already exist in the registry. This property works in conjunction with AUTO_REGISTER_ARTIFACT and only takes effect when auto-registration is enabled. The available options are:
    • IfArtifactExists.CREATE_VERSION—If a schema with the same content already exists in the registry, create a new version of that schema. This is useful when you want to maintain version history even for identical schemas.
      config.put(SchemaResolverProperties.AUTO_REGISTER_ARTIFACT, true);
      
      // Set behavior to create a new version when artifact exists
      config.put(SchemaResolverProperties.AUTO_REGISTER_ARTIFACT_IF_EXISTS,
              SchemaResolverProperties.IfArtifactExists.CREATE_VERSION);
      
    • IfArtifactExists.FAIL—If a schema with the same name already exists in the registry, the registration will fail and throw a SerializationException. This provides strict control over schema registration.
      config.put(SchemaResolverProperties.AUTO_REGISTER_ARTIFACT, true);
      
      // Set behavior to fail when artifact exists
      config.put(SchemaResolverProperties.AUTO_REGISTER_ARTIFACT_IF_EXISTS,
              SchemaResolverProperties.IfArtifactExists.FAIL);
      
    • IfArtifactExists.FIND_OR_CREATE_VERSION—If a schema with the same content already exists in the registry, use the existing schema. Otherwise, create a new version. This is the default behavior and provides the most flexible approach to schema management.
      config.put(SchemaResolverProperties.AUTO_REGISTER_ARTIFACT, true);
      
      // Set behavior to find existing or create new version (default)
      config.put(SchemaResolverProperties.AUTO_REGISTER_ARTIFACT_IF_EXISTS,
              SchemaResolverProperties.IfArtifactExists.FIND_OR_CREATE_VERSION);
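
The three IfArtifactExists options can be compared side by side with a small in-memory model. The sketch below is one reading of the descriptions above rather than the registry's real logic: the class, the local enum, and the use of IllegalStateException in place of SerializationException are all stand-ins invented for this example, and version numbers are simply 1-based positions in a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: an in-memory model of the three registration behaviors. */
final class AutoRegisterSketch {

    /** Local stand-in for the options described above. */
    enum IfExists { CREATE_VERSION, FAIL, FIND_OR_CREATE_VERSION }

    // artifactId -> schema content per version (index 0 holds version 1)
    private final Map<String, List<String>> versions = new HashMap<>();

    /** Registers the content and returns the 1-based version the caller should use. */
    int register(String artifactId, String content, IfExists mode) {
        List<String> existing = versions.computeIfAbsent(artifactId, id -> new ArrayList<>());
        if (!existing.isEmpty()) {
            if (mode == IfExists.FAIL) {
                throw new IllegalStateException("artifact already exists: " + artifactId);
            }
            if (mode == IfExists.FIND_OR_CREATE_VERSION) {
                int index = existing.indexOf(content);
                if (index >= 0) {
                    return index + 1; // identical content found: reuse that version
                }
            }
        }
        existing.add(content); // new artifact, new content, or CREATE_VERSION
        return existing.size();
    }
}
```

In this model, registering the same content twice with FIND_OR_CREATE_VERSION returns version 1 both times, registering it again with CREATE_VERSION adds a further version, and FAIL rejects any registration against an artifact that already has a version.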
      

JSON Schema Auto-Registration

When using auto-registration with JSON Schema serialization, you must also set SchemaResolverProperties.SCHEMA_LOCATION to the classpath resource path of the schema file. Unlike Avro records, which carry their schema with them, JSON data has no schema attached, so auto-registration needs an explicit schema file reference to work properly.

The following example shows you how to configure the schema location:

// Enable auto-registration
config.put(SchemaResolverProperties.AUTO_REGISTER_ARTIFACT, true);

// Specify the classpath resource path to the schema to be uploaded by auto-register
config.put(SchemaResolverProperties.SCHEMA_LOCATION, "json-schema/user.json");

Important considerations when using auto-registration:

  • Auto-registration only affects serialization operations. Deserializers always use the schema ID from the message to fetch the corresponding schema from the registry.
  • In production environments, consider disabling auto-registration to maintain strict control over schema evolution and prevent unauthorized schema modifications.
  • When using auto-registration with schema references, ensure that all referenced schemas are also available in the registry or can be auto-registered.
  • Auto-registration works with all artifact resolver strategies.

SERDES Header Configuration

The SCHEMA_HEADER_IDENTIFIERS property allows you to select different SERDES header formats for schema identification in message headers. This configuration involves a tradeoff between efficiency and interoperability, enabling you to optimize your messaging system based on your specific requirements and integration scenarios.

When a message producer sends a message, the serializer component encodes the data according to a registered schema and includes a schema identifier in the message headers. The format of this schema identifier can be configured to balance performance optimization with cross-protocol compatibility needs.

The SCHEMA_HEADER_IDENTIFIERS property accepts values from the SchemaHeaderId enum, which provides two header format options:

  • SchemaHeaderId.SCHEMA_ID (default)—Efficient long header configuration optimized for performance.
  • SchemaHeaderId.SCHEMA_ID_STRING—String header configuration for optimal interoperability across protocols.

Efficiency vs. Interoperability Considerations

The choice between header formats represents a tradeoff:

  • Efficiency—The default SCHEMA_ID option provides the best performance and efficiency by using a compact 8-byte long value for schema identification. This binary format minimizes header overhead and processing time, making it ideal for high-throughput Solace-to-Solace messaging scenarios.
  • Interoperability—The SCHEMA_ID_STRING option ensures optimal compatibility between different messaging protocols by using human-readable string values. While this introduces some additional header processing overhead, it enables message translation and consumption across different messaging protocol environments. REST messaging serves as a specific example where string-only headers enable easier interoperability between protocols. When messages need to be consumed by REST endpoints or translated between different messaging systems, string-based schema identifiers provide better compatibility and easier debugging capabilities.

Configuration Examples

The following example shows how to configure the SCHEMA_HEADER_IDENTIFIERS property:

// Use SCHEMA_ID, the default, for maximum efficiency
config.put(SerdeProperties.SCHEMA_HEADER_IDENTIFIERS, SchemaHeaderId.SCHEMA_ID);

// Use SCHEMA_ID_STRING for optimal interoperability
config.put(SerdeProperties.SCHEMA_HEADER_IDENTIFIERS, SchemaHeaderId.SCHEMA_ID_STRING);

Important considerations when configuring SERDES headers:

  • The SCHEMA_HEADER_IDENTIFIERS property only affects serialization operations. Deserializers automatically detect and handle both header formats.
  • Both header formats reference the same schema content; only the identifier representation differs.
  • You can change the header format configuration without affecting existing schemas in your registry.
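
The statement that deserializers handle both header formats can be pictured as a small normalization step on the consumer side. The following sketch only illustrates that idea: the header names are hypothetical placeholders (this page does not list the real SERDES header keys), and the class is not part of the SERDES Collection.

```java
import java.util.Map;
import java.util.OptionalLong;

/** Illustrative only: normalizes a long or string schema identifier to a long. */
final class SchemaIdHeaderSketch {

    // Hypothetical header names, chosen for this example only
    static final String LONG_HEADER = "example-schema-id";
    static final String STRING_HEADER = "example-schema-id-string";

    /** Reads the schema ID from whichever header format is present. */
    static OptionalLong readSchemaId(Map<String, Object> headers) {
        Object asLong = headers.get(LONG_HEADER);
        if (asLong instanceof Number) {
            return OptionalLong.of(((Number) asLong).longValue());
        }
        Object asString = headers.get(STRING_HEADER);
        if (asString instanceof String) {
            // String form: same identifier, carried as text for string-only transports
            return OptionalLong.of(Long.parseLong(((String) asString).trim()));
        }
        return OptionalLong.empty();
    }
}
```

Both a long header holding 42 and a string header holding "42" yield the same identifier here, which mirrors the point that only the representation differs between the two formats.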

Avro SERDES Configuration

Apache Avro is a data serialization system that provides a compact binary data format and schema evolution capabilities. Avro serialization allows you to encode and decode complex data structures efficiently—encoding before publishing them as messages and decoding when consuming messages. This ensures consistent data formatting across your messaging applications and enables schema evolution as your application requirements change.

This section covers Avro-specific configuration properties. For common SERDES configuration properties, including setting the required REGISTRY_URL property, see Common SERDES Configuration.

The following Java imports are required for implementing Avro serialization and deserialization using the SERDES Collection:

import com.solace.serdes.avro.AvroSerializer;
import com.solace.serdes.avro.AvroDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParser;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.util.HashMap;
import java.util.Map;

You can set all SERDES properties in a HashMap object with the put() method:

Map<String, Object> config = new HashMap<>();
// set all your configuration parameters with config.put()

Dereferencing Schemas

The DEREFERENCED_SCHEMA property controls how serializers handle Avro schemas that contain references to other schemas. In Avro schema design, there are two primary approaches to managing complex schemas:

  • Referenced schemas—These schemas are structured to reference or include other shared schema components, enabling a modular and reusable design. Common types can be defined once and reused across multiple schemas. Referenced schemas are reusable and easy to maintain.
  • Dereferenced schemas—These schemas, also called flat schemas, have all references expanded inline, creating a single, self-contained definition with no external dependencies. Dereferenced schemas simplify consumption by eliminating the need to resolve multiple schema references at runtime.

When set to true (the default), the DEREFERENCED_SCHEMA property tells the serializer to treat the schema in the record as fully dereferenced and use it directly for registry lookups. The serializer does not attempt to extract or manage referenced sub-schemas, which simplifies schema handling in your application.

Important considerations when using the DEREFERENCED_SCHEMA property:

  • If FIND_LATEST_ARTIFACT is set to true, the DEREFERENCED_SCHEMA property is ignored.
  • This property only affects serializers. Deserializers do not embed schema information; they use the schema ID to fetch the corresponding schema from the registry.
  • When DEREFERENCED_SCHEMA is enabled, all schemas in your registry must be stored in their dereferenced form, as referenced schemas cannot be properly resolved.

This configuration is useful in scenarios where you want to:

  • Simplify schema handling in your application by avoiding schema reference resolution.
  • Optimize performance by reducing the number of schema registry lookups.

When DEREFERENCED_SCHEMA is set to false, the serializer will treat the provided schema as one that needs to be decomposed into reference schemas. This allows applications to use one Avro schema file to represent multiple Avro schemas in the registry, which can reduce storage of common sub-schemas in the registry.

Here's how to configure a serializer to decompose schemas into references by setting DEREFERENCED_SCHEMA to false:

config.put(AvroProperties.DEREFERENCED_SCHEMA, false);

Avro Encoding Types

The ENCODING_TYPE property allows you to specify the encoding format used by the AvroSerializer to convert data to bytes. Avro supports two primary encoding formats, each with different characteristics and use cases:

  • AvroEncoding.BINARY—Uses Avro's DirectBinaryEncoder to produce a compact binary representation of the data. This is the default encoding and provides the most efficient serialization in terms of message size and processing performance.
  • AvroEncoding.JSON—Uses Avro's JsonEncoder to produce a human-readable JSON representation of the data. While less efficient than binary encoding, this format is useful for debugging, logging, and interoperability with systems that expect JSON.

The encoding type affects only the wire format of the serialized data; it does not change how the schema is resolved or how the data is structured. Both encoding types maintain full compatibility with the Avro schema system.

The encoding type is set as an Avro SERDES message header. This impacts the AvroDeserializer, which reads the assigned header value and attempts to use the appropriate decoder. If no header is present, the AvroDeserializer assumes the message was encoded with binary and attempts to decode it accordingly.

Here's how to configure a serializer to use JSON encoding:

config.put(AvroProperties.ENCODING_TYPE, AvroEncoding.JSON);

When choosing an encoding type, consider these factors:

  • Use BINARY encoding when optimizing for performance, bandwidth efficiency, or message throughput.
  • Use JSON encoding when debugging applications or when human readability is important.

If the ENCODING_TYPE property is not specified, the serializer defaults to BINARY encoding.

Record ID Artifact Resolver Strategy

The RecordIdStrategy automatically determines which schema to use during serialization by extracting information directly from the record's payload. It uses the schema name as the artifactId and the schema namespace as the groupId to construct the complete ArtifactReference. This approach eliminates the need to specify a schema on a per-message basis because it creates a direct mapping between the data structure of your records and the schema artifacts stored in the registry. For this to work, Solace Schema Registry must contain schemas whose artifactId and groupId correspond to the schema name and namespace used in your application's data models. The strategy is valuable in systems where the schema information is inherently contained within the record structure, because it reduces configuration overhead and ensures that the correct schema is used for each record type.

In summary, the RecordIdStrategy:

  • Extracts the schema from the record's payload.
  • Uses the schema name as the artifactId in the ArtifactReference.
  • Uses the schema namespace as the groupId in the ArtifactReference.
  • Uses the constructed ArtifactReference to locate the correct schema in Solace Schema Registry.

This strategy requires that schemas be registered in Solace Schema Registry with artifactIds that match the schema names and groupIds that match the schema namespaces used in your application.

Here's how to configure a serializer to use the RecordIdStrategy:

config.put(SchemaResolverProperties.ARTIFACT_RESOLVER_STRATEGY, RecordIdStrategy.class);
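
The name-to-artifact mapping summarized above can be sketched in plain Java. This is an illustration of the mapping rule only, not the RecordIdStrategy source: the class and the ArtifactRef holder are hypothetical, and the input is an Avro full name (namespace, a dot, then the schema name). How the real strategy treats a schema without a namespace is not described on this page, so the sketch simply leaves the groupId as null in that case.

```java
/** Illustrative only: derives groupId and artifactId from an Avro full name. */
final class RecordIdSketch {

    /** Hypothetical holder for the two parts of an artifact reference. */
    static final class ArtifactRef {
        final String groupId;
        final String artifactId;

        ArtifactRef(String groupId, String artifactId) {
            this.groupId = groupId;
            this.artifactId = artifactId;
        }
    }

    /** Splits "namespace.Name" into groupId = namespace and artifactId = Name. */
    static ArtifactRef fromFullName(String fullName) {
        int dot = fullName.lastIndexOf('.');
        if (dot < 0) {
            // No namespace: groupId left unset here (an assumption of this sketch)
            return new ArtifactRef(null, fullName);
        }
        return new ArtifactRef(fullName.substring(0, dot), fullName.substring(dot + 1));
    }
}
```

For a record whose schema full name is com.solace.samples.serdes.avro.schema.User, this yields the groupId com.solace.samples.serdes.avro.schema and the artifactId User, so a schema must be registered under that pair for the lookup to succeed.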

JSON Schema SERDES Configuration

JSON Schema is a vocabulary that defines how to annotate and validate JSON documents. JSON Schema serialization allows you to encode structured JSON data before publishing it as a message and to decode it when consuming the message. This ensures consistent data formatting across your messaging applications and enables schema evolution as your application requirements change.

This section covers JSON Schema-specific configuration properties. For common SERDES configuration properties, including setting the required REGISTRY_URL property, see Common SERDES Configuration.

The following Java imports are required for implementing JSON Schema serialization and deserialization using the SERDES Collection:

import com.solace.serdes.jsonschema.JsonSchemaSerializer;
import com.solace.serdes.jsonschema.JsonSchemaDeserializer;
import com.solace.serdes.jsonschema.JsonSchemaProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;

You can set all SERDES properties in a HashMap object with the put() method:

Map<String, Object> config = new HashMap<>();
// set all your configuration parameters with config.put()

JSON Schema Validation

The VALIDATE_SCHEMA property controls whether JSON schema validation is performed during serialization and deserialization operations. When enabled, the serializer and deserializer will validate JSON data against its registered schema to ensure data conformity and catch schema violations early in the processing pipeline.

Schema validation provides several benefits:

  • Data integrity—Ensures that JSON data conforms to the expected schema structure before processing.
  • Early error detection—Catches schema violations during serialization/deserialization rather than downstream processing.
  • Consistency enforcement—Guarantees that all messages follow the defined data contract.

However, enabling validation does introduce some performance overhead, so you may want to disable it in performance-critical scenarios where data integrity is guaranteed by other means.

The VALIDATE_SCHEMA property accepts a boolean value:

  • true (default)—Enables JSON schema validation during serialization and deserialization operations.
  • false—Disables JSON schema validation, improving performance but removing data integrity checks.

Here's how to configure JSON schema validation:

// Enable JSON schema validation (default behavior)
config.put(JsonSchemaProperties.VALIDATE_SCHEMA, true);

// Disable JSON schema validation for improved performance
config.put(JsonSchemaProperties.VALIDATE_SCHEMA, false);

Important considerations when configuring schema validation:

  • Validation is enabled by default to ensure data integrity in most use cases.
  • Disabling validation can improve performance but removes an important data quality safeguard.
  • Consider your application's requirements for data integrity versus performance when configuring this property.

Java Type Property Configuration

The TYPE_PROPERTY configuration specifies the JSON schema property name that contains the Java type class path for deserialization. The JsonSchemaDeserializer uses this property to determine the target Java class when converting JSON data back to Java objects, enabling proper type reconstruction during the deserialization process.

This property is particularly important when:

  • Your JSON schema includes type information for Java object reconstruction.
  • You need to deserialize JSON data into specific Java classes rather than generic JSON objects.
  • Your application uses polymorphic types that require runtime type resolution.

The TYPE_PROPERTY accepts a string value that specifies the property name in your JSON schema:

  • Default value: "javaType"—The deserializer will look for a property named "javaType" in the JSON schema.
  • Custom value: Any valid JSON property name that contains the Java class path information.

Here's how to configure the Java type property:

// Use the default "javaType" property name
config.put(JsonSchemaProperties.TYPE_PROPERTY, "javaType");

// Use a custom property name for Java type information
config.put(JsonSchemaProperties.TYPE_PROPERTY, "className");

// Use another custom property name
config.put(JsonSchemaProperties.TYPE_PROPERTY, "targetClass");

The following example shows a JSON schema that includes the javaType property. When the JsonSchemaDeserializer processes JSON data conforming to this schema, it will use the javaType value (com.example.User) to instantiate the correct Java class during deserialization:

{
  "$schema": "https://json-schema.org/draft-07/schema",
  "type": "object",
  "javaType": "com.example.User",
  "properties": {
    "name": {
      "type": "string"
    },
    "email": {
      "type": "string",
      "format": "email"
    },
    "age": {
      "type": "integer",
      "minimum": 0
    }
  },
  "required": ["name", "email"]
}

Important considerations when configuring the type property:

  • The property name must exist in your JSON schema and contain a valid Java class path.
  • The specified Java class must be available on the classpath during deserialization.
  • If the type property is not found, deserialization falls back to a generic JsonNode. If the property is present but contains an invalid class path, deserialization throws an exception.
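
The fallback rules in the list above can be expressed as a short decision function. The sketch below is a simplified model, not the JsonSchemaDeserializer's code: the class and method are hypothetical, the schema is represented as a plain Map of its top-level properties, and the caller supplies the generic fallback type (in the real deserializer that role is played by JsonNode).

```java
import java.util.Map;

/** Illustrative only: resolves the target class named by a schema's type property. */
final class TypePropertySketch {

    /**
     * Returns the class named by the type property, or genericFallback when the
     * property is absent. Throws when the property names a class that cannot be loaded.
     */
    static Class<?> resolveTargetType(Map<String, Object> schema,
                                      String typeProperty,
                                      Class<?> genericFallback) {
        Object value = schema.get(typeProperty);
        if (value == null) {
            return genericFallback; // property missing: use the generic representation
        }
        try {
            return Class.forName(value.toString());
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                    "Type property '" + typeProperty + "' names an unknown class: " + value, e);
        }
    }
}
```

Called with a schema map containing "javaType": "com.example.User" and the property name "javaType", the function loads com.example.User if it is on the classpath; with a custom TYPE_PROPERTY such as "className", the same lookup runs against that key instead.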

Serializing and Deserializing Messages with Avro over AMQP

The following examples demonstrate how to implement Avro serialization and deserialization in AMQP messaging applications. These examples use Apache Qpid JMS 2.0 as the AMQP client implementation, but the core concepts can be applied to other AMQP 1.0 client libraries with appropriate adjustments for their specific APIs.

Publishing Avro Messages over AMQP (Apache Qpid JMS Example)

When publishing Avro messages over AMQP using Apache Qpid JMS, you create an AvroSerializer, configure it with the appropriate SERDES properties, and use it to serialize message payloads before sending them via JMS APIs. The same serialization principles would apply with other AMQP clients, though the specific API calls would differ.

Key configuration for Avro AMQP publishing:

  • Disable JMS property name validation on the connection factory
  • Set appropriate encoding type (BINARY or JSON)
  • Use BytesMessage for optimal performance and compatibility

Example configuration for an Avro AMQP publisher:

// Configure the connection factory with property validation disabled
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(username, password, brokerUrl);
connectionFactory.setValidatePropertyNames(false);

// Configure the Avro serializer with schema registry connection details
Map<String, Object> config = new HashMap<>();
config.put(SchemaResolverProperties.REGISTRY_URL, REGISTRY_URL);
config.put(SchemaResolverProperties.AUTH_USERNAME, REGISTRY_USERNAME);
config.put(SchemaResolverProperties.AUTH_PASSWORD, REGISTRY_PASSWORD);
// Set encoding type - BINARY for compact payload, JSON for human-readable
config.put(AvroProperties.ENCODING_TYPE, AvroProperties.AvroEncoding.BINARY);

Example serialization and message publishing:

// Create a User object to serialize
User user = new User("John Doe", "123", "support@solace.com");
Map<String, Object> headers = new HashMap<>();

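// Serialize the object - this populates the headers map with schema information.
// 'serializer' is the AvroSerializer configured with the map shown above. 'topic' is the
// JMS Topic used for sending, so its name is passed as the destination string.
byte[] payloadBytes = serializer.serialize(topic.getTopicName(), user, headers);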

// Create a BytesMessage and set the payload
BytesMessage message = context.createBytesMessage();
message.writeBytes(payloadBytes);

// Set the schema ID and other headers as JMS application properties
for (Map.Entry<String, Object> entry : headers.entrySet()) {
    message.setObjectProperty(entry.getKey(), entry.getValue());
}

// Send the message
producer.send(topic, message);

For a complete example, see AvroPublisher.java in the background files provided, which demonstrates the pattern adapted for AMQP messaging.
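
The loop that copies serializer headers into JMS application properties relies on setObjectProperty(), which the JMS specification defines only for Boolean, Byte, Short, Integer, Long, Float, Double, and String values. The following sketch shows one way to check a headers map against that list before sending. It uses only the JDK, and the class name, method names, and header names are hypothetical. This page does not state which value types the serializer emits, and individual JMS providers may accept additional types.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SerdesHeaderTypes {
    /** True if the value's class is one the JMS specification lists for setObjectProperty. */
    static boolean isJmsPropertyValue(Object value) {
        // null is treated as unsupported; that is a choice of this sketch.
        return value instanceof Boolean || value instanceof Byte || value instanceof Short
                || value instanceof Integer || value instanceof Long
                || value instanceof Float || value instanceof Double
                || value instanceof String;
    }

    /** Returns the entries whose values would need conversion before being set as properties. */
    static Map<String, Object> unsupportedHeaders(Map<String, Object> headers) {
        Map<String, Object> rejected = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (!isJmsPropertyValue(entry.getValue())) {
                rejected.put(entry.getKey(), entry.getValue());
            }
        }
        return rejected;
    }
}
```

Calling unsupportedHeaders(headers) before the copy loop and logging a non-empty result can make a type problem visible at the publisher instead of surfacing later as an exception from the JMS client.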

Consuming Avro Messages over AMQP (Apache Qpid JMS Example)

When consuming Avro messages over AMQP using Apache Qpid JMS, you create a JMS consumer that receives messages and uses an AvroDeserializer to reconstruct the original message payload from the SERDES headers and payload. With other AMQP clients, you would follow similar principles but use the client-specific APIs for message consumption.

Key aspects of Avro AMQP consumption:

  • Extract SERDES headers from JMS application properties
  • Handle both BytesMessage and TextMessage objects
  • Use appropriate error handling for deserialization failures

Example SERDES header extraction and deserialization:

// Create a durable consumer (similar to the DurableTopicSubscriber example)
try (JMSContext context = connectionFactory.createContext()) {
    context.setClientID("MyAMQPClient");
    Topic topic = context.createTopic("solace/samples/avro");
    
    // Create durable consumer
    JMSConsumer consumer = context.createDurableConsumer(topic, "MySubscription");
    
    // Receive and process messages
    while (true) {
        Message message = consumer.receive(1000);
        if (message == null) continue;
        
        // Extract SERDES headers from JMS properties
        Map<String, Object> serdesHeaders = new HashMap<>();
        Enumeration<?> propertyNames = message.getPropertyNames();
        while (propertyNames.hasMoreElements()) {
            String propertyName = (String) propertyNames.nextElement();
            Object propertyValue = message.getObjectProperty(propertyName);
            serdesHeaders.put(propertyName, propertyValue);
        }
        
        // Extract message payload
        byte[] messagePayload;
        if (message instanceof BytesMessage) {
            BytesMessage bytesMessage = (BytesMessage) message;
            messagePayload = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(messagePayload);
        } else if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            messagePayload = textMessage.getText().getBytes(StandardCharsets.UTF_8);
        } else {
            throw new IllegalArgumentException("Unsupported message type: " + message.getClass());
        }
        
        // Deserialize the message ('deserializer' is an AvroDeserializer configured with the registry settings)
        try {
            GenericRecord record = deserializer.deserialize(topic.getTopicName(), messagePayload, serdesHeaders);
            System.out.printf("Received SERDES message: %s%n", record.toString());
        } catch (SerializationException se) {
            // Handle serialization/deserialization errors
            System.err.println("Serialization error: " + se.getMessage());
        }
    }
}
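
The header extraction step in the loop above can be factored into a small reusable helper. The following is a minimal sketch that depends only on the JDK, so it is not tied to a particular JMS API package. The class name, the PropertyLookup interface, and the header names are hypothetical. The lookup is allowed to throw because Message.getObjectProperty() declares a checked exception. The helper skips properties whose value is null and wraps lookup failures in an unchecked exception.

```java
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;

final class SerdesHeaders {
    /** Lookup that may throw, matching the shape of Message.getObjectProperty(String). */
    @FunctionalInterface
    interface PropertyLookup {
        Object get(String name) throws Exception;
    }

    /** Copies every named property with a non-null value into a new headers map. */
    static Map<String, Object> toHeaderMap(Enumeration<?> names, PropertyLookup lookup) {
        Map<String, Object> headers = new HashMap<>();
        while (names.hasMoreElements()) {
            String name = (String) names.nextElement();
            Object value;
            try {
                value = lookup.get(name);
            } catch (Exception e) {
                // Surface the failing property name instead of losing it.
                throw new IllegalStateException("Could not read property " + name, e);
            }
            if (value != null) {
                headers.put(name, value);
            }
        }
        return headers;
    }
}
```

In the consumer loop, the extraction would then reduce to a single call: SerdesHeaders.toHeaderMap(message.getPropertyNames(), message::getObjectProperty).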

Serializing and Deserializing Messages with JSON Schema over AMQP

JSON Schema serialization and deserialization over AMQP follow the same pattern as Avro, but use the JsonSchemaSerializer and JsonSchemaDeserializer classes with JSON-specific configuration options. As with the Avro examples, the following code uses Apache Qpid JMS 2.0, though the concepts apply to any AMQP 1.0-compliant client.

Publishing JSON Schema Messages over AMQP (Apache Qpid JMS Example)

When publishing JSON Schema messages over AMQP using Apache Qpid JMS, you create a JsonSchemaSerializer, configure it with appropriate validation settings, and serialize JSON data structures before transmission. The serialization process would be similar with other AMQP clients, though the specific API calls for message creation and transmission would differ.

Example configuration for a JSON Schema AMQP publisher:

// Configure the connection factory with property validation disabled
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(username, password, brokerUrl);
connectionFactory.setValidatePropertyNames(false);

// Configure the JSON Schema serializer with validation enabled
Map<String, Object> config = new HashMap<>();
config.put(SchemaResolverProperties.REGISTRY_URL, REGISTRY_URL);
config.put(SchemaResolverProperties.AUTH_USERNAME, REGISTRY_USERNAME);
config.put(SchemaResolverProperties.AUTH_PASSWORD, REGISTRY_PASSWORD);
// Enable schema validation during serialization (recommended for data quality)
config.put(JsonSchemaProperties.VALIDATE_SCHEMA, true);

Example JSON object creation and serialization:

// Create a JSON object using Jackson ObjectMapper (as in the JsonSchemaPublisher example)
ObjectNode user = new ObjectMapper().createObjectNode();
user.put("name", "John Doe");
user.put("id", "123");
user.put("email", "support@solace.com");

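// Serialize the JSON object - headers will be populated with schema information.
// 'serializer' is the JsonSchemaSerializer configured with the map shown above. 'topic' is
// the JMS Topic used for sending, so its name is passed as the destination string.
Map<String, Object> headers = new HashMap<>();
byte[] payloadBytes = serializer.serialize(topic.getTopicName(), user, headers);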

// Create a BytesMessage and set the payload
BytesMessage message = context.createBytesMessage();
message.writeBytes(payloadBytes);

// Set the schema ID and other headers as JMS application properties
for (Map.Entry<String, Object> entry : headers.entrySet()) {
    message.setObjectProperty(entry.getKey(), entry.getValue());
}

// Send the message
producer.send(topic, message);

For a complete example, see JsonSchemaPublisher.java on the Solace Samples Repository.

Consuming JSON Schema Messages over AMQP (Apache Qpid JMS Example)

JSON Schema AMQP consumption with Apache Qpid JMS includes additional error handling for schema validation failures, which can help identify data quality issues in incoming messages. The same validation principles would apply with other AMQP clients, though the message consumption APIs would differ.

Example configuration for a JSON Schema AMQP consumer:

// Configure the JSON Schema deserializer for message consumption
Map<String, Object> config = new HashMap<>();
config.put(SchemaResolverProperties.REGISTRY_URL, REGISTRY_URL);
config.put(SchemaResolverProperties.AUTH_USERNAME, REGISTRY_USERNAME);
config.put(SchemaResolverProperties.AUTH_PASSWORD, REGISTRY_PASSWORD);
// Enable validation to ensure incoming messages conform to schema
config.put(JsonSchemaProperties.VALIDATE_SCHEMA, true);

Example error handling with message type support:

try {
    // Extract SERDES headers from JMS properties
    Map<String, Object> serdesHeaders = new HashMap<>();
    Enumeration<?> propertyNames = message.getPropertyNames();
    while (propertyNames.hasMoreElements()) {
        String propertyName = (String) propertyNames.nextElement();
        Object propertyValue = message.getObjectProperty(propertyName);
        serdesHeaders.put(propertyName, propertyValue);
    }
    
    // Extract message payload based on message type
    byte[] messagePayload;
    if (message instanceof BytesMessage) {
        BytesMessage bytesMessage = (BytesMessage) message;
        messagePayload = new byte[(int) bytesMessage.getBodyLength()];
        bytesMessage.readBytes(messagePayload);
    } else if (message instanceof TextMessage) {
        TextMessage textMessage = (TextMessage) message;
        messagePayload = textMessage.getText().getBytes(StandardCharsets.UTF_8);
    } else {
        throw new IllegalArgumentException("Unsupported message type: " + message.getClass());
    }
    
    // Deserialize the message
    JsonNode jsonNode = deserializer.deserialize(topic.getTopicName(), messagePayload, serdesHeaders);
    System.out.println("Deserialized AMQP message: " + jsonNode);
    
} catch (JsonSchemaValidationException ve) {
    // Schema validation failed
    System.err.println("Schema validation error: " + ve.getMessage());
} catch (SerializationException se) {
    // General serialization/deserialization error
    System.err.println("Serialization error: " + se.getMessage());
} catch (Exception e) {
    // Other unexpected errors
    System.err.println("Unexpected error: " + e.getMessage());
    e.printStackTrace();
}

For a complete example, see JsonSchemaConsumer.java on the Solace Samples Repository.

Troubleshooting Cross-Protocol SERDES

When you use SERDES in environments that mix AMQP, REST, and the Solace JCSMP API, you may encounter issues related to message type handling, property name validation, and cross-protocol compatibility. This section covers common troubleshooting scenarios. It focuses on the Apache Qpid JMS implementation and also includes general AMQP considerations.

Common Issues and Solutions

  • General AMQP Considerations—Ensure your AMQP client properly handles application properties, as these are used to transmit schema information. Different AMQP clients may have different APIs for setting and retrieving these properties.
  • Apache Qpid JMS Specific: Property Name Validation Errors—If you receive exceptions about invalid property names containing dots (.) or hyphens (-), ensure that you have disabled property name validation on the Apache Qpid JMS connection factory using connectionFactory.setValidatePropertyNames(false). This is required for SERDES headers to be transmitted properly. On the consumer side, if property name validation is not disabled, headers are silently dropped, which causes issues with the deserializer because schema information is missing.
  • Missing SERDES Headers—With Apache Qpid JMS, verify that SERDES headers are being set as JMS application properties using message.setObjectProperty() and that the connection factory has property validation disabled. Check that headers are being extracted properly using message.getPropertyNames() and message.getObjectProperty(). With other AMQP clients, ensure you're properly setting and retrieving application properties according to that client's API.
  • Cross-Protocol Header Mapping Issues—Ensure that AMQP application properties are being mapped correctly to other protocol headers:
    • For AMQP to REST: Application properties should appear as HTTP headers with the solace-user-property- prefix.
    • For AMQP to Solace JCSMP API: Application properties should appear as SMF user properties.
    • Verify that the broker is configured to preserve user properties during message translation.
  • Message Type Compatibility—Ensure that your AMQP consumers can handle both BytesMessage and TextMessage objects, as cross-protocol scenarios may result in different message types depending on the publishing protocol and content type.
  • Jakarta vs Javax JMS APIs—Verify that you are using the correct JMS API imports. Apache Qpid JMS 2.0 uses Jakarta JMS APIs (jakarta.jms.*), not the legacy javax APIs (javax.jms.*). Mixing these APIs results in compilation or runtime errors.
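
For the AMQP to REST mapping listed above, a REST consumer can recover the application properties by stripping the solace-user-property- prefix from the incoming HTTP header names. The following minimal sketch uses only the JDK. The class name, the method name, and the sample header names are hypothetical. It matches the prefix without regard to case because HTTP header names are case-insensitive, and it keeps values as strings. Any type conversion that a deserializer might need is outside the scope of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

final class RestUserProperties {
    private static final String PREFIX = "solace-user-property-";

    /** Returns the headers that carry the prefix, keyed by the name that follows it. */
    static Map<String, String> fromHttpHeaders(Map<String, String> httpHeaders) {
        Map<String, String> userProperties = new HashMap<>();
        for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
            String name = header.getKey();
            // Case-insensitive prefix match; a header that is only the prefix is ignored.
            if (name.length() > PREFIX.length()
                    && name.regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
                userProperties.put(name.substring(PREFIX.length()), header.getValue());
            }
        }
        return userProperties;
    }
}
```

If the resulting map is empty for a message that was published with SERDES headers, that points to the broker or an intermediary not preserving user properties during translation, rather than to the deserializer.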