Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/dkg"
)
Expand All @@ -24,6 +25,12 @@
distributed validator key shares and a final cluster lock configuration. Note that all other cluster operators should run
this command at the same time.`,
Args: cobra.NoArgs,
PreRunE: func(cmd *cobra.Command, args []string) error {

Check failure on line 28 in cmd/dkg.go

View workflow job for this annotation

GitHub Actions / golangci

unused-parameter: parameter 'cmd' seems to be unused, consider removing or renaming it as _ (revive)
if len(config.Nickname) > 32 {
return errors.New("--nickname exceeds 32 character limit")
}
return nil
},
RunE: func(cmd *cobra.Command, args []string) error { //nolint:revive // keep args variable name for clarity
if err := log.InitLogger(config.Log); err != nil {
return err
Expand All @@ -50,6 +57,7 @@

cmd.Flags().DurationVar(&config.Timeout, "timeout", 1*time.Minute, "Timeout for the DKG process, should be increased if DKG times out.")
cmd.Flags().BoolVar(&config.Zipped, "zipped", false, "Create a tar archive compressed with gzip of the target directory after creation.")
cmd.Flags().StringVar(&config.Nickname, "nickname", "", "Human friendly peer nickname. Maximum 32 characters.")

return cmd
}
Expand Down
9 changes: 5 additions & 4 deletions dkg/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ type Config struct {

AppendConfig *AppendConfig

Zipped bool
Zipped bool
Nickname string
}

// TestConfig defines additional test-only config for DKG.
Expand Down Expand Up @@ -280,7 +281,7 @@ func Run(ctx context.Context, conf Config) (err error) {
// Improve UX of "context cancelled" errors when sync fails.
ctx = errors.WithCtxErr(ctx, "p2p connection failed, please retry DKG")

nextStepSync, stopSync, err := startSyncProtocol(ctx, p2pNode, key, def.DefinitionHash, peerIDs, cancel, conf.TestConfig)
nextStepSync, stopSync, err := startSyncProtocol(ctx, p2pNode, key, def.DefinitionHash, peerIDs, cancel, conf.TestConfig, conf.Nickname)
if err != nil {
return err
}
Expand Down Expand Up @@ -491,7 +492,7 @@ func Run(ctx context.Context, conf Config) (err error) {
// startSyncProtocol sets up a sync protocol server and clients for each peer and returns a step sync and shutdown functions
// when all peers are connected.
func startSyncProtocol(ctx context.Context, p2pNode host.Host, key *k1.PrivateKey, defHash []byte,
peerIDs []peer.ID, onFailure func(), testConfig TestConfig,
peerIDs []peer.ID, onFailure func(), testConfig TestConfig, nickname string,
) (stepSyncFunc func(context.Context) error, shutdownFunc func(context.Context) error, err error) {
// Sign definition hash with charon-enr-private-key
// Note: libp2p signing does another hash of the defHash.
Expand All @@ -515,7 +516,7 @@ func startSyncProtocol(ctx context.Context, p2pNode host.Host, key *k1.PrivateKe

ctx := log.WithCtx(ctx, z.Str("peer", p2p.PeerName(pID)))

client := sync.NewClient(p2pNode, pID, hashSig, minorVersion, testConfig.SyncOpts...)
client := sync.NewClient(p2pNode, pID, hashSig, minorVersion, nickname, testConfig.SyncOpts...)
clients = append(clients, client)

go func() {
Expand Down
13 changes: 11 additions & 2 deletions dkg/dkgpb/v1/sync.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dkg/dkgpb/v1/sync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ message MsgSync {
bool shutdown = 3;
string version = 4;
int64 step = 5;
string nickname = 6;
}

message MsgSyncResponse {
Expand Down
2 changes: 1 addition & 1 deletion dkg/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func RunProtocol(ctx context.Context, protocol Protocol, lockFilePath, privateKe

log.Info(ctx, "Waiting to connect to all peers...")

nextStepSync, stopSync, err := startSyncProtocol(ctx, thisNode, enrPrivateKey, lock.DefinitionHash, protocolCtx.PeerIDs, cancel, TestConfig{})
nextStepSync, stopSync, err := startSyncProtocol(ctx, thisNode, enrPrivateKey, lock.DefinitionHash, protocolCtx.PeerIDs, cancel, TestConfig{}, config.Nickname)
if err != nil {
return err
}
Expand Down
15 changes: 9 additions & 6 deletions dkg/sync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func WithPeriod(period time.Duration) func(*Client) {
}

// NewClient returns a new Client instance.
func NewClient(p2pNode host.Host, peer peer.ID, hashSig []byte, version version.SemVer, opts ...func(*Client)) *Client {
func NewClient(p2pNode host.Host, peer peer.ID, hashSig []byte, version version.SemVer, nickname string, opts ...func(*Client)) *Client {
c := &Client{
p2pNode: p2pNode,
peer: peer,
Expand All @@ -38,6 +38,7 @@ func NewClient(p2pNode host.Host, peer peer.ID, hashSig []byte, version version.
done: make(chan struct{}),
reconnect: true,
version: version,
nickname: nickname,
period: 100 * time.Millisecond, // Must be at least two times lower than the sync timeout (dkg.go, startSyncProtocol)
}

Expand All @@ -61,11 +62,12 @@ type Client struct {
done chan struct{}

// Immutable state
hashSig []byte
version version.SemVer
p2pNode host.Host
peer peer.ID
period time.Duration
hashSig []byte
version version.SemVer
p2pNode host.Host
peer peer.ID
period time.Duration
nickname string
}

// Run blocks while running the client-side sync protocol. It tries to reconnect if relay connection is dropped or
Expand Down Expand Up @@ -200,6 +202,7 @@ func (c *Client) sendMsg(stream network.Stream, shutdown bool) (*pb.MsgSyncRespo
HashSignature: c.hashSig,
Shutdown: shutdown,
Version: c.version.String(),
Nickname: c.nickname,
Step: int64(c.getStep()),
}

Expand Down
9 changes: 7 additions & 2 deletions dkg/sync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,17 @@ func (s *Server) handleStream(ctx context.Context, stream network.Stream) error
SyncTimestamp: msg.GetTimestamp(),
}

logOpts := []z.Field{z.Str("peer", p2p.PeerName(pID))}
if msg.GetNickname() != "" {
logOpts = append(logOpts, z.Str("nickname", msg.GetNickname()))
}

if err := s.validReq(pubkey, msg); err != nil {
s.setErr(errors.Wrap(err, "invalid sync message", z.Str("peer", p2p.PeerName(pID))))
s.setErr(errors.Wrap(err, "invalid sync message", logOpts...))
resp.Error = err.Error()
} else if !s.isConnected(pID) {
count := s.setConnected(pID)
log.Info(ctx, fmt.Sprintf("Connected to peer %d of %d", count, s.allCount))
log.Info(ctx, fmt.Sprintf("Connected to peer %d of %d", count, s.allCount), logOpts...)
}

if err := s.updateStep(pID, int(msg.GetStep())); err != nil {
Expand Down
14 changes: 8 additions & 6 deletions dkg/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,22 @@ import (

func TestSyncProtocol(t *testing.T) {
versions := make(map[int]version.SemVer)
nicknames := make(map[int]string)
for i := range 5 {
versions[i] = version.Version
nicknames[i] = fmt.Sprintf("node%d", i)
}

t.Run("2", func(t *testing.T) {
testCluster(t, 2, versions, "")
testCluster(t, 2, versions, "", nicknames)
})

t.Run("3", func(t *testing.T) {
testCluster(t, 3, versions, "")
testCluster(t, 3, versions, "", nicknames)
})

t.Run("5", func(t *testing.T) {
testCluster(t, 5, versions, "")
testCluster(t, 5, versions, "", nicknames)
})

t.Run("invalid version", func(t *testing.T) {
Expand All @@ -47,7 +49,7 @@ func TestSyncProtocol(t *testing.T) {
2: semver(t, "v0.3"),
3: semver(t, "v0.4"),
},
"mismatching charon version; expect=")
"mismatching charon version; expect=", nicknames)
})
}

Expand All @@ -60,7 +62,7 @@ func semver(t *testing.T, v string) version.SemVer {
return sv
}

func testCluster(t *testing.T, n int, versions map[int]version.SemVer, expectErr string) {
func testCluster(t *testing.T, n int, versions map[int]version.SemVer, expectErr string, nicknames map[int]string) {
t.Helper()

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -101,7 +103,7 @@ func testCluster(t *testing.T, n int, versions map[int]version.SemVer, expectErr
hashSig, err := keys[i].Sign(hash)
require.NoError(t, err)

client := sync.NewClient(tcpNodes[i], tcpNodes[j].ID(), hashSig, versions[i], sync.WithPeriod(time.Millisecond*100))
client := sync.NewClient(tcpNodes[i], tcpNodes[j].ID(), hashSig, versions[i], nicknames[i], sync.WithPeriod(time.Millisecond*100))
clients = append(clients, client)

ctx := log.WithTopic(ctx, fmt.Sprintf("client%d_%d", i, j))
Expand Down
Loading