Event Broker Nakadi Notes

简介

Nakadi 的目标是提供一个事件经纪人基础设施,以便:

  • 通过安全的 RESTful API 进行抽象事件交付

    这允许微服务团队维护服务边界,而不直接依赖于任何特定的消息代理技术。 可以为每个事件类型单独管理访问权限,并使用 OAuth 和自定义授权插件进行保护。

  • 支持事件驱动应用程序和异步微服务的方便开发

    可以使用事件类型架构定义事件类型,并通过注册表进行管理。 在发布事件类型之前,将根据模式验证所有事件。 它为数据使用者提供了数据质量和数据一致性的保证。

  • 有效的低延迟事件传递

    一旦发布者使用简单的 Http Post 发送事件,就可以通过流式 HTTP 连接推送到消费者,从而实现接近实时的事件处理。消费者连接具有保持活力控制并支持使用订阅管理流偏移量。

特性

Stream

  • 类似 Kafka 队列上的 REST 抽象
  • 事件类型的CRUD
  • 事件批量发布
  • Low-level API(消费)
    • 手动的客户端分区管理
    • 不支持提交
  • High-level API(订阅)
    • 在消费者客户端之间自动重新分配分区
    • 支持提交,移动服务器端游标

Schema

  • 模式注册表

  • 模式进化

  • 事件类型类别

    categories: 每个事件类别为一个事件类型启用不同的功能,特别是它们的模式和验证规则

    • Business 作为业务流程的一部分或驱动业务流程的事件,如客户订单中的状态转换。
    • Data Change 表示对记录或其他项或新项的更改的事件。 更改事件与创建、更新、删除或快照操作相关联。
    • Undefined 适用于生产者完全自定义的事件的自由格式类别。
  • 分区策略(Random/Hash/User Defined)

    • Random 分区是随机选择的,事件将均匀地分布在分区之间。 Nakadi 使用的默认选项是 Random。
    • Hash 通过对事件类型的 partition_key_fields中定义的字段值进行哈希处理来选择分区。 在实践中,这意味着那些大致相同的逻辑实体和具有相同的分区键值的事件将被发送到相同的分区。
    • user_defined 分区由生产者在发送事件时设置。 此选项仅适用于 businessdata change 类别。
  • 事件强化策略

  • 事件验证(通过模式)

Security

  • OAuth2 认证
  • 事件类型授权
  • 黑名单

Operations

  • Timelines
    • 允许透明地将生产和消费切换到不同的集群(tier、region、 AZ) ,而不会移动实际数据和任何服务降级
    • 为其他流媒体技术和引擎的实现提供了可能(比如 AWS Kinesis,Google pub / sub 等等)

概念

Nakadi API 允许通过 HTTP 发布和消费事件。

理解事件的一个好方法是,它们类似于流处理或队列系统中的消息,但具有可以理解和验证的定义结构。 包含描述事件的信息的对象称为事件类型。

要发布和消费事件,拥有事件的应用程序必须首先向 Nakadi 注册一个新的事件类型。 事件类型包含诸如其名称、应用程序、分区和丰富数据的策略以及 JSON 模式等信息。 Nakadi 支持列出所有可用事件类型的事件类型注册表 API。

一旦创建了事件类型,称为流的资源就可用于该事件类型。 流将接受来自生产者的类型的事件,并且一个或多个消费者可以从中读取。 Nakadi 可以验证发送到流的每个事件。

事件类型的流可以划分为一个或多个分区。 每个事件被精确地放置到一个分区中。 每个分区代表一个有序的日志——一旦一个事件被添加到一个分区中,它的位置永远不会改变,但是没有跨分区的全局排序。

消费者可以使用分配给每个分区的游标读取事件并跟踪它们在流中的位置。 消费者还可以使用光标从特定位置的流中读取数据。 多个使用者可以从同一个流中读取,允许不同的应用程序同时读取流。

事件应用分类

  • Event Type Owners 事件类型所有者通过事件类型注册表与 Nakadi 交互,以基于模式定义事件类型并创建事件流。

  • Event Producers 生产者将符合事件类型模式的事件发布到事件类型的流。

  • Event Consumers 消费者从事件流中读取事件,多个消费者可以从同一流中读取数据。

Cursors, Offsets, Partitiions(游标, 偏移量, 分区)

默认情况下,事件资源将从事件类型的所有分区和流的末端(或“尾端”)消耗。 要只选择特定的分区和流中要开始的位置,可以在请求中提供 X-Nakadi-Cursors 标头:

1
2
curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events \
-H 'X-Nakadi-Cursors: [{"partition": "0", "offset":"12"}]'

标头值是游标的 JSON 数组。 数组中的每个游标描述流的分区和从中流出的偏移量。 请注意,同一个分区中的事件保持了它们的总体顺序。

光标的偏移量值允许您选择要从流中的哪个位置使用。 这可以是任何已知的偏移量值,也可以是将从头开始启动流的专用值 BEGIN。 例如,从分区0开始读:

1
2
curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events \
-H 'X-Nakadi-Cursors:[{"partition": "0", "offset":"BEGIN"}]'

Event Stream Keepalives 事件流保活

如果没有事件需要发送,Nakadi 将通过定期发送一个没有事件但包含指向当前偏移量的指针的批处理来保持流连接打开。 例如:

1
2
3
4
5
6
7
8
9
10
curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events 


HTTP/1.1 200 OK

{"cursor":{"partition":"0","offset":"6"},"events":[{"order_number": "ORDER_003", "metadata": {"eid": "4cc6d2f0-eb01-11e5-b606-1c6f65464fc6", "occurred_at": "2016-03-15T23:58:15+01:00"}}]}
{"cursor":{"partition":"0","offset":"6"}}
{"cursor":{"partition":"0","offset":"6"}}
{"cursor":{"partition":"0","offset":"6"}}
{"cursor":{"partition":"0","offset":"6"}}

注意:该示例为 Low-level API 已被否决,并将从未来的 Nakadi 版本中删除。 请考虑使用高级别的 API。

Timelines

时间线的创建是使用 Zookeeper 通过一系列的锁(Locks)和屏障(Barriers)来协调的。

  1. 初始状态

每次启动 Nakadi 应用程序时,它都会尝试创建以下 ZK 结构。为了不覆盖初始结构,由于并发性,每个实例在执行之前都需要获取锁 /nakadi/timelines/lock

1
2
3
4
5
6
7
timelines:
lock: - lock for timeline versions synchronization
version: {version} monotonically incremented long value (version of timelines configuration)
locked_et: -
nodes: nakadi nodes
node1: {version} Each nakadi node exposes the version used on this node
node2: {version}
  1. et_1 创建时间线

当创建一个新的时间线时,第一步是通过在 /timelines/locked_et/et_1 上创建一个临时节点来获得更新时间线的锁。

1
2
3
4
5
6
7
8
timelines:
lock: -
version: 0
locked_et:
et_1: -
nodes:
node1: 0
node2: 0
  1. 通知所有 Nakadi 节点相关更改:版本屏障

协调创建时间线的实例修改版本节点,所有 Nakadi 实例都在监听变化,因此当发生变化时,它们会得到通知。

1
2
3
4
5
6
7
8
timelines:
lock: -
version: 1 # this is incremented by 1
locked_et:
et_1: -
nodes:
node1: 0
node2: 0
  1. 等待所有节点对新版本作出反应

每个 Nakadi 实例监视 /nakadi/timelines/version/ 的值。 当它发生更改时,每个实例检查所有锁定的事件类型,并通过在本地释放或阻塞发布者来做出相应的反应。一旦每个实例更新了它本地的锁定事件类型列表,它就会添加自己的版本,让时间线创建者发起者知道它可以处理。

1
2
3
4
5
6
7
8
9
timelines:
lock: -
version: 1
locked_et:
et_1: -
nodes:
node1: 1 # each instance updates its own version
node2: 1

  1. 更改数据库

一旦所有实例都作出反应,创建过程继续进行,发起者在 timeline 表中插入必要的数据库条目,并对现有存储进行快照显示可用的最新偏移量。 它还在新存储中创建一个主题。 请注意,如果从未使用过时间线分区,则存储的偏移量为 -1。 如果只有一个事件,则偏移量为 0,依此类推。

  1. 移除锁并再次通知所有实例

按照初始状态创建时间线的相同逻辑,锁将被删除,版本将被修改。 所有 Nakadi 实例的处理是移除本地锁并在必要时切换时间线。

1
2
3
4
5
6
7
timelines:
lock: -
version: 2
locked_et:
nodes:
node1: 1
node2: 1

在每个实例处理之后,它看起来应该像:

1
2
3
4
5
6
7
timelines:
lock: -
version: 2
locked_et:
nodes:
node1: 2 # each instance updates its own version
node2: 2
  • 至此,一个新的时间线创建成功。

部署

下载源码

1
git clone https://github.com/zalando/nakadi.git

Docker Compose

1
2
cd nakadi
docker-compose up

启动成功后,各服务端口:

  • 8080 - API Server
  • 5432 - PostgreSQL
  • 9092, 29092 - Kafka
  • 2181 - Zookeeper

API 示例

  • 创建事件类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    curl -X POST \
    http://172.16.18.143:8080/event-types \
    -H 'Content-Type: application/json' \
    -d '{
    "name": "order.ORDER_RECEIVED",
    "owning_application": "order-service",
    "category": "undefined",
    "schema": {
    "type": "json_schema",
    "schema": "{ \"additionalProperties\": true }"
    }
    }'

    API 用法

  • 添加订阅

    1
    2
    3
    4
    5
    6
    7
    curl -X POST \
    http://172.16.18.143:8080/subscriptions \
    -H 'Content-Type: application/json' \
    -d '{
    "owning_application": "abc",
    "event_types": ["order.ORDER_RECEIVED"]
    }'

    API 用法

  • 发布消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    curl -X POST \
    http://172.16.18.143:8080/event-types/order.ORDER_RECEIVED/events \
    -H 'Content-Type: application/json' \
    -d '[
    {
    "order_number": "24873243241"
    },
    {
    "order_number": "24873243242"
    }
    ]'

    API 用法

  • 订阅消费

    1
    curl -X GET http://172.16.18.143:8080/subscriptions/{subscription_id}/events

    API 用法

部署 UI(可选)

https://github.com/zalando-nakadi/nakadi-ui

Docker

1
docker run -it -d -p 3000:3000 -e NAKADI_API_URL=http://172.16.18.1:8080 -e BASE_URL=http://172.16.18.143:3000 nakadi/nakadi-ui:latest

(待续)