Kafka Connect
Worker.java — sink-task consumers are created with ENABLE_AUTO_COMMIT_CONFIG=false and AUTO_OFFSET_RESET_CONFIG=earliest
WorkerSinkTask.java — drives the consumer poll/put loop and commits the consumer offsets
offset.flush.interval.ms
Interval at which to try committing offsets for tasks.
Default: 60000 (1 minute)
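Since the worker owns the commit cadence, this knob lives in the worker configuration; a minimal sketch, assuming a standalone worker file (file name illustrative):

```properties
# worker.properties (illustrative name)
bootstrap.servers=dev:9092
# Interval at which the worker tries to commit task offsets.
# Default 60000 ms (1 minute); shorten it for tighter commit latency.
offset.flush.interval.ms=10000
```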
Confluent Kafka Connect Tutorial
```
https://docs.confluent.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html
https://docs.confluent.io/5.0.0/connect/concepts.html#connect-converters
```
Kafka Connect REST API
```
http://dev:8083/connector-plugins
http://dev:8083/connectors
```
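These endpoints can be queried directly from a shell, assuming the worker is reachable at dev:8083:

```bash
# List the connector plugins installed on the worker
curl -s http://dev:8083/connector-plugins
# List the connectors currently deployed
curl -s http://dev:8083/connectors
```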
Lenses
```
https://docs.lenses.io/4.1/integrations/connectors/stream-reactor/sinks/redissinkconnector/
https://docs.lenses.io/2.1/lenses-sql/kcql.html
```
Test Environment
```
# all (zk, kafka, schema-registry, kafka-connect, ui, connectors)
https://github.com/lensesio/fast-data-dev
# ui & kafka-connect
https://github.com/lensesio/kafka-connect-ui
https://hub.docker.com/r/confluentinc/cp-kafka-connect
```
Build
```bash
# clone from fork repository
git clone https://github.com/shankai/stream-reactor.git
cd stream-reactor/kafka-connect-redis
gradle clean build collectFatJar -x test
```
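If the build succeeds, the fat jar lands under build/libs/ — the same path mounted into the container in the next section:

```bash
# e.g. build/libs/kafka-connect-redis-2.1.3.jar
ls build/libs/kafka-connect-redis-*.jar
```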
Installation
All-in-One (fast-data-dev)
```bash
# redis pubsub mode has a bug in the bundled connector
docker run --rm -d --name lensesio_dev \
  -p 2181:2181 -p 3030:3030 -p 9092:9092 \
  -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 \
  -e ADV_HOST=dev -e CONNECTORS=redis \
  lensesio/fast-data-dev:latest

# redis pubsub fix: mount the locally built jar over the bundled one
docker run --rm -d --name lensesio_dev \
  -p 2181:2181 -p 3030:3030 -p 9092:9092 \
  -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 \
  -e ADV_HOST=dev -e CONNECTORS=redis \
  -v /Users/shankai/codebase/oss/stream-reactor/kafka-connect-redis/build/libs/kafka-connect-redis-2.1.3.jar:/opt/landoop/connectors/stream-reactor/kafka-connect-redis/kafka-connect-redis-2.1.3.jar \
  lensesio/fast-data-dev:latest
```
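fast-data-dev also serves its web UI on the mapped port 3030; a quick reachability check (dev is the advertised host from the command above):

```bash
curl -sI http://dev:3030 | head -n 1
```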
Kafka Connect & UI
docker pull confluentinc/cp-zookeeper
docker pull confluentinc/cp-kafka
```bash
docker run --name eventbus_kafka -d \
  -p 9092:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://dev:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=dev:2181 \
  -e KAFKA_AUTO_CREATE_TOPICS_ENABLE=true \
  -e KAFKA_DELETE_TOPIC_ENABLE=true \
  -e KAFKA_BROKER_ID=0 \
  -v /var/run/docker.sock:/var/run/docker.sock \
  wurstmeister/kafka:2.11-1.1.1
```
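To confirm the broker is up, topics can be listed with the Kafka CLI shipped in the image (assuming the wurstmeister image keeps the Kafka binaries on its PATH):

```bash
docker exec eventbus_kafka kafka-topics.sh --zookeeper dev:2181 --list
```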
docker pull confluentinc/cp-schema-registry
```bash
docker run --name schema-registry -h schema-registry -p 8081:8081 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=dev:2181 \
  confluentinc/cp-schema-registry:5.5.0
```
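A quick check that the registry is serving (the subjects list stays empty until a schema is registered):

```bash
curl -s http://dev:8081/subjects
```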
docker pull confluentinc/cp-kafka-connect
```bash
docker run -d \
  --name=kafka-connect-avro \
  -e CONNECT_BOOTSTRAP_SERVERS=dev:9092 \
  -e CONNECT_REST_PORT=8083 \
  -e CONNECT_GROUP_ID="quickstart-avro" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-avro-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-avro-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="quickstart-avro-status" \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://dev:8081" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://dev:8081" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="dev" \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
  -e CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars \
  -v /Users/shankai/codebase/oss/fork/jars/:/etc/kafka-connect/jars/ \
  -p 8083:8083 \
  confluentinc/cp-kafka-connect:latest

# variant: connector jars baked into a custom image (kc-redis) instead of a volume mount
docker run -d \
  --name=kc-redis-avro \
  -e CONNECT_BOOTSTRAP_SERVERS=dev:9092 \
  -e CONNECT_REST_PORT=8083 \
  -e CONNECT_GROUP_ID="quickstart-avro" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-avro-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-avro-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="quickstart-avro-status" \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://dev:8081" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://dev:8081" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="dev" \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
  -e CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars \
  -p 8083:8083 \
  kc-redis:latest
```
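Before creating connectors, it is worth verifying the worker actually picked the Redis plugin up from the plugin path:

```bash
# The RedisSinkConnector class should appear if the jar was loaded
curl -s http://dev:8083/connector-plugins | grep -i redis
```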
docker pull landoop/kafka-connect-ui
```bash
# https://github.com/lensesio/kafka-connect-ui
docker run -d --name kafka-connect-ui -it -p 8000:8000 \
  -e "CONNECT_URL=http://dev:8083" landoop/kafka-connect-ui
```
Connector
Connector Configuration Override
```properties
value.converter.schema.registry.url=http://dev:8081
value.converter=io.confluent.connect.protobuf.ProtobufConverter
```
To permit per-connector client overrides, set in worker.properties: connector.client.config.override.policy=All
```json
"consumer.override.max.poll.records": "1",
"consumer.override.fetch.max.wait.ms": "0"
```
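A sketch of how these overrides ride along in a connector's config once the worker policy is set to All (the connector name is illustrative; the rest mirrors the Redis examples below):

```bash
curl --location --request POST 'http://dev:8083/connectors' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "name": "redis-sink-low-latency",
    "config": {
      "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
      "topics": "sea_vessel_position_reports",
      "tasks.max": "1",
      "connect.redis.host": "dev",
      "connect.redis.port": "6379",
      "connect.redis.kcql": "select * from sea_vessel_position_reports STOREAS PubSub(channel=Timestamp)",
      "consumer.override.max.poll.records": "1",
      "consumer.override.fetch.max.wait.ms": "0"
    }
  }'
```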
LOG_DIR=/app
Properties
RedisSinkConnector PubSub
```properties
connector.class=com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector
connect.redis.port=6379
connect.redis.kcql=select * from sea_vessel_position_reports STOREAS PubSub(channel=Timestamp)
topics=sea_vessel_position_reports
tasks.max=1
connect.redis.host=dev
name=quickstart-redis-pub
```
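To watch the PubSub sink emit, subscribe from redis-cli; with channel=Timestamp each distinct Timestamp value becomes its own channel name, so a pattern subscription is the easy check:

```bash
redis-cli -h dev PSUBSCRIBE '*'
```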
RedisSinkConnector Cache
```properties
connector.class=com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector
connect.redis.port=6379
connect.redis.kcql=INSERT INTO PK- select * from sea_vessel_position_reports PK Timestamp
topics=sea_vessel_position_reports
tasks.max=1
connect.redis.host=dev
name=quickstart-redis-cache
```
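In cache mode the connector writes one Redis key per record, prefixed with the INSERT INTO target (PK- above) and suffixed by the PK field value; a quick look from redis-cli (key suffixes depend on the Timestamp values in the data):

```bash
# List keys written by the cache-mode sink
redis-cli -h dev --scan --pattern 'PK-*'
```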
REST
Redis Cache
```bash
curl --location --request POST 'http://dev:8083/connectors' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "name": "RedisSinkConnectorCache",
    "config": {
      "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
      "connect.redis.port": "6379",
      "connect.redis.kcql": "INSERT INTO FX- select location from sea_vessel_position_reports PK Timestamp",
      "topics": "sea_vessel_position_reports",
      "tasks.max": "1",
      "connect.redis.host": "dev",
      "name": "RedisSinkConnectorCache"
    }
  }'
```
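Once created, the connector's state can be polled through the same REST API:

```bash
# Shows connector and task state (RUNNING/FAILED, with a stack trace on failure)
curl -s http://dev:8083/connectors/RedisSinkConnectorCache/status
```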
Redis PubSub
```bash
curl --location --request POST 'http://dev:8083/connectors' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "name": "RedisSinkConnectorPubSub",
    "config": {
      "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
      "connect.redis.port": "6379",
      "connect.redis.kcql": "select * from sea_vessel_position_reports STOREAS PubSub(channel=Timestamp)",
      "topics": "sea_vessel_position_reports",
      "tasks.max": "1",
      "connect.redis.host": "dev",
      "name": "RedisSinkConnectorPubSub"
    }
  }'
```
Redis Benchmark
```bash
# publish requires both a channel and a message argument
redis-benchmark -h redis-ps-redis-perf-paas.paas -c 1 -d 1024 \
  script load "redis.call('publish', 'abc', 'hello')"
```
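For raw publish throughput, redis-benchmark can also drive the command directly, since it accepts an arbitrary command as trailing arguments (channel and payload below are illustrative):

```bash
redis-benchmark -h redis-ps-redis-perf-paas.paas -c 1 -n 100000 publish abc hello
```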