<?php
// Dependency (install with Composer): composer require php-amqplib/php-amqplib 3.1.2
namespace app\ospay\model;
use app\common\library\helper;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPSocketConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPAbstractCollection;
use PhpAmqpLib\Wire\AMQPTable;
use think\facade\Db;
use think\Model;
use app\ospay\serve\LvServe;
/**
 * Payment system: notification queue.
 * Class OsQueen
 * @package app\ospay\model
 */
class OsQueen extends Model
{
    /** @var string Name of the RabbitMQ queue used for payment notifications. */
    public static $QUEEN_KEY = 'OS_PAY_NOTIFY_LIST';

    /** @var mixed Not referenced anywhere in this class; kept because it is public. */
    public static $CON;

    /**
     * Open a new socket connection to RabbitMQ.
     *
     * Host, port, login, password and vhost are read from the `rabbitmq.*`
     * configuration entries. The remaining positional arguments are, in order:
     * insist (false), login method ('AMQPLAIN'), login response (null),
     * locale ('en_US'), read timeout (3), keepalive (false),
     * write timeout (3) and heartbeat (60).
     *
     * @return AMQPSocketConnection a fresh connection; the caller is responsible for closing it
     */
    public static function getQueenCon()
    {
        return new AMQPSocketConnection(
            config('rabbitmq.host'),
            config('rabbitmq.port'),
            config('rabbitmq.login'),
            config('rabbitmq.password'),
            config('rabbitmq.vhost'),
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3,
            false,
            3,
            60
        );
    }

    /**
     * Open a channel on the given connection and declare the notification queue.
     *
     * The queue is declared durable (non-passive, non-exclusive, no auto-delete)
     * and capped both by message count and by total size.
     *
     * @param AMQPSocketConnection $connection an open connection
     * @return AMQPChannel the channel on which the queue was declared
     */
    public static function getChan(AMQPSocketConnection $connection)
    {
        $arguments = new AMQPTable();
        // Buffer queue: at most 1,000,000 messages.
        $arguments->set('x-max-length', 100 * 10000);
        // Size limit in bytes: 512 * 1000 * 1000.
        $arguments->set('x-max-length-bytes', 512 * 1000 * 1000);

        $channel = $connection->channel();
        $channel->queue_declare(self::$QUEEN_KEY, false, true, false, false, false, $arguments);

        return $channel;
    }

    /**
     * Publish a message to the notification queue.
     *
     * The message is marked persistent and published through the default
     * exchange, using the queue name as routing key.
     *
     * @param AMQPChannel $channel channel to publish on
     * @param mixed       $data    message body; consume() hands it to notifyOrder() as the record id
     */
    public static function sendQueen(AMQPChannel $channel, $data)
    {
        $message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        $channel->basic_publish($message, '', self::$QUEEN_KEY);
    }

    /**
     * Consumer loop: process queued notifications until the channel closes.
     *
     * Each message body is passed to notifyOrder() and the message is then
     * acknowledged. Prefetch is limited to one unacknowledged message.
     * The channel and connection are released when the loop ends or an
     * exception escapes, so a failing worker does not leak its socket.
     *
     * @param string $tag consumer tag (also echoed to the output)
     * @throws \ErrorException
     */
    public static function consume(string $tag)
    {
        echo $tag;

        $con = self::getQueenCon();
        $chan = null;
        try {
            $chan = self::getChan($con);

            $handler = static function (AMQPMessage $msg) {
                self::notifyOrder($msg->getBody());
                $msg->ack();
            };

            $chan->basic_qos(null, 1, null);
            // no_ack is false: messages are acknowledged manually in $handler.
            $chan->basic_consume(self::$QUEEN_KEY, $tag, false, false, false, false, $handler);

            while ($chan->is_open()) {
                $chan->wait();
            }
        } finally {
            self::closeQuietly($chan, $con);
        }
    }

    /**
     * Send the payment result to the merchant's notify URL and record the outcome.
     *
     * Nothing happens when the record does not exist, has a non-zero
     * notify_status, or the shop has no signing configuration.
     * A response equal to "SUCCESS" (case-insensitive) sets notify_status to 1.
     *
     * @param mixed $queen_id primary key of the record to notify
     */
    public static function notifyOrder($queen_id)
    {
        $order = self::find($queen_id);
        // Guard first: reading fields of a missing record would access an offset on null.
        if (empty($order) || $order['notify_status'] != 0) {
            return;
        }

        $config = OsShopAttach::where('shop_id', $order['shop_id'])->cache(600)->find();
        if (empty($config)) {
            // Without the shop's key the payload cannot be signed; skip instead of failing the consumer.
            log_write(helper::jsonEncode('notifyOrder: shop config not found, shop_id=' . $order['shop_id']), 'error');
            return;
        }

        // Only these fields take part in the signature; attach and msg are added afterwards.
        $data = [
            'code' => $order['code'],
            'shop_id' => $order['shop_id'],
            'out_trade_no' => $order['out_trade_no'],
            'order_no' => $order['order_no'],
            'total_fee' => $order['total_fee'],
        ];
        $data['sign'] = LvServe::sign($data, $config['md5_key']);
        $data['attach'] = $order['attach'];
        $data['msg'] = $order['msg'];

        $where = [
            ['id', '=', $order['id']],
        ];

        try {
            $resp = LvServe::curl($order['notify_url'], $data, 6);
            $succeeded = is_string($resp) && strtoupper($resp) === 'SUCCESS';
            self::where($where)->update([
                'notify_status' => $succeeded ? 1 : 0,
                'update_at' => time(),
                'notify_num' => Db::raw('notify_num+1'),
            ]);
        } catch (\Exception $e) {
            log_write(helper::jsonEncode($e->getMessage()), 'error');
            // NOTE(review): notify_num is not incremented on this path, unlike a
            // non-SUCCESS response above - confirm whether failed attempts should count.
            self::where($where)->update([
                'notify_status' => 0,
                'update_at' => time(),
            ]);
        }
    }

    /**
     * Close the channel and the connection, logging rather than propagating close failures.
     *
     * Used from a finally block, so a failure here must not mask the original exception.
     *
     * @param AMQPChannel|null     $channel    channel to close; null when it was never opened
     * @param AMQPSocketConnection $connection connection to close
     */
    private static function closeQuietly(?AMQPChannel $channel, AMQPSocketConnection $connection)
    {
        try {
            if ($channel !== null && $channel->is_open()) {
                $channel->close();
            }
            if ($connection->isConnected()) {
                $connection->close();
            }
        } catch (\Exception $e) {
            log_write(helper::jsonEncode($e->getMessage()), 'error');
        }
    }
}