基于Swoole协程版本Mysql连接池

<?php
namespace ScarecrowLib\Mysql;

class MysqlPool {
   //MYSQL连接池
   private $pool;

   //最大维持连接数
   protected $maxConnectNum;

   //用户名
   protected $user;

   //用户密码
   protected $pwd;

   //数据库地址
   protected $host;

   //数据库端口
   protected $port;

   //数据库名称
   protected $database;

   //数据库链接超时时间
   protected $connectTimeOut = 30;

   //查询方法的
   protected $queryTimeout = -1;

   protected $executeTimeOut = -1;

   //数据库字符集
   protected $chartSet;

   //开启严格模式(query方法返回的数据也将转为强类型)
   protected $strict_type=false;

   //开启fetch模式, 可与pdo一样使用fetch/fetchAll逐行或获取全部结果集(4.0版本以上)
   protected $fetch_mode=true;

   //检测连接等待时间超时未操作 单位:S
   protected $checkConnectWaitTime = 3600;

   public function __construct($host='127.0.0.1', $port='3306', $user='', $pwd='', $database='', $chartSet='utf8', $maxConnectNum=3)
   {
      $this->host = $host;
      $this->port = $port;
      $this->user = $user;
      $this->pwd = $pwd;
      $this->database = $database;
      $this->chartSet = $chartSet;
      $this->maxConnectNum = $maxConnectNum;
      $this->pool = new \Swoole\Coroutine\Channel($this->maxConnectNum);
      $this->initPool();
   }

   /**
    * @return int
    */
   public function getCheckConnectWaitTime(): int
   {
      return $this->checkConnectWaitTime;
   }

   /**
    * @param int $checkConnectWaitTime
    */
   public function setCheckConnectWaitTime(int $checkConnectWaitTime)
   {
      $this->checkConnectWaitTime = $checkConnectWaitTime;
   }

   protected function initPool() {
      for ($i=0;$i<$this->maxConnectNum;$i++) {
         $this->push($this->createMysqlHandleObj());
      }
   }

   /**
    * @return mixed
    */
   public function getMaxConnectNum()
   {
      return $this->maxConnectNum;
   }

   /**
    * @param mixed $maxConnectNum
    */
   public function setMaxConnectNum($maxConnectNum)
   {
      $this->maxConnectNum = $maxConnectNum;
   }

   /**
    * @return mixed
    */
   public function getUser()
   {
      return $this->user;
   }

   /**
    * @param mixed $user
    */
   public function setUser($user)
   {
      $this->user = $user;
   }

   /**
    * @return mixed
    */
   public function getPwd()
   {
      return $this->pwd;
   }

   /**
    * @param mixed $pwd
    */
   public function setPwd($pwd)
   {
      $this->pwd = $pwd;
   }

   /**
    * @return string
    */
   public function getHost()
   {
      return $this->host;
   }

   /**
    * @param string $host
    */
   public function setHost($host)
   {
      $this->host = $host;
   }

   /**
    * @return mixed
    */
   public function getPort()
   {
      return $this->port;
   }

   /**
    * @param mixed $port
    */
   public function setPort($port)
   {
      $this->port = $port;
   }

   /**
    * @return mixed
    */
   public function getDatabase()
   {
      return $this->database;
   }

   /**
    * @param mixed $database
    */
   public function setDatabase($database)
   {
      $this->database = $database;
   }

   /**
    * @return int
    */
   public function getConnectTimeOut()
   {
      return $this->connectTimeOut;
   }

   /**
    * @param int $connectTimeOut
    */
   public function setConnectTimeOut($connectTimeOut)
   {
      $this->connectTimeOut = $connectTimeOut;
   }

   /**
    * @return int
    */
   public function getQueryTimeout()
   {
      return $this->queryTimeout;
   }

   /**
    * @param int $queryTimeout
    */
   public function setQueryTimeout($queryTimeout)
   {
      $this->queryTimeout = $queryTimeout;
   }

   /**
    * @return int
    */
   public function getExecuteTimeOut()
   {
      return $this->executeTimeOut;
   }

   /**
    * @param int $executeTimeOut
    */
   public function setExecuteTimeOut($executeTimeOut)
   {
      $this->executeTimeOut = $executeTimeOut;
   }

   /**
    * @return mixed
    */
   public function getChartSet()
   {
      return $this->chartSet;
   }

   /**
    * @param mixed $chartSet
    */
   public function setChartSet($chartSet)
   {
      $this->chartSet = $chartSet;
   }

   /**
    * @return mixed
    */
   public function getStrictType()
   {
      return $this->strict_type;
   }

   /**
    * @param mixed $strict_type
    */
   public function setStrictType($strict_type)
   {
      $this->strict_type = $strict_type;
   }

   /**
    * @return mixed
    */
   public function getFetchMode()
   {
      return $this->fetch_mode;
   }

   /**
    * @param mixed $fetch_mode
    */
   public function setFetchMode($fetch_mode)
   {
      $this->fetch_mode = $fetch_mode;
   }

   /**
    * 入池一个MYSQL添加句柄
    * @param $handleObj
    * @return bool
    */
   public function push($handleObj) {
      $obj = [
         'obj'  => $handleObj,
         'timeout'  => time() + $this->checkConnectWaitTime
      ];
      if ($this->pool->push($obj, 60)) {
         return true;
      } else {
         return false;
      }
   }

   /**
    * 弹出一个MYSQL操作句柄
    * @return bool|mixed|\Swoole\Coroutine\MySQL
    */
   public function pop() {
      $obj = $this->pool->pop(60);
      if ($obj['timeout'] > time()) {
         return $obj['obj'];
      } else {
         $obj['obj']->query('SELECT VERSION();');
         if (in_array($obj['obj']->errno, [2006,2013])) {
            $rel = $obj['obj']->connect($this->getMysqlConnectConfig());
            if ($rel) {
               return $obj['obj'];
            } else {
               unset($obj);
               return $this->createMysqlHandleObj();
            }
         } else {
            return $obj['obj'];
         }
      }
   }

   /**
    * 获取池子中MYSQL句柄数量
    * @return mixed
    */
   public function getLength() {
      return $this->pool->length();
   }

   /**
    * 创建一个MYSQL链接句柄
    * @return bool|\Swoole\Coroutine\MySQL
    */
   public function createMysqlHandleObj() {
      $handleObj = new \Swoole\Coroutine\MySQL();
      $rel = $handleObj->connect($this->getMysqlConnectConfig());
      if ($rel) {
         return $handleObj;
      } else {
         throw new \Exception('创建异步数据库连接失败', 500);
      }
   }

   /**
    * 获取数据库链接配置
    * @return array
    */
   protected function getMysqlConnectConfig() {
      return [
         'host' => $this->getHost(),
         'port' => $this->getPort(),
         'user' => $this->getUser(),
         'password' => $this->getPwd(),
         'database' => $this->getDatabase(),
         'timeout'  => $this->getConnectTimeOut(),
         'charset'  => $this->getChartSet(),
         'strict_type'  => $this->getStrictType(),
         'fetch_mode'   => $this->getFetchMode()
      ];
   }

   /**
    * 执行一个查询语句,此方法只能在没有执行事务、预编译等时使用
    * @param $sql
    * @return mixed
    */
   public function query($sql) {
      $handleObj = $this->pop();
      $relData = $handleObj->query($sql, $this->getQueryTimeout());
      $this->push($handleObj);
      return $relData;
   }

   public function __destruct()
   {
      // TODO: Implement __destruct() method.
      echo "WOBEIXIAOHUILE";
   }
}

使用方式,在swoole中的onWorkStart回调中创建此对象然后动态绑定到全局server对象上,这样就可以在任何一个处理方法回调中调用此连接池。

注意:此对象必须在协程上下文中使用,否则会有意想不到的BUG。

阅读数:344
如有疑问请与我联系:点击与我联系