⼀、下载镜像docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafka
⼆、先启动zookeeper#单机⽅式
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
三、启动kafka#单机⽅式
docker run -d --name kafka \\-p 9092:9092 \\
-e KAFKA_BROKER_ID=0 \\
-e KAFKA_ZOOKEEPER_CONNECT=10.0.0.101:2181 \\
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.0.101:9092 \\-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
四、创建⼀个topic(使⽤代码次步可省略)#进⼊容器
docker exec -it ${CONTAINER ID} /bin/bashcd opt/bin
#单机⽅式:创建⼀个主题
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka#运⾏⼀个⽣产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka#运⾏⼀个消费者
bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic mykafka --from-beginning
五、kafka设置分区数量#分区数量的作⽤:有多少分区就能负载多少个消费者,⽣产者会⾃动分配给分区数据,每个消费者只消费⾃⼰分区的数据,每个分区有⾃⼰独⽴的offset#进⼊kafka容器
vi opt/kafka/config/server.properties修改run.partitions=2#退出容器ctrl+p+q#重启容器
docker restart kafka
#修改指定topic
./kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 3 --topic topicname
六、python代码#⽣产者
from kafka import KafkaProducerimport json
import datetime
topic='test'
producer = KafkaProducer(bootstrap_servers='10.0.0.101:9092',value_serializer=lambda m:json.dumps(m).encode(\"utf-8\")) # 连接kafka# 参数bootstrap_servers:指定kafka连接地址
# 参数value_serializer:指定序列化的⽅式,我们定义json来序列化数据,当字典传⼊kafka时⾃动转换成bytes# ⽤户密码登⼊参数
# security_protocol=\"SASL_PLAINTEXT\"# sasl_mechanism=\"PLAIN\"# sasl_plain_username=\"maple\"# sasl_plain_password=\"maple\"
for i in range(1000):
data={\"num\":i,\"ts\":datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")} producer.send(topic,data)producer.close()
#消费者
from kafka import KafkaConsumerimport time
topic = 'test'
consumer = KafkaConsumer(topic, bootstrap_servers=['10.0.0.101:9092'], group_id=\"test\# 参数bootstrap_servers:指定kafka连接地址
# 参数group_id:如果2个程序的topic和group_id相同,那么他们读取的数据不会重复,2个程序的topic相同,group_id不同,那么他们各⾃消费相同的数据,互不影响
# 参数auto_offset_reset:默认为latest表⽰offset设置为当前程序启动时的数据位置,earliest表⽰offset设置为0,在你的group_id第⼀次运⾏时,还没有offset的时候,给你设定初始offset。⼀旦group_id有了offset,那么此参数就不起作⽤了
for msg in consumer:
recv = \"%s:%d:%d: key=%s value=%s\" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print(recv)
# time.sleep(1)
#运⾏3个消费者结果
test:0:3212: key=None value=b'{\"num\": 981, \"ts\": \"2021-02-23 16:38:14est:0:3213: key=None value=b'{\"num\": 982, \"ts\": \"2021-02-23 16:38:14est:0:3214: key=None value=b'{\"num\": 987, \"ts\": \"2021-02-23 16:38:14est:0:3215: key=None value=b'{\"num\": 997, \"ts\": \"2021-02-23 16:38:14est:0:3216: key=None value=b'{\"num\": 998, \"ts\": \"2021-02-23 16:38:14est:0:3217: key=None value=b'{\"num\": 999, \"ts\": \"2021-02-23 16:38:14est:1:353: key=None value=b'{\"num\": 970, \"ts\": \"2021-02-23 16:38:14est:1:3: key=None value=b'{\"num\": 977, \"ts\": \"2021-02-23 16:38:14est:1:355: key=None value=b'{\"num\": 978, \"ts\": \"2021-02-23 16:38:14est:1:356: key=None value=b'{\"num\": 979, \"ts\": \"2021-02-23 16:38:14est:1:357: key=None value=b'{\"num\": 984, \"ts\": \"2021-02-23 16:38:14est:1:358: key=None value=b'{\"num\": 985, \"ts\": \"2021-02-23 16:38:14est:1:359: key=None value=b'{\"num\": 994, \"ts\": \"2021-02-23 16:38:14est:2:317: key=None value=b'{\"num\": 9, \"ts\": \"2021-02-23 16:38:14est:2:318: key=None value=b'{\"num\": 990, \"ts\": \"2021-02-23 16:38:14est:2:319: key=None value=b'{\"num\": 991, \"ts\": \"2021-02-23 16:38:14est:2:320: key=None value=b'{\"num\": 992, \"ts\": \"2021-02-23 16:38:14est:2:321: key=None value=b'{\"num\": 993, \"ts\": \"2021-02-23 16:38:14est:2:322: key=None value=b'{\"num\": 995, \"ts\": \"2021-02-23 16:38:14est:2:323: key=None value=b'{\"num\": 996, \"ts\": \"2021-02-23 16:38:14\
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- igat.cn 版权所有 赣ICP备2024042791号-1
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务