-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathserver.go
More file actions
86 lines (78 loc) · 1.83 KB
/
server.go
File metadata and controls
86 lines (78 loc) · 1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package pubsubrouter
import (
"context"
"errors"
"fmt"
"log"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub/v2"
"github.com/sofyan48/pubsub-router/pkg/client"
"github.com/sofyan48/pubsub-router/pkg/session"
)
// Server bundles a Pub/Sub client with the context, subscriber and router
// needed to publish messages and to dispatch received ones.
type Server struct {
// clients is the Pub/Sub client from which publishers and subscribers are created.
clients *pubsub.Client
// ctx is the context supplied at construction; Publish and Start pass it to the client calls.
ctx context.Context
// subClient is the subscriber selected by Subscribe and consumed by Start.
subClient *pubsub.Subscriber
// router is set by Subscribe and handles every message received in Start.
router *Router
}
// NewSession builds a Server whose Pub/Sub client is created from the given
// session configuration.
//
// ctx is stored on the returned Server and is later used by Publish and
// Start. If the client cannot be created the process is terminated through
// log.Fatalf, so this function only returns when client creation succeeded.
func NewSession(ctx context.Context, sess session.Contract) *Server {
	cl, err := client.NewClient(sess)
	if err != nil {
		// log.Fatalf exits the process, so no cleanup can be scheduled after it;
		// the underlying error is included so the failure can be diagnosed.
		log.Fatalf("Pubsubrouter client not connected: %v", err)
	}
	return &Server{
		clients: cl.Client(),
		ctx:     ctx,
	}
}
// NewSessionAutoConfig builds a Server whose Pub/Sub client is created by
// client.NewClientAutoConfig for the given project ID.
//
// ctx is passed to the client constructor and stored on the returned Server
// for later use by Publish and Start. If the client cannot be created the
// process is terminated through log.Fatalf, so this function only returns
// when client creation succeeded.
func NewSessionAutoConfig(ctx context.Context, projectID string) *Server {
	cl, err := client.NewClientAutoConfig(ctx, projectID)
	if err != nil {
		// log.Fatalf exits the process, so no cleanup can be scheduled after it;
		// the underlying error is included so the failure can be diagnosed.
		log.Fatalf("Pubsubrouter client not connected: %v", err)
	}
	return &Server{
		clients: cl.Client(),
		ctx:     ctx,
	}
}
// Subscribe selects the subscriber obtained from the client for topic and
// registers r as the router that Start will hand received messages to.
//
// The receiver is returned so calls can be chained.
// NOTE(review): the argument is named topic but is passed to
// Client.Subscriber — confirm callers supply a subscription name or ID.
func (s *Server) Subscribe(topic string, r *Router) *Server {
	sub := s.clients.Subscriber(topic)
	s.subClient, s.router = sub, r
	return s
}
// Publish sends msg to topic with path attached as the routing attribute
// (client.MessageAttributeNameRoute) and waits for the publish result.
//
// It returns whatever the publish result's Get reports: an ID string and an
// error. An empty path is rejected before anything is sent.
func (s *Server) Publish(topic, path, msg string) (string, error) {
	if path == "" {
		return "", errors.New("path is required")
	}

	cl := s.clients.Publisher(topic)
	// A publisher is created for every call; stop it once the result has been
	// collected so the resources it started are released instead of piling up.
	defer cl.Stop()
	cl.PublishSettings.NumGoroutines = 1

	result := cl.Publish(
		s.ctx,
		&pubsub.Message{
			Data:       []byte(msg),
			Attributes: map[string]string{client.MessageAttributeNameRoute: path},
			// NOTE(review): the Pub/Sub docs describe PublishTime as recorded by
			// the server; this client-side value is presumably ignored — confirm.
			PublishTime: time.Now(),
		},
	)
	return result.Get(s.ctx)
}
// Start receives messages through the subscriber chosen by Subscribe and
// passes each one, wrapped in a Message, to the router's HandleMessage.
//
// Subscribe must have been called first; otherwise the subscriber and router
// are nil. Start returns when the underlying Receive call returns, and any
// error from Receive is printed rather than dropped.
func (s *Server) Start() {
	// received counts delivered messages; it is only incremented here and not
	// read anywhere in this function.
	var received int32

	err := s.subClient.Receive(s.ctx, func(ctx context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)

		m := Message{}
		m.Data = msg.Data
		m.Attribute = msg.Attributes
		m.Payload = msg
		m.PublishTime = msg.PublishTime
		// The server-level context is exposed to handlers, not the per-message one.
		m.CtlContext = s.ctx
		m.ID = msg.ID

		if err := s.router.HandleMessage(&m); err != nil {
			// NOTE(review): only failed messages are acked here; successful ones
			// are presumably acked by the handler via m.Payload — confirm.
			msg.Ack()
			fmt.Println("error", err.Error())
		}
	})
	if err != nil {
		// Surface a failed or aborted receive loop instead of discarding it.
		fmt.Println("error", err.Error())
	}
}