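Introduction
Schema Registry provides a serving layer for metadata. It exposes a RESTful interface for storing and retrieving Avro schemas, keeps a versioned history of all schemas, offers multiple compatibility settings, and allows schemas to evolve according to the configured compatibility setting. It also provides serializers that plug into Kafka clients and handle schema storage and retrieval for Kafka messages sent in Avro format.
Project home page: https://github.com/confluentinc/schema-registry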
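As a rough illustration of what those serializers put on the wire: Confluent's Avro serializer prefixes each message with a magic byte (0) and the 4-byte big-endian ID of the schema, followed by the Avro binary payload. The sketch below covers only that framing. The names frame and unframe are made up for this example, and the Avro encoding itself is left out.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0   # first byte of every framed message
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an already Avro-encoded payload with the magic byte and schema ID."""
    # ">bI" = big-endian, one signed byte (magic), one 4-byte unsigned int (ID)
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, avro_payload)."""
    if len(message) < HEADER_SIZE:
        raise ValueError("message too short to carry a schema ID header")
    magic, schema_id = struct.unpack(">bI", message[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER_SIZE:]
```

A consumer would take the schema ID returned by unframe and look the schema up in the registry (the /schemas/ids/{id} endpoint) before decoding the payload.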
Deployment
Docker
Image references are taken from: https://github.com/confluentinc/examples/blob/5.4.0-post/cp-all-in-one-community/
Here, dev is the host machine.
docker run --name eventbus_zk -d -p 2181:2181 zookeeper:3.5.7
API
Querying subjects/schemas
List all subjects
curl -X GET http://172.16.18.143:8081/subjects
List all schema versions registered under the subject “Kafka-key”
curl -X GET http://172.16.18.143:8081/subjects/Kafka-key/versions
Fetch version 1 of the schema registered under subject “Kafka-key”
curl -X GET http://172.16.18.143:8081/subjects/Kafka-key/versions/1
Fetch the most recently registered schema under subject “Kafka-key”
curl -X GET http://172.16.18.143:8081/subjects/Kafka-key/versions/latest
Fetch a schema by globally unique id 1
curl -X GET http://172.16.18.143:8081/schemas/ids/1
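The same read-only calls can be made from a script. The following is a minimal sketch using only the Python standard library. The helper names (subject_versions_url, schema_by_id_url, get_json) are invented for this example, and BASE_URL simply repeats the address used in the curl commands above.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

BASE_URL = "http://172.16.18.143:8081"  # registry address from the curl examples


def subject_versions_url(subject, version=None, base_url=BASE_URL):
    """URL listing a subject's versions, or one version (a number or "latest")."""
    url = f"{base_url}/subjects/{quote(subject, safe='')}/versions"
    if version is not None:
        url += f"/{version}"
    return url


def schema_by_id_url(schema_id, base_url=BASE_URL):
    """URL for fetching a schema by its globally unique ID."""
    return f"{base_url}/schemas/ids/{schema_id}"


def get_json(url):
    """GET the URL and decode the JSON body (needs a reachable registry)."""
    with urlopen(url) as resp:
        return json.load(resp)
```

With a registry running, get_json(subject_versions_url("Kafka-key", "latest")) would correspond to the "most recently registered schema" call above.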
Registering subjects
- Register a new version of a schema under the subject “Kafka-key”
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
"schema": "{\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"id\", \"type\": \"int\"},{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"} ]}"}' \
http://172.16.18.143:8081/subjects/Kafka-key/versions
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
"schema": "{\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"id\", \"type\": \"int\"},{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"},{\"name\": \"age1\", \"type\": \"int\"} ]}"}' \
http://172.16.18.143:8081/subjects/Kafka-key/versions
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
"schema": "{\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"id\", \"type\": \"int\"},{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"},{\"name\": \"age2\", \"type\": \"int\"} ]}"}' \
http://172.16.18.143:8081/subjects/Kafka-key/versions
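The hand-escaped quotes in the request bodies above are easy to get wrong: the Avro schema has to be sent as a JSON string nested inside the outer JSON object. One way to avoid the manual escaping is to let a JSON library do it twice, as in this sketch. build_register_request and send are hypothetical helper names, and USER_SCHEMA mirrors the first schema registered above.

```python
import json
from urllib.request import Request, urlopen

BASE_URL = "http://172.16.18.143:8081"  # registry address from the curl examples
CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"

USER_SCHEMA = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}


def build_register_request(subject, schema, base_url=BASE_URL):
    """Build the POST request that registers `schema` under `subject`."""
    # Inner dumps turns the schema into a string; outer dumps escapes that
    # string inside the {"schema": "..."} envelope.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return Request(
        f"{base_url}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": CONTENT_TYPE},
        method="POST",
    )


def send(request):
    """Send the request and decode the JSON reply (needs a reachable registry)."""
    with urlopen(request) as resp:
        return json.load(resp)
```

Against a live registry, send(build_register_request("Kafka-key", USER_SCHEMA)) should return a small JSON object containing the ID assigned to the schema.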
Deleting schemas/versions
Delete version 1 of the schema registered under subject “Kafka-key”
curl -X DELETE http://172.16.18.143:8081/subjects/Kafka-key/versions/1
Output:
1
Delete all versions of the schema registered under subject “Kafka-key”
curl -X DELETE http://172.16.18.143:8081/subjects/Kafka-key
Output:
[2,3,4]
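Note that the two delete calls answer in different shapes: a bare version number for a single version, and a list of version numbers for a whole subject. A script that handles both might normalise them, roughly as sketched here. The helper names are assumptions made for this example.

```python
import json
from urllib.request import Request

BASE_URL = "http://172.16.18.143:8081"  # registry address from the curl examples


def build_delete_request(subject, version=None, base_url=BASE_URL):
    """DELETE one version of a subject, or the whole subject if version is None."""
    url = f"{base_url}/subjects/{subject}"
    if version is not None:
        url += f"/versions/{version}"
    return Request(url, method="DELETE")


def deleted_versions(response_text):
    """Return the deleted version numbers as a list, whatever the reply shape."""
    result = json.loads(response_text)
    return result if isinstance(result, list) else [result]
```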
Compatibility checks
- Check whether a schema has been registered under subject “Kafka-key”
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://172.16.18.143:8081/subjects/Kafka-key
- Test compatibility of a schema with the latest schema under subject “Kafka-key”
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://172.16.18.143:8081/compatibility/subjects/Kafka-key/versions/latest
Output:
{"is_compatible":true}
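A compatibility test is a natural gate to run before registering a new schema version, for example in a build script. The sketch below builds the same request as the curl command above and reads the is_compatible flag from the reply. The function names are hypothetical, and a reply without the flag is treated as "not compatible", which is a cautious choice made for this example rather than documented registry behaviour.

```python
import json
from urllib.request import Request

BASE_URL = "http://172.16.18.143:8081"  # registry address from the curl examples
CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"


def build_compatibility_request(subject, schema, version="latest", base_url=BASE_URL):
    """POST request testing `schema` against one registered version of `subject`."""
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return Request(
        f"{base_url}/compatibility/subjects/{subject}/versions/{version}",
        data=body,
        headers={"Content-Type": CONTENT_TYPE},
        method="POST",
    )


def is_compatible(response_text):
    """Read the is_compatible flag; a missing flag counts as incompatible."""
    return bool(json.loads(response_text).get("is_compatible", False))
```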
Compatibility settings
Get top level config
curl -X GET http://172.16.18.143:8081/config
Output:
{"compatibilityLevel":"BACKWARD"}
Update compatibility requirements globally
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' \
http://172.16.18.143:8081/config
Output:
{"compatibilityLevel":"NONE"}
Update compatibility requirements under the subject “Kafka-key”
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config/Kafka-key
Output:
{"compatibility":"BACKWARD"}
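The outputs above use two different key names (compatibilityLevel and compatibility), so a script reading them probably needs to accept both. The sketch below does that and also rejects unknown levels before sending anything. The helper names are invented for this example; the level list reflects the compatibility types Schema Registry documents.

```python
import json
from urllib.request import Request

BASE_URL = "http://172.16.18.143:8081"  # registry address from the curl examples
CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"

COMPATIBILITY_LEVELS = {
    "BACKWARD", "BACKWARD_TRANSITIVE",
    "FORWARD", "FORWARD_TRANSITIVE",
    "FULL", "FULL_TRANSITIVE",
    "NONE",
}


def build_config_request(level, subject=None, base_url=BASE_URL):
    """PUT request setting the global level, or one subject's level if given."""
    if level not in COMPATIBILITY_LEVELS:
        raise ValueError(f"unknown compatibility level: {level!r}")
    url = f"{base_url}/config" + (f"/{subject}" if subject else "")
    body = json.dumps({"compatibility": level}).encode("utf-8")
    return Request(url, data=body, headers={"Content-Type": CONTENT_TYPE}, method="PUT")


def read_level(response_text):
    """Extract the level from a config reply, accepting either key name."""
    body = json.loads(response_text)
    return body.get("compatibilityLevel", body.get("compatibility"))
```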
Schema evolution and compatibility
Security
(To be continued)