Skip to main content

Schedule-Based Sync Integrations

Schedule-based integrations run on a cron schedule to poll external APIs and fetch new activities. This is ideal for services without webhooks or when you want to batch process data. This guide uses GitHub as the primary example.

When to Use Schedule-Based Sync

Choose schedule-based integrations when:
  • The service doesn’t provide webhook events
  • You want to batch process multiple API calls
  • The service has rate limits that require careful timing
  • Real-time sync isn’t critical (5-15 minute delays acceptable)
  • You need to poll multiple endpoints and aggregate data

Architecture Overview

Cron Schedule (every 5-15 min)

    SYNC Event Triggered

    Fetch New Activities

    Compare with Last Sync Time

    Create Activity Messages

    Update State with New Sync Time

    Knowledge Graph

Key Concepts

State Management

Schedule-based integrations maintain state between runs:
interface GitHubSettings {
  lastSyncTime?: string;           // Last time we synced notifications
  lastUserEventTime?: string;      // Last time we synced user events
  username?: string;               // GitHub username
}
State is persisted by returning a state message:
return [
  // ... activity messages
  {
    type: 'state',
    data: {
      lastSyncTime: new Date().toISOString(),
      lastUserEventTime: new Date().toISOString(),
      username: 'octocat'
    }
  }
];

Pagination

Handle large datasets by paginating through results:
let page = 1;
let hasMorePages = true;

while (hasMorePages) {
  const data = await fetchData(page);

  if (!data || data.length === 0) {
    hasMorePages = false;
    break;
  }

  // Process data
  for (const item of data) {
    // Create activities
  }

  // Check if more pages exist
  if (data.length < perPage) {
    hasMorePages = false;
  } else {
    page++;
  }
}

Incremental Sync

Only fetch new data since last sync:
const lastSyncTime = state.lastSyncTime || getDefaultSyncTime(); // 24 hours ago

// Use lastSyncTime to filter API requests
const notifications = await fetch(
  `https://api.github.com/notifications?since=${lastSyncTime}`
);

Full Example: GitHub Integration

spec.json

{
  "name": "GitHub extension",
  "key": "github",
  "description": "Plan, track, and manage your projects in GitHub",
  "icon": "github",

  "schedule": {
    "frequency": "*/5 * * * *"  // Run every 5 minutes
  },

  "auth": {
    "OAuth2": {
      "token_url": "https://github.com/login/oauth/access_token",
      "authorization_url": "https://github.com/login/oauth/authorize",
      "scopes": [
        "user",
        "public_repo",
        "repo",
        "notifications",
        "read:org"
      ],
      "scope_separator": ","
    }
  },

  "mcp": {
    "type": "http",
    "url": "https://api.githubcopilot.com/mcp/",
    "headers": {
      "Authorization": "Bearer ${config:access_token}",
      "Content-Type": "application/json"
    }
  }
}

index.ts

import { handleSchedule } from './schedule';
import { integrationCreate } from './account-create';
import {
  IntegrationCLI,
  IntegrationEventPayload,
  IntegrationEventType,
  Spec,
} from '@redplanethq/sdk';

export async function run(eventPayload: IntegrationEventPayload) {
  switch (eventPayload.event) {
    case IntegrationEventType.SETUP:
      return await integrationCreate(eventPayload.eventBody);

    case IntegrationEventType.SYNC:
      // Handle scheduled sync
      return await handleSchedule(eventPayload.config, eventPayload.state);

    default:
      return { message: `Unknown event type: ${eventPayload.event}` };
  }
}

class GitHubCLI extends IntegrationCLI {
  constructor() {
    super('github', '1.0.0');
  }

  protected async handleEvent(eventPayload: IntegrationEventPayload): Promise<any> {
    return await run(eventPayload);
  }

  protected async getSpec(): Promise<Spec> {
    return {
      name: 'GitHub extension',
      key: 'github',
      // ... rest of spec
    };
  }
}

function main() {
  const githubCLI = new GitHubCLI();
  githubCLI.parse();
}

main();

schedule.ts

import { getUserEvents, getGithubData } from './utils';

interface GitHubActivityCreateParams {
  text: string;
  sourceURL: string;
}

interface GitHubSettings {
  lastSyncTime?: string;
  lastUserEventTime?: string;
  username?: string;
}

function createActivityMessage(params: GitHubActivityCreateParams) {
  return {
    type: 'activity',
    data: {
      text: params.text,
      sourceURL: params.sourceURL,
    },
  };
}

function getDefaultSyncTime(): string {
  // Default to 24 hours ago
  return new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
}

async function processNotifications(
  accessToken: string,
  lastSyncTime: string,
  username: string
): Promise<any[]> {
  const activities = [];

  // Filter for relevant notification reasons
  const allowedReasons = [
    'assign',
    'review_requested',
    'mention',
    'state_change',
    'author',
    'comment',
    'team_mention',
  ];

  let page = 1;
  let hasMorePages = true;

  while (hasMorePages) {
    try {
      // Fetch notifications with pagination
      const notifications = await getGithubData(
        `https://api.github.com/notifications?page=${page}&per_page=50&all=true&since=${lastSyncTime}`,
        accessToken
      );

      if (!notifications || notifications.length === 0) {
        hasMorePages = false;
        break;
      }

      // Check if more pages exist
      if (notifications.length < 50) {
        hasMorePages = false;
      } else {
        page++;
      }

      // Process each notification
      for (const notification of notifications) {
        try {
          // Filter by allowed reasons
          if (!allowedReasons.includes(notification.reason)) {
            continue;
          }

          const repository = notification.repository;
          const subject = notification.subject;

          // Fetch detailed data for the notification
          let githubData: any = {};
          if (subject.url) {
            try {
              githubData = await getGithubData(subject.url, accessToken);
            } catch (error) {
              continue; // Skip if we can't fetch details
            }
          }

          const url = githubData.html_url || notification.subject.url || '';
          const isIssue = subject.type === 'Issue';
          const isPR = subject.type === 'PullRequest';

          let title = '';

          // Build contextual activity based on notification reason
          switch (notification.reason) {
            case 'assign':
              title = `${isIssue ? 'Issue' : 'PR'} #${githubData.number} assigned to ${username} in ${repository.full_name}: ${githubData.title}`;
              break;

            case 'review_requested':
              title = `${username} requested to review PR #${githubData.number} in ${repository.full_name}: ${githubData.title}`;
              break;

            case 'mention':
              title = `${githubData.user?.login} mentioned ${username} in ${repository.full_name} ${isIssue ? 'issue' : 'PR'} #${githubData.number}: ${githubData.body}`;
              break;

            case 'comment':
              title = `${githubData.user?.login} commented on ${isIssue ? 'issue' : 'PR'} #${githubData.number} in ${repository.full_name}: ${githubData.body}`;
              break;

            case 'state_change':
              const stateInfo = githubData.state ? `to ${githubData.state}` : '';
              title = `${githubData.user?.login} changed ${isIssue ? 'issue' : 'PR'} #${githubData.number} state ${stateInfo} in ${repository.full_name}: ${githubData.title}`;
              break;

            default:
              title = `GitHub notification for ${username} in ${repository.full_name}`;
              break;
          }

          if (title && url) {
            activities.push(createActivityMessage({
              text: title,
              sourceURL: url,
            }));
          }
        } catch (error) {
          // Silently skip errors for individual notifications
          continue;
        }
      }
    } catch (error) {
      hasMorePages = false;
    }
  }

  return activities;
}

async function processUserEvents(
  username: string,
  accessToken: string,
  lastUserEventTime: string
): Promise<any[]> {
  const activities = [];
  let page = 1;
  let hasMorePages = true;

  while (hasMorePages) {
    try {
      // Fetch user events (PRs, issues they created)
      const userEvents = await getUserEvents(username, page, accessToken, lastUserEventTime);

      if (!userEvents || userEvents.length === 0) {
        hasMorePages = false;
        break;
      }

      if (userEvents.length < 30) {
        hasMorePages = false;
      } else {
        page++;
      }

      for (const event of userEvents) {
        try {
          let title = '';
          const sourceURL = event.html_url || '';

          switch (event.type) {
            case 'pr':
              title = `${username} created PR #${event.number}: ${event.title}`;
              break;
            case 'issue':
              title = `${username} created issue #${event.number}: ${event.title}`;
              break;
            case 'pr_comment':
              title = `${username} commented on PR #${event.number}: ${event.title}`;
              break;
            case 'issue_comment':
              title = `${username} commented on issue #${event.number}: ${event.title}`;
              break;
            default:
              title = `GitHub activity: ${event.title || 'Unknown'}`;
              break;
          }

          if (title && sourceURL) {
            activities.push(createActivityMessage({
              text: title,
              sourceURL: sourceURL,
            }));
          }
        } catch (error) {
          continue;
        }
      }
    } catch (error) {
      hasMorePages = false;
    }
  }

  return activities;
}

export async function handleSchedule(config: any, state: any) {
  try {
    const integrationConfiguration = config;

    // Check for valid access token
    if (!integrationConfiguration?.access_token) {
      return [];
    }

    // Get settings or initialize
    let settings = (state || {}) as GitHubSettings;

    // Default to 24 hours ago if no last sync times
    const lastSyncTime = settings.lastSyncTime || getDefaultSyncTime();
    const lastUserEventTime = settings.lastUserEventTime || getDefaultSyncTime();

    // Fetch user info to get username
    let user;
    try {
      user = await getGithubData('https://api.github.com/user', integrationConfiguration.access_token);
    } catch (error) {
      return [];
    }

    if (!user) {
      return [];
    }

    // Update username in settings
    if (!settings.username && user.login) {
      settings.username = user.login;
    }

    // Collect all messages
    const messages = [];

    // Process notifications
    try {
      const notificationActivities = await processNotifications(
        integrationConfiguration.access_token,
        lastSyncTime,
        settings.username || 'user'
      );
      messages.push(...notificationActivities);
    } catch (error) {
      // Continue even if notifications fail
    }

    // Process user events
    if (settings.username) {
      try {
        const userEventActivities = await processUserEvents(
          settings.username,
          integrationConfiguration.access_token,
          lastUserEventTime
        );
        messages.push(...userEventActivities);
      } catch (error) {
        // Continue even if user events fail
      }
    }

    // Update last sync times
    const newSyncTime = new Date().toISOString();

    // Add state message to persist settings
    messages.push({
      type: 'state',
      data: {
        ...settings,
        lastSyncTime: newSyncTime,
        lastUserEventTime: newSyncTime,
      },
    });

    return messages;
  } catch (error) {
    return [];
  }
}

Best Practices

1. State Management

Always persist state to track sync progress:
// Load state at start
let settings = (state || {}) as Settings;
const lastSyncTime = settings.lastSyncTime || getDefaultSyncTime();

// ... fetch and process data

// Save state at end
messages.push({
  type: 'state',
  data: {
    ...settings,
    lastSyncTime: new Date().toISOString(),
  }
});

2. Default Sync Window

On first sync, fetch last 24 hours of data:
function getDefaultSyncTime(): string {
  return new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
}

const lastSyncTime = settings.lastSyncTime || getDefaultSyncTime();

3. Efficient Pagination

Stop paginating when no more data:
while (hasMorePages) {
  const data = await fetchData(page, perPage);

  // No data left
  if (!data || data.length === 0) {
    hasMorePages = false;
    break;
  }

  // Process data...

  // Less than full page means no more data
  if (data.length < perPage) {
    hasMorePages = false;
  } else {
    page++;
  }
}

4. Error Resilience

Continue processing even if individual items fail:
for (const item of items) {
  try {
    // Process item
    const activity = await processItem(item);
    activities.push(activity);
  } catch (error) {
    // Log but continue
    console.error('Failed to process item:', error);
    continue; // Don't let one failure stop everything
  }
}

5. Rate Limit Handling

Respect API rate limits:
async function fetchWithRetry(url: string, accessToken: string, retries = 3) {
  for (let i = 0; i < retries; i++) {
    try {
      const response = await fetch(url, {
        headers: { Authorization: `Bearer ${accessToken}` }
      });

      if (response.status === 429) {
        // Rate limited - wait and retry
        const retryAfter = response.headers.get('Retry-After') || 60;
        await sleep(parseInt(retryAfter) * 1000);
        continue;
      }

      return await response.json();
    } catch (error) {
      if (i === retries - 1) throw error;
      await sleep(1000 * (i + 1)); // Exponential backoff
    }
  }
}

6. Multiple Data Sources

Fetch from multiple endpoints and aggregate:
const messages = [];

// Fetch from multiple sources in parallel
const [notifications, pullRequests, issues] = await Promise.all([
  processNotifications(accessToken, lastSyncTime),
  processPullRequests(accessToken, lastSyncTime),
  processIssues(accessToken, lastSyncTime)
]);

// Aggregate all activities
messages.push(...notifications, ...pullRequests, ...issues);

// Add state
messages.push({ type: 'state', data: newState });

return messages;

7. Schedule Configuration

Choose appropriate sync frequency:
{
  "schedule": {
    "frequency": "*/5 * * * *"   // Every 5 minutes - real-time feel
    "frequency": "*/15 * * * *"  // Every 15 minutes - balance
    "frequency": "0 * * * *"     // Every hour - low volume
  }
}

Handling Account Setup

Set initial sync schedule during OAuth:
// account-create.ts
export async function integrationCreate(data: any) {
  const { oauthResponse } = data;

  const user = await getGithubData(
    'https://api.github.com/user',
    oauthResponse.access_token
  );

  return [{
    type: 'account',
    data: {
      accountId: user.id.toString(),
      config: {
        access_token: oauthResponse.access_token,
        refresh_token: oauthResponse.refresh_token,
        mcp: { tokens: { access_token: oauthResponse.access_token } }
      },
      settings: {
        username: user.login,
        schedule: {
          frequency: '*/15 * * * *' // Can customize per user
        }
      }
    }
  }];
}

Testing Schedule-Based Integrations

  1. Manual Trigger: Call SYNC event handler directly with test state
  2. Time Travel: Mock date/time to simulate different sync scenarios
  3. State Testing: Test with various lastSyncTime values
  4. Pagination: Test with large datasets requiring multiple pages
  5. Error Scenarios: Test with network failures, rate limits, invalid tokens

Performance Optimization

Batch API Calls

// Bad: Sequential calls
for (const id of ids) {
  const data = await fetchData(id);
  process(data);
}

// Good: Parallel calls with limit
const chunks = chunkArray(ids, 10); // Process 10 at a time
for (const chunk of chunks) {
  const results = await Promise.all(chunk.map(id => fetchData(id)));
  results.forEach(process);
}

Cache Frequently Used Data

const userCache = new Map();

async function getUserDetails(userId: string, accessToken: string) {
  if (userCache.has(userId)) {
    return userCache.get(userId);
  }

  const user = await fetch(`https://api.github.com/users/${userId}`, {
    headers: { Authorization: `Bearer ${accessToken}` }
  });

  userCache.set(userId, user);
  return user;
}

Next Steps