index.js 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. require('dotenv').config();
  2. const Fastify = require('fastify');
  3. const { Queue, Worker, QueueEvents } = require('bullmq');
  4. const IORedis = require('ioredis');
  5. const axios = require('axios');
  6. const { getDb, connect } = require('./utils/MongoDBConnector');
  7. const { createLogger } = require('./utils/logger');
  /** Reads an environment variable, using `fallback` when it is unset or empty. */
  const envOr = (name, fallback) => process.env[name] || fallback;

  // Redis connection string used for the BullMQ queue and worker created in start().
  const REDIS_URL = envOr('REDIS_URL', 'redis://redis:6379');

  // Base URL of the publishing service for each supported platform.
  // processPostJob() POSTs to `${url}/post` on the matching entry.
  const PLATFORM_SERVICES = {
    twitter: envOr('TWITTER_SERVICE_URL', 'http://twitter:3001'),
    linkedin: envOr('LINKEDIN_SERVICE_URL', 'http://linkedin:3002'),
    mastodon: envOr('MASTODON_SERVICE_URL', 'http://mastodon:3003'),
    bluesky: envOr('BLUESKY_SERVICE_URL', 'http://bluesky:3004'),
    instagram: envOr('INSTAGRAM_SERVICE_URL', 'http://instagram:3005'),
    facebook: envOr('FACEBOOK_SERVICE_URL', 'http://facebook:3006'),
  };

  // One logger instance is shared between application code and Fastify.
  const log = createLogger('scheduler');
  const app = Fastify({ logger: log });

  // Both are assigned inside start(), after the Redis connection is created.
  let postQueue;
  let redis;
  21. // ─── Job Worker ──────────────────────────────────────────────────────────────
  /**
   * Queue processor for a scheduled post: sends the post to every destination's
   * platform service, then records the outcome in MongoDB.
   *
   * Per-destination failures are captured in the result map rather than thrown,
   * so this function only rejects when `getDb()` or one of the `updateOne()`
   * calls fails.
   *
   * @param {object} job - Queue job; `job.data` holds
   *   `{ postId, content, destinations?, platforms?, media? }` where
   *   `destinations` is `[{ platform, accountId?, imageUrl?, videoUrl?, link? }]`
   *   and `platforms` is the legacy `string[]` form.
   * @returns {Promise<object>} Map of `platform` or `platform:accountId` to
   *   `{ success, ...result }` on success or `{ success: false, error }` on failure.
   */
  async function processPostJob(job) {
    const { postId, content, destinations, platforms, media = [] } = job.data;
    // Falls back to the legacy { platforms: string[] } format.
    const destList = destinations || (platforms || []).map((p) => ({ platform: p }));
    const keyFor = ({ platform, accountId }) => (accountId ? `${platform}:${accountId}` : platform);

    log.info({ action: 'job_process', jobId: job.id, destinations: destList.map(keyFor) });

    const db = await getDb();
    const results = {};

    for (const dest of destList) {
      const { platform, accountId, imageUrl, videoUrl, link } = dest;
      const resultKey = keyFor(dest);

      // Own-property check: a plain lookup would resolve inherited names such as
      // "toString" or "constructor" to a truthy function and skip this guard.
      const isKnownPlatform = Object.prototype.hasOwnProperty.call(PLATFORM_SERVICES, platform);
      const serviceUrl = isKnownPlatform ? PLATFORM_SERVICES[platform] : undefined;
      if (!serviceUrl) {
        results[resultKey] = { success: false, error: 'Unknown platform' };
        continue;
      }

      try {
        const response = await axios.post(
          `${serviceUrl}/post`,
          { content, accountId, imageUrl, videoUrl, link, media },
          { timeout: 30000 }
        );
        // `?.` keeps a successful call with an empty response body from being
        // recorded as a failure via a TypeError.
        results[resultKey] = { success: true, ...response.data?.result };
      } catch (err) {
        results[resultKey] = { success: false, error: err.message };
      }
    }

    // Update MongoDB. `every()` is vacuously true for an empty list, so require
    // at least one outcome before reporting the post as published.
    const outcomes = Object.values(results);
    const allSucceeded = outcomes.length > 0 && outcomes.every((r) => r.success);

    await db.collection('posts').updateOne(
      { _id: postId },
      {
        $set: {
          status: allSucceeded ? 'published' : 'failed',
          publishedAt: new Date(),
          platformResults: results,
        },
      }
    );

    // The tracking row is marked completed whenever the job ran to the end,
    // regardless of the per-destination outcomes stored above.
    await db.collection('scheduled_jobs').updateOne(
      { bullJobId: String(job.id) },
      {
        $set: {
          status: 'completed',
          completedAt: new Date(),
        },
      }
    );

    return results;
  }
  67. // ─── HTTP Endpoints ──────────────────────────────────────────────────────────
  // Liveness probe: always answers with the same static payload.
  app.get('/health', async function healthCheck() {
    return { status: 'ok', service: 'scheduler' };
  });
  // Create a scheduled post.
  // Body: { content, scheduledAt, destinations: [{ platform, accountId?, imageUrl?, videoUrl?, link? }] }
  // Legacy { platforms: string[] } still accepted for backwards compatibility.
  // Responds 400 when a required field is missing, when scheduledAt cannot be
  // parsed as a date, or when it lies in the past. On success a delayed job is
  // queued and a tracking row is written to the `scheduled_jobs` collection.
  app.post('/schedule', async (request, reply) => {
    // A request without a parsed body would otherwise fail the destructuring
    // with a TypeError (500) instead of the intended 400.
    const { postId, content, destinations, platforms, scheduledAt, media = [] } = request.body ?? {};
    const legacyPlatforms = Array.isArray(platforms) ? platforms : [];
    const destList = destinations || legacyPlatforms.map((p) => ({ platform: p }));

    if (!content || !Array.isArray(destList) || !destList.length || !scheduledAt) {
      return reply.code(400).send({ error: 'content, destinations, and scheduledAt are required' });
    }

    const runAt = new Date(scheduledAt);
    const delay = runAt.getTime() - Date.now();
    // An unparseable date yields NaN, and `NaN < 0` is false, so it must be
    // rejected explicitly before the past-date check.
    if (Number.isNaN(delay)) {
      return reply.code(400).send({ error: 'scheduledAt must be a valid date' });
    }
    if (delay < 0) {
      return reply.code(400).send({ error: 'scheduledAt must be in the future' });
    }

    const job = await postQueue.add(
      'scheduled-post',
      { postId, content, destinations: destList, media },
      { delay, attempts: 3, backoff: { type: 'exponential', delay: 60000 } }
    );

    try {
      const db = await getDb();
      await db.collection('scheduled_jobs').insertOne({
        postId,
        type: 'one-time',
        scheduledAt: runAt,
        destinations: destList,
        status: 'pending',
        attempts: 0,
        maxAttempts: 3,
        bullJobId: String(job.id),
        createdAt: new Date(),
      });
    } catch (err) {
      // Without its tracking row the queued job would still fire later while
      // being invisible to GET /jobs, so undo the enqueue before failing.
      try {
        await job.remove();
      } catch (removeErr) {
        log.error({ action: 'schedule_rollback', jobId: job.id, outcome: 'failure', err: removeErr.message });
      }
      throw err;
    }

    return { success: true, jobId: job.id, scheduledAt };
  });
  // List scheduled jobs.
  // Query: ?status=<value> (defaults to 'pending'). Results come from the
  // `scheduled_jobs` collection, ordered by scheduledAt ascending.
  app.get('/jobs', async (request) => {
    const requested = request.query.status;
    const status = requested === undefined ? 'pending' : requested;

    const db = await getDb();
    const cursor = db.collection('scheduled_jobs').find({ status }).sort({ scheduledAt: 1 });
    const jobs = await cursor.toArray();

    return { success: true, count: jobs.length, jobs };
  });
  // Cancel a job.
  // Removes the job from the queue, then flags its `scheduled_jobs` row as
  // cancelled. Responds 404 when the queue has no job with the given id.
  app.delete('/jobs/:jobId', async (request, reply) => {
    const jobId = request.params.jobId;

    const job = await postQueue.getJob(jobId);
    if (!job) {
      return reply.code(404).send({ error: 'Job bulunamadı' });
    }
    await job.remove();

    const scheduledJobs = (await getDb()).collection('scheduled_jobs');
    await scheduledJobs.updateOne({ bullJobId: jobId }, { $set: { status: 'cancelled' } });

    return { success: true, jobId };
  });
  // ─── Startup ─────────────────────────────────────────────────────────────────
  /**
   * Boots the service: connects to MongoDB, opens the Redis connection, creates
   * the 'post-queue' queue and its worker, then starts the HTTP server.
   *
   * Listens on `PORT` from the environment, or 3011 when it is unset.
   *
   * @returns {Promise<void>} Resolves once the HTTP server is listening; rejects
   *   if any startup step fails.
   */
  async function start() {
    await connect();

    // NOTE(review): BullMQ's connection docs call for `maxRetriesPerRequest: null`
    // on connections used by workers — confirm before changing.
    redis = new IORedis(REDIS_URL, { maxRetriesPerRequest: null });
    postQueue = new Queue('post-queue', { connection: redis });

    const worker = new Worker('post-queue', processPostJob, { connection: redis });
    worker.on('failed', (job, err) => {
      log.error({ action: 'job_process', jobId: job?.id, outcome: 'failure', err: err.message });
    });
    // Worker-level errors (as opposed to per-job failures) are emitted as
    // 'error'; log them so they do not surface as unhandled emitter errors.
    worker.on('error', (err) => {
      log.error({ action: 'worker_error', outcome: 'failure', err: err.message });
    });

    // Resolve the port once so the value logged is the one actually bound.
    const port = process.env.PORT ? Number(process.env.PORT) : 3011;
    await app.listen({ port, host: '0.0.0.0' });
    log.info({ action: 'service_start', port, outcome: 'success' }, 'Scheduler started');
  }
  /** Logs a startup failure and terminates the process with exit code 1. */
  function abortStartup(err) {
    log.error({ action: 'service_start', outcome: 'failure', err: err.message });
    process.exit(1);
  }

  // Launch the service; any rejection from start() is fatal.
  start().catch(abortStartup);