MQ读写

cooolr 于 2022-09-16 发布

监听读取

import pika
def listen_mq_news_into_recommend():
    """
    Listen for "join recommend" messages on RabbitMQ and log each one.

    Opens a blocking connection built from ``RABBITMQ_DSN``, declares the
    durable queue ``recommend.queue``, binds it to the exchange with routing
    key ``finish`` and then consumes from that same queue. The call blocks
    inside ``start_consuming()``; the connection is closed when consuming
    ends, whether normally or through an exception.

    Expects ``RABBITMQ_DSN``, ``logger`` and ``datetime`` to be available at
    module scope.

    :return: None
    """
    # One name for declare / bind / consume. The original consumed from
    # 'recommend.queue.finish', a queue this function never declared or bound.
    queue_name = 'recommend.queue'

    conn = pika.BlockingConnection(pika.URLParameters(RABBITMQ_DSN))
    try:
        channel = conn.channel()
        channel.queue_declare(queue=queue_name, durable=True)
        # NOTE(review): 'recommen.exchange' looks like a typo for
        # 'recommend.exchange' -- confirm against the broker before changing.
        channel.queue_bind(queue=queue_name, exchange='recommen.exchange', routing_key='finish')

        def callback(ch, method, properties, body):
            logger.info("receive join recommend mq message : {}".format(body))
            # TODO: message handling is currently disabled. The earlier
            # (commented-out) code parsed ``body`` as an int news_id and then
            # called create_redis_key(news_id) and
            # news_into_redis_callback(news_id).

        # auto_ack=True: messages are acknowledged on delivery, so nothing is
        # redelivered if the callback fails.
        channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
        logger.info("{} start listen join recommend !".format(datetime.datetime.now()))
        channel.start_consuming()
    finally:
        # Release the socket on any exit path; closing an already-closed
        # connection would raise, hence the guard.
        if conn.is_open:
            conn.close()

获取队列数量

import pika
# Print the number of messages currently waiting in 'bigdata.minio.queue'.
RABBITMQ_DSN = "amqp://123456@192.168.1.1:8080/"
conn = pika.BlockingConnection(pika.URLParameters(RABBITMQ_DSN))
try:
    channel = conn.channel()
    # The declare reply carries the queue's current message count.
    # NOTE(review): a non-passive declare presumably creates the queue when it
    # is missing; passive=True would only inspect it -- confirm which is wanted.
    queue = channel.queue_declare(queue='bigdata.minio.queue', durable=True)
    print(queue.method.message_count)
finally:
    # Close the connection even when the channel or declare call fails.
    if conn.is_open:
        conn.close()

发送消息

import pika
# Publish a single message (body b'4847') to the 'exchange.chaingame.news'
# exchange with routing key 'chaingame.news.approve'.
RABBITMQ_DSN = "amqp://123456@192.168.1.1:8080/"
conn = pika.BlockingConnection(pika.URLParameters(RABBITMQ_DSN))
try:
    channel = conn.channel()
    channel.queue_declare(queue='chaingame.news.approve', durable=True)

    # NOTE(review): the queue is declared but never bound here; this assumes a
    # binding from the exchange to the queue already exists on the broker.
    channel.basic_publish(exchange='exchange.chaingame.news', routing_key="chaingame.news.approve", body=b'4847')
finally:
    # The original never closed the connection; close it on every exit path.
    if conn.is_open:
        conn.close()