config = $config; $this->logger = $logger; $this->logger->setLogLevel($this->config->getLogLevel() ?: self::DEFAULT_LOG_LEVEL); $timeStart = microtime(true); // Database config $host = $this->config->getHost() ?: self::DEFAULT_HOST; $port = $this->config->getPort() ?: self::DEFAULT_PORT; $pass = $this->config->getPassword() ?: null; $timeout = $this->config->getTimeout() ?: self::DEFAULT_TIMEOUT; $persistent = $this->config->getPersistentIdentifier() ?: ''; $this->_dbNum = $this->config->getDatabase() ?: self::DEFAULT_DATABASE; // General config $this->_compressionThreshold = $this->config->getCompressionThreshold() ?: self::DEFAULT_COMPRESSION_THRESHOLD; $this->_compressionLibrary = $this->config->getCompressionLibrary() ?: self::DEFAULT_COMPRESSION_LIBRARY; $this->_maxConcurrency = $this->config->getMaxConcurrency() ?: self::DEFAULT_MAX_CONCURRENCY; $this->_maxLifetime = $this->config->getMaxLifetime() ?: self::DEFAULT_MAX_LIFETIME; $this->_minLifetime = $this->config->getMinLifetime() ?: self::DEFAULT_MIN_LIFETIME; $this->_useLocking = $this->config->getDisableLocking() ?: self::DEFAULT_DISABLE_LOCKING; // Use sleep time multiplier so break time is in seconds $this->_failAfter = (int) round((1000000 / self::SLEEP_TIME) * self::FAIL_AFTER); // Connect and authenticate $this->_redis = new \Credis_Client($host, $port, $timeout, $persistent, 0, $pass); if ($this->hasConnection() == false) { throw new ConnectionFailedException('Unable to connect to Redis'); } // Destructor order cannot be predicted $this->_redis->setCloseOnDestruct(false); $this->_log( sprintf( "%s initialized for connection to %s:%s after %.5f seconds", get_class($this), $host, $port, (microtime(true) - $timeStart) ) ); } /** * Open session * * @param string $savePath ignored * @param string $sessionName ignored * @return bool * @SuppressWarnings(PHPMD.UnusedFormalParameter) */ public function open($savePath, $sessionName) { return true; } /** * @param $msg * @param $level */ private function _log($msg, $level = LoggerInterface::DEBUG) { $this->logger->log("{$this->_getPid()}: $msg", $level); } /** * Check Redis connection * * @return bool */ protected function hasConnection() { try { $this->_redis->connect(); $this->_log("Connected to Redis"); return true; } catch (\Exception $e) { $this->logger->logException($e); $this->_log('Unable to connect to Redis'); return false; } } /** * Fetch session data * * @param string $sessionId * @return string * @throws ConcurrentConnectionsExceededException */ public function read($sessionId) { // Get lock on session. Increment the "lock" field and if the new value is 1, we have the lock. $sessionId = self::SESSION_PREFIX.$sessionId; $tries = $waiting = $lock = 0; $lockPid = $oldLockPid = null; // Restart waiting for lock when current lock holder changes $detectZombies = false; $breakAfter = $this->_getBreakAfter(); $timeStart = microtime(true); $this->_log(sprintf("Attempting to take lock on ID %s", $sessionId)); $this->_redis->select($this->_dbNum); while ($this->_useLocking) { // Increment lock value for this session and retrieve the new value $oldLock = $lock; $lock = $this->_redis->hIncrBy($sessionId, 'lock', 1); // Get the pid of the process that has the lock if ($lock != 1 && $tries + 1 >= $breakAfter) { $lockPid = $this->_redis->hGet($sessionId, 'pid'); } // If we got the lock, update with our pid and reset lock and expiration if ( $lock == 1 // We actually do have the lock || ( $tries >= $breakAfter // We are done waiting and want to start trying to break it && $oldLockPid == $lockPid // Nobody else got the lock while we were waiting ) ) { $this->_hasLock = true; break; } // Otherwise, add to "wait" counter and continue else if ( ! $waiting) { $i = 0; do { $waiting = $this->_redis->hIncrBy($sessionId, 'wait', 1); } while (++$i < $this->_maxConcurrency && $waiting < 1); } // Handle overloaded sessions else { // Detect broken sessions (e.g. caused by fatal errors) if ($detectZombies) { $detectZombies = false; // Lock shouldn't be less than old lock (another process broke the lock) if ($lock > $oldLock // Lock should be old+waiting, otherwise there must be a dead process && $lock + 1 < $oldLock + $waiting ) { // Reset session to fresh state $this->_log( sprintf( "Detected zombie waiter after %.5f seconds for ID %s (%d waiting)", (microtime(true) - $timeStart), $sessionId, $waiting ), LoggerInterface::INFO ); $waiting = $this->_redis->hIncrBy($sessionId, 'wait', -1); continue; } } // Limit concurrent lock waiters to prevent server resource hogging if ($waiting >= $this->_maxConcurrency) { // Overloaded sessions get 503 errors $this->_redis->hIncrBy($sessionId, 'wait', -1); $this->_sessionWritten = true; // Prevent session from getting written $writes = $this->_redis->hGet($sessionId, 'writes'); $this->_log( sprintf( 'Session concurrency exceeded for ID %s; displaying HTTP 503 (%s waiting, %s total ' . 'requests)', $sessionId, $waiting, $writes ), LoggerInterface::WARNING ); throw new ConcurrentConnectionsExceededException(); } } $tries++; $oldLockPid = $lockPid; $sleepTime = self::SLEEP_TIME; // Detect dead lock waiters if ($tries % self::DETECT_ZOMBIES == 1) { $detectZombies = true; $sleepTime += 10000; // sleep + 0.01 seconds } // Detect dead lock holder every 10 seconds (only works on same node as lock holder) if ($tries % self::DETECT_ZOMBIES == 0) { $this->_log( sprintf( "Checking for zombies after %.5f seconds of waiting...", (microtime(true) - $timeStart) ) ); $pid = $this->_redis->hGet($sessionId, 'pid'); if ($pid && ! $this->_pidExists($pid)) { // Allow a live process to get the lock $this->_redis->hSet($sessionId, 'lock', 0); $this->_log( sprintf( "Detected zombie process (%s) for %s (%s waiting)", $pid, $sessionId, $waiting ), LoggerInterface::INFO ); continue; } } // Timeout if ($tries >= $breakAfter + $this->_failAfter) { $this->_hasLock = false; $this->_log( sprintf( 'Giving up on read lock for ID %s after %.5f seconds (%d attempts)', $sessionId, (microtime(true) - $timeStart), $tries ), LoggerInterface::NOTICE ); break; } else { $this->_log( sprintf( "Waiting %.2f seconds for lock on ID %s (%d tries, lock pid is %s, %.5f seconds elapsed)", $sleepTime / 1000000, $sessionId, $tries, $lockPid, (microtime(true) - $timeStart) ) ); usleep($sleepTime); } } $this->failedLockAttempts = $tries; // Session can be read even if it was not locked by this pid! $timeStart2 = microtime(true); list($sessionData, $sessionWrites) = $this->_redis->hMGet($sessionId, array('data','writes')); $this->_log(sprintf("Data read for ID %s in %.5f seconds", $sessionId, (microtime(true) - $timeStart2))); $this->_sessionWrites = (int) $sessionWrites; // This process is no longer waiting for a lock if ($tries > 0) { $this->_redis->hIncrBy($sessionId, 'wait', -1); } // This process has the lock, save the pid if ($this->_hasLock) { $setData = array( 'pid' => $this->_getPid(), 'lock' => 1, ); // Save request data in session so if a lock is broken we can know which page it was for debugging if (empty($_SERVER['REQUEST_METHOD'])) { $setData['req'] = $_SERVER['SCRIPT_NAME']; } else { $setData['req'] = "{$_SERVER['REQUEST_METHOD']} {$_SERVER['SERVER_NAME']}{$_SERVER['REQUEST_URI']}"; } if ($lock != 1) { $this->_log( sprintf( "Successfully broke lock for ID %s after %.5f seconds (%d attempts). Lock: %d\nLast request of ' . 'broken lock: %s", $sessionId, (microtime(true) - $timeStart), $tries, $lock, $this->_redis->hGet($sessionId, 'req') ), LoggerInterface::INFO ); } } // Set session data and expiration $this->_redis->pipeline(); if ( ! empty($setData)) { $this->_redis->hMSet($sessionId, $setData); } $this->_redis->expire($sessionId, min($this->getLifeTime(), $this->_maxLifetime)); $this->_redis->exec(); // Reset flag in case of multiple session read/write operations $this->_sessionWritten = false; return $sessionData ? (string) $this->_decodeData($sessionData) : ''; } /** * Update session * * @param string $sessionId * @param string $sessionData * @return boolean */ public function write($sessionId, $sessionData) { if ($this->_sessionWritten) { $this->_log(sprintf("Repeated session write detected; skipping for ID %s", $sessionId)); return true; } $this->_sessionWritten = true; $timeStart = microtime(true); // Do not overwrite the session if it is locked by another pid try { if($this->_dbNum) $this->_redis->select($this->_dbNum); // Prevent conflicts with other connections? if ( ! $this->_useLocking || ( ! ($pid = $this->_redis->hGet('sess_'.$sessionId, 'pid')) || $pid == $this->_getPid()) ) { $this->_writeRawSession($sessionId, $sessionData, $this->getLifeTime()); $this->_log(sprintf("Data written to ID %s in %.5f seconds", $sessionId, (microtime(true) - $timeStart))); } else { if ($this->_hasLock) { $this->_log(sprintf("Did not write session for ID %s: another process took the lock.", $sessionId ), LoggerInterface::WARNING); } else { $this->_log(sprintf("Did not write session for ID %s: unable to acquire lock.", $sessionId ), LoggerInterface::WARNING); } } } catch(\Exception $e) { $this->logger->logException($e); return false; } return true; } /** * Destroy session * * @param string $sessionId * @return boolean */ public function destroy($sessionId) { $this->_log(sprintf("Destroying ID %s", $sessionId)); $this->_redis->pipeline(); if($this->_dbNum) $this->_redis->select($this->_dbNum); $this->_redis->del(self::SESSION_PREFIX.$sessionId); $this->_redis->exec(); return true; } /** * Overridden to prevent calling getLifeTime at shutdown * * @return bool */ public function close() { $this->_log("Closing connection"); if ($this->_redis) $this->_redis->close(); return true; } /** * Garbage collection * * @param int $maxLifeTime ignored * @return boolean */ public function gc($maxLifeTime) { return true; } /** * Get the number of failed lock attempts * * @return int */ public function getFailedLockAttempts() { return $this->failedLockAttempts; } /** * Get lock lifetime * * @return int|mixed */ private function getLifeTime() { if (is_null($this->_lifeTime)) { $lifeTime = null; // Detect bots by user agent $botLifetime = $this->config->getBotLifetime() ?: self::DEFAULT_BOT_LIFETIME; if ($botLifetime) { $userAgent = empty($_SERVER['HTTP_USER_AGENT']) ? false : $_SERVER['HTTP_USER_AGENT']; $isBot = ! $userAgent || preg_match(self::BOT_REGEX, $userAgent); if ($isBot) { $this->_log(sprintf("Bot detected for user agent: %s", $userAgent)); if ( $this->_sessionWrites <= 1 && ($botFirstLifetime = $this->config->getBotFirstLifetime() ?: self::DEFAULT_BOT_FIRST_LIFETIME) ) { $lifeTime = $botFirstLifetime * (1+$this->_sessionWrites); } else { $lifeTime = $botLifetime; } } } // Use different lifetime for first write if ($lifeTime === null && $this->_sessionWrites <= 1) { $firstLifetime = $this->config->getFirstLifetime() ?: self::DEFAULT_FIRST_LIFETIME; if ($firstLifetime) { $lifeTime = $firstLifetime * (1+$this->_sessionWrites); } } // Neither bot nor first write if ($lifeTime === null) { $lifeTime = $this->config->getLifetime(); } $this->_lifeTime = $lifeTime; if ($this->_lifeTime < $this->_minLifetime) { $this->_lifeTime = $this->_minLifetime; } if ($this->_lifeTime > $this->_maxLifetime) { $this->_lifeTime = $this->_maxLifetime; } } return $this->_lifeTime; } /** * Encode data * * @param string $data * @return string */ protected function _encodeData($data) { $originalDataSize = strlen($data); if ($this->_compressionThreshold > 0 && $this->_compressionLibrary != 'none' && $originalDataSize >= $this->_compressionThreshold) { $this->_log(sprintf("Compressing %s bytes with %s", $originalDataSize,$this->_compressionLibrary)); $timeStart = microtime(true); $prefix = ':'.substr($this->_compressionLibrary,0,2).':'; switch($this->_compressionLibrary) { case 'snappy': $data = snappy_compress($data); break; case 'lzf': $data = lzf_compress($data); break; case 'lz4': $data = lz4_compress($data); $prefix = ':l4:'; break; case 'gzip': $data = gzcompress($data, 1); break; } if($data) { $data = $prefix.$data; $this->_log( sprintf( "Data compressed by %.1f percent in %.5f seconds", ($originalDataSize == 0 ? 0 : (100 - (strlen($data) / $originalDataSize * 100))), (microtime(true) - $timeStart) ) ); } else { $this->_log( sprintf("Could not compress session data using %s", $this->_compressionLibrary), LoggerInterface::WARNING ); } } return $data; } /** * Decode data * * @param string $data * @return string */ protected function _decodeData($data) { switch (substr($data,0,4)) { // asking the data which library it uses allows for transparent changes of libraries case ':sn:': $data = snappy_uncompress(substr($data,4)); break; case ':lz:': $data = lzf_decompress(substr($data,4)); break; case ':l4:': $data = lz4_uncompress(substr($data,4)); break; case ':gz:': $data = gzuncompress(substr($data,4)); break; } return $data; } /** * Write session data to Redis * * @param $id * @param $data * @param $lifetime * @throws \Exception */ protected function _writeRawSession($id, $data, $lifetime) { $sessionId = 'sess_' . $id; $this->_redis->pipeline() ->select($this->_dbNum) ->hMSet($sessionId, array( 'data' => $this->_encodeData($data), 'lock' => 0, // 0 so that next lock attempt will get 1 )) ->hIncrBy($sessionId, 'writes', 1) ->expire($sessionId, min($lifetime, $this->_maxLifetime)) ->exec(); } /** * Get pid * * @return string */ private function _getPid() { return gethostname().'|'.getmypid(); } /** * Check if pid exists * * @param $pid * @return bool */ private function _pidExists($pid) { list($host,$pid) = explode('|', $pid); if (PHP_OS != 'Linux' || $host != gethostname()) { return true; } return @file_exists('/proc/'.$pid); } /** * Get break time, calculated later than other config settings due to requiring session name to be set * * @return int */ private function _getBreakAfter() { // Has break after already been calculated? Only fetch from config once, then reuse variable. if (!$this->_breakAfter) { // Fetch relevant setting from config using session name $this->_breakAfter = (float)($this->config->getBreakAfter() ?: self::DEFAULT_BREAK_AFTER); // Use sleep time multiplier so break time is in seconds $this->_breakAfter = (int)round((1000000 / self::SLEEP_TIME) * $this->_breakAfter); } return $this->_breakAfter; } }