Introduce

Eclipse Hono

Architecture

graph LR
  Device -->|http,mqtt,...| Hono(Hono: Tenant & Device & Credential) --> Kafka(Kafka, ksqlDB, Connector) -->|JDBCSinkConnector| MariaDB

Components

Service Ports Comment
MariaDB 3306
Postgres 5432 Optional: JDBC Device Registry
MongoDB 27017 Optional: MongoDB Device Registry
Kafka 9092 Messaging
Kafka Connect 28083 Avro to MariaDB
Schema Registry 28081 Avro Schema Registry
ksqlDB 28088 Json Convert Avro
hono-adapter-http 8080 Http Adapter
hono-device-registry 28080 Device Registry & Credentials

Installation

VM: Centos 7

Docker, Docker Compose, K3s

User: iotops

1
2
3
4
5
6
7
useradd iotops
passwd iotops
# Ops789
groupadd wheel
usermod -a -G wheel iotops

su - iotops

Datadir

1
2
3
4
5
mkdir -p /home/data/mariadb
mkdir -p /home/data/postgres
mkdir -p /home/data/www
mkdir -p /home/data/kafka
mkdir -p /home/data/zookeeper

Docker & Docker Compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
## docker
sudo yum install -y yum-utils

sudo yum-config-manager \
--add-repo \
https://download.docker.com/linux/centos/docker-ce.rep

sudo yum install docker-ce docker-ce-cli containerd.io

## docker compose
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

sudo chmod +x /usr/local/bin/docker-compose

sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose

MariaDB

Docker Compose

docker-comopse.yml

1
2
3
4
5
6
7
8
9
10
11
12
version: "3.9"  # optional since v1.27.0
services:
mariadb:
image: mariadb
restart: always
container_name: mariadb
ports:
- 3306:3306
volumes:
- /home/data/mariadb:/var/lib/mysql
environment:
MYSQL_ROOT_PASSWORD: root
Initial
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
sudo docker exec -it mariadb bash

mysql -u root -p

CREATE DATABASE hono_data_db;
CREATE USER honodata@'%' IDENTIFIED BY 'honodata';
GRANT ALL ON hono_data_db.* TO 'honodata'@'%';

use hono_data_db;

CREATE TABLE `hono_telemetry_demo` (
`k1` varchar(50) NOT NULL,
`k2` varchar(50) NOT NULL,
`k3` varchar(50) NOT NULL,
`firstName` varchar(50) NOT NULL,
`lastName` varchar(50) NOT NULL,
`age` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Mongodb (optional: MongoDB Device Registry)

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
version: '3'
services:
mongo:
image: mongo:4.4.6
ports:
- 27017:27017
container_name: mongodb
restart: always
volumes:
- /home/data/mongodb/data:/data/db
Initial Database
1
2
3
4
5
6
7
8
9
use hono

db.createUser(
{
user: "hono",
pwd: "hono",
roles: [ { role: "dbAdmin", db: "hono" } ]
}
)

Postgres (optional: JDBC Device Registry)

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
version: '3'
services:
postgresql:
image: postgres:9.6
environment:
POSTGRES_PASSWORD: postgres
ports:
- 5432:5432
container_name: postgresql
restart: always
volumes:
- /home/data/postgres:/var/lib/postgresql/data
Initial Database
1
2
3
4
5
6
7
psql -U postgres
\l

# hono database initial
create database hono;
create user hono with password 'hono';
grant all privileges on database hono to hono;

Kafka

Docker Compose

Upload Connector jars:

1
2
3
4
5
6
7
8
9
10
11
12
cd /home/iotops/kafka/connectors/mysql
# yum install -y wget unzip
#
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz
tar -zxvf mysql-connector-java-8.0.27.tar.gz
mv mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar ./

# https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.2.6/confluentinc-kafka-connect-jdbc-10.2.6.zip

unzip confluentinc-kafka-connect-jdbc-10.2.6.zip
mv confluentinc-kafka-connect-jdbc-10.2.6/lib/*.* ./

docker-comopse.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
volumes:
- /home/data/zookeeper/data:/var/lib/zookeeper/data
- /home/data/zookeeper/log:/var/lib/zookeeper/log
networks:
- local

kafka:
image: confluentinc/cp-kafka:7.0.0
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
- "29092:29092"
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092,EXTERNAL://10.40.80.38:29092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_BROKER_ID: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- /home/data/kafka/data:/var/lib/kafka/data
networks:
- local

schema-registry:
image: confluentinc/cp-schema-registry:7.0.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- kafka
ports:
- "28081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092'
SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081
networks:
- local

ksqldb-server:
image: confluentinc/ksqldb-server:0.23.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- kafka
ports:
- "28088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: kafka:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
networks:
- local

ksqldb-cli:
image: confluentinc/ksqldb-cli:0.23.1
container_name: ksqldb-cli
depends_on:
- ksqldb-server
entrypoint: /bin/sh
tty: true
networks:
- local

kafka-connect:
image: confluentinc/cp-kafka-connect:7.0.0
hostname: kafka-connect
container_name: kafka-connect
depends_on:
- kafka
ports:
- 28083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_REST_PORT: 28083
CONNECT_GROUP_ID: "kc-group-local"
CONNECT_CONFIG_STORAGE_TOPIC: "kc-config"
CONNECT_OFFSET_STORAGE_TOPIC: "kc-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "kc-status"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "10.40.80.38"
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/connector
volumes:
- ./connectors/mysql:/usr/share/connector/mysql
networks:
- local

networks:
local:
driver: bridge

Stream

  1. open ksqldb-cli
1
sudo docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
  1. create data convert stream
1
2
3
CREATE STREAM S1_1 (firstName VARCHAR, lastName VARCHAR, age INT) WITH (KAFKA_TOPIC='hono.telemetry.MY_TENANT', VALUE_FORMAT='JSON');

CREATE STREAM S1_2 WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='hono.telemetry.avro') AS SELECT * FROM S1_1;

Connector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
curl -X PUT \
http://10.40.80.38:28083/connectors/hono-telemetry-demo/config \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"table.name.format": "hono_telemetry_demo",
"consumer.override.group.id": "1227",
"connection.password": "honodata",
"tasks.max": "1",
"topics": "hono.telemetry.avro",
"batch.size": "3000",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"delete.enabled": "false",
"auto.evolve": "false",
"connection.user": "honodata",
"auto.create": "false",
"connection.url": "jdbc:mysql://10.40.80.38:3306/hono_data_db?verifyServerCertificate=false&useSSL=false&requireSSL=false",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"insert.mode": "upsert",
"pk.mode": "kafka",
"pk.fields": "k1,k2,k3"
}'

Hono

  • k3s
  • helm

K3s

1
curl -sfL http://rancher-mirror.cnrancher.com/k3s/k3s-install.sh | INSTALL_K3S_MIRROR=cn sh -

Helm

  1. Install helm
1
2
tar -zxvf helm-v3.7.2-linux-amd64.tar.gz
mv linux-amd64/helm /usr/local/bin/helm
  1. Add chart repository
1
2
3
4
5
6
7
8
9
10
11
12
# helm repo
# https://www.eclipse.org/packages/repository/
helm repo add eclipse-iot https://eclipse.org/packages/charts

# helm list --all-namespaces -a
# helm --kubeconfig /etc/rancher/k3s/k3s.yaml list --all-namespaces -a

k3s kubectl create namespace hono

# helm uninstall eclipse-hono -n hono

k3s kubectl get service -n hono
  1. Config custom.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
deviceRegistryExample:
#enabled: false
type: jdbc # embedded|mongodb|jdbc
addExampleData: false
mongoDBBasedDeviceRegistry:
mongodb:
host: 10.40.80.38
port: 27017
dbName: hono
username: hono
password: hono
jdbcBasedDeviceRegistry:
applicationProfiles: registry-adapter,registry-management,tenant-service,create-schema,dev
registry:
jdbc:
adapter:
url: jdbc:postgresql://10.40.80.38:5432/hono
driverClass: org.postgresql.Driver
username: hono
password: hono
management:
url: jdbc:postgresql://10.40.80.38:5432/hono
driverClass: org.postgresql.Driver
username: hono
password: hono
tenant:
jdbc:
adapter:
url: jdbc:postgresql://10.40.80.38:5432/hono
driverClass: org.postgresql.Driver
username: hono
password: hono
management:
url: jdbc:postgresql://10.40.80.38:5432/hono
driverClass: org.postgresql.Driver
username: hono
password: hono

adapters:
kafkaMessagingSpec:
commonClientConfig:
bootstrap.servers: 10.40.80.38:29092
amqp:
enabled: false
mqtt:
enabled: false

messagingNetworkTypes:
- kafka

amqpMessagingNetworkExample:
enabled: false

kafkaMessagingClusterExample:
enabled: false
  1. Deploy
1
2
3
sudo chmod 644 /etc/rancher/k3s/k3s.yaml

helm --kubeconfig /etc/rancher/k3s/k3s.yaml install -f ./custom.yaml -n hono eclipse-hono eclipse-iot/hono
  1. Uninstall (Ops)
1
helm --kubeconfig /etc/rancher/k3s/k3s.yaml uninstall eclipse-hono -n hono

Telemetry (Demo)

Mock Device Send Telemetry Data

  1. Create Tenant
1
curl --location --request POST 'http://10.40.80.38:28080/v1/tenants/MY_TENANT'
  1. Create Device
1
curl --location --request POST 'http://10.40.80.38:28080/v1/devices/MY_TENANT/MY_DEVICE'
  1. Set Credentials for Device
1
2
3
4
5
6
7
8
9
curl --location --request PUT 'http://10.40.80.38:28080/v1/credentials/MY_TENANT/MY_DEVICE' \
--header 'Content-Type: application/json' \
-d '[{
"type": "hashed-password",
"auth-id": "MY_DEVICE",
"secrets": [{
"pwd-plain": "my-pwd"
}]
}]'
  1. Send Telemetry

    Basic Authorization 用户名为:MY_DEVICE@MY_TENANT,密码为:my-pwd

1
2
3
4
5
6
7
curl --location --request POST 'http://10.40.80.38:8080/telemetry' \
--header 'Authorization: Basic TVlfREVWSUNFQE1ZX1RFTkFOVDpteS1wd2Q=' \
--header 'Content-Type: application/json' \
-d '{
"firstName": "fn23",
"lastName": "ddd"
}'

Body (Demo):

  • firstName *
  • lastName *
  • age

Command (Demo: HttpAdapter)

  1. Device Send Telemetry/Event With ttd:60

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    curl --location -v --request POST 'http://10.40.80.38:8080/event' \
    > --header 'hono-ttd: 60' \
    > --header 'Authorization: Basic TVlfREVWSUNFQE1ZX1RFTkFOVDpteS1wd2Q=' \
    > --header 'Content-Type: application/json' \
    > -d '{
    > "firstName": "fn23",
    > "lastName": "ddd",
    > "age": 4
    > }'
    * About to connect() to 10.40.80.38 port 8080 (#0)
    * Trying 10.40.80.38...
    * Connected to 10.40.80.38 (10.40.80.38) port 8080 (#0)
    > POST /event HTTP/1.1
    > User-Agent: curl/7.29.0
    > Host: 10.40.80.38:8080
    > Accept: */*
    > hono-ttd: 60
    > Authorization: Basic TVlfREVWSUNFQE1ZX1RFTkFOVDpteS1wd2Q=
    > Content-Type: application/json
    > Content-Length: 64
    >
    * upload completely sent off: 64 out of 64 bytes
    < HTTP/1.1 200 OK
    < vary: origin
    < hono-command: 1
    < hono-cmd-req-id: 203111
    < content-type: application/json
    < content-length: 32
    <
    * Connection #0 to host 10.40.80.38 left intact
    {"col_foo":1, "firstName": "zs"}

    60(可自定义) 秒内执行第2步,则响应码为 200 OK,否则为 202 Accepted

    若执行第2步选择的是需要设备端响应命令执行结果的,则此时响应头会包含 hono-cmd-req-id,此 ID 将作为第 3 步请求的路径参数。

  2. Application Send Command

    Command 分为两种:One-way 与 Command(带响应的命令模式,默认)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    ## kcat
    ## brew install kcat
    ##
    ## kcat -b 10.40.80.38:29092 -L

    docker run -it --rm --entrypoint /bin/sh edenhill/kcat:1.7.0
    ## docker run -it --rm edenhill/kcat:1.7.0 -b 10.40.80.38:29092 -L

    ## Command One-way
    echo 'MY_DEVICE${"col_foo":1, "firstName": "zs"}'|kcat -b 10.40.80.38:29092 -t hono.command.MY_TENANT -P -K $ -H device_id=MY_DEVICE -H subject=1 -H content-type=application/json

    ## Command With Response
    echo 'MY_DEVICE${"col_foo":1, "firstName": "zs"}'|kcat -b 10.40.80.38:29092 -t hono.command.MY_TENANT -P -K $ -H device_id=MY_DEVICE -H subject=1 -H content-type=application/json -H correlation-id=111 -H response-required=true
  3. Device Send Command Response

    请求路径中:

    • 203111 为第一步从响应头中获得的 hono-cmd-req-id 值。

    • hono-cmd-status 为命令执行后的状态码。

    1
    2
    3
    4
    5
    6
    7
    8
    curl --location --request POST 'http://10.40.80.38:8080/command/res/203111?hono-cmd-status=201' \
    --header 'Authorization: Basic TVlfREVWSUNFQE1ZX1RFTkFOVDpteS1wd2Q=' \
    --header 'Content-Type: application/json' \
    -d '{
    "commandResponse": "res",
    "sss": 1,
    "repeat": "111"
    }'
  4. Application Receive Command Response

    1
    kcat -b 10.40.80.38:29092 -t hono.command_response.MY_TENANT -o beginning -f '%K %h %s'

Done.

Architecture

graph RL
    REST --> Konga
    subgraph Kong
      REST(Admin REST API)
      OIDC(Plugin: kong-oidc) 
    end
    Keycloak --> OIDC

Install Without DB

Docker

1
2
3
4
5
6
7
8
9
10
11
12
docker run -d --name kong \
-e "KONG_DATABASE=off" \
-e "KONG_PROXY_ACCESS_LOG=/dev/stdout" \
-e "KONG_ADMIN_ACCESS_LOG=/dev/stdout" \
-e "KONG_PROXY_ERROR_LOG=/dev/stderr" \
-e "KONG_ADMIN_ERROR_LOG=/dev/stderr" \
-e "KONG_ADMIN_LISTEN=0.0.0.0:8001, 0.0.0.0:8444 ssl" \
-p 8000:8000 \
-p 8443:8443 \
-p 8001:8001 \
-p 8444:8444 \
kong
  • Admin Port8001``8444
  • Gateway Port 8000 8443

Config

Default Configuration

1
2
docker exec -it kong kong config init /home/kong/kong.yml
docker exec -it kong cat /home/kong/kong.yml >> kong.yml

Post new Config

1
2
3
curl --location --request POST "http://localhost:8001/config" \
--header "Content-Type: text/yaml" \
--data-binary "@/kong.yml"

docker exec with root

1
docker exec -it --user root kong bash
1
2
3
4
# https://docs.konghq.com/install/source/

git clone https://github.com/Kong/kong-build-tools

Install With DB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
docker run -d --name kong-database \
-p 5432:5432 \
-e "POSTGRES_USER=kong" \
-e "POSTGRES_DB=kong" \
-e "POSTGRES_PASSWORD=kong" \
postgres:9.6

docker run --rm \
--link kong-database:kong-database \
-e "KONG_DATABASE=postgres" \
-e "KONG_PG_HOST=kong-database" \
-e "KONG_PG_USER=kong" \
-e "KONG_PG_PASSWORD=kong" \
-e "KONG_CASSANDRA_CONTACT_POINTS=kong-database" \
kong kong migrations bootstrap

docker run -d --name kong \
--link kong-database:kong-database \
-e "KONG_DATABASE=postgres" \
-e "KONG_PG_HOST=kong-database" \
-e "KONG_PG_PASSWORD=kong" \
-e "KONG_CASSANDRA_CONTACT_POINTS=kong-database" \
-e "KONG_PROXY_ACCESS_LOG=/dev/stdout" \
-e "KONG_ADMIN_ACCESS_LOG=/dev/stdout" \
-e "KONG_PROXY_ERROR_LOG=/dev/stderr" \
-e "KONG_ADMIN_ERROR_LOG=/dev/stderr" \
-e "KONG_ADMIN_LISTEN=0.0.0.0:8001, 0.0.0.0:8444 ssl" \
-p 8000:8000 \
-p 8443:8443 \
-p 8001:8001 \
-p 8444:8444 \
kong

# add services, routes


docker exec -it --user root kong bash
# apk
apk update
apk add wget
apk add curl
# install plugins
luarocks install kong-oidc

cp /etc/kong/kong.conf.default /etc/kong/kong.conf
vi /etc/kong/kong.conf
# plugins = oidc
# exit kong container
docker restart kong

Keycloak

1
docker run -d --name keycloak -p 8080:8080 -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=admin jboss/keycloak
1
curl -s http://dev:8080/auth/realms/master/.well-known/openid-configuration | python -mjson.tool
1
2
3
4
5
6
7
CLIENT_SECRET=d1776f95-8a0b-4dc0-8e36-0a5a21b85bdf
curl -s -X POST http://dev:8001/plugins \
-d name=oidc \
-d config.client_id=kong \
-d config.client_secret=${CLIENT_SECRET} \
-d config.discovery=http://dev:8080/auth/realms/master/.well-known/openid-configuration \
| python -mjson.tool

Install

Community

Download

1
2
3
4
5
## 
git clone https://github.com/TheThingsNetwork/lorawan-stack.git

## install
https://www.thethingsindustries.com/docs/getting-started/installation/

Certificates

ca.json

1
2
3
4
5
{
"names": [
{"C": "NL", "ST": "Noord-Holland", "L": "Amsterdam", "O": "The Things Demo"}
]
}

Generate:

1
cfssl genkey -initca ca.json | cfssljson -bare ca

cert.json

change host

1
2
3
4
5
6
{
"hosts": ["thethings.example.com"],
"names": [
{"C": "NL", "ST": "Noord-Holland", "L": "Amsterdam", "O": "The Things Demo"}
]
}

Generate:

1
cfssl gencert -ca ca.pem -ca-key ca-key.pem cert.json | cfssljson -bare cert

Configuration Cert

  • docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
    # If using custom certificates:
secrets:
- ca.pem
- cert.pem
- key.pem

# If using custom certificates:
secrets:
ca.pem:
file: ./ca.pem
cert.pem:
file: ./cert.pem
key.pem:
file: ./key.pem
  • ttn-lw-stack-docker.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# If using custom certificates:
tls:
source: file
root-ca: /run/secrets/ca.pem
certificate: /run/secrets/cert.pem
key: /run/secrets/key.pem

# Let's encrypt for "thethings.example.com"
# tls:
# source: 'acme'
# acme:
# dir: '/var/lib/acme'
# email: 'you@thethings.example.com'
# hosts: ['thethings.example.com']
# default-host: 'thethings.example.com'

Up

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
docker-compose pull
docker-compose run --rm stack is-db init
docker-compose run --rm stack is-db create-admin-user --id admin --email your@email.com

docker-compose run --rm stack is-db create-oauth-client \
--id cli \
--name "Command Line Interface" \
--owner admin \
--no-secret \
--redirect-uri "local-callback" \
--redirect-uri "code"


# see config/stack/ttn-lw-stack-docker.yml
CONSOLE_SECRET="console"
SERVER_ADDRESS="dev:1885"

docker-compose run --rm stack is-db create-oauth-client \
--id console \
--name "Console" \
--owner admin \
--secret "${CONSOLE_SECRET}" \
--redirect-uri "${SERVER_ADDRESS}/console/oauth/callback" \
--redirect-uri "/console/oauth/callback" \
--logout-redirect-uri "${SERVER_ADDRESS}/console" \
--logout-redirect-uri "/console"

Override

Things Stack 是一个实现 LoRaWAN 协议的网络服务器。

什么是 Things Stack?

The Things Stack是企业级的 LoRaWAN 网络服务器,建立在开源核心之上。Things Stack允许您在自己的硬件或云上构建和管理 LoRaWAN 网络。

  • TTS 是 The Things Stack, LoRaWAN Network Server Stack。Things Stack 目前是网络服务器实现的版本3,因此也被非正式地称为V3。

  • TTN 是 “物联网”(The Things Network),它是一个全球协同物联网生态系统,使用 LoRaWAN 创建网络、设备和解决方案。The Things Network 运行着 The Things Stack Community Edition,这是一个众包、开放和去中心化的 LoRaWAN 网络。这个网络是一个很好的开始测试设备、应用程序和集成,并熟悉 LoRaWAN 的方法。

  • TTI 是 The Things Industries:该公司主要负责The Things Stack 的开发和文档编写。Things Industries 还提供具有额外企业功能的云托管和本地私有 LoRaWAN 网络,以及针对企业客户的高级支持计划。

Command-Line Interface

Console

控制台是 LoRaWAN 的 Things Stack 的管理应用程序。它是一个 Web 应用程序,可以用来注册应用程序,终端设备或网关,监控网络流量,或配置网络相关选项等。

API

Gateway Server MQTT

MQTT 协议可用于开发自定义包转发器或网关桥接器,用于在网关和 The Things Stack 之间交换流量,或用于测试目的,方便地模拟网关流量。它是一个替代 Basic Station 和 Semtech UDP协议。

MQTT 是用于交换消息的服务器-客户机协议。客户端可以连接到服务器并发布特定主题下的消息(数据)。它们还可以订阅一个主题,从而接收在该主题下发布的所有消息(由其他客户机或MQTT服务器本身发布)。

网关服务器是 MQTT 服务器,网关作为 MQTT 客户端连接到网关服务器。连接的网关由用于身份验证的用户名标识。

Protocol Buffers

为了与 MQTT 协议通信,网关服务器和网关正在交换 Protocol Buffers。https://github.com/TheThingsNetwork/lorawan-stack/blob/v3.13/api/lorawan.proto

Connecting to the Gateway Server

通过身份验证的客户端可以只写访问以下主题:

  • v3/< Gateway -id>@/up:用于向网关服务器发送上行流量。
  • v3/@/status:向网关服务器发送网关状态消息。
  • v3/< Gateway -id>@/down/ack:用于向网关服务器发送TxAck消息。

客户端也获得只读访问,并应该订阅以下主题:

  • v3/< Gateway -id>@/down:网关服务器发布网关应该发送的下行消息。
1
2
3
4
5
6
$ export GATEWAY_ID="test-gtw@my-tenant"
$ export GATEWAY_API_KEY="NNSXS.VEEBURF3KR77ZR..." # API key with RIGHT_GATEWAY_LINK rights
$ mosquitto_pub \
-h "thethings.example.com" -p 1882 \
-u "$GATEWAY_ID" -P "$GATEWAY_API_KEY" \
-t "v3/$GATEWAY_ID/up" -f test-uplink-message
1
2
3
4
5
6
$ export GATEWAY_ID="test-gtw@my-tenant"
$ export GATEWAY_API_KEY="NNSXS.VEEBURF3KR77ZR..." # API key with RIGHT_GATEWAY_LINK rights
$ mosquitto_sub \
-h "thethings.example.com" -p 1882 \
-u "$GATEWAY_ID" -P "$GATEWAY_API_KEY" \
-t "v3/$GATEWAY_ID/down" -v

Gateway Status Messages

1
2
3
4
5
6
$ export GATEWAY_ID="test-gtw@my-tenant"
$ export GATEWAY_API_KEY="NNSXS.VEEBURF3KR77ZR..." # API key with RIGHT_GATEWAY_LINK rights
$ mosquitto_pub \
-h "thethings.example.com" -p 1882 \
-u "$GATEWAY_ID" -P "$GATEWAY_API_KEY" \
-t "v3/$GATEWAY_ID/status" -f test-gateway-status

TxAck Messages

1
2
3
4
5
6
$ export GATEWAY_ID="test-gtw@my-tenant"
$ export GATEWAY_API_KEY="NNSXS.VEEBURF3KR77ZR..." # API key with RIGHT_GATEWAY_LINK rights
$ mosquitto_pub \
-h "thethings.example.com" -p 1882 \
-u "$GATEWAY_ID" -P "$GATEWAY_API_KEY" \
-t "v3/$GATEWAY_ID/down/ack" -f example-tx-ack

Working with Events

Things Stack 生成大量的事件,让你能够洞察正在发生的事情。可以订阅应用程序、网关、终端设备事件,以及用户、组织和 OAuth 客户端事件。

CLI

1
ttn-lw-cli events subscribe --gateway-id gtw1 --application-id app1

HTTP

1
2
3
4
5
6
7
8
9
10
11
12
# create api key token
ttn-lw-cli users api-key create \
--user-id admin \
--right-application-all \
--right-gateway-all

# subscribe stream
curl https://thethings.example.com/api/v3/events \
-X POST \
-H "Authorization: Bearer NNSXS.BR55PTYILPPVXY.." \
-H "Accept: text/event-stream" \
--data '{"identifiers":[{"application_ids":{"application_id":"app1"}},{"gateway_ids":{"gateway_id":"gtw1"}}]}'

Components

Application Server

Application Server 处理 LoRaWAN 应用层,包括上行数据的解密和解码、下行队列和下行数据的编码和加密。

它托管一个用于流媒体应用程序数据的MQTT服务器,支持HTTP webhook以及发布/订阅集成。

应用程序可以通过多种协议和机制连接到 Application Server。

  • MQTT Protocol

应用程序可以通过 MQTT 交换 JSON 消息来连接到应用服务器。MQTT 在 TLS 上可用,为应用程序和应用服务器之间交换的消息提供机密性。

上行消息不仅包含数据上行消息,而且还包含不同主题的 join-accept 和 downlink 事件。

  • HTTP Webhooks

应用程序可以通过 HTTP webhook 获取流 JSON 消息,并通过向应用服务器发出 HTTP 请求来调度下行消息。

与 MQTT 一样,可以配置所有上游消息,包括上行消息、加入接收事件和下行事件,每个都用于分隔 URL 路径。

  • Pub/Sub Integrations

应用程序还可以使用发布/订阅集成来处理流数据。这包括连接到外部 MQTT 服务器和 NATS 服务器。

  • Message Processing

Application Server 可以解码和编码由终端设备发送和接收的二进制有效负载。这允许使用结构化流数据,例如使用 MQTT 和 HTTP Webhook 的JSON对象,同时使用通过无线电传输的压缩二进制数据。

消息处理器可以是众所周知的格式或自定义脚本,可以在设备级别或针对整个应用程序进行设置。

Network Server

Network Server 负责处理 LoRaWAN 网络层,包括 MAC命令、区域参数和自适应数据速率(ADR)。

Device Management

网络服务器为终端设备(End-Device)管理公开了 NsEndDeviceRegistry 服务。该服务的典型客户端有 Console 和 CLI。

网络服务器存储设备 MAC 配置、MAC状态 和网络会话密钥。

设备MAC配置变化可能会触发下行报文。

网络服务器允许应用服务器推送、替换和列出应用下行链路,以及通过 gRPC API 链接应用。

改变应用程序下行队列可能会触发下行消息。

一旦链接建立,Network Server 将通过该链接向客户端发送特定于应用程序的上行消息。每个应用程序最多只能有一个活动链接。

如果链接不是活动的,但是 Network Server 有特定于应用程序的上行消息要发送,这些消息将在链接建立后排队并发送。

网络服务器维护内部下行任务队列。每个下行任务都有一个与之相关的执行时间,这些任务按升序排列。每当下行任务准备执行时,它就会尽快执行。

Join-accept

如果存在一个挂起的会话,并且为该设备排队加入join-accept,它将被调度。

如果一个挂起的会话不存在或已经发送了 Join-accept, Network Server 将尝试在活动会话中生成并调度数据下行。

网络服务器通过 gRPC 从网关服务器接收上行链路。

网络服务器对接收到的上行链路进行处理。第一步是将上行链路与设备匹配。如果上行链路不能与网络服务器中存储的设备匹配,则丢弃上行链路。

Join-Request

如果收到 Join-request:

  1. 设备使用连接请求中出现的 DevEUI和 JoinEUI 对进行匹配,这是唯一标识设备的。
  2. 新的 DevAddr 被分配给设备,新的 MAC 状态被派生出来。
  3. 如果集群中存在 Join Server, Network Server 将向集群本地 Join Server 发送一个Join请求消息。
  4. 如果集群中不存在 Join Server,或者设备没有在集群本地 Join Server 中提供,网络服务器将向通过互操作性配置发现的 Join Server 发送一个 Join 请求消息。
  5. 如果一个 Join Server 接受了Join-request,设备可能会排队接收 Join-accept 消息,并向链接的 Application Server 发送特定于应用程序的上行消息,这些上行消息携带有关Join-accept 的信息。

如果收到数据上行:

  1. 使用数据上行链路中存在的 DevAddr 匹配设备。通过比较会话上下文和 MAC 状态进行匹配,并进行 MIC 检查。由于多个设备可能具有相同的 DevAddr,网络服务器在匹配设备之前可能需要通过多个存储设备。
  2. 网络服务器处理 MAC 命令,如果这样的命令存在于帧中,并相应地更新 MAC 状态。
  3. 如果数据上行配置了ADR 位,Network Server 运行 ADR 算法,更新 MAC 状态。
  4. 如果数据上行链路处理成功,下行链路可能会为设备排队,一个或多个特定于应用程序的上行消息携带有关 Join-accept 的相关信息被发送到链接的应用服务器。

Identity Server

身份服务器提供了存储实体(如应用程序及其终端设备、网关、用户、组织、OAuth客户端和身份验证提供者)的注册中心。它还通过成员关系和API键管理访问控制。

Entity Registries

实体注册中心存储关于 Things Stack 中所有主要实体的公共信息。这包括名称、描述和属性(用户定义的键值对)。

Users

在 Identity Server 中注册的第一个实体通常是管理用户。用户可以通过提供电子邮件地址并选择用户ID和密码进行注册。这些信息以后可以用于登录。

用户ID是用户的唯一标识符。用户 ID 与组织 ID 在同一个命名空间中。这意味着不可能创建与现有组织具有相同ID的用户。出于安全原因,也不可能重用已删除用户或组织的ID。

用户可以是“admin”,这给了他们更高的权限。正常的注册过程不允许用户注册为 admin,这就是为什么入门指南使用不同的命令创建admin用户。

用户可以处于多个状态之一:请求、批准、拒绝、挂起等。用户的状态决定了用户是否能够执行操作,以及哪些操作。正常情况下,用户处于“批准”状态。如果 The Things Stack 被配置为需要新用户的管理批准,那么用户最初处于“请求”状态,管理用户可以将其更新为“批准”或“拒绝”状态。如果用户行为不当,也可能被管理员挂起。

Gateways

网关可以通过选择一个ID和可选地注册网关的EUI来注册。注册后,可以创建一个API密钥;网关可以使用它的ID(或EUI)和API密钥一起使用 The Things Stack 进行身份验证。

为了保证网关的正常运行,需要设置频率规划ID,并设置网关是否需要满足占空比。

如果网关能够与网关配置服务器通信,则可以在网关注册表中设置网关服务器地址和固件更新设置。

网关注册表还存储有关网关天线的信息,如位置和增益。

Applications

应用程序用于在一个地方组织多个终端设备的注册和流量。应用程序通常对应于处于相同部署或相同类型的终端设备集合。

End Devices

Identity Server 中的终端设备注册表只存储终端设备的元数据,允许 Console 和 CLI 等客户端在应用程序中列出终端设备。它通常存储有关终端设备的品牌、模型、硬件、固件和位置的元数据。它还存储了 Network Server、Application Server 和Join Server的地址,以便 Console 和 CLI 等客户机知道存储终端设备的其他属性的位置。

Organizations

组织用于创建多个用户组,并方便地为整个用户组分配权限。

组织ID是组织的唯一标识符。组织id与用户id在相同的命名空间中。这意味着不可能创建具有与现有用户相同ID的组织。出于安全原因,也不可能重用已删除用户或组织的ID。

OAuth Clients

可以在 OAuth 注册表中注册外部 OAuth 客户端。OAuth 客户端使用客户端ID和秘密进行注册。

与用户一样,OAuth客户机可以处于多种状态中的一种。非admin用户创建的OAuth客户端需要admin用户审批。

官方的OAuth客户端可以被标记为“背书”,或者可以为所有用户预先授权。

Authentication Providers

联邦身份验证提供者可以在身份验证提供者注册中心中注册。身份验证提供者使用ID、名称和提供者配置注册。

身份验证提供者用于允许 Identity Server 从外部身份提供者(如OpenID Connect提供者)获得用户的身份。

Entity Access

标识服务器负责对实体的访问控制。

Memberships

成员资格定义了用户或组织对另一个实体拥有的权利。最简单的成员关系是用户是应用程序或网关的直接成员(或合作者)。成员资格的权限表明允许用户对应用程序或网关做什么。

间接成员关系是指用户是某个组织的成员,而该组织是某个应用程序或网关的成员。在这种情况下,用户-组织成员关系的权限与组织-应用或组织-网关成员关系的权限相交,计算出该用户的有效权限。

API Keys

对于大多数实体,都可以创建API Keys。这些 API Keys 允许您代表实体调用 API。API 密钥不会过期,但是可以撤销 API 密钥。

API键具有相关联的权限。这意味着可以创建一个应用程序API密钥,该密钥可以用来读取有关终端设备的信息,但不能看到根密钥,也不能进行更改。

可以合并成员资格和API密钥,例如,您可以创建一个组织API密钥,该密钥具有列出组织成员所在的应用程序的权利,以及读取这些应用程序中终端设备信息的权利。

OAuth

Identity Server 是 OAuth 2.0 服务器。用户可以授权 OAuth 客户机访问他们的数据、管理授权,甚至管理单个访问令牌。

Join Server

连接服务器处理 LoRaWAN 连接流,包括网络和应用服务器身份验证和会话密钥生成。

Join Procedure

连接服务器通过 gRPC 接收来自网络服务器的加入请求,如果加入请求验证通过,则为注册设备发出加入接收。

如果接受了一个连接请求,Join Server将派生会话安全上下文,该上下文包含会话密钥,并由会话密钥 ID 标识。使用分别在网络服务器和应用服务器之间共享的密钥加密密钥(KEKs), Join Servers 加密派生的网络和应用程序会话密钥,并将会话密钥以加密的形式包含在 Join-accept 中。

Device Management

Join Servers 公开JsEndDeviceRegistry 服务用于终端设备管理。该服务的典型客户端有 Console 和 CLI。

Join Servers 存储设备根和会话密钥。

Session Key Retrieval

Join Servers 公开 rpc 以检索给定会话密钥ID的会话密钥。

Interoperability

Join Servers 公开了LoRaWAN后端接口 1.0 规范中定义的 AS-JS、vNS-JS 和 hNS-JS 服务。

Gateway Server

网关服务器维护与支持基站、UDP、MQTT 和 gRPC 协议的网关的连接。上行流量直接或间接转发给网络服务器,下行流量由网关调度。

Connective

网关可以通过多种协议连接到网关服务器。

  • Basic Station Protocol

网关可以通过基站 LNS 协议与网关服务器连接。建议使用此协议连接网关。

  • UDP Protocol

网关可以通过 UDP 协议连接到网关服务器。与每条消息一起发送的 EUI 用于标识网关。

如果在标识服务器中找到具有此 EUI 的网关,则消息与此网关相关。否则,根据网络配置,上行链路可能被路由或丢弃。但是,由于无法识别其区域参数,网络将不会向此网关发送下行链路。

旧版本的数据包转发器实现 UDP 协议没有实现任何排队系统下行链路,导致数据包丢失,因为 SX130x 集中器不缓冲多个下行链路。因此,对于 UDP 协议,Things Stack 实现了向下发送到网关的延迟,这意味着它们将由集中器发出。您可以每个网关单独禁用此功能,例如,如果网关和网关服务器之间的RTT过高。

  • MQTT Protocol

网关可以通过 MQTT 交换 Protocol Buffer 连接到网关服务器。MQTT 可在TLS上使用,提供网关和网络之间交换的消息的机密性。与使用 JSON 编码的 UDP 协议相比,使用 Protocol Buffer 的编码减少了带宽的使用。技术细节请参见网关服务器MQTT协议。

实现 MQTT 协议的包转发器(Packet forwarders)特定于Things Stack。

Gateway Information

当连接网关时,网关服务器会统计与网关交换的消息以及网关发送的状态消息。这些统计数据可以通过网关服务器的gRPC和HTTP api来检索。看到Gs服务。

Communication with Network Server

网关服务器的主要功能是维护与网关的连接,并作为网关和网络服务器之间的中继。

当网关服务器接收数据上行消息时,根据设备的 DevAddr 和配置的转发表决定发送到哪个 Network Server。连接请求被路由到所有已配置的端点。端点可以是集群的 gRPC 网络服务器,或者是中间流量路由机制。

网络服务器可以请求下行消息的传输。网关服务器尝试基于所选网关、发送消息的时间和 LoRaWAN 设置(下行链路类、RX1延迟和RX1/RX2数据速率和频率)来调度消息。

网关服务器跟踪所有发送的和连接到它的网关将要发送的下行链路,包括基于消息大小和数据速率的准确广播时间。这使得 The Things Stack 能够进行智能调度。除了定时和 LoRaWAN 设置,网关服务器考虑到适用的限制,包括:

  • 调度冲突:如果一个下行链路的发送与另一个下行链路的发送重叠,则下行链路将被拒绝。
  • 停飞时间:一些频带和频率计划有停飞时间限制,这意味着网关在发送下行链路后的一段时间内不能发射。
  • 频宽比:一些国家,如欧洲国家,有频宽比限制,禁止一个设备在一定范围内排放超过一定百分比的时间。
  • 停留时间:一些国家,如美国,受停留时间的规定,即传输的持续时间不能超过一定的限制。

Networking

Port Allocations

The following table lists the default ports used.

Purpose Protocol Authentication Port Port (TLS)
Gateway data Semtech Packet Forwarder None 1700 (UDP) N/A
Gateway data MQTT (V2) API key, token 1881 8881
Gateway data MQTT API key, token 1882 8882
Application data, events MQTT API key, token 1883 8883
Management, data, events gRPC API key, token 1884 8884
Management HTTP API key, token 1885 8885
Backend Interfaces HTTP Custom N/A 8886
LNS Web Sockets Auth Token, Custom 1887 8887
Tabs Hubs LNS Web Sockets Auth Token, Custom 1888 8888

Service Discovery

The Things Stack supports discovering services using DNS SRV records. This is useful when dialing a cluster only by host name; the supported services and target host name and port are discovered using DNS.

To support service discovery for your The Things Stack cluster, configure DNS SRV records with the following services and protocols:

Protocol SRV Service SRV Protocol SRV Target
gRPC ttn-v3-is-grpc tcp Identity Server
gRPC ttn-v3-gs-grpc tcp Gateway Server
Semtech Packet Forwarder ttn-v3-gs-udp udp Gateway Server
MQTT (V2) ttn-v3-gs-mqttv2 tcp Gateway Server
MQTT ttn-v3-gs-mqtt tcp Gateway Server
Basic Station LNS ttn-v3-gs-basicstationlns tcp Gateway Server
gRPC ttn-v3-ns-grpc tcp Network Server
gRPC ttn-v3-as-grpc tcp Application Server
MQTT ttn-v3-as-mqtt tcp Application Server
gRPC ttn-v3-js-grpc tcp Join Server
gRPC ttn-v3-dtc-grpc tcp Device Template Converter
gRPC ttn-v3-dcs-grpc tcp Device Claiming Server
gRPC ttn-v3-gcs-grpc tcp Gateway Configuration Server
gRPC ttn-v3-qrg-grpc tcp QR Code Generator

简介

https://www.chirpstack.io/

LoRaWAN®是一种低功耗、广域(LPWA)网络协议,设计用于在区域、国家或全球网络中将电池操作的“事物”无线连接到互联网,并针对物联网的关键需求,如双向通信、端到端安全性、移动性和本地化服务。

ChirpStack 为 LoRaWAN 网络提供开源组件。它们一起形成了一个现成的解决方案,包括用于设备管理的用户友好的 WEB 界面和用于集成的 API。模块化的体系结构使得在现有的基础设施中集成成为可能。所有组件都在MIT许可下许可,并可用于商业目的。提供了以下组件:

  • ChirpStack网关桥:处理与 LoRaWAN 网关的通信。
  • ChirpStack网络服务器:一个 LoRaWAN 网络服务器的实现。
  • ChirpStack应用服务器:一个 LoRaWAN 应用服务器的实现。
  • ChirpStack Gateway OS:在基于树莓派(Raspberry Pi)的 LoRa 网关上运行(完整) ChirpStack栈的 Linux 操作系统。

架构

architecture

LoRaWAN 设备

LoRaWAN设备(上图中未显示)是通过一个或多个 LoRa 网关向 ChirpStack 网络服务器发送数据的设备。例如,这些设备可以是测量空气质量、温度、湿度、位置的传感器…

LoRa®网关

一个 LoRa 网关(通常)同时侦听 8 个或更多的通道,并将接收到的数据(从设备)转发到一个 LoRaWAN 网络服务器(在本例中是 ChirpStack 网络服务器)。运行在 LoRa 网关上负责接收和发送的软件称为包转发器(Packet Forwarder)。常见的实现是 Semtech UDP Packet ForwarderSemtech Basic Station Packet Forwarder.。

ChirpStack 网关桥

ChirpStack 网关桥接器位于数据包转发器和 MQTT 代理之间。它将数据包转发器格式(如Semtech UDP数据包转发器协议)转换为数据格式,由 ChirpStack 组件使用。它还提供与各种云平台的集成,如 GCP 云物联网核心和 Azure 物联网枢纽。

ChirpStack 网络服务器

ChirpStack 网络服务器是一个 LoRaWAN 网络服务器,负责管理网络状态。它具有网络上设备激活的知识,并能够在设备想要加入网络时处理连接请求。

当数据被多个网关接收时,ChirpStack 网络服务器将删除这些重复数据,并将其作为一个有效负载转发到 ChirpStack 应用服务器。

当应用服务器需要将数据发送回设备时,ChirpStack 网络服务器将把这些项目保存在队列中,直到它能够发送到其中一个网关。

ChirpStack应用服务器

ChirpStack 应用服务器是 LoRaWAN 应用服务器,与 ChirpStack 网络服务器兼容。它提供了一个 WEB 界面和 API,用于管理用户、组织、应用、网关和设备。

接收到的上行数据被转发到一个或多个已配置的集成。

终端应用

终端应用程序通过一个已配置的集成接收设备数据。它可以使用 ChirpStack 应用服务器 API 来调度设备的下行有效负载。终端应用程序的目的可能是分析、警报、数据可视化、触发操作……

ChirpStack Application Server

ChirpStack Application Server是一个开源的LoRaWAN®应用服务器,是 ChirpStack 开源 LoRaWAN 网络服务器堆栈的一部分。它负责 LoRaWAN 基础设施的设备“库存”部分,处理连接请求和应用程序有效负载的处理和加密。

它提供了一个可以管理用户、组织、应用程序和设备的网络界面。为了与外部服务集成,它提供了 gRPC 和 RESTful API。

发送 和/或 接收的设备数据可以通过 MQTT、HTTP 或直接写入到 InfluxDB。

有效载荷加密/解密

ChirpStack Application Server 处理应用程序有效负载的加密和解密。它还保存每个设备的应用程序键,并在 OTAA 激活时处理 join-accept。这意味着有效载荷将被解密发送到集成,但在有效载荷被发送到 ChirpStack 网络服务器之前,网络服务器无法访问这些有效载荷。

Web 界面

ChirpStack Application Server 提供了一个 Web 界面 (构建在提供的 RESTful API 之上)。这个 Web 界面可以用来管理用户、组织、应用程序和设备。

用户授权

使用 ChirpStack Application Server,可以授予用户全局的管理员权限,使他们成为组织的管理员,或者为他们分配组织内的仅查看权限。这使得在多租户环境中运行 ChirpStack Application Server 成为可能,在这种环境中,每个组织或团队只能访问他们自己的应用程序和设备。

API

为了与外部服务集成,ChirpStack Application Server 提供了gRPC 和 RESTFul API,公开了与 Web 界面相同的功能。

有效载荷和设备事件

ChirpStack Application Server 提供了不同的方式发送和接收设备负载(例如MQTT, HTTP, InfluxDB,…)。

注意:下行负载也可以通过API进行调度。

网关发现

对于包含多个网关的网络,ChirpStack Application Server 提供了一个特性来测试网关的网络覆盖。通过向每个网关定期发送 “ping”,ChirpStack 应用服务器能够发现这些 ping 被同一网络中的其他网关接收的情况。采集到的数据以地图的形式显示在 Web 界面上。可以为每个 Network Server 启用和配置此功能。

活动的帧日志(frame-logging)

使用 ChirpStack Application Server,可以检查每个网关或设备的所有原始和加密的 LoRaWAN 帧。当打开网关或设备详细信息页面上的 LoRaWAN 帧标签时,将看到实时的所有帧。这也将允许您检查每个 LoRaWAN 帧的(加密)内容。

活动的事件日志(event-logging)

使用 ChirpStack Application Server,可以从 Web 界面检查所有事件,而不需要使用 MQTT 客户端或构建集成。当打开实时事件日志标签上的设备详情,将看到所有实时 uplink,ack,join 和 error 事件。

Startup

certs (optional)

1
2
3
4
5
6
7
8
9
rm -rf certs/ca
mkdir -p certs/ca
cfssl gencert -initca config/ca-csr.json | cfssljson -bare certs/ca/ca

mkdir -p certs/mqtt/server
cfssl gencert -ca certs/ca/ca.pem -ca-key certs/ca/ca-key.pem -config config/ca-config.json -profile server config/mqtt/server/certificate.json | cfssljson -bare certs/mqtt/server/mqtt-server

cp certs/ca/* ~/codebase/oss/chirpstack/mqtt/specs/
cp certs/mqtt/server/* ~/codebase/oss/chirpstack/mqtt/specs/

mosquitto

https://mosquitto.org/

mosquitto mqtt-broker

1
2
3
4
5
6
docker run -it -p 11883:1883 -p 19001:9001 -v ./oss/chirpstack/mqtt/mosquitto.conf:/mosquitto/config/mosquitto.conf -v ./oss/chirpstack/mqtt/specs:/usr/local/certs --name mqtt-broker eclipse-mosquitto

docker exec -it mqtt-broker sh

mosquitto_sub -d -v -h mqtt-broker -p 11883 -t test --cafile /usr/local/certs/ca.pem
mosquitto_pub -d -h mqtt-broker -p 11883 -t test -m hellomqtt --cafile /usr/local/certs/ca.pem

chirpstack-docker

https://github.com/brocaar/chirpstack-docker

docker-compose.yml

chirpstack-application-server.toml

1
2
3
4
cd chirpstack-docker

docker-compose up
docker-compose rm

chirpstack-simulator

Download

1
https://github.com/brocaar/chirpstack-simulator

Build

解决 proxy.golang.org 无法访问

1
go env -w GOPROXY=https://goproxy.cn
1
go build -a -installsuffix cgo -ldflags "-s -w -X main.version=37a7e02" -o build/chirpstack-simulator cmd/chirpstack-simulator/main.go

Startup

chirpstack-simulator.toml (修改配置文件,用于自动创建设备及模拟设备感知数据)

  • jwt_token (Application Server API Keys)
  • service_profile_id (Application Server Service Profile)
1
./build/chirpstack-simulator -c chirpstack-simulator.toml

RedisGears 版本: v1.0

Home

RedisGears - Redis 中数据处理的可编程引擎。

RedisGears 是一个 Serverless 的引擎,用于处理 Redis 中的事务、批处理和事件驱动的数据。它是一个动态的执行函数的框架,而这些函数反过来又实现了 Redis 的数据流,同时(几乎)完全抽象了数据的分布和部署的选择(如单机vs集群,OSS vs企业级)。函数可以用不同的语言实现,包括 Python and C APIs。

Diagram Components

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
+---------------------------------------------------------------------+
| Redis Server +--------------------------------------+ |
| | RedisGears Module | |
| +----------------+ | | |
| | Data | Input | +------------+ +-------------------+ | |
| | +-------->+ | Function | | APIs | | |
| | Key1 : Value1 | | | +--------+ | | C, Python, ... | | |
| | Key2 : Value2 | Output | | | Reader | | +-------------------+ | |
| | Key3 : Value3 <---------+ | +---+----+ | +-------------------+ | |
| | ... | | | v | | Redis commands | | |
| +----------------+ | | +---+----+ | | Gears admin & ops | | |
| | | | Step 1 | | +-------------------+ | |
| | | +---+----+ | +-------------------+ | |
| +----------------+ | | v | | Coordinator | | |
| | Events | | | +---+----+ | | Cluster MapReduce | | |
| | | Trigger | | | Step 2 | | +-------------------+ | |
| | Data update +-------->+ | +---+----+ | +-------------------+ | |
| | Stream message | | | v | | Engine | | |
| | Time interval | | | ... | | Runtime execution | | |
| | ... | | +------------+ +-------------------+ | |
| +----------------+ +--------------------------------------+ |
+---------------------------------------------------------------------+

Quickstart

RedisGears Cluster

1
docker run -p 30001:30001 -p 30002:30002 -p 30003:30003 redislabs/rgcluster:latest

RedisGears Standalone

1
2
3
4
5
6
# run 
docker run -d --name redisgears -p 6379:6379 redislabs/redisgears:latest
# redis-cli
docker exec -it redisgears redis-cli
# function
RG.PYEXECUTE "GearsBuilder().run()"

RGGearsBuilder() 的别名。

Introduction

Reader

  • KeysReader

Keys Pattern

1
RG.PYEXECUTE "GearsBuilder().run('person:*')"

Function

执行复杂函数

1
cat mygear.py | docker exec -i redisgears redis-cli -x RG.PYEXECUTE

  • filter

  • map

    1
    2
    3
    4
    5
    gb = GearsBuilder()                       # declare a function builder
    gb.map(lambda x: int(x['value']['age'])) # map each record to just an age
    gb.run('person:*') # run it

    ## Expected result: [70, 14]
  • accumulate

Blocking vs. Nonblocking Execution

UNBLOCKING

非阻塞模式执行

1
2
127.0.0.1:6379> RG.PYEXECUTE "GB().run()" UNBLOCKING
"0000000000000000000000000000000000000000-0"

RG.DUMPEXECUTIONS

执行状态

1
2
3
4
5
127.0.0.1:6379> RG.DUMPEXECUTIONS
1) 1) "executionId"
2) "0000000000000000000000000000000000000000-0"
3) "status"
4) "done"

RG.GETRESULTS

获取结果

1
2
3
4
5
127.0.0.1:6379> RG.GETRESULTS 0000000000000000000000000000000000000000-0
1) 1) "{'key': 'foo', 'value': 'bar'}"
2) "{'key': 'person:1', 'value': {'age': '70', 'name': 'Rick Sanchez'}}"
3) "{'key': 'person:2', 'value': {'age': '14', 'name': 'Morty Smith'}}"
2) (empty list or set)

RG.GETRESULTSBLOCKING

转入阻塞模式,等待结果

Event Processing

默认是非阻塞执行

1
2
3
gb.register()
RG.DUMPEXECUTIONS ## 执行状态
RG.GETRESULTS ## 获取结果

Writing Data

execute()

RedisGears Python API 附带 execute ()函数,该函数允许在数据库中执行任意 Redis 命令。

Cluster

1
2
3
4
docker run -d --name rgcluster -p 30001:30001 -p 30002:30002 -p 30003:30003 redislabs/rgcluster:latest

docker exec -i rgcluster redis-cli -c -p 30001 < data.txt
# 默认情况下,cli 不遵循集群的重定向。要让 cli 自动在分片之间跳转,请使用 -c 命令行开关启动它。

data.txt

1
2
3
4
5
6
SET foo bar
HSET person:1 name "Rick Sanchez" age 70
HSET person:2 name "Morty Smith" age 14
HSET person:3 name "Summer Smith" age 17
HSET person:4 name "Beth Smith" age 35
HSET person:5 name "Shrimply Pibbles" age 87

Distributed Processing

当 RedisGears 在集群中运行时,默认情况下它将在集群的所有分片上执行函数。通过向 run ()操作提供 collect = False 参数来禁用。

1
2
3
4
127.0.0.1:30001> RG.PYEXECUTE "GB().run(collect=False)"
1) 1) "{'key': 'person:1', 'value': {'age': '70', 'name': 'Rick Sanchez'}}"
2) "{'key': 'person:5', 'value': {'age': '87', 'name': 'Shrimply Pibbles'}}"
2) (empty list or set)

MapReduce

Cluster Map and Reduce

accumulate-collect,这种情况只会将每个分片的最大值收集起来

1
2
3
4
5
6
7
8
9
10
11
12
def maximum(a, x):
''' Returns the maximum '''
a = a if a else 0 # initialize the accumulator
return max(a, x)

# Original, non-reduced, maximum function version
gb = GearsBuilder()
gb.map(lambda x: int(x['value']['age']))
gb.accumulate(maximum)
gb.run('person:*')

## Expected result: [87, 35, 14]

accumulate-collect-accumulate,这种情况会将所有分片的最大值收集起来后再计算最大值,得到最佳结果(1 个最大值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def maximum(a, x):
''' Returns the maximum '''
a = a if a else 0 # initialize the accumulator
return max(a, x)

# Reduced maximum function
gb = GearsBuilder()
gb.map(lambda x: int(x['value']['age']))
gb.accumulate(maximum)
gb.collect()
gb.accumulate(maximum)
gb.run('person:*')

## Expected result: [87]

aggregate()

RedisGears Python API 包含 aggregate() 操作,该操作将accumulate-collect-accumulate步骤包装为单个步骤:

1
2
3
4
5
6
7
8
9
# Aggregated maximum version
gb = GearsBuilder()
gb.map(lambda x: int(x['value']['age']))
gb.aggregate(0,
lambda a, x: max(a, x),
lambda a, x: max(a, x))
gb.run('person:*')

## Expected result: [87]

aggregate() 接受三个参数: 第一个是累加器的零值,另外两个是对累加函数的回调,这些函数将分别在本地和全局执行。

Local vs. Global

1
2
localgroupby  ## 由每个分片的引擎在本地执行
groupby ## 将结果重新分区,再由每个分片的引擎在本地执行 = repartition + localgroupby

Diagram(localgroupby, groupby)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
+----------------------+   +----------------------+   +----------------------+
| Shard A | | Shard B | | Shard C |
| +----------+-------+ | | +----------+-------+ | | +----------+-------+ |
| | Key | Value | | | | Key | Value | | | | Key | Value | |
| +------------------+ | | +------------------+ | | +------------------+ |
| | person:1 | {...} | | | | person:3 | {...} | | | | foo | bar | |
| | person:5 | {...} | | | | person:4 | {...} | | | | person:2 | {...} | |
| +-+--------+-------+ | | +-+--------+-------+ | | +-+--------+-------+ |
| v localgroupby() | | v localgroupby() | | v localgroupby() |
| +-+--------+-------+ | | +-+--------+-------+ | | +-+--------+-------+ |
| | Key | Value | | | | Key | Value | | | | Key | Value | |
| +------------------+ | | +------------------+ | | +------------------+ |
| | Sanchez | 1 | | | | Smith | 2 | | | | Smith | 1 | |
| | Pibbles | 1 | | | +-+----------------+ | | +-+----------------+ |
| +-+--------+-------+ | | | | |
+----------------------+ +----------------------+ +----------------------+
| v repartition() | | v repartition() | | v repartition() |
| +-+--------+-------+ | | +-+--------+-------+ | | +-+--------+-------+ |
| | Key | Value | | | | Key | Value | | | | Key | Value | |
| +------------------+ | | +------------------+ | | +------------------+ |
| | Pibbles | 1 | | | | Sanchez | 1 | | | | Smith | 2 | |
| +------------------+ | | +------------------+ | | | Smith | 1 | |
| | | | | +------------------+ |
| v localgroupby() | | v localgroupby() | | v localgroupby() |
| +-+--------+-------+ | | +-+--------+-------+ | | +-+--------+-------+ |
| | Key | Value | | | | Key | Value | | | | Key | Value | |
| +------------------+ | | +------------------+ | | +------------------+ |
| | Pibbles | 1 | | | | Sanchez | 1 | | | | Smith | 3 | |
| +------------------+ | | +------------------+ | | +------------------+ |
| | +---|------------------+ +---|------------------+
| | | |
| +-+--------+-------+ | | Implicit collect() |
| | Key | Value |<--------+--------------------------+
| +------------------+ |
| | Sanchez | 1 | |
| | Pibbles | 1 | |
| | Smith | 3 | |
| +------------------+ |
+----------------------+

当绝对需要时,函数可以使用任意键对集群中的数据进行重新分区。当数据被重新分区时,每个工作线程被分配一个记录键的子集,并且这些键从所有其他工作线程传递到它。

Mock(localgroupby, repartition, localgroupby)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def fname(x):
''' Extracts the family name from a person's record '''
return x['value']['name'].split(' ')[1]

def key(x):
''' Extracts the key of a record '''
return x['key']

def counter(k, a, r):
''' Counts records '''
return (a if a else 0) + 1

def summer(k, a, r):
''' Sums record values '''
return (a if a else 0) + r['value']

# Repartition for storing counts
gb = GearsBuilder()
gb.localgroupby(fname, counter)
gb.repartition(key)
gb.localgroupby(key, summer)
gb.foreach(lambda x: execute('SET', x['key'], x['value']))
gb.run('person:*')

# Expected result: the same + stored in Redis String keys

RedisGears 的 Python API 包含了 aggregateby ()操作,它等同于使用 GB () . localgroupby () . repartition () . localgroupby ()流。

Reference

Runtime

https://oss.redislabs.com/redisgears/runtime.html

Python RedisGears 函数使用嵌入式 Python 解释器运行。每个函数都使用一个单独的子解释器。所有函数共享相同的环境和依赖关系。导入的环境有几个默认值。

Python Interpreter

embeds python 3.7.2+

Environment

解释器的环境可以用任何依赖的包进行扩展,这些包以后可以被它们各自的子解释器中的函数导入和使用。

GearsBuilder

默认环境提供的函数上下文构造器。

execute

执行命令函数,默认环境提供的执行 Redis 命令的函数。

Python API
1
def execute(command, *args)
Examples
1
2
# Pings the server (reply should be 'PONG')
reply = execute('PING')

atomic

原子操作函数,上下文通过阻塞主 Redis 进程来确保它中的所有操作都以原子的方式执行。

Python API
1
class atomic()
Examples
1
2
3
4
5
6
7
8
9
# Increments two keys atomically
def transaction(_):
with atomic():
execute('INCR', f'{{{hashtag()}}}:foo')
execute('INCR', f'{{{hashtag()}}}:bar')

gb = GB('ShardsIDReader')
gb.foreach(transaction)
gb.run()

configGet

这个函数获取 RedisGears 配置选项的当前值。

gearsConfigGet

这个函数获取RedisGears配置选项的当前值,如果该键不存在,则返回默认值。

hashtag

这个函数返回一个 hashtag,该 hashtag 映射到本地引擎的分片所服务的最低哈希槽。换句话说,它作为一个 hashtag 在集群中进行分区时非常有用。

log

这个函数打印一条消息到Redis的日志中。

1
def log(message, level='notice')

level 取值 debug,verbose,notice,warning

Functions

RedisGears 函数是数据流中处理步骤的正式描述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
                  +------------+
| Function |
+-------------+ | +--------+ |
| Input data +-->+ | Reader | |
+-------------+ | +---+----+ |
| v |
| +---+----+ |
| | Step 1 | |
| +---+----+ |
| | |
| ... |
| v |
| +---+----+ |
| | Step n | |
| +---+----+ |
| v |
+-------------+ | +---+----+ |
| Results +<--+ | Action | |
+-------------+ | +--------+ |
+------------+

一个函数总是:

  1. 始于一个 Reader
  2. 操作零个或多个 Records
  3. 包含零个或多个 Operations (Step)
  4. 终于一个 Action
  5. 返回零个或多个 Results
  6. 可能会产生更多错误。

Execution

一个函数由 RedisGears 引擎以以下两种方式之一执行:

  • Batch 立即执行,对已有数据执行
  • Event由新事件及其数据触发执行

函数的执行方式是由它的动作决定的。有两种类型的行动:

  • Run 批量运行函数
  • Register 注册由事件触发的函数

当以批处理或事件的形式执行时,函数的上下文由引擎管理。除了函数的逻辑之外,上下文还包括内部执行步骤、状态、统计数据、结果和遇到的任何错误。

Execution ID

每个函数的执行都在内部分配了一个惟一的值,称为 Execution ID

ID 是由两部分组成的字符串值,以“-”分隔,如下所示:

  • Shard ID 集群中一个 Shard 的标识符,长度为40个字节
  • Sequence 一个不断增加的计数器

Execution ID示例:

Redis Standalone Mode: 0000000000000000000000000000000000000000-1

Redis Cluster Mode: a007297351b007297351c007297351d007297351-1

Execution Plan

在执行该功能之前,引擎会生成一个执行计划。该计划包含引擎执行功能将采取的基本步骤。

Execution Parallelization

在集群中执行时,执行计划由启动器生成。然后,默认情况下,它将在所有碎片上共享并并行执行。

发起者的协调器协调分布式操作。

Execution Status

Execution Status描述了功能当前的执行状态。它可以是以下几种之一:
created 已创建
running 正在执行
done执行完成
aborted执行被终止
pending_cluster启动器正在等待所有 Worker 执行完成
pending_runWorker 挂起等待启动器确认执行
pending_receive启动器在接收执行时挂起 Worker 的确认(ACK)
pending_terminationWorker 正在等待来自启动器的终止消息

下图演示了状态转换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
       Initiator                                  Worker
+---------------------+ execution plan +---------------------+
| created +------------------>+ created |
+----------+----------+ +----------+----------+
v v
+----------+----------+ acknowledgement +----------+----------+
| pending_receive +<------------------+ pending_run |
+----------+----------+ +---------------------+
v
+----------+----------+ start execution +---------------------+
| running +------------------>+ running |
+----------+----------+ +----------+----------+
v v
+----------+----------+ results +----------+----------+
| pending_cluster +<------------------+ pending_termination |
+----------+----------+ +---------------------+
v
+----------+----------+ terminate +---------------------+
| done +------------------>+ done |
+---------------------+ +---------------------+

Registration

事件驱动函数的表示称为注册(Registration)。

注册被持久化在 Redis 的快照中,也就是 RDB 文件中。这允许在发生故障时恢复数据库中的数据和事件处理程序。

Registration ID

每个注册都有一个唯一的内部标识符,称为 Registration ID。它以与 Execution ID 相同的方式生成,尽管看起来相同,但两者不应该混淆。

Context Builder

Python 中的 RedisGears 函数总是以一个上下文构建器——GearsBuilder 类开始。

Actions

动作 (Action) 是一种特殊类型的操作。它总是函数的最后一步。

Run

Run动作作为批处理(Batch)运行函数。函数只执行一次,一旦读取器耗尽数据就退出。

Python API
1
class GearsBuilder.run(arg=None, convertToStr=True, collect=True)

Arguments

  • arg
    • 一个类似于全局的模式,用于 KeysReaderKeysOnlyReader
    • StreamReader 的密钥名称
    • 用于 PythonReader 的 Python 生成器
  • convertToStr 当为 true时将在流程末尾添加一个 map操作,用于字符串化记录
  • collect 当为 true 时将在流程末尾添加一个 collect 操作
Register

Register 操作将函数注册为事件处理程序。每次有事件出现时都执行该函数。每次执行时,函数对事件的数据进行操作,完成后将暂停,直到新事件调用它。

Python API
1
class GearsBuilder.register(convertToStr=True, collect=True, mode='async', onRegistered=None)

Arguments

  • convertToStr 当为 true时将在流程末尾添加一个 map操作,用于字符串化记录
  • collect 当为 true 时将在流程末尾添加一个 collect 操作
  • mode 触发函数的执行方式。可以是:
    • async 在整个集群中异步执行
    • async_local执行将是异步的,并且仅限于处理分片
    • sync本地同步执行
  • onRegistered 一个函数回调函数,在每个shard上注册函数时被调用。这是初始化非序列化对象(如网络连接)的好地方。

注意,可以将更多参数传递给 register函数,这些参数取决于 reader,详见 Readers 说明。

Results

函数的执行产生零条或多条结果记录。结果由函数最后一次操作和最后一次操作之前的任何记录组成。

结果存储在函数的执行上下文中。

Readers

Reader 是任何 RedisGears 函数必须执行的第一步,每个函数都有一个 Reader。Reader 读取数据并从中生成输入记录(Input)。输入记录由函数使用。

Reader Output Batch Event
KeysReader Redis keys and values Yes Yes
KeysOnlyReader Redis keys Yes No
StreamReader Redis Stream messages Yes Yes
PythonReader Arbitrary Yes No
ShardsIDReader Shard ID Yes No
CommandReader Command arguments No Yes

Operations

操作(Operation)是 RedisGears 函数的构建块。可采用不同的操作类型实现多种结果,满足各种数据处理需求。

操作可以有零个或多个参数来控制其操作。根据操作的类型参数,可以是语言原生数据类型和函数回调。

Operation Description Type
Map Maps 1:1 Local
FlatMap Maps 1:N Local
ForEach Does something for each record Local
Filter Filters records Local
Accumulate Maps N:1 Local
LocalGroupBy Groups records by key (many-to-less) extractor + reducer Local
Limit Limits the number of records Local
Collect Shuffles all records to one engine Global
Repartition Shuffles records between all engines Global
GroupBy Groups records by key (many-to-less) repartition+extractor+reducer Sugar
BatchGroupBy Groups records by key (many-to-less)repartition+extractor+batch reducer Sugar
Sort Sorts records aggregate+local sort+flatmap Sugar
Distinct Makes distinct records Sugar
Aggregate Aggregates records (many-to-one) accumulator+collect+accumulator Sugar
AggregateBy Aggregates records by key (many-to-less) extractor+reducer+repartition+extractor+reducer Sugar
Count Counts records aggregate(local counting + global suming) Sugar
CountBy Counts records by key aggregate (extractor + local counting + global suming) Sugar
Avg Computes the average aggregate(global sum + global count + local map) Sugar

Terminology

Local

本地(Local)操作的执行是在以独立模式或集群模式部署的 RedisGears 引擎中执行的。

当在独立模式下使用时,只有一个引擎在本地执行所有数据上的所有操作。

当再集群模式下使用时,该操作将分布到所有分片。每个分片的引擎也在本地执行操作。然而,分片的引擎只能处理集群对它们进行分区的数据。

Global

全局(Global)操作只在集群 RedisGears 环境的上下文中相关。这些是收集(collect)和重分区(repartition)操作,用于在分片之间转移记录。

Sugar

Sugar 操作是实用程序操作。这些是通过基本操作和相关回调在内部实现的。

Callback

Callback 用于在 API 所使用的语言中调用函数(回调函数)。

Extractor

Extractor 是一个回调函数,它接收输入记录(record)作为参数。它返回从记录中提取的值(value)。返回值应该是本机字符串(Native String)。

Mapper

Mapper 是一个回调函数,它接收输入记录(record)作为参数。它必须返回一个输出记录(record)。

Expander

Expander 是一个回调函数,它接收输入记录(record),必须返回一个或多个输出记录(record)。

Processor

Processor 是一个回调函数,它接收输入记录(record),它不该返回任何东西。

Filter

Filter 是一个回调函数,它接收输入记录(record),它必须返回一个布尔值(boolean)。

Accumulator

Accumulator 是一个回调函数,它接收输入记录和累加器变量。它将输入聚集到累加器变量中,累加器变量存储函数调用之间的状态。函数必须在每次调用后返回累加器的更新值。

Reducer

Reducer 是一个回调函数,它接收一个键、一个输入和一个称为累加器的变量。它的执行类似于 Accumulator 回调,不同之处在于它为每个 reduced 的键维护一个累加器。

Batch Reducer

Batch Reducer 是一个回调函数,它接收一个键和一个输入记录列表。它的执行类似于 reducer 回调,不同之处在于它的输入是一个记录列表,而不是单个记录。它将为这些记录返回一个累加器值。


Commands

RedisGears 是通过 Redis 客户端发送给服务器的命令来运行的。

Registration

RG.DUMPREGISTRATIONS

用来输出函数注册列表

RG.UNREGISTER

用来取消某个函数的注册。

EXECUTION

RG.PYEXECUTE ““ [UNBLOCKING] [REQUIREMENTS “ …”]

用于执行 Python 函数

RG.DUMPEXECUTIONS

用于输出函数执行列表。执行列表的长度由 MaxExecutions配置选项限制。

RG.GETEXECUTION [SHARD|CLUSTER]

用于获取函数执行细节:执行计划,步骤等

RG.GETRESULTS

用于获取函数的执行结果及错误信息

RG.GETRESULTSBLOCKING

取消函数在后台(UNBLOCKING)执行。取消执行的客户端被阻塞,直到函数执行结束,然后发送任何结果和错误。

RG.DROPEXECUTION

用于删除一个函数的执行。

RG.ABORTEXECUTION

在运行过程中中止函数的执行。

Trigger

RG.TRIGGER [arg …]

用来触发已注册的 CommandReader 函数的执行。

Global

RG.CONFIGGET […]

返回一个或多个内置配置或用户定义选项的值。

RG.CONFIGSET […]

设置一个或多个内置配置或用户定义选项的值。

RG.PYSTATS

从 Python 解释器返回内存使用统计信息。

RG.PYDUMPREQS (version 1.0.1)

返回所有可用 Python 依赖的列表(包含关于每个依赖的信息)。

RG.INFOCLUSTER

显示集群信息。

RG.REFRESHCLUSTER

刷新节点的集群拓扑视图。需要在集群的每个节点执行

Configuration

RedisGears 提供了配置选项来控制它的操作。这些选项可以在模块启动时设置,在某些情况下也可以在运行时设置。

Bootstrap Configuration 在加载 Redis Module 时配置,Runtime Configuration 在运行时配置(详见命令:RG.CONFIGSET <key> <value> [...]RG.CONFIGGET <key> [...])。

配置项 说明 期望值 默认值 运行时可配置性
MaxExecutions MaxExecutions 配置选项控制将保存在执行列表中的最大执行次数。
一旦达到这个阈值,就会按照创建顺序(FIFO)从列表中删除旧的执行。
只有已经完成的执行(例如“完成”或“中止”状态)被删除。
Integer 1000 Supported
MaxExecutionsPerRegistration MaxExecutionsPerRegistration 配置选项控制每个注册保存在列表中的最大执行次数。
一旦达到这个阈值,该注册的旧执行将按创建顺序(FIFO)从列表中删除。
只有已经完成的执行(例如“完成”或“中止”状态)被删除。
Integer 100 Supported
ProfileExecutions ProfileExecutions 配置选项控制是否对执行进行分析。对性能损耗较大,建议用于调试操作。 0 (disabled)
1 (enabled)
0 Supported
PythonAttemptTraceback PythonAttemptTraceback 配置选项控制引擎是否试图为 Python 运行时错误产生堆栈跟踪。 0 (disabled)
1 (enabled)
1 Supported
DownloadDeps DownloadDeps 配置选项控制 RedisGears 是否会尝试下载缺少的 Python 依赖项。 0 (disabled)
1 (enabled)
1 Not Supported
DependenciesUrl DependenciesUrl 配置选项控制 RedisGears 试图从何处下载其 Python 依赖项。 URL-like string 与RedisGears版本有关 Not Supported
DependenciesSha256 DependenciesSha256 配置选项指定 Python 依赖项的 SHA256 哈希值。
在下载依赖项之后,将验证此值,如果不匹配,将停止服务器的启动。
String 与RedisGears版本有关 Not Supported
PythonInstallationDir PythonInstallationDir 配置选项指定了 RedisGears 的 Python 依赖项的路径。 String /var/opt/redislabs/modules/rg Not Supported
CreateVenv CreateVenv 配置选项控制引擎是否创建虚拟 Python 环境。 0 (disabled)
1 (enabled)
0 Not Supported
ExecutionThreads ExecutionThreads 配置选项控制将要执行的线程数。 Integer > 0 3 Not Supported
ExecutionMaxIdleTime ExecutionMaxIdleTime 配置选项控制中止执行之前的最大空闲时间(以毫秒为单位)。空闲时间意味着执行没有进展。空闲时间的主要原因是在等待来自另一个失败(即崩溃)碎片的记录时被阻塞的执行。在这种情况下,执行将在指定的时间限制后中止。当执行再次开始进行时,将重置空闲计时器。 Integer > 0 5 seconds Supported
PythonInstallReqMaxIdleTime
1.0.1
PythonInstallReqMaxIdleTime 配置选项控制 Python 依赖安装中止之前的最大空闲时间(以毫秒为单位)。“空闲时间”表示安装没有进展。空闲时间的主要原因与ExecutionMaxIdleTime 相同。 Integer > 0 30000 Supported
SendMsgRetries
1.0.1
SendMsgRetries 配置选项控制在 RedisGears 的分片之间发送消息的最大重试次数。当消息被发送,而 shard 在确认之前断开连接,或者当它返回一个错误时,该消息将被重新发送,直到达到这个阈值。设置为0表示不限制重试次数。 Integer >= 0 3 Supported

Examples

RedisGears Examples: https://oss.redislabs.com/redisgears/examples.html

Clients

RedisGears 是一个 Redis 模块,所以需要 Redis 客户端来操作它。任何支持发送原始 Redis 命令的客户端都可以使用。

Redis Clients: https://redis.io/clients

RedisGears-specific clients: https://github.com/RedisGears/redisgears-py

Design

Isolation Technics - 隔离技术

Python

  • 全局字典(当前版本支持)
  • 子解释器(当前版本出现不兼容某些库而未启用,后续版本提供开关支持

Samples

环境搭建

RedisGears Cluster

1
2
3
## RedisGears Cluster
# docker run -p 30001:30001 -p 30002:30002 -p 30003:30003 --name rgc redislabs/rgcluster:latest
docker run -p 30001:30001 -p 30002:30002 -p 30003:30003 --name rgc redislabs/rgcluster:1.2.1

MySQL

1
2
## mysql 
docker run --name rgmysql -e MYSQL_ROOT_PASSWORD=root -p 3306:3306 -d mysql:8.0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# root
mysql -p
# Enter password: `root`

# show databases;
# DROP USER 'rguser'@'%';

create database rgdb;
create user 'rguser'@'%' identified by 'rguser';

# mysql 8.0 syntax
grant all privileges on rgdb.* TO 'rguser'@'%';
flush privileges;
exit;

mysql -u rguser -p;
use rgdb;
CREATE TABLE IF NOT EXISTS `person`(
`id` VARCHAR(100),
`first` VARCHAR(100),
`last` VARCHAR(40),
`age` INT,
PRIMARY KEY ( `id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
describe person;

Monitor UI

1
2
3
4
5
6
7
8
## Monitor UI
https://github.com/RedisGears/RedisGearsMonitor.git
cd RedisGearsMonitor

npm install
node main.js --port 30001

# http://localhost:9001/

Run Recipe

1
2
3
4
5
6
7
8
9
10
11
## run recipe
# pip3 install gears-cli
# gears-cli --host <host> --port <post> --password <password> run example.py REQUIREMENTS rgsync PyMySQL cryptography
# gears-cli run --host localhost --port 30001 example.py REQUIREMENTS rgsync PyMySQL cryptography

gears-cli run --host localhost --port 30001 example.py REQUIREMENTS git+https://gitlab.com/shankai/rgsync.git PyMySQL cryptography

# 这条是异常信息: failed running gear function (Failed install requirement on shard, check shard log for more info.)
# OK 代表发布成功。
# gears-cli run 之后,redisgears cluster 会动态加载相关模块。
# Monitor UI 会显示 PersonsWriteBehind Registrations 相关状态信息。
example.py (WriteBehind + RGWriteThrough)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from rgsync import RGWriteBehind, RGWriteThrough
from rgsync.Connectors import MySqlConnector, MySqlConnection

'''
Create MySQL connection object
'''
# connection = MySqlConnection('demouser', 'Password123!', 'localhost:3306/test')
connection = MySqlConnection('rguser', 'rguser', 'localhost:3306/rgdb')

'''
Create MySQL persons connector
'''
personsConnector = MySqlConnector(connection, 'person', 'id')

personsMappings = {
'first_name':'first',
'last_name':'last',
'age':'age'
}

RGWriteBehind(GB, keysPrefix='person', mappings=personsMappings, connector=personsConnector, name='PersonsWriteBehind', version='99.99.99')

# RGWriteThrough(GB, keysPrefix='__', mappings=personsMappings, connector=personsConnector, name='PersonsWriteThrough', version='99.99.99')

演示操作

WriteBehind

hmset (不同操作完成后,在 mysql rgdb.person 表中查看数据同步结果)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
hmset person:1 first_name zhang last_name san age 21
hmset person:2 first_name li last_name sisi age 18

hmset person:10 first_name li last_name sisi age 18

## 复制控制

## 变更并同步(默认行为)
hmset person:3 first_name li last_name sisi age 22 # =

## 变更不同步
hmset person:3 first_name li last_name sisi age 50 # +
# HGETALL person:3

## 删除并同步
hmset person:3 # ~

hmset person:3 first_name li last_name sisi age 22 # =
## 删除不同步
hmset person:3 # -

正好一次(源码有缺陷,mysql connector 修复见 https://gitlab.com/shankai/rgsync
1
2
3
4
5
6
CREATE TABLE IF NOT EXISTS `exactlyonce`(
`id` VARCHAR(100),
`val` VARCHAR(100),
PRIMARY KEY ( `id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
describe exactlyonce;

example.py 文件内修改 personsConnector 实例化,添加 exactlyOnceTableName 参数值

1
personsConnector = MySqlConnector(connection, 'person', 'id', 'exactlyonce')
ACK
1
2
3
4
# 
hset person:007 first_name James last_name Bond age 43 # =6ce0c902-30c2-4ac9-8342-2f04fb359a944412
# XREAD BLOCK <timeout> STREAMS {<hash key>}<uuid> 0-0
XREAD BLOCK 2000 STREAMS {person:007}6ce0c902-30c2-4ac9-8342-2f04fb359a944412 0-0

WriteThrough

1
2
3
HSET __{person:1} first_name foo last_name bar age 20 # =123456

XREAD BLOCK 2000 STREAMS {person:104}123456 0-0

(完)

Introduce

ZeroMQ, 一个开源的通用消息库。

ZeroMQ(也拼作ØMQ、0MQ 或 ZMQ)是一个高性能异步消息传递库,旨在在分布式或并发应用程序中使用。它提供了一个消息队列,但与面向消息的中间件不同,ZeroMQ系统可以在没有专用消息代理的情况下运行。

ZeroMQ支持各种传输(TCP、进程内、进程间、多播、WebSocket等等)上的通用消息传递模式(发布/订阅、请求/应答、客户端/服务器等),使进程间消息传递像线程间消息传递一样简单。这使你的代码清晰,模块化和极容易扩展。

ZeroMQ是由一个大型贡献者社区开发的。许多流行的编程语言都有第三方实现,C# 和 Java 也有 Native Ports。

ZeroMQ的哲学是从零开始的。零代表零代理(ZeroMQ是无代理的)、零延迟、零成本(它是免费的)和零管理。更普遍地说,“零”指的是渗透到项目中的极简主义文化。我们通过消除复杂性而不是暴露新功能来增加功能。

Libzmq - the low level library

Libzmq (https://github.com/zeromq/libzmq) 是大多数不同语言绑定背后的底层库。Libzmq 公开 C-API 及 C++ 的实现。

Languages

C、C++、C#、Erlang、F#、Go、Haskell、Java、Node.js、Perl、Python、Ruby、Rust 等。

更多:http://wiki.zeromq.org/bindings:_start

Java

Messages

Messages

Blob

ZeroMQ 消息是在应用程序或相同应用程序的组件之间传递的离散数据单元。从ZeroMQ本身的角度来看,消息被认为是不透明的二进制数据。

在网络上,ZeroMQ 消息是可以容纳内存的从 0 开始任意大小的 Blob。可以使用 Protocol Buffers、msgpack、JSON 或应用程序需要表达的任何东西来进行自己的序列化。选择可移植的数据表示是明智的,可以自己做出权衡的决定。

Frame

最简单的 ZeroMQ 消息由一个帧(也称为消息部分)组成。

帧是 ZeroMQ 消息的基本格式,帧是指定长度的数据块。

ZeroMQ 保证传递消息的所有部分(一个或多个),或者不传递任何部分。这允许以单个消息的形式发送或接收帧列表。

Multipart

消息(单个或多个部分)必须装入内存。如果想发送任意大小的文件,应该将它们分解为多个块(piece),并将每个块(piece)作为单个部分消息(single-part)发送。

使用 Multipart 数据不会减少内存消耗。

Working with strings

将数据作为字符串传递通常是进行通信的最简单方法,因为序列化非常简单。对于ZeroMQ,有这样的规则:字符串是指定长度的,并且在发送时不带末尾null

以下函数(JeroMQ 为例)将字符串作为单帧消息发送到套接字,其中字符串的长度等于帧的长度。

1
socket.send("HELLO");

要从套接字读取字符串,只需调用 recv 函数。

1
String hello = socket.recvStr()

如果需要更多的控制帧,可以使用 ZFrame 类。下面的代码片段展示了如何从字符串创建帧并发送帧。

1
2
ZFrame stringFrame = new ZFrame("HELLO");
stringFrame.send(socket, 0);

调用 ZFrame 类的静态 recvFrame 函数,从套接字中读取一个帧并返回一个 ZFrame 对象。如果内容是字符串,则可以通过提供用于序列化它的字符集的 getString 方法来检索它。

默认情况下,ZMQ.CHARSET 用于序列化和反序列化字符串的所有操作。

1
2
ZFrame stringFrame = ZFrame.recvFrame(socket);
String hello = stringFrame.getString(ZMQ.CHARSET);

因为我们利用帧的长度来反映字符串的长度,我们可以通过将每个字符串放入一个单独的帧来发送多个字符串。

在一个消息中发送多个字符串帧是可以使用 sendMore方法的。这个方法将延迟消息的实际发送,直到最后一帧被发送。

1
2
3
socket.sendMore(socket, "HELLO");
socket.sendMore(socket, "beautiful");
socket.send(socket, "WORLD!");

可以使用 ZMsg 类构造多帧消息,而不必使用套接字的 send API。

1
2
3
4
5
ZMsg *strings = new ZMsg();
strings.add("HELLO");
strings.add("beautiful");
strings.add("WORLD");
strings.send(socket);

若要接收一系列字符串帧,请多次调用 recvStr 函数。

1
2
3
String hello     = socket.recvStr();
String beautiful = socket.recvStr();
String world = socket.recvStr();

为了在一次调用中检索整个消息,可以使用 ZMsg 类的静态 recvMsg 方法。

1
2
3
4
ZMsg strings = ZMsg.recvMsg(socket);
String hello = strings.popString();
String beautiful = strings.popString();
String world = strings.popString();

Socket API

套接字实际上是网络编程的标准 API。这就是为什么 ZeroMQ 提供了一个熟悉的基于套接字的API。ZeroMQ 对开发人员特别有吸引力的一点是,它使用不同的套接字类型来实现任何任意的消息传递模式。此外,ZeroMQ 套接字对底层网络协议提供了一个清晰的抽象,这隐藏了这些协议的复杂性,使它们之间的切换非常容易。

与传统套接字的关键区别

一般来说,传统套接字为面向连接的可靠字节流(SOCK_STREAM)或不可靠数据报(SOCK_DGRAM)提供同步接口。相比之下,ZeroMQ 套接字提供了异步消息队列的抽象,其确切的队列语义取决于所使用的套接字类型。传统套接字传输字节流或离散数据报,ZeroMQ 套接字传输离散消息。

ZeroMQ 套接字是异步的,这意味着物理连接的建立和断开、重新连接和有效交付的时间对用户是透明的,并由 ZeroMQ 自己组织。此外,如果对等端无法接收消息,则消息可能被排队。

传统套接字只允许严格的一对一(两个对等体)、多对一(多个客户端、一个服务器)或在某些情况下一对多(多播)关系。除了 PAIR 套接字之外,ZeroMQ 套接字可以连接到多个端点,同时接受来自绑定到套接字的多个端点的入站连接,从而允许多对多关系。

套接字生命

ZeroMQ 套接字生命有四部分,类似 BSD 套接字:

  • 创建和销毁套接字,它们共同形成了一个套接字生命的因果循环;
  • 通过在套接字上设置选项并在必要时检查它们来配置套接字;
  • 通过创建与套接字的ZeroMQ连接,将套接字插入到网络拓扑中;
  • 通过在套接字上写入和接收消息来传递数据。

绑定与连接(Bind vs Connect)

ZeroMQ 套接字并不在乎谁在绑定,谁在连接。在之前的示例中,你可能注意到的是服务端使用了绑定,客户端使用了连接。为什么是这样的,有什么区别?

ZeroMQ 为每个基础的连接创建队列。如果你的套接字连接到了 3 个对等的套接字,那么在后台将有 3 个消息队列。

使用绑定(Bind),允许任意的对等套接字连接(Connect)上来,因此在绑定端不清楚到底会有多少个对等点,也不能提前创建队列。相反,队列是作为连接(Connect)到绑定套接字的单个对等体创建的。

使用连接(Connect),ZeroMQ 知道至少会有一个对等体,因此它可以立即创建单个队列。这适用于所有的套接字类型,除了路由器(ROUTER),只有在我们连接的对等体确认我们的连接后才创建队列。

因此,当向没有对等体的绑定(Bind)套接字或没有实时连接的路由器(ROUTER)发送消息时,没有队列来存储消息。

何时用绑定(Bind),何时用连接(Connect)?

一般情况下,在体系结构最稳定的点(Points)使用绑定(Bind),在具有不稳定端点(Endpoints)的动态组件使用连接(Connect)。对于请求/应答(request/reply),服务提供者可能是您绑定和客户端使用 Connect 的地方。就像普通的旧 TCP 一样。

如果不知道哪个部分更稳定(比如点对点) ,可以考虑中间的一个稳定设备,这样所有的部分都可以去连接。

高水位标记(High-Water-Mark)

高水位线是对 ZeroMQ 在内存中排队的未完成消息的最大数量的硬性限制

如果达到此限制,套接字将进入异常状态,根据套接字类型,ZeroMQ 将采取适当的操作,如阻塞(Blocking)**或丢弃(Dropping)**发送的消息。请参考下面的套接字描述,以了解为每种套接字类型所采取的具体操作。

消息传递模式(Messaging Patterns)

在 ZeroMQ 套接字 API 是对消息传递模式(Messaging Patterns)的包装。ZeroMQ 模式通过具有匹配类型套接字对来实现。

套接字类型定义了套接字的语义、它用于向内向外路由消息的策略、队列等。可以将某些类型的套接字连接在一起,例如,发布服务器套接字和订阅服务器套接字。套接字在“消息传递模式”中协同工作。

内置的 ZeroMQ 核心模式如下:

  • Request-reply(请求/应答),它将一组客户端连接到一组服务端。这是一种远程过程调用(RPC)和任务分布(Task Distribution)模式。

  • Pub-sub(发布-订阅),它将一组发布者连接到一组订阅者。这是一种数据分布(Data Distribution)模式。

  • Pipeline(管道),它以扇出(fan-out)/扇入(fan-in)模式连接节点,该模式可以有多个步骤和循环。这是一个并行任务分配和收集(Parallel task distribution and collection)模式。

  • Exclusive pair(独占对),专门连接两个套接字。这是一种在进程中连接两个线程的模式,不要与“普通”的套接字对混淆。

还有更多ZeroMQ模式仍处于草案状态:

  • Client-server(客户端-服务器),它允许单个 ZeroMQ 服务器与一个或多个 ZeroMQ 客户机通话。客户端始终启动会话,在此之后,任何一方都可以异步地向另一方发送消息。
  • Radio-dish(无线接收天线),用于一对多的数据分发,以扇形方式(fan out)将数据从一个发布者发送到多个订阅者。

Request-reply

https://rfc.zeromq.org/spec/28/

请求-应答(Request-reply)模式适用于各种面向服务的体系结构。它有两种基本形式:** 同步(REQ 和 REP)异步(DEALER 和 ROUTER) **,可以以各种方式混合使用。DEALER 和 ROUTER 套接字是许多高级协议的构建块,比如 rfc.zeromq.org/spec:18/mdp 协议。

REQ

客户端使用 REQ 套接字向服务发送请求并从服务接收响应。这种套接字类型只允许交替的发送和随后的接收调用。一个 REQ 套接字可以连接到任意数量的 REPROUTER 套接字。发送的每个请求都在所有连接的服务之间循环,接收到的每个应答都与最后发出的请求匹配。它是为简单的请求-应答模型而设计的,在这种模型中,针对失败对等体的可靠性不是问题。

如果没有可用的服务,那么套接字上的任何发送操作都将阻塞,直到至少有一个服务可用。REQ 套接字不会丢弃任何消息。

REP

服务使用 REP 套接字接收来自客户机的请求并向客户机发送应答。这种套接字类型只允许接收和随后的发送调用交替序列。接收到的每个请求都是来自所有客户机的公平队列(fair-queued),发送的每个应答都被路由到发出最后一个请求的客户机。如果原始请求者不再存在,则静默丢弃应答。

DEALER

DEALER 套接字类型与一组匿名对等体通信,使用循环(round-robin)算法发送和接收消息。只要它不丢弃消息,它就是可靠的。对于与 REPROUTER 通信的客户端,DEALER 作为 REQ 的异步替代。DEALER 收到的消息从所有连接的对等体公平排队(fair-queued)。

DEALER 由于达到了所有对等体的高水位而进入静音状态,或者如果根本没有对等体,那么套接字上的任何发送操作都将阻塞(Block),直到静音状态结束或至少有一个对等体可用来发送;消息不会被丢弃。

DEALER 连接到 REP 套接字时,发送的消息必须包含一个空帧作为消息的第一部分(分隔符),然后是一个或多个正文部分。

ROUTER

ROUTER 套接字类型与一组对等体对话,使用显式寻址,以便每个传出的消息被发送到一个特定的对等体连接。ROUTER 作为REP的异步替代品,经常用作服务器与 DEALER 通信的基础。

ROUTER 套接字接收消息时,在将消息传递给应用程序之前,将包含原始对等体的路由id的消息部分预先添加到消息部分。接收到的消息从所有连接的对等体中公平排队(fair-queued)。当发送消息时,一个 ROUTER 套接字将删除消息的第一部分,并使用它来确定消息应该被路由到的对等体的路由id。如果对等体已经不存在或从未存在过,则该消息将被静默丢弃

ROUTER 套接字由于达到了对所有对等体的最高水位而进入静音状态时,发送到该套接字的任何消息都将被丢弃,直到静音状态结束。同样,路由到已达到单个高水位标记的对等点的任何消息也将被丢弃。

当一个 REQ 套接字连接到一个 ROUTER 套接字时,除了原始对等体的路由id外,每个收到的消息还应该包含一个空的分隔符消息部分。因此,应用程序看到的每个接收到的消息的整个结构变成:一个或多个路由id部分、分隔符部分、一个或多个主体部分。当向 REQ 套接字发送应答时,应用程序必须包含分隔符部分。

Publish-subscribe

https://rfc.zeromq.org/spec/29/

发布-订阅(Publish-subscribe)模式用于以扇形方式将数据从一个发布者分发到多个订阅者的一对多分布。

ZeroMQ 通过四种套接字类型来支持发布/订阅:PUB,XPUB,SUB,XSUB

Topics

ZeroMQ 使用多部分(Multipart)消息来传递主题(Topic)信息。Topic 表示为字节数组,但也可以使用字符串和适当的文本编码。

发布者必须在消息有效负载之前的第一个帧中包含主题。例如,向status主题的订阅者发布状态消息:

1
2
3
//  Send a message on the 'status' topic
pub.sendMore("Status");
pub.send("All is well");

订阅者通过 Socket 类的 subscribe 方法指定他们感兴趣的主题:

1
2
//  Subscribe to the 'status'
sub.subscribe("status");

一个订阅者(subscriber)套接字可以有多个订阅筛选器(filter)。

使用前缀检查将消息的主题(Topic)与订阅者的订阅主题进行比较,也就是说,订阅了 topic 的订阅者会收到主题为:topictopic/subtopictopical的消息,然而,它不会接收 topiTOPIC 的主题消息。

PUB

发布者使用 PUB 套接字来分发数据。发送的消息以扇出的方式分发到所有连接的对等体。此套接字类型无法接收任何消息。

PUB 套接字由于达到了订阅者的高水位而进入静音状态时,将发送到有问题的订阅者的任何消息都将被丢弃,直到静音状态结束。对于这种套接字类型,Send 函数永远不会阻塞。

SUB

订阅者使用SUB套接字订阅由发布者分发的数据。最初,SUB套接字不订阅任何消息。此套接字类型没有实现 send 函数。

XPUB

PUB 相同,只是可以以传入消息的形式从对等节点接收订阅(Subscription)。订阅消息由两部分组成,一是字节1(用于订阅)或字节0(用于取消订阅),二是在一后面跟着订阅主体。没有订阅或取消订阅前缀的消息也会被接收,但对订阅状态没有影响。

XSUB

SUB相同,不同的是您是通过向套接字发送订阅(Subscription)消息来进行订阅的。订阅消息由两部分组成,一是字节1(用于订阅)或字节0(用于取消订阅),二是在一后面跟着订阅主体。没有订阅或取消订阅前缀的消息也会被发送,但对订阅状态没有影响。

Pipeline

https://rfc.zeromq.org/spec/30/

管道(pipeline)模式用于任务分配,通常是在一个多阶段管道中,其中一个或几个节点将工作推给多个工作人员,然后这些工作人员又将结果推给一个或几个收集器。该模式大多是可靠的,除非节点意外断开连接,否则它不会丢弃消息。它是可扩展的,节点可以在任何时候加入。

ZeroMQ通过两种套接字类型支持流水线:PUSHPULL

PUSH

PUSH 套接字类型与一组匿名的 PULL 对等体对话,使用循环算法发送消息。没有为该套接字类型实现接收(receive)操作。

当由于到达所有下游节点的高水位,或没有下游节点,则套接字进入静音状态,此时任何套接字上发送操作将被阻塞,直到静音状态结束或至少一个下游节点发送可用;消息不会被丢弃

PULL

PULL套接字类型与一组匿名 PUSH 对等体对话,使用公平排队算法接收消息。没有为该套接字类型实现发送(send)操作。

Exclusive Pair

https://rfc.zeromq.org/spec/31/

独占对 PAIR 不是一个通用套接字,用于两个对等体在结构上稳定的特定用例。这通常限制了 PAIR 只能在单个进程中使用,用于线程间通信。

PAIR 类型的套接字在任何时候只能连接到单个对等点。对通过 PAIR 套接字发送的消息不执行路由筛选

当一个 PAIR 套接字由于已连接的对等点达到了高水位标记而进入静音状态,或者如果没有对等点被连接,那么套接字上的任何发送操作都会被阻塞,直到对等点可以发送为止;消息不会被丢弃。

虽然 PAIR 套接字可以通过 inproc 以外的传输方式使用,但是它们不能自动重新连接,而且新连入的连接将被终止,而以前存在的任何连接(包括处于关闭状态的连接)在大多数情况下都不适合 TCP。

zguide

Advanced Request-Reply Pattern

路由的本质是消息中包含了一个身份帧(信封地址)。

REQ 和 REP 是同步的;DEALER 和 ROUTER 是异步的,无视应答信封。

DEALER 类似于异步 REQ 套接字,ROUTER 类似于异步 REP 套接字。在使用 REQ 套接字的地方,我们可以使用 DEALER; 我们只需要自己读写信封。在使用 REP 套接字的地方,我们可以粘贴 ROUTER; 我们只需要自己管理标识。

可以将 REQ 和 DEALER 套接字视为“客户端”,将 REP 和 ROUTER 套接字视为“服务器”。通常,您需要 bind REP 和 ROUTER 套接字,并将 REQ 和 DEALER 套接字 connect 到它们。

Identity 标识当前套接字的身份信息;envelope(Envelope Frame) 标识将要路由到的套接字地址

请求-应答模式套接字有效的组合:

  • REQ to REP
  • DEALER to REP
  • REQ to ROUTER
  • DEALER to ROUTER
  • DEALER to DEALER
  • ROUTER to ROUTER

无效组合:

  • REQ to REQ 双方都希望通过相互发送消息来开始。
  • REQ to DEALER 当有多个 REQ 时,DEALER 无法原路返回。
  • REP to REP 双方都会等待对方发送第一条消息。
  • REP to ROUTER ROUTER 不知道 REP 的地址。

Introduce

简单身份验证和安全层(Simple Authentication and Security Layer,简称SASL)是一种Internet标准(RFC 2222),它指定了用于身份验证和可选地在客户机和服务器应用程序之间建立安全层的协议。SASL定义了如何交换身份验证数据,但本身并不指定该数据的内容。它是一个框架,指定身份验证数据的内容和语义的特定身份验证机制可以融入其中。

SASL被协议(如轻量级目录访问协议版本3 (LDAP v3)和Internet消息访问协议版本4 (IMAP v4))用于启用可插拔身份验证。LDAP v3和IMAP v4没有将身份验证方法硬连接到协议中,而是使用SASL执行身份验证,从而通过各种SASL机制启用身份验证。

Internet社区为不同级别的安全性和部署场景定义了许多标准的SASL机制。这些级别的范围从无安全性(例如,匿名身份验证)到高安全性(例如,Kerberos身份验证)以及介于两者之间的级别。

The Java SASL API

Java SASL API为使用SASL机制的应用程序定义了类和接口。它被定义为与机制无关的:使用API的应用程序不需要硬连接到使用任何特定的SASL机制。该API支持客户机和服务器应用程序。它允许应用程序根据所需的安全特性选择要使用的机制,比如它们是否容易受到被动字典攻击,或者它们是否接受匿名身份验证。
Java SASL API还允许开发人员使用他们自己的自定义SASL机制。通过使用Java加密体系结构(JCA)安装SASL机制。

When to Use SASL

SASL为网络应用程序提供了可插拔的身份验证和安全层。Java SE中还有其他特性提供类似的功能,包括Java安全套接字扩展(JSSE)和Java通用安全服务(Java GSS)。JSSE为SSL和TLS协议的Java语言版本提供了一个框架和实现。Java GSS是通用安全服务应用程序编程接口(GSS- api)的Java语言绑定。Java SE上这个API目前支持的唯一机制是Kerberos v5。
与JSSE和Java GSS相比,SASL是相对轻量级的,并且在最近的协议中很流行。它还有一个优点,就是定义了几种流行的轻量级(在基础设施支持方面)SASL机制。另一方面,主要的JSSE和Java GSS机制具有相对重量级的机制,需要更详细的基础设施(分别是公钥基础设施和Kerberos)。

SASL、JSSE 和 Java GSS 通常一起使用。例如,一个常见的模式是应用程序使用 JSSE 建立安全通道,使用 SASL 进行客户端基于用户名/密码的身份验证。在 GSS-API 机制之上还有 SASL 机制;一个流行的示例是与 LDAP 一起使用的 SASL GSS-API/Kerberos v5 机制。

除了从头定义和构建协议之外,协议定义通常是决定使用哪个 API 的最大因素。例如,LDAP 和 IMAP 被定义为使用 SASL,因此与这些协议相关的软件应该使用 Java SASL API。在构建Kerberos 应用程序和服务时,使用的 API 是Java GSS。当构建使用 SSL/TLS 作为协议的应用程序和服务时,使用的 API 是JSSE。

SASL Architechture

SASL Architechture

sasl-archit

SASL Life Cycle

sasl-flowchart-overview

SASL Session Initialization

sasl-flowchart-init

SASL Authentication: Sending Client Data

sasl-flowchart-auth

SASL Authentication: Processing Server Data

sasl-flowchart-auth2

Java SASL API Overview

SASL 是一种挑战-响应协议。服务器向客户机发出质疑,客户机根据质疑发送响应。这种交换一直持续到服务器满意并且不发出进一步的挑战为止。这些挑战和响应是任意长度的二进制标记。封装协议(例如 LDAP 或 IMAP)指定了这些令牌的编码和交换方式。例如,LDAP 指定 SASL 标记如何封装在 LDAP 绑定请求和响应中。

Java SASL API是根据这种交互和使用风格建模的。它具有分别表示客户端和服务器端机制的接口 SaslClient 和 SaslServer。应用程序通过表示挑战和响应的字节数组与机制交互。服务器端机制迭代、发出挑战并处理响应,直到它满意为止,而客户端机制迭代、评估挑战并发出响应,直到服务器满意为止。使用该机制的应用程序驱动每次迭代。也就是说,它从协议包中提取挑战或响应,并将其提供给该机制,然后将该机制返回的响应或响应放入协议包中,并将其发送给对等体。

How SASL Mechanisms are Installed and Selected

SASL 机制实现由 SASL 安全提供者提供。每个提供者可以支持一个或多个 SASL 机制,并在 JCA 注册。

默认情况下,SunSASL 提供程序自动注册为 JCA 提供程序。若要将其作为 JCA 提供程序移除或重新排列其优先级,请更改 Java 安全属性文件(Java-home/conf/security/Java.security)。

1
security.provider.7=SunSASL

若要添加或删除 SASL 提供程序,请在安全属性文件中添加或删除相应的行。

或者,可以使用 java.security.Security 以编程方式添加自己的提供程序。例如,下面的示例代码将 com.example.MyProvider 注册到可用的 SASL 安全提供程序列表中。

1
Security.addProvider(new com.example.MyProvider());

The SunSASL Provider

SunSASL 提供程序支持以下客户机和服务器机制:

  • Client Mechanisms
    • PLAIN (RFC 2595). This mechanism supports cleartext user name/password authentication.
    • CRAM-MD5 (RFC 2195). This mechanism supports a hashed user name/password authentication scheme.
    • DIGEST-MD5 (RFC 2831). This mechanism defines how HTTP Digest Authentication can be used as a SASL mechanism.
    • EXTERNAL (RFC 2222). This mechanism obtains authentication information from an external channel (such as TLS or IPsec).
    • NTLM. This mechanism supports NTLM authentication.
  • Server Mechanisms
    • CRAM-MD5
    • DIGEST-MD5
    • NTLM

The JdkSASL Provider

JdkSASL 提供程序支持以下客户机和服务器机制:

  • Client Mechanisms
    • GSSAPI (RFC 2222). This mechanism uses the GSSAPI for obtaining authentication information. It supports Kerberos v5 authentication.
  • Server Mechanisms
    • GSSAPI (Kerberos v5)

Debugging and Monitoring

SunSASL 和 JdkSASL 提供程序使用 Logging api 提供实现日志输出。这个输出可以通过使用日志配置文件和编程 API (java.util.logging)来控制。SunSASL 提供程序使用的日志记录器名称是 javax.security.sasl。下面是一个样例日志配置文件,它为 SunSASL 提供程序启用了 FINEST 日志级别:

1
2
3
javax.security.sasl.level=FINEST
handlers=java.util.logging.ConsoleHandler
java.util.logging.ConsoleHandler.level=FINEST

Implementing a SASL Security Provider

实现 SASL 安全提供商有三个基本步骤:

  1. 编写一个实现 SaslClient 或 SaslServer 接口的类。
  2. 编写一个工厂类(实现 SaslClientFactory 或 SaslServerFactory) ,创建类的实例。
  3. 编写注册工厂的 JCA 提供程序。

References

https://docs.oracle.com/en/java/javase/11/security/java-sasl-api-programming-and-deployment-guide1.html#GUID-6D78EE33-62E6-4D85-9695-322EED493F72

https://docs.oracle.com/cd/E53394_01/html/E54753/sasl.intro.20.html

(完)

Introduce

Java身份认证和授权服务 (Java Authentication and Authorization Service,JAAS)

JAAS 可用于两个目的:

  • 认证(对于用户身份验证,可以可靠且安全地确定谁正在执行Java代码,而不管代码是作为应用程序、applet、bean还是servlet运行)
  • 授权(对用户进行授权,以确保他们拥有执行操作所需的访问控制权限)

JAAS 实现了标准可插入的认证模块(PAM: Pluggable Authentication Module )框架的 Java 版本。

传统上,Java 提供了基于代码源的访问控制(基于代码来源和代码签名人的访问控制)。然而,它缺乏根据运行代码的人来增加访问控制的能力。JAAS 提供了一个框架,用这种支持扩展 Java 安全架构。

JAAS 身份验证是以可插拔的方式执行的。这允许应用程序独立于底层的身份验证技术。可以在应用程序下插入新的或更新的身份验证技术,而无需对应用程序本身进行修改。应用程序通过实例化 LoginContext 对象来启用身份验证过程,该对象引用 Configuration 来确定用于执行身份验证的身份验证技术或技术,或 LoginModule。典型的 LoginModules 可能会提示输入并验证用户名和密码。其他人可以阅读和验证声音或指纹样本。

执行代码的用户或服务经过身份验证后,JAAS授权组件与核心Java SE访问控制模型一起工作,以保护对敏感资源的访问。访问控制决策既基于执行代码的 CodeSource,也基于运行代码的用户或服务(由Subject对象表示)。如果身份验证成功,则由带有相关主体和凭据的LoginModule更新主题。

Architecture

Authentication

jaas-authn

Authorization

protdom

Concept

核心类和接口

JAAS 相关的核心类和接口可以分为三类: 通用类身份验证类授权类

通用类

通用类是由 JAAS 身份验证和授权组件共享的类。

关键的 JAAS 类是 javax.security.auth.Subject,它表示单个实体(如个人)的相关信息的分组。它包含实体的主体、公共凭证和私有凭证。

  • Subject

    要授权对资源的访问,应用程序首先需要对请求的源进行身份验证。JAAS 框架定义了 Subject 这个术语来表示请求的来源。Subject 可以是任何实体,如个人或服务。一旦主体被认证,一个 javax.security.auth.Subject 由相关的标识或身份(Principal)填充。

    Subject 还可以拥有与安全性相关的属性,这些属性称为凭据。需要特殊保护的敏感凭据(如私有密钥)存储在私有凭据集中。要共享的凭据(如公钥证书)存储在公共凭据集中。访问和修改不同的凭据集需要不同的权限。

    doAs versus doAsPrivileged

    doAsPrivileged 方法的行为与 doAs 方法完全相同,只是它们没有将提供的 Subject 与当前 Thread 的 AccessControlContext 关联起来,而是使用提供的 AccessControlContext。通过这种方式,可以通过与当前的 AccessControlContexts 不同的方式来限制操作。

  • Principals

    一个主体(Subject)可能会有多个身份(Principal)。

  • Credentials

    除了关联的主体之外,Subject 还可以拥有与安全相关的属性,这些属性称为凭据。凭据可以包含用于对新服务的主题进行身份验证的信息。这些凭据包括密码、 Kerberos 票证和公钥证书。凭据还可能包含仅允许主体执行某些活动的数据。例如,加密密钥表示允许主体签名或加密数据的凭据。公共和私有凭据类不是核心 JAAS 类库的一部分。因此,任何类都可以表示凭据。

    公共和私有凭据类不是核心 JAAS 类库的一部分。然而,开发者可能会选择让他们的凭证类实现与凭证相关的两个接口: Refreshable 和 Destroyable。

    • javax.security.auth.Refreshable 接口提供了凭据刷新自身的功能。
    • javax.security.auth.Destroyable 接口提供了销毁凭证内容的功能。

身份验证类和接口

身份验证代表验证主体身份的过程,并且必须以安全的方式执行; 否则犯罪者可能冒充他人以获得对系统的访问权。认证通常涉及主体提供某种形式的凭证来证明其身份。这种证据可能是只有当事人可能知道或拥有的信息(如密码或指纹) ,也可能是只有当事人可以提供的信息(如使用私人钥匙签名的数据)。

  • LoginContext

    LoginContext 类提供了用于对 Subject 进行身份验证的基本方法,并提供了一种独立于底层身份验证技术的应用程序开发方法。LoginContext 查询 Configuration 以确定为特定应用程序配置的身份验证服务或 LoginModule。因此,可以在应用程序下插入不同的 LoginModule,而不需要对应用程序本身进行任何修改。

  • LoginModule

    LoginModule 接口使开发人员能够实现应用程序下可以插入的各种身份验证技术。例如,一种类型的 LoginModule 可能执行基于用户名/密码的身份验证形式。其他的 LoginModule 可以与硬件设备接口,如智能卡或生物识别设备。

  • CallbackHandler

    在某些情况下,LoginModule 必须与用户通信以获取身份验证信息。

    应用程序实现 CallbackHandler 接口并将其传递给 LoginContext,后者直接将其转发给底层 LoginModules。LoginModule 使用 CallbackHandler 收集用户的输入(例如密码或智能卡密码) ,或者向用户提供信息(例如状态信息)。通过允许应用程序指定 CallbackHandler,底层 LoginModules 可以保持独立于应用程序与用户交互的不同方式。

    LoginModule 向 CallbackHandler handle 方法传递一个适当的 Callbacks 数组,例如用于用户名的 NameCallback 和用于密码的 PasswordCallback,CallbackHandler 执行请求的用户交互并在 Callbacks 中设置适当的值。例如,要处理 NameCallback,CallbackHandler 可能会提示输入名称,从用户检索值,并调用 NameCallback 的 setName 方法来存储名称。

  • Callback

    LoginModules 可以直接将 Callback 数组传递给 CallbackHandler 的 handle 方法。

授权类

要使 JAAS 授权发生,授予访问控制权限不仅要基于运行的代码,还要基于运行代码的人,需要以下几点:

  1. 必须经过身份验证

  2. 身份验证结果的主体(Subject)必须与访问控制上下文关联

  3. 必须在安全策略中配置基于身份(Principal)的项

  • Policy

    策略类是用于表示系统范围的访问控制策略的抽象类。策略 API 支持基于身份(Principal)的查询。

    缺省情况下,JDK 提供了一个基于文件的子类实现,它被升级为支持策略文件中基于身份的授权条目。

  • AuthPermission
    当前,AuthPermission 对象用于保护对 Policy、 Subject、 LoginContext 和 Configuration 对象的访问。

java.Security Properties 文件中的 JAAS 设置

许多与 JAAS 相关的设置可以在 java.Security 主安全属性文件中配置,该文件位于 JDK 的 conf/Security 目录中。

  • Login Configuration Provider

    默认的 JAAS 登录配置实现可以通过在 login.configuration.provider 属性中指定替代的 provider 类实现来替换。

    1
    login.configuration.provider=com.foo.Config

    如果没有找到 Security 属性 login.configuration.provider,或者没有指定,那么它将被设置为缺省值:

    1
    login.configuration.provider=com.sun.security.auth.login.ConfigFile

    注意,没有办法从命令行动态设置登录配置提供程序。

  • Login Configuration URLs

    如果使用的登录配置实现期望在文件中指定配置信息,则可以通过在 login.configurl.n 属性中指定各自的 url 静态设置登录配置文件的位置。N’是一个连续编号的整数,从1开始。如果指定了多个配置文件(如果 n > = 2) ,它们将被读取并合并为单个配置。

    1
    2
    login.config.url.1=file:C:/config/.java.login.config
    login.config.url.2=file:C:/users/foo/.foo.login.config

    如果配置文件的位置没有在 java.security 属性文件中设置,也没有在命令行中动态指定(通过-Djava.security.auth.login.config 选项) ,JAAS 将尝试从 file:${user.home}/.java.login.config 加载默认配置。

  • Policy Provider

    可以通过在 policy.provider 属性中指定替代提供程序类实现来替换默认策略实现。

    1
    policy.provider=com.foo.Policy

    如果找不到 Security 属性 Policy.provider,或者未指定,则策略将设置为默认值:

    1
    policy.provider=sun.security.provider.PolicyFile

    请注意,不存在从命令行动态设置策略提供程序的方法。

  • Policy File URLs

    通过在 auth.policy.url.n 属性中指定各自的 url,可以静态设置访问控制策略文件的位置。N 是一个连续编号的整数,从1开始。如果指定了多个策略(如果 n > = 2) ,它们将被读取并合并为一个策略。

    1
    2
    policy.url.1=file:C:/policy/.java.policy
    policy.url.2=file:C:/users/foo/.foo.policy

    如果策略文件的位置没有在 java.security 属性文件中设置,也没有从命令行(通过 -Djava.security.policy 选项)动态指定,那么访问控制策略默认为与 JDK 安装的系统策略文件相同的策略。

    • 将所有权限授予标准扩展
    • 允许任何人监听非特权端口
    • 允许任何代码读取某些不敏感于安全性的“标准”属性,如 os.name 和 file.separator 属性。

JAAS 教程和示例程序

https://github.com/shankai/sample-jaas

References

https://docs.oracle.com/en/java/javase/11/security/java-authentication-and-authorization-service-jaas1.html

Intro

Kafka Connect 是一个可伸缩、可靠地在 Apache Kafka 和其他系统之间传输数据的工具框架。

Kafka Connect 功能特性:

  • 通用框架 Kafka Connect 为其他系统与 Kafka 集成提供了标准化,简化了 Connector 开发、部署和管理。
  • 分布式和独立模式 Kafka Connect 扩展到支持整个组织的大型集中管理服务,或者缩减到开发、测试和小型生产部署。
  • REST 接口 通过一个易于使用的 REST API 向 Kafka Connect 集群提交和管理连接器。
  • 自动偏移量管理 只需要从连接器获得一点点信息,Kafka Connect 就可以自动管理偏移提交过程,因此连接器开发人员不必担心连接器开发中容易出错的部分。
  • 默认分布式可伸缩 Kafka Connect 基于现有的组管理协议,可以添加更多的 Worker 来扩展 Kafka Connect 集群。
  • 流/批处理集成 Kafka Connect 是连接流式和批处理数据系统的理想解决方案。

Concept

  • Connectors 通过管理任务来协调数据流的高级抽象
  • Tasks 实现从 Kafka 读取数据或将数据写入 Kafka
  • Workers 用于执行 Connectors 与 Tasks 的服务进程
  • Converters 用于实现在 Kafka 与目标/源系统之间的数据转换程序
  • Transforms 用于转换通过 Connect 的每条数据的简单逻辑实现
  • Dead Letter Queue 死信队列,用于处理连接器错误

Connectors

Kafka Connect 中的 Connector 定义数据应该从哪里复制或复制到哪里去。Connector 实例是一个逻辑工作,负责管理 Kafka 和另一个系统之间的数据复制。

连接器模型

connector-model-simple

Configuration

Connector Configuration 配置是简单的键值映射。

对于独立(Standalone)模式,它们在属性文件中定义,并在命令行上传递给 Connect 进程。

在分布式(Distributed)模式,它们将包含在创建(或修改) Connector 的 REST 请求的 JSON 有效负载中。

通用的配置:

  • name Connector 的唯一名称。再次尝试注册相同名称将会失败。
  • connector.class Connector 的 Java 类,
    (多种格式,如org.apache.kafka.connect.file.FileStreamSinkConnector、FileStreamSink、FileStreamSinkConnector)
  • tasks.max 为此 Connector 应创建的最大任务数。如果 Connector 无法实现此级别的并行性,则可以创建较少的任务。
  • key.converter - (可选) 覆盖由 Worker 设置的默认键转换器。
  • value.converter - (可选) 覆盖由 Worker 设置的默认值转换器。

Sink Connector 还需要设置下列其中一个配置项:

  • topics 用逗号分隔的主题列表,用作此 Connector 的输入。
  • topics.regex 用 Java 正则表达式匹配到的主题,用作此 Connector 的输入。

更多的配置项,将由具体的 Connector 提供文档说明。

Tasks

Task 是用于连接的数据模型中的主要参与者。每个 Connector 实例协调一组实际复制数据的 Task。通过允许 Connector 将单个工作分解成多个 Task,Kafka Connect 对并行和可伸缩的数据复制提供了很好的支持,且只需要很少的配置。这些 Task 中没有存储状态。Task 状态存储在 Kafka 的特殊主题 config.storage 和 status.storage,并由关联连接器管理。因此,可以在任何时候启动、停止或重新启动 Task,以提供一个弹性的、可伸缩的数据管道。

data-model-simple

Task Rebalancing

当 Connector 首次提交给集群时,Worker 将重新平衡集群中的全部 Connectors 集及其 Tasks,以便每个 Worker 的工作量大致相同。

当 Connector 增加或减少它们所需的 Task 数量时,或者当 Connector 的配置更改时,也使用同样的重新平衡过程。

当一个 Worker 失败时,Task 会在活跃的 Worker 之间重新平衡。

当 Task 失败时,不会触发 Task 失败的再平衡,因为 Task 失败被认为是一个特例。因此,失败的 Task 不会由框架自动重新启动,应该通过 REST API 重新启动。

下图为 Worker 失败时 Task 故障转移示例

task-failover

Workers

Connectors 和 Tasks 是必须在进程中执行的逻辑工作单元,Kafka Connect 称这些进程为 Worker。

有两种类型的 Worker: StandaloneDistributed

Standalone

Standalone 模式是最简单的模式,其中单个进程负责执行所有连接器和任务。因为它是一个单一的进程,所以只需要最少的配置。在开发过程中,以及在某些只有一个进程有意义的情况下,如从主机收集日志时,Standalone 模式非常方便。但是,由于只有一个进程,它的功能也更有限:可伸缩性仅限于单个进程,除了添加到单个进程的任何监视之外,没有容错能力。

Distributed

Distributed 模式为 Kafka Connect 提供了可伸缩性和自动容错能力。在 Distributed 模式下,使用相同的 group.id 启动许多辅助进程,它们自动协调来调度所有可用辅助 Workers 之间的Connectors 和 Tasks 的执行。如果添加了一个 Worker,关闭了一个 Worker,或者一个 Worker 意外地失败了,其余的 Worker 会检测到这一点,并自动协调在更新的可用 Workers 集中重新分配 Connectors 和 Tasks。请注意它们与 Consumer Group 再平衡的相似之处,在掩护之下,Worker 正在利用 Consumer Group 来协调和平衡。

worker-model-basics

Converters

当写入或从 Kafka 读取数据时,必须有一个 Kafka Connect 部署支持特定的数据格式。任务使用转换器将数据格式从字节转换为 Connect 内部数据格式,反之亦然。

  • AvroConverter io.confluent.connect.avro.AvroConverter 使用 Schema Registry
  • ProtobufConverter io.confluent.connect.protobuf.ProtobufConverter 使用 Schema Registry
  • JsonSchemaConverter io.confluent.connect.json.JsonSchemaConverter 使用 Schema Registry
  • JsonConverter org.apache.kafka.connect.json.JsonConverter (没有模式注册表):与结构化数据一起使用
  • StringConverter org.apache.kafka.connect.storage.StringConverter简单的字符串格式
  • ByteArrayConverter org.apache.kafka.connect.converters.ByteArrayConverter提供不进行转换的“直通”选项

Converter Graphic

converter-basics

Transforms

单个消息转换(Single Message Transformations,下列简称SMT)应用于通过 Connect 的消息。

SMT 在源连接器(Source Connector)生成入站消息之后进行转换,但在它们被写入Kafka之前进行转换。
SMT 在出站消息发送到接收器连接器(Sink Connecors)之前对其进行转换。

可以在连接器配置中指定转换链。

  • transforms 转换别名列表,指定应用转换的顺序。
  • transforms.$alias.type 转换的完全限定类名。
  • transforms.$alias.$transformationSpecificConfig 转换的配置属性。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# connector common config
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
# transforms chain: MakeMap--InsertSource
transforms=MakeMap, InsertSource
# config for MakeMap transform
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
# config for InsertSource transform
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

Kafka Connect 包含了一些广泛适用的数据和路由转换:

  • InsertField 使用静态数据或记录元数据添加字段
  • ReplaceField 过滤或重命名字段
  • MaskField将字段替换为类型的有效null值(如 0,false 及空字符串等等)
  • ValueToKey 将记录键替换为由记录值中的字段子集形成的新键
  • HoistField 将整个事件作为单个字段包装在结构或映射中
  • ExtractField从 Struct 和 Map 中提取一个特定的字段,并在结果中只包含这个字段
  • SetSchemaMetadata 修改模式名称或版本
  • TimestampRouter基于原始主题和时间戳修改记录的主题。当使用接收器时,需要根据时间戳写入不同的表或索引
  • RegexRouter 基于原始主题、替换字符串和正则表达式修改记录的主题

Dead Letter Queue

出现无效记录的原因有很多。一个例子是,当一条记录以 JSON 格式序列化到达 Sink Connector 时,但 Sink Connector 配置要求为 Avro 格式。当 Sink Connector 无法处理无效记录时,将根据 Connector 配置属性 errors.tolerance (错误容忍)处理错误。Dead Letter Queue 仅适用于 Sink Connector。

此配置属性有两个有效值: none (默认值)或 all

errors.tolerance 设置为 none 时,错误或无效记录将导致 Connector Task 立即失败,并且 Connector 进入失败状态。要解决此问题,需要查看 Kafka Connect Worker 日志以找出导致故障的原因,进行纠正,然后重新启动 Connector。

errors.tolerance 设置为 all 时,将忽略所有错误或无效记录,并继续处理。Connect Worker 日志中没有写入错误。为了确定记录是否失败,必须使用内部度量或计算源处的记录数量,并将其与所处理的记录数量进行比较。

有一个错误处理特性,它将把所有无效的记录路由到一个特殊的主题并报告错误。该主题包含 Sink Connector 无法处理的记录的 Dead Letter Queue

SinkConnector 配置:

1
2
errors.tolerance = all
errors.deadletterqueue.topic.name = <dead-letter-topic-name>

即使 Dead Letter Queue 包含失败的记录,它也没有显示原因。可以添加以下附加配置属性以包含失败的记录头信息。

1
errors.deadletterqueue.context.headers.enable = true

当该参数设置为 true (默认为false)时,记录头被添加到死信队列中。然后可以查看记录头,并确定记录失败的原因。错误也被发送到 Connect Reporter。为了避免与原始记录头冲突,Dead Letter Queue 上下文头键以 _connect.errors 开头。

Deployment Configuration

Common(Standalone & Distributed)

  • bootstrap.servers Kafka 服务器列表。
  • key.converter 用于在 Kafka 连接格式和写入 Kafka 的序列化格式之间转换的转换器类。它控制了写入或从 Kafka 读取的消息中的的格式,并且由于它独立于连接器,它允许任何连接器使用任何序列化格式。常见的格式包括JSON和Avro。
  • value.converter 用于在 Kafka 连接格式和写入 Kafka 的序列化格式之间转换的转换器类。它控制了写入或从 Kafka 读取的消息中的的格式,并且由于它独立于连接器,它允许任何连接器使用任何序列化格式。常见的格式包括JSON和Avro。

Standalone

  • offset.storage.file.filename 文件中存储偏移量数据

Distributed

  • group.id (默认为 connect-cluster`) 集群的唯一名称,用于形成Connect集群组;注意,这一定不能与消费组id冲突
  • config.storage.topic (默认为connect-configs) 用于存储 Connector 和 Task 配置的主题;注意,这应该是一个单独的分区,高度复制,压缩主题(您可能需要手动创建主题,以确保正确的配置,因为自动创建的主题可能有多个分区,或者自动配置为删除而不是压缩)
  • offset.storage.topic (默认为 connect-offsets) 用于存储偏移量的主题;这个主题应该有许多分区,可以复制,并配置为压缩
  • status.storage.topic (默认为 connect-status) 用于存储状态的主题;这个主题可以有多个分区,应该进行复制和配置以进行压缩

Connect Plugins

Kafka Connect 被设计为可扩展的,因此开发人员可以创建自定义连接器(Connectors),变形器(Transforms)或转换器(Converters),用户可以安装和运行它们。

Kafka Connect 插件是一组 JAR 文件,其中包含一个或多个 Connectors、Transforms 或 Converters 的实现。Connect 将每个插件彼此隔离,这样一个插件中的库就不会受到任何其他插件中的库的影响。这在混合和匹配来自多个供应商的连接器时非常重要。

一个 Kafka Connect 插件可以是:

  • 文件系统上的一个目录,包含所有需要的 JAR 文件和插件的第三方依赖项。这是最常见和首选的。

  • 一个包含插件及其第三方依赖项的所有类文件的 uber JAR。

    (Über is the German word for above or over (it’s actually cognate with the English over). https://stackoverflow.com/questions/11947037/what-is-an-uber-jar)

一个插件不应该包含 Kafka Connect 运行时提供的任何库。

Kafka Connect 在 Worker Configuration 的 plugin.path 属性中使用一个以逗号分隔的目录路径列表定义的插件路径查找插件。

当启动 Connect Worker 时,每个 Worker 都会在插件路径的目录中发现所有 Connectors、Transforms 和 Converters 插件。当使用 Connectors、Transforms 和 Converters 时,Connect Worker 首先从各自的插件加载类,然后是 Kafka Connect 运行时和 Java 库。

Connect API

Java API

本节列举了一般需要重写的方法,更详尽的 Java API 请参见

http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/connect

SinkConnector

  • public abstract void start(Map<String,String> props)
    启动这个连接器。这个方法只会在一个干净的连接器上被调用,也就是说,它要么刚刚被实例化和初始化,要么已经调用了stop()。
  • public abstract void stop()
    停止这个连接器。
  • public abstract Class<? extends Task> taskClass()
    返回此连接器的任务实现。
  • public abstract ConfigDef config()
    定义连接器的配置。
  • public abstract List<Map<String,String>> taskConfigs(int maxTasks)
    根据当前配置返回任务的一组配置。
  • String version()
    获取该连接器的版本。

SinkTask

  • public abstract void start(Map<String,String> props)
    启动任务。这应该可以处理任何配置解析和任务的一次性设置。
  • public abstract void put(Collection<SinkRecord> records)
    将事件记录数据传递到此任务。
  • public abstract void stop()
    执行清理操作以停止此任务。
  • String version()
    获取此任务的版本。通常这应该与对应的连接器类的版本相同。

REST API

Default Port: 8083

  • GET /connectors 返回连接器的列表。
  • POST /connectors 创建一个新的连接器。请求主体应该是一个JSON对象,其中包含一个字符串 name 字段和一个带有连接器 config 参数的对象配置字段。
  • GET /connectors/{name}获取指定连接器的信息。
  • GET /connectors/{name}/config 获取指定连接器的配置参数。
  • PUT /connectors/{name}/config 更新指定连接器的配置参数。
  • GET /connectors/{name}/status 获取连接器的当前状态,包括它是否正在运行、失败、暂停等、它被分配给哪个工作器、失败时的错误信息以及它所有任务的状态。
    • UNASSIGNED: Connector/Task 暂未分配到 Worker。
    • RUNNING: Connector/Task 正在运行。
    • PAUSED: Connector/Task 在管理上已暂停。
    • FAILED: Connector/Task 运行失败,通常是发生了异常。
    • DESTROYED: Connector/Task 在管理上已被删除,将被移除集群。
  • GET /connectors/{name}/tasks 获取连接器当前正在运行的任务列表。
  • GET /connectors/{name}/tasks/{taskid}/status 获取任务的当前状态,包括它是否正在运行、失败、暂停等,它被分配给哪个 Worker,以及失败时的错误信息。
  • PUT /connectors/{name}/pause 暂停连接器及其任务,这将停止消息处理,直到连接器恢复。
  • PUT /connectors/{name}/resume 恢复暂停的连接器(如果连接器没有暂停,则不做任何操作)。
  • POST /connectors/{name}/restart 重新启动连接器(通常是因为它失败了)。
  • POST /connectors/{name}/tasks/{taskId}/restart 重新启动单个任务(通常是因为它失败了)。
  • DELETE /connectors/{name} 删除连接器,停止所有任务并删除其配置。
  • GET /connector-plugins 返回一个Kafka Connect集群中安装的连接器插件列表。
  • PUT /connector-plugins/{connector-type}/config/validate 根据配置定义验证提供的配置值。这个API执行每个配置验证,并在验证过程中返回建议值和错误消息。

Ops

Kafka Connect Logging

https://docs.confluent.io/platform/current/connect/logging.html#

  • Check log levels

    1
    curl -Ss http://localhost:8083/admin/loggers | jq
  • Change the log level for a specific logger

    1
    2
    3
    curl -s -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/admin/loggers/city.icos.icmsevent.connect \
    -d '{"level": "DEBUG"}' | jq '.'

Lenses.io

Kakfa Connect UI

https://github.com/lensesio/kafka-connect-ui

Kafka Connect CLI

https://github.com/lensesio/kafka-connect-tools

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
connect-cli 1.0.8
Usage: connect-cli [ps|get|rm|create|run|diff|status|plugins|describe|validate|restart|pause|resume] [options] [<connector-name>]

--help
prints this usage text
-e <value> | --endpoint <value>
Kafka Connect REST URL, default is http://localhost:8083/
-f <value> | --format <value>
Format of the config, default is PROPERTIES. Valid options are 'properties' and 'json'.

Command: ps
list active connectors names.

Command: get
get the configuration of the specified connector.

Command: rm
remove the specified connector.

Command: create
create the specified connector with the config from stdin; the connector cannot already exist.

Command: run
create or update the specified connector with the config from stdin.

Command: diff
diff the specified connector with the config from stdin.

Command: status
get connector and it's task(s) state(s).

Command: plugins
list the available connector class plugins on the classpath.

Command: describe
list the configurations for a connector class plugin on the classpath.

Command: pause
pause the specified connector.

Command: restart
restart the specified connector.

Command: resume
resume the specified connector.

Command: validate
validate the connector config from stdin against a connector class plugin on the classpath.

Command: task_ps
list the tasks belonging to a connector.

Command: task_status
get the status of a connector task.

Command: task_restart
restart the specified connector task.

Reference

http://kafka.apache.org/24/documentation.html#connect

https://docs.confluent.io/5.5.0/connect/index.html

Intro

作为个人的技术笔记,将 Hexo Blog 以 Github Pages 的方式对外发布。Hexo Cli 提供了较好的发布方式(hexo deploy),足以满足低频发布操作。

今天的实践是:通过 Github Actions 来“持续集成”发布 Hexo Blog Site 到 Github Pages

实践的过程中遇到各种问题,对环境做更新:
Nodejs:v12.19.0->v14.15.4
Hexo: 3.9.0-> 5.3.0

Github Actions

GitHub Actions让你很容易自动化所有的软件工作流程,现在拥有世界级的CI/CD。从GitHub上构建、测试和部署你的代码。让代码评审、分支管理和问题分类按照您想要的方式工作。

Github Actions 特性:https://github.com/features/actions

Github Actions 文档:https://docs.github.com/cn/actions

Github Actions 市场:https://github.com/marketplace?type=actions

Hexo Action

Github Actions 市场已经有了相关的 Action:Hexo Action。

https://github.com/marketplace/actions/hexo-action

https://github.com/sma11black/hexo-action

Operation

参考 Hexo Action 使用手册,

  1. 创建密钥对(Github Pages Deploy Keys 使用公钥,Github Source Repos Secrets DEPLOY_KEY使用私钥);
  2. 在 Github Source Repos 添加流程文件,如在 .github/workflows 下创建 deploy.yml

deploy.yml 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# This is a basic workflow to help you get started with Actions

name: CI

# Controls when the action will run.
on:
# Triggers the workflow on push or pull request events but only for the master branch
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:
build:
runs-on: ubuntu-latest
name: A job to deploy blog.
steps:
- name: Checkout
uses: actions/checkout@v1
with:
submodules: true # Checkout private submodules(themes or something else).

# Caching dependencies to speed up workflows. (GitHub will remove any cache entries that have not been accessed in over 7 days.)
- name: Cache node modules
uses: actions/cache@v1
id: cache
with:
path: node_modules
key: ${{ runner.os }}-node-${{ hashFiles('**/package-lock.json') }}
restore-keys: |
${{ runner.os }}-node-
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: npm ci

# Deploy hexo blog website.
- name: Hexo Action
id: deploy
uses: sma11black/hexo-action@v1.0.4
with:
deploy_key: ${{ secrets.DEPLOY_KEY }}
user_name: shankai
user_email: shankai.kvn@gmail.com
commit_msg: ${{ github.event.head_commit.message }} # (or delete this input setting to use hexo default settings)
# Use the output from the `deploy` step(use for test action)
- name: Get the output
run: |
echo "${{ steps.deploy.outputs.notify }}"
  • name Actions 的名称
  • on 触发 Actions 的事件
  • jobs 执行的一系列任务,每个任务是单独的运行环境,runs-on指定运行环境
  • steps任务包含一系列操作步骤,每个步骤是一个 Action,如 Hexo Deploy 本例中使用 uses: sma11black/hexo-action@v1.0.4

编辑完成 deploy.yml 后提交变更,push 到 Hexo Source Repo。

FAQ

  1. 将主题 next 作为 git module(Github Actions: hexo-action 用法)

    因为 hexo 站点与 next 主题是完全独立的,此处在构建时 next 做为资源依赖参与构建。

1
2
3
4
rm -rf themes/next
git rm -r themes/next
rm -rf .git/modules/themes/next
git submodule add https://github.com/theme-next/hexo-theme-next themes/next
  1. 远程仓库使用 SSH 方式访问而非 Https

    出现的错误:

1
2
3
4
5
6
fatal: could not read Username for 'https://github.com': No such device or address
FATAL Something's wrong. Maybe you can find the solution here: https://hexo.io/docs/troubleshooting.html
Error: Spawn failed
at ChildProcess.<anonymous> (/github/workspace/node_modules/hexo-deployer-git/node_modules/hexo-util/lib/spawn.js:51:21)
at ChildProcess.emit (events.js:314:20)
at Process.ChildProcess._handle.onexit (internal/child_process.js:276:12)

修改 hexo/_config.yml 部署相关配置,使用 SSH 方式访问 Github Pages Repo(https://github.com/sma11black/hexo-action/issues/5)

1
2
3
4
deploy:
type: git
repo: git@github.com:<username>/<username>.github.io.git
branch: master

(完)