Kafka Connect Redis

Kafka Connect

Worker.java — the Connect worker creates each sink task's consumer with ENABLE_AUTO_COMMIT_CONFIG=false and AUTO_OFFSET_RESET_CONFIG=earliest, so offset commits are owned by the framework, not the consumer.

WorkerSinkTask.java — polls that consumer, hands the records to the sink task's put(), and commits the consumed offsets on the flush interval below.

offset.flush.interval.ms
Interval at which to try committing offsets for tasks.
Default: 60000 (1 minute)
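
For a lower-latency sink the flush interval can be shortened in the worker config; a minimal sketch (the 10-second value is only an illustration):

# worker.properties
offset.flush.interval.ms=10000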

Confluent Kafka Connect Tutorial

https://docs.confluent.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html

https://docs.confluent.io/5.0.0/connect/concepts.html#connect-converters

Kafka Connect REST API

http://dev:8083/connector-plugins
http://dev:8083/connectors
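
A quick smoke test of the two endpoints above, assuming the worker is reachable at dev:8083:

# list installed connector plugins
curl -s http://dev:8083/connector-plugins

# list running connectors
curl -s http://dev:8083/connectors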

Lenses

https://docs.lenses.io/4.1/integrations/connectors/stream-reactor/sinks/redissinkconnector/
https://docs.lenses.io/2.1/lenses-sql/kcql.html

Test Environment

# all(zk,kafka,schema-registry,kafka-connect,ui,connectors)
https://github.com/lensesio/fast-data-dev

# ui & kafka-connect
https://github.com/lensesio/kafka-connect-ui
https://hub.docker.com/r/confluentinc/cp-kafka-connect

Build

# clone the fork (it contains the redis pubsub fix used below)
git clone https://github.com/shankai/stream-reactor.git
cd stream-reactor/kafka-connect-redis
gradle clean build collectFatJar -x test
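
The fat jar mounted into the containers below should land in build/libs; a quick check (the 2.1.3 version matches the jar referenced later):

ls build/libs/kafka-connect-redis-*.jar
# expect: build/libs/kafka-connect-redis-2.1.3.jar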

Installation

All-in-one (fast-data-dev)

# stock connectors: the redis pubsub mode has a bug
docker run --rm -d --name lensesio_dev \
-p 2181:2181 -p 3030:3030 -p 9092:9092 \
-p 8081-8083:8081-8083 -p 9581-9585:9581-9585 \
-e ADV_HOST=dev -e CONNECTORS=redis \
lensesio/fast-data-dev:latest

# redis pubsub fix: mount the patched jar built above over the stock one
docker run --rm -d --name lensesio_dev \
-p 2181:2181 -p 3030:3030 -p 9092:9092 \
-p 8081-8083:8081-8083 -p 9581-9585:9581-9585 \
-e ADV_HOST=dev -e CONNECTORS=redis \
-v /Users/shankai/codebase/oss/stream-reactor/kafka-connect-redis/build/libs/kafka-connect-redis-2.1.3.jar:/opt/landoop/connectors/stream-reactor/kafka-connect-redis/kafka-connect-redis-2.1.3.jar \
lensesio/fast-data-dev:latest
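
To confirm the patched jar was picked up, two hedged checks (container name and mount path as above):

# the mounted jar should shadow the stock connector
docker exec lensesio_dev ls /opt/landoop/connectors/stream-reactor/kafka-connect-redis/

# the plugin should be visible to the worker
curl -s http://dev:8083/connector-plugins | grep -i redis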

Kafka Connect & UI

docker pull confluentinc/cp-zookeeper
docker pull wurstmeister/kafka:2.11-1.1.1
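
The broker below points at a ZooKeeper on dev:2181; a minimal sketch for starting one from the pulled image (container name and port mapping are assumptions):

docker run -d --name zookeeper -p 2181:2181 \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper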

docker run --name eventbus_kafka -d \
-p 9092:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://dev:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=dev:2181 \
-e KAFKA_AUTO_CREATE_TOPICS_ENABLE=true \
-e KAFKA_DELETE_TOPIC_ENABLE=true \
-e KAFKA_BROKER_ID=0 \
-v /var/run/docker.sock:/var/run/docker.sock \
wurstmeister/kafka:2.11-1.1.1

docker pull confluentinc/cp-schema-registry

docker run --name schema-registry -h schema-registry -p 8081:8081 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=dev:2181 \
confluentinc/cp-schema-registry:5.5.0
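
A quick liveness check once the registry is up (the subject list will be empty on a fresh start):

curl -s http://dev:8081/subjects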

docker pull confluentinc/cp-kafka-connect

docker run -d \
--name=kafka-connect-avro \
-e CONNECT_BOOTSTRAP_SERVERS=dev:9092 \
-e CONNECT_REST_PORT=8083 \
-e CONNECT_GROUP_ID="quickstart-avro" \
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-avro-config" \
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-avro-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-avro-status" \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://dev:8081" \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://dev:8081" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="dev" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
-e CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars \
-v /Users/shankai/codebase/oss/fork/jars/:/etc/kafka-connect/jars/ \
-p 8083:8083 \
confluentinc/cp-kafka-connect:latest


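Alternatively, the same worker can run from a custom image, here assumed to be a local kc-redis build on top of cp-kafka-connect with the Redis connector already on the plugin path, which removes the need for the jar volume mount: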
docker run -d \
--name=kc-redis-avro \
-e CONNECT_BOOTSTRAP_SERVERS=dev:9092 \
-e CONNECT_REST_PORT=8083 \
-e CONNECT_GROUP_ID="quickstart-avro" \
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-avro-config" \
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-avro-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-avro-status" \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://dev:8081" \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://dev:8081" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="dev" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
-e CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars \
-p 8083:8083 \
kc-redis:latest

docker pull landoop/kafka-connect-ui

https://github.com/lensesio/kafka-connect-ui

docker run -d --name kafka-connect-ui -it -p 8000:8000 -e "CONNECT_URL=http://dev:8083" landoop/kafka-connect-ui
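
With the port mapping above, the UI is served at http://dev:8000 and talks to the worker through CONNECT_URL.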

Connector

Connector Configuration Override

value.converter.schema.registry.url=http://dev:8081
value.converter=io.confluent.connect.protobuf.ProtobufConverter

To allow the consumer.override.* keys below, the worker must permit per-connector client overrides in worker.properties: connector.client.config.override.policy=All

"consumer.override.max.poll.records": "1",
"consumer.override.fetch.max.wait.ms": "0"

LOG_DIR=/app

Properties

RedisSinkConnector PubSub

connector.class=com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector
connect.redis.port=6379
connect.redis.kcql=select * from sea_vessel_position_reports STOREAS PubSub(channel=Timestamp)
topics=sea_vessel_position_reports
tasks.max=1
connect.redis.host=dev
name=quickstart-redis-pub
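
Since the channel name is taken from each record's Timestamp field, a pattern subscription is the simplest way to watch the output:

redis-cli -h dev PSUBSCRIBE '*'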

RedisSinkConnector Cache

connector.class=com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector
connect.redis.port=6379
connect.redis.kcql=INSERT INTO PK- select * from sea_vessel_position_reports PK Timestamp
topics=sea_vessel_position_reports
tasks.max=1
connect.redis.host=dev
name=quickstart-redis-cache
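
In cache mode each record becomes a string key prefixed with PK- and keyed by Timestamp; a quick check with redis-cli (the concrete key shown is hypothetical):

redis-cli -h dev --scan --pattern 'PK-*'
redis-cli -h dev GET 'PK-1479413245'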

REST

Redis Cache

curl --location --request POST 'http://dev:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "RedisSinkConnectorCache",
  "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
    "connect.redis.port": "6379",
    "connect.redis.kcql": "INSERT INTO FX- select location from sea_vessel_position_reports PK Timestamp",
    "topics": "sea_vessel_position_reports",
    "tasks.max": "1",
    "connect.redis.host": "dev",
    "name": "RedisSinkConnectorCache"
  }
}'
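
After the POST, the connector and task state can be checked through the status endpoint:

curl -s http://dev:8083/connectors/RedisSinkConnectorCache/status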

Redis PubSub

curl --location --request POST 'http://dev:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "RedisSinkConnectorPubSub",
  "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
    "connect.redis.port": "6379",
    "connect.redis.kcql": "select * from sea_vessel_position_reports STOREAS PubSub(channel=Timestamp)",
    "topics": "sea_vessel_position_reports",
    "tasks.max": "1",
    "connect.redis.host": "dev",
    "name": "RedisSinkConnectorPubSub"
  }
}'
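
To bounce a connector after a config change, or remove it entirely, the same API offers restart and DELETE:

curl -s -X POST http://dev:8083/connectors/RedisSinkConnectorPubSub/restart
curl -s -X DELETE http://dev:8083/connectors/RedisSinkConnectorPubSub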

Redis Benchmark

redis-benchmark -h redis-ps-redis-perf-paas.paas -c 1 -d 1024 script load "redis.call('publish', 'abc', 'hello')"
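
Note that SCRIPT LOAD only compiles and caches the script without running it; to drive actual publishes, redis-benchmark can also benchmark an arbitrary command directly (channel and payload here are placeholders):

redis-benchmark -h redis-ps-redis-perf-paas.paas -c 1 -n 100000 publish abc hello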