mehmet.kirkoca 2 роки тому
батько
коміт
f3c3c1f9b5

+ 37 - 0
services/utils/RabbitMQConnector.js

@@ -0,0 +1,37 @@
+const amqp = require('amqplib');
+
+class RabbitMQConnector {
+  constructor() {
+    this.connection = null;
+    this.channel = null;
+  }
+
+  async connect() {
+    try {
+      this.connection = await amqp.connect('amqp://username:password@messageBroker');
+      this.channel = await this.connection.createChannel();
+      console.log('Connected to RabbitMQ');
+    } catch (error) {
+      console.error('Error connecting to RabbitMQ:', error);
+    }
+  }
+
+  async disconnect() {
+    try {
+      if (this.channel) {
+        await this.channel.close();
+        console.log('Channel closed');
+      }
+
+      if (this.connection) {
+        await this.connection.close();
+        console.log('Connection closed');
+      }
+    } catch (error) {
+      console.error('Error disconnecting from RabbitMQ:', error);
+      throw error;
+    }
+  }
+}
+
+module.exports = RabbitMQConnector;

+ 35 - 3
services/utils/RabbitMQListener.js

@@ -1,13 +1,45 @@
-const RabbitMQWrapper = require('./RabbitMQWrapper');
+const RabbitMQConnector = require('./RabbitMQConnector');
 
 
 class RabbitMQListener {
 class RabbitMQListener {
   constructor() {
   constructor() {
-    this.rabbitmq = new RabbitMQWrapper();
+    this.rabbitmqConnector = new RabbitMQConnector();
+  }
+
+  async connect() { 
+    await this.rabbitmqConnector.connect();
+  }
+
+  async consumeFromQueue(queueName, callback) {
+    await this.connect();
+    try {
+      if (!this.rabbitmqConnector.channel) {
+        throw new Error('Channel is not initialized. Call connect() first.');
+      }
+
+      let result = await this.rabbitmqConnector.channel.assertQueue(queueName, { durable: true });
+      console.log('Queue asserted.',result);
+  
+      this.rabbitmqConnector.channel.consume(queueName, async (msg) => {
+        if (msg !== null) {
+          const content = msg.content.toString();
+          try {
+            await callback(content);
+            this.rabbitmqConnector.channel.ack(msg);
+          } catch (callbackError) {
+            console.error('Callback error:', callbackError);
+          }
+        }
+      });
+  
+      console.log(`Listening for messages on queue "${queueName}"`);
+    } catch (error) {
+      console.error('Error consuming messages:', error.message);
+    }
   }
   }
 
 
   async listenToQueue(queueName, callback) {
   async listenToQueue(queueName, callback) {
     try {
     try {
-      await this.rabbitmq.consumeFromQueue(queueName, async (message) => {
+      await this.consumeFromQueue(queueName, async (message) => {
         await callback(message);
         await callback(message);
       });
       });
     } catch (error) {
     } catch (error) {

+ 5 - 13
services/utils/RabbitMQProducer.js

@@ -1,28 +1,20 @@
-const amqp = require('amqplib');
+const RabbitMQConnector = require('./RabbitMQConnector');
 
 
 class RabbitMQProducer {
 class RabbitMQProducer {
   
   
   constructor() {
   constructor() {
-    this.connectionURL = 'amqp://username:password@messageBroker';
-    this.connection = null;
-    this.channel = null;
     this.connect();
     this.connect();
   }
   }
 
 
   async connect() {
   async connect() {
-    try {
-      this.connection = await amqp.connect(this.connectionURL);
-      this.channel = await this.connection.createChannel();
-      console.log('Connected to RabbitMQ');
-    } catch (error) {
-      console.error('Error connecting to RabbitMQ:', error);
-    }
+    this.rabbitmqConnector = new RabbitMQConnector();
+    this.rabbitmqConnector.connect();
   }
   }
 
 
   async publishToQueue(queueName, message) {
   async publishToQueue(queueName, message) {
     try {
     try {
-      await this.channel.assertQueue(queueName, { durable: true });
-      this.channel.sendToQueue(queueName, Buffer.from(message));
+      await this.rabbitmqConnector.channel.assertQueue(queueName, { durable: true });
+      this.rabbitmqConnector.channel.sendToQueue(queueName, Buffer.from(message));
       console.log(`Published to ${queueName}: ${message}`);
       console.log(`Published to ${queueName}: ${message}`);
     } catch (error) {
     } catch (error) {
       console.error(`Error publishing to ${queueName}:`, error);
       console.error(`Error publishing to ${queueName}:`, error);

+ 0 - 48
services/utils/RabbitMQWrapper.js

@@ -1,48 +0,0 @@
-const amqp = require('amqplib');
-
-class RabbitMQWrapper {
-  constructor() {
-    this.connection = null;
-    this.channel = null;
-  }
-
-  async connect() {
-    try {
-      this.connection = await amqp.connect('amqp://username:password@messageBroker');
-      this.channel = await this.connection.createChannel();
-      console.log('Connected to RabbitMQ');
-    } catch (error) {
-      console.error('Error connecting to RabbitMQ:', error);
-    }
-  }
-
-  async consumeFromQueue(queueName, callback) {
-    await this.connect();
-    try {
-      if (!this.channel) {
-        throw new Error('Channel is not initialized. Call connect() first.');
-      }
-
-      let result = await this.channel.assertQueue(queueName, { durable: true });
-      console.log('Queue asserted.',result);
-  
-      this.channel.consume(queueName, async (msg) => {
-        if (msg !== null) {
-          const content = msg.content.toString();
-          try {
-            await callback(content);
-            this.channel.ack(msg);
-          } catch (callbackError) {
-            console.error('Callback error:', callbackError);
-          }
-        }
-      });
-  
-      console.log(`Listening for messages on queue "${queueName}"`);
-    } catch (error) {
-      console.error('Error consuming messages:', error.message);
-    }
-  }
-}
-
-module.exports = RabbitMQWrapper;