您好,欢迎来到爱go旅游网。
搜索
您的当前位置:首页docker安装kafka及使用

docker安装kafka及使用

来源:爱go旅游网
docker安装kafka及使⽤

⼀、下载镜像docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafka

⼆、先启动zookeeper#单机⽅式

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

三、启动kafka#单机⽅式

docker run -d --name kafka \\-p 9092:9092 \\

-e KAFKA_BROKER_ID=0 \\

-e KAFKA_ZOOKEEPER_CONNECT=10.0.0.101:2181 \\

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.0.101:9092 \\-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

四、创建⼀个topic(使⽤代码次步可省略)#进⼊容器

docker exec -it ${CONTAINER ID} /bin/bashcd opt/bin

#单机⽅式:创建⼀个主题

bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka#运⾏⼀个⽣产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka#运⾏⼀个消费者

bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic mykafka --from-beginning

五、kafka设置分区数量#分区数量的作⽤:有多少分区就能负载多少个消费者,⽣产者会⾃动分配给分区数据,每个消费者只消费⾃⼰分区的数据,每个分区有⾃⼰独⽴的offset#进⼊kafka容器

vi opt/kafka/config/server.properties修改run.partitions=2#退出容器ctrl+p+q#重启容器

docker restart kafka

#修改指定topic

./kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 3 --topic topicname

六、python代码#⽣产者

from kafka import KafkaProducerimport json

import datetime

topic='test'

producer = KafkaProducer(bootstrap_servers='10.0.0.101:9092',value_serializer=lambda m:json.dumps(m).encode(\"utf-8\")) # 连接kafka# 参数bootstrap_servers:指定kafka连接地址

# 参数value_serializer:指定序列化的⽅式,我们定义json来序列化数据,当字典传⼊kafka时⾃动转换成bytes# ⽤户密码登⼊参数

# security_protocol=\"SASL_PLAINTEXT\"# sasl_mechanism=\"PLAIN\"# sasl_plain_username=\"maple\"# sasl_plain_password=\"maple\"

for i in range(1000):

data={\"num\":i,\"ts\":datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")} producer.send(topic,data)producer.close()

#消费者

from kafka import KafkaConsumerimport time

topic = 'test'

consumer = KafkaConsumer(topic, bootstrap_servers=['10.0.0.101:9092'], group_id=\"test\# 参数bootstrap_servers:指定kafka连接地址

# 参数group_id:如果2个程序的topic和group_id相同,那么他们读取的数据不会重复,2个程序的topic相同,group_id不同,那么他们各⾃消费相同的数据,互不影响

# 参数auto_offset_reset:默认为latest表⽰offset设置为当前程序启动时的数据位置,earliest表⽰offset设置为0,在你的group_id第⼀次运⾏时,还没有offset的时候,给你设定初始offset。⼀旦group_id有了offset,那么此参数就不起作⽤了

for msg in consumer:

recv = \"%s:%d:%d: key=%s value=%s\" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print(recv)

# time.sleep(1)

#运⾏3个消费者结果

test:0:3212: key=None value=b'{\"num\": 981, \"ts\": \"2021-02-23 16:38:14est:0:3213: key=None value=b'{\"num\": 982, \"ts\": \"2021-02-23 16:38:14est:0:3214: key=None value=b'{\"num\": 987, \"ts\": \"2021-02-23 16:38:14est:0:3215: key=None value=b'{\"num\": 997, \"ts\": \"2021-02-23 16:38:14est:0:3216: key=None value=b'{\"num\": 998, \"ts\": \"2021-02-23 16:38:14est:0:3217: key=None value=b'{\"num\": 999, \"ts\": \"2021-02-23 16:38:14est:1:353: key=None value=b'{\"num\": 970, \"ts\": \"2021-02-23 16:38:14est:1:3: key=None value=b'{\"num\": 977, \"ts\": \"2021-02-23 16:38:14est:1:355: key=None value=b'{\"num\": 978, \"ts\": \"2021-02-23 16:38:14est:1:356: key=None value=b'{\"num\": 979, \"ts\": \"2021-02-23 16:38:14est:1:357: key=None value=b'{\"num\": 984, \"ts\": \"2021-02-23 16:38:14est:1:358: key=None value=b'{\"num\": 985, \"ts\": \"2021-02-23 16:38:14est:1:359: key=None value=b'{\"num\": 994, \"ts\": \"2021-02-23 16:38:14est:2:317: key=None value=b'{\"num\": 9, \"ts\": \"2021-02-23 16:38:14est:2:318: key=None value=b'{\"num\": 990, \"ts\": \"2021-02-23 16:38:14est:2:319: key=None value=b'{\"num\": 991, \"ts\": \"2021-02-23 16:38:14est:2:320: key=None value=b'{\"num\": 992, \"ts\": \"2021-02-23 16:38:14est:2:321: key=None value=b'{\"num\": 993, \"ts\": \"2021-02-23 16:38:14est:2:322: key=None value=b'{\"num\": 995, \"ts\": \"2021-02-23 16:38:14est:2:323: key=None value=b'{\"num\": 996, \"ts\": \"2021-02-23 16:38:14\

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- igat.cn 版权所有 赣ICP备2024042791号-1

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务