Here are sample Kafka producer and consumer scripts using PyKafka.
Reference
https://jujucharms.com/u/hazmat/kafka/
https://github.com/Parsely/pykafka
Set up the Kafka and ZooKeeper clusters with Juju.
$ juju --version
2.0.2-xenial-amd64
$ juju deploy kafka -n 6
$ juju deploy zookeeper -n 3
$ juju add-relation kafka zookeeper
$ juju status --format=short
- kafka/0: 10.219.27.133 (agent:idle, workload:active) 9092/tcp
- kafka/1: 10.219.27.167 (agent:idle, workload:active) 9092/tcp
- kafka/2: 10.219.27.124 (agent:idle, workload:active) 9092/tcp
- kafka/3: 10.219.27.8 (agent:idle, workload:active) 9092/tcp
- kafka/4: 10.219.27.231 (agent:idle, workload:active) 9092/tcp
- kafka/5: 10.219.27.77 (agent:idle, workload:active) 9092/tcp
- ubuntu/0: 10.219.27.105 (agent:idle, workload:active)
- ubuntu/1: 10.219.27.17 (agent:idle, workload:active)
- zookeeper/0: 10.219.27.221 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- zookeeper/1: 10.219.27.134 (agent:idle, workload:active) 2181/tcp, 9998/tcp
- zookeeper/2: 10.219.27.58 (agent:idle, workload:active) 2181/tcp, 9998/tcp
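The broker and ZooKeeper connection strings used by the PyKafka scripts later in this post can be assembled from the unit IPs above. A small sketch (plain Python; the IPs and ports are copied from the `juju status` output):

```python
# Unit IPs copied from the `juju status` output above.
kafka_ips = ["10.219.27.133", "10.219.27.167", "10.219.27.124",
             "10.219.27.8", "10.219.27.231", "10.219.27.77"]
zookeeper_ips = ["10.219.27.221", "10.219.27.134", "10.219.27.58"]

# Kafka listens on 9092, ZooKeeper on 2181 (see the port columns above).
brokers_list = ",".join(ip + ":9092" for ip in kafka_ips)
zk_list = ",".join(ip + ":2181" for ip in zookeeper_ips)
print(brokers_list)
print(zk_list)
```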
$ juju run --unit=kafka/0 "systemctl list-unit-files --type=service | grep kafka"
jujud-unit-kafka-0.service enabled
$ juju run --unit=kafka/0 "dpkg -l | grep kafka"
ii kafka 0.8.1.1-1 all Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.
ii kafka-server 0.8.1.1-1 all Bundles the init script for kafka server.
$ juju run --unit=kafka/0 "ls /usr/bin | egrep 'kafka|zoo'"
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-run-class.sh
kafka-topics.sh
zookeeper-client
zookeeper-server
zookeeper-server-cleanup
zookeeper-server-initialize
$ juju run --unit=zookeeper/0 "systemctl list-unit-files --type=service | grep zookeeper"
jujud-unit-zookeeper-0.service enabled
$ juju run --unit=zookeeper/0 "dpkg -l | grep zoo"
ii zookeeper 3.4.6-1 all A high-performance coordination service for distributed applications.
ii zookeeper-server 3.4.6-1 all The Hadoop Zookeeper server
All nodes are running within LXD containers.
$ lxc list juju- -c n
+----------------+
| NAME |
+----------------+
| juju-0a148b-28 |
+----------------+
| juju-0a148b-29 |
+----------------+
| juju-0a148b-30 |
+----------------+
| juju-0a148b-31 |
+----------------+
| juju-0a148b-32 |
+----------------+
Deploy two Ubuntu nodes for the PyKafka clients: one for the producer, one for the consumer. Install PyKafka on each.
$ juju deploy ubuntu -n 2
$ juju ssh ubuntu/0
$ sudo pip install pykafka
$ juju ssh ubuntu/1
$ sudo pip install pykafka
Create a topic with 12 partitions and a replication factor of 2.
$ juju run --unit=kafka/0 "/usr/bin/kafka-topics.sh --zookeeper 10.219.27.221:2181 --create --topic topic01 --partitions 12 --replication-factor 2"
$ juju run --unit=kafka/0 "/usr/bin/kafka-topics.sh --zookeeper 10.219.27.221:2181 --describe"
Topic:topic01 PartitionCount:12 ReplicationFactor:2 Configs:
Topic: topic01 Partition: 0 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: topic01 Partition: 1 Leader: 4 Replicas: 4,1 Isr: 4,1
Topic: topic01 Partition: 2 Leader: 5 Replicas: 5,2 Isr: 5,2
Topic: topic01 Partition: 3 Leader: 0 Replicas: 0,3 Isr: 0,3
Topic: topic01 Partition: 4 Leader: 1 Replicas: 1,4 Isr: 1,4
Topic: topic01 Partition: 5 Leader: 2 Replicas: 2,5 Isr: 2,5
Topic: topic01 Partition: 6 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: topic01 Partition: 7 Leader: 4 Replicas: 4,2 Isr: 4,2
Topic: topic01 Partition: 8 Leader: 5 Replicas: 5,3 Isr: 5,3
Topic: topic01 Partition: 9 Leader: 0 Replicas: 0,4 Isr: 0,4
Topic: topic01 Partition: 10 Leader: 1 Replicas: 1,5 Isr: 1,5
Topic: topic01 Partition: 11 Leader: 2 Replicas: 2,0 Isr: 2,0
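The replica layout above follows Kafka's round-robin assignment: leaders cycle through the six brokers, and the follower replica sits at a fixed shift that grows by one each time the partition index wraps past the broker count. A simplified sketch (plain Python; in real Kafka the starting broker and initial shift are chosen randomly, here they are fixed to match the table above):

```python
def assign_replicas(num_partitions, num_brokers, start=3, shift=3):
    """Simplified round-robin replica assignment (replication factor 2)."""
    layout = {}
    for p in range(num_partitions):
        leader = (start + p) % num_brokers
        # The follower's shift increases by one per full wrap of the brokers.
        follower = (leader + shift + p // num_brokers) % num_brokers
        layout[p] = [leader, follower]
    return layout

layout = assign_replicas(12, 6)
print(layout[0], layout[6])  # matches partitions 0 and 6 in the table above
```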
[ producer with pykafka ]
Log in to the ubuntu/0 node.
$ juju ssh ubuntu/0
ubuntu@juju-0a148b-28:~$ cat pykafka-producer.py
#!/usr/bin/python3
from pykafka import KafkaClient

# Broker and ZooKeeper addresses from the `juju status` output above.
brokers_list = "10.219.27.133:9092,10.219.27.167:9092,10.219.27.124:9092,10.219.27.8:9092,10.219.27.231:9092,10.219.27.77:9092"
zk_list = "10.219.27.221:2181,10.219.27.134:2181,10.219.27.58:2181"  # unused here; a balanced consumer would need it

client = KafkaClient(hosts=brokers_list)
#print(client.topics)
topic = client.topics[b'topic01']

# get_sync_producer() blocks until each message is acknowledged.
with topic.get_sync_producer() as producer:
    for i in range(4):
        producer.produce(b"test message")
Send messages.
ubuntu@juju-0a148b-28:~$ python3 pykafka-producer.py
ubuntu@juju-0a148b-28:~$
Read the data with kafka-console-consumer.sh.
$ juju ssh kafka/0
ubuntu@juju-0a148b-30:~$ /usr/bin/kafka-console-consumer.sh --from-beginning --topic topic01 --zookeeper 10.219.27.221:2181
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
test message
test message
test message
test message
[ consumer with pykafka ]
Log in to the ubuntu/1 node.
$ juju ssh ubuntu/1
ubuntu@juju-0a148b-29:~$ cat pykafka-simple-consumer.py
#!/usr/bin/python3
from pykafka import KafkaClient

# Broker and ZooKeeper addresses from the `juju status` output above.
brokers_list = "10.219.27.133:9092,10.219.27.167:9092,10.219.27.124:9092,10.219.27.8:9092,10.219.27.231:9092,10.219.27.77:9092"
zk_list = "10.219.27.221:2181,10.219.27.134:2181,10.219.27.58:2181"  # unused by the simple consumer

client = KafkaClient(hosts=brokers_list)
#print(client.topics)
topic = client.topics[b'topic01']

consumer = topic.get_simple_consumer()
for message in consumer:
    if message is not None:  # consume() returns None when no message is available
        print(message.offset, message.value)
Read the data. Note that offsets are tracked per partition, not per topic, so several messages can appear at offset 0.
ubuntu@juju-0a148b-29:~$ python3 pykafka-simple-consumer.py
0 b'test message'
0 b'test message'
0 b'test message'
0 b'test message'
0 b'test message'
0 b'test message'
0 b'test message'
1 b'test message'
1 b'test message'
1 b'test message'
0 b'test message'
0 b'test message'
1 b'test message'
0 b'test message'
2 b'test message'
3 b'test message'
4 b'test message'
0 b'test message'
1 b'test message'
2 b'test message'
5 b'test message'
6 b'test message'
0 b'test message'
1 b'test message'
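The mix of low offsets above is expected: each of the 12 partitions of topic01 counts from offset 0 on its own. A toy model (plain Python, not PyKafka) of messages spread round-robin across partitions, each partition keeping its own offset counter. (PyKafka's default partitioner is random, which is why the real output is interleaved rather than orderly; the round-robin choice here is only for illustration.)

```python
from collections import defaultdict

def distribute(num_messages, num_partitions):
    """Return a (partition, offset) pair per message under round-robin distribution."""
    next_offset = defaultdict(int)   # per-partition offset counter
    placements = []
    for i in range(num_messages):
        p = i % num_partitions       # round-robin partition choice
        placements.append((p, next_offset[p]))
        next_offset[p] += 1
    return placements

# 24 messages over 12 partitions: every partition sees offsets 0 and 1.
for p, offset in distribute(24, 12)[:3]:
    print(p, offset)
```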