RabbitMQWrapper.js 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. const amqp = require('amqplib');
  2. class RabbitMQWrapper {
  3. constructor() {
  4. this.connection = null;
  5. this.channel = null;
  6. }
  7. async connect() {
  8. try {
  9. this.connection = await amqp.connect('amqp://username:password@messageBroker');
  10. this.channel = await this.connection.createChannel();
  11. console.log('Connected to RabbitMQ');
  12. } catch (error) {
  13. console.error('Error connecting to RabbitMQ:', error);
  14. }
  15. }
  16. async consumeFromQueue(queueName, callback) {
  17. await this.connect();
  18. try {
  19. if (!this.channel) {
  20. throw new Error('Channel is not initialized. Call connect() first.');
  21. }
  22. let result = await this.channel.assertQueue(queueName, { durable: true });
  23. console.log('Queue asserted.',result);
  24. this.channel.consume(queueName, async (msg) => {
  25. if (msg !== null) {
  26. const content = msg.content.toString();
  27. try {
  28. await callback(content);
  29. this.channel.ack(msg);
  30. } catch (callbackError) {
  31. console.error('Callback error:', callbackError);
  32. }
  33. }
  34. });
  35. console.log(`Listening for messages on queue "${queueName}"`);
  36. } catch (error) {
  37. console.error('Error consuming messages:', error.message);
  38. }
  39. }
  40. }
  41. module.exports = RabbitMQWrapper;