RedisGears 版本: v1.0
Home
RedisGears - Redis 中数据处理的可编程引擎。
RedisGears 是一个 Serverless 的引擎,用于处理 Redis 中的事务、批处理和事件驱动的数据。它是一个动态的执行函数的框架,而这些函数反过来又实现了 Redis 的数据流,同时(几乎)完全抽象了数据的分布和部署的选择(如单机vs集群,OSS vs企业级)。函数可以用不同的语言实现,包括 Python and C APIs。
Diagram Components
1 | +---------------------------------------------------------------------+ |
Quickstart
RedisGears Cluster
1 | docker run -p 30001:30001 -p 30002:30002 -p 30003:30003 redislabs/rgcluster:latest |
RedisGears Standalone
1 | run |
RG
是 GearsBuilder()
的别名。
Introduction
Reader
- KeysReader
Keys Pattern
1 | RG.PYEXECUTE "GearsBuilder().run('person:*')" |
Function
执行复杂函数
1 | cat mygear.py | docker exec -i redisgears redis-cli -x RG.PYEXECUTE |
filter
map
1
2
3
4
5gb = GearsBuilder() # declare a function builder
gb.map(lambda x: int(x['value']['age'])) # map each record to just an age
gb.run('person:*') # run it
# Expected result: [70, 14]accumulate
Blocking vs. Nonblocking Execution
UNBLOCKING
非阻塞模式执行
1 | 127.0.0.1:6379> RG.PYEXECUTE "GB().run()" UNBLOCKING |
RG.DUMPEXECUTIONS
执行状态
1 | 127.0.0.1:6379> RG.DUMPEXECUTIONS |
RG.GETRESULTS
获取结果
1 | 127.0.0.1:6379> RG.GETRESULTS 0000000000000000000000000000000000000000-0 |
RG.GETRESULTSBLOCKING
转入阻塞模式,等待结果
Event Processing
默认是非阻塞执行
1 | gb.register() |
Writing Data
execute()
RedisGears Python API 附带 execute ()函数,该函数允许在数据库中执行任意 Redis 命令。
Cluster
1 | docker run -d --name rgcluster -p 30001:30001 -p 30002:30002 -p 30003:30003 redislabs/rgcluster:latest |
data.txt
1 | SET foo bar |
Distributed Processing
当 RedisGears 在集群中运行时,默认情况下它将在集群的所有分片上执行函数。通过向 run ()操作提供 collect = False 参数来禁用。
1 | 127.0.0.1:30001> RG.PYEXECUTE "GB().run(collect=False)" |
MapReduce
Cluster Map and Reduce
accumulate-collect,这种情况只会将每个分片的最大值收集起来
1 | def maximum(a, x): |
accumulate-collect-accumulate,这种情况会将所有分片的最大值收集起来后再计算最大值,得到最佳结果(1 个最大值)
1 | def maximum(a, x): |
aggregate()
RedisGears Python API 包含 aggregate()
操作,该操作将accumulate-collect-accumulate
步骤包装为单个步骤:
1 | Aggregated maximum version |
aggregate()
接受三个参数: 第一个是累加器的零值,另外两个是对累加函数的回调,这些函数将分别在本地和全局执行。
Local vs. Global
1 | localgroupby ## 由每个分片的引擎在本地执行 |
Diagram(localgroupby, groupby)
1 | +----------------------+ +----------------------+ +----------------------+ |
当绝对需要时,函数可以使用任意键对集群中的数据进行重新分区。当数据被重新分区时,每个工作线程被分配一个记录键的子集,并且这些键从所有其他工作线程传递到它。
Mock(localgroupby, repartition, localgroupby)
1 | def fname(x): |
RedisGears 的 Python API 包含了 aggregateby ()操作,它等同于使用 GB () . localgroupby () . repartition () . localgroupby ()流。
Reference
Runtime
Python RedisGears 函数使用嵌入式 Python 解释器运行。每个函数都使用一个单独的子解释器。所有函数共享相同的环境和依赖关系。导入的环境有几个默认值。
Python Interpreter
embeds python 3.7.2+
Environment
解释器的环境可以用任何依赖的包进行扩展,这些包以后可以被它们各自的子解释器中的函数导入和使用。
GearsBuilder
默认环境提供的函数上下文构造器。
execute
执行命令函数,默认环境提供的执行 Redis 命令的函数。
Python API
1 | def execute(command, *args) |
Examples
1 | # Pings the server (reply should be 'PONG') |
atomic
原子操作函数,上下文通过阻塞主 Redis 进程来确保它中的所有操作都以原子的方式执行。
Python API
1 | class atomic() |
Examples
1 | # Increments two keys atomically |
configGet
这个函数获取 RedisGears 配置选项的当前值。
gearsConfigGet
这个函数获取RedisGears配置选项的当前值,如果该键不存在,则返回默认值。
hashtag
这个函数返回一个 hashtag,该 hashtag 映射到本地引擎的分片所服务的最低哈希槽。换句话说,它作为一个 hashtag 在集群中进行分区时非常有用。
log
这个函数打印一条消息到Redis的日志中。
1 | def log(message, level='notice') |
level
取值 debug
,verbose
,notice
,warning
Functions
RedisGears 函数是数据流中处理步骤的正式描述。
1 | +------------+ |
一个函数总是:
- 始于一个 Reader
- 操作零个或多个 Records
- 包含零个或多个 Operations (Step)
- 终于一个 Action
- 返回零个或多个 Results
- 可能会产生更多错误。
Execution
一个函数由 RedisGears 引擎以以下两种方式之一执行:
Batch
立即执行,对已有数据执行Event
由新事件及其数据触发执行
函数的执行方式是由它的动作决定的。有两种类型的行动:
Run
批量运行函数Register
注册由事件触发的函数
当以批处理或事件的形式执行时,函数的上下文由引擎管理。除了函数的逻辑之外,上下文还包括内部执行步骤、状态、统计数据、结果和遇到的任何错误。
Execution ID
每个函数的执行都在内部分配了一个惟一的值,称为 Execution ID
。
ID 是由两部分组成的字符串值,以“-”分隔,如下所示:
Shard ID
集群中一个 Shard 的标识符,长度为40个字节Sequence
一个不断增加的计数器
Execution ID
示例:
Redis Standalone Mode: 0000000000000000000000000000000000000000-1
Redis Cluster Mode: a007297351b007297351c007297351d007297351-1
Execution Plan
在执行该功能之前,引擎会生成一个执行计划。该计划包含引擎执行功能将采取的基本步骤。
Execution Parallelization
在集群中执行时,执行计划由启动器生成。然后,默认情况下,它将在所有碎片上共享并并行执行。
发起者的协调器协调分布式操作。
Execution Status
Execution Status
描述了功能当前的执行状态。它可以是以下几种之一:created
已创建running
正在执行done
执行完成aborted
执行被终止pending_cluster
启动器正在等待所有 Worker 执行完成pending_run
Worker 挂起等待启动器确认执行pending_receive
启动器在接收执行时挂起 Worker 的确认(ACK)pending_termination
Worker 正在等待来自启动器的终止消息
下图演示了状态转换:
1 | Initiator Worker |
Registration
事件驱动函数的表示称为注册(Registration)。
注册被持久化在 Redis 的快照中,也就是 RDB 文件中。这允许在发生故障时恢复数据库中的数据和事件处理程序。
Registration ID
每个注册都有一个唯一的内部标识符,称为 Registration ID
。它以与 Execution ID
相同的方式生成,尽管看起来相同,但两者不应该混淆。
Context Builder
Python 中的 RedisGears 函数总是以一个上下文构建器——GearsBuilder
类开始。
Actions
动作 (Action
) 是一种特殊类型的操作。它总是函数的最后一步。
Run
Run
动作作为批处理(Batch
)运行函数。函数只执行一次,一旦读取器耗尽数据就退出。
Python API
1 | class GearsBuilder.run(arg=None, convertToStr=True, collect=True) |
Arguments
- arg
- 一个类似于全局的模式,用于
KeysReader
和KeysOnlyReader
StreamReader
的密钥名称- 用于
PythonReader
的 Python 生成器
- 一个类似于全局的模式,用于
- convertToStr 当为
true
时将在流程末尾添加一个map
操作,用于字符串化记录 - collect 当为
true
时将在流程末尾添加一个collect
操作
Register
Register 操作将函数注册为事件处理程序。每次有事件出现时都执行该函数。每次执行时,函数对事件的数据进行操作,完成后将暂停,直到新事件调用它。
Python API
1 | class GearsBuilder.register(convertToStr=True, collect=True, mode='async', onRegistered=None) |
Arguments
- convertToStr 当为
true
时将在流程末尾添加一个map
操作,用于字符串化记录 - collect 当为
true
时将在流程末尾添加一个collect
操作 - mode 触发函数的执行方式。可以是:
async
在整个集群中异步执行async_local
执行将是异步的,并且仅限于处理分片sync
本地同步执行
- onRegistered 一个函数回调函数,在每个shard上注册函数时被调用。这是初始化非序列化对象(如网络连接)的好地方。
注意,可以将更多参数传递给 register函数,这些参数取决于 reader,详见 Readers 说明。
Results
函数的执行产生零条或多条结果记录。结果由函数最后一次操作和最后一次操作之前的任何记录组成。
结果存储在函数的执行上下文中。
Readers
Reader 是任何 RedisGears 函数必须执行的第一步,每个函数都有一个 Reader。Reader 读取数据并从中生成输入记录(Input)。输入记录由函数使用。
Reader | Output | Batch | Event |
---|---|---|---|
KeysReader | Redis keys and values | Yes | Yes |
KeysOnlyReader | Redis keys | Yes | No |
StreamReader | Redis Stream messages | Yes | Yes |
PythonReader | Arbitrary | Yes | No |
ShardsIDReader | Shard ID | Yes | No |
CommandReader | Command arguments | No | Yes |
Operations
操作(Operation)是 RedisGears 函数的构建块。可采用不同的操作类型实现多种结果,满足各种数据处理需求。
操作可以有零个或多个参数来控制其操作。根据操作的类型参数,可以是语言原生数据类型和函数回调。
Operation | Description | Type |
---|---|---|
Map | Maps 1:1 | Local |
FlatMap | Maps 1:N | Local |
ForEach | Does something for each record | Local |
Filter | Filters records | Local |
Accumulate | Maps N:1 | Local |
LocalGroupBy | Groups records by key (many-to-less) extractor + reducer |
Local |
Limit | Limits the number of records | Local |
Collect | Shuffles all records to one engine | Global |
Repartition | Shuffles records between all engines | Global |
GroupBy | Groups records by key (many-to-less) repartition +extractor +reducer |
Sugar |
BatchGroupBy | Groups records by key (many-to-less)repartition +extractor +batch reducer |
Sugar |
Sort | Sorts records aggregate +local sort +flatmap |
Sugar |
Distinct | Makes distinct records | Sugar |
Aggregate | Aggregates records (many-to-one) accumulator +collect +accumulator |
Sugar |
AggregateBy | Aggregates records by key (many-to-less) extractor +reducer +repartition +extractor +reducer |
Sugar |
Count | Counts records aggregate (local counting + global suming) |
Sugar |
CountBy | Counts records by key aggregate (extractor + local counting + global suming) |
Sugar |
Avg | Computes the average aggregate (global sum + global count + local map) |
Sugar |
Terminology
Local
本地(Local)操作的执行是在以独立模式或集群模式部署的 RedisGears 引擎中执行的。
当在独立模式下使用时,只有一个引擎在本地执行所有数据上的所有操作。
当再集群模式下使用时,该操作将分布到所有分片。每个分片的引擎也在本地执行操作。然而,分片的引擎只能处理集群对它们进行分区的数据。
Global
全局(Global)操作只在集群 RedisGears 环境的上下文中相关。这些是收集(collect)和重分区(repartition)操作,用于在分片之间转移记录。
Sugar
Sugar 操作是实用程序操作。这些是通过基本操作和相关回调在内部实现的。
Callback
Callback 用于在 API 所使用的语言中调用函数(回调函数)。
Extractor
Extractor 是一个回调函数,它接收输入记录(record)作为参数。它返回从记录中提取的值(value)。返回值应该是本机字符串(Native String)。
Mapper
Mapper 是一个回调函数,它接收输入记录(record)作为参数。它必须返回一个输出记录(record)。
Expander
Expander 是一个回调函数,它接收输入记录(record),必须返回一个或多个输出记录(record)。
Processor
Processor 是一个回调函数,它接收输入记录(record),它不该返回任何东西。
Filter
Filter 是一个回调函数,它接收输入记录(record),它必须返回一个布尔值(boolean)。
Accumulator
Accumulator 是一个回调函数,它接收输入记录和累加器变量。它将输入聚集到累加器变量中,累加器变量存储函数调用之间的状态。函数必须在每次调用后返回累加器的更新值。
Reducer
Reducer 是一个回调函数,它接收一个键、一个输入和一个称为累加器的变量。它的执行类似于 Accumulator 回调,不同之处在于它为每个 reduced 的键维护一个累加器。
Batch Reducer
Batch Reducer 是一个回调函数,它接收一个键和一个输入记录列表。它的执行类似于 reducer 回调,不同之处在于它的输入是一个记录列表,而不是单个记录。它将为这些记录返回一个累加器值。
Commands
RedisGears 是通过 Redis 客户端发送给服务器的命令来运行的。
Registration
RG.DUMPREGISTRATIONS
用来输出函数注册列表
RG.UNREGISTER
用来取消某个函数的注册。
EXECUTION
RG.PYEXECUTE ““ [UNBLOCKING] [REQUIREMENTS “ …”]
用于执行 Python 函数
RG.DUMPEXECUTIONS
用于输出函数执行列表。执行列表的长度由
MaxExecutions
配置选项限制。
RG.GETEXECUTION [SHARD|CLUSTER]
用于获取函数执行细节:执行计划,步骤等
RG.GETRESULTS
用于获取函数的执行结果及错误信息
RG.GETRESULTSBLOCKING
取消函数在后台(UNBLOCKING)执行。取消执行的客户端被阻塞,直到函数执行结束,然后发送任何结果和错误。
RG.DROPEXECUTION
用于删除一个函数的执行。
RG.ABORTEXECUTION
在运行过程中中止函数的执行。
Trigger
RG.TRIGGER [arg …]
用来触发已注册的 CommandReader 函数的执行。
Global
RG.CONFIGGET […]
返回一个或多个内置配置或用户定义选项的值。
RG.CONFIGSET […]
设置一个或多个内置配置或用户定义选项的值。
RG.PYSTATS
从 Python 解释器返回内存使用统计信息。
RG.PYDUMPREQS (version 1.0.1)
返回所有可用 Python 依赖的列表(包含关于每个依赖的信息)。
RG.INFOCLUSTER
显示集群信息。
RG.REFRESHCLUSTER
刷新节点的集群拓扑视图。需要在集群的每个节点执行
Configuration
RedisGears 提供了配置选项来控制它的操作。这些选项可以在模块启动时设置,在某些情况下也可以在运行时设置。
Bootstrap Configuration 在加载 Redis Module 时配置,Runtime Configuration 在运行时配置(详见命令:RG.CONFIGSET <key> <value> [...]
、RG.CONFIGGET <key> [...]
)。
配置项 | 说明 | 期望值 | 默认值 | 运行时可配置性 |
---|---|---|---|---|
MaxExecutions | MaxExecutions 配置选项控制将保存在执行列表中的最大执行次数。 一旦达到这个阈值,就会按照创建顺序(FIFO)从列表中删除旧的执行。 只有已经完成的执行(例如“完成”或“中止”状态)被删除。 |
Integer | 1000 | Supported |
MaxExecutionsPerRegistration | MaxExecutionsPerRegistration 配置选项控制每个注册保存在列表中的最大执行次数。 一旦达到这个阈值,该注册的旧执行将按创建顺序(FIFO)从列表中删除。 只有已经完成的执行(例如“完成”或“中止”状态)被删除。 |
Integer | 100 | Supported |
ProfileExecutions | ProfileExecutions 配置选项控制是否对执行进行分析。对性能损耗较大,建议用于调试操作。 | 0 (disabled) 1 (enabled) |
0 | Supported |
PythonAttemptTraceback | PythonAttemptTraceback 配置选项控制引擎是否试图为 Python 运行时错误产生堆栈跟踪。 | 0 (disabled) 1 (enabled) |
1 | Supported |
DownloadDeps | DownloadDeps 配置选项控制 RedisGears 是否会尝试下载缺少的 Python 依赖项。 | 0 (disabled) 1 (enabled) |
1 | Not Supported |
DependenciesUrl | DependenciesUrl 配置选项控制 RedisGears 试图从何处下载其 Python 依赖项。 | URL-like string | 与RedisGears版本有关 | Not Supported |
DependenciesSha256 | DependenciesSha256 配置选项指定 Python 依赖项的 SHA256 哈希值。 在下载依赖项之后,将验证此值,如果不匹配,将停止服务器的启动。 |
String | 与RedisGears版本有关 | Not Supported |
PythonInstallationDir | PythonInstallationDir 配置选项指定了 RedisGears 的 Python 依赖项的路径。 | String | /var/opt/redislabs/modules/rg | Not Supported |
CreateVenv | CreateVenv 配置选项控制引擎是否创建虚拟 Python 环境。 | 0 (disabled) 1 (enabled) |
0 | Not Supported |
ExecutionThreads | ExecutionThreads 配置选项控制将要执行的线程数。 | Integer > 0 | 3 | Not Supported |
ExecutionMaxIdleTime | ExecutionMaxIdleTime 配置选项控制中止执行之前的最大空闲时间(以毫秒为单位)。空闲时间意味着执行没有进展。空闲时间的主要原因是在等待来自另一个失败(即崩溃)碎片的记录时被阻塞的执行。在这种情况下,执行将在指定的时间限制后中止。当执行再次开始进行时,将重置空闲计时器。 | Integer > 0 | 5 seconds | Supported |
PythonInstallReqMaxIdleTime 1.0.1 |
PythonInstallReqMaxIdleTime 配置选项控制 Python 依赖安装中止之前的最大空闲时间(以毫秒为单位)。“空闲时间”表示安装没有进展。空闲时间的主要原因与ExecutionMaxIdleTime 相同。 | Integer > 0 | 30000 | Supported |
SendMsgRetries 1.0.1 |
SendMsgRetries 配置选项控制在 RedisGears 的分片之间发送消息的最大重试次数。当消息被发送,而 shard 在确认之前断开连接,或者当它返回一个错误时,该消息将被重新发送,直到达到这个阈值。设置为0表示不限制重试次数。 | Integer >= 0 | 3 | Supported |
Examples
RedisGears Examples: https://oss.redislabs.com/redisgears/examples.html
Clients
RedisGears 是一个 Redis 模块,所以需要 Redis 客户端来操作它。任何支持发送原始 Redis 命令的客户端都可以使用。
Redis Clients: https://redis.io/clients
RedisGears-specific clients: https://github.com/RedisGears/redisgears-py
Design
Isolation Technics - 隔离技术
Python
- 全局字典(当前版本支持)
- 子解释器(当前版本出现不兼容某些库而未启用,后续版本提供开关支持
Samples
环境搭建
RedisGears Cluster
1 | ## RedisGears Cluster |
MySQL
1 | ## mysql |
1 | # root |
Monitor UI
1 | ## Monitor UI |
Run Recipe
1 | ## run recipe |
example.py (WriteBehind + RGWriteThrough)
1 | from rgsync import RGWriteBehind, RGWriteThrough |
演示操作
WriteBehind
hmset
(不同操作完成后,在 mysql rgdb.person 表中查看数据同步结果)
1 | hmset person:1 first_name zhang last_name san age 21 |
正好一次(源码有缺陷,mysql connector
修复见 https://gitlab.com/shankai/rgsync
)
1 | CREATE TABLE IF NOT EXISTS `exactlyonce`( |
example.py
文件内修改 personsConnector 实例化,添加 exactlyOnceTableName
参数值
1 | personsConnector = MySqlConnector(connection, 'person', 'id', 'exactlyonce') |
ACK
1 | # |
WriteThrough
1 | HSET __{person:1} first_name foo last_name bar age 20 # =123456 |
(完)