diff --git a/config/config.php b/config/config.php index 9b6b2f8..845b035 100644 --- a/config/config.php +++ b/config/config.php @@ -9,12 +9,15 @@ | 日期:2016/7/25 |--------------------------------------------------------------- */ -define('ServerIp',"0.0.0.0"); -define('ServerPort',"9508"); -define('ServerLog',dirname(__DIR__).'/log/FileDistributed.log'); -define('redis_server','192.168.102.163'); -define('redis_port','6379'); -define('redis_auth','123qwe'); -define('LISTENPATH',dirname(__DIR__).'/img'); -define('allsysnc',true); +define('ServerIp', "0.0.0.0"); +define('ServerPort', "9508"); +define('ServerLog', dirname(__DIR__) . '/log/FileDistributed.log'); +define('redis_server', '192.168.102.163'); +define('redis_port', '6379'); +define('redis_auth', '123qwe'); +define('LISTENPATH', dirname(__DIR__) . '/img'); +define('allsysnc', true); +define('maxpackage', 1024 * 1024 * 200); +define('Bincmd', '/usr/local/php/bin/php'); +define('file_arg', 'dfs'); ?> diff --git a/server.php b/server.php index 809a193..575a324 100644 --- a/server.php +++ b/server.php @@ -10,52 +10,48 @@ |--------------------------------------------------------------- */ // 检查扩展 -if(!extension_loaded('inotify')) -{ +if (!extension_loaded('inotify')) { exit("Please install inotify extension.\n"); } -if(!extension_loaded('redis')) -{ +if (!extension_loaded('redis')) { exit("Please install redis extension.\n"); } -if(!extension_loaded('swoole')) -{ +if (!extension_loaded('swoole')) { exit("Please install swoole extension.\n"); } //检查是否为cli模式 -if(php_sapi_name() !== 'cli'){ +if (php_sapi_name() !== 'cli') { exit("Please use php cli mode.\n"); } -$cmd="/usr/local/php/bin/php";//php的绝对路径 function server_call($cmd) { - - foreach(glob(__DIR__.'/server/FileDistributedServer.php') as $start_file) - { - exec($cmd.' '.$start_file); + + foreach (glob(__DIR__ . '/server/FileDistributedServer.php') as $start_file) { + exec($cmd . ' ' . $start_file); } } -$ser_ser=$argv; -if(!isset($ser_ser[1])){ - exit("No argv.\n"); - }else{ -switch ($ser_ser[1]) { - case 'start': - call_user_func('server_call',$cmd); - break; - case 'stop': - exec("ps -ef | grep -E '".$cmd."' |grep -v 'grep'| awk '{print $2}'|xargs kill -9 > /dev/null 2>&1 &"); - echo "Kill all process success.\n"; - break; - case 'restart': - exec("ps -ef | grep -E '".$cmd."' |grep -v 'grep'| awk '{print $2}'|xargs kill -9 > /dev/null 2>&1 &"); - echo "Kill all process success.\n"; - call_user_func('server_call',$cmd); - break; - default: - exit("Not support this argv.\n"); - break; +$ser_ser = $argv; +require_once __DIR__ . '/config/config.php'; +if (!isset($ser_ser[1])) { + exit("No argv.\n"); +} else { + switch ($ser_ser[1]) { + case 'start': + call_user_func('server_call', Bincmd); + break; + case 'stop': + exec("ps -ef | grep -E '" . Bincmd . "' |grep -v 'grep'| awk '{print $2}'|xargs kill -9 > /dev/null 2>&1 &"); + echo "Kill all process success.\n"; + break; + case 'restart': + exec("ps -ef | grep -E '" . Bincmd . "' |grep -v 'grep'| awk '{print $2}'|xargs kill -9 > /dev/null 2>&1 &"); + echo "Kill all process success.\n"; + call_user_func('server_call', Bincmd); + break; + default: + exit("Not support this argv.\n"); + break; } - } +} ?> diff --git a/server/FileDistributedClient.php b/server/FileDistributedClient.php index f6c78f8..89d931f 100644 --- a/server/FileDistributedClient.php +++ b/server/FileDistributedClient.php @@ -39,7 +39,7 @@ public function addServerClient($address) 'package_length_type' => 'N', 'package_length_offset' => 0, //第N个字节是包长度的值 'package_body_offset' => 4, //第几个字节开始计算长度 - 'package_max_length' => 1024 * 1024 * 20 //协议最大长度 + 'package_max_length' => maxpackage //协议最大长度 )); $client->on('Connect', array( &$this, @@ -222,13 +222,14 @@ public function onError($client) $this->removeuser($this->cur_address); $this->del_server[ip2long($this->cur_address)] = $this->cur_address; $this->table->del(ip2long($this->cur_address)); - $this->setkey($this->cur_address); + $this->setkey($this->cur_address, file_arg . 'errserfile'); unset($this->b_client_pool[$this->cur_address]); unset($client); } //获取分布式服务器列表 public function getserlist($keyname = 'FileDistributed') { + $keyname = isset($keyname) && !empty($keyname) ? $keyname : 'FileDistributed'; ob_start(); dredis::getInstance()->getfd($keyname); $result = ob_get_contents(); @@ -238,26 +239,31 @@ public function getserlist($keyname = 'FileDistributed') //添加到分布式服务器列表 public function appendserlist($data, $score, $keyname = 'FileDistributed') { + $keyname = isset($keyname) && !empty($keyname) ? $keyname : 'FileDistributed'; dredis::getInstance()->savefd($data, $score, $keyname); } //从分布式服务器列表删除 public function removeuser($data, $keyname = 'FileDistributed') { + $keyname = isset($keyname) && !empty($keyname) ? $keyname : 'FileDistributed'; dredis::getInstance()->removefd($data, $keyname); } //设置错误服务器 public function setkey($data, $keyname = 'errserfile') { + $keyname = isset($keyname) && !empty($keyname) ? $keyname : 'errserfile'; return dredis::getInstance()->setkey($data, $keyname); } //获取错误服务器 public function getkey($keyname = 'errserfile') { + $keyname = isset($keyname) && !empty($keyname) ? $keyname : 'errserfile'; return dredis::getInstance()->getkey($keyname); } //删除错误服务器 public function delkey($keyname = 'errserfile') { + $keyname = isset($keyname) && !empty($keyname) ? $keyname : 'errserfile'; return dredis::getInstance()->delkey($keyname); } //获取目录 diff --git a/server/FileDistributedServer.php b/server/FileDistributedServer.php index e4d52e3..320ec35 100644 --- a/server/FileDistributedServer.php +++ b/server/FileDistributedServer.php @@ -48,7 +48,7 @@ public function __construct() 'package_length_type' => 'N', 'package_length_offset' => 0, 'package_body_offset' => 4, - 'package_max_length' => 1024 * 1024 * 20, + 'package_max_length' => maxpackage, 'log_file' => ServerLog )); } else { @@ -60,7 +60,7 @@ public function __construct() 'package_length_type' => 'N', 'package_length_offset' => 0, 'package_body_offset' => 4, - 'package_max_length' => 1024 * 1024 * 20, + 'package_max_length' => maxpackage, 'daemonize' => true )); } @@ -134,58 +134,64 @@ public function onStart($serv) if ($events) { foreach ($events as $kk => $vv) { if (isset($vv['name']) && $vv['mask'] != 256) { - if ($vv['mask'] == 1073742080) { - if (in_array($vv['wd'], array_keys($this->wd))) { - $listenpathx = substr($this->wd[$vv['wd']]['path'], 0, strripos($this->wd[$vv['wd']]['path'] . '/', "/") + 1); - $listenpathx .= '/' . $vv['name']; + if(substr($vv['name'], -1) == '~') continue; + if(!strpos($vv['name'],'.')) continue; + preg_match('/(.*?)\.sw(x|o|p)/', $vv['name'], $event_result); + if (empty($event_result)) { + if ($vv['mask'] == 1073742080) { + if (in_array($vv['wd'], array_keys($this->wd))) { + $listenpathx = substr($this->wd[$vv['wd']]['path'], 0, strripos($this->wd[$vv['wd']]['path'] . '/', "/") + 1); + $listenpathx .= '/' . $vv['name']; + + $wd = inotify_add_watch($this->filefd, $listenpathx, IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE); + $this->wd[$wd] = array( + 'wd' => $wd, + 'path' => $listenpathx, + 'pre' => substr($listenpathx, strlen($listenpathex), strlen($listenpathx)) + ); + } else { + $listenpath .= '/' . $vv['name']; + + $wd = inotify_add_watch($this->filefd, $listenpath, IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE); + $this->wd[$wd] = array( + 'wd' => $wd, + 'path' => $listenpath, + 'pre' => substr($listenpath, strlen($listenpathex), strlen($listenpath)) + ); + } + - $wd = inotify_add_watch($this->filefd, $listenpathx, IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE); - $this->wd[$wd] = array( - 'wd' => $wd, - 'path' => $listenpathx, - 'pre' => substr($listenpathx, strlen($listenpathex), strlen($listenpathx)) - ); } else { - $listenpath .= '/' . $vv['name']; + $path_listen = $this->wd[$vv['wd']]['path'] . '/' . $vv['name']; - $wd = inotify_add_watch($this->filefd, $listenpath, IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE); - $this->wd[$wd] = array( - 'wd' => $wd, - 'path' => $listenpath, - 'pre' => substr($listenpath, strlen($listenpathex), strlen($listenpath)) - ); + $extends = explode("/", $path_listen); + $vas = count($extends) - 1; + if (empty($this->wd[$vv['wd']]['pre'])) { + $data = array( + 'type' => 'fileclient', + 'data' => array( + + 'path' => rawurlencode(str_replace("_", "@", $path_listen)), + 'fileex' => rawurlencode(str_replace("_", "@", $extends[$vas])), + 'pre' => '' + ) + ); + } else { + $data = array( + 'type' => 'fileclient', + 'data' => array( + + 'path' => rawurlencode(str_replace("_", "@", $path_listen)), + 'fileex' => rawurlencode(str_replace("_", "@", $extends[$vas])), + 'pre' => rawurlencode(str_replace("_", "@", $this->wd[$vv['wd']]['pre'])) + ) + ); + } + + $localclient->send(FileDistributedClient::getInstance()->packmes($data)); } - - } else { - $path_listen = $this->wd[$vv['wd']]['path'] . '/' . $vv['name']; - - $extends = explode("/", $path_listen); - $vas = count($extends) - 1; - if (empty($this->wd[$vv['wd']]['pre'])) { - $data = array( - 'type' => 'fileclient', - 'data' => array( - - 'path' => rawurlencode(str_replace("_", "@", $path_listen)), - 'fileex' => rawurlencode(str_replace("_", "@", $extends[$vas])), - 'pre' => '' - ) - ); - } else { - $data = array( - 'type' => 'fileclient', - 'data' => array( - - 'path' => rawurlencode(str_replace("_", "@", $path_listen)), - 'fileex' => rawurlencode(str_replace("_", "@", $extends[$vas])), - 'pre' => rawurlencode(str_replace("_", "@", $this->wd[$vv['wd']]['pre'])) - ) - ); - } - $localclient->send(FileDistributedClient::getInstance()->packmes($data)); } - } } @@ -198,7 +204,7 @@ public function onWorkerStart($serv, $worker_id) $localinfo = swoole_get_local_ip(); $this->localip = current($localinfo); - $serverlist = FileDistributedClient::getInstance()->getserlist(); + $serverlist = FileDistributedClient::getInstance()->getserlist(file_arg); $result_fd = json_decode($serverlist, true); if (!empty($result_fd)) { foreach ($result_fd as $id => $fd) { @@ -214,7 +220,7 @@ public function onWorkerStart($serv, $worker_id) } } } - FileDistributedClient::getInstance()->appendserlist($this->localip, ip2long($this->localip)); + FileDistributedClient::getInstance()->appendserlist($this->localip, ip2long($this->localip), file_arg); } @@ -282,33 +288,39 @@ public function onReceive($serv, $fd, $from_id, $data) ); $this->client_a = $remote_info['data']['fd']; } else { - if (FileDistributedClient::getInstance()->getkey()) { + if (FileDistributedClient::getInstance()->getkey(file_arg . 'errserfile')) { $client = FileDistributedClient::getInstance()->addServerClient($remote_info['data']['fd']); $this->b_server_pool[ip2long($remote_info['data']['fd'])] = array( 'fd' => $remote_info['data']['fd'], 'client' => $client ); $this->client_a = $remote_info['data']['fd']; - if ($this->localip == FileDistributedClient::getInstance()->getkey()) { - FileDistributedClient::getInstance()->delkey(); + if ($this->localip == FileDistributedClient::getInstance()->getkey(file_arg . 'errserfile')) { + FileDistributedClient::getInstance()->delkey(file_arg . 'errserfile'); } } } } if ($this->localip != $this->connectioninfo['remote_ip']) { - if (!in_array($this->connectioninfo['remote_ip'], $this->client_pool_ser)) { - $serv->send($fd, FileDistributedClient::getInstance()->packmes(array( - 'type' => 'system', - 'data' => array( - 'code' => 10002, - 'fd' => $this->localip - ) - ))); - array_push($this->client_pool_ser, $this->connectioninfo['remote_ip']); + if (allsysnc) { + if (!in_array($this->connectioninfo['remote_ip'], $this->client_pool_ser)) { + $serv->send($fd, FileDistributedClient::getInstance()->packmes(array( + 'type' => 'system', + 'data' => array( + 'code' => 10002, + 'fd' => $this->localip + ) + ))); + array_push($this->client_pool_ser, $this->connectioninfo['remote_ip']); + } } } - echo date('[ c ]') . str_replace("\n", "", var_export($remote_info, true)); + if (ServerLog) { + swoole_async_write(ServerLog, date('[ c ]') . str_replace("\n", "", var_export($remote_info, true)) . '\r\n', -1); + } else { + echo date('[ c ]') . str_replace("\n", "", var_export($remote_info, true)) . '\r\n'; + } } else { switch ($remote_info['type']) { case 'filesize': @@ -327,14 +339,17 @@ public function onReceive($serv, $fd, $from_id, $data) case 'file': if (isset($remote_info['data']['path'])) { if (!file_exists(LISTENPATH . str_replace("@", "_", rawurldecode($remote_info['data']['path'])))) { - $data_s = array( - 'type' => 'filemes', - 'data' => array( - 'path' => $remote_info['data']['path'] - ) - ); - $serv->send($fd, FileDistributedClient::getInstance()->packmes($data_s)); - } + if (substr(LISTENPATH . str_replace("@", "_", rawurldecode($remote_info['data']['path'])), -1) != '~') { + $data_s = array( + 'type' => 'filemes', + 'data' => array( + 'path' => $remote_info['data']['path'] + ) + ); + $serv->send($fd, FileDistributedClient::getInstance()->packmes($data_s)); + } + + } } break; case 'asyncfileclient': @@ -374,6 +389,7 @@ public function onReceive($serv, $fd, $from_id, $data) ) ); } + foreach ($this->b_server_pool as $k => $v) { if ($v['fd'] != $this->localip) $v['client']->send(FileDistributedClient::getInstance()->packmes($datas)); @@ -382,7 +398,11 @@ public function onReceive($serv, $fd, $from_id, $data) default: break; } - echo date('[ c ]') . str_replace("\n", "", var_export($remote_info, true)); + if (ServerLog) { + swoole_async_write(ServerLog, date('[ c ]') . str_replace("\n", "", var_export($remote_info, true)) . '\r\n', -1); + } else { + echo date('[ c ]') . str_replace("\n", "", var_export($remote_info, true)) . '\r\n'; + } } } @@ -398,22 +418,35 @@ public function onClose($server, $fd, $from_id) if (!empty($this->client_pool)) { foreach ($this->client_pool as $k => $v) { if ($v['fd'] == $fd) { - FileDistributedClient::getInstance()->removeuser($v['remote_ip'], 'FileDistributed'); - print_r($v['remote_ip'] . " have closed\n"); + FileDistributedClient::getInstance()->removeuser($v['remote_ip'], file_arg); + if (ServerLog) { + swoole_async_write(ServerLog, date('[ c ]') . $v['remote_ip'] . " have closed\r\n", -1); + } else { + echo date('[ c ]') . $v['remote_ip'] . " have closed\r\n"; + } unset($this->client_pool[$k]); } } } else { - FileDistributedClient::getInstance()->removeuser($this->localip, 'FileDistributed'); - print_r($this->localip . " have closed\n"); + FileDistributedClient::getInstance()->removeuser($this->localip, file_arg); + if (ServerLog) { + swoole_async_write(ServerLog, date('[ c ]') . $this->localip . " have closed\r\n", -1); + } else { + echo date('[ c ]') . $this->localip . " have closed\r\n"; + } + } } public function onManagerStop($serv) { if (empty($this->client_pool)) { - FileDistributedClient::getInstance()->removeuser($this->localip, 'FileDistributed'); - print_r($this->localip . " have closed\n"); + FileDistributedClient::getInstance()->removeuser($this->localip, file_arg); + if (ServerLog) { + swoole_async_write(ServerLog, date('[ c ]') . $this->localip . " have closed\r\n", -1); + } else { + echo date('[ c ]') . $this->localip . " have closed\r\n"; + } } swoole_event_del($this->filefd); } @@ -421,8 +454,12 @@ public function onManagerStop($serv) public function onWorkerError($serv, $worker_id, $worker_pid, $exit_code) { if (empty($this->client_pool)) { - FileDistributedClient::getInstance()->removeuser($this->localip, 'FileDistributed'); - print_r($this->localip . " have closed\n"); + FileDistributedClient::getInstance()->removeuser($this->localip, file_arg); + if (ServerLog) { + swoole_async_write(ServerLog, date('[ c ]') . $this->localip . " have closed\r\n", -1); + } else { + echo date('[ c ]') . $this->localip . " have closed\r\n"; + } } } diff --git a/test/test.php b/test/test.php index d2a6c71..f370222 100644 --- a/test/test.php +++ b/test/test.php @@ -9,5 +9,6 @@ | 日期:2016/7/25 |--------------------------------------------------------------- */ + require_once dirname(__DIR__). '/config/config.php'; require_once dirname(__DIR__).'/server/FileDistributedServer.php'; ?>