Confluent Schema Registry Notes

简介

模式注册表为元数据提供了一个服务层。 它提供了一个用于存储和检索 Avro 模式的 RESTful 接口,存储所有模式的版本历史,提供多个兼容性设置,并允许根据配置的兼容性设置改进模式。 它提供了插入 Kafka 客户机的序列化程序,处理以 Avro 格式发送的 Kafka 消息的模式存储和检索。

项目主页:https://github.com/confluentinc/schema-registry

部署

Docker

镜像引用来源:https://github.com/confluentinc/examples/blob/5.4.0-post/cp-all-in-one-community/

其中 dev 为宿主机

1
2
3
4
5
6
docker run --name eventbus_zk -d -p 2181:2181 zookeeper:3.5.7

docker run --name schema-registry -d -h schema-registry -p 8081:8081 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=dev:2181 \
confluentinc/cp-schema-registry:5.5.0

API

查询主题/模式

  • List all subjects

    1
    curl -X GET http://172.16.18.143:8081/subjects
  • List all schema versions registered under the subject “Kafka-key”

    1
    curl -X GET http://172.16.18.143:8081/subjects/Kafka-key/versions
  • Fetch version 1 of the schema registered under subject “Kafka-key”

    1
    curl -X GET http://172.16.18.143:8081/subjects/Kafka-key/versions/1
  • Fetch the most recently registered schema under subject “Kafka-key”

    1
    curl -X GET http://172.16.18.143:8081/subjects/Kafka-key/versions/latest
  • Fetch a schema by globally unique id 1

    1
    curl -X GET http://172.16.18.143:8081/schemas/ids/1

注册主题

  • Register a new version of a schema under the subject “Kafka-key”
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{
    "schema": "{\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"id\", \"type\": \"int\"},{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"} ]}"}' \
    http://172.16.18.143:8081/subjects/Kafka-key/versions


    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{
    "schema": "{\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"id\", \"type\": \"int\"},{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"},{\"name\": \"age1\", \"type\": \"int\"} ]}"}' \
    http://172.16.18.143:8081/subjects/Kafka-key/versions

    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{
    "schema": "{\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"id\", \"type\": \"int\"},{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"},{\"name\": \"age2\", \"type\": \"int\"} ]}"}' \
    http://172.16.18.143:8081/subjects/Kafka-key/versions

删除模式/版本

  • Delete version 1 of the schema registered under subject “Kafka-key”

    1
    curl -X DELETE http://172.16.18.143:8081/subjects/Kafka-key/versions/1

    Output: 1

  • Delete all versions of the schema registered under subject “Kafka-key”

    1
    curl -X DELETE http://172.16.18.143:8081/subjects/Kafka-key

    Output: [2,3,4]

兼容性检查

  • Check whether a schema has been registered under subject “Kafka-key”
    1
    2
    3
    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://172.16.18.143:8081/subjects/Kafka-key
  • Test compatibility of a schema with the latest schema under subject “Kafka-key”
    1
    2
    3
    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://172.16.18.143:8081/compatibility/subjects/Kafka-key/versions/latest
    Output: {"is_compatible":true}

兼容性设置

  • Get top level config

    1
    curl -X GET http://172.16.18.143:8081/config

    Output: {"compatibilityLevel":"BACKWARD"}

  • Update compatibility requirements globally

    1
    2
    3
    curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://172.16.18.143:8081/config

    Output: {"compatibilityLevel":"NONE"}

  • Update compatibility requirements under the subject “Kafka-key”

    1
    2
    3
    curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "BACKWARD"}' \
    http://localhost:8081/config/Kafka-key

    Output: {"compatibility":"BACKWARD"}

模式进化与兼容性

安全

(待续)