// index.js (4.5 KB)
  1. require('dotenv').config();
  2. const Fastify = require('fastify');
  3. const axios = require('axios');
  4. const cron = require('node-cron');
  5. const { getDb } = require('./utils/MongoDBConnector');
  6. const RabbitMQProducer = require('./utils/RabbitMQProducer');
  // Cron expression for the periodic feed refresh; the default fires every 5 minutes.
  const FEED_REFRESH_INTERVAL = process.env.FEED_REFRESH_INTERVAL || '*/5 * * * *';

  // Base URL of each platform service, keyed by platform name. Every entry can be
  // overridden through its environment variable; the fallbacks are the hostnames
  // used inside the docker network.
  const PLATFORM_SERVICES = Object.fromEntries(
    [
      ['twitter', 'TWITTER_SERVICE_URL', 'http://twitter:3001'],
      ['linkedin', 'LINKEDIN_SERVICE_URL', 'http://linkedin:3002'],
      ['mastodon', 'MASTODON_SERVICE_URL', 'http://mastodon:3003'],
      ['bluesky', 'BLUESKY_SERVICE_URL', 'http://bluesky:3004'],
      ['instagram', 'INSTAGRAM_SERVICE_URL', 'http://instagram:3005'],
      ['facebook', 'FACEBOOK_SERVICE_URL', 'http://facebook:3006'],
    ].map(([platform, envVar, fallbackUrl]) => [platform, process.env[envVar] || fallbackUrl]),
  );
  // Fastify server instance the HTTP routes are registered on; Fastify's own
  // logger is switched off.
  const app = Fastify({
    logger: false,
  });

  // Module-level RabbitMQ producer handle; stays undefined until start() assigns it.
  let producer;
  // ─── Feed Fetching ───────────────────────────────────────────────────────────

  /**
   * Fetches the current feed of one platform service and, when a producer is
   * available and the feed is non-empty, publishes the batch as 'feed.items'.
   *
   * The function does not reject for request or publish failures: both are
   * logged. A failed request yields an empty array; a failed publish still
   * returns the items that were fetched.
   *
   * @param {string} platform - Platform key; used in log lines and in the published payload.
   * @param {string} serviceUrl - Base URL of the platform service; `/feed` is appended to it.
   * @returns {Promise<Array>} The fetched items, or `[]` when the request fails
   *   or the response carries no item array.
   */
  async function fetchPlatformFeed(platform, serviceUrl) {
    let items;
    try {
      const response = await axios.get(`${serviceUrl}/feed`, { timeout: 15000 });
      const payload = response.data.items;
      // Callers read `.length`, so anything that is not an array counts as an empty feed.
      items = Array.isArray(payload) ? payload : [];
      console.log(`[FeedAggregator] ${platform}: ${items.length} öğe çekildi`);
    } catch (err) {
      console.error(`[FeedAggregator] ${platform} feed hatası:`, err.message);
      return [];
    }

    // Publishing is handled separately from fetching so that a broker failure
    // does not throw away items that were fetched successfully.
    // NOTE(review): the original comment says this notifies the UI over
    // WebSocket; that relay is not visible in this file — confirm.
    if (producer && items.length > 0) {
      try {
        const message = JSON.stringify({ platform, items, fetchedAt: new Date() });
        await producer.sendMessage('feed.items', message);
      } catch (err) {
        console.error(`[FeedAggregator] ${platform} yayınlama hatası:`, err.message);
      }
    }

    return items;
  }
  /**
   * Refreshes the feed of every platform listed in PLATFORM_SERVICES at the
   * same time and reports how many items each one returned.
   *
   * @returns {Promise<Object<string, number>>} Item count per platform key; a
   *   platform whose fetch rejected is counted as 0.
   */
  async function fetchAllFeeds() {
    console.log('[FeedAggregator] Tüm platformlardan feed çekiliyor...');

    const platformNames = Object.keys(PLATFORM_SERVICES);
    const pendingFetches = platformNames.map((name) =>
      fetchPlatformFeed(name, PLATFORM_SERVICES[name]),
    );
    const outcomes = await Promise.allSettled(pendingFetches);

    // outcomes[i] belongs to platformNames[i]: allSettled preserves input order.
    const summary = {};
    for (const [index, name] of platformNames.entries()) {
      const outcome = outcomes[index];
      if (outcome.status === 'fulfilled') {
        summary[name] = outcome.value.length;
      } else {
        summary[name] = 0;
      }
    }

    console.log('[FeedAggregator] Tamamlandı:', summary);
    return summary;
  }
  // ─── HTTP Endpoints ──────────────────────────────────────────────────────────

  // GET /health — answers with a fixed payload naming this service.
  app.get('/health', async () => {
    const body = { status: 'ok', service: 'feed-aggregator' };
    return body;
  });
  /**
   * POST /fetch — triggers a feed refresh on demand.
   *
   * Body: `{ platform?: string }`. When `platform` is one of the keys of
   * PLATFORM_SERVICES only that platform is refreshed and the reply is
   * `{ success, platform, count }`; in every other case all platforms are
   * refreshed and the reply is `{ success, summary }`.
   */
  app.post('/fetch', async (request) => {
    const { platform } = request.body || {};

    // Own-property check: a plain `PLATFORM_SERVICES[platform]` lookup on
    // user-supplied input is also truthy for inherited keys such as
    // "constructor" or "toString", which are not platforms.
    const isConfiguredPlatform =
      Boolean(platform) &&
      Object.prototype.hasOwnProperty.call(PLATFORM_SERVICES, platform);

    if (isConfiguredPlatform) {
      const items = await fetchPlatformFeed(platform, PLATFORM_SERVICES[platform]);
      return { success: true, platform, count: items.length };
    }

    const summary = await fetchAllFeeds();
    return { success: true, summary };
  });
  /**
   * GET /feeds — lists stored feed items from the `feeds` collection, newest
   * first (sorted by `createdAt` descending).
   *
   * Query parameters:
   *   platform - optional; matched against the `platform` field.
   *   tag      - optional; matched against the `tags` field.
   *   limit    - page size, default 50.
   *   skip     - number of items to skip, default 0.
   *
   * `limit` and `skip` that are not finite, non-negative numbers fall back to
   * their defaults; fractional values are truncated.
   * NOTE(review): no upper bound is enforced on `limit` — consider adding one.
   */
  app.get('/feeds', async (request) => {
    const { platform, tag, limit = 50, skip = 0 } = request.query;

    // Query values arrive as strings. Garbage such as "abc" or "-5" is replaced
    // by the default instead of being handed to the cursor as NaN or a negative.
    const toCount = (raw, fallback) => {
      const parsed = Math.trunc(Number(raw));
      if (!Number.isFinite(parsed) || parsed < 0) return fallback;
      return parsed === 0 ? 0 : parsed; // normalises -0 to 0
    };

    const db = await getDb();
    const col = db.collection('feeds');

    const filter = {};
    if (platform) filter.platform = platform;
    if (tag) filter.tags = tag;

    const items = await col
      .find(filter)
      .sort({ createdAt: -1 })
      .skip(toCount(skip, 0))
      .limit(toCount(limit, 50))
      .toArray();

    return { success: true, count: items.length, items };
  });
  /**
   * GET /platform-status — asks every platform service for its `/status` at
   * the same time and returns one entry per platform, in PLATFORM_SERVICES
   * order. A service that answers contributes `{ platform, ...its response }`;
   * one that fails contributes `{ platform, connected: false, error }`.
   */
  app.get('/platform-status', async () => {
    // Each probe converts its own failure into a result entry, so the combined
    // promise below never has to correlate rejections back to a platform.
    const probe = async ([platform, url]) => {
      try {
        const response = await axios.get(`${url}/status`, { timeout: 5000 });
        return { platform, ...response.data };
      } catch (err) {
        return { platform, connected: false, error: err.message };
      }
    };

    const probes = Object.entries(PLATFORM_SERVICES).map(probe);
    return Promise.all(probes);
  });
  // ─── Startup ─────────────────────────────────────────────────────────────────

  /**
   * Boots the service. In order: connects the RabbitMQ producer, schedules the
   * periodic feed refresh with FEED_REFRESH_INTERVAL, starts the HTTP server on
   * PORT (default 3010) on all interfaces, and queues a first refresh 5 seconds
   * after the server is up.
   *
   * @returns {Promise<void>} Rejects when the producer cannot connect or the
   *   server cannot start listening.
   */
  async function start() {
    producer = new RabbitMQProducer();
    await producer.connect();

    // Periodic feed refresh.
    cron.schedule(FEED_REFRESH_INTERVAL, () => {
      fetchAllFeeds().catch(console.error);
    });

    await app.listen({ port: process.env.PORT || 3010, host: '0.0.0.0' });
    console.log(`[FeedAggregator] Started. Cron: ${FEED_REFRESH_INTERVAL}`);

    // First run shortly after startup; rejections are logged the same way as in
    // the cron callback instead of being left as a floating promise.
    setTimeout(() => {
      fetchAllFeeds().catch(console.error);
    }, 5000);
  }

  // A failed startup is fatal. Exiting non-zero avoids a process that is kept
  // alive by the cron timer without an HTTP listener, and lets a supervisor
  // restart it.
  start().catch((err) => {
    console.error(err);
    process.exit(1);
  });