Job System

The B12 SIS backend includes a built-in job scheduler for running background tasks. Jobs can run on a schedule (cron) or be triggered manually.

Overview

The job system supports:

  • Cron-based scheduling

  • Manual job execution

  • Job logging and monitoring

  • Error handling and retry logic

  • Concurrent job execution control

Architecture

┌─────────────────────────────────────────────────────────┐
│                    Job Scheduler                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────┐  │
│  │   Cron      │  │   Job       │  │    Job          │  │
│  │   Manager   │─▶│   Queue     │─▶│    Executor     │  │
│  └─────────────┘  └─────────────┘  └─────────────────┘  │
│                                             │           │
│                                             ▼           │
│                                    ┌─────────────────┐  │
│                                    │   Job Log       │  │
│                                    │   Repository    │  │
│                                    └─────────────────┘  │
└─────────────────────────────────────────────────────────┘

Job Interface

All jobs implement the Job interface:

type Job interface {
    Name() string      // Unique job identifier
    Schedule() string  // Cron expression
    Execute() error    // Job logic
}

Creating a Job

Step 1: Implement the Job Interface

// internal/jobs/cleanup_temp_files_job.go
package jobs

import (
    "log"
    "time"

    "b12/internal/database"
    "b12/internal/models"
)

type CleanupTempFilesJob struct{}

func NewCleanupTempFilesJob() *CleanupTempFilesJob {
    return &CleanupTempFilesJob{}
}

func (j *CleanupTempFilesJob) Name() string {
    return "CleanupTempFilesJob"
}

func (j *CleanupTempFilesJob) Schedule() string {
    // Run daily at 2:00 AM
    return "0 2 * * *"
}

func (j *CleanupTempFilesJob) Execute() error {
    db := database.GetDB()

    // Delete temp files older than 7 days
    cutoff := time.Now().AddDate(0, 0, -7)
    result := db.Where("created_at < ? AND type = ?", cutoff, "temp").
        Delete(&models.FileUpload{})

    if result.Error != nil {
        return result.Error
    }

    log.Printf("Deleted %d temporary files", result.RowsAffected)
    return nil
}

Step 2: Register the Job

// internal/jobs/registry.go
package jobs

var RegisteredJobs = []Job{
    NewImportProcessorJob(),
    NewCanvasSyncJob(),
    NewCleanupTempFilesJob(),  // Add new job
    NewReportCardGeneratorJob(),
}

func GetJobByName(name string) Job {
    for _, job := range RegisteredJobs {
        if job.Name() == name {
            return job
        }
    }
    return nil
}

Cron Expressions

The scheduler uses standard cron syntax:

┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday = 0)
│ │ │ │ │
* * * * *

Common Patterns

Expression      Description
0 * * * *       Every hour
0 0 * * *       Daily at midnight
0 2 * * *       Daily at 2:00 AM
0 0 * * 0       Weekly on Sunday
0 0 1 * *       Monthly on the 1st
*/15 * * * *    Every 15 minutes
0 9-17 * * 1-5  Hourly from 9 AM to 5 PM, Monday-Friday
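A malformed expression typically surfaces only when the scheduler starts, so it can help to sanity-check new schedules early. A minimal sketch (validateCronFields is a hypothetical helper, not part of the codebase; it checks only the field count shown in the diagram above, not the range of each field):

```go
package main

import (
	"fmt"
	"strings"
)

// validateCronFields verifies that an expression has the five
// space-separated fields of standard cron syntax.
func validateCronFields(expr string) error {
	fields := strings.Fields(expr)
	if len(fields) != 5 {
		return fmt.Errorf("expected 5 cron fields, got %d in %q", len(fields), expr)
	}
	return nil
}

func main() {
	fmt.Println(validateCronFields("0 2 * * *")) // five fields: no error
	fmt.Println(validateCronFields("0 2 * *"))   // four fields: error
}
```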

Running Jobs

As a Scheduler (Daemon Mode)

Start the application with DEPLOY_AS_JOB=1:

DEPLOY_AS_JOB=1 ./b12-backend

Or via Make:

make run-jobs

Manual Execution

Run a specific job immediately:

make run-job JOB=ImportProcessorJob

Or programmatically:

job := jobs.GetJobByName("ImportProcessorJob")
if job != nil {
    if err := job.Execute(); err != nil {
        log.Printf("Job failed: %v", err)
    }
}

List Available Jobs

make list-jobs

Job Logging

Jobs are logged to the job_logs table:

type JobLog struct {
    BaseModel
    JobName   string    `gorm:"size:100;index" json:"job_name"`
    Status    string    `gorm:"size:20" json:"status"` // running, completed, failed
    StartedAt time.Time `json:"started_at"`
    EndedAt   *time.Time `json:"ended_at"`
    Duration  int64     `json:"duration"` // milliseconds
    Error     string    `gorm:"type:text" json:"error"`
    Metadata  string    `gorm:"type:json" json:"metadata"`
}

Querying Job Logs

curl -X POST http://localhost:8080/api/job_logs/list \
  -H "Authorization: Bearer <token>" \
  -H "X-Team-ID: <team-id>" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 20,
    "filter": {
      "group": "AND",
      "conditions": [
        {"field": "job_name", "operator": "=", "value": "ImportProcessorJob"}
      ]
    }
  }'

Built-in Jobs (114 Total)

The system registers 114 jobs across several categories; representative jobs from each category are listed below:

System Maintenance Jobs

Job                          Description
ImportProcessorJob           Processes pending Excel imports
CleanupOldImportsJob         Removes imports older than 30 days
JobCleanupJob                Cleans up old job log entries
GenerateTimetablesDetailJob  Generates timetable detail records
SyncExistingSummaryNamesJob  Updates summary name cache

Data Linking Jobs

Job                         Description
LinkTeachersUsersJob        Links teacher records to user accounts
LinkStudentsUsersJob        Links student records to user accounts
LinkGuardianUserJob         Links guardian records to user accounts
FixUserTeamMemberJob        Repairs user team membership records
MergeDuplicateUsersJob      Merges duplicate user accounts
MergeDuplicateGuardiansJob  Merges duplicate guardian records

Data Fix Jobs

Job                                   Description
FixMissingAutoCodesJob                Generates missing auto-codes
FixEmptyCodesJob                      Fixes records with empty codes
FixPersonRelationshipsCodesJob        Repairs person relationship codes
StudentAvatarUploadJob                Processes student avatar uploads
UpdateStudentCurrentClassJob          Updates current class assignments
UpdateCourseDescriptionsFromExcelJob  Updates course data from Excel

Canvas LMS Sync Jobs (6-Phase Execution)

Phase 1 - Foundation:

Job                              Description
CanvasSyncUserIdsJob             Syncs user IDs from Canvas
CanvasCreateMissingUsersJob      Creates missing Canvas users
CanvasSyncSemesterIdsJob         Syncs semester/term IDs
CanvasCreateMissingSemestersJob  Creates missing Canvas terms
CanvasSyncGradingSchemesJob      Syncs grading schemes

Phase 2 - Course Setup:

Job                            Description
CanvasSyncCourseIdsJob         Syncs course IDs from Canvas
CanvasCreateMissingCoursesJob  Creates missing Canvas courses

Phase 3 - Course Structure:

Job                            Description
SyncCanvasAssignmentGroupsJob  Syncs assignment groups
SyncCanvasAssignmentsJob       Syncs assignments
SyncCanvasToAssessmentJob      Maps Canvas assignments to assessments
SyncCanvasModulesJob           Syncs course modules

Phase 4 - Enrollments:

Job                         Description
CanvasSyncEnrollmentIdsJob  Syncs enrollment IDs
CanvasCreateEnrollmentsJob  Creates missing enrollments

Phase 5 - Assessment Data:

Job                             Description
SyncCanvasAssessmentResultsJob  Syncs grades from Canvas

Phase 6 - Blueprint Templates:

Job                            Description
CanvasSyncBlueprintCoursesJob  Syncs blueprint course templates

Gradebook Calculation Jobs

Job                                 Description
SyncGradebookCoursesJob             Syncs course gradebook records
CreateMissingSemesterGradebooksJob  Creates missing semester gradebooks
CreateMissingGradebookFinalsJob     Creates missing final gradebook records
SyncAssessmentScoresJob             Syncs assessment scores to gradebooks
CalculateGradebookScoresJob         Calculates gradebook totals
CalculateGradebookSemesterJob       Calculates semester grades

Import Jobs

Job                            Description
ImportSubjectGradingSimpleJob  Imports subject grading rules
ImportTeacherCommentsJob       Imports teacher comments

Report Jobs

Job                     Description
SemesterReportCardsJob  Generates semester report cards

Manual Sync Jobs

Job                               Description
ForceSyncAllStudentsJob           Forces sync of all students to Canvas
CanvasForceResyncCoursesJob1time  One-time course resync

Error Handling

Jobs should handle errors gracefully:

func (j *MyJob) Execute() error {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("Job panic recovered: %v", r)
        }
    }()

    items, err := j.repo.FindPending()
    if err != nil {
        return fmt.Errorf("failed to fetch items: %w", err)
    }

    var lastErr error
    for _, item := range items {
        if err := j.processItem(item); err != nil {
            log.Printf("Failed to process item %s: %v", item.Id, err)
            lastErr = err
            // Continue processing other items
        }
    }

    return lastErr
}
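Note that the bare recover above logs the panic but still lets Execute fall through, so the panic itself is never reported as a failure. If a panic should surface as a job error, a named return value can convert it (a sketch of the general Go idiom, not necessarily the project's actual pattern):

```go
package main

import "fmt"

// executeSafely runs a job body with a named return value: the deferred
// recover turns a panic into an ordinary error instead of discarding it.
func executeSafely(run func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("job panicked: %v", r)
		}
	}()
	run()
	return nil
}

func main() {
	fmt.Println(executeSafely(func() {}))                // no panic: nil error
	fmt.Println(executeSafely(func() { panic("boom") })) // panic converted to error
}
```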

Concurrency Control

Prevent duplicate job runs:

var jobLocks = sync.Map{}

func (j *MyJob) Execute() error {
    // Try to acquire lock
    if _, loaded := jobLocks.LoadOrStore(j.Name(), true); loaded {
        return errors.New("job already running")
    }
    defer jobLocks.Delete(j.Name())

    // Job logic here
    return nil
}

Job Configuration

Jobs can accept configuration:

type CanvasSyncJob struct {
    BatchSize int
    DryRun    bool
}

func NewCanvasSyncJob() *CanvasSyncJob {
    return &CanvasSyncJob{
        BatchSize: getEnvInt("CANVAS_SYNC_BATCH_SIZE", 100),
        DryRun:    getEnvBool("CANVAS_SYNC_DRY_RUN", false),
    }
}

Monitoring

Job Health Check

curl http://localhost:8080/api/health

Returns job scheduler status:

{
  "status": "ok",
  "jobs": {
    "scheduler_running": true,
    "registered_jobs": 5,
    "last_run": "2024-01-15T02:00:00Z"
  }
}

Job Statistics

curl http://localhost:8080/api/job_logs/statistics \
  -H "Authorization: Bearer <token>"

Response:

{
  "success": true,
  "data": {
    "total_runs": 1500,
    "successful": 1450,
    "failed": 50,
    "by_job": {
      "ImportProcessorJob": {"total": 500, "failed": 10},
      "CanvasSyncJob": {"total": 720, "failed": 30}
    }
  }
}

Best Practices

  1. Idempotency: Jobs should be safe to run multiple times

  2. Logging: Log progress and errors for debugging

  3. Batch Processing: Process large datasets in batches

  4. Timeouts: Set reasonable timeouts for external calls

  5. Error Recovery: Handle partial failures gracefully

  6. Monitoring: Check job logs regularly for failures
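Practice 3 (batch processing) can be sketched with a plain slice-chunking loop; a real job would more likely page through the database in batches, but the error-handling shape is the same (processInBatches is illustrative, not a project helper):

```go
package main

import "fmt"

// processInBatches splits items into fixed-size chunks so each batch is
// handled independently; a failed batch is recorded but doesn't stop the rest.
func processInBatches(items []int, size int, handle func(batch []int) error) error {
	var lastErr error
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		if err := handle(items[start:end]); err != nil {
			lastErr = err // record the failure but keep going
		}
	}
	return lastErr
}

func main() {
	var batches int
	processInBatches(make([]int, 1050), 500, func(b []int) error {
		batches++
		return nil
	})
	fmt.Println(batches) // 1050 items in batches of 500 -> 3 batches
}
```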