package service import ( "context" "fmt" "log" "sync" "time" "github.com/jackc/pgx/v5/pgxpool" "odoo-go/pkg/orm" ) const ( cronPollInterval = 60 * time.Second maxFailureCount = 5 ) // cronJob holds a single scheduled action loaded from the ir_cron table. type cronJob struct { ID int64 Name string ModelName string MethodName string UserID int64 IntervalNumber int IntervalType string NumberCall int NextCall time.Time } // CronScheduler polls ir_cron and executes ready jobs. // Mirrors: odoo/addons/base/models/ir_cron.py IrCron._process_jobs() type CronScheduler struct { pool *pgxpool.Pool ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } // NewCronScheduler creates a DB-driven cron scheduler. func NewCronScheduler(pool *pgxpool.Pool) *CronScheduler { ctx, cancel := context.WithCancel(context.Background()) return &CronScheduler{pool: pool, ctx: ctx, cancel: cancel} } // Start begins the polling loop in a background goroutine. func (s *CronScheduler) Start() { s.wg.Add(1) go s.pollLoop() log.Println("cron: scheduler started") } // Stop cancels the polling loop and waits for completion. func (s *CronScheduler) Stop() { s.cancel() s.wg.Wait() log.Println("cron: scheduler stopped") } func (s *CronScheduler) pollLoop() { defer s.wg.Done() // Run once immediately, then on ticker s.processJobs() ticker := time.NewTicker(cronPollInterval) defer ticker.Stop() for { select { case <-s.ctx.Done(): return case <-ticker.C: s.processJobs() } } } // processJobs queries all ready cron jobs and processes them one by one. func (s *CronScheduler) processJobs() { rows, err := s.pool.Query(s.ctx, ` SELECT id, name, model_name, method_name, user_id, interval_number, interval_type, numbercall, nextcall FROM ir_cron WHERE active = true AND nextcall <= now() ORDER BY priority, id `) if err != nil { log.Printf("cron: query error: %v", err) return } var jobs []cronJob for rows.Next() { var j cronJob var modelName, methodName *string // nullable if err := rows.Scan(&j.ID, &j.Name, &modelName, &methodName, &j.UserID, &j.IntervalNumber, &j.IntervalType, &j.NumberCall, &j.NextCall); err != nil { log.Printf("cron: scan error: %v", err) continue } if modelName != nil { j.ModelName = *modelName } if methodName != nil { j.MethodName = *methodName } jobs = append(jobs, j) } rows.Close() for _, job := range jobs { s.processOneJob(job) } } // processOneJob acquires a row-level lock and executes a single cron job. func (s *CronScheduler) processOneJob(job cronJob) { tx, err := s.pool.Begin(s.ctx) if err != nil { return } defer tx.Rollback(s.ctx) // Try to acquire the job with FOR NO KEY UPDATE SKIP LOCKED var lockedID int64 err = tx.QueryRow(s.ctx, ` SELECT id FROM ir_cron WHERE id = $1 AND active = true AND nextcall <= now() FOR NO KEY UPDATE SKIP LOCKED `, job.ID).Scan(&lockedID) if err != nil { // Job already taken by another worker or not ready return } log.Printf("cron: executing %q (id=%d)", job.Name, job.ID) execErr := s.executeJob(job) now := time.Now() nextCall := calculateNextCall(now, job.IntervalNumber, job.IntervalType) if execErr != nil { log.Printf("cron: %q failed: %v", job.Name, execErr) // Update failure count, set first_failure_date if not already set if _, err := tx.Exec(s.ctx, ` UPDATE ir_cron SET failure_count = failure_count + 1, first_failure_date = COALESCE(first_failure_date, $1), lastcall = $1, nextcall = $2 WHERE id = $3 `, now, nextCall, job.ID); err != nil { log.Printf("cron: failed to update failure count for %q: %v", job.Name, err) } // Deactivate if too many consecutive failures if _, err := tx.Exec(s.ctx, ` UPDATE ir_cron SET active = false WHERE id = $1 AND failure_count >= $2 `, job.ID, maxFailureCount); err != nil { log.Printf("cron: failed to deactivate %q: %v", job.Name, err) } } else { log.Printf("cron: %q completed successfully", job.Name) if job.NumberCall > 0 { // Finite run count: decrement newNumberCall := job.NumberCall - 1 if newNumberCall <= 0 { if _, err := tx.Exec(s.ctx, ` UPDATE ir_cron SET active = false, lastcall = $1, nextcall = $2, failure_count = 0, first_failure_date = NULL, numbercall = 0 WHERE id = $3 `, now, nextCall, job.ID); err != nil { log.Printf("cron: failed to update job %q: %v", job.Name, err) } } else { if _, err := tx.Exec(s.ctx, ` UPDATE ir_cron SET lastcall = $1, nextcall = $2, failure_count = 0, first_failure_date = NULL, numbercall = $3 WHERE id = $4 `, now, nextCall, newNumberCall, job.ID); err != nil { log.Printf("cron: failed to update job %q: %v", job.Name, err) } } } else { // numbercall <= 0 means infinite runs if _, err := tx.Exec(s.ctx, ` UPDATE ir_cron SET lastcall = $1, nextcall = $2, failure_count = 0, first_failure_date = NULL WHERE id = $3 `, now, nextCall, job.ID); err != nil { log.Printf("cron: failed to update job %q: %v", job.Name, err) } } } if err := tx.Commit(s.ctx); err != nil { log.Printf("cron: commit error for %q: %v", job.Name, err) } } // executeJob looks up the target method in orm.Registry and calls it. func (s *CronScheduler) executeJob(job cronJob) error { if job.ModelName == "" || job.MethodName == "" { return fmt.Errorf("cron %q: model_name or method_name not set", job.Name) } model := orm.Registry.Get(job.ModelName) if model == nil { return fmt.Errorf("cron %q: model %q not found", job.Name, job.ModelName) } if model.Methods == nil { return fmt.Errorf("cron %q: model %q has no methods", job.Name, job.ModelName) } method, ok := model.Methods[job.MethodName] if !ok { return fmt.Errorf("cron %q: method %q not found on %q", job.Name, job.MethodName, job.ModelName) } // Create ORM environment for job execution uid := job.UserID if uid == 0 { return fmt.Errorf("cron %q: user_id not set, refusing to run as admin", job.Name) } env, err := orm.NewEnvironment(s.ctx, orm.EnvConfig{ Pool: s.pool, UID: uid, Context: map[string]interface{}{ "lastcall": job.NextCall, "cron_id": job.ID, }, }) if err != nil { return fmt.Errorf("cron %q: env error: %w", job.Name, err) } defer env.Close() // Call the method on an empty recordset of the target model _, err = method(env.Model(job.ModelName)) if err != nil { env.Rollback() return err } return env.Commit() } // calculateNextCall computes the next execution time based on interval. // Mirrors: odoo/addons/base/models/ir_cron.py _intervalTypes func calculateNextCall(from time.Time, number int, intervalType string) time.Time { switch intervalType { case "minutes": return from.Add(time.Duration(number) * time.Minute) case "hours": return from.Add(time.Duration(number) * time.Hour) case "days": return from.AddDate(0, 0, number) case "weeks": return from.AddDate(0, 0, number*7) case "months": return from.AddDate(0, number, 0) default: return from.Add(time.Duration(number) * time.Hour) } }