RedisGears 笔记

RedisGears 版本: v1.0

Home

RedisGears - Redis 中数据处理的可编程引擎。

RedisGears 是一个 Serverless 的引擎,用于处理 Redis 中的事务、批处理和事件驱动的数据。它是一个动态的执行函数的框架,而这些函数反过来又实现了 Redis 的数据流,同时(几乎)完全抽象了数据的分布和部署的选择(如单机vs集群,OSS vs企业级)。函数可以用不同的语言实现,包括 Python and C APIs。

Diagram Components

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
+---------------------------------------------------------------------+
| Redis Server +--------------------------------------+ |
| | RedisGears Module | |
| +----------------+ | | |
| | Data | Input | +------------+ +-------------------+ | |
| | +-------->+ | Function | | APIs | | |
| | Key1 : Value1 | | | +--------+ | | C, Python, ... | | |
| | Key2 : Value2 | Output | | | Reader | | +-------------------+ | |
| | Key3 : Value3 <---------+ | +---+----+ | +-------------------+ | |
| | ... | | | v | | Redis commands | | |
| +----------------+ | | +---+----+ | | Gears admin & ops | | |
| | | | Step 1 | | +-------------------+ | |
| | | +---+----+ | +-------------------+ | |
| +----------------+ | | v | | Coordinator | | |
| | Events | | | +---+----+ | | Cluster MapReduce | | |
| | | Trigger | | | Step 2 | | +-------------------+ | |
| | Data update +-------->+ | +---+----+ | +-------------------+ | |
| | Stream message | | | v | | Engine | | |
| | Time interval | | | ... | | Runtime execution | | |
| | ... | | +------------+ +-------------------+ | |
| +----------------+ +--------------------------------------+ |
+---------------------------------------------------------------------+

Quickstart

RedisGears Cluster

1
docker run -p 30001:30001 -p 30002:30002 -p 30003:30003 redislabs/rgcluster:latest

RedisGears Standalone

1
2
3
4
5
6
# run 
docker run -d --name redisgears -p 6379:6379 redislabs/redisgears:latest
# redis-cli
docker exec -it redisgears redis-cli
# function
RG.PYEXECUTE "GearsBuilder().run()"

RGGearsBuilder() 的别名。

Introduction

Reader

  • KeysReader

Keys Pattern

1
RG.PYEXECUTE "GearsBuilder().run('person:*')"

Function

执行复杂函数

1
cat mygear.py | docker exec -i redisgears redis-cli -x RG.PYEXECUTE

  • filter

  • map

    1
    2
    3
    4
    5
    gb = GearsBuilder()                       # declare a function builder
    gb.map(lambda x: int(x['value']['age'])) # map each record to just an age
    gb.run('person:*') # run it

    ## Expected result: [70, 14]
  • accumulate

Blocking vs. Nonblocking Execution

UNBLOCKING

非阻塞模式执行

1
2
127.0.0.1:6379> RG.PYEXECUTE "GB().run()" UNBLOCKING
"0000000000000000000000000000000000000000-0"

RG.DUMPEXECUTIONS

执行状态

1
2
3
4
5
127.0.0.1:6379> RG.DUMPEXECUTIONS
1) 1) "executionId"
2) "0000000000000000000000000000000000000000-0"
3) "status"
4) "done"

RG.GETRESULTS

获取结果

1
2
3
4
5
127.0.0.1:6379> RG.GETRESULTS 0000000000000000000000000000000000000000-0
1) 1) "{'key': 'foo', 'value': 'bar'}"
2) "{'key': 'person:1', 'value': {'age': '70', 'name': 'Rick Sanchez'}}"
3) "{'key': 'person:2', 'value': {'age': '14', 'name': 'Morty Smith'}}"
2) (empty list or set)

RG.GETRESULTSBLOCKING

转入阻塞模式,等待结果

Event Processing

默认是非阻塞执行

1
2
3
gb.register()
RG.DUMPEXECUTIONS ## 执行状态
RG.GETRESULTS ## 获取结果

Writing Data

execute()

RedisGears Python API 附带 execute ()函数,该函数允许在数据库中执行任意 Redis 命令。

Cluster

1
2
3
4
docker run -d --name rgcluster -p 30001:30001 -p 30002:30002 -p 30003:30003 redislabs/rgcluster:latest

docker exec -i rgcluster redis-cli -c -p 30001 < data.txt
# 默认情况下,cli 不遵循集群的重定向。要让 cli 自动在分片之间跳转,请使用 -c 命令行开关启动它。

data.txt

1
2
3
4
5
6
SET foo bar
HSET person:1 name "Rick Sanchez" age 70
HSET person:2 name "Morty Smith" age 14
HSET person:3 name "Summer Smith" age 17
HSET person:4 name "Beth Smith" age 35
HSET person:5 name "Shrimply Pibbles" age 87

Distributed Processing

当 RedisGears 在集群中运行时,默认情况下它将在集群的所有分片上执行函数。通过向 run ()操作提供 collect = False 参数来禁用。

1
2
3
4
127.0.0.1:30001> RG.PYEXECUTE "GB().run(collect=False)"
1) 1) "{'key': 'person:1', 'value': {'age': '70', 'name': 'Rick Sanchez'}}"
2) "{'key': 'person:5', 'value': {'age': '87', 'name': 'Shrimply Pibbles'}}"
2) (empty list or set)

MapReduce

Cluster Map and Reduce

accumulate-collect,这种情况只会将每个分片的最大值收集起来

1
2
3
4
5
6
7
8
9
10
11
12
def maximum(a, x):
''' Returns the maximum '''
a = a if a else 0 # initialize the accumulator
return max(a, x)

# Original, non-reduced, maximum function version
gb = GearsBuilder()
gb.map(lambda x: int(x['value']['age']))
gb.accumulate(maximum)
gb.run('person:*')

## Expected result: [87, 35, 14]

accumulate-collect-accumulate,这种情况会将所有分片的最大值收集起来后再计算最大值,得到最佳结果(1 个最大值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def maximum(a, x):
''' Returns the maximum '''
a = a if a else 0 # initialize the accumulator
return max(a, x)

# Reduced maximum function
gb = GearsBuilder()
gb.map(lambda x: int(x['value']['age']))
gb.accumulate(maximum)
gb.collect()
gb.accumulate(maximum)
gb.run('person:*')

## Expected result: [87]

aggregate()

RedisGears Python API 包含 aggregate() 操作,该操作将accumulate-collect-accumulate步骤包装为单个步骤:

1
2
3
4
5
6
7
8
9
# Aggregated maximum version
gb = GearsBuilder()
gb.map(lambda x: int(x['value']['age']))
gb.aggregate(0,
lambda a, x: max(a, x),
lambda a, x: max(a, x))
gb.run('person:*')

## Expected result: [87]

aggregate() 接受三个参数: 第一个是累加器的零值,另外两个是对累加函数的回调,这些函数将分别在本地和全局执行。

Local vs. Global

1
2
localgroupby  ## 由每个分片的引擎在本地执行
groupby ## 将结果重新分区,再由每个分片的引擎在本地执行 = repartition + localgroupby

Diagram(localgroupby, groupby)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
+----------------------+   +----------------------+   +----------------------+
| Shard A | | Shard B | | Shard C |
| +----------+-------+ | | +----------+-------+ | | +----------+-------+ |
| | Key | Value | | | | Key | Value | | | | Key | Value | |
| +------------------+ | | +------------------+ | | +------------------+ |
| | person:1 | {...} | | | | person:3 | {...} | | | | foo | bar | |
| | person:5 | {...} | | | | person:4 | {...} | | | | person:2 | {...} | |
| +-+--------+-------+ | | +-+--------+-------+ | | +-+--------+-------+ |
| v localgroupby() | | v localgroupby() | | v localgroupby() |
| +-+--------+-------+ | | +-+--------+-------+ | | +-+--------+-------+ |
| | Key | Value | | | | Key | Value | | | | Key | Value | |
| +------------------+ | | +------------------+ | | +------------------+ |
| | Sanchez | 1 | | | | Smith | 2 | | | | Smith | 1 | |
| | Pibbles | 1 | | | +-+----------------+ | | +-+----------------+ |
| +-+--------+-------+ | | | | |
+----------------------+ +----------------------+ +----------------------+
| v repartition() | | v repartition() | | v repartition() |
| +-+--------+-------+ | | +-+--------+-------+ | | +-+--------+-------+ |
| | Key | Value | | | | Key | Value | | | | Key | Value | |
| +------------------+ | | +------------------+ | | +------------------+ |
| | Pibbles | 1 | | | | Sanchez | 1 | | | | Smith | 2 | |
| +------------------+ | | +------------------+ | | | Smith | 1 | |
| | | | | +------------------+ |
| v localgroupby() | | v localgroupby() | | v localgroupby() |
| +-+--------+-------+ | | +-+--------+-------+ | | +-+--------+-------+ |
| | Key | Value | | | | Key | Value | | | | Key | Value | |
| +------------------+ | | +------------------+ | | +------------------+ |
| | Pibbles | 1 | | | | Sanchez | 1 | | | | Smith | 3 | |
| +------------------+ | | +------------------+ | | +------------------+ |
| | +---|------------------+ +---|------------------+
| | | |
| +-+--------+-------+ | | Implicit collect() |
| | Key | Value |<--------+--------------------------+
| +------------------+ |
| | Sanchez | 1 | |
| | Pibbles | 1 | |
| | Smith | 3 | |
| +------------------+ |
+----------------------+

当绝对需要时,函数可以使用任意键对集群中的数据进行重新分区。当数据被重新分区时,每个工作线程被分配一个记录键的子集,并且这些键从所有其他工作线程传递到它。

Mock(localgroupby, repartition, localgroupby)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def fname(x):
''' Extracts the family name from a person's record '''
return x['value']['name'].split(' ')[1]

def key(x):
''' Extracts the key of a record '''
return x['key']

def counter(k, a, r):
''' Counts records '''
return (a if a else 0) + 1

def summer(k, a, r):
''' Sums record values '''
return (a if a else 0) + r['value']

# Repartition for storing counts
gb = GearsBuilder()
gb.localgroupby(fname, counter)
gb.repartition(key)
gb.localgroupby(key, summer)
gb.foreach(lambda x: execute('SET', x['key'], x['value']))
gb.run('person:*')

# Expected result: the same + stored in Redis String keys

RedisGears 的 Python API 包含了 aggregateby ()操作,它等同于使用 GB () . localgroupby () . repartition () . localgroupby ()流。

Reference

Runtime

https://oss.redislabs.com/redisgears/runtime.html

Python RedisGears 函数使用嵌入式 Python 解释器运行。每个函数都使用一个单独的子解释器。所有函数共享相同的环境和依赖关系。导入的环境有几个默认值。

Python Interpreter

embeds python 3.7.2+

Environment

解释器的环境可以用任何依赖的包进行扩展,这些包以后可以被它们各自的子解释器中的函数导入和使用。

GearsBuilder

默认环境提供的函数上下文构造器。

execute

执行命令函数,默认环境提供的执行 Redis 命令的函数。

Python API
1
def execute(command, *args)
Examples
1
2
# Pings the server (reply should be 'PONG')
reply = execute('PING')

atomic

原子操作函数,上下文通过阻塞主 Redis 进程来确保它中的所有操作都以原子的方式执行。

Python API
1
class atomic()
Examples
1
2
3
4
5
6
7
8
9
# Increments two keys atomically
def transaction(_):
with atomic():
execute('INCR', f'{{{hashtag()}}}:foo')
execute('INCR', f'{{{hashtag()}}}:bar')

gb = GB('ShardsIDReader')
gb.foreach(transaction)
gb.run()

configGet

这个函数获取 RedisGears 配置选项的当前值。

gearsConfigGet

这个函数获取RedisGears配置选项的当前值,如果该键不存在,则返回默认值。

hashtag

这个函数返回一个 hashtag,该 hashtag 映射到本地引擎的分片所服务的最低哈希槽。换句话说,它作为一个 hashtag 在集群中进行分区时非常有用。

log

这个函数打印一条消息到Redis的日志中。

1
def log(message, level='notice')

level 取值 debug,verbose,notice,warning

Functions

RedisGears 函数是数据流中处理步骤的正式描述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
                  +------------+
| Function |
+-------------+ | +--------+ |
| Input data +-->+ | Reader | |
+-------------+ | +---+----+ |
| v |
| +---+----+ |
| | Step 1 | |
| +---+----+ |
| | |
| ... |
| v |
| +---+----+ |
| | Step n | |
| +---+----+ |
| v |
+-------------+ | +---+----+ |
| Results +<--+ | Action | |
+-------------+ | +--------+ |
+------------+

一个函数总是:

  1. 始于一个 Reader
  2. 操作零个或多个 Records
  3. 包含零个或多个 Operations (Step)
  4. 终于一个 Action
  5. 返回零个或多个 Results
  6. 可能会产生更多错误。

Execution

一个函数由 RedisGears 引擎以以下两种方式之一执行:

  • Batch 立即执行,对已有数据执行
  • Event由新事件及其数据触发执行

函数的执行方式是由它的动作决定的。有两种类型的行动:

  • Run 批量运行函数
  • Register 注册由事件触发的函数

当以批处理或事件的形式执行时,函数的上下文由引擎管理。除了函数的逻辑之外,上下文还包括内部执行步骤、状态、统计数据、结果和遇到的任何错误。

Execution ID

每个函数的执行都在内部分配了一个惟一的值,称为 Execution ID

ID 是由两部分组成的字符串值,以“-”分隔,如下所示:

  • Shard ID 集群中一个 Shard 的标识符,长度为40个字节
  • Sequence 一个不断增加的计数器

Execution ID示例:

Redis Standalone Mode: 0000000000000000000000000000000000000000-1

Redis Cluster Mode: a007297351b007297351c007297351d007297351-1

Execution Plan

在执行该功能之前,引擎会生成一个执行计划。该计划包含引擎执行功能将采取的基本步骤。

Execution Parallelization

在集群中执行时,执行计划由启动器生成。然后,默认情况下,它将在所有碎片上共享并并行执行。

发起者的协调器协调分布式操作。

Execution Status

Execution Status描述了功能当前的执行状态。它可以是以下几种之一:
created 已创建
running 正在执行
done执行完成
aborted执行被终止
pending_cluster启动器正在等待所有 Worker 执行完成
pending_runWorker 挂起等待启动器确认执行
pending_receive启动器在接收执行时挂起 Worker 的确认(ACK)
pending_terminationWorker 正在等待来自启动器的终止消息

下图演示了状态转换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
       Initiator                                  Worker
+---------------------+ execution plan +---------------------+
| created +------------------>+ created |
+----------+----------+ +----------+----------+
v v
+----------+----------+ acknowledgement +----------+----------+
| pending_receive +<------------------+ pending_run |
+----------+----------+ +---------------------+
v
+----------+----------+ start execution +---------------------+
| running +------------------>+ running |
+----------+----------+ +----------+----------+
v v
+----------+----------+ results +----------+----------+
| pending_cluster +<------------------+ pending_termination |
+----------+----------+ +---------------------+
v
+----------+----------+ terminate +---------------------+
| done +------------------>+ done |
+---------------------+ +---------------------+

Registration

事件驱动函数的表示称为注册(Registration)。

注册被持久化在 Redis 的快照中,也就是 RDB 文件中。这允许在发生故障时恢复数据库中的数据和事件处理程序。

Registration ID

每个注册都有一个唯一的内部标识符,称为 Registration ID。它以与 Execution ID 相同的方式生成,尽管看起来相同,但两者不应该混淆。

Context Builder

Python 中的 RedisGears 函数总是以一个上下文构建器——GearsBuilder 类开始。

Actions

动作 (Action) 是一种特殊类型的操作。它总是函数的最后一步。

Run

Run动作作为批处理(Batch)运行函数。函数只执行一次,一旦读取器耗尽数据就退出。

Python API
1
class GearsBuilder.run(arg=None, convertToStr=True, collect=True)

Arguments

  • arg
    • 一个类似于全局的模式,用于 KeysReaderKeysOnlyReader
    • StreamReader 的密钥名称
    • 用于 PythonReader 的 Python 生成器
  • convertToStr 当为 true时将在流程末尾添加一个 map操作,用于字符串化记录
  • collect 当为 true 时将在流程末尾添加一个 collect 操作
Register

Register 操作将函数注册为事件处理程序。每次有事件出现时都执行该函数。每次执行时,函数对事件的数据进行操作,完成后将暂停,直到新事件调用它。

Python API
1
class GearsBuilder.register(convertToStr=True, collect=True, mode='async', onRegistered=None)

Arguments

  • convertToStr 当为 true时将在流程末尾添加一个 map操作,用于字符串化记录
  • collect 当为 true 时将在流程末尾添加一个 collect 操作
  • mode 触发函数的执行方式。可以是:
    • async 在整个集群中异步执行
    • async_local执行将是异步的,并且仅限于处理分片
    • sync本地同步执行
  • onRegistered 一个函数回调函数,在每个shard上注册函数时被调用。这是初始化非序列化对象(如网络连接)的好地方。

注意,可以将更多参数传递给 register函数,这些参数取决于 reader,详见 Readers 说明。

Results

函数的执行产生零条或多条结果记录。结果由函数最后一次操作和最后一次操作之前的任何记录组成。

结果存储在函数的执行上下文中。

Readers

Reader 是任何 RedisGears 函数必须执行的第一步,每个函数都有一个 Reader。Reader 读取数据并从中生成输入记录(Input)。输入记录由函数使用。

Reader Output Batch Event
KeysReader Redis keys and values Yes Yes
KeysOnlyReader Redis keys Yes No
StreamReader Redis Stream messages Yes Yes
PythonReader Arbitrary Yes No
ShardsIDReader Shard ID Yes No
CommandReader Command arguments No Yes

Operations

操作(Operation)是 RedisGears 函数的构建块。可采用不同的操作类型实现多种结果,满足各种数据处理需求。

操作可以有零个或多个参数来控制其操作。根据操作的类型参数,可以是语言原生数据类型和函数回调。

Operation Description Type
Map Maps 1:1 Local
FlatMap Maps 1:N Local
ForEach Does something for each record Local
Filter Filters records Local
Accumulate Maps N:1 Local
LocalGroupBy Groups records by key (many-to-less) extractor + reducer Local
Limit Limits the number of records Local
Collect Shuffles all records to one engine Global
Repartition Shuffles records between all engines Global
GroupBy Groups records by key (many-to-less) repartition+extractor+reducer Sugar
BatchGroupBy Groups records by key (many-to-less)repartition+extractor+batch reducer Sugar
Sort Sorts records aggregate+local sort+flatmap Sugar
Distinct Makes distinct records Sugar
Aggregate Aggregates records (many-to-one) accumulator+collect+accumulator Sugar
AggregateBy Aggregates records by key (many-to-less) extractor+reducer+repartition+extractor+reducer Sugar
Count Counts records aggregate(local counting + global suming) Sugar
CountBy Counts records by key aggregate (extractor + local counting + global suming) Sugar
Avg Computes the average aggregate(global sum + global count + local map) Sugar

Terminology

Local

本地(Local)操作的执行是在以独立模式或集群模式部署的 RedisGears 引擎中执行的。

当在独立模式下使用时,只有一个引擎在本地执行所有数据上的所有操作。

当再集群模式下使用时,该操作将分布到所有分片。每个分片的引擎也在本地执行操作。然而,分片的引擎只能处理集群对它们进行分区的数据。

Global

全局(Global)操作只在集群 RedisGears 环境的上下文中相关。这些是收集(collect)和重分区(repartition)操作,用于在分片之间转移记录。

Sugar

Sugar 操作是实用程序操作。这些是通过基本操作和相关回调在内部实现的。

Callback

Callback 用于在 API 所使用的语言中调用函数(回调函数)。

Extractor

Extractor 是一个回调函数,它接收输入记录(record)作为参数。它返回从记录中提取的值(value)。返回值应该是本机字符串(Native String)。

Mapper

Mapper 是一个回调函数,它接收输入记录(record)作为参数。它必须返回一个输出记录(record)。

Expander

Expander 是一个回调函数,它接收输入记录(record),必须返回一个或多个输出记录(record)。

Processor

Processor 是一个回调函数,它接收输入记录(record),它不该返回任何东西。

Filter

Filter 是一个回调函数,它接收输入记录(record),它必须返回一个布尔值(boolean)。

Accumulator

Accumulator 是一个回调函数,它接收输入记录和累加器变量。它将输入聚集到累加器变量中,累加器变量存储函数调用之间的状态。函数必须在每次调用后返回累加器的更新值。

Reducer

Reducer 是一个回调函数,它接收一个键、一个输入和一个称为累加器的变量。它的执行类似于 Accumulator 回调,不同之处在于它为每个 reduced 的键维护一个累加器。

Batch Reducer

Batch Reducer 是一个回调函数,它接收一个键和一个输入记录列表。它的执行类似于 reducer 回调,不同之处在于它的输入是一个记录列表,而不是单个记录。它将为这些记录返回一个累加器值。


Commands

RedisGears 是通过 Redis 客户端发送给服务器的命令来运行的。

Registration

RG.DUMPREGISTRATIONS

用来输出函数注册列表

RG.UNREGISTER

用来取消某个函数的注册。

EXECUTION

RG.PYEXECUTE ““ [UNBLOCKING] [REQUIREMENTS “ …”]

用于执行 Python 函数

RG.DUMPEXECUTIONS

用于输出函数执行列表。执行列表的长度由 MaxExecutions配置选项限制。

RG.GETEXECUTION [SHARD|CLUSTER]

用于获取函数执行细节:执行计划,步骤等

RG.GETRESULTS

用于获取函数的执行结果及错误信息

RG.GETRESULTSBLOCKING

取消函数在后台(UNBLOCKING)执行。取消执行的客户端被阻塞,直到函数执行结束,然后发送任何结果和错误。

RG.DROPEXECUTION

用于删除一个函数的执行。

RG.ABORTEXECUTION

在运行过程中中止函数的执行。

Trigger

RG.TRIGGER [arg …]

用来触发已注册的 CommandReader 函数的执行。

Global

RG.CONFIGGET […]

返回一个或多个内置配置或用户定义选项的值。

RG.CONFIGSET […]

设置一个或多个内置配置或用户定义选项的值。

RG.PYSTATS

从 Python 解释器返回内存使用统计信息。

RG.PYDUMPREQS (version 1.0.1)

返回所有可用 Python 依赖的列表(包含关于每个依赖的信息)。

RG.INFOCLUSTER

显示集群信息。

RG.REFRESHCLUSTER

刷新节点的集群拓扑视图。需要在集群的每个节点执行

Configuration

RedisGears 提供了配置选项来控制它的操作。这些选项可以在模块启动时设置,在某些情况下也可以在运行时设置。

Bootstrap Configuration 在加载 Redis Module 时配置,Runtime Configuration 在运行时配置(详见命令:RG.CONFIGSET <key> <value> [...]RG.CONFIGGET <key> [...])。

配置项 说明 期望值 默认值 运行时可配置性
MaxExecutions MaxExecutions 配置选项控制将保存在执行列表中的最大执行次数。
一旦达到这个阈值,就会按照创建顺序(FIFO)从列表中删除旧的执行。
只有已经完成的执行(例如“完成”或“中止”状态)被删除。
Integer 1000 Supported
MaxExecutionsPerRegistration MaxExecutionsPerRegistration 配置选项控制每个注册保存在列表中的最大执行次数。
一旦达到这个阈值,该注册的旧执行将按创建顺序(FIFO)从列表中删除。
只有已经完成的执行(例如“完成”或“中止”状态)被删除。
Integer 100 Supported
ProfileExecutions ProfileExecutions 配置选项控制是否对执行进行分析。对性能损耗较大,建议用于调试操作。 0 (disabled)
1 (enabled)
0 Supported
PythonAttemptTraceback PythonAttemptTraceback 配置选项控制引擎是否试图为 Python 运行时错误产生堆栈跟踪。 0 (disabled)
1 (enabled)
1 Supported
DownloadDeps DownloadDeps 配置选项控制 RedisGears 是否会尝试下载缺少的 Python 依赖项。 0 (disabled)
1 (enabled)
1 Not Supported
DependenciesUrl DependenciesUrl 配置选项控制 RedisGears 试图从何处下载其 Python 依赖项。 URL-like string 与RedisGears版本有关 Not Supported
DependenciesSha256 DependenciesSha256 配置选项指定 Python 依赖项的 SHA256 哈希值。
在下载依赖项之后,将验证此值,如果不匹配,将停止服务器的启动。
String 与RedisGears版本有关 Not Supported
PythonInstallationDir PythonInstallationDir 配置选项指定了 RedisGears 的 Python 依赖项的路径。 String /var/opt/redislabs/modules/rg Not Supported
CreateVenv CreateVenv 配置选项控制引擎是否创建虚拟 Python 环境。 0 (disabled)
1 (enabled)
0 Not Supported
ExecutionThreads ExecutionThreads 配置选项控制将要执行的线程数。 Integer > 0 3 Not Supported
ExecutionMaxIdleTime ExecutionMaxIdleTime 配置选项控制中止执行之前的最大空闲时间(以毫秒为单位)。空闲时间意味着执行没有进展。空闲时间的主要原因是在等待来自另一个失败(即崩溃)碎片的记录时被阻塞的执行。在这种情况下,执行将在指定的时间限制后中止。当执行再次开始进行时,将重置空闲计时器。 Integer > 0 5 seconds Supported
PythonInstallReqMaxIdleTime
1.0.1
PythonInstallReqMaxIdleTime 配置选项控制 Python 依赖安装中止之前的最大空闲时间(以毫秒为单位)。“空闲时间”表示安装没有进展。空闲时间的主要原因与ExecutionMaxIdleTime 相同。 Integer > 0 30000 Supported
SendMsgRetries
1.0.1
SendMsgRetries 配置选项控制在 RedisGears 的分片之间发送消息的最大重试次数。当消息被发送,而 shard 在确认之前断开连接,或者当它返回一个错误时,该消息将被重新发送,直到达到这个阈值。设置为0表示不限制重试次数。 Integer >= 0 3 Supported

Examples

RedisGears Examples: https://oss.redislabs.com/redisgears/examples.html

Clients

RedisGears 是一个 Redis 模块,所以需要 Redis 客户端来操作它。任何支持发送原始 Redis 命令的客户端都可以使用。

Redis Clients: https://redis.io/clients

RedisGears-specific clients: https://github.com/RedisGears/redisgears-py

Design

Isolation Technics - 隔离技术

Python

  • 全局字典(当前版本支持)
  • 子解释器(当前版本出现不兼容某些库而未启用,后续版本提供开关支持

Samples

环境搭建

RedisGears Cluster

1
2
3
## RedisGears Cluster
# docker run -p 30001:30001 -p 30002:30002 -p 30003:30003 --name rgc redislabs/rgcluster:latest
docker run -p 30001:30001 -p 30002:30002 -p 30003:30003 --name rgc redislabs/rgcluster:1.2.1

MySQL

1
2
## mysql 
docker run --name rgmysql -e MYSQL_ROOT_PASSWORD=root -p 3306:3306 -d mysql:8.0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# root
mysql -p
# Enter password: `root`

# show databases;
# DROP USER 'rguser'@'%';

create database rgdb;
create user 'rguser'@'%' identified by 'rguser';

# mysql 8.0 syntax
grant all privileges on rgdb.* TO 'rguser'@'%';
flush privileges;
exit;

mysql -u rguser -p;
use rgdb;
CREATE TABLE IF NOT EXISTS `person`(
`id` VARCHAR(100),
`first` VARCHAR(100),
`last` VARCHAR(40),
`age` INT,
PRIMARY KEY ( `id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
describe person;

Monitor UI

1
2
3
4
5
6
7
8
## Monitor UI
https://github.com/RedisGears/RedisGearsMonitor.git
cd RedisGearsMonitor

npm install
node main.js --port 30001

# http://localhost:9001/

Run Recipe

1
2
3
4
5
6
7
8
9
10
11
## run recipe
# pip3 install gears-cli
# gears-cli --host <host> --port <post> --password <password> run example.py REQUIREMENTS rgsync PyMySQL cryptography
# gears-cli run --host localhost --port 30001 example.py REQUIREMENTS rgsync PyMySQL cryptography

gears-cli run --host localhost --port 30001 example.py REQUIREMENTS git+https://gitlab.com/shankai/rgsync.git PyMySQL cryptography

# 这条是异常信息: failed running gear function (Failed install requirement on shard, check shard log for more info.)
# OK 代表发布成功。
# gears-cli run 之后,redisgears cluster 会动态加载相关模块。
# Monitor UI 会显示 PersonsWriteBehind Registrations 相关状态信息。
example.py (WriteBehind + RGWriteThrough)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from rgsync import RGWriteBehind, RGWriteThrough
from rgsync.Connectors import MySqlConnector, MySqlConnection

'''
Create MySQL connection object
'''
# connection = MySqlConnection('demouser', 'Password123!', 'localhost:3306/test')
connection = MySqlConnection('rguser', 'rguser', 'localhost:3306/rgdb')

'''
Create MySQL persons connector
'''
personsConnector = MySqlConnector(connection, 'person', 'id')

personsMappings = {
'first_name':'first',
'last_name':'last',
'age':'age'
}

RGWriteBehind(GB, keysPrefix='person', mappings=personsMappings, connector=personsConnector, name='PersonsWriteBehind', version='99.99.99')

# RGWriteThrough(GB, keysPrefix='__', mappings=personsMappings, connector=personsConnector, name='PersonsWriteThrough', version='99.99.99')

演示操作

WriteBehind

hmset (不同操作完成后,在 mysql rgdb.person 表中查看数据同步结果)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
hmset person:1 first_name zhang last_name san age 21
hmset person:2 first_name li last_name sisi age 18

hmset person:10 first_name li last_name sisi age 18

## 复制控制

## 变更并同步(默认行为)
hmset person:3 first_name li last_name sisi age 22 # =

## 变更不同步
hmset person:3 first_name li last_name sisi age 50 # +
# HGETALL person:3

## 删除并同步
hmset person:3 # ~

hmset person:3 first_name li last_name sisi age 22 # =
## 删除不同步
hmset person:3 # -

正好一次(源码有缺陷,mysql connector 修复见 https://gitlab.com/shankai/rgsync
1
2
3
4
5
6
CREATE TABLE IF NOT EXISTS `exactlyonce`(
`id` VARCHAR(100),
`val` VARCHAR(100),
PRIMARY KEY ( `id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
describe exactlyonce;

example.py 文件内修改 personsConnector 实例化,添加 exactlyOnceTableName 参数值

1
personsConnector = MySqlConnector(connection, 'person', 'id', 'exactlyonce')
ACK
1
2
3
4
# 
hset person:007 first_name James last_name Bond age 43 # =6ce0c902-30c2-4ac9-8342-2f04fb359a944412
# XREAD BLOCK <timeout> STREAMS {<hash key>}<uuid> 0-0
XREAD BLOCK 2000 STREAMS {person:007}6ce0c902-30c2-4ac9-8342-2f04fb359a944412 0-0

WriteThrough

1
2
3
HSET __{person:1} first_name foo last_name bar age 20 # =123456

XREAD BLOCK 2000 STREAMS {person:104}123456 0-0

(完)