Intro

Install

docker run -d --name pushpin -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 fanout/pushpin:1.28.0

docker run -d --name pushpin -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/Desktop/pushpin-config:/etc/pushpin/ fanout/pushpin:1.28.0

docker run -d --name pushpin -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/Desktop/pushpin-config:/etc/pushpin/ fanout/pushpin:1.31.0

docker run -d --name pushpin -p 4430:4430 -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/Desktop/pushpin-config:/etc/pushpin/ -v /Users/shankai/Desktop/pushpin-certs:/usr/lib/pushpin/runner/certs fanout/pushpin:1.31.0

docker run -d --name pushpin -p 4430:4430 -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/archive/cbic/icos-fdns/icosEvent/pushpinx/pushpin-config:/etc/pushpin/ -v /Users/shankai/archive/cbic/icos-fdns/icosEvent/pushpinx/pushpin-certs:/usr/lib/pushpin/runner/certs fanout/pushpin:1.31.0


docker run -d --name pushpin -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/Desktop/pushpin-config:/etc/pushpin/ fanout/pushpin:1.32.2

Ports

  • 7999
  • 5560
  • 5561
  • 5562

Configuration

  • conf: /etc/pushpin/pushpin.conf

  • routes: /etc/pushpin/routes

    * 192.168.130.40:8099
    *,proto=ws 192.168.130.40:8098,over_http
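
An annotated reading of these two route lines (a sketch; Pushpin routes files accept # comments, and the backends are the ones defined below):

    # <condition> <target[,options]>
    * 192.168.130.40:8099                      # any request -> HTTP backend holding a GRIP stream
    *,proto=ws 192.168.130.40:8098,over_http   # WebSocket connections -> WebSocket-over-HTTP backend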

Mock Backend [test]

  • subscribe
curl http://localhost:7999/stream
  • publish

cli:

pushpin-publish test "hello there"

restful:

curl -d '{ "items": [ { "channel": "test", "formats": {
"http-stream": { "content": "hello there\n" } } } ] }' \
http://localhost:5561/publish/


Backend [myChannel]

  • http ( * 192.168.130.40:8099)
var http = require('http');

http.createServer(function (req, res) {
  var headers = req.headers;
  var channel = headers['channel'] || 'myChannel';
  res.writeHead(200, {
    'Content-Type': 'text/plain',
    'Grip-Hold': 'stream',
    'Grip-Channel': channel
  });
  console.log(headers);
  res.end('Stream opened, prepare yourself!\n');
}).listen(8099, '0.0.0.0');
  • over_http ( *,proto=ws 192.168.130.40:8098,over_http )
// npm install --save grip
var grip = require('grip');
var http = require('http');

http.createServer(function (req, res) {
  res.writeHead(200, {
    'Sec-WebSocket-Extensions': 'grip',
    'Content-Type': 'application/websocket-events'
  });

  var body = '';
  req.on('data', function (chunk) {
    body += chunk;
  });

  req.on('end', function () {
    var inEvents = grip.decodeWebSocketEvents(body);
    var outEvents = [];
    if (inEvents[0].getType() == 'OPEN') {
      outEvents.push(new grip.WebSocketEvent('OPEN'));
      outEvents.push(new grip.WebSocketEvent('TEXT',
        'c:' + grip.webSocketControlMessage(
          'subscribe',
          {'channel': 'mychannel'})));
    }

    res.end(grip.encodeWebSocketEvents(outEvents));
  });
}).listen(8098, '0.0.0.0');
  • zhttp.py

Routes: * zhttpreq/tcp://127.0.0.1:10000

import zmq
import tnetstring

zmq_context = zmq.Context()
sock = zmq_context.socket(zmq.REP)
sock.connect('tcp://127.0.0.1:10000')

while True:
    req = tnetstring.loads(sock.recv()[1:])
    print(req['headers'])

    resp = {
        'id': req['id'],
        'code': 200,
        'reason': 'OK',
        'headers': [
            ['Grip-Hold', 'stream'],
            ['Grip-Channel', 'test'],
            ['Content-Type', 'text/plain']
        ],
        'body': 'welcome to the stream\n'
    }

    sock.send('T' + tnetstring.dumps(resp))

ZMQ

  • XPUB.py
import zmq

zmq_context = zmq.Context.instance()
sock = zmq_context.socket(zmq.XPUB)

# unlimited subscriptions
sock.rcvhwm = 0

# resend subscriptions after disconnect
sock.immediate = 1

sock.connect('tcp://localhost:5562')

while True:
    m = sock.recv()
    mtype = m[0]
    topic = m[1:]
    if mtype == '\x01':
        print('SUB %s' % topic)
    elif mtype == '\x00':
        print('UNSUB %s' % topic)
  • stats
#!/usr/bin/python

import sys
import tnetstring
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.connect('ipc:///var/run/pushpin/pushpin-stats')
sock.setsockopt(zmq.SUBSCRIBE, '')

while True:
    m_raw = sock.recv()
    mtype, mdata = m_raw.split(' ', 1)
    if mdata[0] != 'T':
        print 'unsupported format'
        continue
    m = tnetstring.loads(mdata[1:])
    print '%s %s' % (mtype, m)

Elasticsearch: 7.4.0, Kibana: 7.4.0

Intro

Official site: https://www.elastic.co/

Install

Docker

docker run -d --name kibana -p 5601:5601 -e ELASTICSEARCH_HOSTS=http://dev:9200 docker.elastic.co/kibana/kibana:7.4.0
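
After the container starts (assuming the Elasticsearch address given in ELASTICSEARCH_HOSTS is reachable), a quick check against the Kibana status API:

curl http://localhost:5601/api/status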

Intro

Official site: https://nodejs.org/

Install

Follow the official site for installation; the environment used for these notes is listed below.

# 6.9.0
npm -v

# v10.16.0
node -v

Examples

Create a simple endpoint service with express, including GET and POST endpoint examples.

  1. Install dependencies
npm install express body-parser
  2. Endpoint service code; save it to the file endpoint.js
var express = require('express')
var bodyParser = require('body-parser');
var app = express()

app.use(bodyParser.json()); // for parsing application/json
app.use(bodyParser.urlencoded({ extended: true })); // for parsing application/x-www-form-urlencoded

app.post('/', function (req, res) {
  console.log('===================================, Now:{}', Date.now());
  console.log('Request Body:', req.body);
  console.log('Request Headers:', req.headers);
  res.send('ok')
})
app.get('/', function (req, res) {
  res.send('ok')
})

var server = app.listen(8099, function () {
  var host = server.address().address
  var port = server.address().port
  console.log('Endpoint listening at http://%s:%s', host, port) // log the bound address
})
  3. Start the endpoint service
node endpoint.js
  4. Verify

A GET request that returns ok indicates success.

curl http://localhost:8099/

A POST request that returns ok indicates success.

curl -XPOST http://localhost:8099/ -H "Content-Type: application/json" -d '{"name": "zhangsan", "age": 20}'

Elasticsearch: 7.4.0

Intro

Official site: https://www.elastic.co/

Install

Docker

docker run -d --name es -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.4.0

UI

Settings

CORS

http.cors.enabled: true
http.cors.allow-origin: "*"
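
With these two settings in elasticsearch.yml and the node restarted, one quick way to confirm CORS is active (the Origin value here is arbitrary) is to check that the response carries an Access-Control-Allow-Origin header:

curl -i -H "Origin: http://example.com" http://localhost:9200/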

REST API Examples

_index

POST xxx-index-2020-04-10/_doc/1
{
  "cluster": "primary-dc",
  "reason": "Total timeout 1000 ms elapsed",
  "hostname": "9ddf7031ac70",
  "partition": 0,
  "offset": 21,
  "messageId": "70df2035-3c81-4ebf-8f76-a396ee1f94fc",
  "topicName": "T2.g2.t1",
  "subscription": "my-Subscription1",
  "batchId": "",
  "publish_timestamp": 1587010729737,
  "timestamp": 1587010742388,
  "status": "DISCARDED"
}
GET xxx-index-wildcard-*/_search
{
  "from": 0,
  "size": 20,
  "sort": [
    {
      "timestamp": {
        "unmapped_type": "date",
        "order": "asc"
      }
    }
  ],
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "topicName.keyword": {
              "value": "T2.g2.t1"
            }
          }
        },
        {
          "term": {
            "subscription.keyword": {
              "value": "my-Subscription1"
            }
          }
        }
      ]
    }
  }
}

_alias

POST xxx-index-2020-04-20/_alias/xxx-index-alias
{}
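
Once the alias is in place, searches can target it instead of the concrete index name (a sketch reusing a field indexed above):

GET xxx-index-alias/_search
{
  "query": {
    "term": {
      "topicName.keyword": { "value": "T2.g2.t1" }
    }
  }
}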

Environment Setup

Docker Compose: installing Docker Compose on an Ubuntu 16.04 LTS server

Docker Version

$ docker -v
Docker version 18.06.3-ce, build d7080c1

Docker Compose Version

$ docker-compose -v
docker-compose version 1.25.4, build 8d51620a

Command Usage

Only the commands and options I have personally used are listed here; for complete usage see docker-compose -h.

Usage:

docker-compose [-f <arg>...] [options] [COMMAND] [ARGS...]

docker-compose -h|--help

Options:

  • -f, --file FILE
    Specify an alternate compose file (default: docker-compose.yml)
  • -p, --project-name NAME
    Specify an alternate project name (default: current directory name)

  • -v, --version
    Print version and exit

COMMAND:

  • help

    Show help for a command

  • build

    Build or rebuild all services

  • up

    Create and start all containers

  • down

    Stop and remove containers, networks, images, and volumes

  • start

    Start all services

  • stop

    Stop all services

  • restart

    Restart all services

  • exec

    Execute a command in a running container

  • images

    List images

  • ps

    List containers

  • logs

    View output from containers

  • version

    Show Docker Compose version information
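
As a usage sketch (the compose file and the nginx service here are made up purely for illustration), a minimal docker-compose.yml and a few of the commands above applied to it:

# docker-compose.yml
version: "3"
services:
  web:
    image: nginx:1.17
    ports:
      - "8080:80"

# bring the service up in the background, inspect it, then tear it down
docker-compose up -d
docker-compose ps
docker-compose logs web
docker-compose down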

(To be continued)

Introduction

Schema Registry provides a serving layer for metadata. It exposes a RESTful interface for storing and retrieving Avro schemas, stores a versioned history of all schemas, provides multiple compatibility settings, and allows schemas to evolve according to the configured compatibility setting. It also provides serializers that plug into Kafka clients and handle schema storage and retrieval for Kafka messages sent in Avro format.

Project homepage: https://github.com/confluentinc/schema-registry

Deployment

Docker

Image reference source: https://github.com/confluentinc/examples/blob/5.4.0-post/cp-all-in-one-community/

Here dev is the host machine.

docker run --name eventbus_zk -d -p 2181:2181 zookeeper:3.5.7

docker run --name schema-registry -d -h schema-registry -p 8081:8081 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=dev:2181 \
confluentinc/cp-schema-registry:5.5.0

API

Query subjects/schemas

  • List all subjects

    curl -X GET http://172.16.18.143:8081/subjects
  • List all schema versions registered under the subject “Kafka-key”

    curl -X GET http://172.16.18.143:8081/subjects/Kafka-key/versions
  • Fetch version 1 of the schema registered under subject “Kafka-key”

    curl -X GET http://172.16.18.143:8081/subjects/Kafka-key/versions/1
  • Fetch the most recently registered schema under subject “Kafka-key”

    curl -X GET http://172.16.18.143:8081/subjects/Kafka-key/versions/latest
  • Fetch a schema by globally unique id 1

    curl -X GET http://172.16.18.143:8081/schemas/ids/1

Register schemas

  • Register a new version of a schema under the subject “Kafka-key”
    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{
    "schema": "{\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"id\", \"type\": \"int\"},{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"} ]}"}' \
    http://172.16.18.143:8081/subjects/Kafka-key/versions


    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{
    "schema": "{\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"id\", \"type\": \"int\"},{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"},{\"name\": \"age1\", \"type\": \"int\"} ]}"}' \
    http://172.16.18.143:8081/subjects/Kafka-key/versions

    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{
    "schema": "{\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"id\", \"type\": \"int\"},{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"},{\"name\": \"age2\", \"type\": \"int\"} ]}"}' \
    http://172.16.18.143:8081/subjects/Kafka-key/versions

Delete schemas/versions

  • Delete version 1 of the schema registered under subject “Kafka-key”

    curl -X DELETE http://172.16.18.143:8081/subjects/Kafka-key/versions/1

    Output: 1

  • Delete all versions of the schema registered under subject “Kafka-key”

    curl -X DELETE http://172.16.18.143:8081/subjects/Kafka-key

    Output: [2,3,4]

Compatibility checks

  • Check whether a schema has been registered under subject “Kafka-key”
    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://172.16.18.143:8081/subjects/Kafka-key
  • Test compatibility of a schema with the latest schema under subject “Kafka-key”
    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://172.16.18.143:8081/compatibility/subjects/Kafka-key/versions/latest
    Output: {"is_compatible":true}

Compatibility settings

  • Get top level config

    curl -X GET http://172.16.18.143:8081/config

    Output: {"compatibilityLevel":"BACKWARD"}

  • Update compatibility requirements globally

    curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://172.16.18.143:8081/config

    Output: {"compatibilityLevel":"NONE"}

  • Update compatibility requirements under the subject “Kafka-key”

    curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "BACKWARD"}' \
    http://localhost:8081/config/Kafka-key

    Output: {"compatibility":"BACKWARD"}

Schema Evolution and Compatibility

Security

(To be continued)

Introduction

The World’s Most Advanced Open Source Relational Database. —— PostgreSQL

Deployment

Docker

Example; the path /usr/local/nakadi4cbh/database/nakadi contains the Nakadi database initialization scripts.

docker run --name eventbus_pg -d \
-p 5432:5432 \
-e POSTGRES_USER=nakadi \
-e POSTGRES_PASSWORD=nakadi \
-e POSTGRES_DB=local_nakadi_db \
-v /usr/local/nakadi4cbh/database/nakadi:/docker-entrypoint-initdb.d \
postgres:9.6.16
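
To reach the database afterwards, one option (a sketch using the container name and credentials from the command above) is to run psql inside the container:

docker exec -it eventbus_pg psql -U nakadi -d local_nakadi_db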

psql

  • psql

  • psql -U username

    bash-4.2$ psql
    psql (12.2)
    Type "help" for help.

    postgres=#

    Getting information about databases

Command List

  • \h Help

  • \l List databases

  • \l+ List databases with size, tablespace, and description

  • \x Expand/narrow table lists

  • \c Connect to a database

  • \dt Display tables

  • \d and \d+ Display columns (field names) of a table

  • \du Display user roles

  • \q quit

Command Usage

  • \l List databases

    postgres=# \l
  • \c Connect to a database

    postgres=# \c icosregistry_icos
    You are now connected to database "icosregistry_icos" as user "postgres".
    icosregistry_icos=#

    SQL

  • Create Database & User, Grant

create database keycloak;
create user keycloak with password 'keycloak';
grant all privileges on database keycloak to keycloak;
  • Query Data From Table

    icosregistry_icos=# select * from asset_deploy;
    asset_code | handler_type | opr_type | opr_status | opr_message | deploy_url | created
    ------------+--------------+----------+------------+-------------+------------+---------
    (0 rows)

    icosregistry_icos=#

pg_dump [OPTION]… [DBNAME]

Options

-s, --schema-only    dump only the schema, no data

-t, --table=PATTERN    dump the specified table(s) only

Examples

  • Example: dump table schemas only
pg_dump -s -t asset -t asset_group -t asset_group_membership -t asset_relationship -t asset_relationship_type -t asset_type -t asset_deploy icosregistry_icos
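
To keep the dump (a sketch; the output file and target database names are arbitrary), redirect the output to a file; it can later be replayed into another database with psql:

pg_dump -s -t asset -t asset_type icosregistry_icos > asset_schema.sql
psql -d target_db -f asset_schema.sql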

(To be continued)

Introduction

Deployment

Docker

172.16.18.143 is the host machine IP.

  • Zookeeper

    docker run --name eventbus_zk -d -p 2181:2181 zookeeper:3.5.7
    # docker run --name eventbus_zk -d -p 2181:2181 wurstmeister/zookeeper:3.4.6
  • Kafka

    docker run --name eventbus_kafka -d \
    -p 9092:9092 \
    -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.18.143:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=172.16.18.143:2181 \
    -e KAFKA_AUTO_CREATE_TOPICS_ENABLE=true \
    -e KAFKA_DELETE_TOPIC_ENABLE=true \
    -e KAFKA_BROKER_ID=0 \
    -v /var/run/docker.sock:/var/run/docker.sock \
    wurstmeister/kafka:2.11-1.1.1

Command Line

Examples

  • Topic List

    /opt/kafka/bin/kafka-topics.sh --list --zookeeper 172.16.18.143:2181
  • Producer

    /opt/kafka/bin/kafka-console-producer.sh --broker-list 172.16.18.143:9092 --topic test.eb1
  • Consumer

    /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.18.143:9092 --topic test.eb1  --from-beginning
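
The kafka-*.sh scripts above live inside the Kafka container, so one way to run them from the host (a sketch using the container name from the run command above) is via docker exec, for example for the topic list:

docker exec -it eventbus_kafka /opt/kafka/bin/kafka-topics.sh --list --zookeeper 172.16.18.143:2181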

Operations Log

Clearing topic data (without deleting the topic)

Approach:

  1. Change the topic configuration: set the retention time to 1 second with retention.ms=1000 (default is 7 days), and set the cleanup policy to cleanup.policy=delete.
  2. Check the topic configuration to confirm the change took effect.
  3. Wait for Kafka to perform the cleanup, and confirm the topic data has been removed.
  4. Restore the topic configuration: delete the retention.ms entry and change the cleanup policy back to cleanup.policy=compact.

Example: clear data for the topics connect-status, connect-config, and connect-offsets.

# connect-status
kafka-configs --zookeeper 172.16.0.61:2181 --alter --entity-type topics --add-config retention.ms=1000,cleanup.policy=delete --entity-name connect-status
kafka-configs --zookeeper 172.16.0.61:2181 --entity-type topics --describe --entity-name connect-status
kafka-configs --zookeeper 172.16.0.61:2181 --alter --entity-type topics --add-config cleanup.policy=compact --delete-config retention.ms --entity-name connect-status

# connect-config
kafka-configs --zookeeper 172.16.0.61:2181 --alter --entity-type topics --add-config retention.ms=1000,cleanup.policy=delete --entity-name connect-config
kafka-configs --zookeeper 172.16.0.61:2181 --entity-type topics --describe --entity-name connect-config
kafka-configs --zookeeper 172.16.0.61:2181 --alter --entity-type topics --add-config cleanup.policy=compact --delete-config retention.ms --entity-name connect-config

# connect-offsets
kafka-configs --zookeeper 172.16.0.61:2181 --alter --entity-type topics --add-config retention.ms=1000,cleanup.policy=delete --entity-name connect-offsets
kafka-configs --zookeeper 172.16.0.61:2181 --entity-type topics --describe --entity-name connect-offsets
kafka-configs --zookeeper 172.16.0.61:2181 --alter --entity-type topics --add-config cleanup.policy=compact --delete-config retention.ms --entity-name connect-offsets

(To be continued)

Introduction

Vagrant is a tool for building and managing virtual machine environments in a single workflow.

With an easy-to-use workflow and a focus on automation, Vagrant lowers development environment setup time, increases production parity, and makes the "works on my machine" excuse a relic of the past.

Installation

Vagrant

From the download page, choose the installer for your platform and install it. After installation, verify on the command line:

vagrant -v

Version: Vagrant 2.2.7

Virtual Machine Providers

  • VirtualBox
  • VMWare
  • AWS
  • Other provider

Command Notes

  • List boxes
    vagrant box list
  • Add a local box
    vagrant box add ubuntu/trusty64 ~/Downloads/trusty-server-cloudimg-amd64-vagrant-disk1.box
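
After the box is added, a typical next step (a sketch; the project directory name is arbitrary) is to initialize a Vagrantfile from the box and bring the machine up:

mkdir trusty-vm && cd trusty-vm
vagrant init ubuntu/trusty64
vagrant up
vagrant ssh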

(To be continued)

Intro

Hermes is a message broker that uses Kafka for message storage and routing, and its publish-subscribe model greatly simplifies communication between services.

It is HTTP-native: it exposes REST endpoints for publishing messages and pushes messages to subscribers' REST endpoints.

Hermes uses HTTP as its default communication protocol. This means the only prerequisite for publishing or receiving messages is the ability to send or consume HTTP requests.

Architecture

[Architecture diagram]

Core Modules and Components

  • Hermes Frontend receives messages from clients
  • Hermes Consumers deliver messages to subscribers (push model)
  • Hermes Management manages topics and subscriptions
  • Message Store stores and routes messages; current implementation: Kafka
  • Metadata Store shared metadata storage; current implementation: Zookeeper
  • Metrics Store [optional] stores Hermes metrics; current implementation: Graphite
  • Tracking Store [optional] stores message tracking information; current implementations: Elasticsearch/MongoDB

Message Flow

A publisher sends a message to Hermes Frontend:

  • The message is assigned a unique Hermes-Message-Id, which can be used to trace its path through the system
  • Each action is timed, and the metrics are sent to the Metrics Store
  • If tracking is enabled for the topic, tracking information is sent to the Tracking Store
  • The message is written to the Message Store

Hermes Consumers deliver messages to subscribers:

  • The message is read from the Message Store
  • Each action is timed, and the metrics are sent to the Metrics Store
  • The message is delivered to the subscriber
  • If the subscriber returns an error, Hermes Consumers adjust the delivery rate and retry

Key Concepts

  • publisher sends messages to Hermes

  • subscriber wants to receive messages from Hermes

  • group is a set of topics managed by a single publisher, e.g. for dividing the topic space into domains and bounded contexts

  • topic holds messages of the same type, defining the type, schema, and persistence of all stored messages; subscribers can subscribe to messages stored on a topic

  • subscription is created per topic and holds information about consumed messages and other subscriber-defined properties, such as the maximum delivery rate or the retry policy

Naming Conventions

Topics are usually referred to by their fully-qualified name, which consists of the group name and the topic name separated by a dot. A group name may contain letters, digits, and dots; a topic name, however, must not contain dots.

Fully-qualified topic name = Group name.Topic name (e.g. my-group.my-topic)

Deploy

Download

Get the latest version (1.4.0 at the time of writing):

git clone -b hermes-1.4.0 https://github.com/allegro/hermes.git hermes-1.4.0

Deploy

Warning!
Warning!
Warning!

The allegro/hermes-*:latest images referenced in the 1.4.0 docker-compose file are not the corresponding 1.4.0 release (or even a recent build); the images tagged latest were actually built 3 years ago. Screenshot as evidence:

[Screenshot: docker-image-latest-is-bug]

In this situation the best approach is to compile the source yourself and build images for the matching version.

Take building frontend:1.4.0 as an example.

  1. Build

    # Change Dir
    cd hermes-1.4.0

    # Copy the existing image build files
    cp -r docker/latest docker/1.4.0

    # Change Dir
    cd hermes-frontend
    # Compile and package
    gradle -x test distZip -Pdistribution
    # Copy the distribution package into the corresponding directory
    cp build/distributions/hermes-frontend-1.4.0.zip ../docker/1.4.0/frontend/
  2. Edit the Dockerfile

    FROM jeanblanchard/java:8

    MAINTAINER Allegro

    RUN apk update \
    && apk add unzip wget bash \
    && rm -rf /var/cache/apk/*


    ADD hermes-frontend-1.4.0.zip /tmp/hermes-frontend-1.4.0.zip

    RUN unzip -q "/tmp/hermes-frontend-1.4.0.zip" -d /opt \
    && rm "/tmp/hermes-frontend-1.4.0.zip" \
    && mv /opt/hermes-frontend-* /opt/hermes-frontend

    ENV HERMES_FRONTEND_OPTS="-Darchaius.configurationSource.additionalUrls=file:///etc/hermes/frontend.properties -Dlogback.configurationFile=/etc/hermes/logback.xml"
    ADD frontend.properties /etc/hermes/frontend.properties
    ADD logback.xml /etc/hermes/logback.xml

    CMD /opt/hermes-frontend/bin/hermes-frontend
  3. Build the images

    • Frontend

      # Change Dir
      cd ../docker/1.4.0/frontend
      # Build the image
      docker build -t frontend:1.4.0 .
    • consumer

      # Change Dir
      cd ../docker/1.4.0/consumers
      # Build the image
      docker build -t consumers:1.4.0 .
    • management

      # Change Dir
      cd ../docker/1.4.0/management
      # Build the image
      docker build -t management:1.4.0 .
  4. Replace the images

    In docker/docker-compose.yml, replace the image allegro/hermes-frontend with frontend:1.4.0

    In docker/docker-compose.yml, replace the image allegro/hermes-consumers with consumers:1.4.0

    In docker/docker-compose.yml, replace the image allegro/hermes-management with management:1.4.0

At this point the frontend replacement is complete; apply the same process to consumers and management.

Once that is done, continue with the steps below.

Start the service containers

cd hermes-1.4.0/docker

docker-compose up

Note: after all the containers start successfully, an error appears at runtime that prevents Hermes Consumers from receiving messages. Excerpt from the Kafka container logs:

[2020-02-20 08:58:10,259] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:10,767] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:11,269] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:11,772] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:12,276] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:12,778] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:13,281] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:13,786] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:14,287] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:14,790] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:15,291] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:15,797] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:16,298] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
[2020-02-20 08:58:16,801] ERROR [KafkaApi-1001] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

The root cause has not been investigated in depth yet; for now the error is worked around by adding one configuration entry.

Specifically, append offsets.topic.replication.factor=1 to the template file /etc/confluent/docker/kafka.properties.template inside the Kafka container, so that the target file /etc/kafka/kafka.properties automatically contains this setting after the container starts.

echo "offsets.topic.replication.factor=1" >> /etc/confluent/docker/kafka.properties.template

Restart the Kafka container:

docker restart docker_kafka_1

API Examples

Group (Hermes Management)

  • Create a group
curl -X POST \
  http://172.16.18.143:8090/groups \
  -H 'Content-Type: application/json' \
  -d '{
    "groupName": "my-group"
  }'
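
To confirm the group was created, the group list can be fetched (a sketch; this assumes Hermes Management also exposes GET /groups alongside the POST used above):

curl http://172.16.18.143:8090/groups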

Topic (Hermes Management)

  • Create a topic
curl -X POST \
  http://172.16.18.143:8090/topics \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "my-group.my-topic",
    "description": "This is my topic",
    "contentType": "JSON",
    "retentionTime": {
      "duration": 1
    },
    "owner": {
      "source": "Plaintext",
      "id": "MyTeam"
    }
  }'

Subscription (Hermes Management)

  • Create a subscription webhook endpoint (any technology can be used)

    This example uses Node.js; save the following code to the file hermes-subendpoint.js

var express = require('express')
var bodyParser = require('body-parser');
var app = express()

app.use(bodyParser.json()); // for parsing application/json
app.use(bodyParser.urlencoded({ extended: true })); // for parsing application/x-www-form-urlencoded

app.post('/', function (req, res) {
  console.log('Request Body:', req.body);
  res.send('ok')
})
app.get('/', function (req, res) {
  res.send('ok')
})

var server = app.listen(8081, function () {
  var host = server.address().address
  var port = server.address().port
  console.log('Subscription endpoint listening at http://%s:%s', host, port) // log the bound address
})

Run

npm install express --save
npm install body-parser --save

# start
node hermes-subendpoint.js

Test

curl -X POST \
  http://localhost:8081/ \
  -H 'Content-Type: application/json' \
  -d '{
    "a": 1
  }'

Webhook endpoint console output: Request Body: { a: 1 }

  • Create a subscription
curl -X POST \
  http://172.16.18.143:8090/topics/my-group.my-topic/subscriptions \
  -H 'Content-Type: application/json' \
  -d '{
    "topicName": "my-group.my-topic",
    "name": "my-Subscription",
    "description": "This is my subscription",
    "endpoint": "http://172.16.18.1:8081/",
    "owner": {
      "source": "Plaintext",
      "id": "MyTeam"
    }
  }'

Publish (Hermes-Frontend)

  • Publish a message
curl -X POST \
  http://172.16.18.143:8080/topics/my-group.my-topic \
  -H 'Content-Type: application/json' \
  -d '{
    "message": "Hello world!",
    "more": "1112223334444555"
  }'

Webhook endpoint console output:

Request Body: { message: 'Hello world!', more: '1112223334444555' }

(To be continued)