PHP 连接 RabbitMQ

作者: 分类: php 时间: 2022-06-27 评论: 暂无评论

composer require php-amqplib/php-amqplib 3.1.2

<?php


namespace app\ospay\model;

use app\common\library\helper;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPSocketConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPAbstractCollection;
use PhpAmqpLib\Wire\AMQPTable;
use think\facade\Db;
use think\Model;
use app\ospay\serve\LvServe;

/**
 * Payment-notification queue backed by RabbitMQ.
 *
 * Producers publish the id of a record of this model onto the durable queue
 * named by {@see OsQueen::$QUEEN_KEY}; a long-running consumer
 * ({@see OsQueen::consume()}) pops ids and calls the merchant's notify URL
 * through {@see OsQueen::notifyOrder()}.
 *
 * Class OsQueen
 * @package app\ospay\model
 */
class OsQueen extends Model
{
    /** @var string Name of the RabbitMQ queue (also used as the routing key). */
    public static $QUEEN_KEY = 'OS_PAY_NOTIFY_LIST';

    /** @var mixed Not used by any method in this class; kept for external callers. */
    public static $CON;


    /**
     * Open a new socket connection to the broker using the `rabbitmq.*` config.
     *
     * A fresh connection is created on every call; the caller owns it and is
     * responsible for closing it.
     *
     * @return AMQPSocketConnection
     */
    public static function getQueenCon()
    {
        // Positional arguments after vhost, per the php-amqplib constructor:
        // insist, login_method, login_response, locale,
        // read_timeout, keepalive, write_timeout, heartbeat.
        return new AMQPSocketConnection(
            config('rabbitmq.host'),
            config('rabbitmq.port'),
            config('rabbitmq.login'),
            config('rabbitmq.password'),
            config('rabbitmq.vhost'),
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3,
            false,
            3,
            60
        );
    }

    /**
     * Open a channel on the given connection and declare the notify queue.
     *
     * The queue is declared durable, non-exclusive and non-auto-delete, and
     * is capped by both message count and total body size.
     *
     * @param AMQPSocketConnection $connection An open broker connection.
     * @return AMQPChannel
     */
    public static function getChan(AMQPSocketConnection $connection)
    {
        $arg = new AMQPTable();
        $arg->set('x-max-length', 100 * 10000); // buffer at most 1,000,000 messages
        $arg->set('x-max-length-bytes', 512 * 1000 * 1000); // in bytes: 512 MB
        $channel = $connection->channel();
        $channel->queue_declare(self::$QUEEN_KEY, false, true, false, false, false, $arg);
        return $channel;
    }

    /**
     * Publish a persistent message onto the notify queue.
     *
     * @param AMQPChannel $channel Channel obtained from {@see OsQueen::getChan()}.
     * @param mixed       $data    Message body; scalar values (e.g. an integer
     *                             record id) are converted to a string.
     * @return void
     */
    public static function sendQueen(AMQPChannel $channel, $data)
    {
        // The AMQP body is a byte string; cast so integer ids are sent as text.
        $msg = new AMQPMessage((string) $data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        // Default exchange ('') routes by queue name.
        $channel->basic_publish($msg, '', self::$QUEEN_KEY);
    }


    /**
     * Run a blocking consumer until the channel closes.
     *
     * Each message body is handed to {@see OsQueen::notifyOrder()} and then
     * acknowledged. Prefetch is 1, so the broker delivers one unacknowledged
     * message at a time to this consumer.
     *
     * @param string $tag Consumer tag; echoed to stdout on start.
     * @return void
     * @throws \ErrorException
     */
    public static function consume(string $tag)
    {
        echo $tag;
        $con = self::getQueenCon();
        $chan = null;
        try {
            $chan = self::getChan($con);
            $func = static function (AMQPMessage $msg) {
                OsQueen::notifyOrder($msg->getBody());
                // Ack only after processing returned without throwing.
                $msg->ack();
            };
            $chan->basic_qos(null, 1, null);
            $chan->basic_consume(self::$QUEEN_KEY, $tag, false, false, false, false, $func);
            while ($chan->is_open()) {
                $chan->wait();
            }
        } finally {
            // Always release broker resources, including when wait() or the
            // callback throws; cleanup failures must not mask the original error.
            try {
                if ($chan !== null && $chan->is_open()) {
                    $chan->close();
                }
                if ($con->isConnected()) {
                    $con->close();
                }
            } catch (\Exception $closeError) {
                log_write(helper::jsonEncode($closeError->getMessage()), 'error');
            }
        }
    }

    /**
     * Send the asynchronous notification for one queued record and store the outcome.
     *
     * Does nothing when the record does not exist, was already notified
     * (notify_status != 0), or the shop has no attach/config row to sign with.
     * Otherwise the signed payload is sent to the record's notify_url and
     * notify_status / update_at / notify_num are updated. Every attempt,
     * including one that throws, increments notify_num.
     *
     * @param mixed $queen_id Primary key of the record to notify.
     * @return void
     */
    public static function notifyOrder($queen_id)
    {
        $order = self::find($queen_id);
        // Guard before touching any field: the id may refer to a deleted or
        // unknown row, and an already-notified row needs no further work.
        if (empty($order) || (int) $order['notify_status'] !== 0) {
            return;
        }

        $config = OsShopAttach::where('shop_id', $order['shop_id'])->cache(600)->find();
        if (empty($config)) {
            // Without the shop's key the payload cannot be signed.
            log_write(helper::jsonEncode('shop attach not found, shop_id=' . $order['shop_id']), 'error');
            return;
        }

        $data = [
            'code' => $order['code'],
            'shop_id' => $order['shop_id'],
            'out_trade_no' => $order['out_trade_no'],
            'order_no' => $order['order_no'],
            'total_fee' => $order['total_fee'],
        ];

        // Only the fields above are signed; attach and msg are added afterwards.
        $data['sign'] = LvServe::sign($data, $config['md5_key']);
        $data['attach'] = $order['attach'];
        $data['msg'] = $order['msg'];

        $where = [
            ['id', '=', $order['id']],
        ];
        // Defaults describe a failed attempt; success overrides notify_status.
        $up = [
            'notify_status' => 0,
            'update_at' => time(),
            'notify_num' => Db::raw('notify_num+1'),
        ];
        try {
            $resp = LvServe::curl($order['notify_url'], $data, 6);
            // The merchant acknowledges with the literal text "success" (any case).
            $up['notify_status'] = (is_string($resp) && strtoupper($resp) === 'SUCCESS') ? 1 : 0;
            $up['update_at'] = time();
            self::where($where)->update($up);
        } catch (\Exception $e) {
            log_write(helper::jsonEncode($e->getMessage()), 'error');
            $up['notify_status'] = 0;
            $up['update_at'] = time();
            self::where($where)->update($up);
            return;
        }
    }

}
标签: none

订阅本站(RSS)