lost and found ( for me ? )

run kafka and kafka-manager with docker

Here are logs when running kafka with docker.

Reference ( kafka )
https://github.com/wurstmeister/kafka-docker
http://wurstmeister.github.io/kafka-docker/

Reference ( kafka-manager )
https://github.com/yahoo/kafka-manager
https://hub.docker.com/r/sheepkiller/kafka-manager/

Assume that you have already installed docker-engine and docker-compose.

Docker host IP (eth0) : 172.21.242.141

# git clone https://github.com/wurstmeister/kafka-docker.git

# cd kafka-docker/

edit docker-compose.yml

I specify an eth0 IP address on docker host.
# cat docker-compose.yml
zookeeper:
 image: wurstmeister/zookeeper
 ports:
   - "2181"
kafka:
 build: .
 ports:
   - "9092"
 links:
   - zookeeper:zk
 environment:
   KAFKA_ADVERTISED_HOST_NAME: 172.21.242.141
 volumes:
   - /var/run/docker.sock:/var/run/docker.sock

run a cluster
# docker-compose up -d
Creating kafkadocker_zookeeper_1
Creating kafkadocker_kafka_1

one zookeeper and one kafka(broker) are running.
# docker-compose ps
        Name                        Command               State                          Ports
----------------------------------------------------------------------------------------------------------------------
kafkadocker_kafka_1       start-kafka.sh                   Up      0.0.0.0:32786->9092/tcp
kafkadocker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:32785->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

You can asscess to zookeepr or brokers via “docker exec”
# docker exec –it 935848a43538 bash
root@935848a43538:/opt/zookeeper-3.4.6# ls
CHANGES.txt  README.txt            build.xml  data        ivy.xml          recipes              zookeeper-3.4.6.jar.asc
LICENSE.txt  README_packaging.txt  conf       dist-maven  ivysettings.xml  src                  zookeeper-3.4.6.jar.md5
NOTICE.txt   bin                   contrib    docs        lib              zookeeper-3.4.6.jar  zookeeper-3.4.6.jar.sha1
root@935848a43538:/opt/zookeeper-3.4.6# exit


add more brokers.
# docker-compose scale kafka=3
Creating and starting 2 ... done
Creating and starting 3 ... done

You can check logs of all containers by docker-compose logs.
# docker-compose logs

Three brokers are running.
docker-compose ps
        Name                        Command               State                          Ports
----------------------------------------------------------------------------------------------------------------------
kafkadocker_kafka_1       start-kafka.sh                   Up      0.0.0.0:32786->9092/tcp
kafkadocker_kafka_2       start-kafka.sh                   Up      0.0.0.0:32788->9092/tcp
kafkadocker_kafka_3       start-kafka.sh                   Up      0.0.0.0:32787->9092/tcp
kafkadocker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:32785->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

start a container by “start-kafka-shell.sh” to interact with the cluster.

To use this script, you need to specify host IP and zookeeper IP.
# cat start-kafka-shell.sh
#!/bin/bash
docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash

You can check the IP by “docker-compose ps”
# docker-compose ps
        Name                        Command               State                          Ports
----------------------------------------------------------------------------------------------------------------------
kafkadocker_kafka_1       start-kafka.sh                   Up      0.0.0.0:32786->9092/tcp
kafkadocker_kafka_2       start-kafka.sh                   Up      0.0.0.0:32788->9092/tcp
kafkadocker_kafka_3       start-kafka.sh                   Up      0.0.0.0:32787->9092/tcp
kafkadocker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:32785->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

If you access to docker host IP(in my case, 172.21.242.141) destined to TCP 32785, you can access to the zookerper container(TCP 2181).

run the script.
# sh start-kafka-shell.sh 172.21.242.141 172.21.242.141:32785
bash-4.3#

bash-4.3# env
KAFKA_HOME=/opt/kafka_2.11-0.9.0.1
JAVA_VERSION_BUILD=02
HOSTNAME=e2f47701b242
TERM=xterm
SCALA_VERSION=2.11
JAVA_VERSION_MAJOR=8
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/jdk/bin
KAFKA_VERSION=0.9.0.1
PWD=/
JAVA_HOME=/opt/jdk
LANG=C.UTF-8
ZK=172.21.242.141:32785
HOST_IP=172.21.242.141
SHLVL=1
HOME=/root
JAVA_PACKAGE=server-jre
JAVA_VERSION_MINOR=74
_=/usr/bin/env

Create a topic
bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic \
> --partitions 4 --zookeeper $ZK --replication-factor 2
Created topic "topic".
bash-4.3#

check the created topic
bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic --zookeeper $ZK
Topic:topic     PartitionCount:4        ReplicationFactor:2     Configs:
       Topic: topic    Partition: 0    Leader: 1003    Replicas: 1003,1001     Isr: 1003,1001
       Topic: topic    Partition: 1    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
       Topic: topic    Partition: 2    Leader: 1002    Replicas: 1002,1003     Isr: 1002,1003
       Topic: topic    Partition: 3    Leader: 1003    Replicas: 1003,1002     Isr: 1003,1002

Here are broker lists. ( Three brokers are running )
bash-4.3# broker-list.sh
172.21.242.141:32788,172.21.242.141:32787,172.21.242.141:32786

run a producer
bash-4.3# $KAFKA_HOME/bin/kafka-console-producer.sh --topic=topic --broker-list=`broker-list.sh`

run a consumer.
open anther terminal and run a container.
# sh start-kafka-shell.sh 172.21.242.141 172.21.242.141:32785
bash-4.3#

then start a consumer.
bash-4.3# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic --zookeeper=$ZK

enter some strings on the producer.
bash-4.3# $KAFKA_HOME/bin/kafka-console-producer.sh --topic=topic --broker-list=`broker-list.sh`
hello
hi

go back to the consumer.
you will see messages on the consumer.
bash-4.3# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic --zookeeper=$ZK
hello
hi

next, run the kafka-manager to manage kafka via GUI.

Before running kafka-manager, check zookeeper’s IP
# docker-compose ps
        Name                        Command               State                          Ports
----------------------------------------------------------------------------------------------------------------------
kafkadocker_kafka_1       start-kafka.sh                   Up      0.0.0.0:32786->9092/tcp
kafkadocker_kafka_2       start-kafka.sh                   Up      0.0.0.0:32788->9092/tcp
kafkadocker_kafka_3       start-kafka.sh                   Up      0.0.0.0:32787->9092/tcp
kafkadocker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:32785->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

In my case, I can access to the zookeeper via docker host IP destined to TCP 32785.
run kafka-manager.
# docker run -it --rm -p 9000:9000 -e ZK_HOSTS="172.21.242.141:32785" -e APPLATION_SECRET=secret sheepkiller/kafka-manager

access to the kafka-manager ( http://docker’s host ip:9000 )

add a cluster.
I have used 0.9.0.1 kafka, but I can not find that version via GUI, so I specified 0.8.2.2.

I am not sure you can manage kafka brokers via GUI, because I am using 0.9.0.1, but this GUI does not seem to support 0.9.0.1. Anyway, this may help you understand kafka.




[ access to the zookeeper (zkCli.sh) ]

Reference
https://zookeeper.apache.org/doc/r3.3.3/zookeeperStarted.html

access to the zookeeper container
# docker exec -it 935848a43538 bash
root@935848a43538:/opt/zookeeper-3.4.6#

root@935848a43538:/opt/zookeeper-3.4.6# env | grep ZK
ZK_HOME=/opt/zookeeper-3.4.6

root@935848a43538:/opt/zookeeper-3.4.6# ls /opt/zookeeper-3.4.6/conf/
configuration.xsl  log4j.properties  zoo.cfg

root@935848a43538:/opt/zookeeper-3.4.6# $ZK_HOME/bin/zkCli.sh -server 127.0.0.1:2181
Connecting to 127.0.0.1:2181
WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2181(CONNECTED) 0]

[zk: 127.0.0.1:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
       connect host:port
       get path [watch]
       ls path [watch]
       set path data [version]
       rmr path
       delquota [-n|-b] path
       quit
       printwatches on|off
       create [-s] [-e] path data acl
       stat path [watch]
       close
       ls2 path [watch]
       history
       listquota path
       setAcl path acl
       getAcl path
       sync path
       redo cmdno
       addauth scheme auth
       delete path [version]
       setquota -n|-b val path

[zk: 127.0.0.1:2181(CONNECTED) 37] ls /
[isr_change_notification, zookeeper, admin, consumers, config, kafka-manager, controller, brokers, controller_epoch]
[zk: 127.0.0.1:2181(CONNECTED) 38] ls /brokers
[seqid, topics, ids]
[zk: 127.0.0.1:2181(CONNECTED) 39]
[zk: 127.0.0.1:2181(CONNECTED) 39] ls /consumers
[console-consumer-63669, console-consumer-25719, console-consumer-68028]
[zk: 127.0.0.1:2181(CONNECTED) 40]
[zk: 127.0.0.1:2181(CONNECTED) 40] get /consumers/console-consumer-63669

cZxid = 0x110
ctime = Mon Mar 07 05:17:31 UTC 2016
mZxid = 0x110
mtime = Mon Mar 07 05:17:31 UTC 2016
pZxid = 0x132
cversion = 5
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

[zk: 127.0.0.1:2181(CONNECTED) 53] get /brokers/topics/topic/partitions/0/state
{"controller_epoch":1,"leader":1003,"version":1,"leader_epoch":0,"isr":[1003,1001]}
cZxid = 0x30
ctime = Mon Mar 07 03:55:43 UTC 2016
mZxid = 0x30
mtime = Mon Mar 07 03:55:43 UTC 2016
pZxid = 0x30
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 83
numChildren = 0

[ get messages from a broker by using kafka-python ]

Reference
https://github.com/dpkp/kafka-python

install kafka-python
# pip install kafka-python

check brokers’s IPs
# docker-compose ps
        Name                        Command               State                          Ports
----------------------------------------------------------------------------------------------------------------------
kafkadocker_kafka_1       start-kafka.sh                   Up      0.0.0.0:32786->9092/tcp
kafkadocker_kafka_2       start-kafka.sh                   Up      0.0.0.0:32788->9092/tcp
kafkadocker_kafka_3       start-kafka.sh                   Up      0.0.0.0:32787->9092/tcp
kafkadocker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:32785->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

# cat consumer.py
from kafka import KafkaConsumer

# specify broker's IPs, not zookeeper
consumer = KafkaConsumer('topic', bootstrap_servers=['172.21.242.141:32768','172.21.242.141:32788','172.21.242.141:32787'])
for msg in consumer:
   print(msg)

run the script
# python consumer.py

enter some strings on the producer.
bash-4.3# $KAFKA_HOME/bin/kafka-console-producer.sh --topic=topic --broker-list=`broker-list.sh`
hello
hi
by

You will see messages as below.
# python consumer.py
ConsumerRecord(topic=u'topic', partition=2, offset=20, key=None, value='hello')
ConsumerRecord(topic=u'topic', partition=1, offset=20, key=None, value='hi')
ConsumerRecord(topic=u'topic', partition=3, offset=19, key=None, value='by')

Please note that kafka-python done not connect to zookeeper.

https://github.com/dpkp/kafka-python/issues/308

No comments:

Post a Comment

Note: Only a member of this blog may post a comment.