fix shutdown bug

main
DustyP 11 months ago
parent fbaf70bafc
commit 3ad2c294d2

@ -2,6 +2,7 @@ package main
import ( import (
"bufio" "bufio"
"context"
"flag" "flag"
"fmt" "fmt"
"os" "os"
@ -49,12 +50,16 @@ func main() {
close(eventBroadcaster) close(eventBroadcaster)
}() }()
// Create a context for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start web interface if enabled // Start web interface if enabled
if !*noWeb { if !*noWeb {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
startWebInterface(clock, eventBroadcaster, *webPort, sigChan) startWebInterface(clock, eventBroadcaster, *webPort, ctx)
}() }()
} }
@ -63,26 +68,30 @@ func main() {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
startTerminalInterface(clock, eventBroadcaster, sigChan) startTerminalInterface(clock, eventBroadcaster, ctx)
}() }()
} }
// Wait for signal to exit // Wait for signal to exit
<-sigChan <-sigChan
fmt.Println("Shutting down...") fmt.Println("Shutting down...")
time.Sleep(500 * time.Millisecond) // Give a moment for any pending operations to complete
// Cancel context to signal all components to shut down
cancel()
// Give a moment for any pending operations to complete
time.Sleep(500 * time.Millisecond)
// Wait for all interfaces to shut down // Wait for all interfaces to shut down
wg.Wait() wg.Wait()
} }
// startWebInterface initializes and runs the web interface // startWebInterface initializes and runs the web interface
func startWebInterface(clock *derby.DerbyClock, events <-chan derby.Event, webPort int, sigChan chan os.Signal) { func startWebInterface(clock *derby.DerbyClock, events <-chan derby.Event, webPort int, ctx context.Context) {
// Create and start the web server // Create and start the web server
server, err := web.NewServer(clock, events, webPort) server, err := web.NewServer(clock, events, webPort)
if err != nil { if err != nil {
fmt.Printf("Error creating web server: %v\n", err) fmt.Printf("Error creating web server: %v\n", err)
sigChan <- syscall.SIGTERM
return return
} }
@ -91,18 +100,26 @@ func startWebInterface(clock *derby.DerbyClock, events <-chan derby.Event, webPo
// Start the web server // Start the web server
if err := server.Start(); err != nil { if err := server.Start(); err != nil {
fmt.Printf("Web server error: %v\n", err) fmt.Printf("Web server error: %v\n", err)
sigChan <- syscall.SIGTERM return
}
// Wait for context cancellation
<-ctx.Done()
// Gracefully shut down the server
fmt.Println("Shutting down web server...")
if err := server.Stop(); err != nil {
fmt.Printf("Error shutting down web server: %v\n", err)
} }
} }
// startTerminalInterface initializes and runs the terminal interface // startTerminalInterface initializes and runs the terminal interface
func startTerminalInterface(clock *derby.DerbyClock, events <-chan derby.Event, sigChan chan os.Signal) { func startTerminalInterface(clock *derby.DerbyClock, events <-chan derby.Event, ctx context.Context) {
fmt.Println("Terminal interface started") fmt.Println("Terminal interface started")
// Reset the clock to start fresh // Reset the clock to start fresh
if err := clock.Reset(); err != nil { if err := clock.Reset(); err != nil {
fmt.Printf("Error resetting clock: %v\n", err) fmt.Printf("Error resetting clock: %v\n", err)
sigChan <- syscall.SIGTERM
return return
} }
fmt.Println("Clock reset. Ready to start race.") fmt.Println("Clock reset. Ready to start race.")
@ -117,7 +134,12 @@ func startTerminalInterface(clock *derby.DerbyClock, events <-chan derby.Event,
// Process events from the clock // Process events from the clock
go func() { go func() {
raceResults := make([]*derby.Result, 0) raceResults := make([]*derby.Result, 0)
for event := range events { for {
select {
case event, ok := <-events:
if !ok {
return
}
switch event.Type { switch event.Type {
case derby.EventRaceStart: case derby.EventRaceStart:
fmt.Println("\n🏁 Race started!") fmt.Println("\n🏁 Race started!")
@ -137,12 +159,19 @@ func startTerminalInterface(clock *derby.DerbyClock, events <-chan derby.Event,
fmt.Println("\nEnter command (r/f/q/?):") fmt.Println("\nEnter command (r/f/q/?):")
raceResults = nil raceResults = nil
} }
case <-ctx.Done():
return
}
} }
}() }()
// Handle keyboard input // Handle keyboard input
reader := bufio.NewReader(os.Stdin) reader := bufio.NewReader(os.Stdin)
for { for {
select {
case <-ctx.Done():
return
default:
fmt.Print("Enter command (r/f/q/?): ") fmt.Print("Enter command (r/f/q/?): ")
input, err := reader.ReadString('\n') input, err := reader.ReadString('\n')
if err != nil { if err != nil {
@ -170,7 +199,6 @@ func startTerminalInterface(clock *derby.DerbyClock, events <-chan derby.Event,
case "q": case "q":
fmt.Println("Quitting...") fmt.Println("Quitting...")
sigChan <- syscall.SIGTERM
return return
case "?": case "?":
@ -186,4 +214,5 @@ func startTerminalInterface(clock *derby.DerbyClock, events <-chan derby.Event,
} }
} }
} }
}
} }

@ -1,10 +1,12 @@
package web package web
import ( import (
"context"
"embed" "embed"
"fmt" "fmt"
"io/fs" "io/fs"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
@ -23,7 +25,10 @@ type Server struct {
clock *derby.DerbyClock clock *derby.DerbyClock
events <-chan derby.Event events <-chan derby.Event
clients map[chan string]bool clients map[chan string]bool
clientsMux sync.Mutex
port int port int
server *http.Server
shutdown chan struct{}
} }
// NewServer creates a new web server // NewServer creates a new web server
@ -34,7 +39,9 @@ func NewServer(clock *derby.DerbyClock, events <-chan derby.Event, port int) (*S
clock: clock, clock: clock,
events: events, events: events,
clients: make(map[chan string]bool), clients: make(map[chan string]bool),
clientsMux: sync.Mutex{},
port: port, port: port,
shutdown: make(chan struct{}),
} }
// Set up routes // Set up routes
@ -69,17 +76,65 @@ func (s *Server) routes() {
// Start starts the web server // Start starts the web server
func (s *Server) Start() error { func (s *Server) Start() error {
fmt.Printf("Starting web server on port %d...\n", s.port) addr := fmt.Sprintf(":%d", s.port)
return http.ListenAndServe(fmt.Sprintf(":%d", s.port), s.router) s.server = &http.Server{
Addr: addr,
Handler: s.router,
}
// Start server in a goroutine
go func() {
if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Printf("HTTP server error: %v\n", err)
}
}()
return nil
}
// Stop gracefully shuts down the server
func (s *Server) Stop() error {
// Signal event forwarder to stop
close(s.shutdown)
// Close all client connections
s.clientsMux.Lock()
for clientChan := range s.clients {
delete(s.clients, clientChan)
close(clientChan)
}
s.clientsMux.Unlock()
// Create a context with timeout for shutdown
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Shutdown the HTTP server
if s.server != nil {
return s.server.Shutdown(ctx)
}
return nil
} }
// forwardEvents forwards derby events to SSE clients // forwardEvents forwards derby events to SSE clients
func (s *Server) forwardEvents() { func (s *Server) forwardEvents() {
for event := range s.events { for {
// Store the event for new clients select {
// s.raceEvents <- event case event, ok := <-s.events:
if !ok {
return
}
// Process the event and send to clients
s.broadcastEvent(event)
case <-s.shutdown:
return
}
}
}
// Create the SSE message based on the event type // broadcastEvent sends an event to all connected clients
func (s *Server) broadcastEvent(event derby.Event) {
var message string var message string
switch event.Type { switch event.Type {
case derby.EventRaceStart: case derby.EventRaceStart:
@ -94,18 +149,17 @@ func (s *Server) forwardEvents() {
message = "event: race-complete\ndata: {\"status\": \"finished\"}\n\n" message = "event: race-complete\ndata: {\"status\": \"finished\"}\n\n"
} }
// Send to all connected clients // Send to all clients
s.clientsMux.Lock()
for clientChan := range s.clients { for clientChan := range s.clients {
// Non-blocking send to avoid slow clients blocking others
select { select {
case clientChan <- message: case clientChan <- message:
// Message sent successfully
default: default:
// Client is not receiving, remove it // Client channel is full, could log this or take other action
delete(s.clients, clientChan)
close(clientChan)
}
} }
} }
s.clientsMux.Unlock()
} }
// handleIndex handles the index page // handleIndex handles the index page
@ -161,7 +215,7 @@ func (s *Server) handleStatus() http.HandlerFunc {
} }
} }
// handleEvents handles the SSE events endpoint // handleEvents handles SSE events
func (s *Server) handleEvents() http.HandlerFunc { func (s *Server) handleEvents() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
// Set headers for SSE // Set headers for SSE
@ -172,11 +226,17 @@ func (s *Server) handleEvents() http.HandlerFunc {
// Create a channel for this client // Create a channel for this client
clientChan := make(chan string, 10) clientChan := make(chan string, 10)
// Add client to map with mutex protection
s.clientsMux.Lock()
s.clients[clientChan] = true s.clients[clientChan] = true
s.clientsMux.Unlock()
// Clean up when the client disconnects // Remove client when connection is closed
defer func() { defer func() {
s.clientsMux.Lock()
delete(s.clients, clientChan) delete(s.clients, clientChan)
s.clientsMux.Unlock()
close(clientChan) close(clientChan)
}() }()

Loading…
Cancel
Save