Project Overview
This SMS Gateway application enables scheduling, sending, and tracking SMS and USSD interactions via serial-connected telecommunication modules. It enforces user roles, logs system activity, and exposes a modular API.
Purpose
Provide a self-hosted, extensible SMS/USSD gateway for campaigns, device management, and real-time interaction.
Key Features
- SMS campaign creation, scheduling, and delivery tracking
- USSD session management for interactive menus
- Multi-device and port handling via serial interfaces
- Role-based access control and API key management
- Persistent logging of messages, sessions, and system events
- Pluggable scheduler for task execution
System Anatomy
Flask Application (app.py)
- Initializes Flask, configures sessions, and connects database
- Registers feature blueprints (messaging, campaigns, devices, USSD) and admin blueprint
- Defines protected routes, e.g.,
/home
for the dashboard - Entry point for running the web server
Data Models (database/models.py)
User
,Role
,APIAccess
: manage authentication and permissionsMessage
,Campaign
,LogEntry
: track SMS content, dispatch status, and audit logsDevice
,Port
: represent physical modems and serial portsUSSDSession
,ScheduledTask
: handle interactive USSD flows and scheduled SMS
Scheduler Service (services/scheduler.py)
get_pending_tasks()
: fetches unsent, due tasksmark_task_executed(task_id)
: flags tasks as completed- Designed for integration with cron jobs or long-running daemons
Serial Handler Service (services/serial_handler.py)
send_at_command(port, command, timeout)
: transmits AT commands and returns modem responsessend_ussd(port, ussd_code)
: initiates USSD requests and captures replies- Handles low-level serial communication and error timeouts
When and Why to Use
Use this gateway when you need:
- Bulk or scheduled SMS campaigns without relying on third-party APIs
- Interactive USSD menus for surveys, account balance checks, or two-factor flows
- Direct control over GSM modems and SIM cards
- Auditable communication logs and role-based access
Getting Started
Running the Server
Ensure your environment variables (DATABASE_URL
, SECRET_KEY
, etc.) are set. Then:
pip install -r requirements.txt
python app.py
Fetching and Executing Scheduled Tasks
from services.scheduler import get_pending_tasks, mark_task_executed
from services.serial_handler import send_at_command
# Periodic job to dispatch SMS
for task in get_pending_tasks():
response = send_at_command(task.port.path, f'AT+CMGS="{task.number}"\r{task.message}')
# parse response, update logs
mark_task_executed(task.id)
Sending a USSD Request
from services.serial_handler import send_ussd
# Initiate USSD on port `/dev/ttyUSB0`
result = send_ussd('/dev/ttyUSB0', '*123#')
print("USSD Reply:", result)
This overview equips you to deploy, extend, and integrate the SMS Gateway into your workflows.
Getting Started
This guide takes you from cloning the repository to sending your first SMS in a local environment.
1. Clone the repository
git clone https://github.com/elmadkouryassine/sms_gateway.git
cd sms_gateway
2. Install dependencies
Create a virtual environment and install Python packages.
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
3. Configure environment variables
Define your database URI and Flask settings:
export DATABASE_URI="sqlite:///sms_gateway.db"
export FLASK_APP="app.py"
export FLASK_ENV="development"
4. Initialize the database
Create all tables before seeding:
python - <<EOF
from app import app, db
with app.app_context():
db.create_all()
EOF
5. Seed demo data
Run the scripts in order to populate roles, settings, admin user and demo SMS data.
# Create default roles: admin, manager, viewer
python scripts/seed_roles.py
# Seed default system settings
python scripts/seed_settings.py
# Ensure an admin user exists (username: admin, password: changeme)
python scripts/seed_admin.py
# Populate SIM ports, logs, scheduled SMS/tasks
python scripts/seed_demo.py
# Optional: generate 50 random inbound messages
python scripts/seed_inbox_demo.py 50
6. Run the web application
Launch the Flask server to access the dashboard and SMS/USSD interfaces.
python app.py
Open http://localhost:5000 in your browser and log in as admin / changeme.
7. Start the background scheduler
Process and dispatch scheduled SMS and USSD sessions every 60 seconds.
python run_scheduler.py
8. Verify SMS dispatch
- In the web UI, navigate to Scheduler → Scheduled SMS.
- You should see demo tasks with status “Pending”.
- In the scheduler terminal, look for logs like:
[INFO] Sent SMS id=42 to +1234567890
- In the UI, refresh Outbox and confirm status changes to “Sent”.
You now have a running SMS gateway with demo data. Explore the dashboard to schedule new messages and monitor delivery.
Architecture & Core Concepts
This section outlines the core components of SMS Gateway, showing how the REST API, authentication model, USSD service, and scheduler utilities interact to provide end-to-end SMS and USSD functionality.
SMS REST API: /sms/api/send_single
Provides a JSON endpoint to enqueue outgoing SMS messages, guarded by per-user API keys.
Endpoint
• URL: POST /sms/api/send_single
• Auth: X-API-KEY: <api_key>
header
• Content-Type: application/json
Request
Headers
X-API-KEY
: User’s API key (must be enabled in Admin UI)
JSON body
{
"phone_number": "+2126xxxxxxx", // required
"message": "ALERT: disk > 90%", // required
"port": 2 // optional, defaults to 1
}
Example curl
curl -X POST https://your.domain/sms/api/send_single \
-H "Content-Type: application/json" \
-H "X-API-KEY: 4f2e9d7a...a056" \
-d '{
"phone_number": "+1234567890",
"message": "Server CPU > 95%",
"port": 1
}'
Responses
• 201 Created
{ "status": "queued", "message_id": 123 }
• 400 Bad Request
– Missing fields: { "error": "'phone_number' and 'message' are required" }
– Invalid port: { "error": "'port' must be an integer" }
• 401 Unauthorized
– Invalid/disabled API key
Implementation Snippet
@sms_bp.route('/api/send_single', methods=['POST'])
def api_send_single():
# 1) Authenticate
key = request.headers.get('X-API-KEY', '')
user = User.query.filter_by(api_key=key, api_enabled=True).first()
if not user:
abort(401, description="Invalid or disabled API key")
# 2) Parse JSON
data = request.get_json(silent=True)
if not data:
return jsonify(error="Invalid JSON body"), 400
number = data.get('phone_number')
text = data.get('message')
port = data.get('port', 1)
# 3) Validate
if not number or not text:
return jsonify(error="'phone_number' and 'message' are required"), 400
try:
port = int(port)
except (ValueError, TypeError):
return jsonify(error="'port' must be an integer"), 400
# 4) Queue message
msg = Message(direction='OUT', sim_port=port,
phone_number=number, message=text)
db.session.add(msg)
db.session.commit()
# 5) Respond
return jsonify(status="queued", message_id=msg.id), 201
Practical Guidance
- Enable API access and generate a key in the Admin UI when creating or editing a user.
- Securely store keys; clients must supply
X-API-KEY
on every request. - Monitor the
Message
table or use tracking endpoints (/api/commands
,/api/dashboard/...
) to verify dispatch and delivery. - Default port is 1 if omitted; ensure your gateway hardware has that SIM port configured.
User Model: API Access & Key Management
Defines per-user API toggles and key generation on the User
model.
Fields
• api_enabled
(Boolean): Grants access to protected endpoints.
• api_key
(String(64), unique): Secret token for authentication.
Generating an API Key
import secrets
from database.models import db, User
def create_api_user(username, password_hash, role, enable_api=True):
user = User(
username=username,
password_hash=password_hash,
role=role,
api_enabled=enable_api,
api_key=secrets.token_hex(32)
)
db.session.add(user)
db.session.commit()
return user
# Usage
admin_role = Role.query.filter_by(name='admin').first()
new_user = create_api_user('jdoe', hash_password('s3cret'), admin_role)
print(f"API Key for {new_user.username}: {new_user.api_key}")
Rotating Keys & Toggling Access
def rotate_api_key(user_id):
user = User.query.get(user_id)
if not user:
raise ValueError("User not found")
user.api_key = secrets.token_hex(32)
user.api_enabled = True
db.session.commit()
return user.api_key
def disable_api_access(user_id):
user = User.query.get(user_id)
user.api_enabled = False
db.session.commit()
Protecting Endpoints
from flask import Flask, jsonify
from flask_httpauth import HTTPTokenAuth
from database.models import User
app = Flask(__name__)
auth = HTTPTokenAuth(scheme='Bearer')
@auth.verify_token
def verify_token(token):
return User.query.filter_by(api_key=token, api_enabled=True).first()
@app.route('/api/messages', methods=['GET'])
@auth.login_required
def list_messages():
user = auth.current_user()
messages = [m.to_dict() for m in user.campaigns]
return jsonify(messages=messages)
Key Points: store api_key
server-side, rotate periodically, and toggle api_enabled
to revoke access without deleting the key.
USSD Service: send_ussd Wrapper
Sends USSD codes over a serial modem using the AT+CUSD command.
Function Signature
def send_ussd(port_path: str, ussd_code: str,
baudrate: int = 9600, timeout: int = 5) -> List[str]:
Behavior
- Builds
AT+CUSD=1,"<code>",15
command. - Uses
send_at_command
to communicate with the modem. - Returns decoded response lines or
["Error: <message>"]
on failure.
Usage Example
from services.serial_handler import send_ussd
port = '/dev/ttyUSB0'
ussd = '*123#'
response_lines = send_ussd(port, ussd, baudrate=115200, timeout=10)
for line in response_lines:
print(line)
Parsing the Reply
for line in response_lines:
if line.startswith('+CUSD:'):
# Extract quoted message
_, payload = line.split(',', 1)
message = payload.strip().strip('"')
print("USSD reply:", message)
Underlying Helper
def send_at_command(port_path, command, baudrate=9600, timeout=1):
with serial.Serial(port_path, baudrate, timeout=timeout) as ser:
ser.write((command + '\r').encode())
response = ser.readlines()
return [line.decode(errors='ignore').strip() for line in response]
Guidance: ensure modem supports USSD, adjust baudrate
/timeout
for hardware specifics, and handle errors by checking if the first element starts with "Error:"
.
Scheduler Utilities: services/scheduler.py
Provides database utilities for managing scheduled tasks, decoupling state changes from the scheduling loop.
get_pending_tasks()
Fetches tasks ready for execution (scheduled_at
≤ now, status QUEUED
).
from database.models import ScheduledTask
from datetime import datetime
def get_pending_tasks():
now = datetime.utcnow()
return ScheduledTask.query \
.filter(ScheduledTask.scheduled_at <= now,
ScheduledTask.status == 'QUEUED') \
.all()
mark_task_executed(task_id)
Marks a task as executed and records the execution time.
from database.models import ScheduledTask, db
from datetime import datetime
def mark_task_executed(task_id):
task = ScheduledTask.query.get(task_id)
if not task:
return
task.status = 'SENT'
task.executed_at = datetime.utcnow()
db.session.commit()
Integration in run_scheduler.py
from services.scheduler import get_pending_tasks, mark_task_executed
while True:
tasks = get_pending_tasks()
for task in tasks:
# send logic here...
mark_task_executed(task.id)
sleep(60) # run every minute
Usage Tips: call get_pending_tasks()
once per iteration, immediately invoke mark_task_executed()
after a successful send, and handle send errors outside these utilities. Ensure all timestamps use UTC.
Usage Guides
Practical instructions for daily use of the SMS Gateway from both the browser UI and programmatic clients.
AT Command Page: Dynamic SIM Port Dropdown and Queue Refresh
This page lets managers select a SIM port, enter or choose an AT command, and monitor the command queue in real time.
1. Loading SIM Ports into the Dropdown
Fetch /sim/status/ports
, then populate the Bootstrap dropdown. Clicking an entry updates the visible button label and the hidden selected-port
input.
const portList = document.getElementById('port-list');
const portDropdownBtn = document.getElementById('port-dropdown-btn');
const selectedPortInput = document.getElementById('selected-port');
fetch('/sim/status/ports')
.then(resp => resp.json())
.then(data => {
portList.innerHTML = '';
data.forEach(sim => {
const btn = document.createElement('button');
btn.type = 'button';
btn.className = 'dropdown-item d-flex justify-content-between align-items-center';
btn.dataset.port = sim.port_number;
// Status icon
const statusIcon = sim.status === 'ONLINE'
? '<i class="bi bi-circle-fill text-success"></i>'
: '<i class="bi bi-circle-fill text-danger"></i>';
btn.innerHTML = `Port ${sim.port_number} — ${sim.sim_number} (${sim.operator_name}) ${statusIcon}`;
btn.addEventListener('click', () => {
portDropdownBtn.innerHTML = btn.innerHTML;
selectedPortInput.value = sim.port_number;
});
const li = document.createElement('li');
li.appendChild(btn);
portList.appendChild(li);
});
})
.catch(err => console.error('Failed to load SIM ports:', err));
2. Synchronizing the AT-Command Selector and Input
Keep the <select>
and text <input>
in sync so users can pick or type commands interchangeably.
const commandSelect = document.getElementById('command-select');
const commandInput = document.getElementById('command-input');
// Select → Input
commandSelect.addEventListener('change', () => {
commandInput.value = commandSelect.value;
});
// Input → Select
commandInput.addEventListener('input', () => {
const val = commandInput.value.trim();
const match = [...commandSelect.options].some(o => o.value === val);
commandSelect.value = match ? val : '';
});
3. Periodic Refresh of the AT-Command History Table
Poll /at/api/commands
every 5 seconds, rebuild the table body (#at-queue-body
) with the latest queue.
const queueBody = document.getElementById('at-queue-body');
async function refreshQueue() {
try {
const resp = await fetch('/at/api/commands');
if (!resp.ok) throw new Error(resp.statusText);
const data = await resp.json();
queueBody.innerHTML = '';
data.forEach(cmd => {
const statusBadge = cmd.status === 0
? '<span class="badge bg-warning text-dark">Pending</span>'
: '<span class="badge bg-success">Executed</span>';
const tr = document.createElement('tr');
tr.innerHTML = `
<td>${cmd.id}</td>
<td>${cmd.port_number}</td>
<td><code>${cmd.command_text}</code></td>
<td>${cmd.created_at}</td>
<td>${statusBadge}</td>
<td>${cmd.executed_at || ''}</td>
<td><pre class="mb-0">${cmd.result}</pre></td>
`;
queueBody.appendChild(tr);
});
} catch (e) {
console.error('Failed to refresh AT queue:', e);
}
}
// Initial load + polling
refreshQueue();
setInterval(refreshQueue, 5000);
Practical Guidance
- To change the refresh interval, adjust the
5000
ms insetInterval()
. - Customize icons or labels by editing the
statusIcon
andstatusBadge
templates. - If your backend endpoints differ, update the
fetch()
URLs. - For localization, replace static text (e.g., “Pending”) with template variables.
AT Command Management (routes/at.py)
Provide a secured UI and JSON API for enqueuing and monitoring raw AT commands.
Endpoints
- GET /at/command
Rendersat_command.html
where managers select a port and enter an AT command. - POST /at/command
Reads form fieldsport
andcommand
, creates anATCommand
record, flashes confirmation, and redirects to the form. - GET /at/api/commands
Returns a JSON array of the 20 most recentATCommand
entries (newest first). Each object includes:id
,port_number
,command_text
,status
,result
,created_at
,executed_at
.
Authentication
All routes use the @manager_required
decorator; unauthorized users receive a 403 or redirect.
Code Examples
Enqueue via HTML form:
<form action="{{ url_for('at_bp.at_command') }}" method="post">
<label>Port:</label>
<input name="port" type="text" required>
<label>AT Command:</label>
<input name="command" type="text" required>
<button type="submit">Send</button>
</form>
Fetch last 20 commands via AJAX:
fetch('/at/api/commands')
.then(r => r.json())
.then(cmds => {
cmds.forEach(c => {
console.log(`#${c.id} @${c.created_at} [${c.status}]: ${c.command_text}`);
});
})
.catch(console.error);
Practical Guidance
- Ensure your manager user has the correct role and is logged in.
- Use the API to build a live dashboard or polling widget showing command status and results.
- Extend the
ATCommand
model (e.g., add retries or priority) for custom queue behavior.
Demo Data Seeding (scripts/seed_demo.py)
Populate your development database with realistic sample data: SIM ports, signal‐quality history, SMS traffic, and scheduled tasks.
How It Works
- Clears old demo data from
SignalLog
,Message
,SimPort
, andScheduledTask
. - Creates 20 SIM ports with randomized status, signal quality, operator, and timestamp.
- Generates 12 signal‐quality samples (5 minutes apart) based on current averages.
- Seeds “OUT” messages over the past 24 hours, randomizing count per hour and port.
- Queues 3 upcoming tasks in the next few hours.
Usage
From your project root:
python scripts/seed_demo.py
Ensure your virtualenv is activated and the database is reachable via your Flask configuration.
Key Configuration
- Port count: adjust
range(1, 21)
. - History window: modify sample count (
12
) or interval (timedelta(minutes=5)
). - SMS volume: tweak
random.randint(0, 5)
orrange(24)
. - Upcoming tasks: change
range(1, 4)
or scheduling horizon.
Code Highlights
Creating SIM ports:
for i in range(1, 21):
status = 'ONLINE'
quality = random.randint(5, 30) if status == 'ONLINE' else 0
sim = SimPort(
port_number=i,
sim_number=f'21260000{1000 + i}',
status=status,
signal_quality=quality,
operator_name=random.choice(['Orange','Inwi','IAM']),
last_update=datetime.utcnow() - timedelta(minutes=random.randint(0,10))
)
db.session.add(sim)
Seeding signal‐quality logs:
now = datetime.utcnow().replace(second=0, microsecond=0)
for n in range(12):
ts = now - timedelta(minutes=5*(11-n))
online_ports = [s for s in ports if s.status == 'ONLINE']
avg = sum(s.signal_quality for s in online_ports) / max(1, len(online_ports))
log = SignalLog(timestamp=ts, avg_quality=round(avg + random.uniform(-2,2),1))
db.session.add(log)
db.session.commit()
Generating random outbound SMS:
for hour_offset in range(24):
for _ in range(random.randint(0,5)):
send_time = now - timedelta(hours=hour_offset, minutes=random.randint(0,59))
msg = Message(
direction='OUT',
sim_port=random.choice(ports).port_number,
phone_number=f'+2126{random.randint(10000000,99999999)}',
message='Demo SMS',
status='SENT',
encoding='GSM',
send_time=send_time
)
db.session.add(msg)
db.session.commit()
Practical Tips
- Run this script after resetting your DB schema to regenerate demo data.
- Combine with
scripts/seed_inbox_demo.py
to inject inbound messages. - Extend loops to populate custom model fields.
- To avoid data bloat, comment out deletion queries and seed only needed tables.
Configuration & Deployment
All knobs you can turn to adapt the SMS Gateway to your infrastructure: database, serial ports, settings seeding and production process management.
SQLAlchemy Database Configuration
Configure your MySQL (or other) database via the Config
class in config.py. You can override defaults with environment variables for secure, 12-factor style deployments.
Config Class (config.py)
import os
class Config:
# Load from DATABASE_URL or fall back to local MySQL
SQLALCHEMY_DATABASE_URI = os.getenv(
'DATABASE_URL',
'mysql://root:cyt212al@localhost/sms_gateway'
)
SQLALCHEMY_TRACK_MODIFICATIONS = False
Integrate with Flask
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from config import Config
db = SQLAlchemy()
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
db.init_app(app)
return app
Advanced Tips
Use
?pool_size=10&max_overflow=20&pool_timeout=30
appended to your URI to tune connection pooling.Create subclasses for different environments:
class ProductionConfig(Config): SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URL') DEBUG = False class DevelopmentConfig(Config): SQLALCHEMY_ECHO = True DEBUG = True
In CI/CD pipelines, set
DATABASE_URL
instead of committing credentials.
Seeding Default Settings
Use scripts/seed_settings.py to insert or reset system defaults in the database. Run this whenever you add new configuration keys or deploy a fresh instance.
Usage
# Activate your virtualenv or container shell
python scripts/seed_settings.py
Key Sections (seed_settings.py)
from your_app import create_app, db
from your_app.models import Setting
app = create_app()
default_settings = {
'sms_retry_limit': '3',
'ussd_timeout_sec': '30',
'log_level': 'INFO',
}
with app.app_context():
for key, value in default_settings.items():
setting = Setting.query.filter_by(key=key).first()
if not setting:
db.session.add(Setting(key=key, value=value))
else:
setting.value = value
db.session.commit()
print("✅ Default settings seeded/updated.")
Serial Ports & SIM Monitoring
Configure serial communication and SIM health checks via services/serial_handler.py and services/sim_monitor.py.
Sending AT Commands & USSD
from services.serial_handler import send_at_command, send_ussd
# Send an AT command
response_lines = send_at_command('/dev/ttyUSB0', 'AT+CSQ')
print("Signal Quality:", response_lines)
# Send a USSD request
ussd_reply = send_ussd('/dev/ttyUSB1', '*123#')
print("USSD Response:", "\n".join(ussd_reply))
send_at_command(port_path: str, cmd: str) -> List[str]
send_ussd(port_path: str, code: str) -> List[str]
Ensure your modem device nodes (/dev/ttyUSB0
, etc.) match port numbers in your database or environment.
SIM Status Monitoring
from services.sim_monitor import update_sim_statuses
# Periodically refresh all SIM statuses and log average signal
update_sim_statuses()
- Queries each SIM port for operator, signal quality, status.
- Logs aggregated metrics for Prometheus or file-based metrics.
Production Deployment
Deploy scripts and services under a process manager. Example: systemd unit for the USSD scheduler.
Example systemd Service
Create /etc/systemd/system/sms-gateway-scheduler.service
:
[Unit]
Description=SMS Gateway USSD Scheduler
After=network.target
[Service]
User=smsgateway
WorkingDirectory=/opt/sms_gateway
Environment="DATABASE_URL=mysql://user:pass@db-host/sms_gateway"
ExecStart=/usr/bin/python run_scheduler.py
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target
Reload and start:
sudo systemctl daemon-reload
sudo systemctl enable sms-gateway-scheduler
sudo systemctl start sms-gateway-scheduler
Environment Variables
DATABASE_URL
: your DB connection string.SERIAL_PORTS
: comma-separated list if you use dynamic port mapping (optional).LOG_LEVEL
: set toDEBUG
orINFO
.
By tuning these parameters and deploying under a supervisor, you ensure high availability, secure credentials, and flexible scaling for your SMS Gateway.
Development & Contribution Guide
This guide covers local setup, coding standards, database migrations, adding routes/models, and extending the scheduler.
1. Local Development Environment
- Clone and install dependencies
git clone https://github.com/elmadkouryassine/sms_gateway.git cd sms_gateway python3 -m venv .venv source .venv/bin/activate pip install -r requirements.txt
- Configure environment variables
Create a.env
file:DATABASE_URL=postgresql://user:pass@localhost/sms_gateway SERIAL_PORT=/dev/ttyUSB0 SERIAL_BAUDRATE=115200
- Initialize the database
# scripts/init_db.py from database import Base, engine if __name__ == "__main__": Base.metadata.create_all(engine) print("Database schema created")
python scripts/init_db.py
2. Coding Conventions
- Follow PEP8 and Black formatting
pip install black isort black . isort .
- Use meaningful commit messages:
feat(models): add CampaignRetryCount to Campaign model fix(scheduler): mark_executed handles timezone
- Write docstrings for public functions and classes.
3. Database Migrations
We use Alembic for schema changes.
- Install and configure Alembic
Inpip install alembic alembic init alembic
alembic.ini
, setsqlalchemy.url = env>DATABASE_URL
. - Autogenerate a migration
alembic revision --autogenerate -m "Add DeviceLastSeen to Device"
- Apply migrations
alembic upgrade head
4. Adding New Routes
- Create a route module, e.g.
routes/notifications.py
# routes/notifications.py from flask import Blueprint, request, jsonify notifications_bp = Blueprint("notifications", __name__, url_prefix="/notifications") @notifications_bp.post("/") def send_notification(): data = request.get_json() # implement sending logic... return jsonify({"status": "queued"}), 202
- Register in
routes/__init__.py
from flask import Flask from routes.notifications import notifications_bp def register_routes(app: Flask): app.register_blueprint(notifications_bp) # existing registrations...
- Initialize in your app factory
from flask import Flask from routes import register_routes def create_app(): app = Flask(__name__) register_routes(app) return app
5. Adding New Models
Define your model in
database/models.py
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey from database import Base from datetime import datetime class Notification(Base): __tablename__ = "notifications" id = Column(Integer, primary_key=True) user_id = Column(Integer, ForeignKey("users.id"), nullable=False) message = Column(String(255), nullable=False) created_at = Column(DateTime, default=datetime.utcnow)
Generate and apply a migration (see section 3).
Access via session
from database import SessionLocal from database.models import Notification db = SessionLocal() notification = Notification(user_id=42, message="Hello") db.add(notification) db.commit()
6. Extending the Scheduler
The scheduler reads pending tasks from the DB and marks them executed.
- Add a new task type in
database/models.py
class ScheduledTask(Base): __tablename__ = "scheduled_tasks" id = Column(Integer, primary_key=True) task_type = Column(String(50), nullable=False) payload = Column(JSON, nullable=False) run_at = Column(DateTime, nullable=False) executed = Column(Boolean, default=False)
- Create helper in
services/scheduler.py
from database import SessionLocal from database.models import ScheduledTask from datetime import datetime def enqueue_task(task_type: str, payload: dict, run_at: datetime): db = SessionLocal() task = ScheduledTask(task_type=task_type, payload=payload, run_at=run_at) db.add(task) db.commit() def get_pending_tasks(): db = SessionLocal() return db.query(ScheduledTask).filter( ScheduledTask.run_at <= datetime.utcnow(), ScheduledTask.executed.is_(False) ).all() def mark_executed(task_id: int): db = SessionLocal() task = db.query(ScheduledTask).get(task_id) task.executed = True db.commit()
- Integrate in your worker loop
import time from services.scheduler import get_pending_tasks, mark_executed while True: for task in get_pending_tasks(): # dispatch based on task.task_type process(task.payload) mark_executed(task.id) time.sleep(5)
7. Extending Serial Handler
Use send_at_command
and send_ussd
in services/serial_handler.py
.
from services.serial_handler import send_at_command, send_ussd
# Send an AT command
response = send_at_command("AT+CSQ")
print("Signal Quality:", response)
# Initiate a USSD session
ussd_response = send_ussd("*123#")
print("Balance:", ussd_response)
To add custom commands:
def send_custom_command(cmd: str, timeout: float = 2.0) -> str:
return send_at_command(cmd, timeout=timeout)
Integrate this helper alongside existing methods in services/serial_handler.py
.