Project Overview

This SMS Gateway application enables scheduling, sending, and tracking SMS and USSD interactions via serial-connected telecommunication modules. It enforces user roles, logs system activity, and exposes a modular API.

Purpose

Provide a self-hosted, extensible SMS/USSD gateway for campaigns, device management, and real-time interaction.

Key Features

  • SMS campaign creation, scheduling, and delivery tracking
  • USSD session management for interactive menus
  • Multi-device and port handling via serial interfaces
  • Role-based access control and API key management
  • Persistent logging of messages, sessions, and system events
  • Pluggable scheduler for task execution

System Anatomy

Flask Application (app.py)

  • Initializes Flask, configures sessions, and connects database
  • Registers feature blueprints (messaging, campaigns, devices, USSD) and admin blueprint
  • Defines protected routes, e.g., /home for the dashboard
  • Entry point for running the web server

Data Models (database/models.py)

  • User, Role, APIAccess: manage authentication and permissions
  • Message, Campaign, LogEntry: track SMS content, dispatch status, and audit logs
  • Device, Port: represent physical modems and serial ports
  • USSDSession, ScheduledTask: handle interactive USSD flows and scheduled SMS

Scheduler Service (services/scheduler.py)

  • get_pending_tasks(): fetches unsent, due tasks
  • mark_task_executed(task_id): flags tasks as completed
  • Designed for integration with cron jobs or long-running daemons

Serial Handler Service (services/serial_handler.py)

  • send_at_command(port, command, timeout): transmits AT commands and returns modem responses
  • send_ussd(port, ussd_code): initiates USSD requests and captures replies
  • Handles low-level serial communication and error timeouts

When and Why to Use

Use this gateway when you need:

  • Bulk or scheduled SMS campaigns without relying on third-party APIs
  • Interactive USSD menus for surveys, account balance checks, or two-factor flows
  • Direct control over GSM modems and SIM cards
  • Auditable communication logs and role-based access

Quick Start

Running the Server

Ensure your environment variables (DATABASE_URL, SECRET_KEY, etc.) are set. Then:

pip install -r requirements.txt
python app.py

Fetching and Executing Scheduled Tasks

from services.scheduler import get_pending_tasks, mark_task_executed
from services.serial_handler import send_at_command

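# Periodic job to dispatch SMS (assumes the modem is already in text mode: AT+CMGF=1)
for task in get_pending_tasks():
    # In text mode the message body must be terminated with Ctrl-Z (\x1a)
    response = send_at_command(task.port.path, f'AT+CMGS="{task.number}"\r{task.message}\x1a')
    # parse response, update logs; a successful submit is acknowledged with a +CMGS: line
    if any(line.startswith('+CMGS:') for line in response):
        mark_task_executed(task.id)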

Sending a USSD Request

from services.serial_handler import send_ussd

# Initiate USSD on port `/dev/ttyUSB0`
result = send_ussd('/dev/ttyUSB0', '*123#')
print("USSD Reply:", result)

This overview equips you to deploy, extend, and integrate the SMS Gateway into your workflows.

Getting Started

This guide takes you from cloning the repository to sending your first SMS in a local environment.

1. Clone the repository

git clone https://github.com/elmadkouryassine/sms_gateway.git
cd sms_gateway

2. Install dependencies

Create a virtual environment and install Python packages.

python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

3. Configure environment variables

Define your database URL and Flask settings. config.py reads DATABASE_URL, so use that name:

export DATABASE_URL="sqlite:///sms_gateway.db"
export FLASK_APP="app.py"
export FLASK_ENV="development"  # ignored by Flask 2.3+; use FLASK_DEBUG=1 there

4. Initialize the database

Create all tables before seeding:

python - <<EOF
from app import app, db
with app.app_context():
    db.create_all()
EOF

5. Seed demo data

Run the scripts in the order shown to populate roles, settings, the admin user, and demo SMS data.

# Create default roles: admin, manager, viewer
python scripts/seed_roles.py

# Seed default system settings
python scripts/seed_settings.py

# Ensure an admin user exists (username: admin, password: changeme)
python scripts/seed_admin.py

# Populate SIM ports, logs, scheduled SMS/tasks
python scripts/seed_demo.py

# Optional: generate 50 random inbound messages
python scripts/seed_inbox_demo.py 50

6. Run the web application

Launch the Flask server to access the dashboard and SMS/USSD interfaces.

python app.py

Open http://localhost:5000 in your browser and log in as admin / changeme.

7. Start the background scheduler

Process and dispatch scheduled SMS and USSD sessions every 60 seconds.

python run_scheduler.py

8. Verify SMS dispatch

  1. In the web UI, navigate to Scheduler → Scheduled SMS.
  2. You should see demo tasks with status “Pending”.
  3. In the scheduler terminal, look for logs like:
    [INFO] Sent SMS id=42 to +1234567890
    
  4. In the UI, refresh Outbox and confirm status changes to “Sent”.

You now have a running SMS gateway with demo data. Explore the dashboard to schedule new messages and monitor delivery.

Architecture & Core Concepts

This section outlines the core components of SMS Gateway, showing how the REST API, authentication model, USSD service, and scheduler utilities interact to provide end-to-end SMS and USSD functionality.

SMS REST API: /sms/api/send_single

Provides a JSON endpoint to enqueue outgoing SMS messages, guarded by per-user API keys.

Endpoint

• URL: POST /sms/api/send_single
• Auth: X-API-KEY: <api_key> header
• Content-Type: application/json

Request

Headers

  • X-API-KEY: User’s API key (must be enabled in Admin UI)

JSON body

{
  "phone_number": "+2126xxxxxxx",   // required
  "message":      "ALERT: disk > 90%", // required
  "port":         2                   // optional, defaults to 1
}

Example curl

curl -X POST https://your.domain/sms/api/send_single \
  -H "Content-Type: application/json" \
  -H "X-API-KEY: 4f2e9d7a...a056" \
  -d '{
        "phone_number": "+1234567890",
        "message": "Server CPU > 95%",
        "port": 1
      }'

Responses

201 Created

{ "status": "queued", "message_id": 123 }

400 Bad Request

  • Invalid or missing JSON body: { "error": "Invalid JSON body" }
  • Missing fields: { "error": "'phone_number' and 'message' are required" }
  • Invalid port: { "error": "'port' must be an integer" }

401 Unauthorized

  • Invalid or disabled API key

Implementation Snippet

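from flask import abort, jsonify, request
# sms_bp, db, User and Message come from the application's own modules

@sms_bp.route('/api/send_single', methods=['POST'])
def api_send_single():
    # 1) Authenticate (reject a missing key before querying)
    key = request.headers.get('X-API-KEY', '')
    user = User.query.filter_by(api_key=key, api_enabled=True).first() if key else None
    if not user:
        abort(401, description="Invalid or disabled API key")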

    # 2) Parse JSON
    data = request.get_json(silent=True)
    if not data:
        return jsonify(error="Invalid JSON body"), 400

    number = data.get('phone_number')
    text   = data.get('message')
    port   = data.get('port', 1)

    # 3) Validate
    if not number or not text:
        return jsonify(error="'phone_number' and 'message' are required"), 400
    try:
        port = int(port)
    except (ValueError, TypeError):
        return jsonify(error="'port' must be an integer"), 400

    # 4) Queue message
    msg = Message(direction='OUT', sim_port=port,
                  phone_number=number, message=text)
    db.session.add(msg)
    db.session.commit()

    # 5) Respond
    return jsonify(status="queued", message_id=msg.id), 201

Practical Guidance

  • Enable API access and generate a key in the Admin UI when creating or editing a user.
  • Securely store keys; clients must supply X-API-KEY on every request.
  • Monitor the Message table or use tracking endpoints (/api/commands, /api/dashboard/...) to verify dispatch and delivery.
  • Default port is 1 if omitted; ensure your gateway hardware has that SIM port configured.
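For programmatic clients, the curl call above can be mirrored in Python. The following is a minimal sketch using only the standard library; build_send_request is a hypothetical helper name, and the host and key are placeholders. It builds the request without sending it, so it can be inspected safely.

```python
import json
import urllib.request


def build_send_request(base_url: str, api_key: str, phone_number: str,
                       message: str, port: int = 1) -> urllib.request.Request:
    """Build (but do not send) a POST request for /sms/api/send_single."""
    body = json.dumps({"phone_number": phone_number,
                       "message": message,
                       "port": port}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/sms/api/send_single",
        data=body,
        headers={"Content-Type": "application/json", "X-API-KEY": api_key},
        method="POST",
    )


# Placeholder host and key, for illustration only.
req = build_send_request("https://your.domain", "my-api-key",
                         "+1234567890", "Server CPU > 95%")
print(req.get_method(), req.full_url)

# To actually send it:
# with urllib.request.urlopen(req, timeout=10) as resp:
#     print(resp.status, json.load(resp))
```

Note that urllib.request.urlopen raises urllib.error.HTTPError for the 400 and 401 responses listed above, so a real client would wrap the call in a try/except block.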

User Model: API Access & Key Management

Defines per-user API toggles and key generation on the User model.

Fields

api_enabled (Boolean): Grants access to protected endpoints.
api_key (String(64), unique): Secret token for authentication.
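The 64-character column width matches the key generator used in the snippets that follow: secrets.token_hex(32) returns 32 random bytes encoded as 64 hexadecimal characters. A quick check, as a sketch:

```python
import secrets
import string

key = secrets.token_hex(32)  # 32 random bytes -> 64 hexadecimal characters

print(len(key))
print(all(ch in string.hexdigits for ch in key))
```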

Generating an API Key

import secrets
from database.models import db, User

def create_api_user(username, password_hash, role, enable_api=True):
    user = User(
        username=username,
        password_hash=password_hash,
        role=role,
        api_enabled=enable_api,
        api_key=secrets.token_hex(32)
    )
    db.session.add(user)
    db.session.commit()
    return user

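# Usage (hash_password stands for the project's password-hashing helper; it is not defined here)
from database.models import Role

admin_role = Role.query.filter_by(name='admin').first()
new_user = create_api_user('jdoe', hash_password('s3cret'), admin_role)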
print(f"API Key for {new_user.username}: {new_user.api_key}")

Rotating Keys & Toggling Access

def rotate_api_key(user_id):
    user = User.query.get(user_id)
    if not user:
        raise ValueError("User not found")
    user.api_key = secrets.token_hex(32)
    user.api_enabled = True
    db.session.commit()
    return user.api_key

def disable_api_access(user_id):
    user = User.query.get(user_id)
    if not user:
        raise ValueError("User not found")
    user.api_enabled = False
    db.session.commit()

Protecting Endpoints

from flask import Flask, jsonify
from flask_httpauth import HTTPTokenAuth
from database.models import User

app = Flask(__name__)
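# This example reads the key from "Authorization: Bearer <api_key>".
# To reuse the X-API-KEY header instead, construct HTTPTokenAuth(header='X-API-KEY').
auth = HTTPTokenAuth(scheme='Bearer')

@auth.verify_token
def verify_token(token):
    if not token:
        return None  # never match rows whose api_key is empty
    return User.query.filter_by(api_key=token, api_enabled=True).first()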

@app.route('/api/messages', methods=['GET'])
@auth.login_required
def list_messages():
    user = auth.current_user()
    messages = [m.to_dict() for m in user.campaigns]
    return jsonify(messages=messages)

Key Points: store api_key server-side, rotate periodically, and toggle api_enabled to revoke access without deleting the key.


USSD Service: send_ussd Wrapper

Sends USSD codes over a serial modem using the AT+CUSD command.

Function Signature

def send_ussd(port_path: str, ussd_code: str,
              baudrate: int = 9600, timeout: int = 5) -> List[str]:

Behavior

  • Builds AT+CUSD=1,"<code>",15 command.
  • Uses send_at_command to communicate with the modem.
  • Returns decoded response lines or ["Error: <message>"] on failure.
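The behavior listed above can be sketched as follows. This is an illustration under stated assumptions, not the project's actual implementation: build_cusd_command, send_ussd_sketch, and the sender parameter are hypothetical names, introduced so the sketch runs without a modem. In the real module the call goes to send_at_command.

```python
from typing import Callable, List


def build_cusd_command(ussd_code: str) -> str:
    """Return the AT+CUSD command for a USSD code, with data coding scheme 15."""
    return f'AT+CUSD=1,"{ussd_code}",15'


def send_ussd_sketch(port_path: str, ussd_code: str,
                     sender: Callable[[str, str], List[str]]) -> List[str]:
    """Send a USSD code via `sender` and normalise failures to ["Error: ..."]."""
    try:
        return sender(port_path, build_cusd_command(ussd_code))
    except Exception as exc:  # e.g. a serial error on a missing port
        return [f"Error: {exc}"]


# Stand-in for send_at_command so the example runs without hardware.
# The reply lines are made-up sample data.
def fake_sender(port_path: str, command: str) -> List[str]:
    return [command, '+CUSD: 0,"Balance: 10.00",15', "OK"]


print(send_ussd_sketch("/dev/ttyUSB0", "*123#", fake_sender))
```

Passing the sender in as a parameter keeps the command-building and error-wrapping logic testable without a serial device.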

Usage Example

from services.serial_handler import send_ussd

port = '/dev/ttyUSB0'
ussd = '*123#'
response_lines = send_ussd(port, ussd, baudrate=115200, timeout=10)

for line in response_lines:
    print(line)

Parsing the Reply

for line in response_lines:
    if line.startswith('+CUSD:'):
        # Format: +CUSD: <m>,"<text>",<dcs>; take the text between the first and last quote
        start, end = line.find('"'), line.rfind('"')
        if 0 <= start < end:
            message = line[start + 1:end]
            print("USSD reply:", message)

Underlying Helper

import serial  # provided by the pyserial package

def send_at_command(port_path, command, baudrate=9600, timeout=1):
    with serial.Serial(port_path, baudrate, timeout=timeout) as ser:
        ser.write((command + '\r').encode())
        response = ser.readlines()
        return [line.decode(errors='ignore').strip() for line in response]

Guidance: ensure modem supports USSD, adjust baudrate/timeout for hardware specifics, and handle errors by checking if the first element starts with "Error:".
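The error check described above, together with extraction of the reply text, might look like the sketch below. ussd_failed and extract_ussd_text are hypothetical helper names, and treating an empty response as a failure is an assumption of this sketch. The sample reply is made up.

```python
import re
from typing import List, Optional

# +CUSD: <m>,"<text>",<dcs>  (text and dcs may be absent in some replies)
_CUSD_RE = re.compile(r'^\+CUSD:\s*(\d+)(?:,\s*"(.*)"(?:,\s*(\d+))?)?\s*$', re.DOTALL)


def ussd_failed(lines: List[str]) -> bool:
    """True when send_ussd reported a failure or returned nothing."""
    return not lines or lines[0].startswith("Error:")


def extract_ussd_text(lines: List[str]) -> Optional[str]:
    """Return the quoted text of the first +CUSD line that has one, else None."""
    for line in lines:
        match = _CUSD_RE.match(line)
        if match and match.group(2) is not None:
            return match.group(2)
    return None


reply = ['+CUSD: 0,"Balance: 10.00",15', "OK"]
print(ussd_failed(reply), extract_ussd_text(reply))
```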


Scheduler Utilities: services/scheduler.py

Provides database utilities for managing scheduled tasks, decoupling state changes from the scheduling loop.

get_pending_tasks()

Fetches tasks ready for execution (scheduled_at ≤ now, status QUEUED).

from database.models import ScheduledTask
from datetime import datetime

def get_pending_tasks():
    now = datetime.utcnow()
    return ScheduledTask.query \
        .filter(ScheduledTask.scheduled_at <= now,
                ScheduledTask.status == 'QUEUED') \
        .all()

mark_task_executed(task_id)

Marks a task as executed and records the execution time.

from database.models import ScheduledTask, db
from datetime import datetime

def mark_task_executed(task_id):
    task = ScheduledTask.query.get(task_id)
    if not task:
        return
    task.status = 'SENT'
    task.executed_at = datetime.utcnow()
    db.session.commit()

Integration in run_scheduler.py

from time import sleep

from services.scheduler import get_pending_tasks, mark_task_executed

while True:
    tasks = get_pending_tasks()
    for task in tasks:
        # send logic here...
        mark_task_executed(task.id)
    sleep(60)  # run every minute

Usage Tips: call get_pending_tasks() once per iteration, immediately invoke mark_task_executed() after a successful send, and handle send errors outside these utilities. Ensure all timestamps use UTC.
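One way to follow these tips is to keep a single scheduler pass in its own function, with the send step and the database utilities passed in. The sketch below is an assumption about how such a pass could be structured. run_once and flaky_send are hypothetical names, and the stand-in tasks only carry an id.

```python
from types import SimpleNamespace
from typing import Callable, Iterable, List


def run_once(get_pending: Callable[[], Iterable],
             send: Callable[[object], bool],
             mark_executed: Callable[[int], None]) -> List[int]:
    """Process one scheduler pass; return the ids of tasks that failed."""
    failed = []
    for task in get_pending():          # fetched once per pass
        try:
            ok = send(task)
        except Exception:               # a serial error must not stop the loop
            ok = False
        if ok:
            mark_executed(task.id)      # only after a successful send
        else:
            failed.append(task.id)      # left untouched, so it is picked up again
    return failed


# Stand-ins so the example runs without a database or modem.
tasks = [SimpleNamespace(id=1), SimpleNamespace(id=2)]
marked = []


def flaky_send(task):
    if task.id == 2:
        raise OSError("port busy")      # simulate a serial failure
    return True


failed = run_once(lambda: tasks, flaky_send, marked.append)
print(marked, failed)
```

Because failed tasks keep their status, they are retried on every pass. A production loop would likely cap the number of attempts, for example using a retry limit stored in the settings table.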

Usage Guides

Practical instructions for daily use of the SMS Gateway from both the browser UI and programmatic clients.


AT Command Page: Dynamic SIM Port Dropdown and Queue Refresh

This page lets managers select a SIM port, enter or choose an AT command, and monitor the command queue in real time.

1. Loading SIM Ports into the Dropdown

Fetch /sim/status/ports, then populate the Bootstrap dropdown. Clicking an entry updates the visible button label and the hidden selected-port input.

const portList = document.getElementById('port-list');
const portDropdownBtn = document.getElementById('port-dropdown-btn');
const selectedPortInput = document.getElementById('selected-port');

fetch('/sim/status/ports')
  .then(resp => resp.json())
  .then(data => {
    portList.innerHTML = '';
    data.forEach(sim => {
      const btn = document.createElement('button');
      btn.type = 'button';
      btn.className = 'dropdown-item d-flex justify-content-between align-items-center';
      btn.dataset.port = sim.port_number;

      // Status icon
      const statusIcon = sim.status === 'ONLINE'
        ? '<i class="bi bi-circle-fill text-success"></i>'
        : '<i class="bi bi-circle-fill text-danger"></i>';

      btn.innerHTML = `Port ${sim.port_number} — ${sim.sim_number} (${sim.operator_name}) ${statusIcon}`;
      btn.addEventListener('click', () => {
        portDropdownBtn.innerHTML = btn.innerHTML;
        selectedPortInput.value = sim.port_number;
      });

      const li = document.createElement('li');
      li.appendChild(btn);
      portList.appendChild(li);
    });
  })
  .catch(err => console.error('Failed to load SIM ports:', err));

2. Synchronizing the AT-Command Selector and Input

Keep the <select> and text <input> in sync so users can pick or type commands interchangeably.

const commandSelect = document.getElementById('command-select');
const commandInput = document.getElementById('command-input');

// Select → Input
commandSelect.addEventListener('change', () => {
  commandInput.value = commandSelect.value;
});

// Input → Select
commandInput.addEventListener('input', () => {
  const val = commandInput.value.trim();
  const match = [...commandSelect.options].some(o => o.value === val);
  commandSelect.value = match ? val : '';
});

3. Periodic Refresh of the AT-Command History Table

Poll /at/api/commands every 5 seconds, rebuild the table body (#at-queue-body) with the latest queue.

const queueBody = document.getElementById('at-queue-body');

async function refreshQueue() {
  try {
    const resp = await fetch('/at/api/commands');
    if (!resp.ok) throw new Error(resp.statusText);
    const data = await resp.json();

    queueBody.innerHTML = '';
    data.forEach(cmd => {
      const statusBadge = cmd.status === 0
        ? '<span class="badge bg-warning text-dark">Pending</span>'
        : '<span class="badge bg-success">Executed</span>';

      const tr = document.createElement('tr');
      tr.innerHTML = `
        <td>${cmd.id}</td>
        <td>${cmd.port_number}</td>
        <td><code>${cmd.command_text}</code></td>
        <td>${cmd.created_at}</td>
        <td>${statusBadge}</td>
        <td>${cmd.executed_at || ''}</td>
        <td><pre class="mb-0">${cmd.result || ''}</pre></td>
      `;
      queueBody.appendChild(tr);
    });
  } catch (e) {
    console.error('Failed to refresh AT queue:', e);
  }
}

// Initial load + polling
refreshQueue();
setInterval(refreshQueue, 5000);

Practical Guidance

  • To change the refresh interval, adjust the 5000 ms in setInterval().
  • Customize icons or labels by editing the statusIcon and statusBadge templates.
  • If your backend endpoints differ, update the fetch() URLs.
  • For localization, replace static text (e.g., “Pending”) with template variables.

AT Command Management (routes/at.py)

Provide a secured UI and JSON API for enqueuing and monitoring raw AT commands.

Endpoints

  • GET /at/command
    Renders at_command.html where managers select a port and enter an AT command.
  • POST /at/command
    Reads form fields port and command, creates an ATCommand record, flashes confirmation, and redirects to the form.
  • GET /at/api/commands
    Returns a JSON array of the 20 most recent ATCommand entries (newest first). Each object includes:
    id, port_number, command_text, status, result, created_at, executed_at.
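A script that consumes this endpoint could group the entries by status. The sketch below works on a JSON string so it runs offline. split_by_status is a hypothetical helper, the sample payload is made up, the timestamp format is an assumption, and the meaning of status 0 ("Pending") is taken from the page script shown earlier.

```python
import json
from typing import Dict, List


def split_by_status(payload: str) -> Dict[str, List[dict]]:
    """Group /at/api/commands entries into pending and executed lists."""
    groups: Dict[str, List[dict]] = {"pending": [], "executed": []}
    for cmd in json.loads(payload):
        # Assumption taken from the page script: status 0 means "Pending".
        groups["pending" if cmd["status"] == 0 else "executed"].append(cmd)
    return groups


# Made-up sample in the documented shape (newest first).
sample = json.dumps([
    {"id": 2, "port_number": 1, "command_text": "AT+CSQ", "status": 0,
     "result": None, "created_at": "2024-01-01T10:05:00", "executed_at": None},
    {"id": 1, "port_number": 1, "command_text": "AT", "status": 1,
     "result": "OK", "created_at": "2024-01-01T10:00:00",
     "executed_at": "2024-01-01T10:00:05"},
])
groups = split_by_status(sample)
print(len(groups["pending"]), len(groups["executed"]))
```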

Authentication

All routes use the @manager_required decorator; unauthorized users receive a 403 or redirect.

Code Examples

Enqueue via HTML form:

<form action="{{ url_for('at_bp.at_command') }}" method="post">
  <label>Port:</label>
  <input name="port" type="text" required>
  <label>AT Command:</label>
  <input name="command" type="text" required>
  <button type="submit">Send</button>
</form>

Fetch last 20 commands via AJAX:

fetch('/at/api/commands')
  .then(r => r.json())
  .then(cmds => {
    cmds.forEach(c => {
      console.log(`#${c.id} @${c.created_at} [${c.status}]: ${c.command_text}`);
    });
  })
  .catch(console.error);

Practical Guidance

  • Ensure your manager user has the correct role and is logged in.
  • Use the API to build a live dashboard or polling widget showing command status and results.
  • Extend the ATCommand model (e.g., add retries or priority) for custom queue behavior.

Demo Data Seeding (scripts/seed_demo.py)

Populate your development database with realistic sample data: SIM ports, signal‐quality history, SMS traffic, and scheduled tasks.

How It Works

  1. Clears old demo data from SignalLog, Message, SimPort, and ScheduledTask.
  2. Creates 20 SIM ports with randomized status, signal quality, operator, and timestamp.
  3. Generates 12 signal‐quality samples (5 minutes apart) based on current averages.
  4. Seeds “OUT” messages over the past 24 hours, randomizing count per hour and port.
  5. Queues 3 upcoming tasks in the next few hours.

Usage

From your project root:

python scripts/seed_demo.py

Ensure your virtualenv is activated and the database is reachable via your Flask configuration.

Key Configuration

  • Port count: adjust range(1, 21).
  • History window: modify sample count (12) or interval (timedelta(minutes=5)).
  • SMS volume: tweak random.randint(0, 5) or range(24).
  • Upcoming tasks: change range(1, 4) or scheduling horizon.

Code Highlights

Creating SIM ports:

for i in range(1, 21):
    status = random.choice(['ONLINE', 'OFFLINE'])  # 'OFFLINE' is an assumed label for non-online ports
    quality = random.randint(5, 30) if status == 'ONLINE' else 0
    sim = SimPort(
        port_number=i,
        sim_number=f'21260000{1000 + i}',
        status=status,
        signal_quality=quality,
        operator_name=random.choice(['Orange','Inwi','IAM']),
        last_update=datetime.utcnow() - timedelta(minutes=random.randint(0,10))
    )
    db.session.add(sim)

Seeding signal‐quality logs:

now = datetime.utcnow().replace(second=0, microsecond=0)
for n in range(12):
    ts = now - timedelta(minutes=5*(11-n))
    online_ports = [s for s in ports if s.status == 'ONLINE']
    avg = sum(s.signal_quality for s in online_ports) / max(1, len(online_ports))
    log = SignalLog(timestamp=ts, avg_quality=round(avg + random.uniform(-2,2),1))
    db.session.add(log)
db.session.commit()

Generating random outbound SMS:

for hour_offset in range(24):
    for _ in range(random.randint(0,5)):
        send_time = now - timedelta(hours=hour_offset, minutes=random.randint(0,59))
        msg = Message(
            direction='OUT',
            sim_port=random.choice(ports).port_number,
            phone_number=f'+2126{random.randint(10000000,99999999)}',
            message='Demo SMS',
            status='SENT',
            encoding='GSM',
            send_time=send_time
        )
        db.session.add(msg)
db.session.commit()

Practical Tips

  • Run this script after resetting your DB schema to regenerate demo data.
  • Combine with scripts/seed_inbox_demo.py to inject inbound messages.
  • Extend loops to populate custom model fields.
  • To keep existing rows, comment out the deletion queries; to avoid data bloat, seed only the tables you need.

Configuration & Deployment

All knobs you can turn to adapt the SMS Gateway to your infrastructure: database, serial ports, settings seeding and production process management.


SQLAlchemy Database Configuration

Configure your MySQL (or other) database via the Config class in config.py. You can override defaults with environment variables for secure, 12-factor style deployments.

Config Class (config.py)

import os

class Config:
    # Load from DATABASE_URL or fall back to local MySQL
    SQLALCHEMY_DATABASE_URI = os.getenv(
        'DATABASE_URL',
        'mysql://root:cyt212al@localhost/sms_gateway'
    )
    SQLALCHEMY_TRACK_MODIFICATIONS = False

Integrate with Flask

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from config import Config

db = SQLAlchemy()

def create_app():
    app = Flask(__name__)
    app.config.from_object(Config)
    db.init_app(app)
    return app

Advanced Tips

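  • Tune connection pooling (for example pool_size=10, max_overflow=20, pool_timeout=30) through SQLALCHEMY_ENGINE_OPTIONS rather than URI query parameters, which SQLAlchemy typically forwards to the database driver.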

  • Create subclasses for different environments:

    class ProductionConfig(Config):
        SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URL')
        DEBUG = False
    
    class DevelopmentConfig(Config):
        SQLALCHEMY_ECHO = True
        DEBUG = True
    
  • In CI/CD pipelines, set DATABASE_URL instead of committing credentials.
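Pool tuning can be expressed in the config class through SQLALCHEMY_ENGINE_OPTIONS, which Flask-SQLAlchemy (2.4 and later) passes to sqlalchemy.create_engine(). The sketch below assumes a pooled backend such as MySQL. PooledConfig is a hypothetical class name, the fallback URI is a placeholder, and the values are only examples.

```python
import os


class PooledConfig:
    """Illustrative config: pool settings passed via SQLALCHEMY_ENGINE_OPTIONS."""
    SQLALCHEMY_DATABASE_URI = os.getenv(
        "DATABASE_URL", "mysql://user:pass@localhost/sms_gateway"  # placeholder
    )
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    # Flask-SQLAlchemy forwards this dict to sqlalchemy.create_engine().
    SQLALCHEMY_ENGINE_OPTIONS = {
        "pool_size": 10,        # persistent connections kept open
        "max_overflow": 20,     # extra connections allowed under load
        "pool_timeout": 30,     # seconds to wait for a free connection
        "pool_pre_ping": True,  # test a connection before handing it out
    }


print(sorted(PooledConfig.SQLALCHEMY_ENGINE_OPTIONS))
```

Load it the same way as the base class, with app.config.from_object(PooledConfig).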


Seeding Default Settings

Use scripts/seed_settings.py to insert or reset system defaults in the database. Run this whenever you add new configuration keys or deploy a fresh instance.

Usage

# Activate your virtualenv or container shell
python scripts/seed_settings.py

Key Sections (seed_settings.py)

from your_app import create_app, db
from your_app.models import Setting

app = create_app()

default_settings = {
    'sms_retry_limit': '3',
    'ussd_timeout_sec': '30',
    'log_level': 'INFO',
}

with app.app_context():
    for key, value in default_settings.items():
        setting = Setting.query.filter_by(key=key).first()
        if not setting:
            db.session.add(Setting(key=key, value=value))
        else:
            setting.value = value
    db.session.commit()
    print("✅ Default settings seeded/updated.")

Serial Ports & SIM Monitoring

Configure serial communication and SIM health checks via services/serial_handler.py and services/sim_monitor.py.

Sending AT Commands & USSD

from services.serial_handler import send_at_command, send_ussd

# Send an AT command
response_lines = send_at_command('/dev/ttyUSB0', 'AT+CSQ')
print("Signal Quality:", response_lines)

# Send a USSD request
ussd_reply = send_ussd('/dev/ttyUSB1', '*123#')
print("USSD Response:", "\n".join(ussd_reply))

Signatures:

  • send_at_command(port_path: str, cmd: str) -> List[str]
  • send_ussd(port_path: str, code: str) -> List[str]

Ensure your modem device nodes (/dev/ttyUSB0, etc.) match port numbers in your database or environment.
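The lines returned for AT+CSQ contain a +CSQ: <rssi>,<ber> entry. As a sketch, the rssi index can be converted to dBm with the standard mapping (0 to 31 maps to -113 to -51 dBm in 2 dB steps, 99 means unknown). csq_to_dbm is a hypothetical helper, not part of the serial handler, and the sample reply is made up.

```python
import re
from typing import List, Optional

_CSQ_RE = re.compile(r"^\+CSQ:\s*(\d+),\s*(\d+)")


def csq_to_dbm(lines: List[str]) -> Optional[int]:
    """Convert the <rssi> field of a +CSQ reply to dBm, or None if unknown."""
    for line in lines:
        match = _CSQ_RE.match(line)
        if match:
            rssi = int(match.group(1))
            if 0 <= rssi <= 31:
                return -113 + 2 * rssi   # 0 -> -113 dBm, 31 -> -51 dBm
            return None                  # 99 means "not known or not detectable"
    return None


print(csq_to_dbm(["AT+CSQ", "+CSQ: 18,99", "", "OK"]))
```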

SIM Status Monitoring

from services.sim_monitor import update_sim_statuses

# Periodically refresh all SIM statuses and log average signal
update_sim_statuses()

  • Queries each SIM port for operator, signal quality, status.
  • Logs aggregated metrics for Prometheus or file-based metrics.

Production Deployment

Deploy scripts and services under a process manager. Example: systemd unit for the USSD scheduler.

Example systemd Service

Create /etc/systemd/system/sms-gateway-scheduler.service:

[Unit]
Description=SMS Gateway USSD Scheduler
After=network.target

[Service]
User=smsgateway
WorkingDirectory=/opt/sms_gateway
Environment="DATABASE_URL=mysql://user:pass@db-host/sms_gateway"
ExecStart=/usr/bin/python run_scheduler.py
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target

Reload and start:

sudo systemctl daemon-reload
sudo systemctl enable sms-gateway-scheduler
sudo systemctl start sms-gateway-scheduler

Environment Variables

  • DATABASE_URL: your DB connection string.
  • SERIAL_PORTS: comma-separated list if you use dynamic port mapping (optional).
  • LOG_LEVEL: set to DEBUG or INFO.
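If you use SERIAL_PORTS, the comma-separated list has to be turned into a lookup from port number to device node. The sketch below shows one possible convention: the first entry becomes port 1, the second port 2, and so on. Both parse_serial_ports and that numbering are assumptions, so check how your deployment actually maps ports.

```python
import os
from typing import Dict, Mapping


def parse_serial_ports(env: Mapping[str, str] = os.environ) -> Dict[int, str]:
    """Map port numbers (starting at 1) to device nodes listed in SERIAL_PORTS."""
    raw = env.get("SERIAL_PORTS", "")
    nodes = [item.strip() for item in raw.split(",") if item.strip()]
    return {number: node for number, node in enumerate(nodes, start=1)}


ports = parse_serial_ports({"SERIAL_PORTS": "/dev/ttyUSB0, /dev/ttyUSB1"})
print(ports)
```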

Running the scheduler under a supervisor restarts it automatically after failures, and supplying settings through environment variables keeps credentials out of the source tree.

Development & Contribution Guide

This guide covers local setup, coding standards, database migrations, adding routes/models, and extending the scheduler.

1. Local Development Environment

  1. Clone and install dependencies
    git clone https://github.com/elmadkouryassine/sms_gateway.git
    cd sms_gateway
    python3 -m venv .venv
    source .venv/bin/activate
    pip install -r requirements.txt
    
  2. Configure environment variables
    Create a .env file:
    DATABASE_URL=postgresql://user:pass@localhost/sms_gateway
    SERIAL_PORT=/dev/ttyUSB0
    SERIAL_BAUDRATE=115200
    
  3. Initialize the database
    # scripts/init_db.py
    from database import Base, engine
    
    if __name__ == "__main__":
        Base.metadata.create_all(engine)
        print("Database schema created")
    
    python scripts/init_db.py
    

2. Coding Conventions

  • Follow PEP8 and Black formatting
    pip install black isort
    black .
    isort .
    
  • Use meaningful commit messages:
    feat(models): add CampaignRetryCount to Campaign model
    fix(scheduler): mark_executed handles timezone
    
  • Write docstrings for public functions and classes.

3. Database Migrations

We use Alembic for schema changes.

  1. Install and configure Alembic
    pip install alembic
    alembic init alembic
    
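    alembic.ini does not expand environment variables, so set sqlalchemy.url from DATABASE_URL in alembic/env.py, for example with config.set_main_option("sqlalchemy.url", os.environ["DATABASE_URL"]).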
  2. Autogenerate a migration
    alembic revision --autogenerate -m "Add DeviceLastSeen to Device"
    
  3. Apply migrations
    alembic upgrade head
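To keep the database URL out of alembic.ini, alembic/env.py can copy it from the environment before migrations run. The following is a minimal sketch. apply_database_url is a hypothetical helper, and it only relies on the set_main_option method of Alembic's Config object.

```python
import os


def apply_database_url(config, env=os.environ) -> None:
    """Copy DATABASE_URL into Alembic's sqlalchemy.url option, if it is set."""
    url = env.get("DATABASE_URL")
    if url:
        # Alembic's Config uses ConfigParser interpolation, so "%" must be doubled.
        config.set_main_option("sqlalchemy.url", url.replace("%", "%%"))


# In alembic/env.py, after `config = context.config`:
#     apply_database_url(config)
```

When DATABASE_URL is unset, the value from alembic.ini is left untouched, which keeps local experiments working.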
    

4. Adding New Routes

  1. Create a route module, e.g. routes/notifications.py
    # routes/notifications.py
    from flask import Blueprint, request, jsonify
    
    notifications_bp = Blueprint("notifications", __name__, url_prefix="/notifications")
    
    @notifications_bp.post("/")
    def send_notification():
        data = request.get_json()
        # implement sending logic...
        return jsonify({"status": "queued"}), 202
    
  2. Register in routes/__init__.py
    from flask import Flask
    from routes.notifications import notifications_bp
    
    def register_routes(app: Flask):
        app.register_blueprint(notifications_bp)
        # existing registrations...
    
  3. Initialize in your app factory
    from flask import Flask
    from routes import register_routes
    
    def create_app():
        app = Flask(__name__)
        register_routes(app)
        return app
    

5. Adding New Models

  1. Define your model in database/models.py

    from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
    from database import Base
    from datetime import datetime
    
    class Notification(Base):
        __tablename__ = "notifications"
        id = Column(Integer, primary_key=True)
        user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
        message = Column(String(255), nullable=False)
        created_at = Column(DateTime, default=datetime.utcnow)
    
  2. Generate and apply a migration (see section 3).

  3. Access via session

    from database import SessionLocal
    from database.models import Notification
    
    db = SessionLocal()
    notification = Notification(user_id=42, message="Hello")
    db.add(notification)
    db.commit()
    

6. Extending the Scheduler

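The scheduler reads pending tasks from the DB and marks them executed. The snippets in this section use plain SQLAlchemy sessions (SessionLocal) and an illustrative schema (run_at, executed) rather than the scheduled_at and status fields shown under Scheduler Utilities; adapt the names to the model in your checkout.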

  1. Add a new task type in database/models.py
    from sqlalchemy import JSON, Boolean, Column, DateTime, Integer, String
    from database import Base

    class ScheduledTask(Base):
        __tablename__ = "scheduled_tasks"
        id = Column(Integer, primary_key=True)
        task_type = Column(String(50), nullable=False)
        payload = Column(JSON, nullable=False)
        run_at = Column(DateTime, nullable=False)
        executed = Column(Boolean, default=False)
    
  2. Create helper in services/scheduler.py
    from database import SessionLocal
    from database.models import ScheduledTask
    from datetime import datetime
    
    def enqueue_task(task_type: str, payload: dict, run_at: datetime):
        db = SessionLocal()
        task = ScheduledTask(task_type=task_type, payload=payload, run_at=run_at)
        db.add(task)
        db.commit()
    
    def get_pending_tasks():
        db = SessionLocal()
        return db.query(ScheduledTask).filter(
            ScheduledTask.run_at <= datetime.utcnow(),
            ScheduledTask.executed.is_(False)
        ).all()
    
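    def mark_executed(task_id: int):
        db = SessionLocal()
        try:
            task = db.query(ScheduledTask).get(task_id)
            if task is None:
                return  # unknown id: nothing to mark
            task.executed = True
            db.commit()
        finally:
            db.close()  # release the connection back to the pool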
    
  3. Integrate in your worker loop
    import time
    from services.scheduler import get_pending_tasks, mark_executed
    
    while True:
        for task in get_pending_tasks():
            # dispatch based on task.task_type; process() is a placeholder for your own handler
            process(task.payload)
            mark_executed(task.id)
        time.sleep(5)
    

7. Extending Serial Handler

Use send_at_command and send_ussd in services/serial_handler.py.

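from typing import List

from services.serial_handler import send_at_command, send_ussd

PORT = "/dev/ttyUSB0"  # adjust to your modem's device node

# Send an AT command (the port path is the first argument)
response = send_at_command(PORT, "AT+CSQ")
print("Signal Quality:", response)

# Initiate a USSD session
ussd_response = send_ussd(PORT, "*123#")
print("Balance:", ussd_response)

To add custom commands:

def send_custom_command(port_path: str, cmd: str, timeout: float = 2.0) -> List[str]:
    # Returns the decoded response lines, like send_at_command itself
    return send_at_command(port_path, cmd, timeout=timeout)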

Integrate this helper alongside existing methods in services/serial_handler.py.