Skip to content

Commit 153324c

Browse files
mbrandenburgerSaidAltury-ibm
authored andcommitted
feature: Initial impl for committer's notification service
Signed-off-by: Marcus Brandenburger <[email protected]>
1 parent 0f15d56 commit 153324c

File tree

7 files changed

+496
-32
lines changed

7 files changed

+496
-32
lines changed

integration/nwo/fabricx/extensions/scv2/ext.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ func (e *Extension) CheckTopology() {
5959

6060
func (e *Extension) GenerateArtifacts() {
6161
generateQSExtension(e.network)
62+
generateNSExtension(e.network)
6263
}
6364

6465
func (e *Extension) PostRun(load bool) {
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package scv2
8+
9+
import (
10+
"bytes"
11+
"errors"
12+
"fmt"
13+
"html/template"
14+
"io"
15+
"time"
16+
17+
api2 "github.com/hyperledger-labs/fabric-smart-client/integration/nwo/api"
18+
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/fabricx/network"
19+
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/fsc"
20+
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils"
21+
"github.com/hyperledger-labs/fabric-smart-client/platform/fabricx/core/finality"
22+
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/grpc"
23+
)
24+
25+
// generateNSExtensions adds the committers notification service information to the config
26+
func generateNSExtension(n *network.Network) {
27+
context := n.Context
28+
29+
fscTop, ok := context.TopologyByName("fsc").(*fsc.Topology)
30+
if !ok {
31+
utils.Must(errors.New("cannot get fsc topo instance"))
32+
}
33+
34+
// TODO set correct values
35+
notificationServiceHost := "localhost"
36+
notificationServicePort := 5417
37+
38+
// TODO: most of this logic should go somewhere
39+
40+
config := finality.Config{
41+
RequestTimeout: 10 * time.Second,
42+
Endpoints: []finality.Endpoint{
43+
{
44+
Address: fmt.Sprintf("%s:%v", notificationServiceHost, notificationServicePort),
45+
ConnectionTimeout: grpc.DefaultConnectionTimeout,
46+
TLSEnabled: false,
47+
TLSRootCertFile: n.CACertsBundlePath(),
48+
},
49+
},
50+
}
51+
52+
t, err := template.New("view_extension").Funcs(template.FuncMap{
53+
"NetworkName": func() string { return n.Topology().Name() },
54+
"RequestTimeout": func() time.Duration { return config.RequestTimeout },
55+
"Endpoints": func() []finality.Endpoint { return config.Endpoints },
56+
}).Parse(nsExtensionTemplate)
57+
utils.Must(err)
58+
59+
extension := bytes.NewBuffer([]byte{})
60+
err = t.Execute(io.MultiWriter(extension), nil)
61+
utils.Must(err)
62+
63+
for _, fscNode := range fscTop.Nodes {
64+
// TODO: find the correct SC instance to connect ...
65+
66+
logger.Infof(">>> %v", fscNode)
67+
for _, uniqueName := range fscNode.ReplicaUniqueNames() {
68+
context.AddExtension(uniqueName, api2.FabricExtension, extension.String())
69+
}
70+
}
71+
}
72+
73+
const nsExtensionTemplate = `
74+
fabric:
75+
{{ NetworkName }}:
76+
notificationService:
77+
requestTimeout: {{ RequestTimeout }}
78+
endpoints:{{- range Endpoints }}
79+
- address: {{ .Address }}
80+
connectionTimeout: {{ .ConnectionTimeout }}
81+
tlsEnabled: {{ .TLSEnabled }}
82+
tlsRootCertFile: {{ .TLSRootCertFile }}
83+
{{- end }}
84+
`
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package notify
8+
9+
import (
10+
"fmt"
11+
"testing"
12+
"time"
13+
14+
"github.com/hyperledger-labs/fabric-smart-client/platform/fabricx/core/vault/queryservice"
15+
"github.com/hyperledger/fabric-x-committer/api/protonotify"
16+
"github.com/spf13/viper"
17+
"github.com/stretchr/testify/assert"
18+
"github.com/stretchr/testify/require"
19+
"google.golang.org/grpc"
20+
"google.golang.org/protobuf/types/known/durationpb"
21+
)
22+
23+
func TestNotificationService(t *testing.T) {
24+
25+
table := []struct {
26+
name string
27+
cfg map[string]any
28+
checks func(t *testing.T, client *grpc.ClientConn, err error)
29+
}{
30+
{
31+
name: "connect",
32+
cfg: map[string]any{
33+
"queryService.queryTimeout": 10 * time.Second,
34+
"queryService.Endpoints": []any{
35+
map[string]any{
36+
"address": "localhost:5411",
37+
"connectionTimeout": 0 * time.Second,
38+
},
39+
},
40+
},
41+
checks: func(t *testing.T, client *grpc.ClientConn, err error) {
42+
t.Helper()
43+
require.NotNil(t, client)
44+
require.NoError(t, err)
45+
46+
nf := protonotify.NewNotifierClient(client)
47+
48+
notifyStream, err := nf.OpenNotificationStream(t.Context())
49+
require.NoError(t, err)
50+
51+
txIDs := []string{"1"}
52+
err = notifyStream.Send(&protonotify.NotificationRequest{
53+
TxStatusRequest: &protonotify.TxStatusRequest{
54+
TxIds: txIDs,
55+
},
56+
Timeout: durationpb.New(3 * time.Minute),
57+
})
58+
require.NoError(t, err)
59+
60+
var actual []*protonotify.TxStatusEvent
61+
require.EventuallyWithT(t, func(ct *assert.CollectT) {
62+
res, err := notifyStream.Recv()
63+
require.NoError(t, err)
64+
require.NotNil(t, res)
65+
require.Nil(t, res.TimeoutTxIds)
66+
actual = append(actual, res.TxStatusEvents...)
67+
//test.RequireProtoElementsMatch(ct, expected, actual)
68+
}, 15*time.Second, 50*time.Millisecond)
69+
},
70+
},
71+
}
72+
73+
for _, tc := range table {
74+
t.Run(fmt.Sprintf("grpcClient %v", tc.name), func(t *testing.T) {
75+
t.Parallel()
76+
cs := newConfigService(tc.cfg)
77+
c, err := queryservice.NewConfig(cs)
78+
require.NoError(t, err)
79+
client, err := queryservice.GrpcClient(c)
80+
require.NoError(t, err)
81+
tc.checks(t, client, err)
82+
})
83+
}
84+
85+
}
86+
87+
type configService struct {
88+
V *viper.Viper
89+
}
90+
91+
func newConfigService(c map[string]any) *configService {
92+
v := viper.New()
93+
for k, val := range c {
94+
v.Set(k, val)
95+
}
96+
return &configService{V: v}
97+
}
98+
99+
func (c configService) UnmarshalKey(key string, rawVal interface{}) error {
100+
return c.V.UnmarshalKey(key, rawVal)
101+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package finality
8+
9+
import (
10+
"fmt"
11+
"time"
12+
)
13+
14+
const DefaultRequestTimeout = 30 * time.Second
15+
16+
type Config struct {
17+
Endpoints []Endpoint `yaml:"endpoints,omitempty"`
18+
RequestTimeout time.Duration `yaml:"requestTimeout,omitempty"`
19+
}
20+
21+
type Endpoint struct {
22+
Address string `yaml:"address,omitempty"`
23+
ConnectionTimeout time.Duration `yaml:"connectionTimeout,omitempty"`
24+
TLSEnabled bool `yaml:"tlsEnabled,omitempty"`
25+
TLSRootCertFile string `yaml:"tlsRootCertFile,omitempty"`
26+
TLSServerNameOverride string `yaml:"tlsServerNameOverride,omitempty"`
27+
}
28+
29+
type ConfigService interface {
30+
UnmarshalKey(key string, rawVal interface{}) error
31+
}
32+
33+
func NewConfig(configService ConfigService) (*Config, error) {
34+
config := &Config{
35+
RequestTimeout: DefaultRequestTimeout,
36+
}
37+
38+
err := configService.UnmarshalKey("notificationService", &config)
39+
if err != nil {
40+
return config, fmt.Errorf("cannot get notify service config: %w", err)
41+
}
42+
43+
return config, nil
44+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package finality
8+
9+
import (
10+
"errors"
11+
"fmt"
12+
"os"
13+
"time"
14+
15+
"google.golang.org/grpc"
16+
"google.golang.org/grpc/backoff"
17+
"google.golang.org/grpc/credentials"
18+
"google.golang.org/grpc/credentials/insecure"
19+
)
20+
21+
var ErrInvalidAddress = fmt.Errorf("empty address")
22+
23+
func GrpcClient(c *Config) (*grpc.ClientConn, error) {
24+
// no endpoints in config
25+
if len(c.Endpoints) != 1 {
26+
return nil, fmt.Errorf("we need a single endpoint")
27+
}
28+
29+
// currently we only support connections to a single query service
30+
endpoint := c.Endpoints[0]
31+
32+
// check endpoint address
33+
if len(endpoint.Address) == 0 {
34+
return nil, ErrInvalidAddress
35+
}
36+
37+
var opts []grpc.DialOption
38+
opts = append(opts, WithConnectionTime(endpoint.ConnectionTimeout))
39+
opts = append(opts, WithTLS(endpoint))
40+
41+
return grpc.NewClient(endpoint.Address, opts...)
42+
}
43+
44+
func WithTLS(endpoint Endpoint) grpc.DialOption {
45+
if !endpoint.TLSEnabled {
46+
return grpc.WithTransportCredentials(insecure.NewCredentials())
47+
}
48+
49+
if _, err := os.Stat(endpoint.TLSRootCertFile); errors.Is(err, os.ErrNotExist) {
50+
if err != nil {
51+
panic(err)
52+
}
53+
}
54+
55+
creds, err := credentials.NewClientTLSFromFile(endpoint.TLSRootCertFile, endpoint.TLSServerNameOverride)
56+
if err != nil {
57+
panic(err)
58+
}
59+
60+
return grpc.WithTransportCredentials(creds)
61+
}
62+
63+
func WithConnectionTime(timeout time.Duration) grpc.DialOption {
64+
if timeout <= 0 {
65+
timeout = DefaultRequestTimeout
66+
}
67+
return grpc.WithConnectParams(grpc.ConnectParams{
68+
Backoff: backoff.DefaultConfig,
69+
MinConnectTimeout: timeout,
70+
})
71+
}

0 commit comments

Comments
 (0)