Kafka Connect 笔记

Intro

Kafka Connect 是一个可伸缩、可靠地在 Apache Kafka 和其他系统之间传输数据的工具框架。

Kafka Connect 功能特性:

  • 通用框架 Kafka Connect 为其他系统与 Kafka 集成提供了标准化,简化了 Connector 开发、部署和管理。
  • 分布式和独立模式 Kafka Connect 扩展到支持整个组织的大型集中管理服务,或者缩减到开发、测试和小型生产部署。
  • REST 接口 通过一个易于使用的 REST API 向 Kafka Connect 集群提交和管理连接器。
  • 自动偏移量管理 只需要从连接器获得一点点信息,Kafka Connect 就可以自动管理偏移提交过程,因此连接器开发人员不必担心连接器开发中容易出错的部分。
  • 默认分布式可伸缩 Kafka Connect 基于现有的组管理协议,可以添加更多的 Worker 来扩展 Kafka Connect 集群。
  • 流/批处理集成 Kafka Connect 是连接流式和批处理数据系统的理想解决方案。

Concept

  • Connectors 通过管理任务来协调数据流的高级抽象
  • Tasks 实现从 Kafka 读取数据或将数据写入 Kafka
  • Workers 用于执行 Connectors 与 Tasks 的服务进程
  • Converters 用于实现在 Kafka 与目标/源系统之间的数据转换程序
  • Transforms 用于转换通过 Connect 的每条数据的简单逻辑实现
  • Dead Letter Queue 死信队列,用于处理连接器错误

Connectors

Kafka Connect 中的 Connector 定义数据应该从哪里复制或复制到哪里去。Connector 实例是一个逻辑工作,负责管理 Kafka 和另一个系统之间的数据复制。

连接器模型

connector-model-simple

Configuration

Connector Configuration 配置是简单的键值映射。

对于独立(Standalone)模式,它们在属性文件中定义,并在命令行上传递给 Connect 进程。

在分布式(Distributed)模式,它们将包含在创建(或修改) Connector 的 REST 请求的 JSON 有效负载中。

通用的配置:

  • name Connector 的唯一名称。再次尝试注册相同名称将会失败。
  • connector.class Connector 的 Java 类,
    (多种格式,如org.apache.kafka.connect.file.FileStreamSinkConnector、FileStreamSink、FileStreamSinkConnector)
  • tasks.max 为此 Connector 应创建的最大任务数。如果 Connector 无法实现此级别的并行性,则可以创建较少的任务。
  • key.converter - (可选) 覆盖由 Worker 设置的默认键转换器。
  • value.converter - (可选) 覆盖由 Worker 设置的默认值转换器。

Sink Connector 还需要设置下列其中一个配置项:

  • topics 用逗号分隔的主题列表,用作此 Connector 的输入。
  • topics.regex 用 Java 正则表达式匹配到的主题,用作此 Connector 的输入。

更多的配置项,将由具体的 Connector 提供文档说明。

Tasks

Task 是用于连接的数据模型中的主要参与者。每个 Connector 实例协调一组实际复制数据的 Task。通过允许 Connector 将单个工作分解成多个 Task,Kafka Connect 对并行和可伸缩的数据复制提供了很好的支持,且只需要很少的配置。这些 Task 中没有存储状态。Task 状态存储在 Kafka 的特殊主题 config.storage 和 status.storage,并由关联连接器管理。因此,可以在任何时候启动、停止或重新启动 Task,以提供一个弹性的、可伸缩的数据管道。

data-model-simple

Task Rebalancing

当 Connector 首次提交给集群时,Worker 将重新平衡集群中的全部 Connectors 集及其 Tasks,以便每个 Worker 的工作量大致相同。

当 Connector 增加或减少它们所需的 Task 数量时,或者当 Connector 的配置更改时,也使用同样的重新平衡过程。

当一个 Worker 失败时,Task 会在活跃的 Worker 之间重新平衡。

当 Task 失败时,不会触发 Task 失败的再平衡,因为 Task 失败被认为是一个特例。因此,失败的 Task 不会由框架自动重新启动,应该通过 REST API 重新启动。

下图为 Worker 失败时 Task 故障转移示例

task-failover

Workers

Connectors 和 Tasks 是必须在进程中执行的逻辑工作单元,Kafka Connect 称这些进程为 Worker。

有两种类型的 Worker: StandaloneDistributed

Standalone

Standalone 模式是最简单的模式,其中单个进程负责执行所有连接器和任务。因为它是一个单一的进程,所以只需要最少的配置。在开发过程中,以及在某些只有一个进程有意义的情况下,如从主机收集日志时,Standalone 模式非常方便。但是,由于只有一个进程,它的功能也更有限:可伸缩性仅限于单个进程,除了添加到单个进程的任何监视之外,没有容错能力。

Distributed

Distributed 模式为 Kafka Connect 提供了可伸缩性和自动容错能力。在 Distributed 模式下,使用相同的 group.id 启动许多辅助进程,它们自动协调来调度所有可用辅助 Workers 之间的Connectors 和 Tasks 的执行。如果添加了一个 Worker,关闭了一个 Worker,或者一个 Worker 意外地失败了,其余的 Worker 会检测到这一点,并自动协调在更新的可用 Workers 集中重新分配 Connectors 和 Tasks。请注意它们与 Consumer Group 再平衡的相似之处,在掩护之下,Worker 正在利用 Consumer Group 来协调和平衡。

worker-model-basics

Converters

当写入或从 Kafka 读取数据时,必须有一个 Kafka Connect 部署支持特定的数据格式。任务使用转换器将数据格式从字节转换为 Connect 内部数据格式,反之亦然。

  • AvroConverter io.confluent.connect.avro.AvroConverter 使用 Schema Registry
  • ProtobufConverter io.confluent.connect.protobuf.ProtobufConverter 使用 Schema Registry
  • JsonSchemaConverter io.confluent.connect.json.JsonSchemaConverter 使用 Schema Registry
  • JsonConverter org.apache.kafka.connect.json.JsonConverter (没有模式注册表):与结构化数据一起使用
  • StringConverter org.apache.kafka.connect.storage.StringConverter简单的字符串格式
  • ByteArrayConverter org.apache.kafka.connect.converters.ByteArrayConverter提供不进行转换的“直通”选项

Converter Graphic

converter-basics

Transforms

单个消息转换(Single Message Transformations,下列简称SMT)应用于通过 Connect 的消息。

SMT 在源连接器(Source Connector)生成入站消息之后进行转换,但在它们被写入Kafka之前进行转换。
SMT 在出站消息发送到接收器连接器(Sink Connecors)之前对其进行转换。

可以在连接器配置中指定转换链。

  • transforms 转换别名列表,指定应用转换的顺序。
  • transforms.$alias.type 转换的完全限定类名。
  • transforms.$alias.$transformationSpecificConfig 转换的配置属性。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# connector common config
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
# transforms chain: MakeMap--InsertSource
transforms=MakeMap, InsertSource
# config for MakeMap transform
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
# config for InsertSource transform
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

Kafka Connect 包含了一些广泛适用的数据和路由转换:

  • InsertField 使用静态数据或记录元数据添加字段
  • ReplaceField 过滤或重命名字段
  • MaskField将字段替换为类型的有效null值(如 0,false 及空字符串等等)
  • ValueToKey 将记录键替换为由记录值中的字段子集形成的新键
  • HoistField 将整个事件作为单个字段包装在结构或映射中
  • ExtractField从 Struct 和 Map 中提取一个特定的字段,并在结果中只包含这个字段
  • SetSchemaMetadata 修改模式名称或版本
  • TimestampRouter基于原始主题和时间戳修改记录的主题。当使用接收器时,需要根据时间戳写入不同的表或索引
  • RegexRouter 基于原始主题、替换字符串和正则表达式修改记录的主题

Dead Letter Queue

出现无效记录的原因有很多。一个例子是,当一条记录以 JSON 格式序列化到达 Sink Connector 时,但 Sink Connector 配置要求为 Avro 格式。当 Sink Connector 无法处理无效记录时,将根据 Connector 配置属性 errors.tolerance (错误容忍)处理错误。Dead Letter Queue 仅适用于 Sink Connector。

此配置属性有两个有效值: none (默认值)或 all

errors.tolerance 设置为 none 时,错误或无效记录将导致 Connector Task 立即失败,并且 Connector 进入失败状态。要解决此问题,需要查看 Kafka Connect Worker 日志以找出导致故障的原因,进行纠正,然后重新启动 Connector。

errors.tolerance 设置为 all 时,将忽略所有错误或无效记录,并继续处理。Connect Worker 日志中没有写入错误。为了确定记录是否失败,必须使用内部度量或计算源处的记录数量,并将其与所处理的记录数量进行比较。

有一个错误处理特性,它将把所有无效的记录路由到一个特殊的主题并报告错误。该主题包含 Sink Connector 无法处理的记录的 Dead Letter Queue

SinkConnector 配置:

1
2
errors.tolerance = all
errors.deadletterqueue.topic.name = <dead-letter-topic-name>

即使 Dead Letter Queue 包含失败的记录,它也没有显示原因。可以添加以下附加配置属性以包含失败的记录头信息。

1
errors.deadletterqueue.context.headers.enable = true

当该参数设置为 true (默认为false)时,记录头被添加到死信队列中。然后可以查看记录头,并确定记录失败的原因。错误也被发送到 Connect Reporter。为了避免与原始记录头冲突,Dead Letter Queue 上下文头键以 _connect.errors 开头。

Deployment Configuration

Common(Standalone & Distributed)

  • bootstrap.servers Kafka 服务器列表。
  • key.converter 用于在 Kafka 连接格式和写入 Kafka 的序列化格式之间转换的转换器类。它控制了写入或从 Kafka 读取的消息中的的格式,并且由于它独立于连接器,它允许任何连接器使用任何序列化格式。常见的格式包括JSON和Avro。
  • value.converter 用于在 Kafka 连接格式和写入 Kafka 的序列化格式之间转换的转换器类。它控制了写入或从 Kafka 读取的消息中的的格式,并且由于它独立于连接器,它允许任何连接器使用任何序列化格式。常见的格式包括JSON和Avro。

Standalone

  • offset.storage.file.filename 文件中存储偏移量数据

Distributed

  • group.id (默认为 connect-cluster`) 集群的唯一名称,用于形成Connect集群组;注意,这一定不能与消费组id冲突
  • config.storage.topic (默认为connect-configs) 用于存储 Connector 和 Task 配置的主题;注意,这应该是一个单独的分区,高度复制,压缩主题(您可能需要手动创建主题,以确保正确的配置,因为自动创建的主题可能有多个分区,或者自动配置为删除而不是压缩)
  • offset.storage.topic (默认为 connect-offsets) 用于存储偏移量的主题;这个主题应该有许多分区,可以复制,并配置为压缩
  • status.storage.topic (默认为 connect-status) 用于存储状态的主题;这个主题可以有多个分区,应该进行复制和配置以进行压缩

Connect Plugins

Kafka Connect 被设计为可扩展的,因此开发人员可以创建自定义连接器(Connectors),变形器(Transforms)或转换器(Converters),用户可以安装和运行它们。

Kafka Connect 插件是一组 JAR 文件,其中包含一个或多个 Connectors、Transforms 或 Converters 的实现。Connect 将每个插件彼此隔离,这样一个插件中的库就不会受到任何其他插件中的库的影响。这在混合和匹配来自多个供应商的连接器时非常重要。

一个 Kafka Connect 插件可以是:

  • 文件系统上的一个目录,包含所有需要的 JAR 文件和插件的第三方依赖项。这是最常见和首选的。

  • 一个包含插件及其第三方依赖项的所有类文件的 uber JAR。

    (Über is the German word for above or over (it’s actually cognate with the English over). https://stackoverflow.com/questions/11947037/what-is-an-uber-jar)

一个插件不应该包含 Kafka Connect 运行时提供的任何库。

Kafka Connect 在 Worker Configuration 的 plugin.path 属性中使用一个以逗号分隔的目录路径列表定义的插件路径查找插件。

当启动 Connect Worker 时,每个 Worker 都会在插件路径的目录中发现所有 Connectors、Transforms 和 Converters 插件。当使用 Connectors、Transforms 和 Converters 时,Connect Worker 首先从各自的插件加载类,然后是 Kafka Connect 运行时和 Java 库。

Connect API

Java API

本节列举了一般需要重写的方法,更详尽的 Java API 请参见

http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/connect

SinkConnector

  • public abstract void start(Map<String,String> props)
    启动这个连接器。这个方法只会在一个干净的连接器上被调用,也就是说,它要么刚刚被实例化和初始化,要么已经调用了stop()。
  • public abstract void stop()
    停止这个连接器。
  • public abstract Class<? extends Task> taskClass()
    返回此连接器的任务实现。
  • public abstract ConfigDef config()
    定义连接器的配置。
  • public abstract List<Map<String,String>> taskConfigs(int maxTasks)
    根据当前配置返回任务的一组配置。
  • String version()
    获取该连接器的版本。

SinkTask

  • public abstract void start(Map<String,String> props)
    启动任务。这应该可以处理任何配置解析和任务的一次性设置。
  • public abstract void put(Collection<SinkRecord> records)
    将事件记录数据传递到此任务。
  • public abstract void stop()
    执行清理操作以停止此任务。
  • String version()
    获取此任务的版本。通常这应该与对应的连接器类的版本相同。

REST API

Default Port: 8083

  • GET /connectors 返回连接器的列表。
  • POST /connectors 创建一个新的连接器。请求主体应该是一个JSON对象,其中包含一个字符串 name 字段和一个带有连接器 config 参数的对象配置字段。
  • GET /connectors/{name}获取指定连接器的信息。
  • GET /connectors/{name}/config 获取指定连接器的配置参数。
  • PUT /connectors/{name}/config 更新指定连接器的配置参数。
  • GET /connectors/{name}/status 获取连接器的当前状态,包括它是否正在运行、失败、暂停等、它被分配给哪个工作器、失败时的错误信息以及它所有任务的状态。
    • UNASSIGNED: Connector/Task 暂未分配到 Worker。
    • RUNNING: Connector/Task 正在运行。
    • PAUSED: Connector/Task 在管理上已暂停。
    • FAILED: Connector/Task 运行失败,通常是发生了异常。
    • DESTROYED: Connector/Task 在管理上已被删除,将被移除集群。
  • GET /connectors/{name}/tasks 获取连接器当前正在运行的任务列表。
  • GET /connectors/{name}/tasks/{taskid}/status 获取任务的当前状态,包括它是否正在运行、失败、暂停等,它被分配给哪个 Worker,以及失败时的错误信息。
  • PUT /connectors/{name}/pause 暂停连接器及其任务,这将停止消息处理,直到连接器恢复。
  • PUT /connectors/{name}/resume 恢复暂停的连接器(如果连接器没有暂停,则不做任何操作)。
  • POST /connectors/{name}/restart 重新启动连接器(通常是因为它失败了)。
  • POST /connectors/{name}/tasks/{taskId}/restart 重新启动单个任务(通常是因为它失败了)。
  • DELETE /connectors/{name} 删除连接器,停止所有任务并删除其配置。
  • GET /connector-plugins 返回一个Kafka Connect集群中安装的连接器插件列表。
  • PUT /connector-plugins/{connector-type}/config/validate 根据配置定义验证提供的配置值。这个API执行每个配置验证,并在验证过程中返回建议值和错误消息。

Ops

Kafka Connect Logging

https://docs.confluent.io/platform/current/connect/logging.html#

  • Check log levels

    1
    curl -Ss http://localhost:8083/admin/loggers | jq
  • Change the log level for a specific logger

    1
    2
    3
    curl -s -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/admin/loggers/city.icos.icmsevent.connect \
    -d '{"level": "DEBUG"}' | jq '.'

Lenses.io

Kakfa Connect UI

https://github.com/lensesio/kafka-connect-ui

Kafka Connect CLI

https://github.com/lensesio/kafka-connect-tools

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
connect-cli 1.0.8
Usage: connect-cli [ps|get|rm|create|run|diff|status|plugins|describe|validate|restart|pause|resume] [options] [<connector-name>]

--help
prints this usage text
-e <value> | --endpoint <value>
Kafka Connect REST URL, default is http://localhost:8083/
-f <value> | --format <value>
Format of the config, default is PROPERTIES. Valid options are 'properties' and 'json'.

Command: ps
list active connectors names.

Command: get
get the configuration of the specified connector.

Command: rm
remove the specified connector.

Command: create
create the specified connector with the config from stdin; the connector cannot already exist.

Command: run
create or update the specified connector with the config from stdin.

Command: diff
diff the specified connector with the config from stdin.

Command: status
get connector and it's task(s) state(s).

Command: plugins
list the available connector class plugins on the classpath.

Command: describe
list the configurations for a connector class plugin on the classpath.

Command: pause
pause the specified connector.

Command: restart
restart the specified connector.

Command: resume
resume the specified connector.

Command: validate
validate the connector config from stdin against a connector class plugin on the classpath.

Command: task_ps
list the tasks belonging to a connector.

Command: task_status
get the status of a connector task.

Command: task_restart
restart the specified connector task.

Reference

http://kafka.apache.org/24/documentation.html#connect

https://docs.confluent.io/5.5.0/connect/index.html