lost and found ( for me ? )

Kafka: sample scripts of producer/consumer with PyKafka

Here are sample scripts for a Kafka producer and consumer written with PyKafka.

Reference
https://jujucharms.com/u/hazmat/kafka/
https://github.com/Parsely/pykafka

Set up the Kafka and ZooKeeper clusters with Juju.
$ juju --version
2.0.2-xenial-amd64

$ juju deploy kafka -n 6
$ juju deploy zookeeper -n 3
$ juju add-relation kafka zookeeper

$ juju status --format=short
- kafka/0: 10.219.27.133 (agent:idle, workload:active) 9092/tcp
- kafka/1: 10.219.27.167 (agent:idle, workload:active) 9092/tcp
- kafka/2: 10.219.27.124 (agent:idle, workload:active) 9092/tcp
- kafka/3: 10.219.27.8 (agent:idle, workload:active) 9092/tcp
- kafka/4: 10.219.27.231 (agent:idle, workload:active) 9092/tcp
- kafka/5: 10.219.27.77 (agent:idle, workload:active) 9092/tcp
- ubuntu/0: 10.219.27.105 (agent:idle, workload:active)
- ubuntu/1: 10.219.27.17 (agent:idle, workload:active)
- zookeeper/0: 10.219.27.221 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- zookeeper/1: 10.219.27.134 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- zookeeper/2: 10.219.27.58 (agent:idle, workload:active) 2181/tcp, 9998/tcp
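
The scripts further down hard-code the broker and ZooKeeper addresses shown in this status output. As a rough sketch, the same strings could be built from the short status text. The helper name hosts_from_status is hypothetical, and the line layout is assumed to match the output above.

```python
import re


def hosts_from_status(status_text, app, port):
    """Return 'ip:port,ip:port,...' for every unit of `app` in the status text."""
    hosts = []
    for line in status_text.splitlines():
        # Expected layout: "- kafka/0: 10.219.27.133 (agent:idle, ...) 9092/tcp"
        m = re.match(r"-\s+(\S+)/(\d+):\s+(\S+)", line.strip())
        if m and m.group(1) == app:
            hosts.append("{}:{}".format(m.group(3), port))
    return ",".join(hosts)


# A few lines copied from the status output above
status = """\
- kafka/0: 10.219.27.133 (agent:idle, workload:active) 9092/tcp
- kafka/1: 10.219.27.167 (agent:idle, workload:active) 9092/tcp
- zookeeper/0: 10.219.27.221 (agent:idle, workload:active) 2181/tcp, 9998/tcp
"""

print(hosts_from_status(status, "kafka", 9092))
print(hosts_from_status(status, "zookeeper", 2181))
```

The returned string has the same comma-separated host:port form as brokers_list and zk_list in the scripts below.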

$ juju run --unit=kafka/0 "systemctl list-unit-files --type=service | grep kafka"
jujud-unit-kafka-0.service                 enabled

$ juju run --unit=kafka/0 "dpkg -l | grep kafka"
ii  kafka                            0.8.1.1-1                                  all          Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.
ii  kafka-server                     0.8.1.1-1                                  all          Bundles the init script for kafka server.

$ juju run --unit=kafka/0 "ls /usr/bin | egrep 'kafka|zoo'"
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-run-class.sh
kafka-topics.sh
zookeeper-client
zookeeper-server
zookeeper-server-cleanup
zookeeper-server-initialize

$ juju run --unit=zookeeper/0 "systemctl list-unit-files --type=service | grep zookeeper"
jujud-unit-zookeeper-0.service             enabled

$ juju run --unit=zookeeper/0 "dpkg -l | grep zoo"
ii  zookeeper                        3.4.6-1                                    all          A high-performance coordination service for distributed applications.
ii  zookeeper-server                 3.4.6-1                                    all          The Hadoop Zookeeper server

All nodes are running within LXD containers.
$ lxc list juju- -c n
+----------------+
|      NAME      |
+----------------+
| juju-0a148b-28 |
+----------------+
| juju-0a148b-29 |
+----------------+
| juju-0a148b-30 |
+----------------+
| juju-0a148b-31 |
+----------------+
| juju-0a148b-32 |
+----------------+

Deploy two ubuntu nodes for pykafka, one for the producer and one for the consumer, and install pykafka on each.
$ juju deploy ubuntu -n 2

$ juju ssh ubuntu/0
$ sudo pip install pykafka

$ juju ssh ubuntu/1
$ sudo pip install pykafka

- create a topic

$ juju run --unit=kafka/0 "/usr/bin/kafka-topics.sh --zookeeper 10.219.27.221:2181 --create --topic topic01 --partitions 12 --replication-factor 2"

$ juju run --unit=kafka/0 "/usr/bin/kafka-topics.sh --zookeeper 10.219.27.221:2181 --describe"
Topic:topic01   PartitionCount:12       ReplicationFactor:2     Configs:
       Topic: topic01  Partition: 0    Leader: 3       Replicas: 3,0   Isr: 3,0
       Topic: topic01  Partition: 1    Leader: 4       Replicas: 4,1   Isr: 4,1
       Topic: topic01  Partition: 2    Leader: 5       Replicas: 5,2   Isr: 5,2
       Topic: topic01  Partition: 3    Leader: 0       Replicas: 0,3   Isr: 0,3
       Topic: topic01  Partition: 4    Leader: 1       Replicas: 1,4   Isr: 1,4
       Topic: topic01  Partition: 5    Leader: 2       Replicas: 2,5   Isr: 2,5
       Topic: topic01  Partition: 6    Leader: 3       Replicas: 3,1   Isr: 3,1
       Topic: topic01  Partition: 7    Leader: 4       Replicas: 4,2   Isr: 4,2
       Topic: topic01  Partition: 8    Leader: 5       Replicas: 5,3   Isr: 5,3
       Topic: topic01  Partition: 9    Leader: 0       Replicas: 0,4   Isr: 0,4
       Topic: topic01  Partition: 10   Leader: 1       Replicas: 1,5   Isr: 1,5
       Topic: topic01  Partition: 11   Leader: 2       Replicas: 2,0   Isr: 2,0
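
To check how the 12 partition leaders are spread over the 6 brokers, the describe output can be tallied with a few lines of Python. This is only a sketch: the function name leaders_per_broker is made up for illustration, and it assumes the "Partition: N  Leader: M" layout shown above.

```python
import re
from collections import Counter


def leaders_per_broker(describe_text):
    """Count how many partitions each broker id leads."""
    counts = Counter()
    for line in describe_text.splitlines():
        # The summary line has "PartitionCount:" and no "Leader:", so it is skipped
        m = re.search(r"Partition:\s*(\d+)\s+Leader:\s*(\d+)", line)
        if m:
            counts[int(m.group(2))] += 1
    return dict(counts)


# Leader column copied from the describe output above
describe = """\
Topic:topic01   PartitionCount:12       ReplicationFactor:2     Configs:
Topic: topic01  Partition: 0    Leader: 3       Replicas: 3,0   Isr: 3,0
Topic: topic01  Partition: 1    Leader: 4       Replicas: 4,1   Isr: 4,1
Topic: topic01  Partition: 2    Leader: 5       Replicas: 5,2   Isr: 5,2
Topic: topic01  Partition: 3    Leader: 0       Replicas: 0,3   Isr: 0,3
Topic: topic01  Partition: 4    Leader: 1       Replicas: 1,4   Isr: 1,4
Topic: topic01  Partition: 5    Leader: 2       Replicas: 2,5   Isr: 2,5
Topic: topic01  Partition: 6    Leader: 3       Replicas: 3,1   Isr: 3,1
Topic: topic01  Partition: 7    Leader: 4       Replicas: 4,2   Isr: 4,2
Topic: topic01  Partition: 8    Leader: 5       Replicas: 5,3   Isr: 5,3
Topic: topic01  Partition: 9    Leader: 0       Replicas: 0,4   Isr: 0,4
Topic: topic01  Partition: 10   Leader: 1       Replicas: 1,5   Isr: 1,5
Topic: topic01  Partition: 11   Leader: 2       Replicas: 2,0   Isr: 2,0
"""

for broker, count in sorted(leaders_per_broker(describe).items()):
    print("broker", broker, "leads", count, "partitions")
```

With the output above, each of the six brokers leads two partitions.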

[ producer with pykafka ]

Log into the ubuntu/0 node.

$ juju ssh ubuntu/0

ubuntu@juju-0a148b-28:~$ cat pykafka-producer.py
#!/usr/bin/python3

from pykafka import KafkaClient

# Kafka brokers (host:port), taken from "juju status"
brokers_list = "10.219.27.133:9092,10.219.27.167:9092,10.219.27.124:9092,10.219.27.8:9092,10.219.27.231:9092,10.219.27.77:9092"
# ZooKeeper ensemble; not used by this script
zk_list = "10.219.27.221:2181,10.219.27.134:2181,10.219.27.58:2181"

client = KafkaClient(hosts=brokers_list)
#print(client.topics)

# The topic name is passed as bytes
topic = client.topics[b'topic01']

# The sync producer waits for each message to be delivered before returning
with topic.get_sync_producer() as producer:
    for i in range(4):
        producer.produce(b"test message")

Send messages:
ubuntu@juju-0a148b-28:~$ python3 pykafka-producer.py
ubuntu@juju-0a148b-28:~$
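
All four messages above carry the same payload, so they cannot be told apart on the consumer side. A small variation, sketched here under assumptions, numbers each payload. send_numbered and RecordingProducer are hypothetical names; the only thing assumed about the producer is a produce(bytes) method like the one used in the script above, so the function can be tried without a cluster.

```python
def send_numbered(producer, count, prefix="test message"):
    """Send `count` messages whose payload ends with a sequence number."""
    for i in range(count):
        # Payloads are encoded to bytes, as in the script above
        producer.produce("{} {}".format(prefix, i).encode("utf-8"))
    return count


class RecordingProducer:
    """Stand-in used for a dry run; it only records what would be sent."""

    def __init__(self):
        self.sent = []

    def produce(self, payload):
        self.sent.append(payload)


fake = RecordingProducer()
send_numbered(fake, 4)
print(fake.sent)

# Against the real cluster the call would look like this (not run here):
#   with topic.get_sync_producer() as producer:
#       send_numbered(producer, 4)
```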

Read the data with kafka-console-consumer.sh:
$ juju ssh kafka/0

ubuntu@juju-0a148b-30:~$ /usr/bin/kafka-console-consumer.sh --from-beginning --topic topic01 --zookeeper 10.219.27.221:2181
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
test message
test message
test message
test message


[ consumer with pykafka ]

Log into the ubuntu/1 node.
$ juju ssh ubuntu/1


ubuntu@juju-0a148b-29:~$ cat pykafka-simple-consumer.py
#!/usr/bin/python3

from pykafka import KafkaClient

# Kafka brokers (host:port), taken from "juju status"
brokers_list = "10.219.27.133:9092,10.219.27.167:9092,10.219.27.124:9092,10.219.27.8:9092,10.219.27.231:9092,10.219.27.77:9092"
# ZooKeeper ensemble; not used by the simple consumer
zk_list = "10.219.27.221:2181,10.219.27.134:2181,10.219.27.58:2181"

client = KafkaClient(hosts=brokers_list)
#print(client.topics)

topic = client.topics[b'topic01']
# The simple consumer reads from all partitions of the topic
consumer = topic.get_simple_consumer()

for message in consumer:
    # Skip empty results
    if message is not None:
        print(message.offset, message.value)

Read the data. Offsets are assigned per partition, so values such as 0 repeat across the 12 partitions:
ubuntu@juju-0a148b-29:~$ python3 pykafka-simple-consumer.py
0 b'test message'
0 b'test message'
0 b'test message'
0 b'test message'
0 b'test message'
0 b'test message'
0 b'test message'
1 b'test message'
1 b'test message'
1 b'test message'
0 b'test message'
0 b'test message'
1 b'test message'
0 b'test message'
2 b'test message'
3 b'test message'
4 b'test message'
0 b'test message'
1 b'test message'
2 b'test message'
5 b'test message'
6 b'test message'
0 b'test message'
1 b'test message'
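
The output above does not show which partition each message came from. A sketch that groups offsets by partition is below. It assumes each pykafka message exposes a partition_id attribute next to offset and value; FakeMessage is a hypothetical stand-in so the snippet runs without a cluster, and the partition numbers in the sample are invented.

```python
from collections import defaultdict, namedtuple


def offsets_by_partition(messages):
    """Group message offsets by partition id, keeping arrival order."""
    grouped = defaultdict(list)
    for m in messages:
        grouped[m.partition_id].append(m.offset)
    return dict(grouped)


# Stand-in for consumed messages (illustrative values only)
FakeMessage = namedtuple("FakeMessage", "partition_id offset value")
sample = [
    FakeMessage(3, 0, b"test message"),
    FakeMessage(7, 0, b"test message"),
    FakeMessage(3, 1, b"test message"),
]

print(offsets_by_partition(sample))
```

In the consumer loop, the same information could be shown by printing message.partition_id together with message.offset.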

