// index.js — scheduler service entry point.

  // Load .env before anything below reads process.env.
  require('dotenv').config();

  const Fastify = require('fastify');
  const { Queue, Worker, QueueEvents } = require('bullmq');
  const IORedis = require('ioredis');
  const axios = require('axios');
  const { randomUUID } = require('crypto');
  const { getDb, connect } = require('./utils/MongoDBConnector');
  const { createLogger } = require('./utils/logger');

  // Connection targets; each falls back to a default when the variable is unset or empty.
  const REDIS_URL = process.env.REDIS_URL || 'redis://redis:6379';
  const GATEWAY_URL = process.env.GATEWAY_URL || 'http://gateway:8084';

  // Base URL of the publishing service for each supported platform key.
  // processPostJob POSTs to `${url}/post`.
  const PLATFORM_SERVICES = {
    twitter: process.env.TWITTER_SERVICE_URL || 'http://twitter:3001',
    linkedin: process.env.LINKEDIN_SERVICE_URL || 'http://linkedin:3002',
    mastodon: process.env.MASTODON_SERVICE_URL || 'http://mastodon:3003',
    bluesky: process.env.BLUESKY_SERVICE_URL || 'http://bluesky:3004',
    instagram: process.env.INSTAGRAM_SERVICE_URL || 'http://instagram:3005',
    facebook: process.env.FACEBOOK_SERVICE_URL || 'http://facebook:3006',
    pinterest: process.env.PINTEREST_SERVICE_URL || 'http://pinterest:3008',
    tiktok: process.env.TIKTOK_SERVICE_URL || 'http://tiktok:3007',
  };

  // One logger instance is shared between Fastify and the job workers.
  const log = createLogger('scheduler');
  const app = Fastify({ logger: log });

  // Both are assigned inside start(), once the Redis connection has been created.
  let postQueue;
  let redis;
  // ─── Job Worker ──────────────────────────────────────────────────────────────

  /**
   * Builds the key under which a destination's outcome is stored in `platformResults`.
   *
   * @param {{ platform: string, accountId?: string }} dest
   * @returns {string} `platform:accountId` when an account is given, otherwise `platform`.
   */
  function destinationKey({ platform, accountId }) {
    return accountId ? `${platform}:${accountId}` : platform;
  }

  /**
   * Collapses the per-destination outcomes into a single post status.
   *
   * @param {Object<string, { success: boolean }>} results
   * @returns {'published' | 'partial' | 'failed'}
   */
  function summarizePostStatus(results) {
    const outcomes = Object.values(results);
    // `every` is vacuously true on an empty list; a job that published nowhere
    // must not be reported as published.
    if (outcomes.length === 0) return 'failed';
    if (outcomes.every((r) => r.success)) return 'published';
    return outcomes.some((r) => r.success) ? 'partial' : 'failed';
  }

  /**
   * POSTs one destination's payload to its platform service and converts the
   * response (or the thrown error) into a flat result record. Never throws.
   *
   * @param {string} serviceUrl  Base URL of the platform service.
   * @param {object} payload     Request body for `${serviceUrl}/post`.
   * @param {string} workspaceId Sent as the `X-Workspace-Id` header.
   * @returns {Promise<{ success: boolean, error?: string }>}
   */
  async function publishToDestination(serviceUrl, payload, workspaceId) {
    try {
      const response = await axios.post(`${serviceUrl}/post`, payload, {
        timeout: 30000,
        headers: { 'X-Workspace-Id': workspaceId },
      });
      // `result` may be an array (multi-page services return [{ postId, ... }]) or a
      // plain object; keep the first entry so the stored record is flat. The request
      // already succeeded here, so a missing body must not turn into a failure.
      const raw = response.data?.result;
      const flat = Array.isArray(raw) ? (raw[0] || {}) : (raw || {});
      return { success: true, ...flat };
    } catch (err) {
      return { success: false, error: err.response?.data?.error || err.message };
    }
  }

  /**
   * BullMQ processor for the post queue: publishes one post to every destination
   * and records the outcome in the `posts` and `scheduled_jobs` collections.
   *
   * Job data: `{ postId?, content, destinations?, platforms?, media?, firstComment?, workspaceId? }`.
   * `destinations` is `[{ platform, accountId?, imageUrl?, videoUrl?, link? }]`; the
   * legacy `platforms: string[]` form is used when `destinations` is absent.
   *
   * Destinations already marked successful on the stored post are skipped, so a
   * retried job does not publish them twice.
   *
   * NOTE(review): per-destination failures are recorded but not thrown, so the
   * queue's `attempts`/`backoff` settings only apply when this function itself
   * throws (for example on a database error) — confirm that is intended.
   *
   * @param {object} job BullMQ job.
   * @returns {Promise<Object<string, { success: boolean, error?: string }>>}
   *   Outcome per destination key.
   */
  async function processPostJob(job) {
    const { postId, content, destinations, platforms, media = [], firstComment, workspaceId = 'default' } = job.data;

    // Every post needs a stable ID for analytics tracking. A generated ID is
    // written back into the job data: a fresh UUID on each attempt would make the
    // lookup below miss the results recorded by the previous attempt.
    let effectivePostId = postId;
    if (!effectivePostId) {
      effectivePostId = randomUUID();
      if (typeof job.updateData === 'function') {
        await job.updateData({ ...job.data, postId: effectivePostId });
      }
    }

    const destList = destinations || (platforms || []).map((p) => ({ platform: p }));
    log.info({ action: 'job_process', jobId: job.id, attempt: job.attemptsMade + 1, destinations: destList.map((d) => destinationKey(d)) });

    const db = await getDb();
    const posts = db.collection('posts');

    // Start from whatever earlier attempts recorded.
    const existingPost = await posts.findOne({ _id: effectivePostId }, { projection: { platformResults: 1 } });
    const results = { ...(existingPost?.platformResults || {}) };
    const trimmedComment = firstComment?.trim() || undefined;

    for (const dest of destList) {
      const { platform, accountId, imageUrl, videoUrl, link } = dest;
      const resultKey = destinationKey(dest);

      if (results[resultKey]?.success) {
        log.info({ action: 'job_skip_dest', jobId: job.id, destination: resultKey, reason: 'already_published' });
        continue;
      }

      const serviceUrl = PLATFORM_SERVICES[platform];
      if (!serviceUrl) {
        results[resultKey] = { success: false, error: 'Unknown platform' };
        continue;
      }

      results[resultKey] = await publishToDestination(
        serviceUrl,
        { content, accountId, imageUrl, videoUrl, link, media, firstComment: trimmedComment },
        workspaceId
      );
    }

    const postStatus = summarizePostStatus(results);

    await posts.updateOne(
      { _id: effectivePostId },
      {
        $set: {
          content,
          destinations: destList,
          type: 'scheduled',
          status: postStatus,
          publishedAt: new Date(),
          platformResults: results,
          workspaceId,
        },
        $setOnInsert: { createdAt: new Date() },
      },
      { upsert: true }
    );

    await db.collection('scheduled_jobs').updateOne(
      { bullJobId: String(job.id) },
      {
        $set: {
          status: postStatus, // 'published' | 'partial' | 'failed'
          completedAt: new Date(),
          platformResults: results,
        },
      }
    );

    return results;
  }
  // ─── System Job Worker ────────────────────────────────────────────────────────

  // One entry per recurring system job: the log `action` label, the gateway path
  // to POST to, the request timeout in ms, and the response fields to include in
  // the success log line.
  const SYSTEM_JOB_SPECS = new Map([
    ['meta-token-refresh', {
      action: 'token_refresh',
      path: '/meta/token-refresh',
      timeout: 60000,
      summarize: (data) => ({ refreshed: data.refreshed, skipped: data.skipped, errors: data.errors }),
    }],
    ['metrics-crawl', {
      action: 'metrics_crawl',
      path: '/analytics/crawl',
      timeout: 120000,
      summarize: (data) => ({ total: data.total }),
    }],
    ['competitor-scrape', {
      action: 'competitor_scrape',
      path: '/competitors/scrape-all',
      timeout: 120000,
      summarize: (data) => ({ results: data.results?.length }),
    }],
  ]);

  /**
   * BullMQ processor for the system queue. Looks the job up by name and triggers
   * the matching gateway endpoint with an empty POST body.
   *
   * @param {{ name: string }} job BullMQ job; only `name` is read.
   * @returns {Promise<object|undefined>} The gateway response body, or `undefined`
   *   when the job name is not recognised (a warning is logged in that case).
   * @throws Whatever `axios.post` rejects with (network error, timeout, non-2xx).
   */
  async function processSystemJob(job) {
    const spec = SYSTEM_JOB_SPECS.get(job.name);
    if (!spec) {
      // Still completes the job, but leaves a trace instead of doing nothing silently.
      log.warn({ action: 'system_job', jobName: job.name, outcome: 'skipped', reason: 'unknown_job' });
      return undefined;
    }

    const { action, path, timeout, summarize } = spec;
    log.info({ action, trigger: 'scheduled', outcome: 'start' });
    const res = await axios.post(`${GATEWAY_URL}${path}`, {}, { timeout });
    log.info({ action, trigger: 'scheduled', outcome: 'success', ...summarize(res.data) });
    return res.data;
  }
  // ─── HTTP Endpoints ──────────────────────────────────────────────────────────

  // Liveness probe: always answers with a static payload and touches no dependency.
  app.get('/health', async () => ({ status: 'ok', service: 'scheduler' }));
  // Create a scheduled post.
  // Body: { content, scheduledAt, destinations: [{ platform, accountId?, imageUrl?, videoUrl?, link? }] }
  // Legacy { platforms: string[] } still accepted for backwards compatibility.
  // Responds 400 when a required field is missing or scheduledAt is not a valid
  // future date; otherwise enqueues a delayed job and records it in scheduled_jobs.
  app.post('/schedule', async (request, reply) => {
    // A request without a body must yield a 400, not a destructuring TypeError.
    const { postId, content, destinations, platforms, scheduledAt, media = [], firstComment } = request.body || {};
    const workspaceId = request.headers['x-workspace-id'] || 'default';

    const legacyPlatforms = Array.isArray(platforms) ? platforms : [];
    const destList = destinations || legacyPlatforms.map((p) => ({ platform: p }));

    if (!content || !Array.isArray(destList) || !destList.length || !scheduledAt) {
      return reply.code(400).send({ error: 'content, destinations, and scheduledAt are required' });
    }

    const runAt = new Date(scheduledAt);
    const delay = runAt.getTime() - Date.now();
    // An unparseable date gives NaN, which would slip past the `< 0` check below.
    if (Number.isNaN(delay)) {
      return reply.code(400).send({ error: 'scheduledAt must be a valid date' });
    }
    if (delay < 0) {
      return reply.code(400).send({ error: 'scheduledAt must be in the future' });
    }

    const trimmedComment = typeof firstComment === 'string' ? firstComment.trim() || undefined : undefined;

    const job = await postQueue.add(
      'scheduled-post',
      { postId, content, destinations: destList, media, firstComment: trimmedComment, workspaceId },
      { delay, attempts: 3, backoff: { type: 'exponential', delay: 60000 } }
    );

    try {
      const db = await getDb();
      await db.collection('scheduled_jobs').insertOne({
        postId,
        type: 'one-time',
        content,
        scheduledAt: runAt,
        destinations: destList,
        status: 'pending',
        attempts: 0,
        maxAttempts: 3,
        bullJobId: String(job.id),
        workspaceId,
        createdAt: new Date(),
      });
    } catch (err) {
      // The caller receives an error, so the queued job must not stay behind and
      // publish without a tracking record.
      log.error({ action: 'job_schedule', jobId: job.id, outcome: 'failure', err: err.message });
      try {
        await job.remove();
      } catch (removeErr) {
        log.error({ action: 'job_schedule_rollback', jobId: job.id, outcome: 'failure', err: removeErr.message });
      }
      throw err;
    }

    return { success: true, jobId: job.id, scheduledAt };
  });
  // List scheduled jobs.
  // Query: ?status=<status> (defaults to 'pending'). Results are scoped to the
  // workspace named in the X-Workspace-Id header (default 'default') and sorted
  // by scheduledAt ascending.
  app.get('/jobs', async (request) => {
    const { status = 'pending' } = request.query;
    const workspaceId = request.headers['x-workspace-id'] || 'default';

    // Include legacy jobs without workspaceId (backwards compat)
    const filter = {
      status,
      $or: [{ workspaceId }, { workspaceId: { $exists: false } }],
    };

    const db = await getDb();
    const jobs = await db
      .collection('scheduled_jobs')
      .find(filter)
      .sort({ scheduledAt: 1 })
      .toArray();

    return { success: true, count: jobs.length, jobs };
  });
  // Cancel a job.
  // Responds 404 unless a scheduled_jobs record with this bullJobId exists in the
  // caller's workspace (records without a workspaceId count as visible to everyone).
  // NOTE(review): the record is marked 'cancelled' whatever its current status —
  // confirm that already-completed jobs should be cancellable.
  app.delete('/jobs/:jobId', async (request, reply) => {
    const { jobId } = request.params;
    const workspaceId = request.headers['x-workspace-id'] || 'default';

    const db = await getDb();
    const scheduledJobs = db.collection('scheduled_jobs');

    const jobDoc = await scheduledJobs.findOne({
      bullJobId: jobId,
      $or: [{ workspaceId }, { workspaceId: { $exists: false } }],
    });
    if (!jobDoc) return reply.code(404).send({ error: 'Job bulunamadı' });

    // The queue entry may already be gone; only the record update is mandatory.
    const job = await postQueue.getJob(jobId);
    if (job) await job.remove();

    // Update exactly the document that passed the workspace check above, rather
    // than any record that happens to share the bullJobId.
    await scheduledJobs.updateOne(
      { _id: jobDoc._id },
      { $set: { status: 'cancelled' } }
    );

    return { success: true, jobId };
  });
  // ─── Startup ─────────────────────────────────────────────────────────────────

  /**
   * Boots the service: connects to MongoDB and Redis, creates the post and system
   * queues with their workers, registers the recurring system jobs, then starts
   * the HTTP server on PORT (default 3011).
   *
   * @returns {Promise<void>} Resolves once the HTTP server is listening.
   */
  async function start() {
    await connect();

    redis = new IORedis(REDIS_URL, { maxRetriesPerRequest: null });
    postQueue = new Queue('post-queue', { connection: redis });

    const worker = new Worker('post-queue', processPostJob, { connection: redis });
    // Connection-level problems are reported through 'error'; log them here
    // instead of leaving them unhandled.
    worker.on('error', (err) => {
      log.error({ action: 'worker_error', queue: 'post-queue', err: err.message });
    });
    worker.on('failed', async (job, err) => {
      log.error({ action: 'job_process', jobId: job?.id, outcome: 'failure', err: err.message });
      if (!job?.id) return;
      try {
        const db = await getDb();
        // Only touch records still pending, so a status already written by
        // processPostJob is not overwritten.
        await db.collection('scheduled_jobs').updateOne(
          { bullJobId: String(job.id), status: 'pending' },
          { $set: { status: 'failed', failedAt: new Date(), failReason: err.message } }
        );
      } catch (dbErr) {
        // Non-fatal, but leave a trace rather than dropping the error.
        log.error({ action: 'job_mark_failed', jobId: job.id, outcome: 'failure', err: dbErr.message });
      }
    });

    // Daily system jobs (housekeeping, token refresh, etc.)
    const systemQueue = new Queue('system-queue', { connection: redis });
    const systemWorker = new Worker('system-queue', processSystemJob, { connection: redis });
    systemWorker.on('error', (err) => {
      log.error({ action: 'worker_error', queue: 'system-queue', err: err.message });
    });
    systemWorker.on('failed', (job, err) => {
      log.error({ action: 'system_job', jobId: job?.id, jobName: job?.name, outcome: 'failure', err: err.message });
    });

    // Register recurring system jobs — BullMQ deduplicates by repeat key on restart
    const DAY_MS = 24 * 60 * 60 * 1000;
    const recurringJobs = [
      { name: 'meta-token-refresh', every: DAY_MS, interval: '24h' },
      { name: 'metrics-crawl', every: DAY_MS, interval: '24h' },
      { name: 'competitor-scrape', every: 7 * DAY_MS, interval: '7d' },
    ];
    for (const { name, every, interval } of recurringJobs) {
      await systemQueue.add(
        name,
        {},
        { repeat: { every }, removeOnComplete: 5, removeOnFail: 5 }
      );
      log.info({ action: 'system_job_register', job: name, interval, outcome: 'success' });
    }

    // Log the port actually used, not the default, when PORT overrides it.
    const port = process.env.PORT || 3011;
    await app.listen({ port, host: '0.0.0.0' });
    log.info({ action: 'service_start', port, outcome: 'success' }, 'Scheduler started');
  }

  // Any startup failure is fatal: log it and exit non-zero.
  start().catch((err) => {
    log.error({ action: 'service_start', outcome: 'failure', err: err.message });
    process.exit(1);
  });