In today's data-driven world, ensuring data consistency and interoperability across different systems is crucial. This is where a schema registry comes into play. In this walkthrough, we'll explore how to use Solace Schema Registry with the Solace JCSMP API or REST applications, focusing on the JSON format for schema definition.
You'll learn about:
✔️ What a schema registry is and why it's important
✔️ What Serializers and Deserializers (SERDES) are and the role they play
✔️ How to deploy and configure Solace Schema Registry using either Docker Compose or Kubernetes
✔️ How to create and manage schema artifacts in Solace Schema Registry web console
✔️ How to use schemas in your event-driven applications with Solace's JCSMP API or with REST messaging
✔️ Best practices for schema evolution
You can also check out our demo video covering the Solace Schema Registry here:
Video TBD
To follow along, you will need the schema-registry-v1.0.0.tar.gz package, which contains all the necessary pieces. It is available on the Solace Product Portal (link to be added).
NOTE: If you cannot access the Solace Product Portal, click the Report a mistake button at the bottom left of the codelab and open an issue asking for access.
A schema registry is a central repository for managing and storing schemas. It helps ensure data consistency, enables data governance, and supports schema evolution. Here's why it's important:
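A SERDES (Serializer/Deserializer) in the context of a schema registry is a component that handles two key functions: serialization (converting an in-memory object into bytes for transmission) and deserialization (converting received bytes back into an object).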
In a schema registry system, SERDES works closely with schemas to:
For example:
This is a fundamental concept in message-based systems where data needs to be:
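To make the SERDES idea concrete, here is a minimal sketch in plain Java. It is a toy illustration only: the in-memory map standing in for a registry, the `schema-id` header key, and every class and method name are assumptions made for this example and are not part of the Solace SERDES libraries.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Toy illustration only; none of these names come from the Solace SERDES libraries.
class ToySerdesDemo {
    // Hypothetical header key that carries the schema ID alongside the payload.
    static final String SCHEMA_ID_HEADER = "schema-id";

    // Stand-in for a schema registry: schema ID -> schema definition.
    static final Map<String, String> REGISTRY = new HashMap<>();

    // Serialize: convert the object to bytes and record which schema describes it.
    static byte[] serialize(String schemaId, String payload, Map<String, Object> headers) {
        if (!REGISTRY.containsKey(schemaId)) {
            throw new IllegalArgumentException("Unknown schema ID: " + schemaId);
        }
        headers.put(SCHEMA_ID_HEADER, schemaId);
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    // Deserialize: confirm the schema named in the headers exists, then rebuild the object.
    static String deserialize(byte[] bytes, Map<String, Object> headers) {
        Object schemaId = headers.get(SCHEMA_ID_HEADER);
        if (schemaId == null || !REGISTRY.containsKey(schemaId.toString())) {
            throw new IllegalArgumentException("Missing or unknown schema ID: " + schemaId);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        REGISTRY.put("1", "{\"title\":\"Example\"}");
        Map<String, Object> headers = new HashMap<>();
        byte[] wire = serialize("1", "{\"greeting\":\"hello\"}", headers);
        System.out.println(deserialize(wire, headers));
    }
}
```

The point of the sketch is the division of labour: the payload travels as bytes, while a small identifier in the message headers tells the receiver which schema to fetch. A real SERDES would also validate the payload against that schema.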
We'll use Docker Compose to set up the Solace Schema Registry, for local development or standalone use cases with minimal resource and networking requirements. We've prepared a Docker Compose file that will launch an instance of the Solace Schema Registry and all the necessary components with a pre-defined configuration.
1. Extract the downloaded package into a directory named Schema-Registry-V1.0. You should see the package's files and folders.
2. In a terminal, change to the solace-schema-registry-dist directory.
3. Load the Docker images:
   for img in {docker-images}/*.tar.gz; do docker load -i "$img"; done
   On Windows, use this instead:
   for %i in (docker-images\*.tar.gz) do docker load -i "%i"
4. Optionally, edit the .env file to change settings such as the default login or ports. While using the built-in identity provider, we can leave everything at the defaults for this codelab.
5. Start the services:
   docker compose -f compose.yaml -f compose.nginx.yaml -f compose.nginx.for.embedded.yaml -f compose.embedded.yaml up -d
6. Open a browser to localhost:8888, which should redirect you to the Solace Schema Registry login screen.
That's it! You have now installed an instance of the Solace Schema Registry with the Postgres storage option.
Alternatively, for enterprise-grade security features, Solace Schema Registry supports external identity providers. To use one, make the necessary configurations in your IdP, set the corresponding environment variables in your .env file, and then run the following command:
docker compose -f compose.yaml -f compose.nginx.yaml -f compose.nginx.for.external.yaml -f compose.external.multiple.issuers.yaml up -d
Alternatively, we can set up Solace Schema Registry on a Kubernetes cluster using Helm.
NOTE: You must have an existing Kubernetes cluster available (for example, Amazon EKS, Google GKE, Microsoft AKS, or an on-prem/self-managed cluster).
1. Configure kubectl to communicate with your cluster.
2. Extract the downloaded package into a directory named Schema-Registry-V1.0.
3. Load the Docker images, then tag and push them to your container registry:
   export REGISTRY="your-registry.com/project"
   for img in {docker-images}/*.tar.gz; do
     LOADED=$(docker load -i "$img" | grep "Loaded image:" | cut -d' ' -f3)
     NEW_NAME="$REGISTRY/$(basename "$img" .tar.gz)"
     docker tag "$LOADED" "$NEW_NAME"
     docker push "$NEW_NAME"
   done
4. Install the CloudNativePG operator:
   kubectl apply --server-side -f https://raw.githubusercontent.com/cloudnative-pg/cloudnative-pg/release-1.26/releases/cnpg-1.26.0.yaml
5. Install the Helm chart:
   helm upgrade --install schema-registry ./solace-schema-registry
6. Check that the pods are running:
   kubectl get pods -n solace
https://ui.
Let's create a simple schema for a Clock-in-out event:
1. Open the Solace Schema Registry web console at localhost:8888.
2. Log in. The username is sr-developer and the password is devPassword.
3. Click the Create artifact button. Once the dialog opens, enter the following:
   RetailMore/payroll
   JSON Schema
4. We will skip the Artifact Metadata step, as it is optional, and click Next.
5. Provide the following schema content and click Next when done:
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "ClockInOut",
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "region_code": {
      "description": "region code for clock in or out",
      "type": "string"
    },
    "store_id": {
      "description": "Store identifier",
      "type": "string"
    },
    "employee_id": {
      "description": "Employee ID for who clocked in or out",
      "type": "string"
    },
    "datetime": {
      "description": "Clock time",
      "type": "string",
      "format": "date-time"
    }
  },
  "required": [
    "region_code",
    "store_id",
    "employee_id",
    "datetime"
  ]
}
6. We will skip the Version Metadata step, as it is optional, and simply click the Create button.
You've now created and registered your first schema!
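To get a feel for what the ClockInOut schema enforces, the following plain-Java sketch hand-checks two of its rules: the four required properties and the ban on undeclared properties (additionalProperties set to false). It is not a JSON Schema validator and does not check the string types or the date-time format; the class name, the method name, and the `notes` property in the example are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hand-rolled illustration of two ClockInOut rules; not a JSON Schema validator.
class ClockInOutCheck {
    // Property names copied from the schema; all four are declared and required.
    static final List<String> REQUIRED =
            List.of("region_code", "store_id", "employee_id", "datetime");

    // Returns a list of problems; an empty list means both checks passed.
    static List<String> check(Map<String, String> event) {
        List<String> problems = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!event.containsKey(name)) {
                problems.add("missing required property: " + name);
            }
        }
        // The declared properties are exactly the required ones, so anything else is rejected.
        for (String name : event.keySet()) {
            if (!REQUIRED.contains(name)) {
                problems.add("unexpected property: " + name);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> event = new LinkedHashMap<>();
        event.put("region_code", "NA-WEST");
        event.put("store_id", "STORE-001");
        event.put("employee_id", "EMP-12345");
        event.put("notes", "late shift"); // hypothetical property, not declared in the schema
        System.out.println(check(event));
    }
}
```

Running the example reports one missing property (datetime) and one unexpected property (notes). In the samples later in this codelab, this kind of checking is the job of the SERDES library rather than application code.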
Now, let's see how to use this schema in Java using the Solace Messaging API for Java (JCSMP):
1. Clone the samples repository and change into the solace-samples-java-jcsmp directory:
   git clone https://github.com/SolaceSamples/solace-samples-java-jcsmp
   cd solace-samples-java-jcsmp
2. Run ./gradlew assemble to build the sample.
3. Change to the build/staged directory and run the sample application with the broker connection details:
   cd build/staged
   bin/HelloWorldJCSMPJsonSchemaSerde localhost:55555 default default
This sample talks to the locally deployed Solace Schema Registry and retrieves the schema along with the schema ID. It will then do the following:
Configures the Serializer and Deserializer:
// Create and configure JSON Schema serializer and deserializer
try (Serializer<JsonNode> serializer = new JsonSchemaSerializer<>();
     Deserializer<JsonNode> deserializer = new JsonSchemaDeserializer<>()) {
    serializer.configure(getConfig());
    deserializer.configure(getConfig());
    // ... the rest of the sample runs inside this try block ...
}

/**
 * Returns a configuration map for the JSON Schema serializer and deserializer.
 *
 * @return A Map containing configuration properties
 */
private static Map<String, Object> getConfig() {
    Map<String, Object> config = new HashMap<>();
    config.put(SchemaResolverProperties.REGISTRY_URL, REGISTRY_URL);
    config.put(SchemaResolverProperties.AUTH_USERNAME, REGISTRY_USERNAME);
    config.put(SchemaResolverProperties.AUTH_PASSWORD, REGISTRY_PASSWORD);
    return config;
}
Serializes the message payload and publishes the message, with the serialized payload and schema ID, to the connected broker on the solace/samples destination:
// Create and populate a ClockInOut JsonNode with sample data
ObjectMapper mapper = new ObjectMapper();
ObjectNode clockInOut = mapper.createObjectNode();
clockInOut.put("region_code", "NA-WEST");
clockInOut.put("store_id", "STORE-001");
clockInOut.put("employee_id", "EMP-12345");
clockInOut.put("datetime", "2025-01-20T15:30:00Z");
// Serialize and send the message
BytesMessage msg = JCSMPFactory.onlyInstance().createMessage(BytesMessage.class);
SerdeMessage.serialize(serializer, topic, msg, clockInOut);
System.out.printf("Sending ClockInOut Message:%n%s%n", msg.dump());
producer.send(msg, topic);
It then receives the published message, deserializes it by looking up the schema ID and retrieving the schema, and prints the result to the console:
// Set up the message consumer with a deserialization callback
XMLMessageConsumer cons = session.getMessageConsumer(Consumed.with(deserializer, (msg, clockInOutData) -> {
    System.out.printf("Got a ClockInOut JsonNode: %s%n", clockInOutData);
    System.out.printf("Employee %s clocked in/out at store %s in region %s at %s%n",
            clockInOutData.get("employee_id").asText(),
            clockInOutData.get("store_id").asText(),
            clockInOutData.get("region_code").asText(),
            clockInOutData.get("datetime").asText());
    latch.countDown(); // Signal the main thread that a message has been received
}, (msg, deserializationException) -> {
    System.out.printf("Got exception: %s%n", deserializationException);
    System.out.printf("But still have access to the message: %s%n", msg.dump());
    latch.countDown();
}, jcsmpException -> {
    System.out.printf("Got exception: %s%n", jcsmpException);
    latch.countDown();
}));
cons.start();
To explore the source code further, open the HelloWorldJCSMPJsonSchemaSerde.java file.
We can also use this schema with Solace Schema Registry in REST-based messaging environments.
NOTE: Before running the REST samples, you need to configure the Solace broker with the appropriate queues and REST Delivery Points (RDPs). For more detailed documentation, refer to the Solace Documentation on REST Delivery Points.
Let's build a Publisher and Consumer sample.
1. Clone the samples repository:
   git clone https://github.com/SolaceSamples/solace-samples-rest-messaging
2. Build the samples by running ./gradlew build. You should see a BUILD SUCCESSFUL message.
   NOTE: Windows users should use the gradlew.bat file instead of gradlew.
3. Run the publisher sample:
   ./gradlew runJsonSchemaRestPublisherHttpClient --args="brokerUrl 38080"
This sample serializes and publishes by doing the following:
Configures the Serializer:
// Create and configure JSON serializer
try (Serializer<JsonNode> serializer = new JsonSchemaSerializer<>()) {
    serializer.configure(getConfig());
    // ... the rest of the sample runs inside this try block ...
}

/**
 * Gets the configuration for the serializer.
 * @return A map of configuration properties.
 */
private static Map<String, Object> getConfig() {
    Map<String, Object> config = new HashMap<>();
    config.put(SchemaResolverProperties.REGISTRY_URL, REGISTRY_URL);
    config.put(SchemaResolverProperties.AUTH_USERNAME, REGISTRY_USERNAME);
    config.put(SchemaResolverProperties.AUTH_PASSWORD, REGISTRY_PASSWORD);
    // This configuration property will populate the SERDES header with a schema ID that is of type String
    config.put(SerdeProperties.SCHEMA_HEADER_IDENTIFIERS, SchemaHeaderId.SCHEMA_ID_STRING);
    return config;
}
Serializes the HTTP message with payload and headers:
// Serializes the HTTP message with the given payload and headers.
Map<String, Object> headers = new HashMap<>();
byte[] payloadBytes = serializer.serialize(topic, payload, headers);
for (String key : headers.keySet()) {
    Object value = headers.get(key);
    // No Integer/Long to String mapping is needed, as the SERDES schema ID header is already configured as a String.
    // The type parameter is optional. By default all Solace User Properties in a REST message are assumed to have a type "string"
    httpBuilder.header(String.format("%s%s", "Solace-User-Property-", key), value.toString());
}
if ("JSON".equals(CONTENT_TYPE)) {
    httpBuilder.header("Content-Type", "application/json");
} else if ("BINARY".equals(CONTENT_TYPE)) {
    httpBuilder.header("Content-Type", "application/octet-stream");
} else {
    httpBuilder.header("Content-Type", CONTENT_TYPE);
}
return httpBuilder.POST(HttpRequest.BodyPublishers.ofByteArray(payloadBytes));
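The header mapping above can be tried in isolation with only the JDK's java.net.http classes. The sketch below builds, but does not send, a POST request whose SERDES headers are copied into HTTP headers carrying the Solace-User-Property- prefix. The header key `example-schema-id`, its value, and the class and method names are made up for illustration; the real keys are whatever the serializer puts into the headers map.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: shows the prefixing of SERDES headers, without any Solace library.
class SerdesHeaderMapping {
    static final String PREFIX = "Solace-User-Property-";

    static HttpRequest build(String url, byte[] payload, Map<String, Object> serdesHeaders) {
        HttpRequest.Builder builder = HttpRequest.newBuilder().uri(URI.create(url));
        // Each SERDES header becomes one prefixed HTTP header with a string value.
        for (Map.Entry<String, Object> entry : serdesHeaders.entrySet()) {
            builder.header(PREFIX + entry.getKey(), String.valueOf(entry.getValue()));
        }
        builder.header("Content-Type", "application/json");
        return builder.POST(HttpRequest.BodyPublishers.ofByteArray(payload)).build();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("example-schema-id", "42"); // hypothetical key and value
        HttpRequest request = build(
                "http://localhost:38080/TOPIC/solace/samples/json",
                "{}".getBytes(StandardCharsets.UTF_8),
                headers);
        // Print the headers that would be sent; nothing goes over the network here.
        System.out.println(request.method() + " " + request.uri());
        System.out.println(request.headers().map());
    }
}
```

Because the request is only built and never sent, this runs without a broker and is a convenient way to inspect exactly which headers a publisher would produce.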
Publishes the message to a specific topic:
// Publishes a message to a specific topic
ObjectNode user = new ObjectMapper().createObjectNode();
try {
    String url = String.format("http://%s:%d/TOPIC/%s", brokerHost, port, topic);
    HttpRequest.Builder httpBuilder = HttpRequest.newBuilder().uri(URI.create(url));
    user.put("name", "John Doe");
    user.put("id", id);
    user.put("email", "support@solace.com");
    HttpRequest request = serializeHttpMessage(httpBuilder, serializer, topic, user).build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println("Published message with status: " + response.statusCode());
    System.out.println("Published message with record: " + user);
}
// (exception handling omitted in this excerpt)
Next, set up the broker for the solace/samples/json topic and configure the RDP client as mentioned above. The REST consumer samples start a local HTTP server that listens for incoming POST requests from a Solace broker's REST Delivery Point (RDP). Run the consumer sample:
./gradlew runJsonSchemaRestConsumer --args="/my-rest-endpoint 8080 X-Solace-Topic"
Configures the Deserializer:
// Create and configure JSON deserializer
try (Deserializer<User> deserializer = new JsonSchemaDeserializer<>()) {
    deserializer.configure(getDeserializerConfig());
    // ... the rest of the sample runs inside this try block ...
}

/**
 * Returns a map of configuration properties for the deserializer.
 *
 * @return a map of configuration properties for the deserializer
 */
private static Map<String, Object> getDeserializerConfig() {
    HashMap<String,Object> config = new HashMap<>();
    config.put(SchemaResolverProperties.REGISTRY_URL, REGISTRY_URL);
    config.put(SchemaResolverProperties.AUTH_USERNAME, REGISTRY_USERNAME);
    config.put(SchemaResolverProperties.AUTH_PASSWORD, REGISTRY_PASSWORD);
    config.put(JsonSchemaProperties.TYPE_PROPERTY, "customJavaType");
    return config;
}
It receives the published message, extracts the SERDES headers from the HTTP headers, deserializes the payload by retrieving the schema, and prints the result:
// Handles a SERDES message.
Headers httpHeaders = exchange.getRequestHeaders();
// Read the message body
InputStream is = exchange.getRequestBody();
byte[] messagePayload = is.readAllBytes();
// extract SERDES headers from http headers
Map<String, Object> serdesHeaders = getSerdesHeaders(httpHeaders);
// deserialize with topic, payload and SERDES headers
T object = deserializer.deserialize(optionalTopic.orElse(""), messagePayload, serdesHeaders);
System.out.printf("%n- - - - - - - - - - RECEIVED SERDES MESSAGE - - - - - - - - - -%n");
printAllHeaders(httpHeaders);
System.out.println("BodyBytesLength: " + messagePayload.length);
System.out.println("Body: " + objectToStringFunc.apply(object));
System.out.printf("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -%n");
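The excerpt above calls getSerdesHeaders without showing it. As a rough idea of what such a helper might do, the sketch below picks out the HTTP headers that carry the Solace-User-Property- prefix and strips the prefix. This is an assumption about the helper's behaviour, not the sample's actual code; the class name, method name, and example header are hypothetical. The prefix is matched case-insensitively because HTTP header names are case-insensitive and servers may normalize their casing.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the sample's getSerdesHeaders helper.
class SerdesHeaderExtraction {
    static final String PREFIX = "Solace-User-Property-";

    static Map<String, Object> extract(Map<String, List<String>> httpHeaders) {
        Map<String, Object> serdesHeaders = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : httpHeaders.entrySet()) {
            String name = entry.getKey();
            List<String> values = entry.getValue();
            // Case-insensitive prefix match; false when the name is shorter than the prefix.
            boolean hasPrefix = name.regionMatches(true, 0, PREFIX, 0, PREFIX.length());
            if (hasPrefix && values != null && !values.isEmpty()) {
                // Keep the first value and drop the prefix from the key.
                serdesHeaders.put(name.substring(PREFIX.length()), values.get(0));
            }
        }
        return serdesHeaders;
    }

    public static void main(String[] args) {
        Map<String, List<String>> httpHeaders = new LinkedHashMap<>();
        httpHeaders.put("Content-Type", List.of("application/json"));
        httpHeaders.put("Solace-user-property-example-schema-id", List.of("42")); // hypothetical header
        System.out.println(extract(httpHeaders));
    }
}
```

The resulting map is the kind of input a deserializer expects as its headers argument, which is how the schema ID travels from the HTTP request to the schema lookup.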
As your data model evolves, you'll need to update your schemas. Here are some best practices:
When updating a schema in Solace Schema Registry:
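One way to reason about whether a schema change is safe is to ask whether data written with the old schema would still pass the new one. The sketch below encodes that question for a closed object schema like ClockInOut (additionalProperties set to false), under the simplifying assumption that property types do not change. It is a reasoning aid only, not how Solace Schema Registry evaluates changes, and the `notes` and `shift_id` properties are hypothetical.

```java
import java.util.Set;

// Simplified reasoning aid for closed object schemas; not a registry feature.
class CompatibilitySketch {
    // Old data is still accepted when:
    //  - every property the old schema allowed is still allowed, and
    //  - every property the new schema requires was already required.
    static boolean canReadOldData(Set<String> oldProps, Set<String> oldRequired,
                                  Set<String> newProps, Set<String> newRequired) {
        return newProps.containsAll(oldProps) && oldRequired.containsAll(newRequired);
    }

    public static void main(String[] args) {
        Set<String> v1 = Set.of("region_code", "store_id", "employee_id", "datetime");

        // Adding an optional property keeps old events valid.
        Set<String> v2Props = Set.of("region_code", "store_id", "employee_id", "datetime", "notes");
        System.out.println(canReadOldData(v1, v1, v2Props, v1));

        // Adding a new required property rejects old events that lack it.
        Set<String> v3 = Set.of("region_code", "store_id", "employee_id", "datetime", "shift_id");
        System.out.println(canReadOldData(v1, v1, v3, v3));
    }
}
```

The first call returns true and the second returns false, which matches the common guidance that new fields should be introduced as optional.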
✔️ Schema registries are crucial for maintaining data consistency in event-driven architectures.
✔️ Solace Schema Registry provides a powerful platform for managing schemas.
✔️ Solace's JCSMP API can be used effectively with the Solace Schema Registry for robust event-driven applications.
✔️ Proper schema evolution practices ensure smooth updates without breaking existing systems.
✔️ Using schemas in your applications helps catch data issues early and improves overall system reliability.
✔️ Solace Schema Registry supports built-in authentication as well as external identity providers for enterprise environments.
✔️ Solace Schema Registry currently supports Avro and JSON Schema. More schema SERDES will be made available in the future.
Thanks for participating in this codelab! Let us know what you thought in the Solace Community Forum! If you found any issues along the way we'd appreciate it if you'd raise them by clicking the Report a mistake button at the bottom left of this codelab.