Files
goodie/pkg/service/cron.go
Marc 66383adf06 feat: Portal, Email Inbound, Discuss + module improvements
- Portal: /my/* routes, signup, password reset, portal user support
- Email Inbound: IMAP polling (go-imap/v2), thread matching
- Discuss: mail.channel, long-polling bus, DM, unread count
- Cron: ir.cron runner (goroutine scheduler)
- Bank Import, CSV/Excel Import
- Automation (ir.actions.server)
- Fetchmail service
- HR Payroll model
- Various fixes across account, sale, stock, purchase, crm, hr, project

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 18:41:57 +02:00

273 lines
7.0 KiB
Go

package service
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"odoo-go/pkg/orm"
)
// Scheduler tuning knobs.
const (
	// cronPollInterval is the delay between two scans of ir_cron for due jobs.
	cronPollInterval = time.Minute

	// maxFailureCount is the failure_count value at which a failing job is
	// switched to inactive.
	maxFailureCount = 5
)
// cronJob is the in-memory copy of one ir_cron row, holding exactly the
// columns the scheduler selects when it looks for due jobs.
type cronJob struct {
	ID   int64  // ir_cron.id
	Name string // ir_cron.name; used in log and error messages

	// Target of the job: the method MethodName registered on model
	// ModelName. Both are left empty when the column is NULL.
	ModelName, MethodName string

	UserID int64 // ir_cron.user_id; the job refuses to run when this is 0

	// Reschedule step: IntervalNumber units of IntervalType
	// ("minutes", "hours", "days", "weeks" or "months").
	IntervalNumber int
	IntervalType   string

	// NumberCall is the number of runs left; a value <= 0 means the job
	// repeats without limit.
	NumberCall int

	NextCall time.Time // ir_cron.nextcall at the time the row was read
}
// CronScheduler polls ir_cron and executes ready jobs.
// Mirrors: odoo/addons/base/models/ir_cron.py IrCron._process_jobs()
//
// A scheduler is created with NewCronScheduler, started with Start and shut
// down with Stop. It owns a single background goroutine.
type CronScheduler struct {
	// pool is used for every query the scheduler issues and is handed to the
	// ORM environment built for each job.
	pool *pgxpool.Pool
	// ctx is passed to all database calls and is cancelled by Stop; the
	// polling loop exits once it is done.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
	// wg tracks the polling goroutine so Stop can wait for it to return.
	wg sync.WaitGroup
}
// NewCronScheduler returns a scheduler that reads its jobs from the ir_cron
// table through pool. The scheduler is idle until Start is called; its
// internal context is derived from context.Background and is cancelled by
// Stop.
func NewCronScheduler(pool *pgxpool.Pool) *CronScheduler {
	sched := &CronScheduler{pool: pool}
	sched.ctx, sched.cancel = context.WithCancel(context.Background())
	return sched
}
// Start launches the polling loop in its own goroutine and returns
// immediately. The goroutine is registered on the WaitGroup before it is
// spawned so that Stop can wait for it to finish.
func (s *CronScheduler) Start() {
	s.wg.Add(1)
	go func() {
		s.pollLoop()
	}()
	log.Println("cron: scheduler started")
}
// Stop shuts the scheduler down: it cancels the scheduler context and then
// blocks until the polling goroutine has returned.
func (s *CronScheduler) Stop() {
	// Signal first, then wait; the loop only exits once ctx is done.
	s.cancel()
	s.wg.Wait()

	log.Println("cron: scheduler stopped")
}
// pollLoop is the body of the scheduler goroutine. It processes due jobs once
// right away and then again on every cronPollInterval tick, until the
// scheduler context is cancelled. It marks the WaitGroup done on exit.
func (s *CronScheduler) pollLoop() {
	defer s.wg.Done()

	// First pass happens immediately rather than after one full interval.
	s.processJobs()

	tick := time.NewTicker(cronPollInterval)
	defer tick.Stop()

	done := s.ctx.Done()
	for {
		select {
		case <-tick.C:
			s.processJobs()
		case <-done:
			return
		}
	}
}
// processJobs loads every active ir_cron row whose nextcall has passed,
// ordered by priority then id, and hands the rows to processOneJob one at a
// time.
//
// The result set is read completely and closed before any job runs, so the
// listing query does not keep its connection while jobs execute. Rows that
// fail to scan are logged and skipped. An iteration error reported by the
// driver is logged; jobs that were read before the error are still processed.
// Processing stops early once the scheduler context has been cancelled.
func (s *CronScheduler) processJobs() {
	rows, err := s.pool.Query(s.ctx, `
		SELECT id, name, model_name, method_name, user_id,
		interval_number, interval_type, numbercall, nextcall
		FROM ir_cron
		WHERE active = true AND nextcall <= now()
		ORDER BY priority, id
	`)
	if err != nil {
		log.Printf("cron: query error: %v", err)
		return
	}

	var jobs []cronJob
	for rows.Next() {
		var j cronJob
		var modelName, methodName *string // nullable
		if err := rows.Scan(&j.ID, &j.Name, &modelName, &methodName, &j.UserID,
			&j.IntervalNumber, &j.IntervalType, &j.NumberCall, &j.NextCall); err != nil {
			log.Printf("cron: scan error: %v", err)
			continue
		}
		if modelName != nil {
			j.ModelName = *modelName
		}
		if methodName != nil {
			j.MethodName = *methodName
		}
		jobs = append(jobs, j)
	}
	rows.Close()

	// Next returning false can mean "no more rows" or "something broke";
	// only Err tells the two apart.
	if err := rows.Err(); err != nil {
		log.Printf("cron: error reading jobs: %v", err)
	}

	for _, job := range jobs {
		// Do not start further jobs once Stop has been requested.
		if s.ctx.Err() != nil {
			return
		}
		s.processOneJob(job)
	}
}
// processOneJob runs a single cron job under a row-level lock and records the
// outcome on its ir_cron row.
//
// Steps, all inside one transaction:
//  1. Lock the row with FOR NO KEY UPDATE SKIP LOCKED, re-checking that the
//     job is still active and due. If no row comes back (locked by another
//     worker, deactivated or rescheduled in the meantime) the job is skipped.
//  2. Execute the job via executeJob.
//  3. On failure: bump failure_count, remember the first failure date, move
//     nextcall forward, and deactivate the job once failure_count reaches
//     maxFailureCount.
//     On success: reset the failure bookkeeping, move nextcall forward and,
//     for jobs with a finite numbercall, decrement it and deactivate the job
//     when it reaches zero.
//
// If any bookkeeping statement fails the transaction is rolled back instead
// of being committed. Errors are logged, never returned.
//
// NOTE(review): lastcall/nextcall are computed from the application clock
// (time.Now) while the due check uses the database's now() — confirm both
// agree on time zone for the column type in use.
func (s *CronScheduler) processOneJob(job cronJob) {
	// Nothing below can succeed on a cancelled context; bail out quietly.
	if s.ctx.Err() != nil {
		return
	}

	tx, err := s.pool.Begin(s.ctx)
	if err != nil {
		log.Printf("cron: begin error for %q: %v", job.Name, err)
		return
	}
	// Covers every early return; after a successful Commit the transaction
	// is already closed and this call has nothing left to undo.
	defer tx.Rollback(s.ctx)

	// Query + Next (rather than QueryRow + Scan) lets "no row" be told apart
	// from a real database error without depending on a sentinel error.
	rows, err := tx.Query(s.ctx, `
		SELECT id FROM ir_cron
		WHERE id = $1 AND active = true AND nextcall <= now()
		FOR NO KEY UPDATE SKIP LOCKED
	`, job.ID)
	if err != nil {
		log.Printf("cron: lock error for %q: %v", job.Name, err)
		return
	}
	locked := rows.Next()
	rows.Close()
	if err := rows.Err(); err != nil {
		log.Printf("cron: lock error for %q: %v", job.Name, err)
		return
	}
	if !locked {
		// Job already taken by another worker or not ready
		return
	}

	log.Printf("cron: executing %q (id=%d)", job.Name, job.ID)
	execErr := s.executeJob(job)

	now := time.Now()
	nextCall := calculateNextCall(now, job.IntervalNumber, job.IntervalType)

	// exec runs one bookkeeping statement inside tx, logging any error as
	// "cron: failed to <action> <job name>", and reports whether it succeeded.
	exec := func(action, stmt string, args ...interface{}) bool {
		if _, err := tx.Exec(s.ctx, stmt, args...); err != nil {
			log.Printf("cron: failed to %s %q: %v", action, job.Name, err)
			return false
		}
		return true
	}

	var recorded bool
	if execErr != nil {
		log.Printf("cron: %q failed: %v", job.Name, execErr)
		// Update failure count, set first_failure_date if not already set,
		// then deactivate if too many consecutive failures.
		recorded = exec("update failure count for", `
			UPDATE ir_cron SET
			failure_count = failure_count + 1,
			first_failure_date = COALESCE(first_failure_date, $1),
			lastcall = $1,
			nextcall = $2
			WHERE id = $3
		`, now, nextCall, job.ID) &&
			exec("deactivate", `
				UPDATE ir_cron SET active = false
				WHERE id = $1 AND failure_count >= $2
			`, job.ID, maxFailureCount)
	} else {
		log.Printf("cron: %q completed successfully", job.Name)
		switch {
		case job.NumberCall > 1:
			// Finite run count with runs left: decrement.
			recorded = exec("update job", `
				UPDATE ir_cron SET lastcall = $1, nextcall = $2,
				failure_count = 0, first_failure_date = NULL, numbercall = $3
				WHERE id = $4
			`, now, nextCall, job.NumberCall-1, job.ID)
		case job.NumberCall == 1:
			// That was the last allowed run: deactivate.
			recorded = exec("update job", `
				UPDATE ir_cron SET active = false, lastcall = $1, nextcall = $2,
				failure_count = 0, first_failure_date = NULL, numbercall = 0
				WHERE id = $3
			`, now, nextCall, job.ID)
		default:
			// numbercall <= 0 means infinite runs
			recorded = exec("update job", `
				UPDATE ir_cron SET lastcall = $1, nextcall = $2,
				failure_count = 0, first_failure_date = NULL
				WHERE id = $3
			`, now, nextCall, job.ID)
		}
	}
	if !recorded {
		// The failure is already logged; let the deferred Rollback discard
		// the partial bookkeeping instead of attempting a doomed Commit.
		return
	}

	if err := tx.Commit(s.ctx); err != nil {
		log.Printf("cron: commit error for %q: %v", job.Name, err)
	}
}
// executeJob resolves the job's model and method in orm.Registry and invokes
// the method inside a fresh ORM environment running as the job's user.
//
// It returns an error, without calling the method, when the model or method
// name is empty, the model is not registered, the model has no method table,
// the method is not in that table, or the job has no user (UserID == 0).
// If the method itself fails the environment is rolled back and the method's
// error is returned unchanged; otherwise the result of committing the
// environment is returned.
func (s *CronScheduler) executeJob(job cronJob) error {
	if job.ModelName == "" || job.MethodName == "" {
		return fmt.Errorf("cron %q: model_name or method_name not set", job.Name)
	}

	target := orm.Registry.Get(job.ModelName)
	switch {
	case target == nil:
		return fmt.Errorf("cron %q: model %q not found", job.Name, job.ModelName)
	case target.Methods == nil:
		return fmt.Errorf("cron %q: model %q has no methods", job.Name, job.ModelName)
	}

	fn, found := target.Methods[job.MethodName]
	if !found {
		return fmt.Errorf("cron %q: method %q not found on %q", job.Name, job.MethodName, job.ModelName)
	}

	// Never fall back to a privileged default user.
	if job.UserID == 0 {
		return fmt.Errorf("cron %q: user_id not set, refusing to run as admin", job.Name)
	}

	// NOTE(review): the "lastcall" context value is filled from job.NextCall
	// because the job struct carries no lastcall column — confirm that the
	// called methods expect the scheduled time here.
	env, err := orm.NewEnvironment(s.ctx, orm.EnvConfig{
		Pool: s.pool,
		UID:  job.UserID,
		Context: map[string]interface{}{
			"lastcall": job.NextCall,
			"cron_id":  job.ID,
		},
	})
	if err != nil {
		return fmt.Errorf("cron %q: env error: %w", job.Name, err)
	}
	defer env.Close()

	// The method receives an empty recordset of the target model.
	if _, err := fn(env.Model(job.ModelName)); err != nil {
		env.Rollback()
		return err
	}
	return env.Commit()
}
// calculateNextCall returns the next execution time: from advanced by number
// units of intervalType.
// Mirrors: odoo/addons/base/models/ir_cron.py _intervalTypes
//
// intervalType is one of "minutes", "hours", "days", "weeks" or "months";
// any other value is treated as "hours".
//
// A number below 1 is treated as 1. A zero or negative step would produce a
// time that is not after from, so the job would be due again on every poll.
//
// "months" keeps the day of month where possible and otherwise clamps to the
// last day of the target month (Jan 31 + 1 month = Feb 28/29), instead of
// overflowing into the following month the way time.Time.AddDate does.
func calculateNextCall(from time.Time, number int, intervalType string) time.Time {
	if number < 1 {
		number = 1
	}
	switch intervalType {
	case "minutes":
		return from.Add(time.Duration(number) * time.Minute)
	case "days":
		return from.AddDate(0, 0, number)
	case "weeks":
		return from.AddDate(0, 0, number*7)
	case "months":
		return addMonthsClamped(from, number)
	default: // "hours" and any unrecognised interval type
		return from.Add(time.Duration(number) * time.Hour)
	}
}

// addMonthsClamped adds months calendar months to t, preserving the clock
// time and location, and clamping the day to the length of the target month.
func addMonthsClamped(t time.Time, months int) time.Time {
	year, month, day := t.Date()
	hour, minute, sec := t.Clock()

	// Pure calendar arithmetic, done in UTC so no DST transition can
	// interfere. Day 1 exists in every month, so normalisation only rolls
	// the month and year.
	targetYear, targetMonth, _ := time.Date(year, month+time.Month(months), 1, 0, 0, 0, 0, time.UTC).Date()

	// Day 0 of the following month is the last day of the target month.
	lastDay := time.Date(targetYear, targetMonth+1, 0, 0, 0, 0, 0, time.UTC).Day()
	if day > lastDay {
		day = lastDay
	}
	return time.Date(targetYear, targetMonth, day, hour, minute, sec, t.Nanosecond(), t.Location())
}