Job System
The B12 SIS backend includes a built-in job scheduler for running background tasks. Jobs can run on a schedule (cron) or be triggered manually.
Overview
The job system supports:
Cron-based scheduling
Manual job execution
Job logging and monitoring
Error handling and retry logic
Concurrent job execution control
Architecture
┌───────────────────────────────────────────────────────┐
│                     Job Scheduler                     │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐  │
│  │    Cron     │   │     Job     │   │     Job     │  │
│  │   Manager   │──▶│    Queue    │──▶│  Executor   │  │
│  └─────────────┘   └─────────────┘   └─────────────┘  │
│                                             │         │
│                                             ▼         │
│                                      ┌─────────────┐  │
│                                      │   Job Log   │  │
│                                      │ Repository  │  │
│                                      └─────────────┘  │
└───────────────────────────────────────────────────────┘
Job Interface
All jobs implement the Job interface:
type Job interface {
    Name() string     // Unique job identifier
    Schedule() string // Cron expression
    Execute() error   // Job logic
}
Creating a Job
Step 1: Implement the Job Interface
// internal/jobs/cleanup_temp_files_job.go
package jobs

import (
    "log"
    "time"

    "b12/internal/database"
    "b12/internal/models"
)

type CleanupTempFilesJob struct{}

func NewCleanupTempFilesJob() *CleanupTempFilesJob {
    return &CleanupTempFilesJob{}
}

func (j *CleanupTempFilesJob) Name() string {
    return "CleanupTempFilesJob"
}

func (j *CleanupTempFilesJob) Schedule() string {
    // Run daily at 2:00 AM
    return "0 2 * * *"
}

func (j *CleanupTempFilesJob) Execute() error {
    db := database.GetDB()

    // Delete temp file records older than 7 days
    cutoff := time.Now().AddDate(0, 0, -7)
    result := db.Where("created_at < ? AND type = ?", cutoff, "temp").
        Delete(&models.FileUpload{})
    if result.Error != nil {
        return result.Error
    }

    log.Printf("Deleted %d temporary files", result.RowsAffected)
    return nil
}
Step 2: Register the Job
// internal/jobs/registry.go
package jobs

var RegisteredJobs = []Job{
    NewImportProcessorJob(),
    NewCanvasSyncJob(),
    NewCleanupTempFilesJob(), // Add the new job here
    NewReportCardGeneratorJob(),
}

func GetJobByName(name string) Job {
    for _, job := range RegisteredJobs {
        if job.Name() == name {
            return job
        }
    }
    return nil
}
Cron Expressions
The scheduler uses standard cron syntax:
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday = 0)
│ │ │ │ │
* * * * *
Common Patterns
| Expression | Description |
|---|---|
| 0 * * * * | Every hour |
| 0 0 * * * | Daily at midnight |
| 0 2 * * * | Daily at 2:00 AM |
| 0 0 * * 0 | Weekly on Sunday |
| 0 0 1 * * | Monthly on the 1st |
| */15 * * * * | Every 15 minutes |
| 0 9-17 * * 1-5 | Hourly 9 AM-5 PM, Mon-Fri |
Running Jobs
As a Scheduler (Daemon Mode)
Start the application with DEPLOY_AS_JOB=1:
DEPLOY_AS_JOB=1 ./b12-backend
Or via Make:
make run-jobs
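In daemon mode the scheduler walks RegisteredJobs and registers each job's cron expression with a cron engine. The following is a minimal sketch of that wiring, assuming the widely used github.com/robfig/cron/v3 package; the actual entrypoint may differ:

// Hypothetical daemon entrypoint; shown for illustration only.
package main

import (
    "log"

    "github.com/robfig/cron/v3"

    "b12/internal/jobs"
)

func main() {
    c := cron.New() // v3 default: standard 5-field cron expressions
    for _, job := range jobs.RegisteredJobs {
        job := job // capture loop variable for the closure
        if _, err := c.AddFunc(job.Schedule(), func() {
            if err := job.Execute(); err != nil {
                log.Printf("job %s failed: %v", job.Name(), err)
            }
        }); err != nil {
            log.Fatalf("invalid schedule for %s: %v", job.Name(), err)
        }
    }
    c.Start()
    select {} // block forever; cron runs jobs on background goroutines
}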
Manual Execution
Run a specific job immediately:
make run-job JOB=ImportProcessorJob
Or programmatically:
job := jobs.GetJobByName("ImportProcessorJob")
if job != nil {
    if err := job.Execute(); err != nil {
        log.Printf("Job failed: %v", err)
    }
}
List Available Jobs
make list-jobs
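The listing can be derived directly from the registry. A hypothetical equivalent in code:

for _, job := range jobs.RegisteredJobs {
    fmt.Printf("%-30s %s\n", job.Name(), job.Schedule())
}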
Job Logging
Jobs are logged to the job_logs table:
type JobLog struct {
    BaseModel
    JobName   string     `gorm:"size:100;index" json:"job_name"`
    Status    string     `gorm:"size:20" json:"status"` // running, completed, failed
    StartedAt time.Time  `json:"started_at"`
    EndedAt   *time.Time `json:"ended_at"`
    Duration  int64      `json:"duration"` // milliseconds
    Error     string     `gorm:"type:text" json:"error"`
    Metadata  string     `gorm:"type:json" json:"metadata"`
}
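The executor is responsible for writing these rows around each run. A sketch of what that wrapper might look like, using the fields above and GORM's *gorm.DB handle (runWithLogging itself is illustrative, not the actual executor):

// Illustrative wrapper; the real executor may differ.
func runWithLogging(db *gorm.DB, job Job) {
    entry := JobLog{
        JobName:   job.Name(),
        Status:    "running",
        StartedAt: time.Now(),
    }
    db.Create(&entry)

    err := job.Execute()

    now := time.Now()
    entry.EndedAt = &now
    entry.Duration = now.Sub(entry.StartedAt).Milliseconds()
    if err != nil {
        entry.Status = "failed"
        entry.Error = err.Error()
    } else {
        entry.Status = "completed"
    }
    db.Save(&entry)
}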
Querying Job Logs
curl -X POST http://localhost:8080/api/job_logs/list \
  -H "Authorization: Bearer <token>" \
  -H "X-Team-ID: <team-id>" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 20,
    "filter": {
      "group": "AND",
      "conditions": [
        {"field": "job_name", "operator": "=", "value": "ImportProcessorJob"}
      ]
    }
  }'
Built-in Jobs
ImportProcessorJob
Processes queued data imports:
Schedule: Every 5 minutes
Function: Processes pending ImportData records
func (j *ImportProcessorJob) Execute() error {
    imports, err := j.repo.FindPending()
    if err != nil {
        return err
    }
    for _, imp := range imports {
        j.processImport(imp)
    }
    return nil
}
CanvasSyncJob
Synchronizes data with Canvas LMS:
Schedule: Every hour
Function: Syncs students, courses, and enrollments
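The sync code itself is not shown here; structurally, a multi-entity sync's Execute might look like the following skeleton (the sub-methods are hypothetical). Syncing students and courses before enrollments matters, since enrollments reference both:

func (j *CanvasSyncJob) Execute() error {
    // Hypothetical sub-steps, run in dependency order.
    steps := []func() error{
        j.syncStudents,
        j.syncCourses,
        j.syncEnrollments,
    }
    for _, step := range steps {
        if err := step(); err != nil {
            return err
        }
    }
    return nil
}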
ReportCardGeneratorJob
Generates report cards in batch:
Schedule: Daily at 1:00 AM
Function: Processes pending report card requests
FixEmptyCodesJob
Repairs records with missing auto-generated codes:
Schedule: Daily at 3:00 AM
Function: Generates codes for records that should have them
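As an illustration only: with GORM, such a repair could look like the following, assuming a hypothetical models.Student with a Code field and a generateCode helper (the real job may cover several models):

func (j *FixEmptyCodesJob) Execute() error {
    db := database.GetDB()

    // Hypothetical example: backfill codes on a single model.
    var students []models.Student
    if err := db.Where("code IS NULL OR code = ''").Find(&students).Error; err != nil {
        return err
    }
    for i := range students {
        students[i].Code = generateCode("STU") // hypothetical generator
        if err := db.Save(&students[i]).Error; err != nil {
            return err
        }
    }
    return nil
}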
Error Handling
Jobs should handle errors gracefully:
func (j *MyJob) Execute() error {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("Job panic recovered: %v", r)
        }
    }()

    items, err := j.repo.FindPending()
    if err != nil {
        return fmt.Errorf("failed to fetch items: %w", err)
    }

    var lastErr error
    for _, item := range items {
        if err := j.processItem(item); err != nil {
            log.Printf("Failed to process item %s: %v", item.Id, err)
            lastErr = err
            // Continue processing other items
        }
    }
    return lastErr
}
Concurrency Control
Prevent duplicate job runs:
var jobLocks = sync.Map{}

func (j *MyJob) Execute() error {
    // Try to acquire the lock; bail out if another run holds it
    if _, loaded := jobLocks.LoadOrStore(j.Name(), true); loaded {
        return errors.New("job already running")
    }
    defer jobLocks.Delete(j.Name())

    // Job logic here
    return nil
}
Job Configuration
Jobs can accept configuration, typically read from environment variables in their constructors:
type CanvasSyncJob struct {
    BatchSize int
    DryRun    bool
}

func NewCanvasSyncJob() *CanvasSyncJob {
    return &CanvasSyncJob{
        BatchSize: getEnvInt("CANVAS_SYNC_BATCH_SIZE", 100),
        DryRun:    getEnvBool("CANVAS_SYNC_DRY_RUN", false),
    }
}
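getEnvInt and getEnvBool are small helpers, not standard library functions; a minimal sketch of how they might be implemented (using os and strconv):

func getEnvInt(key string, fallback int) int {
    if v, err := strconv.Atoi(os.Getenv(key)); err == nil {
        return v
    }
    return fallback
}

func getEnvBool(key string, fallback bool) bool {
    if v, err := strconv.ParseBool(os.Getenv(key)); err == nil {
        return v
    }
    return fallback
}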
Monitoring
Job Health Check
curl http://localhost:8080/api/health
Returns job scheduler status:
{
  "status": "ok",
  "jobs": {
    "scheduler_running": true,
    "registered_jobs": 5,
    "last_run": "2024-01-15T02:00:00Z"
  }
}
Job Statistics
curl http://localhost:8080/api/job_logs/statistics \
-H "Authorization: Bearer <token>"
Response:
{
  "success": true,
  "data": {
    "total_runs": 1500,
    "successful": 1450,
    "failed": 50,
    "by_job": {
      "ImportProcessorJob": {"total": 500, "failed": 10},
      "CanvasSyncJob": {"total": 720, "failed": 30}
    }
  }
}
Best Practices
Idempotency: Jobs should be safe to run multiple times
Logging: Log progress and errors for debugging
Batch Processing: Process large datasets in batches (see the sketch after this list)
Timeouts: Set reasonable timeouts for external calls (also shown in the sketch below)
Error Recovery: Handle partial failures gracefully
Monitoring: Check job logs regularly for failures
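A sketch combining the batching and timeout points above (FindPendingBatch, processBatch, and the 30-second limit are illustrative, not part of the actual repositories):

func (j *MyJob) Execute() error {
    const batchSize = 500
    for {
        // Fetch one batch at a time instead of loading everything at once.
        items, err := j.repo.FindPendingBatch(batchSize) // hypothetical batched query
        if err != nil {
            return err
        }
        if len(items) == 0 {
            return nil // all pending items processed
        }

        // Bound external calls made while processing this batch.
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        err = j.processBatch(ctx, items) // hypothetical context-aware processor
        cancel()
        if err != nil {
            return err
        }
    }
}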