-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #7 from Memcrab/redis-pool
create Pool, RPool, Redis classes
- Loading branch information
Showing
3 changed files
with
294 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
<?php | ||
declare(strict_types=1); | ||
|
||
namespace Memcrab\Cache; | ||
|
||
use Monolog\Logger; | ||
use OpenSwoole\Coroutine; | ||
use OpenSwoole\Coroutine\Channel; | ||
|
||
abstract class Pool extends Channel | ||
{ | ||
protected Logger $ErrorHandler; | ||
protected int $waitTimeoutPool; # maximum waiting time from the channel | ||
|
||
const DEFAULT_CAPACITY = 16; | ||
const PING_INTERVAL = 5; | ||
|
||
abstract protected function connect(); | ||
|
||
abstract protected function disconnect($connection); | ||
|
||
abstract protected function checkConnectionForErrors($connection); | ||
|
||
abstract protected function error(\Exception $e); | ||
|
||
public function setErrorHandler(Logger $ErrorHandler): void | ||
{ | ||
$this->ErrorHandler = $ErrorHandler; | ||
} | ||
|
||
public function setWaitTimeoutPool(int $waitTimeoutPool): void | ||
{ | ||
$this->waitTimeoutPool = $waitTimeoutPool; | ||
} | ||
|
||
public function setConnections(): bool | ||
{ | ||
try { | ||
$this->fillConnections(); | ||
$this->startPingConnections(); | ||
return true; | ||
} catch (\Exception $e) { | ||
$this->error($e); | ||
return false; | ||
} | ||
} | ||
|
||
public function get() | ||
{ | ||
$connection = $this->getConnectionFromPool(); | ||
if (!$connection) { | ||
throw new \Exception("Cant connect to " . get_called_class() . '.', 500); | ||
} | ||
|
||
return $connection; | ||
} | ||
|
||
public function put($connection): void | ||
{ | ||
if ($this->checkConnectionForErrors($connection)) { | ||
$this->disconnect($connection); | ||
return; | ||
} | ||
|
||
if ($this->length() >= $this->capacity) { | ||
$this->disconnect($connection); | ||
return; | ||
} | ||
|
||
if (!$this->push($connection, $this->waitTimeoutPool)) { | ||
$this->disconnect($connection); | ||
} | ||
} | ||
|
||
|
||
private function fillConnections(): void | ||
{ | ||
$diff = $this->capacity - $this->length(); | ||
for ($i = 0; $i < $diff; $i++) { | ||
$connection = $this->connect(); | ||
if (!$connection) { | ||
continue; | ||
} | ||
if (!$this->push($connection, $this->waitTimeoutPool)) { | ||
$this->disconnect($connection); | ||
} | ||
} | ||
} | ||
|
||
private function getConnectionFromPool() | ||
{ | ||
$connection = false; | ||
if (!$this->isEmpty()) { | ||
$connection = $this->pop($this->waitTimeoutPool); | ||
} | ||
|
||
if (!$connection) { | ||
$connection = $this->connect(); | ||
} | ||
|
||
return $connection; | ||
} | ||
|
||
private function startPingConnections(): void | ||
{ | ||
Coroutine::create(function () { | ||
while (true) { | ||
Coroutine::sleep(self::PING_INTERVAL); | ||
$this->fillConnections(); | ||
$connection = $this->pop($this->waitTimeoutPool); | ||
|
||
if (!$connection) continue; | ||
|
||
try { | ||
$connection->heartbeat(); | ||
} catch (\Throwable $e) { | ||
$this->disconnect($connection); | ||
$connection = $this->connect(); | ||
} | ||
|
||
if ($connection) $this->put($connection); | ||
} | ||
}); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
<?php | ||
declare(strict_types=1); | ||
|
||
namespace Memcrab\Cache; | ||
|
||
use Monolog\Logger; | ||
use OpenSwoole\Core\Coroutine\Client\RedisConfig; | ||
|
||
class RPool extends Pool | ||
{ | ||
private static self $instance; | ||
private RedisConfig $RedisConfig; | ||
|
||
private function __clone() | ||
{ | ||
} | ||
|
||
public function __wakeup() | ||
{ | ||
} | ||
|
||
public static function obj(): self | ||
{ | ||
return self::$instance; | ||
} | ||
|
||
public static function declareConnection( | ||
string $host, | ||
int $port, | ||
string $password, | ||
int $database, | ||
int $waitTimeout, | ||
int $waitTimeoutPool, | ||
Logger $ErrorHandler, | ||
int $capacity = self::DEFAULT_CAPACITY, | ||
): void | ||
{ | ||
self::$instance = new self($capacity); | ||
self::$instance->RedisConfig = (new RedisConfig()) | ||
->withHost($host) | ||
->withPort($port) | ||
->withAuth($password) | ||
->withDbIndex($database) | ||
->withTimeout($waitTimeout); | ||
self::$instance->setWaitTimeoutPool($waitTimeoutPool); | ||
self::$instance->setErrorHandler($ErrorHandler); | ||
} | ||
|
||
protected function error(\Exception $e): void | ||
{ | ||
$this->ErrorHandler->error('RPool Exception: ' . $e); | ||
} | ||
|
||
protected function connect(): Redis|bool | ||
{ | ||
try { | ||
return (new Redis($this->RedisConfig, $this->ErrorHandler)); | ||
} catch (\Exception $e) { | ||
$this->error($e); | ||
return false; | ||
} | ||
} | ||
|
||
protected function disconnect($connection): bool | ||
{ | ||
try { | ||
$connection->close(); | ||
return true; | ||
} catch (\Exception $e) { | ||
$this->error($e); | ||
return false; | ||
} | ||
} | ||
|
||
protected function checkConnectionForErrors($connection): bool | ||
{ | ||
return !$connection->isConnected(); | ||
} | ||
|
||
public function __destruct() | ||
{ | ||
while (true) { | ||
if (!$this->isEmpty()) { | ||
$connection = $this->pop($this->waitTimeoutPool); | ||
$this->disconnect($connection); | ||
} else { | ||
break; | ||
} | ||
} | ||
$this->close(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
<?php | ||
declare(strict_types=1); | ||
|
||
namespace Memcrab\Cache; | ||
|
||
use Monolog\Logger; | ||
use OpenSwoole\Core\Coroutine\Client\RedisConfig; | ||
|
||
class Redis extends \Redis | ||
{ | ||
private Logger $ErrorHandler; | ||
|
||
public function __construct(RedisConfig $RedisConfig, Logger $ErrorHandler) | ||
{ | ||
$this->ErrorHandler = $ErrorHandler; | ||
|
||
try { | ||
$this->connect($RedisConfig->getHost(), $RedisConfig->getPort(), $RedisConfig->getTimeout()); | ||
} catch (\Exception $e) { | ||
throw new \RedisException("Cant connect to Redis. " . $e, 500); | ||
} | ||
|
||
try { | ||
$this->auth($RedisConfig->getAuth()); | ||
} catch (\Exception $e) { | ||
throw new \RedisException("Can't authenticate Redis user by password. " . $e, 500); | ||
} | ||
|
||
try { | ||
$this->select($RedisConfig->getDbIndex()); | ||
} catch (\Exception $e) { | ||
throw new \RedisException("Can't select Redis database with index: " . $RedisConfig->getDbIndex() . ' ' . $e, 500); | ||
} | ||
} | ||
|
||
public function __destruct() | ||
{ | ||
try { | ||
$this->close(); | ||
} catch (\Throwable $e) { | ||
$this->ErrorHandler->error('Redis disconnect error: ' . $e); | ||
} | ||
} | ||
|
||
private function error(\Exception $e): void | ||
{ | ||
$this->ErrorHandler->error('Redis Exception: ' . $e); | ||
} | ||
|
||
public function heartbeat(): void | ||
{ | ||
$this->ping(); | ||
} | ||
|
||
public function getLastNKeys($minuteCounter, $keyNumber, $keyPrefix, &$resultArray): void | ||
{ | ||
try { | ||
for ($i = 1; $i <= $keyNumber; $i++) { | ||
if ($this->exists($keyPrefix . $minuteCounter)) { | ||
$resultRaw = $this->get($keyPrefix . $minuteCounter); | ||
if (!is_bool($resultRaw)) { | ||
$resultArray[] = unserialize($resultRaw); | ||
} | ||
} | ||
|
||
if ($minuteCounter == 0) { | ||
$minuteCounter = 9; | ||
} else { | ||
$minuteCounter--; | ||
} | ||
} | ||
} catch (\Exception $e) { | ||
$this->error($e); | ||
throw $e; | ||
} | ||
} | ||
} |