modfileregister.class.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. <?php
  2. /**
  3. * This file contains a simple file-based implementation of modRegister.
  4. *
  5. * @package modx
  6. * @subpackage registry
  7. */
  8. /** Make sure the modRegister class is included. */
  9. require_once(dirname(__FILE__) . '/modregister.class.php');
  10. /**
  11. * A simple, file-based implementation of modRegister.
  12. *
  13. * This implementation does not address transactional conflicts and should be
  14. * used in non-critical processes that are easily recoverable.
  15. *
  16. * @package modx
  17. * @subpackage registry
  18. */
  19. class modFileRegister extends modRegister {
  20. /**
  21. * A physical directory where the register stores topics and messages.
  22. * @var string
  23. */
  24. protected $directory = null;
  25. /**
  26. * Construct a new modFileRegister instance.
  27. *
  28. * @param modX &$modx A reference to a modX instance.
  29. * @param string $key A valid PHP variable which will be set on the modRegistry instance.
  30. * @param array $options Optional array of registry options.
  31. */
  32. function __construct(& $modx, $key, $options = array())
  33. {
  34. parent::__construct($modx, $key, $options);
  35. $modx->getCacheManager();
  36. $this->directory = $modx->getCachePath() . 'registry/';
  37. $this->directory .= isset($options['directory'])
  38. ? $options['directory']
  39. : $key;
  40. $this->directory = rtrim($this->directory, '/') . '/';
  41. }
  42. /**
  43. * Make sure the register can write to the specified $directory.
  44. *
  45. * {@inheritdoc}
  46. */
  47. public function connect(array $attributes = array()) {
  48. $connected = false;
  49. if (is_string($this->directory) && strlen($this->directory)) {
  50. $connected = $this->modx->cacheManager->writeTree($this->directory);
  51. }
  52. return $connected;
  53. }
  54. /**
  55. * Clear the register messages.
  56. *
  57. * {@inheritdoc}
  58. */
  59. public function clear($topic)
  60. {
  61. $topicDirectory = $this->directory . ltrim($this->sanitizePath($topic), '/');
  62. return $this->modx->cacheManager->deleteTree(
  63. realpath($topicDirectory),
  64. array(
  65. 'extensions' => array('.msg.php')
  66. )
  67. );
  68. }
  69. /**
  70. * {@inheritdoc}
  71. *
  72. * This implementation supports the following options and default behavior:
  73. * <ul>
  74. * <li>msg_limit: Only poll until the specified limit of messages has
  75. * been digested. Default is 5 messages.</li>
  76. * <li>time_limit: Poll for new messages for a specified number of
  77. * seconds. Default is the result of the php time_limit system variable.</li>
  78. * <li>poll_limit: Only poll for new subscriptions a specified number
  79. * of times. Default is unlimited.</li>
  80. * <li>poll_interval: Wait a specified number of seconds between each
  81. * additional polling iteration, after the initial one. Default is no
  82. * interval.</li>
  83. * <li>remove_read: Remove the message immediately upon digesting it.
  84. * Default is true.</li>
  85. * <li>include_keys: Include the message keys in the array of messages returned.
  86. * Default is false.</li>
  87. * </ul>
  88. */
  89. public function read(array $options = array()) {
  90. $this->__kill = false;
  91. $messages = array();
  92. $topicMessages = array();
  93. $msgLimit = isset($options['msg_limit']) ? intval($options['msg_limit']) : 5;
  94. $timeLimit = isset($options['time_limit']) ? intval($options['time_limit']) : ini_get('time_limit');
  95. $pollLimit = isset($options['poll_limit']) ? intval($options['poll_limit']) : 0;
  96. $pollInterval = isset($options['poll_interval']) ? intval($options['poll_interval']) : 0;
  97. $removeRead = isset($options['remove_read']) ? (boolean) $options['remove_read'] : true;
  98. $includeKeys = isset($options['include_keys']) ? (boolean) $options['include_keys'] : false;
  99. $startTime = microtime(true);
  100. $time = $timeLimit <= 0 ? -1 : $startTime;
  101. $expires = $startTime + $timeLimit;
  102. $msgCount = 0;
  103. $iteration = 0;
  104. while ($time < $expires && $msgCount < $msgLimit && !$this->__kill) {
  105. if ($iteration > 0) {
  106. if ($pollLimit > 0 && $iteration >= $pollLimit) {
  107. break;
  108. }
  109. if ($pollInterval > 0) sleep($pollInterval);
  110. }
  111. $iteration++;
  112. foreach ($this->subscriptions as $subIdx => $topic) {
  113. $topicMessages = array();
  114. $topicDirectory = $this->directory;
  115. $topicDirectory.= $topic[0] == '/' ? substr($topic, 1) : $topic ;
  116. if (is_dir($topicDirectory)) {
  117. $dirListing = $this->getSortedDirectoryListing($topicDirectory);
  118. if (!empty($dirListing)) {
  119. foreach ($dirListing as $idx => $entry) {
  120. if ($msgCount >= $msgLimit || $this->__kill) break;
  121. if ($newMsg = $this->_readMessage($topicDirectory . $entry, $removeRead)) {
  122. if (!$includeKeys) {
  123. $topicMessages[] = $newMsg;
  124. } else {
  125. $msgKey = substr($entry, 0, strpos($entry, '.msg.php'));
  126. $topicMessages[$msgKey] = $newMsg;
  127. }
  128. $msgCount++;
  129. }
  130. }
  131. }
  132. }
  133. elseif ($newMsg = $this->_readMessage($topicDirectory . '.msg.php', $removeRead)) {
  134. if (!$includeKeys) {
  135. $topicMessages[] = $newMsg;
  136. } else {
  137. $topicMessages[$topicDirectory] = $newMsg;
  138. }
  139. $msgCount++;
  140. }
  141. }
  142. if (!empty($topicMessages)) {
  143. if (!$includeKeys) {
  144. $messages = $messages + $topicMessages;
  145. } else {
  146. $messages = array_merge($messages, $topicMessages);
  147. }
  148. }
  149. $time = microtime(true);
  150. }
  151. return $messages;
  152. }
  153. /**
  154. * Get list of topic messages from a directory sorted by modified date.
  155. *
  156. * @param string $dir A valid directory path.
  157. * @return array An array of topic messages sorted by modified date.
  158. */
  159. private function getSortedDirectoryListing($dir) {
  160. $listing = array();
  161. $d = new DirectoryIterator($dir);
  162. $idx = 0;
  163. foreach ($d as $f) {
  164. $filename = $f->getFilename();
  165. if ($f->isFile() && strpos($filename, '.msg.php')) {
  166. $listing[] = $filename;
  167. $idx++;
  168. }
  169. }
  170. if (!empty($listing)) sort($listing);
  171. return $listing;
  172. }
  173. /**
  174. * Read a message file from the queue.
  175. *
  176. * @todo Implement support for reading various message types, other than
  177. * executable PHP format.
  178. * @access private
  179. * @param string $filename An absolute path to a message file to read.
  180. * @param boolean $remove Indicates if the message file should be deleted
  181. * once the message is read from it.
  182. */
  183. private function _readMessage($filename, $remove = true) {
  184. $message = null;
  185. if (file_exists($filename)) {
  186. $message = @ include($filename);
  187. if ($remove) {
  188. @ unlink($filename);
  189. }
  190. }
  191. return $message;
  192. }
  193. /**
  194. * {@inheritdoc}
  195. *
  196. * This implementation provides support for sending messages using either
  197. * time-based indexes so they are consumed in the order they are produced,
  198. * or named indexes typically used when consumers want to subscribe to a
  199. * specific, unique message. Individual messages or message collections
  200. * passed in numerically indexed arrays are treated as time-based messages
  201. * and message collections passed in associative arrays are treated as named
  202. * messages. e.g., to send a single message as named, wrap it in an array
  203. * with the intended message name as the key.
  204. *
  205. * This implementation also supports a message_type option to indicate the
  206. * format of the message being sent to the register. Currently only supports
  207. * executable PHP format.
  208. *
  209. * Other implementation specific options include:
  210. * <ul>
  211. * <li>delay: Number of seconds to delay the message. This option is only
  212. * supported for time-based messages.</li>
  213. * <li>ttl: Number of seconds the message is valid in the queue.
  214. * Default is forever or 0.</li>
  215. * <li>kill: Tells a message consumer to stop consuming any more
  216. * messages after reading any message sent with this option.</li>
  217. * </ul>
  218. *
  219. * @todo Implement support for sending various message types, other than
  220. * executable PHP format.
  221. */
  222. public function send($topic, $message, array $options = array()) {
  223. $sent = false;
  224. if (empty($topic) || $topic[0] != '/') $topic = $this->_currentTopic . $topic;
  225. $topicIdx = array_search($topic, $this->subscriptions);
  226. $topic = substr($topic, 1);
  227. if ($topicIdx !== false) {
  228. $messageType = isset($options['message_type']) ? $options['message_type'] : 'php';
  229. $topicDirectory = $this->directory . $topic;
  230. if ($topicDirectory[strlen($topicDirectory) - 1] != '/') $topicDirectory .= '/';
  231. if (!is_array($message)) {
  232. $message = array($message);
  233. }
  234. foreach ($message as $msgIdx => $msg) {
  235. if (is_scalar($msg) || is_array($msg) || is_object($msg)) {
  236. switch ($messageType) {
  237. //TODO: implement more message types
  238. case 'php' :
  239. default :
  240. $timestamp = isset($options['delay']) ? time() + intval($options['delay']) : time();
  241. $expires = isset($options['ttl']) && !empty($options['ttl']) ? time() + intval($options['ttl']) : 0;
  242. $kill = isset($options['kill']) ? (boolean) $options['kill'] : false;
  243. if (!is_int($msgIdx)) {
  244. $msgKey = $msgIdx;
  245. } else {
  246. $msgKey = strftime('%Y%m%dT%H%M%S', $timestamp) . '-' . sprintf("%03d", $msgIdx);
  247. }
  248. $filename = $topicDirectory . $msgKey . '.msg.php';
  249. $content = "<?php\n";
  250. if ($expires > 0) $content.= "if (time() > {$expires}) return null;\n";
  251. if ($kill) $content.= "\$this->__kill = true;\n";
  252. $content.= 'return ' . var_export($msg, true) . ";\n";
  253. $sent = $this->modx->cacheManager->writeFile($filename, $content);
  254. }
  255. }
  256. }
  257. }
  258. return $sent;
  259. }
  260. public function close() {
  261. return true;
  262. }
  263. /**
  264. * Sanitize the specified path
  265. *
  266. * @param string $path The path to clean
  267. * @return string The sanitized path
  268. */
  269. protected function sanitizePath($path) {
  270. return preg_replace(array("/\.*[\/|\\\]/i", "/[\/|\\\]+/i"), array('/', '/'), $path);
  271. }
  272. }