Event Broker Hermes Notes

Intro

Hermes 是使用 Kafka 作为消息存储和路由支持的消息代理,使用发布-订阅模式大大简化了服务之间的通信。

它是 HTTP-native,公开用于消息发布的 REST 端点,并将消息推送到订阅者 REST 端点。

Hermes 使用 HTTP 作为默认通信协议。 这意味着发布或接收消息的唯一先决条件是能够发送或使用 HTTP 请求。

Architecture

image

核心模块与组件

  • Hermes Frontend 接收来自客户端的消息
  • Hermes Consumers 发送消息到订阅者(push 模式)
  • Hermes Management 管理主题和订阅
  • Message Store 存储和路由消息,当前实现: Kafka
  • Metadata Store 共享元数据存储,当前实现:Zookeeper
  • Metrics Store [可选] 存储 Hermes 度量数据,当前实现:Graphite
  • Tracking Store [可选] 存储消息追踪信息,当前实现:Elasticsearch/MongoDB

消息流

消息发送者发送消息到 Hermes Frontend

  • 消息被分配唯一的 Hermes-Message-Id,可用于在系统中跟踪其路径
  • 每个动作时间被计量,度量被发送到 Metrics Store
  • 如果主题已启用跟踪,则跟踪信息将发送到 Tracking Store
  • 消息被发送到 Message Store

Hermes Consumers 发送消息给订阅者:

  • Message Store 读取消息
  • 每个动作时间被计量,度量被发送到 Metrics Store
  • 消息发送到订阅者
  • 如果订阅者发生错误,Hermes Consumers 调整发送速度并重试

主要概念

  • publisher 发送消息给 Hermes

  • subscriber 希望从 Hermes 接收消息,

  • group 是由一个发布者管理的一组主题,例如将整个主题空间划分为域和有界上下文组

  • topic 保存相同类型的消息,定义所有存储的消息的类型、模式和持久性,订阅者可以订阅存储在主题上的消息

  • subscription 是按每个主题创建的,保存有关已使用的消息和其他订阅者定义的属性(如最大传递速率或重试策略)的信息

命名约定

主题通常使用全限定名引用,全限定名由组名和以点分隔的主题名组成。 组名可以包含任何字符-字母、数字、点。 但是,主题名称不能包含点。

Full-qualifed topic name = Group name.Topic name

Deploy

Download

获取最新版本(此时为 1.4.0

1
git clone -b hermes-1.4.0 https://github.com/allegro/hermes.git hermes-1.4.0

Deploy

警告!
警告!
警告!

1.4.0 版本 docker-compose 中包含的 allegro/hermes-*:latest 镜像版本并不是对应的 1.4.0 或最新版本,tag 为 latest 的镜像竟然是 3 年前构建的。有图为证:

docker-image-latest-is-bug

遇到这种情况,此时最好的方式便是自行编译源码,自行构建对应版本的镜像。

以构建 frontend:1.4.0 为例

  1. 编译

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # Change Dir
    cd hermes-1.4.0

    # 复制原有镜像构建文件
    cp -r docker/latest docker/1.4.0

    # Change Dir
    cd hermes-frontend
    # 编译打包
    gradle -x test distZip -Pdistribution
    # 将构建包拷贝到对应的目录下
    cp build/distributions/hermes-frontend-1.4.0.zip ../docker/1.4.0/frontend/
  2. 编辑 Dockerfile

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    FROM jeanblanchard/java:8

    MAINTAINER Allegro

    RUN apk update \
    && apk add unzip wget bash \
    && rm -rf /var/cache/apk/*


    ADD hermes-frontend-1.4.0.zip /tmp/hermes-frontend-1.4.0.zip

    RUN unzip -q "/tmp/hermes-frontend-1.4.0.zip" -d /opt \
    && rm "/tmp/hermes-frontend-1.4.0.zip" \
    && mv /opt/hermes-frontend-* /opt/hermes-frontend

    ENV HERMES_FRONTEND_OPTS="-Darchaius.configurationSource.additionalUrls=file:///etc/hermes/frontend.properties -Dlogback.configurationFile=/etc/hermes/logback.xml"
    ADD frontend.properties /etc/hermes/frontend.properties
    ADD logback.xml /etc/hermes/logback.xml

    CMD /opt/hermes-frontend/bin/hermes-frontend
  3. 构建镜像

    • Frontend

      1
      2
      3
      4
      # Change Dir
      cd ../docker/1.4.0/frontend
      # 构建镜像
      docker build -t frontend:1.4.0 .
    • consumer

      1
      2
      3
      4
      # Change Dir
      cd ../docker/1.4.0/consumers
      # 构建镜像
      docker build -t consumers:1.4.0 .
    • management

      1
      2
      3
      4
      # Change Dir
      cd ../docker/1.4.0/management
      # 构建镜像
      docker build -t management:1.4.0 .
  4. 替换镜像

    替换 docker/docker-compose.yml 中镜像allegro/hermes-frontendfrontend:1.4.0

    替换 docker/docker-compose.yml 中镜像allegro/hermes-consumersconsumers:1.4.0

    替换 docker/docker-compose.yml 中镜像allegro/hermes-managementmanagement:1.4.0

至此,frontend 的替换操作已经完成。接着将 consumermanagement 也进行类似的处理。

处理完毕。

最后,可以照下述步骤进一步操作。

启动服务容器

1
2
3
cd hermes-1.4.0/docker

docker-compose up

注意:在所有容器启动成功后,服务在实际的运行过程中,会出现一个错误,这一错误导致 Hermes Consumers无法收到消息。Kafka 容器后台错误信息摘要:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[2020-02-20 08:58:10,259] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:10,767] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:11,269] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:11,772] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:12,276] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:12,778] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:13,281] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:13,786] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:14,287] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:14,790] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:15,291] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:15,797] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:16,298] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:16,801] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

出现该问题的根本原因暂时没有深入研究,目前通过增加一个配置项临时规避该错误。

具体操作:对 Kafka 容器内模板文件/etc/confluent/docker/kafka.properties.template增加offsets.topic.replication.factor=1这一设置,这样目标文件/etc/kafka/kafka.properties 在容器启动后会自动包含该设置。

1
echo "offsets.topic.replication.factor=1" >> /etc/confluent/docker/kafka.properties.template

重启 Kafka 容器

1
docker restart docker_kafka_1

API Examples

Group (Hermes Management)

  • 创建组
1
2
3
4
5
6
curl -X POST \
http://172.16.18.143:8090/groups \
-H 'Content-Type: application/json' \
-d '{
"groupName": "my-group"
}'

Topic (Hermes Management)

  • 创建主题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
curl -X POST \
http://172.16.18.143:8090/topics \
-H 'Content-Type: application/json' \
-d '{
"name": "my-group.my-topic",
"description": "This is my topic",
"contentType": "JSON",
"retentionTime": {
"duration": 1
},
"owner": {
"source": "Plaintext",
"id": "MyTeam"
}
}'

Subscription (Hermes Management)

  • 创建订阅钩子端点(可以使用任何技术实现)

    本示例使用 nodejs,将以下代码保存到文件 hermes-subendpoint.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
var express = require('express')
var bodyParser = require('body-parser');
var app = express()

app.use(bodyParser.json()); // for parsing application/json
app.use(bodyParser.urlencoded({ extended: true })); // for parsing application/x-www-form-urlencoded

app.post('/', function (req, res) {
console.log('Request Body:', req.body);
res.send('ok')
})
app.get('/', function (req, res) {
res.send('ok')
})

var server = app.listen(8081, function () {

var host = server.address().address
var port = server.address().port

})

运行

1
2
3
4
5
npm install express --save
npm install body-parser --save

// start
node hermes-subendpoint.js

测试

1
2
3
4
5
6
curl -X POST \
http://localhost:8081/ \
-H 'Content-Type: application/json' \
-d '{
"a":1
}'

钩子端点控制台输出:Request Body: { a: 1 }

  • 创建订阅
1
2
3
4
5
6
7
8
9
10
11
12
13
curl -X POST \
http://172.16.18.143:8090/topics/my-group.my-topic/subscriptions \
-H 'Content-Type: application/json' \
-d '{
"topicName": "my-group.my-topic",
"name": "my-Subscription",
"description": "This is my subscription",
"endpoint": "http://172.16.18.1:8081/",
"owner": {
"source": "Plaintext",
"id": "MyTeam"
}
}'

Publish (Hermes-Frontend)

  • 发布消息
1
2
3
4
5
6
7
curl -X POST \
http://172.16.18.143:8080/topics/my-group.my-topic \
-H 'Content-Type: application/json' \
-d '{
"message": "Hello world!",
"more": "1112223334444555"
}'

钩子端点控制台输出:

Request Body: { message: 'Hello world!', more: '1112223334444555' }

(待续)