Getting data from Cassandra to a Kafka topic and from a Kafka topic to Cassandra using Kafka Connect (KConnect)

Below is the docker-compose file that brings up a ZooKeeper node, two Kafka brokers, a Schema Registry, a Kafka Connect worker with the DataMountaineer Cassandra connector, and the Landoop Kafka Connect UI. Note that Cassandra itself is not part of this compose file; the connectors point at an external Cassandra node. Replace 192.168.0.0 with the real IP address of your host machine (in the advertised listeners) and of your Cassandra node (in the connector configurations further down).

version: '2'
services:
  zookeeper:
    image: "confluentinc/cp-zookeeper:5.0.0"
    restart: unless-stopped
    hostname: zookeeper
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog

  kafka:
    image: "confluentinc/cp-kafka:5.0.0"
    hostname: kafka
    ports:
      - '9092:9092'
      - '29092:29092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://192.168.0.0:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    volumes:
      - ./broker_new/broker1/data:/var/lib/kafka/data

  kafka1:
    image: "confluentinc/cp-kafka:5.0.0"
    hostname: kafka1
    ports:
      - '9093:9093'
      - '29093:29093'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29093,PLAINTEXT_HOST://192.168.0.0:9093
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    volumes:
      - ./broker_new/broker2/data:/var/lib/kafka/data

  schema-registry:
    image: "confluentinc/cp-schema-registry:5.0.0"
    hostname: schema-registry
    depends_on:
      - zookeeper
      - kafka
      - kafka1
    ports:
      - '9081:9081'
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092,PLAINTEXT://kafka1:29093
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:9081

  kafka-connect-cp:
    image: datamountaineer/kafka-connect-cassandra:1.1.0
    hostname: kafka-connect-cp
    ports:
      - "18083:18083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092,kafka1:29093"
      CONNECT_REST_PORT: 18083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:9081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:9081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect-cp"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java'
    depends_on:
      - zookeeper
      - kafka
      - kafka1
      - schema-registry
    links:
      - kafka
      - kafka1
      - schema-registry

  connectui:
    image: "landoop/kafka-connect-ui"
    hostname: connectui
    depends_on:
      - zookeeper
      - kafka
      - kafka1
      - schema-registry
    ports:
      - '8000:8000'
    environment:
      CONNECT_URL: "http://kafka-connect-cp:18083"
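
Save the file as docker-compose.yml and bring the stack up. The commands below are a minimal sketch: they assume docker-compose is installed on the host and use the Kafka Connect REST port (18083) mapped above.

# Start all services in the background
docker-compose up -d

# Once the Connect worker is up, confirm the Cassandra source and sink
# plugins were picked up from the plugin path
curl -s http://localhost:18083/connector-plugins | grep -i cassandra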


Now open http://localhost:8000 in a browser. The Landoop Kafka Connect UI will be displayed.


Click on the New button. The Cassandra connector should be listed among both the source connectors and the sink connectors.



Cassandra to Kafka
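
Before creating the connector, the keyspace and table referenced in the configuration below need to exist and contain some rows. Here is a sketch using cqlsh; the keyspace and table names come from the connector config, but the column layout is only an example.

cqlsh 192.168.0.0 -u cassandra -p cassandra <<'EOF'
-- keyspace/table names match the connector config; the columns are illustrative
CREATE KEYSPACE IF NOT EXISTS mykey
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE IF NOT EXISTS mykey.cassandra_table (
  id uuid PRIMARY KEY,
  name text,
  created timestamp
);
INSERT INTO mykey.cassandra_table (id, name, created)
  VALUES (uuid(), 'test-row', toTimestamp(now()));
EOF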

Click on Source Connector -> Cassandra

Configuration -> Properties tab

connector.class=com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.consistency.level=LOCAL_ONE
connect.cassandra.key.space=mykey
connect.cassandra.contact.points=192.168.0.0
tasks.max=1
connect.cassandra.port=9042
connect.cassandra.kcql=INSERT INTO kafka_topic SELECT * FROM cassandra_table
connect.cassandra.password=cassandra
connect.cassandra.username=cassandra

Click on Validate & Save

This will copy all rows from the Cassandra table into the Kafka topic. (The KCQL above has no incremental clause, so the connector runs in bulk mode and republishes the whole table on each poll interval.)
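
The same source configuration can also be submitted without the UI, through the Kafka Connect REST API. A sketch, assuming the worker is reachable on localhost:18083 as mapped in the compose file; the connector name cassandra-source is arbitrary.

curl -s -X POST http://localhost:18083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "cassandra-source",
    "config": {
      "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
      "tasks.max": "1",
      "connect.cassandra.contact.points": "192.168.0.0",
      "connect.cassandra.port": "9042",
      "connect.cassandra.key.space": "mykey",
      "connect.cassandra.username": "cassandra",
      "connect.cassandra.password": "cassandra",
      "connect.cassandra.consistency.level": "LOCAL_ONE",
      "connect.cassandra.kcql": "INSERT INTO kafka_topic SELECT * FROM cassandra_table"
    }
  }'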


Kafka to Cassandra

Click on Sink Connector -> Cassandra

Configuration -> Properties tab

connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
topics=kafka_topic
connect.cassandra.kcql=INSERT INTO cassandra_table_1 SELECT * FROM kafka_topic
connect.cassandra.key.space=mykeyspace
connect.cassandra.contact.points=192.168.0.0
connect.cassandra.port=9042
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

Note: the key.converter/value.converter overrides above take precedence over the worker settings from the compose file (which use the Avro converter). They must match the actual format of the records in kafka_topic; if the topic is filled by the Avro-producing source connector above, either remove these overrides or switch them to io.confluent.connect.avro.AvroConverter.
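
Once both connectors are created, their state can be checked through the Connect REST API and the sink table queried directly. A sketch, assuming the connectors were named cassandra-source and cassandra-sink and that cqlsh can reach the Cassandra node.

# Connector and task state (should report RUNNING)
curl -s http://localhost:18083/connectors/cassandra-source/status
curl -s http://localhost:18083/connectors/cassandra-sink/status

# Rows written by the sink connector
cqlsh 192.168.0.0 -u cassandra -p cassandra \
  -e "SELECT * FROM mykeyspace.cassandra_table_1 LIMIT 10;"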
