From 8806c9c751d3408c463439748148c478c8a6f43c Mon Sep 17 00:00:00 2001 From: rost Date: Mon, 21 Aug 2023 10:10:42 +0300 Subject: [PATCH] create Pool, RPool, Redis classes --- src/Pool.php | 125 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/RPool.php | 92 +++++++++++++++++++++++++++++++++++++ src/Redis.php | 77 +++++++++++++++++++++++++++++++ 3 files changed, 294 insertions(+) create mode 100644 src/Pool.php create mode 100644 src/RPool.php create mode 100644 src/Redis.php diff --git a/src/Pool.php b/src/Pool.php new file mode 100644 index 0000000..a8aa2dd --- /dev/null +++ b/src/Pool.php @@ -0,0 +1,125 @@ +ErrorHandler = $ErrorHandler; + } + + public function setWaitTimeoutPool(int $waitTimeoutPool): void + { + $this->waitTimeoutPool = $waitTimeoutPool; + } + + public function setConnections(): bool + { + try { + $this->fillConnections(); + $this->startPingConnections(); + return true; + } catch (\Exception $e) { + $this->error($e); + return false; + } + } + + public function get() + { + $connection = $this->getConnectionFromPool(); + if (!$connection) { + throw new \Exception("Cant connect to " . get_called_class() . '.', 500); + } + + return $connection; + } + + public function put($connection): void + { + if ($this->checkConnectionForErrors($connection)) { + $this->disconnect($connection); + return; + } + + if ($this->length() >= $this->capacity) { + $this->disconnect($connection); + return; + } + + if (!$this->push($connection, $this->waitTimeoutPool)) { + $this->disconnect($connection); + } + } + + + private function fillConnections(): void + { + $diff = $this->capacity - $this->length(); + for ($i = 0; $i < $diff; $i++) { + $connection = $this->connect(); + if (!$connection) { + continue; + } + if (!$this->push($connection, $this->waitTimeoutPool)) { + $this->disconnect($connection); + } + } + } + + private function getConnectionFromPool() + { + $connection = false; + if (!$this->isEmpty()) { + $connection = $this->pop($this->waitTimeoutPool); + } + + if (!$connection) { + $connection = $this->connect(); + } + + return $connection; + } + + private function startPingConnections(): void + { + Coroutine::create(function () { + while (true) { + Coroutine::sleep(self::PING_INTERVAL); + $this->fillConnections(); + $connection = $this->pop($this->waitTimeoutPool); + + if (!$connection) continue; + + try { + $connection->heartbeat(); + } catch (\Throwable $e) { + $this->disconnect($connection); + $connection = $this->connect(); + } + + if ($connection) $this->put($connection); + } + }); + } +} \ No newline at end of file diff --git a/src/RPool.php b/src/RPool.php new file mode 100644 index 0000000..9880b5d --- /dev/null +++ b/src/RPool.php @@ -0,0 +1,92 @@ +RedisConfig = (new RedisConfig()) + ->withHost($host) + ->withPort($port) + ->withAuth($password) + ->withDbIndex($database) + ->withTimeout($waitTimeout); + self::$instance->setWaitTimeoutPool($waitTimeoutPool); + self::$instance->setErrorHandler($ErrorHandler); + } + + protected function error(\Exception $e): void + { + $this->ErrorHandler->error('RPool Exception: ' . $e); + } + + protected function connect(): Redis|bool + { + try { + return (new Redis($this->RedisConfig, $this->ErrorHandler)); + } catch (\Exception $e) { + $this->error($e); + return false; + } + } + + protected function disconnect($connection): bool + { + try { + $connection->close(); + return true; + } catch (\Exception $e) { + $this->error($e); + return false; + } + } + + protected function checkConnectionForErrors($connection): bool + { + return !$connection->isConnected(); + } + + public function __destruct() + { + while (true) { + if (!$this->isEmpty()) { + $connection = $this->pop($this->waitTimeoutPool); + $this->disconnect($connection); + } else { + break; + } + } + $this->close(); + } +} \ No newline at end of file diff --git a/src/Redis.php b/src/Redis.php new file mode 100644 index 0000000..bc22137 --- /dev/null +++ b/src/Redis.php @@ -0,0 +1,77 @@ +ErrorHandler = $ErrorHandler; + + try { + $this->connect($RedisConfig->getHost(), $RedisConfig->getPort(), $RedisConfig->getTimeout()); + } catch (\Exception $e) { + throw new \RedisException("Cant connect to Redis. " . $e, 500); + } + + try { + $this->auth($RedisConfig->getAuth()); + } catch (\Exception $e) { + throw new \RedisException("Can't authenticate Redis user by password. " . $e, 500); + } + + try { + $this->select($RedisConfig->getDbIndex()); + } catch (\Exception $e) { + throw new \RedisException("Can't select Redis database with index: " . $RedisConfig->getDbIndex() . ' ' . $e, 500); + } + } + + public function __destruct() + { + try { + $this->close(); + } catch (\Throwable $e) { + $this->ErrorHandler->error('Redis disconnect error: ' . $e); + } + } + + private function error(\Exception $e): void + { + $this->ErrorHandler->error('Redis Exception: ' . $e); + } + + public function heartbeat(): void + { + $this->ping(); + } + + public function getLastNKeys($minuteCounter, $keyNumber, $keyPrefix, &$resultArray): void + { + try { + for ($i = 1; $i <= $keyNumber; $i++) { + if ($this->exists($keyPrefix . $minuteCounter)) { + $resultRaw = $this->get($keyPrefix . $minuteCounter); + if (!is_bool($resultRaw)) { + $resultArray[] = unserialize($resultRaw); + } + } + + if ($minuteCounter == 0) { + $minuteCounter = 9; + } else { + $minuteCounter--; + } + } + } catch (\Exception $e) { + $this->error($e); + throw $e; + } + } +} \ No newline at end of file