监听读取
import pika
def listen_mq_news_into_recommend():
"""
监听加入推荐,创建redis-key
:return:
"""
conn = pika.BlockingConnection(pika.URLParameters(RABBITMQ_DSN))
channel = conn.channel()
channel.queue_declare(queue='recommend.queue', durable=True)
channel.queue_bind(queue='recommend.queue', exchange='recommen.exchange', routing_key='finish')
def callback(ch, method, properties, body):
logger.info("receive join recommend mq message : {}".format(body))
#news_id = int(body)
#self.create_redis_key(news_id)
#self.news_into_redis_callback(news_id)
channel.basic_consume(queue='recommend.queue.finish', on_message_callback=callback, auto_ack=True)
logger.info("{} start listen join recommend !".format(datetime.datetime.now()))
channel.start_consuming()
获取队列数量
import pika
RABBITMQ_DSN = "amqp://123456@192.168.1.1:8080/"
conn = pika.BlockingConnection(pika.URLParameters(RABBITMQ_DSN))
channel = conn.channel()
queue = channel.queue_declare(queue='bigdata.minio.queue', durable=True)
print(queue.method.message_count)
conn.close()
发送消息
import pika
RABBITMQ_DSN = "amqp://123456@192.168.1.1:8080/"
conn = pika.BlockingConnection(pika.URLParameters(RABBITMQ_DSN))
channel = conn.channel()
channel.queue_declare(queue='chaingame.news.approve', durable=True)
channel.basic_publish(exchange='exchange.chaingame.news', routing_key="chaingame.news.approve", body=b'4847')