new changes
This commit is contained in:
19
control-plane-go/internal/orchestration/dispatcher.go
Normal file
19
control-plane-go/internal/orchestration/dispatcher.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package orchestration
|
||||
|
||||
import "yakpanel/control-plane-go/pkg/contracts"
|
||||
|
||||
type Queue interface {
|
||||
Publish(topic string, payload any) error
|
||||
}
|
||||
|
||||
type Dispatcher struct {
|
||||
queue Queue
|
||||
}
|
||||
|
||||
func NewDispatcher(queue Queue) *Dispatcher {
|
||||
return &Dispatcher{queue: queue}
|
||||
}
|
||||
|
||||
func (d *Dispatcher) DispatchCommand(cmd contracts.CommandEnvelope) error {
|
||||
return d.queue.Publish("yakpanel.commands", cmd)
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package orchestration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type RedisStreamQueue struct {
|
||||
client *redis.Client
|
||||
stream string
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewRedisStreamQueue(ctx context.Context, client *redis.Client, stream string) *RedisStreamQueue {
|
||||
return &RedisStreamQueue{
|
||||
client: client,
|
||||
stream: stream,
|
||||
ctx: ctx,
|
||||
}
|
||||
}
|
||||
|
||||
func (q *RedisStreamQueue) Publish(topic string, payload any) error {
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = q.client.XAdd(q.ctx, &redis.XAddArgs{
|
||||
Stream: q.stream,
|
||||
Values: map[string]any{
|
||||
"topic": topic,
|
||||
"body": string(body),
|
||||
},
|
||||
}).Result()
|
||||
return err
|
||||
}
|
||||
Reference in New Issue
Block a user