moddbregister.class.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. <?php
  2. /*
  3. * This file is part of MODX Revolution.
  4. *
  5. * Copyright (c) MODX, LLC. All Rights Reserved.
  6. *
  7. * For complete copyright and license information, see the COPYRIGHT and LICENSE
  8. * files found in the top-level directory of this distribution.
  9. */
  10. require_once dirname(__FILE__) . '/modregister.class.php';
  11. /**
  12. * A simple, database-based implementation of modRegister.
  13. *
  14. * This implementation does not address transactional conflicts and should be
  15. * used in non-critical processes that are easily recoverable.
  16. *
  17. * @package modx
  18. * @subpackage registry.db
  19. */
  20. class modDbRegister extends modRegister
  21. {
  22. /**
  23. * The queue object representing this modRegister instance.
  24. * @access protected
  25. * @var modDbRegisterQueue $_queue
  26. */
  27. protected $_queue = null;
  28. /**
  29. * Construct a new modDbRegister instance.
  30. *
  31. * @param modX &$modx A reference to the modX instance
  32. * @param string $key The key of the registry to load
  33. * @param array $options An array of options to set
  34. */
  35. function __construct(modX &$modx, $key, array $options = array()) {
  36. parent :: __construct($modx, $key, $options);
  37. $this->_queue = $this->_initQueue($key, $options);
  38. }
  39. /**
  40. * Initialize a new queue
  41. * @param string $key The new name of the queue
  42. * @param array $options An array of options
  43. * @return modDbRegisterQueue A reference to the new Queue object
  44. */
  45. protected function _initQueue($key, $options) {
  46. $queue = $this->modx->getObject('registry.db.modDbRegisterQueue', array(
  47. 'name' => $key
  48. ));
  49. if (!$queue) {
  50. $queue = $this->modx->newObject('registry.db.modDbRegisterQueue');
  51. $queue->set('name', $key);
  52. $queue->set('options', $options);
  53. } elseif (!empty($options)) {
  54. $queue->set('options', $options);
  55. }
  56. if ($queue && $queue->isDirty('options')) $queue->save();
  57. return $queue;
  58. }
  59. /**
  60. * Connect to the register service implementation. If we made it here, we connected fine.
  61. *
  62. * @param array $attributes A collection of attributes required for
  63. * connection to the register.
  64. * @return boolean Indicates if the connection was successful.
  65. */
  66. public function connect(array $attributes = array()) {
  67. return true;
  68. }
  69. /**
  70. * Clear the register messages.
  71. *
  72. * {@inheritdoc}
  73. */
  74. public function clear($topic)
  75. {
  76. $topicObject = $this->modx->getObject('registry.db.modDbRegisterTopic', array(
  77. 'queue' => $this->_queue->get('id'),
  78. 'name' => $topic
  79. ));
  80. if (!$topicObject) {
  81. return false;
  82. }
  83. return (bool) $this->modx->removeCollection('registry.db.modDbRegisterMessage', array(
  84. 'topic' => $topicObject->get('id')
  85. ));
  86. }
  87. /**
  88. * This implementation supports the following options and default behavior:
  89. * <ul>
  90. * <li>msg_limit: Only poll until the specified limit of messages has
  91. * been digested. Default is 5 messages.</li>
  92. * <li>time_limit: Poll for new messages for a specified number of
  93. * seconds. Default is the result of the php time_limit system variable.</li>
  94. * <li>poll_limit: Only poll for new subscriptions a specified number
  95. * of times. Default is unlimited.</li>
  96. * <li>poll_interval: Wait a specified number of seconds between each
  97. * additional polling iteration, after the initial one. Default is no
  98. * interval.</li>
  99. * <li>remove_read: Remove the message immediately upon digesting it.
  100. * Default is true.</li>
  101. * <li>include_keys: Include the message keys in the array of messages returned.
  102. * Default is false.</li>
  103. * </ul>
  104. *
  105. * @param array $options An array of general or protocol specific options.
  106. * @return mixed The resulting message from the register.
  107. */
  108. public function read(array $options = array()) {
  109. $this->__kill = false;
  110. $messages = array();
  111. $topicMessages = array();
  112. $msgLimit = isset($options['msg_limit']) ? intval($options['msg_limit']) : 5;
  113. $timeLimit = isset($options['time_limit']) ? intval($options['time_limit']) : ini_get('max_execution_time');
  114. $pollLimit = isset($options['poll_limit']) ? intval($options['poll_limit']) : 0;
  115. $pollInterval = isset($options['poll_interval']) ? intval($options['poll_interval']) : 0;
  116. $removeRead = isset($options['remove_read']) ? (boolean) $options['remove_read'] : true;
  117. $includeKeys = isset($options['include_keys']) ? (boolean) $options['include_keys'] : false;
  118. $startTime = microtime(true);
  119. $time = $timeLimit <= 0 ? -1 : $startTime;
  120. $expires = $startTime + $timeLimit;
  121. $msgCount = 0;
  122. $iteration = 0;
  123. while ($time < $expires && $msgCount < $msgLimit && !$this->__kill) {
  124. if ($iteration > 0) {
  125. if ($pollLimit > 0 && $iteration >= $pollLimit) {
  126. break;
  127. }
  128. if ($pollInterval > 0) sleep($pollInterval);
  129. }
  130. $iteration++;
  131. foreach ($this->subscriptions as $subIdx => $topic) {
  132. $topicMessages = array();
  133. $balance = $msgLimit - $msgCount;
  134. $args = array(
  135. &$this,
  136. $topic,
  137. dirname($topic) . '/',
  138. basename($topic),
  139. $balance,
  140. array('fetchMode' => PDO::FETCH_OBJ)
  141. );
  142. foreach ($this->modx->call('registry.db.modDbRegisterMessage', 'getValidMessages', $args) as $msg) {
  143. $newMsg = $this->_readMessage($msg, $removeRead);
  144. if ($newMsg !== null) {
  145. if (!$includeKeys) {
  146. $topicMessages[] = $newMsg;
  147. } else {
  148. $topicMessages[$msg->id] = $newMsg;
  149. }
  150. $msgCount++;
  151. } else {
  152. $this->modx->log(modX::LOG_LEVEL_INFO, 'Message was null or expired: ' . print_r($msg, 1));
  153. }
  154. if ($this->__kill) break;
  155. }
  156. }
  157. if (!empty($topicMessages)) {
  158. if (!$includeKeys) {
  159. $messages = $messages + $topicMessages;
  160. } else {
  161. $messages = array_merge($messages, $topicMessages);
  162. }
  163. }
  164. $time = microtime(true);
  165. }
  166. return $messages;
  167. }
  168. /**
  169. * Read a message record from the queue topic.
  170. *
  171. * @todo Implement support for reading various message types, other than
  172. * executable PHP format.
  173. *
  174. * @param object $obj The message data to read.
  175. * @param boolean $remove Indicates if the message should be deleted once it is read.
  176. * @return mixed The message returned
  177. */
  178. protected function _readMessage($obj, $remove = true) {
  179. $message = null;
  180. if (is_object($obj) && !empty($obj->payload)) {
  181. $message = eval($obj->payload);
  182. if ($remove || ($obj->expires > 1 && $obj->expires < time())) {
  183. $this->modx->removeObject('registry.db.modDbRegisterMessage', array('topic' => $obj->topic, 'id' => $obj->id));
  184. }
  185. if ($obj->kill) $this->__kill = true;
  186. }
  187. return $message;
  188. }
  189. /**
  190. * This implementation provides support for sending messages using either
  191. * time-based indexes so they are consumed in the order they are produced,
  192. * or named indexes typically used when consumers want to subscribe to a
  193. * specific, unique message. Individual messages or message collections
  194. * passed in numerically indexed arrays are treated as time-based messages
  195. * and message collections passed in associative arrays are treated as named
  196. * messages. e.g., to send a single message as named, wrap it in an array
  197. * with the intended message name as the key.
  198. *
  199. * This implementation also supports a message_type option to indicate the
  200. * format of the message being sent to the register. Currently only supports
  201. * executable PHP format.
  202. *
  203. * Other implementation specific options include:
  204. * <ul>
  205. * <li>delay: Number of seconds to delay the message. This option is only
  206. * supported for time-based messages.</li>
  207. * <li>ttl: Number of seconds the message is valid in the queue.
  208. * Default is forever or 0.</li>
  209. * <li>kill: Tells a message consumer to stop consuming any more
  210. * messages after reading any message sent with this option.</li>
  211. * </ul>
  212. *
  213. * @param string $topic A topic container in which to broadcast the message.
  214. * @param mixed $message A message, or collection of messages to be sent to
  215. * the register.
  216. * @param array $options An optional array of general or protocol
  217. * specific message properties.
  218. * @return boolean Indicates if the message was recorded.
  219. *
  220. * @todo Implement support for sending various message types, other than
  221. * executable PHP format.
  222. */
  223. public function send($topic, $message, array $options = array()) {
  224. $sent = false;
  225. if (empty($topic) || $topic[0] != '/') $topic = $this->_currentTopic . $topic;
  226. $topicIdx = array_search($topic, $this->subscriptions);
  227. $queueId = $this->_queue->get('id');
  228. if ($queueId && $topicIdx !== false) {
  229. $error = false;
  230. $messageType = isset($options['message_type']) ? $options['message_type'] : 'php';
  231. if (!$topicObj = $this->modx->getObject('registry.db.modDbRegisterTopic', array('queue' => $queueId, 'name' => $topic))) {
  232. $topicObj = $this->modx->newObject('registry.db.modDbRegisterTopic');
  233. $topicObj->set('queue', $queueId);
  234. $topicObj->set('name', $topic);
  235. $topicObj->set('created', strftime('%Y-%m-%d %H:%M:%S'));
  236. if (!$topicObj->save()) {
  237. $error = true;
  238. }
  239. }
  240. if (!$error) {
  241. if (!is_array($message)) {
  242. $message = array($message);
  243. }
  244. foreach ($message as $msgIdx => $msg) {
  245. $payload = '';
  246. if (is_scalar($msg) || is_array($msg) || is_object($msg)) {
  247. switch ($messageType) {
  248. //TODO: implement more message types
  249. case 'php' :
  250. default :
  251. $timestamp = isset($options['delay']) ? time() + intval($options['delay']) : time();
  252. $expires = isset($options['ttl']) && intval($options['ttl']) ? time() + intval($options['ttl']) : 0;
  253. $kill = isset($options['kill']) ? (boolean) $options['kill'] : false;
  254. if (!is_int($msgIdx)) {
  255. $msgKey = $msgIdx;
  256. } else {
  257. $msgKey = strftime('%Y%m%dT%H%M%S', $timestamp) . '-' . sprintf("%03d", $msgIdx);
  258. }
  259. if ($expires > 0) $payload.= "if (time() > {$expires}) return null;\n";
  260. $payload.= 'return ' . var_export($msg, true) . ";\n";
  261. $messageObj = $this->modx->getObject('registry.db.modDbRegisterMessage', array('topic' => $topicObj->get('id'), 'id' => $msgKey));
  262. if (!$messageObj) {
  263. $messageObj = $this->modx->newObject('registry.db.modDbRegisterMessage');
  264. $messageObj->set('topic', $topicObj->get('id'));
  265. $messageObj->set('id', $msgKey);
  266. }
  267. if ($messageObj) {
  268. $messageObj->set('created', strftime('%Y-%m-%d %H:%M:%S'));
  269. $messageObj->set('valid', strftime('%Y-%m-%d %H:%M:%S', $timestamp));
  270. $messageObj->set('expires', $expires);
  271. $messageObj->set('payload', $payload);
  272. $messageObj->set('kill', $kill);
  273. $sent = $messageObj->save();
  274. }
  275. }
  276. }
  277. }
  278. } else {
  279. $this->modx->log(modX::LOG_LEVEL_ERROR, "Could not send message to queue {$queueId}, topic {$topic}. Message payload is " . print_r($message, 1));
  280. }
  281. }
  282. if (!$sent) $this->modx->log(modX::LOG_LEVEL_ERROR, "Could not send message to queue {$queueId}, topic {$topic}. Message payload is " . print_r($message, 1));
  283. return $sent;
  284. }
  285. /**
  286. * Close the connection to the register service implementation.
  287. * @return boolean Indicates if the connection was closed successfully.
  288. */
  289. public function close() {
  290. return true;
  291. }
  292. }