A Telegram userbot that monitors a source channel for product listings, extracts pricing information, applies markup, and republishes to a target channel while maintaining synchronization across edits and deletions.
Channel Aggregator automates the aggregation of product listings from Telegram channels. It captures messages from a donor channel, validates them as products using regex pattern matching for pricing, applies a configurable markup percentage, and forwards the modified listings to a target channel. All message relationships are tracked in a local SQLite database to handle real-time synchronization of edits and deletions.
- Telegram Client: Hydrogram (async Pyrogram fork)
- Database: SQLAlchemy ORM with SQLite
- Runtime: Python 3.8+
- Utilities: python-dotenv for environment configuration
Channel_Aggregator/
├── config.py # Environment variable loader
├── main.py # Application entry point
├── requirements.txt # Python dependencies
├── .env.example # Configuration template
├── .gitignore # Git exclusions (*.session, .env, *.db)
│
├── core/
│ └── client.py # Telegram userbot client wrapper
│
├── database/
│ ├── database.py # SQLAlchemy engine, session factory, Base model
│ ├── models.py # ORM models (Pairs table)
│ └── db_handler.py # Database operations (CRUD functions)
│
├── logic/
│ └── parser.py # Message parsing and pricing extraction
│
└── handlers/
└── sync_handlers.py # Event handlers for message lifecycle
Loads environment variables from .env using python-dotenv. Validates required keys:
API_ID,API_HASH- Telegram API credentials (obtain from my.telegram.org)SOURCE_CHANNEL- Donor channel ID or usernameTARGET_CHANNEL- Target channel ID or username
All variables are accessible globally throughout the application.
- Initializes SQLite database schema using SQLAlchemy
- Starts the userbot and connects to Telegram
- Calls
catch_up_history()to process historical messages from the source channel - Enters idle state, listening for real-time events via event handlers
- Handles graceful shutdown with error logging
Wrapper class UserBot encapsulating Hydrogram's Client. Key methods:
start()- Authenticates with Telegram APIstop()- Gracefully disconnectssafe_send(chat_id, text, **kwargs)- Sends messages with automatic retry on FloodWait errorssafe_send_album(chat_id, messages, caption)- Sends grouped media (photos/videos) with new captionsafe_edit(chat_id, msg_id, text)- Edits message textsafe_delete(chat_id, msg_id)- Deletes message
All methods implement error handling for:
FloodWait- Telegram rate limiting (auto-retry with exponential backoff)PeerIdInvalid- Invalid chat/channel IDsMessageIdInvalid- Non-existent messagesUnauthorized- Session expiration
Extracts and validates product pricing using regex patterns.
Supported currencies:
- 💲, 💵 → USD
- ₴ → UAH
Price pattern: [emoji]\s*[number with optional spaces/decimals]
Validation rules:
- Must contain exactly one price (rejects no price or price ranges like "💲100-200")
- Price must be positive
- Message length must exceed minimum threshold (default: 1 character)
- Markup applied:
final_price = price × (1 + markup_percent / 100)
Returns ParsedMessage dataclass with:
is_product- Boolean validityprice- Extracted pricecurrency- Currency codefinal_price- Price with markup, rounded to 2 decimalsemoji- Original currency emojioriginal_substring- Matched price string (e.g., "₴ 100")
Single table Pairs tracking message relationships:
CREATE TABLE pairs (
id INTEGER PRIMARY KEY,
source_msg_id INTEGER UNIQUE, -- Message ID in source channel
target_msg_id INTEGER UNIQUE, -- Message ID in target channel
last_updated DATETIME DEFAULT now,
is_deleted BOOLEAN DEFAULT false
);- Database: SQLite at project root (
app.db) - Session management: Context manager
get_db()for automatic connection pooling and error handling - ORM: SQLAlchemy declarative base for model definitions
save_pair(source_id, target_id)- Records new message mappingget_my_id_by_original(source_id)- Retrieves target message ID by source message IDdelete_pair(source_id)- Marks/removes message relationshipget_last_seen_source_id()- Returns highest source message ID for catch-up logic
Three async handlers listening to source channel events:
- Parses message text/caption
- Validates as product using
parse_message() - Updates caption with new price
- Forwards to target channel
- Records source→target message IDs in database
- Looks up target message ID via source message ID
- Re-parses edited text
- Updates target message caption if still valid product
- Keeps database mappings intact
- Retrieves target message ID for each deleted source message
- Deletes corresponding target message
- Removes database pair record
- Python 3.8+
- Telegram account with API credentials from my.telegram.org
- Access to at least two Telegram channels (source and target)
-
Clone and navigate:
git clone https://github.com/Ascent-Group-Tech/Channel_Aggregator.git cd Channel_Aggregator -
Create virtual environment:
python -m venv venv # Activate # Windows: venv\Scripts\activate # Linux/macOS: source venv/bin/activate
-
Install dependencies:
pip install -r requirements.txt
-
Configure environment:
cp .env.example .env # Edit .env with your credentials: # - API_ID and API_HASH from my.telegram.org # - SOURCE_CHANNEL and TARGET_CHANNEL usernames or IDs # - PERCENT_MARKUP (optional, default: 15%)
-
Run:
python main.py
First run generates
sessions/userbot.sessionand authenticates. On subsequent runs, it reuses the session.
SOURCE CHANNEL
↓
Parser validates pricing (regex)
↓
Apply markup to price
↓
Forward to TARGET CHANNEL
↓
Save source_msg_id → target_msg_id mapping in database
↓
Listen for edits/deletes on SOURCE
↓
Update/delete corresponding TARGET message
↓
Keep database in sync
Source message: "Brand new laptop 💵 800.50"
Parsing: Extracts 💵 800.50 → USD 800.50 (valid, text length > 1)
Markup (15%): 800.50 × 1.15 = 920.58
Target message: "Brand new laptop 💵 920.58"
Database: Pairs(source_msg_id=123, target_msg_id=456)
If source is edited to remove price → target is deleted. If source is deleted → target is deleted and database pair removed.
- Session files (
*.session,*.session-journal) are gitignored for security - Environment variables (
.env) are gitignored; use.env.exampleas template - Database (
app.db) is local-only and gitignored - Rate limiting: Automatic backoff on Telegram FloodWait errors; random delays between requests prevent API bans
- Catch-up logic: On startup,
catch_up_history()processes all existing messages in source channel, essential for syncing after downtime
To add dependencies:
pip install package_name
pip freeze > requirements.txt # Or manually add to requirements.txtWhen creating new modules, follow the folder structure:
- Database operations →
database/ - Business logic →
logic/ - Event handlers →
handlers/ - Configuration → root
config.py - Telegram client methods →
core/
See the full contributors list.