Schedule-Based Sync Integrations
Schedule-based integrations run on a cron schedule to poll external APIs and fetch new activities. This is ideal for services without webhooks or when you want to batch process data. This guide uses GitHub as the primary example.
When to Use Schedule-Based Sync
Choose schedule-based integrations when:
- The service doesn’t provide webhook events
- You want to batch process multiple API calls
- The service has rate limits that require careful timing
- Real-time sync isn’t critical (5-15 minute delays acceptable)
- You need to poll multiple endpoints and aggregate data
Architecture Overview
Cron Schedule (every 5-15 min)
↓
SYNC Event Triggered
↓
Fetch New Activities
↓
Compare with Last Sync Time
↓
Create Activity Messages
↓
Update State with New Sync Time
↓
Knowledge Graph
Key Concepts
State Management
Schedule-based integrations maintain state between runs:
interface GitHubSettings {
lastSyncTime?: string; // Last time we synced notifications
lastUserEventTime?: string; // Last time we synced user events
username?: string; // GitHub username
}
To persist state, return a state message alongside the activity messages:
return [
// ... activity messages
{
type: 'state',
data: {
lastSyncTime: new Date().toISOString(),
lastUserEventTime: new Date().toISOString(),
username: 'octocat'
}
}
];
Pagination
Handle large datasets by paginating through results:
const perPage = 50; // Page size requested from the API
let page = 1;
let hasMorePages = true;
while (hasMorePages) {
const data = await fetchData(page, perPage);
if (!data || data.length === 0) {
hasMorePages = false;
break;
}
// Process data
for (const item of data) {
// Create activities
}
// Check if more pages exist
if (data.length < perPage) {
hasMorePages = false;
} else {
page++;
}
}
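The length check above costs one extra request when the final page happens to be exactly full. GitHub's REST API also advertises the next page in the `Link` response header (`rel="next"`), so a loop can follow that URL until it is absent. A minimal sketch of parsing the header; `parseNextLink` is an illustrative name, not part of the SDK:

```typescript
// Hypothetical helper: extract the rel="next" URL from a Link header.
// Example header:
//   <https://api.github.com/notifications?page=2>; rel="next", <https://api.github.com/notifications?page=5>; rel="last"
function parseNextLink(linkHeader: string | null): string | null {
  if (!linkHeader) return null;
  // Match the <url> that is immediately followed by rel="next".
  const match = /<([^>]+)>\s*;\s*rel="next"/.exec(linkHeader);
  return match ? match[1] : null;
}
```

In a fetch loop this would be called as `parseNextLink(response.headers.get('Link'))`, stopping when it returns `null`.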
Incremental Sync
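Only fetch new data since the last sync:
const lastSyncTime = state.lastSyncTime || getDefaultSyncTime(); // Defaults to 24 hours ago
// Use lastSyncTime to filter API requests (accessToken is assumed to be in scope)
const response = await fetch(
  `https://api.github.com/notifications?since=${encodeURIComponent(lastSyncTime)}`,
  { headers: { Authorization: `Bearer ${accessToken}` } }
);
const notifications = await response.json();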
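Stamping the cursor with the current clock time can skip items that change while a sync is still running. One alternative, sketched here under assumptions, is to advance the cursor to the newest `updated_at` value actually seen (GitHub notifications carry an `updated_at` field). `Timestamped` and `nextCursor` are illustrative names, not SDK APIs:

```typescript
interface Timestamped {
  updated_at: string; // ISO 8601 timestamp, as returned by the GitHub API
}

// Return the latest updated_at among the items, or the previous cursor if none is newer.
function nextCursor(previous: string, items: Timestamped[]): string {
  let latest = Date.parse(previous);
  if (Number.isNaN(latest)) latest = 0; // Unparseable cursor: fall back to the epoch

  for (const item of items) {
    const t = Date.parse(item.updated_at);
    if (!Number.isNaN(t) && t > latest) latest = t;
  }
  return new Date(latest).toISOString();
}
```

If the API treats `since` as inclusive, the boundary item may be returned again on the next run, so this approach pairs well with de-duplication.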
Full Example: GitHub Integration
spec.json
The schedule below runs the sync every 5 minutes.
{
"name": "GitHub extension",
"key": "github",
"description": "Plan, track, and manage your projects in GitHub",
"icon": "github",
"schedule": {
"frequency": "*/5 * * * *"
},
"auth": {
"OAuth2": {
"token_url": "https://github.com/login/oauth/access_token",
"authorization_url": "https://github.com/login/oauth/authorize",
"scopes": [
"user",
"public_repo",
"repo",
"notifications",
"read:org"
],
"scope_separator": ","
}
},
"mcp": {
"type": "http",
"url": "https://api.githubcopilot.com/mcp/",
"headers": {
"Authorization": "Bearer ${config:access_token}",
"Content-Type": "application/json"
}
}
}
index.ts
import { handleSchedule } from './schedule';
import { integrationCreate } from './account-create';
import {
IntegrationCLI,
IntegrationEventPayload,
IntegrationEventType,
Spec,
} from '@redplanethq/sdk';
export async function run(eventPayload: IntegrationEventPayload) {
switch (eventPayload.event) {
case IntegrationEventType.SETUP:
return await integrationCreate(eventPayload.eventBody);
case IntegrationEventType.SYNC:
// Handle scheduled sync
return await handleSchedule(eventPayload.config, eventPayload.state);
default:
return { message: `Unknown event type: ${eventPayload.event}` };
}
}
class GitHubCLI extends IntegrationCLI {
constructor() {
super('github', '1.0.0');
}
protected async handleEvent(eventPayload: IntegrationEventPayload): Promise<any> {
return await run(eventPayload);
}
protected async getSpec(): Promise<Spec> {
return {
name: 'GitHub extension',
key: 'github',
// ... rest of spec
};
}
}
function main() {
const githubCLI = new GitHubCLI();
githubCLI.parse();
}
main();
schedule.ts
import { getUserEvents, getGithubData } from './utils';
interface GitHubActivityCreateParams {
text: string;
sourceURL: string;
}
interface GitHubSettings {
lastSyncTime?: string;
lastUserEventTime?: string;
username?: string;
}
function createActivityMessage(params: GitHubActivityCreateParams) {
return {
type: 'activity',
data: {
text: params.text,
sourceURL: params.sourceURL,
},
};
}
function getDefaultSyncTime(): string {
// Default to 24 hours ago
return new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
}
async function processNotifications(
accessToken: string,
lastSyncTime: string,
username: string
): Promise<any[]> {
const activities = [];
// Filter for relevant notification reasons
const allowedReasons = [
'assign',
'review_requested',
'mention',
'state_change',
'author',
'comment',
'team_mention',
];
let page = 1;
let hasMorePages = true;
while (hasMorePages) {
try {
// Fetch notifications with pagination
const notifications = await getGithubData(
`https://api.github.com/notifications?page=${page}&per_page=50&all=true&since=${lastSyncTime}`,
accessToken
);
if (!notifications || notifications.length === 0) {
hasMorePages = false;
break;
}
// Check if more pages exist
if (notifications.length < 50) {
hasMorePages = false;
} else {
page++;
}
// Process each notification
for (const notification of notifications) {
try {
// Filter by allowed reasons
if (!allowedReasons.includes(notification.reason)) {
continue;
}
const repository = notification.repository;
const subject = notification.subject;
// Fetch detailed data for the notification
let githubData: any = {};
if (subject.url) {
try {
githubData = await getGithubData(subject.url, accessToken);
} catch (error) {
continue; // Skip if we can't fetch details
}
}
const url = githubData.html_url || notification.subject.url || '';
const isIssue = subject.type === 'Issue';
const isPR = subject.type === 'PullRequest';
let title = '';
// Build contextual activity based on notification reason
switch (notification.reason) {
case 'assign':
title = `${isIssue ? 'Issue' : 'PR'} #${githubData.number} assigned to ${username} in ${repository.full_name}: ${githubData.title}`;
break;
case 'review_requested':
title = `Review requested from ${username} on PR #${githubData.number} in ${repository.full_name}: ${githubData.title}`;
break;
case 'mention':
title = `${githubData.user?.login} mentioned ${username} in ${repository.full_name} ${isIssue ? 'issue' : 'PR'} #${githubData.number}: ${githubData.body}`;
break;
case 'comment':
title = `${githubData.user?.login} commented on ${isIssue ? 'issue' : 'PR'} #${githubData.number} in ${repository.full_name}: ${githubData.body}`;
break;
case 'state_change': {
// Braces scope the const declaration to this case
const stateInfo = githubData.state ? `to ${githubData.state}` : '';
title = `${githubData.user?.login} changed ${isIssue ? 'issue' : 'PR'} #${githubData.number} state ${stateInfo} in ${repository.full_name}: ${githubData.title}`;
break;
}
default:
title = `GitHub notification for ${username} in ${repository.full_name}`;
break;
}
if (title && url) {
activities.push(createActivityMessage({
text: title,
sourceURL: url,
}));
}
} catch (error) {
// Silently skip errors for individual notifications
continue;
}
}
} catch (error) {
hasMorePages = false;
}
}
return activities;
}
async function processUserEvents(
username: string,
accessToken: string,
lastUserEventTime: string
): Promise<any[]> {
const activities = [];
let page = 1;
let hasMorePages = true;
while (hasMorePages) {
try {
// Fetch user events (PRs, issues they created)
const userEvents = await getUserEvents(username, page, accessToken, lastUserEventTime);
if (!userEvents || userEvents.length === 0) {
hasMorePages = false;
break;
}
if (userEvents.length < 30) {
hasMorePages = false;
} else {
page++;
}
for (const event of userEvents) {
try {
let title = '';
const sourceURL = event.html_url || '';
switch (event.type) {
case 'pr':
title = `${username} created PR #${event.number}: ${event.title}`;
break;
case 'issue':
title = `${username} created issue #${event.number}: ${event.title}`;
break;
case 'pr_comment':
title = `${username} commented on PR #${event.number}: ${event.title}`;
break;
case 'issue_comment':
title = `${username} commented on issue #${event.number}: ${event.title}`;
break;
default:
title = `GitHub activity: ${event.title || 'Unknown'}`;
break;
}
if (title && sourceURL) {
activities.push(createActivityMessage({
text: title,
sourceURL: sourceURL,
}));
}
} catch (error) {
continue;
}
}
} catch (error) {
hasMorePages = false;
}
}
return activities;
}
export async function handleSchedule(config: any, state: any) {
try {
const integrationConfiguration = config;
// Check for valid access token
if (!integrationConfiguration?.access_token) {
return [];
}
// Get settings or initialize
let settings = (state || {}) as GitHubSettings;
// Default to 24 hours ago if no last sync times
const lastSyncTime = settings.lastSyncTime || getDefaultSyncTime();
const lastUserEventTime = settings.lastUserEventTime || getDefaultSyncTime();
// Fetch user info to get username
let user;
try {
user = await getGithubData('https://api.github.com/user', integrationConfiguration.access_token);
} catch (error) {
return [];
}
if (!user) {
return [];
}
// Update username in settings
if (!settings.username && user.login) {
settings.username = user.login;
}
// Collect all messages
const messages = [];
// Process notifications
try {
const notificationActivities = await processNotifications(
integrationConfiguration.access_token,
lastSyncTime,
settings.username || 'user'
);
messages.push(...notificationActivities);
} catch (error) {
// Continue even if notifications fail
}
// Process user events
if (settings.username) {
try {
const userEventActivities = await processUserEvents(
settings.username,
integrationConfiguration.access_token,
lastUserEventTime
);
messages.push(...userEventActivities);
} catch (error) {
// Continue even if user events fail
}
}
// Update last sync times
const newSyncTime = new Date().toISOString();
// Add state message to persist settings
messages.push({
type: 'state',
data: {
...settings,
lastSyncTime: newSyncTime,
lastUserEventTime: newSyncTime,
},
});
return messages;
} catch (error) {
return [];
}
}
Best Practices
1. State Management
Always persist state to track sync progress:
// Load state at start
let settings = (state || {}) as Settings;
const lastSyncTime = settings.lastSyncTime || getDefaultSyncTime();
// ... fetch and process data
// Save state at end
messages.push({
type: 'state',
data: {
...settings,
lastSyncTime: new Date().toISOString(),
}
});
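If a fetch fails but the cursor still advances, the data from that window is never retried. A hedged sketch of advancing each cursor only when its source succeeded; `SourceResult` and `advanceCursors` are hypothetical names, and the settings fields mirror the `GitHubSettings` interface shown earlier:

```typescript
interface SyncSettings {
  lastSyncTime?: string;
  lastUserEventTime?: string;
  username?: string;
}

interface SourceResult {
  ok: boolean; // true when the source was fetched without error
}

// Build the next state, moving a cursor forward only for sources that succeeded.
function advanceCursors(
  settings: SyncSettings,
  notifications: SourceResult,
  userEvents: SourceResult,
  now: string
): SyncSettings {
  return {
    ...settings,
    lastSyncTime: notifications.ok ? now : settings.lastSyncTime,
    lastUserEventTime: userEvents.ok ? now : settings.lastUserEventTime,
  };
}
```

The returned object can be used directly as the `data` of the state message, so a failed source is retried from its old cursor on the next run.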
2. Default Sync Window
On the first sync, fetch the last 24 hours of data:
function getDefaultSyncTime(): string {
return new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
}
const lastSyncTime = settings.lastSyncTime || getDefaultSyncTime();
3. Efficient Pagination
Stop paginating when there is no more data:
const perPage = 50;
let page = 1;
let hasMorePages = true;
while (hasMorePages) {
const data = await fetchData(page, perPage);
// No data left
if (!data || data.length === 0) {
hasMorePages = false;
break;
}
// Process data...
// Less than full page means no more data
if (data.length < perPage) {
hasMorePages = false;
} else {
page++;
}
}
4. Error Resilience
Continue processing even if individual items fail:
for (const item of items) {
try {
// Process item
const activity = await processItem(item);
activities.push(activity);
} catch (error) {
// Log but continue
console.error('Failed to process item:', error);
continue; // Don't let one failure stop everything
}
}
5. Rate Limit Handling
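Respect API rate limits:
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function fetchWithRetry(url: string, accessToken: string, retries = 3) {
  for (let i = 0; i < retries; i++) {
    try {
      const response = await fetch(url, {
        headers: { Authorization: `Bearer ${accessToken}` }
      });
      if (response.status === 429) {
        // Rate limited: wait for Retry-After seconds (default 60), then retry
        const retryAfter = parseInt(response.headers.get('Retry-After') ?? '60', 10);
        await sleep((Number.isNaN(retryAfter) ? 60 : retryAfter) * 1000);
        continue;
      }
      return await response.json();
    } catch (error) {
      if (i === retries - 1) throw error;
      await sleep(1000 * 2 ** i); // Exponential backoff: 1s, 2s, 4s, ...
    }
  }
  // Every attempt was rate limited; fail loudly instead of returning undefined
  throw new Error(`Still rate limited after ${retries} attempts: ${url}`);
}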
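GitHub can signal rate limiting with status 403 as well as 429, and it reports the quota reset time in the `x-ratelimit-reset` header as UTC epoch seconds. A minimal sketch of computing how long to wait; `rateLimitDelayMs` and `HeaderLookup` are illustrative names, and the helper takes a plain lookup function so it can be exercised without a network call:

```typescript
type HeaderLookup = (name: string) => string | null;

// Returns milliseconds to wait before retrying, or 0 if the response is not rate limited.
function rateLimitDelayMs(status: number, header: HeaderLookup, nowMs: number): number {
  if (status !== 403 && status !== 429) return 0;

  // Prefer Retry-After (seconds) when the server provides it.
  const retryAfter = header('retry-after');
  if (retryAfter !== null && Number.isFinite(Number(retryAfter))) {
    return Math.max(0, Number(retryAfter) * 1000);
  }

  // Otherwise wait until the quota resets, if the quota is exhausted.
  if (header('x-ratelimit-remaining') === '0') {
    const resetSeconds = Number(header('x-ratelimit-reset'));
    if (Number.isFinite(resetSeconds)) {
      return Math.max(0, resetSeconds * 1000 - nowMs);
    }
  }

  return 0; // For example, a 403 caused by missing permissions rather than rate limiting
}
```

With a `fetch` response this could be called as `rateLimitDelayMs(response.status, (name) => response.headers.get(name), Date.now())`.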
6. Multiple Data Sources
Fetch from multiple endpoints and aggregate:
const messages = [];
// Fetch from multiple sources in parallel
const [notifications, pullRequests, issues] = await Promise.all([
processNotifications(accessToken, lastSyncTime),
processPullRequests(accessToken, lastSyncTime),
processIssues(accessToken, lastSyncTime)
]);
// Aggregate all activities
messages.push(...notifications, ...pullRequests, ...issues);
// Add state
messages.push({ type: 'state', data: newState });
return messages;
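`Promise.all` rejects as soon as any one source fails, which discards the results of the others. Where partial results are acceptable, `Promise.allSettled` keeps whatever succeeded. A sketch under that assumption; `Message`, `collectFulfilled`, and `aggregate` are hypothetical names:

```typescript
type Message = { type: string; data: unknown };

// Flatten the fulfilled results and count how many sources failed.
function collectFulfilled(results: PromiseSettledResult<Message[]>[]): {
  messages: Message[];
  failed: number;
} {
  const messages: Message[] = [];
  let failed = 0;
  for (const result of results) {
    if (result.status === 'fulfilled') {
      messages.push(...result.value);
    } else {
      failed++;
    }
  }
  return { messages, failed };
}

// Usage: each source is any function returning Promise<Message[]>.
async function aggregate(sources: Array<() => Promise<Message[]>>) {
  const results = await Promise.allSettled(sources.map((source) => source()));
  return collectFulfilled(results);
}
```

The `failed` count can be logged, or used to decide whether the sync cursor should advance for this run.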
7. Schedule Configuration
Choose an appropriate sync frequency. Set a single cron expression in spec.json:
{
  "schedule": {
    "frequency": "*/15 * * * *"
  }
}
Typical values:
- */5 * * * * (every 5 minutes): real-time feel
- */15 * * * * (every 15 minutes): a balance between freshness and API load
- 0 * * * * (every hour): low-volume sources
Handling Account Setup
Set the initial sync schedule during OAuth account setup:
// account-create.ts
export async function integrationCreate(data: any) {
const { oauthResponse } = data;
const user = await getGithubData(
'https://api.github.com/user',
oauthResponse.access_token
);
return [{
type: 'account',
data: {
accountId: user.id.toString(),
config: {
access_token: oauthResponse.access_token,
refresh_token: oauthResponse.refresh_token,
mcp: { tokens: { access_token: oauthResponse.access_token } }
},
settings: {
username: user.login,
schedule: {
frequency: '*/15 * * * *' // Can customize per user
}
}
}
}];
}
Testing Schedule-Based Integrations
- Manual Trigger: Call SYNC event handler directly with test state
- Time Travel: Mock date/time to simulate different sync scenarios
- State Testing: Test with various lastSyncTime values
- Pagination: Test with large datasets requiring multiple pages
- Error Scenarios: Test with network failures, rate limits, invalid tokens
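The first three items can be combined by giving the handler an injectable clock and fetcher. The sketch below uses a simplified stand-in handler, `syncOnce`, rather than the `handleSchedule` function from this guide; `SyncDeps`, `syncOnce`, and `demo` are all hypothetical names:

```typescript
type SyncState = { lastSyncTime?: string };
type SyncMessage = { type: 'activity' | 'state'; data: Record<string, unknown> };

interface SyncDeps {
  now: () => Date; // Injectable clock for "time travel"
  fetchSince: (since: string) => Promise<string[]>; // Stand-in for the real API client
}

async function syncOnce(state: SyncState, deps: SyncDeps): Promise<SyncMessage[]> {
  const startedAt = deps.now();
  // Default to 24 hours before the (mocked) current time on first sync
  const since =
    state.lastSyncTime ?? new Date(startedAt.getTime() - 24 * 60 * 60 * 1000).toISOString();
  const items = await deps.fetchSince(since);
  const messages = items.map((text): SyncMessage => ({ type: 'activity', data: { text } }));
  messages.push({ type: 'state', data: { lastSyncTime: startedAt.toISOString() } });
  return messages;
}

// Manual trigger with empty state, a fixed clock, and a fake fetcher that records its input.
async function demo(): Promise<{ seenSince: string; messages: SyncMessage[] }> {
  let seenSince = '';
  const messages = await syncOnce({}, {
    now: () => new Date('2024-01-02T00:00:00.000Z'),
    fetchSince: async (since) => {
      seenSince = since;
      return ['PR #1 opened'];
    },
  });
  return { seenSince, messages };
}
```

A test can then assert on `seenSince` (the window the handler asked for) and on the final state message, without touching the network or the real clock.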
Performance Optimization
Batch API Calls
// Bad: Sequential calls
for (const id of ids) {
const data = await fetchData(id);
process(data);
}
// Good: Parallel calls with limit
const chunks = chunkArray(ids, 10); // Process 10 at a time
for (const chunk of chunks) {
const results = await Promise.all(chunk.map(id => fetchData(id)));
results.forEach(process);
}
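The snippet above relies on a `chunkArray` helper that is not defined in this guide. A minimal sketch of one possible implementation:

```typescript
// Split an array into consecutive chunks of at most `size` items.
function chunkArray<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}
```

Note that each `Promise.all` waits for the slowest call in its chunk before the next chunk starts, so this caps concurrency at the chunk size rather than keeping a steady number of requests in flight.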
Cache Frequently Used Data
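const userCache = new Map<string, unknown>();
async function getUserDetails(userId: string, accessToken: string) {
  if (userCache.has(userId)) {
    return userCache.get(userId);
  }
  const response = await fetch(`https://api.github.com/users/${userId}`, {
    headers: { Authorization: `Bearer ${accessToken}` }
  });
  if (!response.ok) {
    throw new Error(`Failed to fetch user ${userId}: ${response.status}`);
  }
  // Cache the parsed body, not the Response object (a body can only be read once)
  const user = await response.json();
  userCache.set(userId, user);
  return user;
}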
