If you use GitHub heavily, you might find your notifications piling up with issues and pull requests that are already closed or merged. Manually clearing these notifications can be tedious and time-consuming.
To solve this, I created a Python script that:
- Connects to your GitHub account using a personal access token.
- Scans your notifications for issues and pull requests that are already closed or merged.
- Automatically marks these notifications as done, helping you keep your inbox clean.
- Provides logs so you can track what was processed.
This script can be run periodically or integrated into your workflow to keep your GitHub notifications manageable without manual effort.
Here’s the full script:
#!/usr/bin/env python3
"""
GitHub Notifications Cleanup Script

Fetches all GitHub notifications, checks if their associated issues/PRs are closed/merged,
and marks them as done by deleting them via the GitHub API.
"""

import argparse
import logging
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Dict, List, Optional, Tuple

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


class SubjectType(Enum):
    """Enum for GitHub notification subject types"""

    PULL_REQUEST = "PullRequest"
    ISSUE = "Issue"
    RELEASE = "Release"
    COMMIT = "Commit"
    DISCUSSION = "Discussion"
    OTHER = "Other"


@dataclass
class NotificationStats:
    """Statistics for the cleanup operation"""

    total: int = 0
    processed: int = 0
    deleted: int = 0
    errors: int = 0
    skipped: int = 0


class GitHubNotificationCleaner:
    """Main class for managing GitHub notification cleanup"""

    GITHUB_API_URL = "https://api.github.com"
    DEFAULT_MAX_WORKERS = 5
    DEFAULT_PER_PAGE = 100
    RATE_LIMIT_THRESHOLD = 100  # Warn when fewer than this many requests remain

    def __init__(
        self, token: str, dry_run: bool = False, max_workers: int = DEFAULT_MAX_WORKERS
    ):
        """
        Initialize the GitHub notification cleaner.

        Args:
            token: GitHub API token
            dry_run: If True, don't actually delete notifications
            max_workers: Maximum number of parallel workers
        """
        self.token = token
        self.dry_run = dry_run
        self.max_workers = max_workers
        self.session = self._create_session()
        self.stats = NotificationStats()

    def _create_session(self) -> requests.Session:
        """Create a requests session with proper headers and retry configuration"""
        session = requests.Session()
        session.headers.update(
            {
                "Authorization": f"Bearer {self.token}",
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
            }
        )
        # Add retry logic for transient failures
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        return session

    def check_rate_limit(self) -> Tuple[int, int]:
        """
        Check GitHub API rate limit status.

        Returns:
            Tuple of (remaining requests, reset timestamp)
        """
        try:
            response = self.session.get(f"{self.GITHUB_API_URL}/rate_limit")
            response.raise_for_status()
            data = response.json()
            # The core REST API pool lives under resources.core
            core = data["resources"]["core"]
            return core["remaining"], core["reset"]
        except Exception as e:
            logger.warning(f"Failed to check rate limit: {e}")
            return -1, -1

    def wait_for_rate_limit_reset(self, reset_timestamp: int) -> None:
        """Wait until rate limit resets"""
        reset_time = datetime.fromtimestamp(reset_timestamp)
        wait_seconds = (reset_time - datetime.now()).total_seconds()
        if wait_seconds > 0:
            logger.info(
                f"Rate limit exceeded. Waiting {wait_seconds:.0f} seconds until reset..."
            )
            time.sleep(wait_seconds + 1)  # Add 1 second buffer

    def get_all_notifications(
        self, only_participating: bool = False, since: Optional[datetime] = None
    ) -> List[Dict]:
        """
        Fetch all notifications with pagination and filtering options.

        Args:
            only_participating: Only fetch notifications where user is directly participating
            since: Only fetch notifications updated after this time

        Returns:
            List of notification dictionaries
        """
        notifications = []
        params = {"all": "true", "per_page": self.DEFAULT_PER_PAGE}
        if only_participating:
            params["participating"] = "true"
        if since:
            params["since"] = since.isoformat()

        url = f"{self.GITHUB_API_URL}/notifications"
        while url:
            try:
                # Check rate limit before making request
                remaining, reset = self.check_rate_limit()
                if 0 < remaining < self.RATE_LIMIT_THRESHOLD:
                    logger.warning(
                        f"Approaching rate limit ({remaining} requests remaining)"
                    )
                    if remaining < 10:
                        self.wait_for_rate_limit_reset(reset)

                response = self.session.get(url, params=params)
                response.raise_for_status()

                batch = response.json()
                notifications.extend(batch)
                logger.debug(f"Fetched {len(batch)} notifications from page")

                # Get next page URL from Link header
                url = self._get_next_page_url(response.headers.get("Link"))
                params = None  # The next-page URL already carries the query string
            except requests.exceptions.RequestException as e:
                logger.error(f"Error fetching notifications: {e}")
                break

        logger.info(f"Fetched {len(notifications)} total notifications")
        return notifications

    def _get_next_page_url(self, link_header: Optional[str]) -> Optional[str]:
        """Parse the Link header to get the next page URL"""
        if not link_header:
            return None
        links = link_header.split(", ")
        for link in links:
            if 'rel="next"' in link:
                return link[link.find("<") + 1 : link.find(">")]
        return None

    def is_closed_or_merged(self, notification: Dict) -> bool:
        """
        Check if a notification's associated issue/PR is closed or merged.

        Args:
            notification: Notification dictionary

        Returns:
            True if the associated item is closed/merged
        """
        subject = notification.get("subject", {})
        subject_type = subject.get("type")
        api_url = subject.get("url")

        if not api_url:
            logger.debug(f"No API URL for notification {notification.get('id')}")
            return False

        try:
            response = self.session.get(api_url)
            # Handle 404 - the issue/PR might have been deleted
            if response.status_code == 404:
                logger.info(
                    f"Issue/PR not found (404) for notification {notification.get('id')}"
                )
                return True  # Consider deleted items as "closed"

            response.raise_for_status()
            data = response.json()

            # Check based on subject type
            if subject_type == SubjectType.PULL_REQUEST.value:
                return (
                    data.get("merged_at") is not None or data.get("state") == "closed"
                )
            elif subject_type == SubjectType.ISSUE.value:
                return data.get("state") == "closed"
            elif subject_type == SubjectType.RELEASE.value:
                return True  # Releases are always "done" once created
            elif subject_type == SubjectType.DISCUSSION.value:
                return (
                    data.get("state") == "closed"
                    or data.get("answer_chosen_at") is not None
                )

            # For other types, we can't determine if they're "done"
            return False
        except requests.exceptions.RequestException as e:
            logger.error(f"Error checking status for {api_url}: {e}")
            return False

    def delete_notification(self, thread_id: str) -> bool:
        """
        Delete a notification (mark as done).

        Args:
            thread_id: The notification thread ID

        Returns:
            True if successfully deleted
        """
        if self.dry_run:
            logger.info(f"[DRY RUN] Would delete notification {thread_id}")
            return True

        url = f"{self.GITHUB_API_URL}/notifications/threads/{thread_id}"
        try:
            response = self.session.delete(url)
            if response.status_code in [204, 205]:  # 204 No Content, 205 Reset Content
                logger.info(f"✅ Deleted notification {thread_id}")
                return True
            else:
                logger.error(
                    f"❌ Failed to delete {thread_id} (HTTP {response.status_code})"
                )
                return False
        except requests.exceptions.RequestException as e:
            logger.error(f"Error deleting notification {thread_id}: {e}")
            return False

    def process_notification(self, notification: Dict) -> bool:
        """
        Process a single notification (check if closed/merged, then delete).

        Args:
            notification: Notification dictionary

        Returns:
            True if notification was deleted
        """
        thread_id = notification["id"]
        title = notification["subject"]["title"]
        subject_type = notification["subject"].get("type", "Unknown")

        logger.debug(f"Processing {subject_type}: {title[:50]}...")

        deleted = False
        try:
            if self.is_closed_or_merged(notification):
                if self.delete_notification(thread_id):
                    self.stats.deleted += 1
                    deleted = True
                else:
                    self.stats.errors += 1
            else:
                self.stats.skipped += 1
                logger.debug(f"Skipped {thread_id} (still open)")
        except Exception as e:
            logger.error(f"Unexpected error processing {thread_id}: {e}")
            self.stats.errors += 1

        # Count every notification we looked at, deleted or not
        self.stats.processed += 1
        return deleted

    def cleanup_notifications(self, notifications: List[Dict]) -> NotificationStats:
        """
        Process all notifications in parallel.

        Args:
            notifications: List of notification dictionaries

        Returns:
            Statistics about the cleanup operation
        """
        self.stats.total = len(notifications)
        if not notifications:
            logger.info("No notifications to process")
            return self.stats

        logger.info(
            f"Processing {len(notifications)} notifications with {self.max_workers} workers..."
        )

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(self.process_notification, notif): notif
                for notif in notifications
            }
            # Process with progress updates
            for i, future in enumerate(as_completed(futures), 1):
                try:
                    future.result()
                    if i % 10 == 0:  # Progress update every 10 notifications
                        logger.info(f"Progress: {i}/{len(notifications)} processed")
                except Exception as e:
                    logger.error(f"Worker exception: {e}")
                    self.stats.errors += 1

        return self.stats

    def print_summary(self) -> None:
        """Print a summary of the cleanup operation"""
        logger.info("=" * 50)
        logger.info("CLEANUP SUMMARY")
        logger.info("=" * 50)
        logger.info(f"Total notifications found: {self.stats.total}")
        logger.info(f"Notifications processed: {self.stats.processed}")
        logger.info(f"Notifications deleted: {self.stats.deleted}")
        logger.info(f"Notifications skipped (still open): {self.stats.skipped}")
        logger.info(f"Errors encountered: {self.stats.errors}")
        if self.dry_run:
            logger.info("[DRY RUN MODE - No actual deletions were made]")


def parse_arguments():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(
        description="Clean up GitHub notifications for closed/merged issues and PRs"
    )
    parser.add_argument(
        "--token",
        help="GitHub API token (or set GITHUB_TOKEN env var)",
        default=os.environ.get("GITHUB_TOKEN"),
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Run without actually deleting notifications",
    )
    parser.add_argument(
        "--workers", type=int, default=5, help="Number of parallel workers (default: 5)"
    )
    parser.add_argument(
        "--participating",
        action="store_true",
        help="Only process notifications where you are directly participating",
    )
    parser.add_argument(
        "--since-days", type=int, help="Only process notifications from the last N days"
    )
    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
    return parser.parse_args()


def main():
    """Main entry point"""
    args = parse_arguments()

    # Set logging level
    if args.verbose:
        logger.setLevel(logging.DEBUG)

    # Validate token
    if not args.token:
        logger.error(
            "Missing GitHub token. Set GITHUB_TOKEN environment variable or use --token"
        )
        sys.exit(1)

    # Calculate since date if specified (use UTC, which is what the API expects)
    since = None
    if args.since_days:
        since = datetime.now(timezone.utc) - timedelta(days=args.since_days)
        logger.info(f"Processing notifications since {since.date()}")

    # Create cleaner instance
    cleaner = GitHubNotificationCleaner(
        token=args.token, dry_run=args.dry_run, max_workers=args.workers
    )

    try:
        # Fetch notifications
        notifications = cleaner.get_all_notifications(
            only_participating=args.participating, since=since
        )

        # Process notifications
        if notifications:
            cleaner.cleanup_notifications(notifications)

        # Print summary
        cleaner.print_summary()
    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        cleaner.print_summary()
        sys.exit(1)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
Note: The Gist this script comes from always holds the most up-to-date version, which may differ slightly from the listing and explanation above.
Basic Usage
The script reads your token from the GITHUB_TOKEN environment variable by default; a classic personal access token needs the notifications (or repo) scope. To use the script, you can run commands like:
- Preview the cleanup without deleting anything:
python mark_as_done.py --dry-run
- Only process notifications where you are directly participating:
python mark_as_done.py --participating
- Only process notifications updated in the last 7 days:
python mark_as_done.py --since-days 7
- Use a specific GitHub token instead of the environment variable:
python mark_as_done.py --token YOUR_GITHUB_TOKEN
Run it with no flags to process every notification in your inbox. These options help tailor the cleanup process to your needs.
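The script also lends itself to running on a schedule, as mentioned at the start. A minimal sketch using cron, assuming the script is saved at ~/scripts/mark_as_done.py (the path and token placeholder are just examples to adjust for your setup), runs the cleanup every morning at 08:00:
0 8 * * * GITHUB_TOKEN=your_token_here python3 ~/scripts/mark_as_done.py --since-days 7
Keeping --dry-run in the scheduled command until you trust the results is a cheap way to confirm nothing unexpected gets marked as done.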
What’s New in This Version
- Improved error handling to gracefully manage API issues.
- Added protection against GitHub rate limiting to avoid interruptions.
- Better detection of which notifications can be marked as done, covering issues, pull requests, releases, and discussions.
- Improved logging for clearer insights into the cleanup process.
- Added a command-line interface (CLI) for easier usage and flexibility.
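Beyond the CLI, you can also drive the cleanup from your own Python code if you want to fold it into an existing workflow. Here is a minimal sketch, assuming the script above is saved as mark_as_done.py next to your code (the import name simply follows from that filename):
import os

from mark_as_done import GitHubNotificationCleaner

# Dry run first: nothing is deleted, but the summary shows what would be.
cleaner = GitHubNotificationCleaner(token=os.environ["GITHUB_TOKEN"], dry_run=True)
notifications = cleaner.get_all_notifications(only_participating=True)
cleaner.cleanup_notifications(notifications)
cleaner.print_summary()
Once you are happy with what the dry run reports, drop dry_run=True and the same four calls will actually mark the stale notifications as done.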