index.js 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. require('dotenv').config();
  2. const Fastify = require('fastify');
  3. const axios = require('axios');
  4. const cron = require('node-cron');
  5. const { getDb } = require('./utils/MongoDBConnector');
  6. const RabbitMQProducer = require('./utils/RabbitMQProducer');
  7. const { createLogger } = require('./utils/logger');
  // Cron expression for the periodic refresh of all feeds. Overridable through
  // the FEED_REFRESH_INTERVAL environment variable; the fallback is every 5 minutes.
  const FEED_REFRESH_INTERVAL = process.env.FEED_REFRESH_INTERVAL || '*/5 * * * *';

  // Base URL of every platform service, keyed by platform name. Each entry can
  // be overridden with <PLATFORM>_SERVICE_URL (e.g. TWITTER_SERVICE_URL);
  // otherwise it falls back to http://<platform>:<default port>.
  // The entry order is preserved because callers iterate over this object.
  const PLATFORM_SERVICES = Object.fromEntries(
    [
      ['twitter', 3001],
      ['linkedin', 3002],
      ['mastodon', 3003],
      ['bluesky', 3004],
      ['instagram', 3005],
      ['facebook', 3006],
      ['pinterest', 3008],
      ['tiktok', 3007],
    ].map(([name, defaultPort]) => [
      name,
      process.env[`${name.toUpperCase()}_SERVICE_URL`] || `http://${name}:${defaultPort}`,
    ])
  );
  // RabbitMQ producer; remains undefined until start() creates it.
  let producer;

  // Service logger, also handed to Fastify as its logger.
  const log = createLogger('feed-aggregator');
  const app = Fastify({ logger: log });
  // ─── Feed fetching ───────────────────────────────────────────────────────────

  /**
   * Fetch the feed of a single platform service for one workspace and, when a
   * producer exists and items were returned, publish them via
   * producer.sendMessage under 'feed.items'.
   *
   * HTTP failures are logged and reported as an empty list. A failure while
   * publishing is logged separately and does NOT discard the fetched items.
   *
   * @param {string} platform - Platform key; used in logs and in the published message.
   * @param {string} serviceUrl - Base URL of the platform service; `/feed` is appended.
   * @param {string} [workspaceId='default'] - Sent as the X-Workspace-Id request header.
   * @returns {Promise<Array>} The fetched items, or [] when the request fails or
   *   the response carries no item array.
   */
  async function fetchPlatformFeed(platform, serviceUrl, workspaceId = 'default') {
    let items;
    try {
      const response = await axios.get(`${serviceUrl}/feed`, {
        timeout: 15000,
        headers: { 'X-Workspace-Id': workspaceId },
      });
      // Tolerate an empty body or a non-array `items` field instead of leaking
      // an unexpected shape to callers that read `.length`.
      const payload = response.data && response.data.items;
      items = Array.isArray(payload) ? payload : [];
      log.info({ action: 'feed_fetch', platform, count: items.length, outcome: 'success' });
    } catch (err) {
      log.error({ action: 'feed_fetch', platform, outcome: 'failure', err: err.message });
      return [];
    }

    // Notify the UI (the original note says this reaches it over WebSocket; the
    // delivery path is not visible in this file).
    // NOTE(review): the message carries no workspaceId — confirm consumers do not need it.
    if (producer && items.length > 0) {
      try {
        await producer.sendMessage('feed.items', JSON.stringify({ platform, items, fetchedAt: new Date() }));
      } catch (err) {
        // A broker problem must not turn a successful fetch into an empty result.
        log.error({ action: 'feed_publish', platform, outcome: 'failure', err: err.message });
      }
    }
    return items;
  }
  /**
   * Resolve the ids of all workspaces so the periodic refresh covers every
   * workspace, not just 'default'.
   *
   * Reads only the `_id` of each document in the 'workspaces' collection and
   * drops falsy ids. If the lookup fails, the error is logged and the function
   * falls back to ['default'] so the refresh can still run.
   *
   * @returns {Promise<Array>} Workspace ids, or ['default'] when none are found
   *   or the database lookup fails.
   */
  async function getAllWorkspaceIds() {
    try {
      const db = await getDb();
      const rows = await db.collection('workspaces').find({}, { projection: { _id: 1 } }).toArray();
      const ids = rows.map((row) => row._id).filter(Boolean);
      return ids.length ? ids : ['default'];
    } catch (err) {
      // Best-effort fallback, but leave a trace: otherwise a broken DB connection
      // silently degrades the refresh to the default workspace only.
      log.error({ action: 'workspace_list', outcome: 'failure', err: err.message });
      return ['default'];
    }
  }
  /**
   * Refresh every platform feed for every workspace.
   *
   * Workspaces are processed one after another; within a workspace all
   * platforms are fetched concurrently.
   *
   * @returns {Promise<Object>} Item counts keyed by `<workspaceId>:<platform>`;
   *   a platform whose fetch rejected is counted as 0.
   */
  async function fetchAllFeeds() {
    const workspaceIds = await getAllWorkspaceIds();
    log.info({ action: 'feed_fetch_all', workspaces: workspaceIds.length }, 'Fetching feeds for all workspaces');

    const platforms = Object.entries(PLATFORM_SERVICES);
    const summary = {};

    for (const wsId of workspaceIds) {
      const pending = platforms.map(([platform, url]) => fetchPlatformFeed(platform, url, wsId));
      const outcomes = await Promise.allSettled(pending);

      // outcomes[i] belongs to platforms[i]: allSettled keeps input order.
      outcomes.forEach((outcome, index) => {
        const [platform] = platforms[index];
        let count = 0;
        if (outcome.status === 'fulfilled') {
          count = outcome.value.length;
        }
        summary[`${wsId}:${platform}`] = count;
      });
    }

    log.info({ action: 'feed_fetch_all', outcome: 'success', summary });
    return summary;
  }
  // ─── HTTP Endpoints ──────────────────────────────────────────────────────────

  // GET /health — returns a fixed payload identifying this service.
  app.get('/health', async () => {
    return { status: 'ok', service: 'feed-aggregator' };
  });
  // POST /fetch — trigger a refresh on demand.
  // With a known `platform` in the JSON body, only that platform is fetched for
  // the workspace named by the X-Workspace-Id header (default: 'default').
  // Otherwise every platform is refreshed for every workspace.
  app.post('/fetch', async (request) => {
    const { platform } = request.body || {};
    const workspaceId = request.headers['x-workspace-id'] || 'default';

    // Own-property check: a plain `PLATFORM_SERVICES[platform]` lookup is truthy
    // for inherited keys such as "constructor" or "toString", which would pass
    // a function on as the service URL.
    const isKnownPlatform =
      platform != null && Object.prototype.hasOwnProperty.call(PLATFORM_SERVICES, platform);

    if (isKnownPlatform) {
      const items = await fetchPlatformFeed(platform, PLATFORM_SERVICES[platform], workspaceId);
      return { success: true, platform, count: items.length };
    }

    // NOTE(review): an unrecognised platform name also ends up here and triggers
    // a full refresh — confirm that is intended rather than a 400 response.
    const summary = await fetchAllFeeds();
    return { success: true, summary };
  });
  // Largest page a client may request from GET /feeds.
  const MAX_FEED_PAGE_SIZE = 500;

  /**
   * Parse a query-string value as an integer clamped to [min, max].
   *
   * @param {*} raw - Raw query value (normally a string or undefined).
   * @param {number} fallback - Returned when `raw` is missing, empty or not a finite number.
   * @param {number} min - Smallest value returned for a parsable input.
   * @param {number} max - Largest value returned for a parsable input.
   * @returns {number} An integer within [min, max], or `fallback`.
   */
  function clampQueryInt(raw, fallback, min, max) {
    if (raw == null || raw === '') return fallback;
    const parsed = Math.trunc(Number(raw));
    if (!Number.isFinite(parsed)) return fallback;
    return Math.min(Math.max(parsed, min), max);
  }

  // GET /feeds — list stored feed items for the requesting workspace, newest first.
  // Query: platform, tag (optional filters), limit (default 50, 1..MAX_FEED_PAGE_SIZE),
  // skip (default 0, never negative).
  app.get('/feeds', async (request) => {
    const { platform, tag, limit, skip } = request.query;
    const workspaceId = request.headers['x-workspace-id'] || 'default';

    // Never hand NaN, negative numbers or 0 to the cursor: 0 means "no limit"
    // to MongoDB and the others are rejected, so sanitize before querying.
    const pageSize = clampQueryInt(limit, 50, 1, MAX_FEED_PAGE_SIZE);
    const offset = clampQueryInt(skip, 0, 0, Number.MAX_SAFE_INTEGER);

    const db = await getDb();
    const col = db.collection('feeds');

    // Legacy items (no workspaceId field) are only visible in the default
    // workspace, to avoid cross-workspace feed bleed.
    const legacyClause = workspaceId === 'default' ? [{ workspaceId: { $exists: false } }] : [];
    const filter = { $or: [{ workspaceId }, ...legacyClause] };
    if (platform) filter.platform = platform;
    if (tag) filter.tags = tag;

    const items = await col
      .find(filter)
      .sort({ createdAt: -1 })
      .skip(offset)
      .limit(pageSize)
      .toArray();
    return { success: true, count: items.length, items };
  });
  // GET /platform-status — query `/status` on every platform service for the
  // requesting workspace. The response is an array with one entry per platform,
  // in PLATFORM_SERVICES order; a platform whose request failed is reported as
  // { platform, connected: false, error }.
  app.get('/platform-status', async (request) => {
    const workspaceId = request.headers['x-workspace-id'] || 'default';
    const platformNames = Object.keys(PLATFORM_SERVICES);

    const probes = Object.entries(PLATFORM_SERVICES).map(async ([platform, url]) => {
      const { data } = await axios.get(`${url}/status`, {
        timeout: 5000,
        headers: { 'X-Workspace-Id': workspaceId },
      });
      return { platform, ...data };
    });
    const outcomes = await Promise.allSettled(probes);

    return outcomes.map((outcome, index) => {
      if (outcome.status === 'fulfilled') {
        return outcome.value;
      }
      return { platform: platformNames[index], connected: false, error: outcome.reason.message };
    });
  });
  // ─── Startup ─────────────────────────────────────────────────────────────────

  /**
   * Run one full feed refresh. A failure is logged rather than left as an
   * unhandled promise rejection, so the returned promise never rejects.
   *
   * @returns {Promise<*>} Settles once the refresh (or its error logging) is done.
   */
  function refreshFeedsSafely() {
    return fetchAllFeeds().catch((err) =>
      log.error({ action: 'feed_fetch_all', outcome: 'failure', err: err.message })
    );
  }

  /**
   * Boot the service: connect the RabbitMQ producer, schedule the periodic feed
   * refresh, start the HTTP server and queue a first refresh 5 seconds later.
   *
   * @returns {Promise<void>} Rejects when the producer cannot connect or the
   *   server cannot start listening.
   */
  async function start() {
    producer = new RabbitMQProducer();
    await producer.connect();

    // Periodic feed refresh.
    cron.schedule(FEED_REFRESH_INTERVAL, () => {
      refreshFeedsSafely();
    });

    await app.listen({ port: process.env.PORT || 3010, host: '0.0.0.0' });
    log.info({ action: 'service_start', outcome: 'success', cronInterval: FEED_REFRESH_INTERVAL }, 'Feed aggregator started');

    // Initial run, with the same error handling as the scheduled runs.
    setTimeout(refreshFeedsSafely, 5000);
  }
  // Entry point: a failed startup is logged and the process exits with code 1.
  start().catch((err) => {
    log.error({ action: 'service_start', outcome: 'failure', err: err.message });
    process.exit(1);
  });