diff --git a/cmd/dkg.go b/cmd/dkg.go index 835b19ae22..c77ae2f45a 100644 --- a/cmd/dkg.go +++ b/cmd/dkg.go @@ -10,6 +10,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" + "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/log" "github.com/obolnetwork/charon/dkg" ) @@ -24,6 +25,12 @@ func newDKGCmd(runFunc func(context.Context, dkg.Config) error) *cobra.Command { distributed validator key shares and a final cluster lock configuration. Note that all other cluster operators should run this command at the same time.`, Args: cobra.NoArgs, + PreRunE: func(_ *cobra.Command, _ []string) error { + if len(config.Nickname) > 32 { + return errors.New("--nickname exceeds 32 character limit") + } + return nil + }, RunE: func(cmd *cobra.Command, args []string) error { //nolint:revive // keep args variable name for clarity if err := log.InitLogger(config.Log); err != nil { return err @@ -50,6 +57,7 @@ this command at the same time.`, cmd.Flags().DurationVar(&config.Timeout, "timeout", 1*time.Minute, "Timeout for the DKG process, should be increased if DKG times out.") cmd.Flags().BoolVar(&config.Zipped, "zipped", false, "Create a tar archive compressed with gzip of the target directory after creation.") + cmd.Flags().StringVar(&config.Nickname, "nickname", "", "Human friendly peer nickname. Maximum 32 characters.") return cmd } diff --git a/dkg/dkg.go b/dkg/dkg.go index cd2e12e0c4..1031ca439a 100644 --- a/dkg/dkg.go +++ b/dkg/dkg.go @@ -66,7 +66,8 @@ type Config struct { AppendConfig *AppendConfig - Zipped bool + Zipped bool + Nickname string } // TestConfig defines additional test-only config for DKG. @@ -280,7 +281,7 @@ func Run(ctx context.Context, conf Config) (err error) { // Improve UX of "context cancelled" errors when sync fails. ctx = errors.WithCtxErr(ctx, "p2p connection failed, please retry DKG") - nextStepSync, stopSync, err := startSyncProtocol(ctx, p2pNode, key, def.DefinitionHash, peerIDs, cancel, conf.TestConfig) + nextStepSync, stopSync, err := startSyncProtocol(ctx, p2pNode, key, def.DefinitionHash, peerIDs, cancel, conf.TestConfig, conf.Nickname) if err != nil { return err } @@ -491,7 +492,7 @@ func Run(ctx context.Context, conf Config) (err error) { // startSyncProtocol sets up a sync protocol server and clients for each peer and returns a step sync and shutdown functions // when all peers are connected. func startSyncProtocol(ctx context.Context, p2pNode host.Host, key *k1.PrivateKey, defHash []byte, - peerIDs []peer.ID, onFailure func(), testConfig TestConfig, + peerIDs []peer.ID, onFailure func(), testConfig TestConfig, nickname string, ) (stepSyncFunc func(context.Context) error, shutdownFunc func(context.Context) error, err error) { // Sign definition hash with charon-enr-private-key // Note: libp2p signing does another hash of the defHash. @@ -515,7 +516,7 @@ func startSyncProtocol(ctx context.Context, p2pNode host.Host, key *k1.PrivateKe ctx := log.WithCtx(ctx, z.Str("peer", p2p.PeerName(pID))) - client := sync.NewClient(p2pNode, pID, hashSig, minorVersion, testConfig.SyncOpts...) + client := sync.NewClient(p2pNode, pID, hashSig, minorVersion, nickname, testConfig.SyncOpts...) clients = append(clients, client) go func() { diff --git a/dkg/dkgpb/v1/sync.pb.go b/dkg/dkgpb/v1/sync.pb.go index 77d3b1888b..086f87464e 100644 --- a/dkg/dkgpb/v1/sync.pb.go +++ b/dkg/dkgpb/v1/sync.pb.go @@ -29,6 +29,7 @@ type MsgSync struct { Shutdown bool `protobuf:"varint,3,opt,name=shutdown,proto3" json:"shutdown,omitempty"` Version string `protobuf:"bytes,4,opt,name=version,proto3" json:"version,omitempty"` Step int64 `protobuf:"varint,5,opt,name=step,proto3" json:"step,omitempty"` + Nickname string `protobuf:"bytes,6,opt,name=nickname,proto3" json:"nickname,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -98,6 +99,13 @@ func (x *MsgSync) GetStep() int64 { return 0 } +func (x *MsgSync) GetNickname() string { + if x != nil { + return x.Nickname + } + return "" +} + type MsgSyncResponse struct { state protoimpl.MessageState `protogen:"open.v1"` SyncTimestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=sync_timestamp,json=syncTimestamp,proto3" json:"sync_timestamp,omitempty"` @@ -154,13 +162,14 @@ var File_dkg_dkgpb_v1_sync_proto protoreflect.FileDescriptor const file_dkg_dkgpb_v1_sync_proto_rawDesc = "" + "\n" + - "\x17dkg/dkgpb/v1/sync.proto\x12\fdkg.dkgpb.v1\x1a\x1fgoogle/protobuf/timestamp.proto\"\xb4\x01\n" + + "\x17dkg/dkgpb/v1/sync.proto\x12\fdkg.dkgpb.v1\x1a\x1fgoogle/protobuf/timestamp.proto\"\xd0\x01\n" + "\aMsgSync\x128\n" + "\ttimestamp\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\ttimestamp\x12%\n" + "\x0ehash_signature\x18\x02 \x01(\fR\rhashSignature\x12\x1a\n" + "\bshutdown\x18\x03 \x01(\bR\bshutdown\x12\x18\n" + "\aversion\x18\x04 \x01(\tR\aversion\x12\x12\n" + - "\x04step\x18\x05 \x01(\x03R\x04step\"j\n" + + "\x04step\x18\x05 \x01(\x03R\x04step\x12\x1a\n" + + "\bnickname\x18\x06 \x01(\tR\bnickname\"j\n" + "\x0fMsgSyncResponse\x12A\n" + "\x0esync_timestamp\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\rsyncTimestamp\x12\x14\n" + "\x05error\x18\x02 \x01(\tR\x05errorB,Z*github.com/obolnetwork/charon/dkg/dkgpb/v1b\x06proto3" diff --git a/dkg/dkgpb/v1/sync.proto b/dkg/dkgpb/v1/sync.proto index 8cf517c814..e2b6af3f5a 100644 --- a/dkg/dkgpb/v1/sync.proto +++ b/dkg/dkgpb/v1/sync.proto @@ -12,6 +12,7 @@ message MsgSync { bool shutdown = 3; string version = 4; int64 step = 5; + string nickname = 6; } message MsgSyncResponse { diff --git a/dkg/protocol.go b/dkg/protocol.go index ebc607618c..647057cf3e 100644 --- a/dkg/protocol.go +++ b/dkg/protocol.go @@ -161,7 +161,7 @@ func RunProtocol(ctx context.Context, protocol Protocol, lockFilePath, privateKe log.Info(ctx, "Waiting to connect to all peers...") - nextStepSync, stopSync, err := startSyncProtocol(ctx, thisNode, enrPrivateKey, lock.DefinitionHash, protocolCtx.PeerIDs, cancel, TestConfig{}) + nextStepSync, stopSync, err := startSyncProtocol(ctx, thisNode, enrPrivateKey, lock.DefinitionHash, protocolCtx.PeerIDs, cancel, TestConfig{}, config.Nickname) if err != nil { return err } diff --git a/dkg/sync/client.go b/dkg/sync/client.go index 90ad79948a..8366ac6c8f 100644 --- a/dkg/sync/client.go +++ b/dkg/sync/client.go @@ -29,7 +29,7 @@ func WithPeriod(period time.Duration) func(*Client) { } // NewClient returns a new Client instance. -func NewClient(p2pNode host.Host, peer peer.ID, hashSig []byte, version version.SemVer, opts ...func(*Client)) *Client { +func NewClient(p2pNode host.Host, peer peer.ID, hashSig []byte, version version.SemVer, nickname string, opts ...func(*Client)) *Client { c := &Client{ p2pNode: p2pNode, peer: peer, @@ -38,6 +38,7 @@ func NewClient(p2pNode host.Host, peer peer.ID, hashSig []byte, version version. done: make(chan struct{}), reconnect: true, version: version, + nickname: nickname, period: 100 * time.Millisecond, // Must be at least two times lower than the sync timeout (dkg.go, startSyncProtocol) } @@ -61,11 +62,12 @@ type Client struct { done chan struct{} // Immutable state - hashSig []byte - version version.SemVer - p2pNode host.Host - peer peer.ID - period time.Duration + hashSig []byte + version version.SemVer + p2pNode host.Host + peer peer.ID + period time.Duration + nickname string } // Run blocks while running the client-side sync protocol. It tries to reconnect if relay connection is dropped or @@ -200,6 +202,7 @@ func (c *Client) sendMsg(stream network.Stream, shutdown bool) (*pb.MsgSyncRespo HashSignature: c.hashSig, Shutdown: shutdown, Version: c.version.String(), + Nickname: c.nickname, Step: int64(c.getStep()), } diff --git a/dkg/sync/server.go b/dkg/sync/server.go index a91ff1d24d..cd03fe532f 100644 --- a/dkg/sync/server.go +++ b/dkg/sync/server.go @@ -272,12 +272,17 @@ func (s *Server) handleStream(ctx context.Context, stream network.Stream) error SyncTimestamp: msg.GetTimestamp(), } + logOpts := []z.Field{z.Str("peer", p2p.PeerName(pID))} + if msg.GetNickname() != "" { + logOpts = append(logOpts, z.Str("nickname", msg.GetNickname())) + } + if err := s.validReq(pubkey, msg); err != nil { - s.setErr(errors.Wrap(err, "invalid sync message", z.Str("peer", p2p.PeerName(pID)))) + s.setErr(errors.Wrap(err, "invalid sync message", logOpts...)) resp.Error = err.Error() } else if !s.isConnected(pID) { count := s.setConnected(pID) - log.Info(ctx, fmt.Sprintf("Connected to peer %d of %d", count, s.allCount)) + log.Info(ctx, fmt.Sprintf("Connected to peer %d of %d", count, s.allCount), logOpts...) } if err := s.updateStep(pID, int(msg.GetStep())); err != nil { diff --git a/dkg/sync/sync_test.go b/dkg/sync/sync_test.go index cfd805ea69..a3652ed8f1 100644 --- a/dkg/sync/sync_test.go +++ b/dkg/sync/sync_test.go @@ -23,20 +23,22 @@ import ( func TestSyncProtocol(t *testing.T) { versions := make(map[int]version.SemVer) + nicknames := make(map[int]string) for i := range 5 { versions[i] = version.Version + nicknames[i] = fmt.Sprintf("node%d", i) } t.Run("2", func(t *testing.T) { - testCluster(t, 2, versions, "") + testCluster(t, 2, versions, "", nicknames) }) t.Run("3", func(t *testing.T) { - testCluster(t, 3, versions, "") + testCluster(t, 3, versions, "", nicknames) }) t.Run("5", func(t *testing.T) { - testCluster(t, 5, versions, "") + testCluster(t, 5, versions, "", nicknames) }) t.Run("invalid version", func(t *testing.T) { @@ -47,7 +49,7 @@ func TestSyncProtocol(t *testing.T) { 2: semver(t, "v0.3"), 3: semver(t, "v0.4"), }, - "mismatching charon version; expect=") + "mismatching charon version; expect=", nicknames) }) } @@ -60,7 +62,7 @@ func semver(t *testing.T, v string) version.SemVer { return sv } -func testCluster(t *testing.T, n int, versions map[int]version.SemVer, expectErr string) { +func testCluster(t *testing.T, n int, versions map[int]version.SemVer, expectErr string, nicknames map[int]string) { t.Helper() ctx, cancel := context.WithCancel(context.Background()) @@ -101,7 +103,7 @@ func testCluster(t *testing.T, n int, versions map[int]version.SemVer, expectErr hashSig, err := keys[i].Sign(hash) require.NoError(t, err) - client := sync.NewClient(tcpNodes[i], tcpNodes[j].ID(), hashSig, versions[i], sync.WithPeriod(time.Millisecond*100)) + client := sync.NewClient(tcpNodes[i], tcpNodes[j].ID(), hashSig, versions[i], nicknames[i], sync.WithPeriod(time.Millisecond*100)) clients = append(clients, client) ctx := log.WithTopic(ctx, fmt.Sprintf("client%d_%d", i, j))