Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions internal/grpc/server/stream_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package server

import (
"github.com/OpenSlides/openslides-cli/internal/logger"
pb "github.com/OpenSlides/openslides-cli/proto/osmanage"
)

// StreamLogs streams log entries to the client at the requested log level.
func (s *OsmanageServiceServer) StreamLogs(req *pb.LogStreamRequest, stream pb.OsmanageService_StreamLogsServer) error {
minLevel, err := logger.ParseLevel(req.Level)
if err != nil {
minLevel = logger.LevelWarn
}

sub := logger.Subscribe()
defer logger.Unsubscribe(sub)

for {
select {
case entry, ok := <-sub:
if !ok {
return nil
}
if entry.LevelValue < minLevel {
continue
}
if err := stream.Send(&pb.LogEntry{
Level: entry.Level,
Message: entry.Message,
}); err != nil {
return err
}
case <-stream.Context().Done():
return nil
}
}
}
118 changes: 91 additions & 27 deletions internal/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"os"
"strings"
"sync"
)

type Level int
Expand All @@ -21,25 +22,33 @@ type Logger struct {
logger *log.Logger
}

type LogEntry struct {
Level string
LevelValue Level
Message string
}

type Subscriber chan LogEntry

type Broadcaster struct {
mu sync.RWMutex
subscribers map[Subscriber]struct{}
}

var global *Logger

var globalBroadcaster = &Broadcaster{
subscribers: make(map[Subscriber]struct{}),
}

func New(levelStr string) (*Logger, error) {
var level Level
switch strings.ToLower(levelStr) {
case "debug":
level = LevelDebug
case "info":
level = LevelInfo
case "warn", "warning":
level = LevelWarn
case "error":
level = LevelError
default:
return nil, fmt.Errorf("invalid log level: %s", levelStr)
level, err := ParseLevel(levelStr)
if err != nil {
return nil, err
}

flags := log.LstdFlags
if os.Getenv("INVOCATION_ID") != "" { // if used as systemd service
if os.Getenv("INVOCATION_ID") != "" {
flags = 0
}

Expand All @@ -49,34 +58,89 @@ func New(levelStr string) (*Logger, error) {
}, nil
}

func ParseLevel(levelStr string) (Level, error) {
switch strings.ToLower(levelStr) {
case "debug":
return LevelDebug, nil
case "info":
return LevelInfo, nil
case "warn", "warning":
return LevelWarn, nil
case "error":
return LevelError, nil
default:
return 0, fmt.Errorf("invalid log level: %s", levelStr)
}
}

func levelToString(level Level) string {
switch level {
case LevelDebug:
return "debug"
case LevelInfo:
return "info"
case LevelWarn:
return "warn"
case LevelError:
return "error"
default:
return "warn"
}
}

func SetGlobal(l *Logger) {
global = l
}

func (l *Logger) Debug(format string, v ...any) {
if l.level <= LevelDebug {
l.logger.Printf("[DEBUG] "+format, v...)
}
func (b *Broadcaster) Subscribe() Subscriber {
ch := make(Subscriber, 100)
b.mu.Lock()
defer b.mu.Unlock()
b.subscribers[ch] = struct{}{}
return ch
}

func (l *Logger) Info(format string, v ...any) {
if l.level <= LevelInfo {
l.logger.Printf("[INFO] "+format, v...)
}
func (b *Broadcaster) Unsubscribe(ch Subscriber) {
b.mu.Lock()
defer b.mu.Unlock()
delete(b.subscribers, ch)
close(ch)
}

func (l *Logger) Warn(format string, v ...any) {
if l.level <= LevelWarn {
l.logger.Printf("[WARN] "+format, v...)
func (b *Broadcaster) publish(entry LogEntry) {
b.mu.RLock()
defer b.mu.RUnlock()
for ch := range b.subscribers {
select {
case ch <- entry:
default:
// drop if subscriber is slow, never block the logger
}
}
}

func (l *Logger) Error(format string, v ...any) {
if l.level <= LevelError {
l.logger.Printf("[ERROR] "+format, v...)
func Subscribe() Subscriber { return globalBroadcaster.Subscribe() }
func Unsubscribe(ch Subscriber) { globalBroadcaster.Unsubscribe(ch) }

func (l *Logger) log(level Level, format string, v ...any) {
msg := fmt.Sprintf(format, v...)

globalBroadcaster.publish(LogEntry{
Level: levelToString(level),
LevelValue: level,
Message: msg,
})

if l.level <= level {
l.logger.Printf("[%s] %s", strings.ToUpper(levelToString(level)), msg)
}
}

func (l *Logger) Debug(format string, v ...any) { l.log(LevelDebug, format, v...) }
func (l *Logger) Info(format string, v ...any) { l.log(LevelInfo, format, v...) }
func (l *Logger) Warn(format string, v ...any) { l.log(LevelWarn, format, v...) }
func (l *Logger) Error(format string, v ...any) { l.log(LevelError, format, v...) }

// Global logging functions
func Debug(format string, v ...any) {
if global != nil {
Expand Down
49 changes: 49 additions & 0 deletions internal/logger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,52 @@ func TestGlobalLogger(t *testing.T) {
t.Error("Global logger did not log message")
}
}

func TestBroadcaster(t *testing.T) {
sub := Subscribe()
defer Unsubscribe(sub)

// set global logger to publish
var buf bytes.Buffer
l := &Logger{
level: LevelDebug,
logger: log.New(&buf, "", 0),
}
SetGlobal(l)

Info("test broadcast message")

select {
case entry := <-sub:
if entry.Message != "test broadcast message" {
t.Errorf("Expected 'test broadcast message', got: %s", entry.Message)
}
if entry.Level != "info" {
t.Errorf("Expected level 'info', got: %s", entry.Level)
}
default:
t.Error("Expected broadcast entry but got none")
}
}

func TestBroadcasterDrop(t *testing.T) {
// full channel should not block logger
sub := make(Subscriber, 1)
globalBroadcaster.mu.Lock()
globalBroadcaster.subscribers[sub] = struct{}{}
globalBroadcaster.mu.Unlock()
defer Unsubscribe(sub)

var buf bytes.Buffer
l := &Logger{
level: LevelDebug,
logger: log.New(&buf, "", 0),
}
SetGlobal(l)

// fill the channel
sub <- LogEntry{}

// this should not block even though channel is full
Info("should not block")
}
10 changes: 10 additions & 0 deletions proto/osmanage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ service OsmanageService {
// server side streaming
rpc MigrationsMigrate(MigrationsRequest) returns (stream MigrationsProgressResponse);
rpc MigrationsFinalize(MigrationsRequest) returns (stream MigrationsProgressResponse);
rpc StreamLogs(LogStreamRequest) returns (stream LogEntry);

// unary gRPC calls
rpc MigrationsReset(MigrationsRequest) returns (MigrationsResponse);
Expand All @@ -42,6 +43,15 @@ service OsmanageService {
rpc SendManageAction(SendManageActionRequest) returns (SendManageActionResponse);
}

message LogStreamRequest {
string level = 1;
}

message LogEntry {
string level = 1;
string message = 2;
}

message InstanceConfigRequest {
string instance_dir = 1;
string stack_template_path = 2; // path to template file or directory on server node
Expand Down
Loading
Loading