Moving data from Cassandra to a Kafka topic, and from a Kafka topic back to Cassandra, using Kafka Connect
Below is a docker-compose file that runs one ZooKeeper node, two Kafka brokers, Schema Registry, a Kafka Connect worker bundling the Cassandra connector, and the Landoop Kafka Connect UI:
# Stack: one ZooKeeper node, two Kafka brokers, Schema Registry, a Kafka
# Connect worker bundling the DataMountaineer Cassandra connector, and the
# Landoop Kafka Connect UI.
#
# NOTE(review): 192.168.0.0 looks like a placeholder (it is a network address,
# not a host address). Replace it with the IP of the Docker host so clients
# outside Docker can reach the brokers.
version: '2'
services:
  zookeeper:
    image: "confluentinc/cp-zookeeper:5.0.0"
    restart: unless-stopped
    hostname: zookeeper
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      # The cp-zookeeper image keeps snapshots and transaction logs under
      # /var/lib/zookeeper/data and /var/lib/zookeeper/log; mounting /data and
      # /datalog (paths of the official "zookeeper" image) persists nothing.
      - ./zookeeper/data:/var/lib/zookeeper/data
      - ./zookeeper/datalog:/var/lib/zookeeper/log

  kafka:
    image: "confluentinc/cp-kafka:5.0.0"
    hostname: kafka
    ports:
      - '9092:9092'
      - '29092:29092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: PLAINTEXT (kafka:29092) is used by containers on the
      # compose network and between brokers; PLAINTEXT_HOST (port 9092) is
      # advertised with the Docker host's address for outside clients.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://192.168.0.0:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      # Internal topics are replicated to both brokers.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    volumes:
      - ./broker_new/broker1/data:/var/lib/kafka/data

  kafka1:
    image: "confluentinc/cp-kafka:5.0.0"
    hostname: kafka1
    ports:
      - '9093:9093'
      - '29093:29093'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Same listener layout as "kafka", shifted to ports 29093 / 9093.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29093,PLAINTEXT_HOST://192.168.0.0:9093
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    volumes:
      - ./broker_new/broker2/data:/var/lib/kafka/data

  schema-registry:
    image: "confluentinc/cp-schema-registry:5.0.0"
    hostname: schema-registry
    depends_on:
      - zookeeper
      - kafka
      - kafka1
    ports:
      - '9081:9081'
    environment:
      # Connects to the brokers over the internal PLAINTEXT listeners.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092,PLAINTEXT://kafka1:29093
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:9081

  kafka-connect-cp:
    image: datamountaineer/kafka-connect-cassandra:1.1.0
    hostname: kafka-connect-cp
    ports:
      - "18083:18083"
    environment:
      # Internal listeners only. (A separate, unprefixed "bootstrap.servers"
      # variable is not mapped into the worker config and was removed.)
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092,kafka1:29093"
      CONNECT_REST_PORT: 18083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      # Default converters for every connector on this worker: Avro, with
      # schemas stored in the schema-registry service.
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:9081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:9081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect-cp"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java'
    depends_on:
      - zookeeper
      - kafka
      - kafka1
      - schema-registry
    links:
      - kafka
      - kafka1
      - schema-registry

  connectui:
    image: "landoop/kafka-connect-ui"
    hostname: connectui
    depends_on:
      - zookeeper
      - kafka
      - kafka1
      - schema-registry
      # The UI is a front end for the Connect worker's REST API (CONNECT_URL).
      - kafka-connect-cp
    ports:
      - '8000:8000'
    environment:
      CONNECT_URL: "http://kafka-connect-cp:18083"
Start the stack with `docker-compose up -d`, then open http://<docker-host-ip>:8000 in a browser. The Landoop Kafka Connect UI will be displayed.
Click the New button. A Cassandra connector should be listed under both Sources and Sinks, as shown below.
Cassandra to Kafka
Under Sources, click Cassandra.
In the configuration, open the Properties tab and enter the following:
# Cassandra source connector: reads cassandra_table (keyspace mykey) and
# publishes its rows to the Kafka topic kafka_topic.
connector.class=com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector
tasks.max=1
connect.cassandra.kcql=INSERT INTO kafka_topic select * from cassandra_table
# Cassandra connection (192.168.0.0 looks like a placeholder - use your Cassandra node's IP)
connect.cassandra.contact.points=192.168.0.0
connect.cassandra.port=9042
connect.cassandra.key.space=mykey
connect.cassandra.consistency.level=LOCAL_ONE
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
Click Validate & Save.
The connector will copy the data from the Cassandra table `cassandra_table` into the Kafka topic `kafka_topic`.
Kafka to Cassandra
Under Sinks, click Cassandra.
In the configuration, open the Properties tab and enter the following:
# Cassandra sink connector: consumes the Kafka topic kafka_topic and writes
# each record into cassandra_table_1 (keyspace mykeyspace).
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=kafka_topic
connect.cassandra.kcql=INSERT INTO cassandra_table_1 SELECT * FROM kafka_topic
# Cassandra connection (192.168.0.0 looks like a placeholder - use your Cassandra node's IP).
# Same credentials as the source connector, which targets the same cluster.
connect.cassandra.contact.points=192.168.0.0
connect.cassandra.port=9042
connect.cassandra.key.space=mykeyspace
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
# The source connector sets no converters, so it uses the worker defaults
# (AvroConverter + Schema Registry, see the docker-compose file) and the topic
# holds Avro data. Read it back with the same converters; a JsonConverter here
# cannot deserialize Avro bytes. (internal.* converters are worker-level
# settings and have no effect in a connector config, so they are omitted.)
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:9081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:9081
Comments
Post a Comment