LocalSocket.php 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. <?php declare(strict_types=1);
  2. /*
  3. * This file is part of the Monolog package.
  4. *
  5. * (c) Jordi Boggiano <j.boggiano@seld.be>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Monolog\Util;
  11. use Symfony\Component\Process\Process;
  12. class LocalSocket
  13. {
  14. const TCP = 'tcp';
  15. const UDP = 'udp';
  16. private static $sockets = [];
  17. private static $shutdownHandler = false;
  18. public static function initSocket(int $port = 51984, string $proto = LocalSocket::TCP)
  19. {
  20. if (!isset(self::$sockets[$proto][$port])) {
  21. $file = self::initFile($port, $proto);
  22. $process = new Process(escapeshellarg(PHP_BINARY).' '.escapeshellarg($file));
  23. $process->start(function ($type, $out) use ($proto, $port) {
  24. if ($type === 'err') {
  25. if (substr($out, 0, 4) === 'INIT') {
  26. if ($proto === LocalSocket::UDP) {
  27. self::$sockets[$proto][$port]['comms'] = null;
  28. } else {
  29. $sock = socket_create(AF_INET, SOCK_STREAM, getprotobyname($proto));
  30. socket_connect($sock, '127.0.0.1', $port);
  31. socket_write($sock, "MONITOR\n");
  32. self::$sockets[$proto][$port]['comms'] = $sock;
  33. }
  34. }
  35. }
  36. });
  37. self::$sockets[$proto][$port] = [
  38. 'file' => $file,
  39. 'process' => $process,
  40. 'busy' => false,
  41. ];
  42. // make sure the socket is listening
  43. while (true) {
  44. if ($process->getErrorOutput() === 'INIT') {
  45. break;
  46. }
  47. usleep(100);
  48. }
  49. if (!self::$shutdownHandler) {
  50. register_shutdown_function(function () {
  51. LocalSocket::shutdownSockets();
  52. });
  53. self::$shutdownHandler = true;
  54. }
  55. }
  56. $sock = self::$sockets[$proto][$port];
  57. if (!$sock['process']->isRunning()) {
  58. throw new \RuntimeException(
  59. 'LocalSocket '.$proto.'://127.0.0.1:'.$port.' appears to have died unexpectedly: ' . "\n\n" .
  60. $sock['process']->getOutput()
  61. );
  62. }
  63. self::clearSocket($port, $proto);
  64. return new class($sock['process'], $sock['comms']) {
  65. public function __construct(Process $proc, $comms)
  66. {
  67. $this->process = $proc;
  68. $this->comms = $comms;
  69. }
  70. public function getOutput()
  71. {
  72. // read out until getting a !DONE! ack and then tell the socket to terminate the connection
  73. if ($this->comms) {
  74. $out = '';
  75. socket_write($this->comms, "DONE?\n");
  76. while ($data = socket_read($this->comms, 2048)) {
  77. $out .= $data;
  78. if (substr($out, -6) === '!DONE!') {
  79. $out = substr($out, 0, -6);
  80. break;
  81. }
  82. }
  83. $out = preg_replace('{.*!BEGIN!}', '', $out);
  84. socket_write($this->comms, "TERMINATE\n");
  85. return $out;
  86. }
  87. // wait 3 seconds max for output for UDP
  88. $retries = 3000;
  89. while (!$this->process->getOutput() && $retries-- && $this->process->getStatus()) {
  90. usleep(100);
  91. }
  92. return $this->process->getOutput();
  93. }
  94. };
  95. }
  96. private static function clearSocket(int $port = 51984, string $proto = LocalSocket::TCP)
  97. {
  98. if (isset(self::$sockets[$proto][$port])) {
  99. self::$sockets[$proto][$port]['process']->clearOutput();
  100. }
  101. }
  102. public static function shutdownSocket(int $port = 51984, string $proto = LocalSocket::TCP)
  103. {
  104. if (!isset(self::$sockets[$proto][$port])) {
  105. return;
  106. }
  107. if (is_resource(self::$sockets[$proto][$port]['comms'])) {
  108. socket_write(self::$sockets[$proto][$port]['comms'], "EXIT\n");
  109. socket_close(self::$sockets[$proto][$port]['comms']);
  110. }
  111. $sock = self::$sockets[$proto][$port];
  112. $sock['process']->stop();
  113. @unlink($sock['file']);
  114. unset(self::$sockets[$proto][$port]);
  115. }
  116. public static function shutdownSockets()
  117. {
  118. foreach (self::$sockets as $proto => $ports) {
  119. foreach ($ports as $port => $sock) {
  120. self::shutdownSocket($port, $proto);
  121. }
  122. }
  123. }
  124. private static function initFile(int $port, string $proto): string
  125. {
  126. $tmpFile = sys_get_temp_dir().'/monolog-test-'.$proto.'-socket-'.$port.'.php';
  127. if ($proto === self::UDP) {
  128. file_put_contents($tmpFile, <<<SCRIPT
  129. <?php
  130. \$sock = socket_create(AF_INET, SOCK_DGRAM, getprotobyname('udp'));
  131. if (!socket_bind(\$sock, '127.0.0.1', $port)) {
  132. fwrite(STDERR, 'COULD NOT BIND $port');
  133. }
  134. fwrite(STDERR, 'INIT');
  135. while (true) {
  136. socket_recvfrom(\$sock, \$read, 100*1024, 0, \$ip, \$port);
  137. fwrite(STDOUT, \$read);
  138. }
  139. SCRIPT
  140. );
  141. } else {
  142. file_put_contents($tmpFile, <<<SCRIPT
  143. <?php
  144. \$sock = socket_create(AF_INET, SOCK_STREAM, getprotobyname('tcp'));
  145. if (!socket_bind(\$sock, '127.0.0.1', $port)) {
  146. fwrite(STDERR, 'COULD NOT BIND $port');
  147. }
  148. if (!socket_listen(\$sock)) {
  149. fwrite(STDERR, 'COULD NOT LISTEN $port');
  150. }
  151. fwrite(STDERR, 'INIT');
  152. \$monitor = socket_accept(\$sock);
  153. \$read = socket_read(\$monitor, 1024, PHP_NORMAL_READ);
  154. if (substr(\$read, 0, 7) !== 'MONITOR') {
  155. fwrite(STDERR, "Unexpected input: \$read");
  156. } else {
  157. fwrite(STDERR, "MONITORED");
  158. }
  159. while (true) {
  160. \$res = socket_accept(\$sock);
  161. socket_set_option(\$res, SOL_SOCKET, SO_RCVTIMEO, array("sec" => 0, "usec" => 0));
  162. socket_write(\$monitor, '!BEGIN!');
  163. while (true) {
  164. \$read = [\$res, \$monitor, \$sock];
  165. \$write = [];
  166. \$except = [];
  167. \$timeout = 0;
  168. if (socket_select(\$read, \$write, \$except, \$timeout) < 1) {
  169. continue;
  170. }
  171. foreach (\$read as \$readsock) {
  172. if (\$readsock === \$res) {
  173. \$bytes = socket_read(\$res, 1024);
  174. //if (\$bytes === '' && in_array(\$sock, \$read)) {
  175. // // client closed
  176. // socket_write(\$monitor, 'CLIENTCLOSED');
  177. // break 2;
  178. //}
  179. socket_write(\$monitor, \$bytes);
  180. } else {
  181. \$bytes = socket_read(\$monitor, 1024, PHP_NORMAL_READ);
  182. if (substr(trim(\$bytes), 0, 9) === 'TERMINATE') {
  183. break 2;
  184. } elseif (substr(trim(\$bytes), 0, 5) === 'DONE?') {
  185. socket_write(\$monitor, '!DONE!');
  186. } elseif (substr(trim(\$bytes), 0, 5) === 'EXIT') {
  187. socket_close(\$res);
  188. socket_close(\$monitor);
  189. die;
  190. }
  191. }
  192. }
  193. }
  194. socket_close(\$res);
  195. }
  196. SCRIPT
  197. );
  198. }
  199. return $tmpFile;
  200. }
  201. }