| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- <?php
- /*
- * This file is part of MODX Revolution.
- *
- * Copyright (c) MODX, LLC. All Rights Reserved.
- *
- * For complete copyright and license information, see the COPYRIGHT and LICENSE
- * files found in the top-level directory of this distribution.
- */
- require_once dirname(__FILE__) . '/modregister.class.php';
- /**
- * A simple, database-based implementation of modRegister.
- *
- * This implementation does not address transactional conflicts and should be
- * used in non-critical processes that are easily recoverable.
- *
- * @package modx
- * @subpackage registry.db
- */
- class modDbRegister extends modRegister
- {
- /**
- * The queue object representing this modRegister instance.
- * @access protected
- * @var modDbRegisterQueue $_queue
- */
- protected $_queue = null;
- /**
- * Construct a new modDbRegister instance.
- *
- * @param modX &$modx A reference to the modX instance
- * @param string $key The key of the registry to load
- * @param array $options An array of options to set
- */
- function __construct(modX &$modx, $key, array $options = array()) {
- parent :: __construct($modx, $key, $options);
- $this->_queue = $this->_initQueue($key, $options);
- }
- /**
- * Initialize a new queue
- * @param string $key The new name of the queue
- * @param array $options An array of options
- * @return modDbRegisterQueue A reference to the new Queue object
- */
- protected function _initQueue($key, $options) {
- $queue = $this->modx->getObject('registry.db.modDbRegisterQueue', array(
- 'name' => $key
- ));
- if (!$queue) {
- $queue = $this->modx->newObject('registry.db.modDbRegisterQueue');
- $queue->set('name', $key);
- $queue->set('options', $options);
- } elseif (!empty($options)) {
- $queue->set('options', $options);
- }
- if ($queue && $queue->isDirty('options')) $queue->save();
- return $queue;
- }
- /**
- * Connect to the register service implementation. If we made it here, we connected fine.
- *
- * @param array $attributes A collection of attributes required for
- * connection to the register.
- * @return boolean Indicates if the connection was successful.
- */
- public function connect(array $attributes = array()) {
- return true;
- }
- /**
- * Clear the register messages.
- *
- * {@inheritdoc}
- */
- public function clear($topic)
- {
- $topicObject = $this->modx->getObject('registry.db.modDbRegisterTopic', array(
- 'queue' => $this->_queue->get('id'),
- 'name' => $topic
- ));
- if (!$topicObject) {
- return false;
- }
- return (bool) $this->modx->removeCollection('registry.db.modDbRegisterMessage', array(
- 'topic' => $topicObject->get('id')
- ));
- }
- /**
- * This implementation supports the following options and default behavior:
- * <ul>
- * <li>msg_limit: Only poll until the specified limit of messages has
- * been digested. Default is 5 messages.</li>
- * <li>time_limit: Poll for new messages for a specified number of
- * seconds. Default is the result of the php time_limit system variable.</li>
- * <li>poll_limit: Only poll for new subscriptions a specified number
- * of times. Default is unlimited.</li>
- * <li>poll_interval: Wait a specified number of seconds between each
- * additional polling iteration, after the initial one. Default is no
- * interval.</li>
- * <li>remove_read: Remove the message immediately upon digesting it.
- * Default is true.</li>
- * <li>include_keys: Include the message keys in the array of messages returned.
- * Default is false.</li>
- * </ul>
- *
- * @param array $options An array of general or protocol specific options.
- * @return mixed The resulting message from the register.
- */
- public function read(array $options = array()) {
- $this->__kill = false;
- $messages = array();
- $topicMessages = array();
- $msgLimit = isset($options['msg_limit']) ? intval($options['msg_limit']) : 5;
- $timeLimit = isset($options['time_limit']) ? intval($options['time_limit']) : ini_get('max_execution_time');
- $pollLimit = isset($options['poll_limit']) ? intval($options['poll_limit']) : 0;
- $pollInterval = isset($options['poll_interval']) ? intval($options['poll_interval']) : 0;
- $removeRead = isset($options['remove_read']) ? (boolean) $options['remove_read'] : true;
- $includeKeys = isset($options['include_keys']) ? (boolean) $options['include_keys'] : false;
- $startTime = microtime(true);
- $time = $timeLimit <= 0 ? -1 : $startTime;
- $expires = $startTime + $timeLimit;
- $msgCount = 0;
- $iteration = 0;
- while ($time < $expires && $msgCount < $msgLimit && !$this->__kill) {
- if ($iteration > 0) {
- if ($pollLimit > 0 && $iteration >= $pollLimit) {
- break;
- }
- if ($pollInterval > 0) sleep($pollInterval);
- }
- $iteration++;
- foreach ($this->subscriptions as $subIdx => $topic) {
- $topicMessages = array();
- $balance = $msgLimit - $msgCount;
- $args = array(
- &$this,
- $topic,
- dirname($topic) . '/',
- basename($topic),
- $balance,
- array('fetchMode' => PDO::FETCH_OBJ)
- );
- foreach ($this->modx->call('registry.db.modDbRegisterMessage', 'getValidMessages', $args) as $msg) {
- $newMsg = $this->_readMessage($msg, $removeRead);
- if ($newMsg !== null) {
- if (!$includeKeys) {
- $topicMessages[] = $newMsg;
- } else {
- $topicMessages[$msg->id] = $newMsg;
- }
- $msgCount++;
- } else {
- $this->modx->log(modX::LOG_LEVEL_INFO, 'Message was null or expired: ' . print_r($msg, 1));
- }
- if ($this->__kill) break;
- }
- }
- if (!empty($topicMessages)) {
- if (!$includeKeys) {
- $messages = $messages + $topicMessages;
- } else {
- $messages = array_merge($messages, $topicMessages);
- }
- }
- $time = microtime(true);
- }
- return $messages;
- }
- /**
- * Read a message record from the queue topic.
- *
- * @todo Implement support for reading various message types, other than
- * executable PHP format.
- *
- * @param object $obj The message data to read.
- * @param boolean $remove Indicates if the message should be deleted once it is read.
- * @return mixed The message returned
- */
- protected function _readMessage($obj, $remove = true) {
- $message = null;
- if (is_object($obj) && !empty($obj->payload)) {
- $message = eval($obj->payload);
- if ($remove || ($obj->expires > 1 && $obj->expires < time())) {
- $this->modx->removeObject('registry.db.modDbRegisterMessage', array('topic' => $obj->topic, 'id' => $obj->id));
- }
- if ($obj->kill) $this->__kill = true;
- }
- return $message;
- }
- /**
- * This implementation provides support for sending messages using either
- * time-based indexes so they are consumed in the order they are produced,
- * or named indexes typically used when consumers want to subscribe to a
- * specific, unique message. Individual messages or message collections
- * passed in numerically indexed arrays are treated as time-based messages
- * and message collections passed in associative arrays are treated as named
- * messages. e.g., to send a single message as named, wrap it in an array
- * with the intended message name as the key.
- *
- * This implementation also supports a message_type option to indicate the
- * format of the message being sent to the register. Currently only supports
- * executable PHP format.
- *
- * Other implementation specific options include:
- * <ul>
- * <li>delay: Number of seconds to delay the message. This option is only
- * supported for time-based messages.</li>
- * <li>ttl: Number of seconds the message is valid in the queue.
- * Default is forever or 0.</li>
- * <li>kill: Tells a message consumer to stop consuming any more
- * messages after reading any message sent with this option.</li>
- * </ul>
- *
- * @param string $topic A topic container in which to broadcast the message.
- * @param mixed $message A message, or collection of messages to be sent to
- * the register.
- * @param array $options An optional array of general or protocol
- * specific message properties.
- * @return boolean Indicates if the message was recorded.
- *
- * @todo Implement support for sending various message types, other than
- * executable PHP format.
- */
- public function send($topic, $message, array $options = array()) {
- $sent = false;
- if (empty($topic) || $topic[0] != '/') $topic = $this->_currentTopic . $topic;
- $topicIdx = array_search($topic, $this->subscriptions);
- $queueId = $this->_queue->get('id');
- if ($queueId && $topicIdx !== false) {
- $error = false;
- $messageType = isset($options['message_type']) ? $options['message_type'] : 'php';
- if (!$topicObj = $this->modx->getObject('registry.db.modDbRegisterTopic', array('queue' => $queueId, 'name' => $topic))) {
- $topicObj = $this->modx->newObject('registry.db.modDbRegisterTopic');
- $topicObj->set('queue', $queueId);
- $topicObj->set('name', $topic);
- $topicObj->set('created', strftime('%Y-%m-%d %H:%M:%S'));
- if (!$topicObj->save()) {
- $error = true;
- }
- }
- if (!$error) {
- if (!is_array($message)) {
- $message = array($message);
- }
- foreach ($message as $msgIdx => $msg) {
- $payload = '';
- if (is_scalar($msg) || is_array($msg) || is_object($msg)) {
- switch ($messageType) {
- //TODO: implement more message types
- case 'php' :
- default :
- $timestamp = isset($options['delay']) ? time() + intval($options['delay']) : time();
- $expires = isset($options['ttl']) && intval($options['ttl']) ? time() + intval($options['ttl']) : 0;
- $kill = isset($options['kill']) ? (boolean) $options['kill'] : false;
- if (!is_int($msgIdx)) {
- $msgKey = $msgIdx;
- } else {
- $msgKey = strftime('%Y%m%dT%H%M%S', $timestamp) . '-' . sprintf("%03d", $msgIdx);
- }
- if ($expires > 0) $payload.= "if (time() > {$expires}) return null;\n";
- $payload.= 'return ' . var_export($msg, true) . ";\n";
- $messageObj = $this->modx->getObject('registry.db.modDbRegisterMessage', array('topic' => $topicObj->get('id'), 'id' => $msgKey));
- if (!$messageObj) {
- $messageObj = $this->modx->newObject('registry.db.modDbRegisterMessage');
- $messageObj->set('topic', $topicObj->get('id'));
- $messageObj->set('id', $msgKey);
- }
- if ($messageObj) {
- $messageObj->set('created', strftime('%Y-%m-%d %H:%M:%S'));
- $messageObj->set('valid', strftime('%Y-%m-%d %H:%M:%S', $timestamp));
- $messageObj->set('expires', $expires);
- $messageObj->set('payload', $payload);
- $messageObj->set('kill', $kill);
- $sent = $messageObj->save();
- }
- }
- }
- }
- } else {
- $this->modx->log(modX::LOG_LEVEL_ERROR, "Could not send message to queue {$queueId}, topic {$topic}. Message payload is " . print_r($message, 1));
- }
- }
- if (!$sent) $this->modx->log(modX::LOG_LEVEL_ERROR, "Could not send message to queue {$queueId}, topic {$topic}. Message payload is " . print_r($message, 1));
- return $sent;
- }
- /**
- * Close the connection to the register service implementation.
- * @return boolean Indicates if the connection was closed successfully.
- */
- public function close() {
- return true;
- }
- }
|