
Cleaning Up GitHub Notifications with Python

If you use GitHub heavily, you might find your notifications piling up with issues and pull requests that are already closed or merged. Manually clearing these notifications can be tedious and time-consuming.

To solve this, I created a Python script that:

- Fetches all of your GitHub notifications via the GitHub API
- Checks whether each notification's associated issue or pull request is already closed or merged
- Marks those notifications as done by deleting their threads through the API

This script can be run periodically or integrated into your workflow to keep your GitHub notifications manageable without manual effort.
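
For example, a single crontab entry is enough to run it on a schedule. The line below is only an illustrative sketch: the script path, log file, and placeholder token are assumptions, while the GITHUB_TOKEN environment variable and the mark_as_done.py file name come from the script and the embedded Gist.

# Run the notification cleanup every morning at 08:00
0 8 * * * GITHUB_TOKEN=ghp_your_token_here /usr/bin/python3 /path/to/mark_as_done.py >> $HOME/notification-cleanup.log 2>&1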

Here’s the full script:

#!/usr/bin/env python3
"""
GitHub Notifications Cleanup Script
Fetches all GitHub notifications, checks if their associated issues/PRs are closed/merged,
and marks them as done by deleting them via the GitHub API.
"""
import argparse
import logging
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Tuple

import requests

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


class SubjectType(Enum):
    """Enum for GitHub notification subject types"""

    PULL_REQUEST = "PullRequest"
    ISSUE = "Issue"
    RELEASE = "Release"
    COMMIT = "Commit"
    DISCUSSION = "Discussion"
    OTHER = "Other"


@dataclass
class NotificationStats:
    """Statistics for the cleanup operation"""

    total: int = 0
    processed: int = 0
    deleted: int = 0
    errors: int = 0
    skipped: int = 0


class GitHubNotificationCleaner:
    """Main class for managing GitHub notification cleanup"""

    GITHUB_API_URL = "https://api.github.com"
    DEFAULT_MAX_WORKERS = 5
    DEFAULT_PER_PAGE = 100
    RATE_LIMIT_THRESHOLD = 100  # Stop if fewer than this many requests remaining

    def __init__(
        self, token: str, dry_run: bool = False, max_workers: int = DEFAULT_MAX_WORKERS
    ):
        """
        Initialize the GitHub notification cleaner.

        Args:
            token: GitHub API token
            dry_run: If True, don't actually delete notifications
            max_workers: Maximum number of parallel workers
        """
        self.token = token
        self.dry_run = dry_run
        self.max_workers = max_workers
        self.session = self._create_session()
        self.stats = NotificationStats()

    def _create_session(self) -> requests.Session:
        """Create a requests session with proper headers and retry configuration"""
        session = requests.Session()
        session.headers.update(
            {
                "Authorization": f"Bearer {self.token}",
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
            }
        )
        # Add retry logic for transient failures
        from requests.adapters import HTTPAdapter
        from requests.packages.urllib3.util.retry import Retry

        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        return session

    def check_rate_limit(self) -> Tuple[int, int]:
        """
        Check GitHub API rate limit status.

        Returns:
            Tuple of (remaining requests, reset timestamp)
        """
        try:
            response = self.session.get(f"{self.GITHUB_API_URL}/rate_limit")
            response.raise_for_status()
            data = response.json()
            core = data["resources"]["core"]
            return core["remaining"], core["reset"]
        except Exception as e:
            logger.warning(f"Failed to check rate limit: {e}")
            return -1, -1

    def wait_for_rate_limit_reset(self, reset_timestamp: int) -> None:
        """Wait until rate limit resets"""
        reset_time = datetime.fromtimestamp(reset_timestamp)
        wait_seconds = (reset_time - datetime.now()).total_seconds()
        if wait_seconds > 0:
            logger.info(
                f"Rate limit exceeded. Waiting {wait_seconds:.0f} seconds until reset..."
            )
            time.sleep(wait_seconds + 1)  # Add 1 second buffer

    def get_all_notifications(
        self, only_participating: bool = False, since: Optional[datetime] = None
    ) -> List[Dict]:
        """
        Fetch all notifications with pagination and filtering options.

        Args:
            only_participating: Only fetch notifications where user is directly participating
            since: Only fetch notifications updated after this time

        Returns:
            List of notification dictionaries
        """
        notifications = []
        params = {"all": "true", "per_page": self.DEFAULT_PER_PAGE}
        if only_participating:
            params["participating"] = "true"
        if since:
            params["since"] = since.isoformat() + "Z"
        url = f"{self.GITHUB_API_URL}/notifications"
        while url:
            try:
                # Check rate limit before making request
                remaining, reset = self.check_rate_limit()
                if 0 < remaining < self.RATE_LIMIT_THRESHOLD:
                    logger.warning(
                        f"Approaching rate limit ({remaining} requests remaining)"
                    )
                    if remaining < 10:
                        self.wait_for_rate_limit_reset(reset)
                response = self.session.get(
                    url, params=params if not notifications else None
                )
                response.raise_for_status()
                batch = response.json()
                notifications.extend(batch)
                logger.debug(f"Fetched {len(batch)} notifications from page")
                # Get next page URL from Link header
                url = self._get_next_page_url(response.headers.get("Link"))
                params = None  # Clear params for subsequent requests
            except requests.exceptions.RequestException as e:
                logger.error(f"Error fetching notifications: {e}")
                break
        logger.info(f"Fetched {len(notifications)} total notifications")
        return notifications

    def _get_next_page_url(self, link_header: Optional[str]) -> Optional[str]:
        """Parse the Link header to get the next page URL"""
        if not link_header:
            return None
        links = link_header.split(", ")
        for link in links:
            if 'rel="next"' in link:
                return link[link.find("<") + 1 : link.find(">")]
        return None

    def is_closed_or_merged(self, notification: Dict) -> bool:
        """
        Check if a notification's associated issue/PR is closed or merged.

        Args:
            notification: Notification dictionary

        Returns:
            True if the associated item is closed/merged
        """
        subject = notification.get("subject", {})
        subject_type = subject.get("type")
        api_url = subject.get("url")
        if not api_url:
            logger.debug(f"No API URL for notification {notification.get('id')}")
            return False
        try:
            response = self.session.get(api_url)
            # Handle 404 - the issue/PR might have been deleted
            if response.status_code == 404:
                logger.info(
                    f"Issue/PR not found (404) for notification {notification.get('id')}"
                )
                return True  # Consider deleted items as "closed"
            response.raise_for_status()
            data = response.json()
            # Check based on subject type
            if subject_type == SubjectType.PULL_REQUEST.value:
                return (
                    data.get("merged_at") is not None or data.get("state") == "closed"
                )
            elif subject_type == SubjectType.ISSUE.value:
                return data.get("state") == "closed"
            elif subject_type == SubjectType.RELEASE.value:
                return True  # Releases are always "done" once created
            elif subject_type == SubjectType.DISCUSSION.value:
                return (
                    data.get("state") == "closed"
                    or data.get("answer_chosen_at") is not None
                )
            # For other types, we can't determine if they're "done"
            return False
        except requests.exceptions.RequestException as e:
            logger.error(f"Error checking status for {api_url}: {e}")
            return False

    def delete_notification(self, thread_id: str) -> bool:
        """
        Delete a notification (mark as done).

        Args:
            thread_id: The notification thread ID

        Returns:
            True if successfully deleted
        """
        if self.dry_run:
            logger.info(f"[DRY RUN] Would delete notification {thread_id}")
            return True
        url = f"{self.GITHUB_API_URL}/notifications/threads/{thread_id}"
        try:
            response = self.session.delete(url)
            if response.status_code in [204, 205]:  # 204 No Content, 205 Reset Content
                logger.info(f"✅ Deleted notification {thread_id}")
                return True
            else:
                logger.error(
                    f"❌ Failed to delete {thread_id} (HTTP {response.status_code})"
                )
                return False
        except requests.exceptions.RequestException as e:
            logger.error(f"Error deleting notification {thread_id}: {e}")
            return False

    def process_notification(self, notification: Dict) -> bool:
        """
        Process a single notification (check if closed/merged, then delete).

        Args:
            notification: Notification dictionary

        Returns:
            True if notification was deleted
        """
        thread_id = notification["id"]
        title = notification["subject"]["title"]
        subject_type = notification["subject"].get("type", "Unknown")
        logger.debug(f"Processing {subject_type}: {title[:50]}...")
        try:
            if self.is_closed_or_merged(notification):
                if self.delete_notification(thread_id):
                    self.stats.deleted += 1
                    return True
                else:
                    self.stats.errors += 1
            else:
                self.stats.skipped += 1
                logger.debug(f"Skipped {thread_id} (still open)")
        except Exception as e:
            logger.error(f"Unexpected error processing {thread_id}: {e}")
            self.stats.errors += 1
        self.stats.processed += 1
        return False

    def cleanup_notifications(self, notifications: List[Dict]) -> NotificationStats:
        """
        Process all notifications in parallel.

        Args:
            notifications: List of notification dictionaries

        Returns:
            Statistics about the cleanup operation
        """
        self.stats.total = len(notifications)
        if not notifications:
            logger.info("No notifications to process")
            return self.stats
        logger.info(
            f"Processing {len(notifications)} notifications with {self.max_workers} workers..."
        )
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(self.process_notification, notif): notif
                for notif in notifications
            }
            # Process with progress updates
            for i, future in enumerate(as_completed(futures), 1):
                try:
                    future.result()
                    if i % 10 == 0:  # Progress update every 10 notifications
                        logger.info(f"Progress: {i}/{len(notifications)} processed")
                except Exception as e:
                    logger.error(f"Worker exception: {e}")
                    self.stats.errors += 1
        return self.stats

    def print_summary(self) -> None:
        """Print a summary of the cleanup operation"""
        logger.info("=" * 50)
        logger.info("CLEANUP SUMMARY")
        logger.info("=" * 50)
        logger.info(f"Total notifications found: {self.stats.total}")
        logger.info(f"Notifications processed: {self.stats.processed}")
        logger.info(f"Notifications deleted: {self.stats.deleted}")
        logger.info(f"Notifications skipped (still open): {self.stats.skipped}")
        logger.info(f"Errors encountered: {self.stats.errors}")
        if self.dry_run:
            logger.info("[DRY RUN MODE - No actual deletions were made]")


def parse_arguments():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(
        description="Clean up GitHub notifications for closed/merged issues and PRs"
    )
    parser.add_argument(
        "--token",
        help="GitHub API token (or set GITHUB_TOKEN env var)",
        default=os.environ.get("GITHUB_TOKEN"),
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Run without actually deleting notifications",
    )
    parser.add_argument(
        "--workers", type=int, default=5, help="Number of parallel workers (default: 5)"
    )
    parser.add_argument(
        "--participating",
        action="store_true",
        help="Only process notifications where you are directly participating",
    )
    parser.add_argument(
        "--since-days", type=int, help="Only process notifications from the last N days"
    )
    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
    return parser.parse_args()


def main():
    """Main entry point"""
    args = parse_arguments()
    # Set logging level
    if args.verbose:
        logger.setLevel(logging.DEBUG)
    # Validate token
    if not args.token:
        logger.error(
            "Missing GitHub token. Set GITHUB_TOKEN environment variable or use --token"
        )
        sys.exit(1)
    # Calculate since date if specified
    since = None
    if args.since_days:
        since = datetime.now() - timedelta(days=args.since_days)
        logger.info(f"Processing notifications since {since.date()}")
    # Create cleaner instance
    cleaner = GitHubNotificationCleaner(
        token=args.token, dry_run=args.dry_run, max_workers=args.workers
    )
    try:
        # Fetch notifications
        notifications = cleaner.get_all_notifications(
            only_participating=args.participating, since=since
        )
        # Process notifications
        if notifications:
            cleaner.cleanup_notifications(notifications)
        # Print summary
        cleaner.print_summary()
    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        cleaner.print_summary()
        sys.exit(1)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

Note: This embedded Gist contains the most up-to-date version of the script, which may differ slightly from the explanation.

Basic Usage

To use the script, you can run commands like:
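
(The exact invocations below are illustrative, assembled from the script's own command-line flags; the file name mark_as_done.py matches the embedded Gist.)

# Token can come from the environment instead of --token
export GITHUB_TOKEN=ghp_your_token_here

# Preview what would be deleted without actually deleting anything
python3 mark_as_done.py --dry-run

# Only touch notifications you are directly participating in, from the last 7 days
python3 mark_as_done.py --participating --since-days 7

# Use more parallel workers and verbose logging
python3 mark_as_done.py --workers 10 --verbose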

These options help tailor the cleanup process to your needs.

What’s New in This Version

