index.js 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. require('dotenv').config();
  2. const Fastify = require('fastify');
  3. const { Queue, Worker, QueueEvents } = require('bullmq');
  4. const IORedis = require('ioredis');
  5. const axios = require('axios');
  6. const { getDb, connect } = require('./utils/MongoDBConnector');
  7. const REDIS_URL = process.env.REDIS_URL || 'redis://redis:6379';
  8. const PLATFORM_SERVICES = {
  9. twitter: process.env.TWITTER_SERVICE_URL || 'http://twitter:3001',
  10. linkedin: process.env.LINKEDIN_SERVICE_URL || 'http://linkedin:3002',
  11. mastodon: process.env.MASTODON_SERVICE_URL || 'http://mastodon:3003',
  12. bluesky: process.env.BLUESKY_SERVICE_URL || 'http://bluesky:3004',
  13. };
  14. const app = Fastify({ logger: false });
  15. let postQueue;
  16. let redis;
  17. // ─── Job Worker ──────────────────────────────────────────────────────────────
  18. async function processPostJob(job) {
  19. const { postId, content, platforms, media = [] } = job.data;
  20. console.log(`[Scheduler] Job ${job.id} çalışıyor: ${platforms.join(', ')}`);
  21. const db = await getDb();
  22. const results = {};
  23. for (const platform of platforms) {
  24. const serviceUrl = PLATFORM_SERVICES[platform];
  25. if (!serviceUrl) {
  26. results[platform] = { success: false, error: 'Bilinmeyen platform' };
  27. continue;
  28. }
  29. try {
  30. const response = await axios.post(`${serviceUrl}/post`, { content, media }, { timeout: 30000 });
  31. results[platform] = { success: true, ...response.data.result };
  32. } catch (err) {
  33. results[platform] = { success: false, error: err.message };
  34. }
  35. }
  36. // MongoDB güncelle
  37. await db.collection('posts').updateOne(
  38. { _id: postId },
  39. {
  40. $set: {
  41. status: Object.values(results).every((r) => r.success) ? 'published' : 'failed',
  42. publishedAt: new Date(),
  43. platformResults: results,
  44. },
  45. }
  46. );
  47. await db.collection('scheduled_jobs').updateOne(
  48. { bullJobId: String(job.id) },
  49. {
  50. $set: {
  51. status: 'completed',
  52. completedAt: new Date(),
  53. },
  54. }
  55. );
  56. return results;
  57. }
  58. // ─── HTTP Endpoints ──────────────────────────────────────────────────────────
  59. app.get('/health', async () => ({ status: 'ok', service: 'scheduler' }));
  60. // Yeni zamanlanmış gönderi oluştur
  61. app.post('/schedule', async (request, reply) => {
  62. const { postId, content, platforms, scheduledAt, media = [] } = request.body;
  63. if (!content || !platforms?.length || !scheduledAt) {
  64. return reply.code(400).send({ error: 'content, platforms ve scheduledAt zorunlu' });
  65. }
  66. const delay = new Date(scheduledAt).getTime() - Date.now();
  67. if (delay < 0) {
  68. return reply.code(400).send({ error: 'scheduledAt geçmiş bir tarih olamaz' });
  69. }
  70. const job = await postQueue.add(
  71. 'scheduled-post',
  72. { postId, content, platforms, media },
  73. { delay, attempts: 3, backoff: { type: 'exponential', delay: 60000 } }
  74. );
  75. // MongoDB kayıt
  76. const db = await getDb();
  77. await db.collection('scheduled_jobs').insertOne({
  78. postId,
  79. type: 'one-time',
  80. scheduledAt: new Date(scheduledAt),
  81. platforms,
  82. status: 'pending',
  83. attempts: 0,
  84. maxAttempts: 3,
  85. bullJobId: String(job.id),
  86. createdAt: new Date(),
  87. });
  88. return { success: true, jobId: job.id, scheduledAt };
  89. });
  90. // Zamanlanmış görevleri listele
  91. app.get('/jobs', async (request) => {
  92. const { status = 'pending' } = request.query;
  93. const db = await getDb();
  94. const jobs = await db
  95. .collection('scheduled_jobs')
  96. .find({ status })
  97. .sort({ scheduledAt: 1 })
  98. .toArray();
  99. return { success: true, count: jobs.length, jobs };
  100. });
  101. // Görevi iptal et
  102. app.delete('/jobs/:jobId', async (request, reply) => {
  103. const { jobId } = request.params;
  104. const job = await postQueue.getJob(jobId);
  105. if (!job) return reply.code(404).send({ error: 'Job bulunamadı' });
  106. await job.remove();
  107. const db = await getDb();
  108. await db.collection('scheduled_jobs').updateOne(
  109. { bullJobId: jobId },
  110. { $set: { status: 'cancelled' } }
  111. );
  112. return { success: true, jobId };
  113. });
  114. // ─── Başlatma ────────────────────────────────────────────────────────────────
  115. async function start() {
  116. await connect();
  117. redis = new IORedis(REDIS_URL, { maxRetriesPerRequest: null });
  118. postQueue = new Queue('post-queue', { connection: redis });
  119. const worker = new Worker('post-queue', processPostJob, { connection: redis });
  120. worker.on('failed', (job, err) => {
  121. console.error(`[Scheduler] Job ${job?.id} başarısız:`, err.message);
  122. });
  123. await app.listen({ port: process.env.PORT || 3011, host: '0.0.0.0' });
  124. console.log('[Scheduler] Started on port 3011');
  125. }
  126. start().catch(console.error);