Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/dkg"
)
Expand All @@ -24,6 +25,12 @@ func newDKGCmd(runFunc func(context.Context, dkg.Config) error) *cobra.Command {
distributed validator key shares and a final cluster lock configuration. Note that all other cluster operators should run
this command at the same time.`,
Args: cobra.NoArgs,
PreRunE: func(_ *cobra.Command, _ []string) error {
if len(config.Nickname) > 32 {
return errors.New("--nickname exceeds 32 character limit")
}
return nil
},
RunE: func(cmd *cobra.Command, args []string) error { //nolint:revive // keep args variable name for clarity
if err := log.InitLogger(config.Log); err != nil {
return err
Expand All @@ -50,6 +57,7 @@ this command at the same time.`,

cmd.Flags().DurationVar(&config.Timeout, "timeout", 1*time.Minute, "Timeout for the DKG process, should be increased if DKG times out.")
cmd.Flags().BoolVar(&config.Zipped, "zipped", false, "Create a tar archive compressed with gzip of the target directory after creation.")
cmd.Flags().StringVar(&config.Nickname, "nickname", "", "Human friendly peer nickname. Maximum 32 characters.")

return cmd
}
Expand Down
9 changes: 5 additions & 4 deletions dkg/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ type Config struct {

AppendConfig *AppendConfig

Zipped bool
Zipped bool
Nickname string
}

// TestConfig defines additional test-only config for DKG.
Expand Down Expand Up @@ -280,7 +281,7 @@ func Run(ctx context.Context, conf Config) (err error) {
// Improve UX of "context cancelled" errors when sync fails.
ctx = errors.WithCtxErr(ctx, "p2p connection failed, please retry DKG")

nextStepSync, stopSync, err := startSyncProtocol(ctx, p2pNode, key, def.DefinitionHash, peerIDs, cancel, conf.TestConfig)
nextStepSync, stopSync, err := startSyncProtocol(ctx, p2pNode, key, def.DefinitionHash, peerIDs, cancel, conf.TestConfig, conf.Nickname)
if err != nil {
return err
}
Expand Down Expand Up @@ -491,7 +492,7 @@ func Run(ctx context.Context, conf Config) (err error) {
// startSyncProtocol sets up a sync protocol server and clients for each peer and returns a step sync and shutdown functions
// when all peers are connected.
func startSyncProtocol(ctx context.Context, p2pNode host.Host, key *k1.PrivateKey, defHash []byte,
peerIDs []peer.ID, onFailure func(), testConfig TestConfig,
peerIDs []peer.ID, onFailure func(), testConfig TestConfig, nickname string,
) (stepSyncFunc func(context.Context) error, shutdownFunc func(context.Context) error, err error) {
// Sign definition hash with charon-enr-private-key
// Note: libp2p signing does another hash of the defHash.
Expand All @@ -515,7 +516,7 @@ func startSyncProtocol(ctx context.Context, p2pNode host.Host, key *k1.PrivateKe

ctx := log.WithCtx(ctx, z.Str("peer", p2p.PeerName(pID)))

client := sync.NewClient(p2pNode, pID, hashSig, minorVersion, testConfig.SyncOpts...)
client := sync.NewClient(p2pNode, pID, hashSig, minorVersion, nickname, testConfig.SyncOpts...)
clients = append(clients, client)

go func() {
Expand Down
13 changes: 11 additions & 2 deletions dkg/dkgpb/v1/sync.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dkg/dkgpb/v1/sync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ message MsgSync {
bool shutdown = 3;
string version = 4;
int64 step = 5;
string nickname = 6;
}

message MsgSyncResponse {
Expand Down
2 changes: 1 addition & 1 deletion dkg/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func RunProtocol(ctx context.Context, protocol Protocol, lockFilePath, privateKe

log.Info(ctx, "Waiting to connect to all peers...")

nextStepSync, stopSync, err := startSyncProtocol(ctx, thisNode, enrPrivateKey, lock.DefinitionHash, protocolCtx.PeerIDs, cancel, TestConfig{})
nextStepSync, stopSync, err := startSyncProtocol(ctx, thisNode, enrPrivateKey, lock.DefinitionHash, protocolCtx.PeerIDs, cancel, TestConfig{}, config.Nickname)
if err != nil {
return err
}
Expand Down
15 changes: 9 additions & 6 deletions dkg/sync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func WithPeriod(period time.Duration) func(*Client) {
}

// NewClient returns a new Client instance.
func NewClient(p2pNode host.Host, peer peer.ID, hashSig []byte, version version.SemVer, opts ...func(*Client)) *Client {
func NewClient(p2pNode host.Host, peer peer.ID, hashSig []byte, version version.SemVer, nickname string, opts ...func(*Client)) *Client {
c := &Client{
p2pNode: p2pNode,
peer: peer,
Expand All @@ -38,6 +38,7 @@ func NewClient(p2pNode host.Host, peer peer.ID, hashSig []byte, version version.
done: make(chan struct{}),
reconnect: true,
version: version,
nickname: nickname,
period: 100 * time.Millisecond, // Must be at least two times lower than the sync timeout (dkg.go, startSyncProtocol)
}

Expand All @@ -61,11 +62,12 @@ type Client struct {
done chan struct{}

// Immutable state
hashSig []byte
version version.SemVer
p2pNode host.Host
peer peer.ID
period time.Duration
hashSig []byte
version version.SemVer
p2pNode host.Host
peer peer.ID
period time.Duration
nickname string
}

// Run blocks while running the client-side sync protocol. It tries to reconnect if relay connection is dropped or
Expand Down Expand Up @@ -200,6 +202,7 @@ func (c *Client) sendMsg(stream network.Stream, shutdown bool) (*pb.MsgSyncRespo
HashSignature: c.hashSig,
Shutdown: shutdown,
Version: c.version.String(),
Nickname: c.nickname,
Step: int64(c.getStep()),
}

Expand Down
9 changes: 7 additions & 2 deletions dkg/sync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,17 @@ func (s *Server) handleStream(ctx context.Context, stream network.Stream) error
SyncTimestamp: msg.GetTimestamp(),
}

logOpts := []z.Field{z.Str("peer", p2p.PeerName(pID))}
if msg.GetNickname() != "" {
logOpts = append(logOpts, z.Str("nickname", msg.GetNickname()))
}

if err := s.validReq(pubkey, msg); err != nil {
s.setErr(errors.Wrap(err, "invalid sync message", z.Str("peer", p2p.PeerName(pID))))
s.setErr(errors.Wrap(err, "invalid sync message", logOpts...))
resp.Error = err.Error()
} else if !s.isConnected(pID) {
count := s.setConnected(pID)
log.Info(ctx, fmt.Sprintf("Connected to peer %d of %d", count, s.allCount))
log.Info(ctx, fmt.Sprintf("Connected to peer %d of %d", count, s.allCount), logOpts...)
}

if err := s.updateStep(pID, int(msg.GetStep())); err != nil {
Expand Down
14 changes: 8 additions & 6 deletions dkg/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,22 @@ import (

func TestSyncProtocol(t *testing.T) {
versions := make(map[int]version.SemVer)
nicknames := make(map[int]string)
for i := range 5 {
versions[i] = version.Version
nicknames[i] = fmt.Sprintf("node%d", i)
}

t.Run("2", func(t *testing.T) {
testCluster(t, 2, versions, "")
testCluster(t, 2, versions, "", nicknames)
})

t.Run("3", func(t *testing.T) {
testCluster(t, 3, versions, "")
testCluster(t, 3, versions, "", nicknames)
})

t.Run("5", func(t *testing.T) {
testCluster(t, 5, versions, "")
testCluster(t, 5, versions, "", nicknames)
})

t.Run("invalid version", func(t *testing.T) {
Expand All @@ -47,7 +49,7 @@ func TestSyncProtocol(t *testing.T) {
2: semver(t, "v0.3"),
3: semver(t, "v0.4"),
},
"mismatching charon version; expect=")
"mismatching charon version; expect=", nicknames)
})
}

Expand All @@ -60,7 +62,7 @@ func semver(t *testing.T, v string) version.SemVer {
return sv
}

func testCluster(t *testing.T, n int, versions map[int]version.SemVer, expectErr string) {
func testCluster(t *testing.T, n int, versions map[int]version.SemVer, expectErr string, nicknames map[int]string) {
t.Helper()

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -101,7 +103,7 @@ func testCluster(t *testing.T, n int, versions map[int]version.SemVer, expectErr
hashSig, err := keys[i].Sign(hash)
require.NoError(t, err)

client := sync.NewClient(tcpNodes[i], tcpNodes[j].ID(), hashSig, versions[i], sync.WithPeriod(time.Millisecond*100))
client := sync.NewClient(tcpNodes[i], tcpNodes[j].ID(), hashSig, versions[i], nicknames[i], sync.WithPeriod(time.Millisecond*100))
clients = append(clients, client)

ctx := log.WithTopic(ctx, fmt.Sprintf("client%d_%d", i, j))
Expand Down
Loading