将TDMQ使用配置写入到一个配置文件中: config.py
topic = ''
tdmq_url = ''
tdmq_secret_key = ''
编写生产者发送消息:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/7/24 16:28
# 模拟TDMQ生产者发送消息
import json
import pulsar
from conf.config import tdmq_secret_key, tdmq_url, topic
def produser_send_msg():
"""生产者发送消息"""
tdmq_client = pulsar.Client(
authentication=pulsar.AuthenticationToken(tdmq_secret_key),
service_url=tdmq_url
)
producer = tdmq_client.create_producer(
topic=topic
)
send_data = json.dumps({"uniq_id": "12345670", "project_id": 2})
producer.send(
send_data.encode('utf-8'),
properties={},
partition_key=''
)
if __name__ == '__main__':
produser_send_msg()
编写两个消费者, 消费消息(第二个只需要将subscription_name改为sub_topic2)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/7/24 18:55
# @Author : shanwen.ren
import pulsar
from conf.config import tdmq_secret_key, tdmq_url, post_fund_topic
tdmq_client = pulsar.Client(
authentication=pulsar.AuthenticationToken(tdmq_secret_key),
service_url=tdmq_url
)
consumer = tdmq_client.subscribe(
# topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
topic=topic,
# 订阅名称
subscription_name='sub_topic1'
)
def produser_send_msg():
"""消费者消费消息"""
# 获取消息
msg = consumer.receive()
try:
# 模拟业务
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
# 消费成功,回复ack
consumer.acknowledge(msg)
except:
# 消费失败,消息将会重新投递
consumer.negative_acknowledge(msg)
if __name__ == '__main__':
while True:
produser_send_msg()
开启两个消费者进程。
启动生产者发送消息
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- igat.cn 版权所有 赣ICP备2024042791号-1
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务