Job System

The B12 SIS backend includes a built-in job scheduler for running background tasks. Jobs can run on a schedule (cron) or be triggered manually.

Overview

The job system supports:

  • Cron-based scheduling

  • Manual job execution

  • Job logging and monitoring

  • Error handling and retry logic

  • Concurrent job execution control

Architecture

┌─────────────────────────────────────────────────────────┐
│                    Job Scheduler                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────┐  │
│  │   Cron      │  │   Job       │  │    Job          │  │
│  │   Manager   │─▶│   Queue     │─▶│    Executor     │  │
│  └─────────────┘  └─────────────┘  └─────────────────┘  │
│                                             │           │
│                                             ▼           │
│                                    ┌─────────────────┐  │
│                                    │   Job Log       │  │
│                                    │   Repository    │  │
│                                    └─────────────────┘  │
└─────────────────────────────────────────────────────────┘

Job Interface

All jobs implement the Job interface:

type Job interface {
    Name() string      // Unique job identifier
    Schedule() string  // Cron expression
    Execute() error    // Job logic
}

Creating a Job

Step 1: Implement the Job Interface

// internal/jobs/cleanup_temp_files_job.go
package jobs

import (
    "b12/internal/database"
    "time"
)

type CleanupTempFilesJob struct{}

func NewCleanupTempFilesJob() *CleanupTempFilesJob {
    return &CleanupTempFilesJob{}
}

func (j *CleanupTempFilesJob) Name() string {
    return "CleanupTempFilesJob"
}

func (j *CleanupTempFilesJob) Schedule() string {
    // Run daily at 2:00 AM
    return "0 2 * * *"
}

func (j *CleanupTempFilesJob) Execute() error {
    db := database.GetDB()

    // Delete temp files older than 7 days
    cutoff := time.Now().AddDate(0, 0, -7)
    result := db.Where("created_at < ? AND type = ?", cutoff, "temp").
        Delete(&models.FileUpload{})

    if result.Error != nil {
        return result.Error
    }

    log.Printf("Deleted %d temporary files", result.RowsAffected)
    return nil
}

Step 2: Register the Job

// internal/jobs/registry.go
package jobs

var RegisteredJobs = []Job{
    NewImportProcessorJob(),
    NewCanvasSyncJob(),
    NewCleanupTempFilesJob(),  // Add new job
    NewReportCardGeneratorJob(),
}

func GetJobByName(name string) Job {
    for _, job := range RegisteredJobs {
        if job.Name() == name {
            return job
        }
    }
    return nil
}

Cron Expressions

The scheduler uses standard cron syntax:

┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday = 0)
│ │ │ │ │
* * * * *

Common Patterns

Expression

Description

0 * * * *

Every hour

0 0 * * *

Daily at midnight

0 2 * * *

Daily at 2:00 AM

0 0 * * 0

Weekly on Sunday

0 0 1 * *

Monthly on the 1st

*/15 * * * *

Every 15 minutes

0 9-17 * * 1-5

Hourly 9 AM-5 PM, Mon-Fri

Running Jobs

As a Scheduler (Daemon Mode)

Start the application with DEPLOY_AS_JOB=1:

DEPLOY_AS_JOB=1 ./b12-backend

Or via Make:

make run-jobs

Manual Execution

Run a specific job immediately:

make run-job JOB=ImportProcessorJob

Or programmatically:

job := jobs.GetJobByName("ImportProcessorJob")
if job != nil {
    if err := job.Execute(); err != nil {
        log.Printf("Job failed: %v", err)
    }
}

List Available Jobs

make list-jobs

Job Logging

Jobs are logged to the job_logs table:

type JobLog struct {
    BaseModel
    JobName   string    `gorm:"size:100;index" json:"job_name"`
    Status    string    `gorm:"size:20" json:"status"` // running, completed, failed
    StartedAt time.Time `json:"started_at"`
    EndedAt   *time.Time `json:"ended_at"`
    Duration  int64     `json:"duration"` // milliseconds
    Error     string    `gorm:"type:text" json:"error"`
    Metadata  string    `gorm:"type:json" json:"metadata"`
}

Querying Job Logs

curl -X POST http://localhost:8080/api/job_logs/list \
  -H "Authorization: Bearer <token>" \
  -H "X-Team-ID: <team-id>" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 20,
    "filter": {
      "group": "AND",
      "conditions": [
        {"field": "job_name", "operator": "=", "value": "ImportProcessorJob"}
      ]
    }
  }'

Built-in Jobs

ImportProcessorJob

Processes queued data imports:

  • Schedule: Every 5 minutes

  • Function: Processes pending ImportData records

func (j *ImportProcessorJob) Execute() error {
    imports, _ := j.repo.FindPending()
    for _, imp := range imports {
        j.processImport(imp)
    }
    return nil
}

CanvasSyncJob

Synchronizes data with Canvas LMS:

  • Schedule: Every hour

  • Function: Syncs students, courses, and enrollments

ReportCardGeneratorJob

Generates report cards in batch:

  • Schedule: Daily at 1:00 AM

  • Function: Processes pending report card requests

FixEmptyCodesJob

Repairs records with missing auto-generated codes:

  • Schedule: Daily at 3:00 AM

  • Function: Generates codes for records that should have them

Error Handling

Jobs should handle errors gracefully:

func (j *MyJob) Execute() error {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("Job panic recovered: %v", r)
        }
    }()

    items, err := j.repo.FindPending()
    if err != nil {
        return fmt.Errorf("failed to fetch items: %w", err)
    }

    var lastErr error
    for _, item := range items {
        if err := j.processItem(item); err != nil {
            log.Printf("Failed to process item %s: %v", item.Id, err)
            lastErr = err
            // Continue processing other items
        }
    }

    return lastErr
}

Concurrency Control

Prevent duplicate job runs:

var jobLocks = sync.Map{}

func (j *MyJob) Execute() error {
    // Try to acquire lock
    if _, loaded := jobLocks.LoadOrStore(j.Name(), true); loaded {
        return errors.New("job already running")
    }
    defer jobLocks.Delete(j.Name())

    // Job logic here
    return nil
}

Job Configuration

Jobs can accept configuration:

type CanvasSyncJob struct {
    BatchSize int
    DryRun    bool
}

func NewCanvasSyncJob() *CanvasSyncJob {
    return &CanvasSyncJob{
        BatchSize: getEnvInt("CANVAS_SYNC_BATCH_SIZE", 100),
        DryRun:    getEnvBool("CANVAS_SYNC_DRY_RUN", false),
    }
}

Monitoring

Job Health Check

curl http://localhost:8080/api/health

Returns job scheduler status:

{
  "status": "ok",
  "jobs": {
    "scheduler_running": true,
    "registered_jobs": 5,
    "last_run": "2024-01-15T02:00:00Z"
  }
}

Job Statistics

curl http://localhost:8080/api/job_logs/statistics \
  -H "Authorization: Bearer <token>"

Response:

{
  "success": true,
  "data": {
    "total_runs": 1500,
    "successful": 1450,
    "failed": 50,
    "by_job": {
      "ImportProcessorJob": {"total": 500, "failed": 10},
      "CanvasSyncJob": {"total": 720, "failed": 30}
    }
  }
}

Best Practices

  1. Idempotency: Jobs should be safe to run multiple times

  2. Logging: Log progress and errors for debugging

  3. Batch Processing: Process large datasets in batches

  4. Timeouts: Set reasonable timeouts for external calls

  5. Error Recovery: Handle partial failures gracefully

  6. Monitoring: Check job logs regularly for failures