Kafka Connect

Worker.java configures the sink-task consumer with ENABLE_AUTO_COMMIT_CONFIG=false and AUTO_OFFSET_RESET_CONFIG=earliest; offset commits are instead driven by WorkerSinkTask.java.

offset.flush.interval.ms
Interval (in milliseconds) at which the worker tries to commit offsets for tasks.
Default: 60000 (1 minute)
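
In the cp-kafka-connect image used below, worker properties map to CONNECT_-prefixed environment variables with dots turned into underscores, so the flush interval can be shortened like this; the 10000 ms value is just an illustrative choice:

-e CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000 \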

Confluent Kafka Connect Tutorial

https://docs.confluent.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html

https://docs.confluent.io/5.0.0/connect/concepts.html#connect-converters

Kafka Connect REST API

http://dev:8083/connector-plugins
http://dev:8083/connectors
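
Both are plain GET endpoints (an individual connector's config is also available at /connectors/<name>); for example:

curl http://dev:8083/connector-plugins
curl http://dev:8083/connectors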

Lenses

https://docs.lenses.io/4.1/integrations/connectors/stream-reactor/sinks/redissinkconnector/
https://docs.lenses.io/2.1/lenses-sql/kcql.html

Test Environment

# all(zk,kafka,schema-registry,kafka-connect,ui,connectors)
https://github.com/lensesio/fast-data-dev

# ui & kafka-connect
https://github.com/lensesio/kafka-connect-ui
https://hub.docker.com/r/confluentinc/cp-kafka-connect

Build

# clone from fork repository 
git clone https://github.com/shankai/stream-reactor.git
cd stream-reactor/kafka-connect-redis
gradle clean build collectFatJar -x test

Installation

All

# the stock redis connector's pub/sub mode has a bug; see the fixed jar mount below
docker run --rm -d --name lensesio_dev \
-p 2181:2181 -p 3030:3030 -p 9092:9092 \
-p 8081-8083:8081-8083 -p 9581-9585:9581-9585 \
-e ADV_HOST=dev -e CONNECTORS=redis \
lensesio/fast-data-dev:latest

# redis pubsub fix
docker run --rm -d --name lensesio_dev \
-p 2181:2181 -p 3030:3030 -p 9092:9092 \
-p 8081-8083:8081-8083 -p 9581-9585:9581-9585 \
-e ADV_HOST=dev -e CONNECTORS=redis \
-v /Users/shankai/codebase/oss/stream-reactor/kafka-connect-redis/build/libs/kafka-connect-redis-2.1.3.jar:/opt/landoop/connectors/stream-reactor/kafka-connect-redis/kafka-connect-redis-2.1.3.jar \
lensesio/fast-data-dev:latest

Kafka Connect & UI

docker pull confluentinc/cp-zookeeper
docker pull confluentinc/cp-kafka

docker run --name eventbus_kafka -d \
-p 9092:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://dev:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=dev:2181 \
-e KAFKA_AUTO_CREATE_TOPICS_ENABLE=true \
-e KAFKA_DELETE_TOPIC_ENABLE=true \
-e KAFKA_BROKER_ID=0 \
-v /var/run/docker.sock:/var/run/docker.sock \
wurstmeister/kafka:2.11-1.1.1

docker pull confluentinc/cp-schema-registry

docker run --name schema-registry -h schema-registry -p 8081:8081 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=dev:2181 \
confluentinc/cp-schema-registry:5.5.0

docker pull confluentinc/cp-kafka-connect

docker run -d \
--name=kafka-connect-avro \
-e CONNECT_BOOTSTRAP_SERVERS=dev:9092 \
-e CONNECT_REST_PORT=8083 \
-e CONNECT_GROUP_ID="quickstart-avro" \
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-avro-config" \
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-avro-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-avro-status" \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://dev:8081" \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://dev:8081" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="dev" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
-e CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars \
-v /Users/shankai/codebase/oss/fork/jars/:/etc/kafka-connect/jars/ \
-p 8083:8083 \
confluentinc/cp-kafka-connect:latest


docker run -d \
--name=kc-redis-avro \
-e CONNECT_BOOTSTRAP_SERVERS=dev:9092 \
-e CONNECT_REST_PORT=8083 \
-e CONNECT_GROUP_ID="quickstart-avro" \
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-avro-config" \
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-avro-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-avro-status" \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://dev:8081" \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://dev:8081" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="dev" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
-e CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars \
-p 8083:8083 \
kc-redis:latest

docker pull landoop/kafka-connect-ui

https://github.com/lensesio/kafka-connect-ui

docker run -d --name kafka-connect-ui -it -p 8000:8000 -e "CONNECT_URL=http://dev:8083" landoop/kafka-connect-ui

Connector

connector configuration override

value.converter.schema.registry.url=http://dev:8081
value.converter=io.confluent.connect.protobuf.ProtobufConverter

To allow these per-connector client overrides, the worker must run with connector.client.config.override.policy=All in worker.properties.

"consumer.override.max.poll.records": "1",
"consumer.override.fetch.max.wait.ms": "0"

LOG_DIR=/app

Properties

RedisSinkConnector PubSub

connector.class=com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector
connect.redis.port=6379
connect.redis.kcql=select * from sea_vessel_position_reports STOREAS PubSub(channel=Timestamp)
topics=sea_vessel_position_reports
tasks.max=1
connect.redis.host=dev
name=quickstart-redis-pub
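
To verify the pub/sub sink, subscribe with redis-cli; since the channel name comes from each record's Timestamp field, a pattern subscription is the easiest way to watch everything:

redis-cli -h dev PSUBSCRIBE '*'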

RedisSinkConnector Cache

connector.class=com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector
connect.redis.port=6379
connect.redis.kcql=INSERT INTO PK- select * from sea_vessel_position_reports PK Timestamp
topics=sea_vessel_position_reports
tasks.max=1
connect.redis.host=dev
name=quickstart-redis-cache
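
To verify the cache sink, list the keys written under the PK- prefix (key names are PK-<Timestamp>) and fetch one; the concrete key below is a placeholder:

redis-cli -h dev KEYS 'PK-*'
redis-cli -h dev GET 'PK-<some-timestamp>'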

REST

Redis Cache

curl --location --request POST 'http://dev:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "RedisSinkConnectorCache",
  "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
    "connect.redis.port": "6379",
    "connect.redis.kcql": "INSERT INTO FX- select location from sea_vessel_position_reports PK Timestamp",
    "topics": "sea_vessel_position_reports",
    "tasks.max": "1",
    "connect.redis.host": "dev",
    "name": "RedisSinkConnectorCache"
  }
}'
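
A quick status check after creation, using the standard Connect status endpoint:

curl http://dev:8083/connectors/RedisSinkConnectorCache/status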

Redis PubSub

curl --location --request POST 'http://dev:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "RedisSinkConnectorPubSub",
  "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
    "connect.redis.port": "6379",
    "connect.redis.kcql": "select * from sea_vessel_position_reports STOREAS PubSub(channel=Timestamp)",
    "topics": "sea_vessel_position_reports",
    "tasks.max": "1",
    "connect.redis.host": "dev",
    "name": "RedisSinkConnectorPubSub"
  }
}'
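
To remove a connector, the REST API also supports DELETE:

curl -X DELETE http://dev:8083/connectors/RedisSinkConnectorPubSub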

Redis Benchmark

# note: PUBLISH takes a channel and a message, so the Lua call needs both arguments
redis-benchmark -h redis-ps-redis-perf-paas.paas -c 1 -d 1024 script load "redis.call('publish', 'abc', 'payload')"

Intro

A high performance, open source universal RPC framework

https://www.grpc.io/

gRPC is a modern open source high performance RPC framework that can run in any environment. It can efficiently connect services in and across data centers with pluggable support for load balancing, tracing, health checking and authentication. It is also applicable in last mile of distributed computing to connect devices, mobile applications and browsers to backend services.

  • Simple service definition: define your service using Protocol Buffers, a powerful binary serialization toolset and language.
  • Start quickly and scale: install runtime and dev environments with a single line, and scale to millions of RPCs per second with the framework.
  • Works across languages and platforms: automatically generate idiomatic client and server stubs for your service in a variety of languages and platforms.
  • Bi-directional streaming and integrated auth: bi-directional streaming and fully integrated pluggable authentication with HTTP/2-based transport.

Overview


Protocol Compiler

Download: https://github.com/protocolbuffers/protobuf/releases

# https://github.com/protocolbuffers/protobuf/releases/download/v3.14.0/protoc-3.14.0-osx-x86_64.zip

cp /Users/shankai/Downloads/protoc-3.14.0-osx-x86_64/bin/protoc /usr/local/bin/protoc

protoc -h

# cd to the directory containing your .proto file

protoc --java_out=../java/ vehicle-service.proto
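
For reference, a minimal proto3 file in the shape the command above expects; the package, message, and service names here are hypothetical stand-ins for the real vehicle-service.proto. Note that --java_out generates only the message classes; gRPC service stubs additionally require the protoc-gen-grpc-java plugin.

# hypothetical minimal vehicle-service.proto, for illustration only
cat > vehicle-service.proto <<'EOF'
syntax = "proto3";

option java_package = "com.example.vehicle";

message Vehicle {
  string id = 1;
  double speed = 2;
}

service VehicleService {
  // unary RPC: request and response are both Vehicle messages
  rpc GetVehicle (Vehicle) returns (Vehicle);
}
EOF

protoc --java_out=../java/ vehicle-service.proto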

Example

Java & Node.js

https://github.com/shankai/artifacts/tree/master/grpc-example

Reference

Java

https://www.grpc.io/docs/languages/java/quickstart/

https://www.grpc.io/docs/languages/java/basics/

https://github.com/grpc/grpc-java/blob/master/README.md

Node

https://www.grpc.io/docs/languages/node/quickstart/

  • 「长期价值——百年可口可乐的经营策略」(Neville Isdell, David Beasley; trans. 高洁)
  • 「分布式服务架构——原理、设计与实践」(by 李艳鹏, 杨彪)
  • 「见识——你能走多远,取决于见识」(by 吴军)
  • 「态度——把简单的事情,做得出人意料的精彩」(by 吴军)
  • 「格局——世界永远不缺聪明人」(by 吴军)
  • 「我的改变——个人的现代化 40 年」(by 王石)
  • 「不拘一格——网飞的自由与责任工作法」(Reed Hastings, Erin Meyer; trans. 杨占)
  • 「反脆弱——从不确定性中获益」(Nassim Nicholas Taleb; trans. 雨珂)
  • 「人生由我」(Maye Musk; trans. 代晓)
  • 「他影响了中国——陈云」(by 叶永烈)
  • 「美国家庭万用亲子英文」
  • 「罗伯特议事规则」11th edition
  • 「你当像鸟飞往你的山——Educated」(Tara Westover; trans. 任爱红)

Install

Docker run

docker run -d --name activemq -p 61616:61616 -p 61613:61613 -p 61614:61614 -p 5672:5672 -p 1883:1883 -p 8161:8161 rmohr/activemq:5.15.9

Environment Variables

  • ACTIVEMQ_TCP=61616
  • ACTIVEMQ_AMQP=5672
  • ACTIVEMQ_STOMP=61613
  • ACTIVEMQ_MQTT=1883
  • ACTIVEMQ_WS=61614
  • ACTIVEMQ_UI=8161

UI

http://localhost:8161/admin

User/password: admin/admin

Protocols

MQTT

Message Queuing Telemetry Transport

MQTT Publish / Subscribe Architecture


MQTT Specification

Clients

  • Eclipse Paho
// eclipse paho
https://www.eclipse.org/paho/index.php?page=clients/java/index.php#

// github
https://github.com/eclipse/paho.mqtt.java

// javadoc
https://www.eclipse.org/paho/files/javadoc/index.html

JMS

Java Message Service

JMS Specification

https://www.oracle.com/java/technologies/java-message-service.html

AMQP

Advanced Message Queuing Protocol

AMQP Specification

https://www.amqp.org/

https://activemq.apache.org/amqp

STOMP

STOMP is the Simple (or Streaming) Text Orientated Messaging Protocol.

STOMP Specification

Clients

ActiveMQ Library:

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.15.0</version>
</dependency>

Package:

import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
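
As a quick smoke test of the STOMP listener (port 61613 from the docker run above) that needs no Java at all, a raw CONNECT frame can be sent with netcat; the frame layout (command line, headers, blank line, trailing NUL) follows the STOMP 1.2 spec, and the broker should answer with a CONNECTED frame:

printf 'CONNECT\naccept-version:1.2\nhost:localhost\n\n\0' | nc localhost 61613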

Examples

Java Client Demo

https://github.com/shankai/artifacts/tree/master/activemq-example

Intro

Install

Run Nuclio via Docker

docker run -p 8070:8070 -v /var/run/docker.sock:/var/run/docker.sock -v /Users/shankai/DockerTmp:/tmp --name nuclio-dashboard quay.io/nuclio/dashboard:stable-amd64

Access

http://localhost:8070/projects

Function

Java

Deploy

nuctl

https://github.com/nuclio/nuclio/releases
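
A sketch of deploying a function with nuctl against the local Docker platform; the function name, path, and handler below are hypothetical placeholders:

nuctl deploy hello-java \
  --platform local \
  --runtime java \
  --handler Handler \
  --path /path/to/function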

# ping (inetutils)
apt-get install inetutils-ping
# ifconfig / netstat
apt-get install net-tools

jq

Options

  • -j: raw output joined without newlines between results (like -r, minus the trailing newline)
  • -r: raw output; print strings without JSON quotes

Examples

curl --location --request GET "http://icosregistry-icosregistry-service-icos.icos.icos.city/v1/assets?pageSize=100000&pageNo=0&typeName=vehicle_speedlimit"

curl --location --request GET "http://icosregistry-icosregistry-service-icos.icos.icos.city/v1/assets?typeName=traffic_lamp_type&pageSize=100" | jq -jr '.data[]|.id," ",.name,"\n"'

curl --location --request GET "http://icosregistry-icosregistry-service-icos.icos.icos.city/v1/assets?typeName=twin_intersection_type&pageSize=100" | jq -jr '.data[]|.id," ",.name,"\n"'

curl --location --request GET "http://icosregistry-icosregistry-service-icos.icos.icos.city/v1/assets?typeName=twin_traffic_lamp_direction&pageSize=100" | jq -jr '.data[]|.id," ",.name,"\n"'


curl --location --request GET "http://icosregistry-icosregistry-service-icos.icos.icos.city/v1/assets?typeName=isvd1_hikvision&pageSize=100" | jq -jr '.data[]|.id," ",.name,"\n"'

curl --location --request GET "http://icosregistry-icosregistry-service-icos.icos.icos.city/v1/assets?typeName=cam1_hikvision&pageSize=100" | jq -jr '.data[]|.id," ",.name,"\n"'

Pipenv

Install pipenv

# install pipenv
pip3 install --user pipenv

# print the user base directory
python3 -m site --user-base

# add <user base>/bin to the system PATH, e.g.
# export PATH=$PATH:/python_user_base_path/bin

pipenv -h

Example

Install the requests dependency

# enter the project directory
cd hellopython
# install the dependency (requests); on success the directory contains a Pipfile
pipenv install requests

Create a main.py file that uses the requests dependency

import requests

response = requests.get('https://httpbin.org/ip')

print('Your IP is {0}'.format(response.json()['origin']))

Run main.py

pipenv run python main.py
# sample output
Your IP is 123.123.123.123

Virtualenv

Install

pip3 install virtualenv
virtualenv --version

Example

Initialize the project

mkdir hellopython-virtualenv
cd hellopython-virtualenv

Create the virtual environment venv

virtualenv venv
# activate venv
source venv/bin/activate
# deactivate the current virtual environment
# deactivate
# delete the virtual environment (just remove its folder)
# rm -rf venv

Install dependencies

pip install requests
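
To make the environment reproducible, the installed packages can be pinned to a requirements file and reinstalled elsewhere:

pip freeze > requirements.txt
# later, in a fresh virtualenv:
pip install -r requirements.txt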

Intro

GeoJSON Preview

Install

## pull
docker pull tile38/tile38
## run
docker run -d -p 9851:9851 --name tile38-dev tile38/tile38
## in
docker exec -it tile38-dev sh
## tile38-cli
tile38-cli

Command Usage

Set

SET key id [FIELD name value …] [EX seconds] [NX|XX] (OBJECT geojson)|(POINT lat lon [z])|(BOUNDS minlat minlon maxlat maxlon)|(HASH geohash)|(STRING value)
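
For example, a point-typed object (note the lat lon argument order):

SET fleet truck1 POINT 33.5123 -112.2693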

FSET

FSET key id [XX] field value [field value …]
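
For example, updating a single field on the object above:

FSET fleet truck1 speed 90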

GET

GET key id [WITHFIELDS] [OBJECT|POINT|BOUNDS|(HASH geohash)]
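
For example, reading the object back as a point:

GET fleet truck1 POINT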

EXPIRE

Set a timeout on an id.

EXPIRE key id seconds
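
For example, expiring the object after 10 seconds:

EXPIRE fleet truck1 10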

DEL

Remove a specified object.

DEL fleet truck1

DROP

Remove all objects from specified key.

DROP fleet

JSET
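
Set a value inside an object's JSON document by path; a sketch following the Tile38 command reference (JSET key id path value):

JSET user 901 name Tom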

Start an instance (development environment)

docker run --cap-add=IPC_LOCK -e 'VAULT_DEV_ROOT_TOKEN_ID=myroot' -p 8200:8200 -d --name=dev-vault vault

# variant: make the dev server listen on all interfaces inside the container
docker run --cap-add=IPC_LOCK -e 'VAULT_DEV_ROOT_TOKEN_ID=myroot' -e 'VAULT_DEV_LISTEN_ADDRESS=0.0.0.0:8200' vault

Once it is up, grab the default token from the startup log, e.g.:

Unseal Key: BRGYd/8vJSl5101boUHdXDu2ZfYTsJy5tdgXlvOyysQ=
Root Token: myroot

Authentication

vault login myroot
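
With the token in hand, a quick smoke test of the KV secrets engine (the dev server mounts one at secret/ by default); the path and key below are arbitrary examples:

export VAULT_ADDR='http://127.0.0.1:8200'
vault kv put secret/hello foo=bar
vault kv get secret/hello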

Install Python 2.7

apt-get install python
apt-get install python-pip

Install dependencies

  1. Generate the requirements file from the project's imports (skip this step if requirements.txt already exists)
    pip install pipreqs
    pipreqs ./
  2. Install the dependencies
    pip install -r requirements.txt

    Run the program

    python xxx.py