Leveraging AMQP Messaging with Solace Schema Registry: Serializing and Deserializing Messages in Java

The SERDES Collection provides Java libraries for efficient serialization and deserialization of structured message payloads over AMQP 1.0. These libraries (Solace Avro SERDES for Java, Solace JSON Schema SERDES for Java, and Generic SERDES for Java) enable applications to convert complex data structures into compact, transmittable formats and reconstruct them on receipt. SERDES transmits schema information through AMQP application properties rather than embedding it in the message payload, preserving schema identifiers during transmission and supporting both binary and text message formats. The examples on this page use Apache Qpid JMS 2.0; however, the concepts apply to any AMQP 1.0 client library with appropriate adjustments.

Prerequisites

Before implementing SERDES with AMQP messaging, you should understand the language-agnostic concepts and configuration options covered in Serialization and Deserialization with Solace Schema Registry. Key topics include:

  • Connecting to Schema Registry (authentication and security)
  • Choosing schema resolution strategies (Destination ID, Topic ID, Topic ID with Profiles, Record ID)
  • Optimizing performance (caching and lookup options)
  • Advanced configuration (auto-registration, header formats, Avro and JSON Schema-specific settings)
  • Cross-protocol message translation

This page focuses on AMQP-specific implementation details, including Java imports, Apache Qpid JMS configuration, and code examples for serializing and deserializing messages over AMQP 1.0.

Deploying with Maven

To use the SERDES Collection with your AMQP applications, you need to include the appropriate Maven dependencies in your project. The SERDES libraries are distributed as separate artifacts that you can include based on your serialization format requirements.

The following sections show how to configure your Maven project with the dependencies needed for schema registry integration. The AMQP client dependency shown is the Apache Qpid JMS 2.0 implementation of AMQP; if you use a different AMQP client library, include the dependencies for that implementation instead of, or in addition to, the Qpid JMS client:

SERDES BOM Dependencies (Recommended)

The SERDES Bill of Materials (BOM) provides a centralized way to manage compatible versions of all Solace Schema Registry SERDES components. Using the BOM ensures that all SERDES dependencies are aligned with tested and compatible versions. This simplifies dependency management and reduces version conflicts in your applications.

The BOM includes version management for:

  • Avro SERDES components
  • JSON Schema SERDES components
  • Common SERDES libraries
  • Compatible versions of underlying dependencies

To use the SERDES BOM, add it to your Maven pom.xml file in the dependencyManagement section:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.solace</groupId>
            <artifactId>solace-schema-registry-serdes-bom</artifactId>
            <version>1.0.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

After importing the BOM, you can add SERDES dependencies without specifying versions:

<dependencies>
    <!-- Avro SERDES (version managed by BOM) -->
    <dependency>
        <groupId>com.solace</groupId>
        <artifactId>solace-schema-registry-avro-serde</artifactId>
    </dependency>
    
    <!-- JSON Schema SERDES (version managed by BOM) -->
    <dependency>
        <groupId>com.solace</groupId>
        <artifactId>solace-schema-registry-jsonschema-serde</artifactId>
    </dependency>
</dependencies>

Avro Serializer Dependencies

The Avro serializer provides a specific implementation for Apache Avro serialization and deserialization. This dependency includes the AvroSerializer, AvroDeserializer, and Avro-specific configuration properties, and also brings in the required common SERDES components used across all schema formats.

Add the following dependency to your Maven pom.xml file:

<dependencies>
    <dependency>
        <groupId>com.solace</groupId>
        <artifactId>solace-schema-registry-avro-serde</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>

For the latest version information and additional details about the Avro SERDES artifact, see the Maven Central Repository.

Important considerations when adding Maven dependencies:

  • Always use the latest compatible versions of the SERDES libraries for optimal performance and security.
  • The Avro SERDES dependency automatically includes the necessary Apache Avro libraries as transitive dependencies.

JSON Schema Serializer Dependencies

The JSON Schema serializer provides a specific implementation for JSON Schema serialization and deserialization. This dependency includes the JsonSchemaSerializer, JsonSchemaDeserializer, and JSON Schema-specific configuration properties, and also brings in the required common SERDES components used across all schema formats.

Add the following dependency to your Maven pom.xml file:

<dependencies>
    <dependency>
        <groupId>com.solace</groupId>
        <artifactId>solace-schema-registry-jsonschema-serde</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>

For the latest version information and additional details about the JSON Schema SERDES artifact, see the Maven Central Repository.

Important considerations when adding Maven dependencies:

  • Always use the latest compatible versions of the SERDES libraries for optimal performance and security.
  • The JSON Schema SERDES dependency automatically includes the necessary JSON Schema validation libraries as transitive dependencies.

AMQP Client Dependencies (Apache Qpid JMS Example)

For AMQP messaging with SERDES using Apache Qpid JMS 2.0, include the Qpid JMS client library, which provides the Jakarta Messaging API (jakarta.jms packages) on top of an AMQP 1.0 protocol implementation. You must configure the client to disable JMS property name validation, because schema registry headers contain characters that the default JMS property naming rules do not permit. Other AMQP client libraries have their own dependency requirements and configuration approaches.

Add the following dependency to your Maven pom.xml file:

<dependencies>
    <!-- Apache Qpid JMS 2.0 Client -->
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-jms-client</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>

Important considerations for AMQP client configuration:

  • Apache Qpid JMS 2.0 uses Jakarta JMS APIs (not javax.jms). Ensure your imports use jakarta.jms.* packages.
  • You must disable JMS property name validation on the connection factory to allow schema registry headers containing dots (.) and hyphens (-).
  • SERDES headers are transmitted as AMQP application properties, which are mapped to JMS message properties by the client.

Using SERDES with AMQP Messaging

AMQP messaging provides a standards-based approach to message publishing and consumption that integrates with the SERDES Collection. When using SERDES with AMQP messaging, schema information is transmitted through AMQP application properties rather than being embedded in the message payload, enabling efficient schema resolution and cross-protocol compatibility with REST and Solace Messaging API applications.

General AMQP Concepts with SERDES

When implementing SERDES with any AMQP 1.0 client, there are several important protocol-level considerations:

  • Application Properties—SERDES headers are transmitted as AMQP application properties, which ensures schema identifiers and metadata are preserved during message transmission. This is a standard feature of the AMQP 1.0 protocol that all compliant clients support.
  • Message Type Support—AMQP messaging supports both binary and text message formats for SERDES operations. However, the SERDES deserializer only operates on binary payloads. Applications must extract or convert the message content into a binary format—whether from a TextMessage or a BytesMessage—before passing it to the deserializer. For an example of this, see Consuming JSON Schema Messages over AMQP (Apache Qpid JMS Example).
  • Cross-Protocol Compatibility—AMQP applications can seamlessly exchange messages with REST and Solace JCSMP API applications through the Solace broker, which automatically translates message formats and preserves SERDES headers across protocols. For more information on cross-protocol message translation, see AMQP.
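
The binary-payload requirement described above can be illustrated in plain Java, independent of any AMQP client. The following is a minimal sketch: PayloadBytes and toBinaryPayload are hypothetical names that are not part of the SERDES libraries, and the sketch assumes that text bodies are UTF-8 encoded.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the SERDES or Qpid JMS APIs.
final class PayloadBytes {

    // Returns the body as bytes: a byte[] is copied, a String is encoded as UTF-8 (an assumption).
    static byte[] toBinaryPayload(Object body) {
        if (body instanceof byte[]) {
            return ((byte[]) body).clone();
        }
        if (body instanceof String) {
            return ((String) body).getBytes(StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("Unsupported body type: "
                + (body == null ? "null" : body.getClass().getName()));
    }

    public static void main(String[] args) {
        byte[] fromText = toBinaryPayload("{\"name\":\"John Doe\"}");
        System.out.println("Text body converted to " + fromText.length + " bytes");
    }
}
```

In a JMS application, the String would typically come from TextMessage.getText() and the byte array from reading a BytesMessage, as the consumer examples later on this page show.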

Apache Qpid JMS Implementation Considerations

When using Apache Qpid JMS 2.0 as your AMQP client implementation, there are additional specific considerations:

  • JMS Property Name Validation—Apache Qpid JMS enforces strict property name validation by default, which prevents schema registry headers containing dots (.) and hyphens (-) from being transmitted. You must disable property name validation on the connection factory using setValidatePropertyNames(false) to allow SERDES headers to be transmitted properly. Without this setting, SERDES headers will not be properly transmitted and schema resolution will fail.
  • JMS Message Types—When using Apache Qpid JMS, you'll work with JMS message types like BytesMessage and TextMessage. For AMQP with Qpid JMS, the deserializer.deserialize() method can handle both message types after extracting the payload bytes. BytesMessage is recommended for optimal performance and compatibility.
  • Jakarta JMS APIs—Apache Qpid JMS 2.0 uses Jakarta JMS APIs (jakarta.jms.*) rather than the legacy javax APIs. Ensure your imports and dependencies are updated accordingly.
  • Application Properties Mapping—With Apache Qpid JMS, SERDES headers are automatically mapped between AMQP application properties and JMS message properties. For more information on how AMQP application properties map to SMF user properties, see AMQP.
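
To see why headers containing dots or hyphens are rejected, it can help to approximate the JMS naming rule in plain Java. The sketch below is a simplified approximation, not Qpid's actual validator: it treats a property name as valid when it is a Java identifier and is not one of the selector keywords reserved by the JMS specification. The header names in main are hypothetical and are not the real SERDES header names.

```java
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Simplified approximation of JMS property-name rules; not Qpid's actual implementation.
final class PropertyNameCheck {

    // Keywords that the JMS specification reserves for message selectors (case-insensitive).
    private static final Set<String> RESERVED = Set.of(
            "NULL", "TRUE", "FALSE", "NOT", "AND", "OR", "BETWEEN", "LIKE", "IN", "IS", "ESCAPE");

    static boolean isValidJmsPropertyName(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        if (RESERVED.contains(name.toUpperCase(Locale.ROOT))) {
            return false;
        }
        if (!Character.isJavaIdentifierStart(name.charAt(0))) {
            return false;
        }
        for (int i = 1; i < name.length(); i++) {
            if (!Character.isJavaIdentifierPart(name.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical header names, used only to exercise the check.
        for (String name : List.of("schemaId", "example.schema.id", "example-schema-id")) {
            System.out.println(name + " -> " + isValidJmsPropertyName(name));
        }
    }
}
```

Neither a dot nor a hyphen is a legal Java identifier character, so the second and third names fail the check. That is the situation setValidatePropertyNames(false) is meant to work around.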

AMQP Connection Configuration (Apache Qpid JMS Example)

When implementing AMQP applications that use SERDES with Apache Qpid JMS, you need to configure the connection factory properly to ensure SERDES headers are transmitted correctly. Other AMQP client libraries would have their own connection configuration approaches, but would still need to ensure that application properties containing schema information are properly handled:

  • Connection Factory Configuration—Create a JmsConnectionFactory with the appropriate broker connection details and disable property name validation.
  • Client ID Configuration—Set a unique client ID on the JMS context when using durable subscriptions or other features that require client identification.
  • Session Configuration—Configure JMS sessions with appropriate acknowledgment modes and transaction settings based on your application requirements.

Example connection factory configuration:

import jakarta.jms.JMSContext;
import jakarta.jms.Topic;
import org.apache.qpid.jms.JmsConnectionFactory;

// Create connection factory with property validation disabled
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(username, password, brokerUrl);
connectionFactory.setValidatePropertyNames(false);

// Create JMS context and set client ID if needed
try (JMSContext context = connectionFactory.createContext()) {
    context.setClientID("MyAMQPClient");
    
    // Create topic and configure producers/consumers
    Topic topic = context.createTopic("solace/samples/json");
}
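
When the broker URL comes from external configuration, it may be more convenient to carry the same setting in the URL itself. The Qpid JMS client documents a jms.validatePropertyNames connection URI option; confirm it against the configuration reference for your client version. The following is a minimal sketch with hypothetical names (BrokerUrlOptions, withOption) that appends such an option to a URL string. It does not URL-encode the key or value.

```java
// Hypothetical helper for illustration; not part of the Qpid JMS API.
final class BrokerUrlOptions {

    // Appends key=value as a query option, using '?' for the first option and '&' afterwards.
    static String withOption(String brokerUrl, String key, String value) {
        String separator = brokerUrl.contains("?") ? "&" : "?";
        return brokerUrl + separator + key + "=" + value;
    }

    public static void main(String[] args) {
        // The host and port are placeholders.
        String url = withOption("amqp://localhost:5672", "jms.validatePropertyNames", "false");
        System.out.println(url);
    }
}
```

The resulting string can be passed as the brokerUrl argument of the JmsConnectionFactory constructor shown above. Calling setValidatePropertyNames(false) explicitly remains the approach this page documents.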

Cross-Protocol Message Flow with SERDES

SERDES supports cross-protocol messaging between AMQP and other messaging protocols. For detailed information about cross-protocol message translation rules and compatibility, see Cross-Protocol Message Translation.

The following examples illustrate the primary cross-protocol flows:

Example 1: AMQP Publisher to REST Consumer

  1. Publisher Side—The AMQP client serializes the message payload using the appropriate SERDES serializer, which adds schema identification headers as AMQP application properties.
  2. Message Transmission—The serialized payload and SERDES headers are transmitted via AMQP 1.0 to the event broker.
  3. Consumer Side—The REST consumer application receives the HTTP request via a REST Delivery Point (RDP), extracts the SERDES headers from the HTTP headers (mapped from AMQP application properties), and uses deserializer.deserialize() to reconstruct the message payload.
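
The header extraction in step 3 can be sketched in plain Java. The troubleshooting section of this page states that application properties appear as HTTP headers with the solace-user-property- prefix. The helper below strips that prefix to rebuild a headers map. RestHeaderMapping and extractUserProperties are hypothetical names, the header name in main is made up, and the values are kept as strings. Whether the deserializer needs typed values is not stated on this page.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the SERDES libraries.
final class RestHeaderMapping {

    // Prefix that this page describes for user properties delivered as HTTP headers.
    static final String PREFIX = "solace-user-property-";

    // Keeps only prefixed headers and removes the prefix; HTTP header names are case-insensitive.
    static Map<String, Object> extractUserProperties(Map<String, String> httpHeaders) {
        Map<String, Object> properties = new HashMap<>();
        for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
            String name = header.getKey();
            if (name.regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
                properties.put(name.substring(PREFIX.length()), header.getValue());
            }
        }
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> httpHeaders = Map.of(
                "Solace-User-Property-example.schema.id", "42", // hypothetical header name
                "Content-Type", "application/octet-stream");
        System.out.println(extractUserProperties(httpHeaders));
    }
}
```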

Example 2: REST Publisher to AMQP Consumer

  1. Publisher Side—The REST client serializes the message payload using the appropriate SERDES serializer, which adds schema identification headers to the HTTP request.
  2. Message Transmission—The serialized payload and SERDES headers are transmitted via HTTP POST to the event broker REST endpoint.
  3. Consumer Side—The AMQP consumer receives the message, extracts the SERDES headers from the AMQP application properties, retrieves the message payload bytes, and uses deserializer.deserialize() to reconstruct the message payload.

Example 3: AMQP Publisher to Solace Message Format (SMF) Consumer

  1. Publisher Side—The AMQP client serializes the message payload using the appropriate SERDES serializer, which adds schema identification headers as application properties.
  2. Message Transmission—The serialized payload and SERDES headers are transmitted via AMQP 1.0 to the event broker.
  3. Consumer Side—A Solace Messaging API application receives the message, extracts the SERDES headers from the SMF user properties (mapped from AMQP application properties), and uses SerdeMessage.deserialize() to reconstruct the message payload.

Serializing and Deserializing Messages with Avro over AMQP

The following examples demonstrate how to implement Avro serialization and deserialization in AMQP messaging applications. These examples use Apache Qpid JMS 2.0 as the AMQP client implementation, but the core concepts can be applied to other AMQP 1.0 client libraries with appropriate adjustments for their specific APIs.

Publishing Avro Messages over AMQP (Apache Qpid JMS Example)

When publishing Avro messages over AMQP using Apache Qpid JMS, you create an AvroSerializer, configure it with the appropriate SERDES properties, and use it to serialize message payloads before sending them via JMS APIs. The same serialization principles would apply with other AMQP clients, though the specific API calls would differ.

Key configuration for Avro AMQP publishing:

  • Disable JMS property name validation on the connection factory
  • Set appropriate encoding type (BINARY or JSON)
  • Use BytesMessage for optimal performance and compatibility

Example configuration for an Avro AMQP publisher:

// Configure the connection factory with property validation disabled
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(username, password, brokerUrl);
connectionFactory.setValidatePropertyNames(false);

// Configure the Avro serializer with schema registry connection details
Map<String, Object> config = new HashMap<>();
config.put(SchemaResolverProperties.REGISTRY_URL, REGISTRY_URL);
config.put(SchemaResolverProperties.AUTH_USERNAME, REGISTRY_USERNAME);
config.put(SchemaResolverProperties.AUTH_PASSWORD, REGISTRY_PASSWORD);
// Set encoding type - BINARY for compact payload, JSON for human-readable
config.put(AvroProperties.ENCODING_TYPE, AvroProperties.AvroEncoding.BINARY);

For information about configuring the config map with SERDES properties, see Getting Started with SERDES and Avro-Specific Configuration.

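Example serialization and message publishing. This snippet assumes that serializer is an AvroSerializer already configured with the config map above, and that context, topic, and producer (for example, from context.createProducer()) were created from the connection factory: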

// Create a User object to serialize
User user = new User("John Doe", "123", "support@solace.com");
Map<String, Object> headers = new HashMap<>();

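// Serialize the object - this populates the headers map with schema information.
// The topic name is passed as a String, matching the deserialize() call on the consumer side.
byte[] payloadBytes = serializer.serialize(topic.getTopicName(), user, headers);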

// Create a BytesMessage and set the payload
BytesMessage message = context.createBytesMessage();
message.writeBytes(payloadBytes);

// Set the schema ID and other headers as JMS application properties
for (Map.Entry<String, Object> entry : headers.entrySet()) {
    message.setObjectProperty(entry.getKey(), entry.getValue());
}

// Send the message
producer.send(topic, message);
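
The loop above passes each header value to setObjectProperty(), which in JMS accepts only String and the boxed primitive types Boolean, Byte, Short, Integer, Long, Float, and Double. This page does not say whether the serializer ever produces other value types. As a defensive measure, a publisher could check the map first. The following is a minimal sketch with hypothetical names (JmsPropertyTypes, unsupportedKeys) that conservatively treats null values as unsupported.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical pre-send check for illustration; not part of the SERDES or Qpid JMS APIs.
final class JmsPropertyTypes {

    // True for the value types that JMS object properties accept.
    static boolean isSupported(Object value) {
        return value instanceof String || value instanceof Boolean
                || value instanceof Byte || value instanceof Short
                || value instanceof Integer || value instanceof Long
                || value instanceof Float || value instanceof Double;
    }

    // Returns the keys, in sorted order, whose values cannot be set as JMS properties.
    static List<String> unsupportedKeys(Map<String, Object> headers) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Object> entry : new TreeMap<>(headers).entrySet()) {
            if (!isSupported(entry.getValue())) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("example.schema.id", 42L);          // hypothetical header name
        headers.put("example.tags", List.of("a", "b")); // a List is not a JMS property type
        System.out.println("Unsupported: " + unsupportedKeys(headers));
    }
}
```

Failing fast with a clear message when the returned list is non-empty is usually easier to diagnose than an exception thrown in the middle of building the message.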

For a complete example, see AvroPublisher.java, which demonstrates the pattern adapted for AMQP messaging.

For a complete list of Avro SERDES properties and methods, see the Java Avro SERDES API Reference.

Consuming Avro Messages over AMQP (Apache Qpid JMS Example)

When consuming Avro messages over AMQP using Apache Qpid JMS, you create a JMS consumer that receives messages and uses an AvroDeserializer to reconstruct the original message payload from the SERDES headers and payload. With other AMQP clients, you would follow similar principles but use the client-specific APIs for message consumption.

Key aspects of Avro AMQP consumption:

  • Extract SERDES headers from JMS application properties
  • Handle both BytesMessage and TextMessage objects
  • Use appropriate error handling for deserialization failures

Example SERDES header extraction and deserialization:

// Create a durable consumer (similar to DurableTopicSubscriber example)
try (JMSContext context = connectionFactory.createContext()) {
    context.setClientID("MyAMQPClient");
    Topic topic = context.createTopic("solace/samples/avro");
    
    // Create durable consumer
    JMSConsumer consumer = context.createDurableConsumer(topic, "MySubscription");
    
    // Receive and process messages
    while (true) {
        Message message = consumer.receive(1000);
        if (message == null) continue;
        
        // Extract SERDES headers from JMS properties
        Map<String, Object> serdesHeaders = new HashMap<>();
        Enumeration<?> propertyNames = message.getPropertyNames();
        while (propertyNames.hasMoreElements()) {
            String propertyName = (String) propertyNames.nextElement();
            Object propertyValue = message.getObjectProperty(propertyName);
            serdesHeaders.put(propertyName, propertyValue);
        }
        
        // Extract message payload
        byte[] messagePayload;
        if (message instanceof BytesMessage) {
            BytesMessage bytesMessage = (BytesMessage) message;
            messagePayload = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(messagePayload);
        } else if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            messagePayload = textMessage.getText().getBytes(StandardCharsets.UTF_8);
        } else {
            throw new IllegalArgumentException("Unsupported message type: " + message.getClass());
        }
        
        // Deserialize the message
        try {
            GenericRecord record = deserializer.deserialize(topic.getTopicName(), messagePayload, serdesHeaders);
            System.out.printf("Received SERDES message: %s%n", record.toString());
        } catch (SerializationException se) {
            // Handle serialization/deserialization errors
            System.err.println("Serialization error: " + se.getMessage());
        }
    }
}
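
The header-extraction loop in this example is repeated in the JSON Schema consumer later on this page, so it may be worth factoring into a helper. The sketch below is one possible shape, using hypothetical names (HeaderCollector, PropertyLookup). It takes the property-name enumeration plus a lookup function that is allowed to throw, and wraps any failure in an unchecked exception. It uses only the Java standard library so that it can be exercised without a broker.

```java
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the SERDES or Qpid JMS APIs.
final class HeaderCollector {

    // Lookup that may throw a checked exception, as JMS property getters do.
    @FunctionalInterface
    interface PropertyLookup {
        Object get(String name) throws Exception;
    }

    // Copies every named property into a map, wrapping lookup failures in IllegalStateException.
    static Map<String, Object> collect(Enumeration<?> names, PropertyLookup lookup) {
        Map<String, Object> headers = new HashMap<>();
        while (names.hasMoreElements()) {
            String name = String.valueOf(names.nextElement());
            try {
                headers.put(name, lookup.get(name));
            } catch (Exception e) {
                throw new IllegalStateException("Could not read property: " + name, e);
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        // A plain map stands in for the message properties; the key is a hypothetical header name.
        Map<String, Object> fakeProperties = Map.of("example.schema.id", 7L);
        Map<String, Object> headers =
                collect(Collections.enumeration(fakeProperties.keySet()), fakeProperties::get);
        System.out.println(headers);
    }
}
```

With a JMS message, the call would look like collect(message.getPropertyNames(), message::getObjectProperty), placed where getPropertyNames() is allowed to throw JMSException.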

For a complete list of Avro SERDES properties and methods, see the Java Avro SERDES API Reference.

Serializing and Deserializing Messages with JSON Schema over AMQP

JSON Schema serialization and deserialization over AMQP follows similar patterns to Avro, but uses JsonSchemaSerializer and JsonSchemaDeserializer classes with JSON-specific configuration options. As with the Avro examples, the following code demonstrates implementation using Apache Qpid JMS 2.0, though the concepts apply to any AMQP 1.0 compliant client.

Publishing JSON Schema Messages over AMQP (Apache Qpid JMS Example)

When publishing JSON Schema messages over AMQP using Apache Qpid JMS, you create a JsonSchemaSerializer, configure it with appropriate validation settings, and serialize JSON data structures before transmission. The serialization process would be similar with other AMQP clients, though the specific API calls for message creation and transmission would differ.

Example configuration for a JSON Schema AMQP publisher:

// Configure the connection factory with property validation disabled
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(username, password, brokerUrl);
connectionFactory.setValidatePropertyNames(false);

// Configure the JSON Schema serializer with validation enabled
Map<String, Object> config = new HashMap<>();
config.put(SchemaResolverProperties.REGISTRY_URL, REGISTRY_URL);
config.put(SchemaResolverProperties.AUTH_USERNAME, REGISTRY_USERNAME);
config.put(SchemaResolverProperties.AUTH_PASSWORD, REGISTRY_PASSWORD);
// Enable schema validation during serialization (recommended for data quality)
config.put(JsonSchemaProperties.VALIDATE_SCHEMA, true);

For information about configuring the config map with SERDES properties, see Getting Started with SERDES and JSON Schema-Specific Configuration.

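Example JSON object creation and serialization. This snippet assumes that serializer is a JsonSchemaSerializer already configured with the config map above, and that context, topic, and producer (for example, from context.createProducer()) were created from the connection factory: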

// Create a JSON object using Jackson ObjectMapper (as shown in JSONSchemasPublisher example)
ObjectNode user = new ObjectMapper().createObjectNode();
user.put("name", "John Doe");
user.put("id", "123");
user.put("email", "support@solace.com");

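// Serialize the JSON object - headers will be populated with schema information.
// The topic name is passed as a String, matching the deserialize() call on the consumer side.
Map<String, Object> headers = new HashMap<>();
byte[] payloadBytes = serializer.serialize(topic.getTopicName(), user, headers);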

// Create a BytesMessage and set the payload
BytesMessage message = context.createBytesMessage();
message.writeBytes(payloadBytes);

// Set the schema ID and other headers as JMS application properties
for (Map.Entry<String, Object> entry : headers.entrySet()) {
    message.setObjectProperty(entry.getKey(), entry.getValue());
}

// Send the message
producer.send(topic, message);

For a complete example, see JsonSchemaPublisher.java on the Solace Samples Repository.

For a complete list of JSON Schema SERDES properties and methods, see the Java JSON SERDES API Reference.

Consuming JSON Schema Messages over AMQP (Apache Qpid JMS Example)

JSON Schema AMQP consumption with Apache Qpid JMS includes additional error handling for schema validation failures, which can help identify data quality issues in incoming messages. The same validation principles would apply with other AMQP clients, though the message consumption APIs would differ.

Example configuration for a JSON Schema AMQP consumer:

// Configure the JSON Schema deserializer for message consumption
Map<String, Object> config = new HashMap<>();
config.put(SchemaResolverProperties.REGISTRY_URL, REGISTRY_URL);
config.put(SchemaResolverProperties.AUTH_USERNAME, REGISTRY_USERNAME);
config.put(SchemaResolverProperties.AUTH_PASSWORD, REGISTRY_PASSWORD);
// Enable validation to ensure incoming messages conform to schema
config.put(JsonSchemaProperties.VALIDATE_SCHEMA, true);

Example error handling with message type support:

try {
    // Extract SERDES headers from JMS properties
    Map<String, Object> serdesHeaders = new HashMap<>();
    Enumeration<?> propertyNames = message.getPropertyNames();
    while (propertyNames.hasMoreElements()) {
        String propertyName = (String) propertyNames.nextElement();
        Object propertyValue = message.getObjectProperty(propertyName);
        serdesHeaders.put(propertyName, propertyValue);
    }
    
    // Extract message payload based on message type
    byte[] messagePayload;
    if (message instanceof BytesMessage) {
        BytesMessage bytesMessage = (BytesMessage) message;
        messagePayload = new byte[(int) bytesMessage.getBodyLength()];
        bytesMessage.readBytes(messagePayload);
    } else if (message instanceof TextMessage) {
        TextMessage textMessage = (TextMessage) message;
        messagePayload = textMessage.getText().getBytes(StandardCharsets.UTF_8);
    } else {
        throw new IllegalArgumentException("Unsupported message type: " + message.getClass());
    }
    
    // Deserialize the message
    JsonNode jsonNode = deserializer.deserialize(topic.getTopicName(), messagePayload, serdesHeaders);
    System.out.println("Deserialized AMQP message: " + jsonNode);
    
} catch (JsonSchemaValidationException ve) {
    // Schema validation failed
    System.err.println("Schema validation error: " + ve.getMessage());
} catch (SerializationException se) {
    // General serialization/deserialization error
    System.err.println("Serialization error: " + se.getMessage());
} catch (Exception e) {
    // Other unexpected errors
    System.err.println("Unexpected error: " + e.getMessage());
    e.printStackTrace();
}

For a complete example, see JsonSchemaConsumer.java on the Solace Samples Repository.

For a complete list of JSON Schema SERDES properties and methods, see the Java JSON SERDES API Reference.

Troubleshooting Cross-Protocol SERDES

When working with mixed AMQP, REST, and Solace JCSMP API environments using SERDES, you may encounter specific issues related to message type handling, property validation, and cross-protocol compatibility. This section provides guidance for common troubleshooting scenarios, with a focus on Apache Qpid JMS implementation but including general AMQP considerations as well.

Common Issues and Solutions

  • General AMQP Considerations—Ensure your AMQP client properly handles application properties, as these are used to transmit schema information. Different AMQP clients may have different APIs for setting and retrieving these properties.
  • Apache Qpid JMS Specific: Property Name Validation Errors—If you receive exceptions about invalid property names containing dots (.) or hyphens (-), ensure that you have disabled property name validation on the Apache Qpid JMS connection factory using connectionFactory.setValidatePropertyNames(false). This is required for SERDES headers to be transmitted properly. On the consumer side, if property name validation is not disabled, headers are silently dropped, which causes issues with the deserializer because schema information is missing.
  • Missing SERDES Headers—With Apache Qpid JMS, verify that SERDES headers are being set as JMS application properties using message.setObjectProperty() and that the connection factory has property validation disabled. Check that headers are being extracted properly using message.getPropertyNames() and message.getObjectProperty(). With other AMQP clients, ensure you're properly setting and retrieving application properties according to that client's API.
  • Cross-Protocol Header Mapping Issues—Ensure that AMQP application properties are being mapped correctly to other protocol headers:
    • For AMQP to REST: Application properties should appear as HTTP headers with solace-user-property- prefix.
    • For AMQP to Solace JCSMP API: Application properties should appear as SMF user properties.
    • Verify that the broker is configured to preserve user properties during message translation.
  • Message Type Compatibility—Ensure that your AMQP consumers can handle both BytesMessage and TextMessage objects, as cross-protocol scenarios may result in different message types depending on the publishing protocol and content type.
  • Jakarta vs Javax JMS APIs—Verify that you are using the correct JMS API imports. Apache Qpid JMS 2.0 uses Jakarta JMS APIs (jakarta.jms.*), not the legacy javax APIs (javax.jms.*). Mixing these APIs results in compilation or runtime errors.