Files
goodie/pkg/server/bus.go
Marc 66383adf06 feat: Portal, Email Inbound, Discuss + module improvements
- Portal: /my/* routes, signup, password reset, portal user support
- Email Inbound: IMAP polling (go-imap/v2), thread matching
- Discuss: mail.channel, long-polling bus, DM, unread count
- Cron: ir.cron runner (goroutine scheduler)
- Bank Import, CSV/Excel Import
- Automation (ir.actions.server)
- Fetchmail service
- HR Payroll model
- Various fixes across account, sale, stock, purchase, crm, hr, project

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 18:41:57 +02:00

242 lines
6.2 KiB
Go

package server
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
)
// Bus implements a simple long-polling message bus for Discuss.
// Mirrors: odoo/addons/bus/models/bus.py ImBus
//
// Channels subscribe to notifications. A long-poll request blocks until
// a notification arrives or the timeout expires.
type Bus struct {
mu sync.Mutex
channels map[int64][]chan busNotification
lastID int64
}
type busNotification struct {
ID int64 `json:"id"`
Channel string `json:"channel"`
Message interface{} `json:"message"`
}
// NewBus creates a new message bus.
func NewBus() *Bus {
return &Bus{
channels: make(map[int64][]chan busNotification),
}
}
// Notify sends a notification to all subscribers of a channel.
func (b *Bus) Notify(channelID int64, channel string, message interface{}) {
b.mu.Lock()
b.lastID++
notif := busNotification{
ID: b.lastID,
Channel: channel,
Message: message,
}
subs := b.channels[channelID]
b.mu.Unlock()
for _, ch := range subs {
select {
case ch <- notif:
default:
// subscriber buffer full, skip
}
}
}
// Subscribe creates a subscription for a partner's channels.
func (b *Bus) Subscribe(partnerID int64) chan busNotification {
ch := make(chan busNotification, 10)
b.mu.Lock()
b.channels[partnerID] = append(b.channels[partnerID], ch)
b.mu.Unlock()
return ch
}
// Unsubscribe removes a subscription.
func (b *Bus) Unsubscribe(partnerID int64, ch chan busNotification) {
b.mu.Lock()
defer b.mu.Unlock()
subs := b.channels[partnerID]
for i, s := range subs {
if s == ch {
b.channels[partnerID] = append(subs[:i], subs[i+1:]...)
close(ch)
return
}
}
}
// registerBusRoutes adds the long-polling endpoint.
func (s *Server) registerBusRoutes() {
if s.bus == nil {
s.bus = NewBus()
}
s.mux.HandleFunc("/longpolling/poll", s.handleBusPoll)
s.mux.HandleFunc("/discuss/channel/messages", s.handleDiscussMessages)
s.mux.HandleFunc("/discuss/channel/list", s.handleDiscussChannelList)
}
// handleBusPoll implements long-polling for real-time notifications.
// Mirrors: odoo/addons/bus/controllers/main.py poll()
func (s *Server) handleBusPoll(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
sess := GetSession(r)
if sess == nil {
writeJSON(w, []interface{}{})
return
}
// Get partner ID
var partnerID int64
s.pool.QueryRow(r.Context(),
`SELECT COALESCE(partner_id, 0) FROM res_users WHERE id = $1`, sess.UID,
).Scan(&partnerID)
if partnerID == 0 {
writeJSON(w, []interface{}{})
return
}
// Subscribe and wait for notifications (max 30s)
ch := s.bus.Subscribe(partnerID)
defer s.bus.Unsubscribe(partnerID, ch)
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
select {
case notif := <-ch:
writeJSON(w, []busNotification{notif})
case <-ctx.Done():
writeJSON(w, []interface{}{}) // timeout, empty response
}
}
// handleDiscussMessages fetches messages for a channel via JSON-RPC.
func (s *Server) handleDiscussMessages(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
sess := GetSession(r)
if sess == nil {
s.writeJSONRPC(w, nil, nil, &RPCError{Code: 100, Message: "Not authenticated"})
return
}
var req JSONRPCRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
s.writeJSONRPC(w, nil, nil, &RPCError{Code: -32700, Message: "Parse error"})
return
}
var params struct {
ChannelID int64 `json:"channel_id"`
Limit int `json:"limit"`
}
if err := json.Unmarshal(req.Params, &params); err != nil {
s.writeJSONRPC(w, req.ID, nil, &RPCError{Code: -32602, Message: "Invalid params"})
return
}
if params.Limit <= 0 {
params.Limit = 50
}
rows, err := s.pool.Query(r.Context(),
`SELECT m.id, m.body, m.date, m.author_id, COALESCE(p.name, '')
FROM mail_message m
LEFT JOIN res_partner p ON p.id = m.author_id
WHERE m.model = 'mail.channel' AND m.res_id = $1
ORDER BY m.id DESC LIMIT $2`, params.ChannelID, params.Limit)
if err != nil {
s.writeJSONRPC(w, req.ID, nil, &RPCError{Code: -32603, Message: fmt.Sprintf("Query: %v", err)})
return
}
defer rows.Close()
var messages []map[string]interface{}
for rows.Next() {
var id, authorID int64
var body, authorName string
var date interface{}
if err := rows.Scan(&id, &body, &date, &authorID, &authorName); err != nil {
continue
}
msg := map[string]interface{}{
"id": id, "body": body, "date": date,
}
if authorID > 0 {
msg["author_id"] = []interface{}{authorID, authorName}
} else {
msg["author_id"] = false
}
messages = append(messages, msg)
}
if messages == nil {
messages = []map[string]interface{}{}
}
s.writeJSONRPC(w, req.ID, messages, nil)
}
// handleDiscussChannelList returns channels the current user is member of.
func (s *Server) handleDiscussChannelList(w http.ResponseWriter, r *http.Request) {
sess := GetSession(r)
if sess == nil {
s.writeJSONRPC(w, nil, nil, &RPCError{Code: 100, Message: "Not authenticated"})
return
}
var partnerID int64
s.pool.QueryRow(r.Context(),
`SELECT COALESCE(partner_id, 0) FROM res_users WHERE id = $1`, sess.UID,
).Scan(&partnerID)
rows, err := s.pool.Query(r.Context(),
`SELECT c.id, c.name, c.channel_type,
(SELECT COUNT(*) FROM mail_channel_member WHERE channel_id = c.id) AS members
FROM mail_channel c
JOIN mail_channel_member cm ON cm.channel_id = c.id AND cm.partner_id = $1
WHERE c.active = true
ORDER BY c.last_message_date DESC NULLS LAST`, partnerID)
if err != nil {
log.Printf("discuss: channel list error: %v", err)
writeJSON(w, []interface{}{})
return
}
defer rows.Close()
var channels []map[string]interface{}
for rows.Next() {
var id int64
var name, channelType string
var members int64
if err := rows.Scan(&id, &name, &channelType, &members); err != nil {
continue
}
channels = append(channels, map[string]interface{}{
"id": id, "name": name, "channel_type": channelType, "member_count": members,
})
}
if channels == nil {
channels = []map[string]interface{}{}
}
writeJSON(w, channels)
}