lost and found ( for me ? )


Kafka: sample scripts of producer/consumer with PyKafka

Here are sample scripts of a Kafka producer and consumer with PyKafka.

Reference
https://jujucharms.com/u/hazmat/kafka/
https://github.com/Parsely/pykafka

set up kafka and zookeeper clusters with juju.
$ juju --version
2.0.2-xenial-amd64

$ juju deploy kafka -n 6
$ juju deploy zookeeper -n 3
$ juju add-relation kafka zookeeper

$ juju status --format=short
- kafka/0: 10.219.27.133 (agent:idle, workload:active) 9092/tcp
- kafka/1: 10.219.27.167 (agent:idle, workload:active) 9092/tcp
- kafka/2: 10.219.27.124 (agent:idle, workload:active) 9092/tcp
- kafka/3: 10.219.27.8 (agent:idle, workload:active) 9092/tcp
- kafka/4: 10.219.27.231 (agent:idle, workload:active) 9092/tcp
- kafka/5: 10.219.27.77 (agent:idle, workload:active) 9092/tcp
- ubuntu/0: 10.219.27.105 (agent:idle, workload:active)
- ubuntu/1: 10.219.27.17 (agent:idle, workload:active)
- zookeeper/0: 10.219.27.221 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- zookeeper/1: 10.219.27.134 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- zookeeper/2: 10.219.27.58 (agent:idle, workload:active) 2181/tcp, 9998/tcp
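The producer and consumer scripts later in this post hard-code the broker and ZooKeeper addresses copied from this output. Below is a minimal sketch of how to build those comma-separated `host:port` strings from the short status text instead. It assumes the exact line format shown above, and `endpoints_from_short_status` is a hypothetical helper name: it is not part of Juju or PyKafka.

```python
import re

# Matches lines such as:
# "- kafka/0: 10.219.27.133 (agent:idle, workload:active) 9092/tcp"
UNIT_LINE = re.compile(r"^- (?P<app>[\w-]+)/\d+: (?P<ip>\d+(?:\.\d+){3}) ")


def endpoints_from_short_status(status_text, app, port):
    """Return "ip:port,ip:port,..." for every unit of `app` in the text."""
    hosts = []
    for line in status_text.splitlines():
        m = UNIT_LINE.match(line.strip())
        if m and m.group("app") == app:
            hosts.append("%s:%d" % (m.group("ip"), port))
    return ",".join(hosts)


if __name__ == "__main__":
    sample = """\
- kafka/0: 10.219.27.133 (agent:idle, workload:active) 9092/tcp
- kafka/1: 10.219.27.167 (agent:idle, workload:active) 9092/tcp
- zookeeper/0: 10.219.27.221 (agent:idle, workload:active) 2181/tcp, 9998/tcp
"""
    print(endpoints_from_short_status(sample, "kafka", 9092))
    print(endpoints_from_short_status(sample, "zookeeper", 2181))
```

With the sample above, this prints `10.219.27.133:9092,10.219.27.167:9092` and then `10.219.27.221:2181`. Ports are passed in explicitly rather than parsed, because ZooKeeper units list two ports.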

$ juju run --unit=kafka/0 "systemctl list-unit-files --type=service | grep kafka"
jujud-unit-kafka-0.service                 enabled

$ juju run --unit=kafka/0 "dpkg -l | grep kafka"
ii  kafka                            0.8.1.1-1                                  all          Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.
ii  kafka-server                     0.8.1.1-1                                  all          Bundles the init script for kafka server.

$ juju run --unit=kafka/0 "ls /usr/bin | egrep 'kafka|zoo'"
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-run-class.sh
kafka-topics.sh
zookeeper-client
zookeeper-server
zookeeper-server-cleanup
zookeeper-server-initialize

$ juju run --unit=zookeeper/0 "systemctl list-unit-files --type=service | grep zookeeper"
jujud-unit-zookeeper-0.service             enabled
$ juju run --unit=zookeeper/0 "dpkg -l | grep zoo"
ii  zookeeper                        3.4.6-1                                    all          A high-performance coordination service for distributed applications.
ii  zookeeper-server                 3.4.6-1                                    all          The Hadoop Zookeeper server

All nodes are running within LXD containers.
$ lxc list juju- -c n
+----------------+
|      NAME      |
+----------------+
| juju-0a148b-28 |
+----------------+
| juju-0a148b-29 |
+----------------+
| juju-0a148b-30 |
+----------------+
| juju-0a148b-31 |
+----------------+
| juju-0a148b-32 |
+----------------+

Deploy two ubuntu nodes for pykafka (one for the producer, one for the consumer) and install pykafka on each.
$ juju deploy ubuntu -n 2

$ juju ssh ubuntu/0
$ sudo pip install pykafka

$ juju ssh ubuntu/1
$ sudo pip install pykafka

- create a topic

$ juju run --unit=kafka/0 "/usr/bin/kafka-topics.sh --zookeeper 10.219.27.221:2181 --create --topic topic01 --partitions 12 --replication-factor 2"

$ juju run --unit=kafka/0 "/usr/bin/kafka-topics.sh --zookeeper 10.219.27.221:2181 --describe"
Topic:topic01   PartitionCount:12       ReplicationFactor:2     Configs:
       Topic: topic01  Partition: 0    Leader: 3       Replicas: 3,0   Isr: 3,0
       Topic: topic01  Partition: 1    Leader: 4       Replicas: 4,1   Isr: 4,1
       Topic: topic01  Partition: 2    Leader: 5       Replicas: 5,2   Isr: 5,2
       Topic: topic01  Partition: 3    Leader: 0       Replicas: 0,3   Isr: 0,3
       Topic: topic01  Partition: 4    Leader: 1       Replicas: 1,4   Isr: 1,4
       Topic: topic01  Partition: 5    Leader: 2       Replicas: 2,5   Isr: 2,5
       Topic: topic01  Partition: 6    Leader: 3       Replicas: 3,1   Isr: 3,1
       Topic: topic01  Partition: 7    Leader: 4       Replicas: 4,2   Isr: 4,2
       Topic: topic01  Partition: 8    Leader: 5       Replicas: 5,3   Isr: 5,3
       Topic: topic01  Partition: 9    Leader: 0       Replicas: 0,4   Isr: 0,4
       Topic: topic01  Partition: 10   Leader: 1       Replicas: 1,5   Isr: 1,5
       Topic: topic01  Partition: 11   Leader: 2       Replicas: 2,0   Isr: 2,0
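The layout above follows a pattern. As I understand Kafka 0.8's rack-unaware placement (`AdminUtils.assignReplicasToBrokers`), it works like this:
- The first replica of each partition, which is also the preferred leader, goes round-robin over the sorted broker list from a random start index.
- Follower replicas are placed at a shifted distance from that leader.
- The shift grows by one each time the partition id wraps around the broker count.

The sketch below is my own Python re-implementation of that idea, not Kafka code. Kafka picks `start_index` and `replica_shift` randomly. The values 3 and 2 used here were chosen only because they reproduce the table above.

```python
def assign_replicas(brokers, n_partitions, replication_factor,
                    start_index, replica_shift):
    """Rack-unaware replica placement, modelled on Kafka 0.8's AdminUtils.

    Returns {partition_id: [replica broker ids]}; the first id in each
    list is the preferred leader.
    """
    brokers = sorted(brokers)
    n = len(brokers)
    if not 0 < replication_factor <= n:
        raise ValueError("replication factor must be between 1 and the broker count")
    assignment = {}
    shift = replica_shift
    for p in range(n_partitions):
        # Each time the partition id wraps around the broker list,
        # move the followers one step further away from the leader.
        if p > 0 and p % n == 0:
            shift += 1
        first = (p + start_index) % n
        replicas = [brokers[first]]
        for j in range(replication_factor - 1):
            # The offset is in 1..n-1, so a follower never lands on the leader.
            offset = 1 + (shift + j) % (n - 1)
            replicas.append(brokers[(first + offset) % n])
        assignment[p] = replicas
    return assignment


if __name__ == "__main__":
    # 6 brokers (ids 0-5), 12 partitions, replication factor 2, as for topic01.
    for pid, replicas in assign_replicas(range(6), 12, 2, 3, 2).items():
        print("Partition: %d  Leader: %d  Replicas: %s"
              % (pid, replicas[0], ",".join(map(str, replicas))))
```

The same function also reproduces the later topic05 table (brokers 3, 4, 5, 9 to 14; 3 partitions; replication factor 3) with `start_index=2` and `replica_shift=4`.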

[ producer with pykafka ]

log in to the ubuntu/0 node.

$ juju ssh ubuntu/0

ubuntu@juju-0a148b-28:~$ cat pykafka-producer.py
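#!/usr/bin/python3

from pykafka import KafkaClient

# Kafka brokers (kafka/0 .. kafka/5); PyKafka bootstraps from this list.
brokers_list = "10.219.27.133:9092,10.219.27.167:9092,10.219.27.124:9092,10.219.27.8:9092,10.219.27.231:9092,10.219.27.77:9092"
# ZooKeeper ensemble; not used by this script.
zk_list = "10.219.27.221:2181,10.219.27.134:2181,10.219.27.58:2181"

client = KafkaClient(hosts=brokers_list)
#print(client.topics)  # uncomment to list the topics in the cluster

topic = client.topics[b'topic01']

# A synchronous producer waits until each message is delivered before
# produce() returns; leaving the with-block stops the producer.
with topic.get_sync_producer() as producer:
    for i in range(4):
        producer.produce(b"test message")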

send messages
ubuntu@juju-0a148b-28:~$ python3 pykafka-producer.py
ubuntu@juju-0a148b-28:~$

read data with kafka-console-consumer.sh
$ juju ssh kafka/0

ubuntu@juju-0a148b-30:~$ /usr/bin/kafka-console-consumer.sh --from-beginning --topic topic01 --zookeeper 10.219.27.221:2181
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
test message
test message
test message
test message


[ consumer with pykafka ]

log in to the ubuntu/1 node.
$ juju ssh ubuntu/1


ubuntu@juju-0a148b-29:~$ cat pykafka-simple-consumer.py
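#!/usr/bin/python3

from pykafka import KafkaClient

# Kafka brokers (kafka/0 .. kafka/5); PyKafka bootstraps from this list.
brokers_list = "10.219.27.133:9092,10.219.27.167:9092,10.219.27.124:9092,10.219.27.8:9092,10.219.27.231:9092,10.219.27.77:9092"
# ZooKeeper ensemble; not used by the simple consumer.
zk_list = "10.219.27.221:2181,10.219.27.134:2181,10.219.27.58:2181"

client = KafkaClient(hosts=brokers_list)
#print(client.topics)  # uncomment to list the topics in the cluster

topic = client.topics[b'topic01']
# A simple consumer reads every partition itself, without group balancing.
consumer = topic.get_simple_consumer()

# With the default consumer_timeout_ms=-1 this loop waits for new
# messages indefinitely (stop it with Ctrl-C).
for message in consumer:
    if message is not None:
        print(message.offset, message.value)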

read data
ubuntu@juju-0a148b-29:~$ python3 pykafka-simple-consumer.py
0 b'test message'
0 b'test message'
0 b'test message'
0 b'test message'
0 b'test message'
0 b'test message'
0 b'test message'
1 b'test message'
1 b'test message'
1 b'test message'
0 b'test message'
0 b'test message'
1 b'test message'
0 b'test message'
2 b'test message'
3 b'test message'
4 b'test message'
0 b'test message'
1 b'test message'
2 b'test message'
5 b'test message'
6 b'test message'
0 b'test message'
1 b'test message'


Set up Kafka and Zookeeper clusters with Ubuntu Juju

Here are the logs from when I set up kafka and zookeeper clusters with Juju.

Reference
https://jujucharms.com/apache-kafka/
https://kafka.apache.org/

I am not familiar with kafka or zookeeper, so I installed them to learn how they work. It’s easy to set up multiple zookeeper/kafka clusters with Juju.

$ juju --version
2.0.2-xenial-amd64

$ juju status
Model    Controller     Cloud/Region         Version
default  my-controller  localhost/localhost  2.0.2

App  Version  Status  Scale  Charm  Store  Rev  OS  Notes

Unit  Workload  Agent  Machine  Public address  Ports  Message

Machine  State  DNS  Inst id  Series  AZ

deploy 3 zookeepers
$ juju deploy apache-zookeeper -n 3

deploy 3 kafka
$ juju deploy apache-kafka -n 3

add relation
$ juju add-relation apache-kafka apache-zookeeper

confirm all units are active
$ juju status
Model    Controller     Cloud/Region         Version
default  my-controller  localhost/localhost  2.0.2

App               Version  Status  Scale  Charm             Store       Rev  OS      Notes
apache-kafka               active      3  apache-kafka      jujucharms    5  ubuntu
apache-zookeeper           active      3  apache-zookeeper  jujucharms    3  ubuntu

Unit                 Workload  Agent  Machine  Public address  Ports              Message
apache-kafka/3       active    idle   7        10.40.27.108   9092/tcp           Ready
apache-kafka/4       active    idle   8        10.40.27.23    9092/tcp           Ready
apache-kafka/5*      active    idle   9        10.40.27.201   9092/tcp           Ready
apache-zookeeper/1   active    idle   4        10.40.27.184   2181/tcp,9998/tcp  Ready (3 zk units)
apache-zookeeper/2*  active    idle   5        10.40.27.174   2181/tcp,9998/tcp  Ready (3 zk units)
apache-zookeeper/3   active    idle   6        10.40.27.192   2181/tcp,9998/tcp  Ready (3 zk units)

Machine  State    DNS            Inst id        Series  AZ
4        started  10.40.27.184  juju-0a148b-4  trusty
5        started  10.40.27.174  juju-0a148b-5  trusty
6        started  10.40.27.192  juju-0a148b-6  trusty
7        started  10.40.27.108  juju-0a148b-7  trusty
8        started  10.40.27.23   juju-0a148b-8  trusty
9        started  10.40.27.201  juju-0a148b-9  trusty

Relation  Provides          Consumes          Type
zkclient  apache-kafka      apache-zookeeper  regular
zkpeer    apache-zookeeper  apache-zookeeper  peer

$ juju list-actions apache-kafka
Action        Description
create-topic  Create a new Kafka topic
delete-topic  Delete a Kafka topic
list-topics   List all Kafka topics
list-zks      List ip:port info for connected Zookeeper servers
read-topic    Consume an existing kafka topic
smoke-test    Verify that Kafka is working as expected by listing zookeepers, then creating/listing/deleting a topic
write-topic   Write to a kafka topic

$ juju list-actions apache-zookeeper
Action      Description
start-rest  Start the Zookeeper REST service
stop-rest   Stop the Zookeeper REST service

list zookeeper servers.

$ juju run-action apache-kafka/3 list-zks
Action queued with id: be9738ec-dc2c-4717-88e9-e85ff0da3cfc

$ juju show-action-status be9738ec-dc2c-4717-88e9-e85ff0da3cfc
actions:
- id: be9738ec-dc2c-4717-88e9-e85ff0da3cfc
 status: completed
 unit: apache-kafka/3

$ juju show-action-output be9738ec-dc2c-4717-88e9-e85ff0da3cfc
results:
 output: 10.40.27.174:2181,10.40.27.184:2181,10.40.27.192:2181
status: completed
timing:
 completed: 2017-04-01 04:38:02 +0000 UTC
 enqueued: 2017-04-01 04:37:59 +0000 UTC
 started: 2017-04-01 04:38:02 +0000 UTC

- create a topic

$ juju run-action apache-kafka/3 create-topic topic=topic01 partitions=3 replication=3
Action queued with id: a624f711-d6c1-4d2b-8d0c-11a160d26300

$ juju show-action-status a624f711-d6c1-4d2b-8d0c-11a160d26300
actions:
- id: a624f711-d6c1-4d2b-8d0c-11a160d26300
 status: completed
 unit: apache-kafka/3

- list topics

$ juju run-action apache-kafka/3 list-topics
Action queued with id: 3a83ee77-f733-4a52-8454-bfd5af15f289

$ juju show-action-output 3a83ee77-f733-4a52-8454-bfd5af15f289
results:
 output: |
   topic01
status: completed

- write to a topic

$ juju run-action apache-kafka/3 write-topic topic=topic01 data=hello
Action queued with id: f445cd70-87f7-448a-8bf9-778c2a88d066

$ juju show-action-output f445cd70-87f7-448a-8bf9-778c2a88d066
results:
 output: |
   [2017-04-01 04:47:58,010] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
status: completed

- read data from a topic

read data from apache-kafka/3
$ juju run-action apache-kafka/3 read-topic topic=topic01 partition=0
Action queued with id: 8d9bc3a9-dc82-4185-833c-5a3288e2ae62

$ juju show-action-output 8d9bc3a9-dc82-4185-833c-5a3288e2ae62
results:
 output: |
   hello
   Terminating. Reached the end of partition (topic01, 0) at offset 1
status: completed

read data from apache-kafka/5 (only the action output is shown)
$ juju show-action-output 6543f40e-e3a1-4fb8-80e5-a6ad4cb6e9e8
results:
 output: |
   hello
   Terminating. Reached the end of partition (topic01, 0) at offset 1
status: completed

- delete a topic

$ juju run-action apache-kafka/4 delete-topic topic=topic01
Action queued with id: 09325b5f-c4ab-4380-8aed-025cd0656aae

$ juju show-action-output 09325b5f-c4ab-4380-8aed-025cd0656aae
results:
 output: |
   Topic topic01 is marked for deletion.
   Note: This will have no impact if delete.topic.enable is not set to true.
status: completed

- add three more kafka units

$ juju add-unit -n 3 apache-kafka

$ juju status --format=short

- apache-kafka/3: 10.40.27.108 (agent:idle, workload:active) 9092/tcp
- apache-kafka/4: 10.40.27.23 (agent:idle, workload:active) 9092/tcp
- apache-kafka/5: 10.40.27.201 (agent:idle, workload:active) 9092/tcp
- apache-kafka/6: 10.40.27.122 (agent:idle, workload:active) 9092/tcp
- apache-kafka/7: 10.40.27.216 (agent:idle, workload:active) 9092/tcp
- apache-kafka/8: 10.40.27.252 (agent:idle, workload:active) 9092/tcp
- apache-zookeeper/1: 10.40.27.184 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- apache-zookeeper/2: 10.40.27.174 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- apache-zookeeper/3: 10.40.27.192 (agent:idle, workload:active) 2181/tcp, 9998/tcp

[ kafka-topics.sh, kafka-console-producer.sh, kafka-console-consumer.sh ]

create a topic
$ juju ssh apache-kafka/3

$ kafka-topics.sh --create --zookeeper 10.40.27.184:2181 --replication-factor 3 --partitions 1 --topic test
Created topic "test".

$ kafka-topics.sh --list --zookeeper 10.40.27.184:2181
my-replicated-topic - marked for deletion
smoketest - marked for deletion
test
topic01 - marked for deletion

send messages to a topic
$ kafka-console-producer.sh --broker-list 10.40.27.108:9092 --topic test
[2017-04-01 06:49:33,913] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
hello
hi
aaa

you can specify multiple brokers as below.
$ kafka-console-producer.sh --broker-list 10.40.27.108:9092,10.40.27.23:9092,10.40.27.201:9092 --topic test

read data from a topic “test”
$ juju ssh apache-kafka/3

$ kafka-console-consumer.sh --topic test --from-beginning --zookeeper 10.40.27.184:2181,10.40.27.174:2181,10.40.27.192:2181
hello
hi
aaa

[ kafka configuration ]

$ juju ssh apache-kafka/3

$ ls /etc/kafka/conf/
consumer.properties  producer.properties  test-log4j.properties   zookeeper.properties
log4j.properties     server.properties    tools-log4j.properties

$ grep -v ^# /etc/kafka/conf/server.properties | grep -v ^$
broker.id=3
port=9092
advertised.host.name=juju-0a148b-7
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.40.27.174:2181,10.40.27.184:2181,10.40.27.192:2181
zookeeper.connection.timeout.ms=6000
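This file has no `delete.topic.enable` line. That is consistent with the topics listed earlier as "marked for deletion": in Kafka releases before 1.0 the setting defaults to `false`, so `delete-topic` only marks the topic.

Below is a small sketch that parses the `key=value` lines shown above into a dict and reports whether topic deletion is enabled. The function names are my own. Treating a missing key as `false` assumes a pre-1.0 broker. Only the plain `key=value` form is handled; Java's `:` separator and line continuations are not.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and comment lines.

    Only the plain `key=value` form used in server.properties is handled.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def topic_deletion_enabled(props):
    # Assumes a pre-1.0 broker, where a missing key means "false".
    return props.get("delete.topic.enable", "false").lower() == "true"


if __name__ == "__main__":
    server_properties = """\
broker.id=3
port=9092
log.dirs=/var/lib/kafka
zookeeper.connect=10.40.27.174:2181,10.40.27.184:2181,10.40.27.192:2181
"""
    props = parse_properties(server_properties)
    print(props["zookeeper.connect"].split(","))
    print(topic_deletion_enabled(props))
```

For the excerpt above, the second print shows `False`, because the key is absent.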

[ zookeeper configuration ]

$ juju status apache-zookeeper --format=short

- apache-zookeeper/1: 10.40.27.184 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- apache-zookeeper/2: 10.40.27.174 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- apache-zookeeper/3: 10.40.27.192 (agent:idle, workload:active) 2181/tcp, 9998/tcp

$ juju ssh apache-zookeeper/1

ubuntu@juju-0a148b-4:~$ cd /etc/zookeeper/conf/
ubuntu@juju-0a148b-4:/etc/zookeeper/conf$ ls
configuration.xsl  log4j.properties  zoo.cfg  zoo_sample.cfg

$ grep -v ^# zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=10.40.27.184:2888:3888
server.3=10.40.27.192:2888:3888
server.2=10.40.27.174:2888:3888
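Each `server.N=host:2888:3888` line declares one ensemble member: 2888 is the peer port followers use to reach the leader, and 3888 is the leader-election port. ZooKeeper keeps serving only while a strict majority of these servers is up. A 3-server ensemble therefore survives one failure and a 5-server ensemble survives two, while a 4th server adds no extra tolerance.

Below is a sketch that reads the `server.N` lines from zoo.cfg and computes the quorum size. The function names are hypothetical.

```python
import re

# Matches e.g. "server.1=10.40.27.184:2888:3888"
SERVER_LINE = re.compile(r"^server\.(\d+)=([^:]+):(\d+):(\d+)")


def ensemble_members(zoo_cfg_text):
    """Return {server id: host} for every server.N line in zoo.cfg."""
    members = {}
    for line in zoo_cfg_text.splitlines():
        m = SERVER_LINE.match(line.strip())
        if m:
            members[int(m.group(1))] = m.group(2)
    return members


def quorum(n_servers):
    """Return (servers needed for a majority, servers that may fail)."""
    majority = n_servers // 2 + 1
    return majority, n_servers - majority


if __name__ == "__main__":
    cfg = """\
tickTime=2000
server.1=10.40.27.184:2888:3888
server.3=10.40.27.192:2888:3888
server.2=10.40.27.174:2888:3888
"""
    members = ensemble_members(cfg)
    print(members)
    print(quorum(len(members)))
```

For this config, `quorum(3)` gives `(2, 1)`: two servers must be up, and one may fail.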

[ logs when I set up 5 zookeeper and 9 kafka units ]

five zookeeper units and nine kafka units are running.

five zookeeper units running
$ juju status apache-zookeeper --format=short

- apache-zookeeper/1: 10.40.27.184 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- apache-zookeeper/2: 10.40.27.174 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- apache-zookeeper/3: 10.40.27.192 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- apache-zookeeper/4: 10.40.27.134 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- apache-zookeeper/5: 10.40.27.227 (agent:idle, workload:active) 2181/tcp, 9998/tcp

nine kafka units running
$ juju status apache-kafka --format=short

- apache-kafka/3: 10.40.27.108 (agent:idle, workload:active) 9092/tcp
- apache-kafka/4: 10.40.27.23 (agent:idle, workload:active) 9092/tcp
- apache-kafka/5: 10.40.27.201 (agent:idle, workload:active) 9092/tcp
- apache-kafka/9: 10.40.27.90 (agent:idle, workload:active) 9092/tcp
- apache-kafka/10: 10.40.27.35 (agent:idle, workload:active) 9092/tcp
- apache-kafka/11: 10.40.27.177 (agent:idle, workload:active) 9092/tcp
- apache-kafka/12: 10.40.27.235 (agent:idle, workload:active) 9092/tcp
- apache-kafka/13: 10.40.27.74 (agent:idle, workload:active) 9092/tcp
- apache-kafka/14: 10.40.27.239 (agent:idle, workload:active) 9092/tcp

Create a topic
$ juju ssh apache-kafka/5

$ kafka-topics.sh --create --zookeeper 10.40.27.227:2181 --replication-factor 3 --partitions 3 --topic topic05
Created topic "topic05".

$ kafka-topics.sh --describe --zookeeper 10.40.27.227:2181 --topic topic05
Topic:topic05   PartitionCount:3        ReplicationFactor:3     Configs:
       Topic: topic05  Partition: 0    Leader: 5       Replicas: 5,13,14       Isr: 5,13,14
       Topic: topic05  Partition: 1    Leader: 9       Replicas: 9,14,3        Isr: 9,14,3
       Topic: topic05  Partition: 2    Leader: 10      Replicas: 10,3,4        Isr: 10,3,4
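Here every partition has its full replica set in the ISR (in-sync replicas), and each leader is the first broker in its replica list, i.e. the preferred leader. To check the same thing on larger topics, below is a sketch that parses the `--describe` lines shown above. It flags partitions that are under-replicated or not on their preferred leader. The regex is written against this exact output format, and the function names are my own.

```python
import re

PARTITION_LINE = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>-?\d+)\s+Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)"
)


def _ids(csv):
    """Turn "5,13,14" into [5, 13, 14]."""
    return [int(x) for x in csv.split(",") if x]


def parse_describe(text):
    """Yield one dict per partition line of `kafka-topics.sh --describe`."""
    for line in text.splitlines():
        m = PARTITION_LINE.search(line)
        if not m:
            continue  # e.g. the "Topic:... PartitionCount:..." summary line
        yield {
            "topic": m.group("topic"),
            "partition": int(m.group("partition")),
            "leader": int(m.group("leader")),
            "replicas": _ids(m.group("replicas")),
            "isr": _ids(m.group("isr")),
        }


def partition_issues(partitions):
    """List (topic, partition, problem) for unhealthy partitions."""
    issues = []
    for p in partitions:
        if set(p["isr"]) != set(p["replicas"]):
            issues.append((p["topic"], p["partition"], "under-replicated"))
        if p["leader"] != p["replicas"][0]:
            issues.append((p["topic"], p["partition"], "not on preferred leader"))
    return issues
```

Running `partition_issues(parse_describe(text))` on the topic05 output above returns an empty list.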

Producer
ubuntu@juju-0a148b-9:~$ kafka-console-producer.sh --broker-list 10.40.27.201:9092 --topic topic05
[2017-04-01 09:18:20,841] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
hello
hi
bye

Consumer
ubuntu@juju-0a148b-8:~$ kafka-console-consumer.sh --zookeeper 10.40.27.184:2181 --from-beginning --topic topic05
hello
hi
bye

connect to the other zookeepers and read the data again
ubuntu@juju-0a148b-8:~$
ubuntu@juju-0a148b-8:~$ kafka-console-consumer.sh --zookeeper 10.40.27.174:2181 --from-beginning --topic topic05
hello
hi
bye
^CConsumed 3 messages
ubuntu@juju-0a148b-8:~$ kafka-console-consumer.sh --zookeeper 10.40.27.192:2181 --from-beginning --topic topic05
hello
hi
bye
^CConsumed 3 messages