Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions cmd/traffic/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package main

import (
"os"

"github.com/named-data/ndnd/tools"
"github.com/spf13/cobra"
)

func main() {
root := &cobra.Command{
Use: "ndnd-traffic",
Short: "NDN traffic generator",
}
root.AddCommand(tools.CmdTraffic())
if err := root.Execute(); err != nil {
os.Exit(1)
}
}
6 changes: 6 additions & 0 deletions dv/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

enc "github.com/named-data/ndnd/std/encoding"
"github.com/named-data/ndnd/std/ndn"
mgmt "github.com/named-data/ndnd/std/ndn/mgmt_2022"
)

Expand Down Expand Up @@ -42,6 +43,11 @@ type Config struct {
// List of permanent neighbors.
Neighbors []Neighbor `json:"neighbors"`

// Pre-built KeyChain (bypasses KeyChainUri if set).
KeyChain ndn.KeyChain `json:"-"`
// Pre-built Store (bypasses internal store creation if set).
Store ndn.Store `json:"-"`

// Parsed Global Prefix
networkNameN enc.Name
// Parsed Router Prefix
Expand Down
8 changes: 4 additions & 4 deletions dv/dv/advert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (a *advertModule) generate() {
a.objDir.Evict(a.dv.client)

// Notify neighbors with sync for new advertisement
go a.sendSyncInterest()
a.dv.GoFunc(func() { a.sendSyncInterest() })
}

// (AI GENERATED DESCRIPTION): Fetches a neighbor’s DV advertisement Data packet (retrying on error) and invokes `dataHandler` with its content when the neighbor’s boot time and sequence number match the expected values.
Expand All @@ -56,14 +56,14 @@ func (a *advertModule) dataFetch(nName enc.Name, bootTime uint64, seqNo uint64)
a.dv.client.Consume(advName, func(state ndn.ConsumeState) {
if err := state.Error(); err != nil {
log.Warn(a, "Failed to fetch advertisement", "name", state.Name(), "err", err)
time.AfterFunc(1*time.Second, func() {
a.dv.engine.Timer().Schedule(1*time.Second, func() {
a.dataFetch(nName, bootTime, seqNo)
})
return
}

// Process the advertisement
go a.dataHandler(nName, seqNo, state.Content())
a.dv.GoFunc(func() { a.dataHandler(nName, seqNo, state.Content()) })
})
}

Expand Down Expand Up @@ -92,5 +92,5 @@ func (a *advertModule) dataHandler(nName enc.Name, seqNo uint64, data enc.Wire)

// Update the local advertisement list
ns.Advert = advert
go a.dv.updateRib(ns)
a.dv.GoFunc(func() { a.dv.updateRib(ns) })
}
6 changes: 3 additions & 3 deletions dv/dv/advert_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (a *advertModule) OnSyncInterest(args ndn.InterestHandlerArgs, active bool)
}

// Process the state vector
go a.onStateVector(params.StateVector, args.IncomingFaceId.Unwrap(), active)
a.dv.GoFunc(func() { a.onStateVector(params.StateVector, args.IncomingFaceId.Unwrap(), active) })
},
})
}
Expand Down Expand Up @@ -197,13 +197,13 @@ func (a *advertModule) onStateVector(sv *spec_svs.StateVector, faceId uint64, ac
ns.AdvertBoot = entry.BootstrapTime
ns.AdvertSeq = entry.SeqNo

time.AfterFunc(10*time.Millisecond, func() { // debounce
a.dv.engine.Timer().Schedule(10*time.Millisecond, func() { // debounce
a.dataFetch(node.Name, entry.BootstrapTime, entry.SeqNo)
})
}

// Update FIB if needed
if fibDirty {
go a.dv.updateFib()
a.dv.GoFunc(a.dv.updateFib)
}
}
157 changes: 136 additions & 21 deletions dv/dv/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/named-data/ndnd/dv/config"
"github.com/named-data/ndnd/dv/nfdc"
"github.com/named-data/ndnd/dv/table"
"github.com/named-data/ndnd/fw/core"
enc "github.com/named-data/ndnd/std/encoding"
"github.com/named-data/ndnd/std/log"
"github.com/named-data/ndnd/std/ndn"
Expand Down Expand Up @@ -37,6 +38,19 @@ type Router struct {
// single mutex for all operations
mutex sync.Mutex

// GoFunc dispatches work asynchronously.
// In production, this launches a goroutine.
// In simulation, this schedules via the simulation clock.
GoFunc func(func())

// NowFunc returns current time. Defaults to time.Now.
// In simulation, returns simulation clock time.
NowFunc func() time.Time

// AfterFunc schedules f after d. Returns cancel function.
// Defaults to time.AfterFunc wrapper.
AfterFunc func(time.Duration, func()) func()

// channel to stop the DV
stop chan bool
// heartbeat for outgoing Advertisements
Expand Down Expand Up @@ -70,12 +84,29 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) {
return nil, err
}

// Create packet store
store := storage.NewMemoryStore()
// Create packet store (use pre-built store if provided)
var store ndn.Store
if config.Store != nil {
store = config.Store
} else {
store = storage.NewMemoryStore()
}

// Create security configuration
var trust *sec.TrustConfig = nil
if config.KeyChainUri == "insecure" {
if config.KeyChain != nil {
// Use pre-built keychain (e.g. from simulation trust setup)
schema, err := trust_schema.NewLvsSchema(config.SchemaBytes())
if err != nil {
return nil, err
}
anchors := config.TrustAnchorNames()
trust, err = sec.NewTrustConfig(config.KeyChain, schema, anchors)
if err != nil {
return nil, err
}
trust.UseDataNameFwHint = true
} else if config.KeyChainUri == "insecure" {
log.Warn(nil, "Security is disabled - insecure mode")
} else {
kc, err := keychain.NewKeyChain(config.KeyChainUri, store)
Expand All @@ -98,25 +129,34 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) {

// Create the DV router
dv := &Router{
engine: engine,
config: config,
trust: trust,
client: object.NewClient(engine, store, trust),
nfdc: nfdc.NewNfdMgmtThread(engine),
mutex: sync.Mutex{},
engine: engine,
config: config,
trust: trust,
client: object.NewClient(engine, store, trust),
nfdc: nfdc.NewNfdMgmtThread(engine),
mutex: sync.Mutex{},
GoFunc: func(f func()) { go f() },
NowFunc: core.Now,
AfterFunc: func(d time.Duration, f func()) func() {
t := time.AfterFunc(d, f)
return func() { t.Stop() }
},
}

// Initialize advertisement module
// Initialize advertisement module.
// NOTE: bootTime must NOT be computed here — NowFunc may still point to
// the real wall clock at construction time. For production Start() and
// simulation Init(), bootTime is computed lazily via dv.NowFunc() at the
// start of those methods, after any override (e.g. SimDvRouter) is in place.
// createPrefixTable() is also deferred for the same reason: it embeds
// bootTime into pfxSvs, which would then compare incoming BootstrapTimes
// against the wrong clock.
dv.advert = advertModule{
dv: dv,
bootTime: uint64(time.Now().Unix()),
seq: 0,
objDir: storage.NewMemoryFifoDir(32), // keep last few advertisements
dv: dv,
seq: 0,
objDir: storage.NewMemoryFifoDir(32), // keep last few advertisements
}

// Create prefix table
dv.createPrefixTable()

// Create DV tables
dv.neighbors = table.NewNeighborTable(config, dv.nfdc)
dv.rib = table.NewRib(config)
Expand All @@ -135,6 +175,11 @@ func (dv *Router) Start() (err error) {
log.Info(dv, "Starting DV router", "version", utils.NDNdVersion)
defer log.Info(dv, "Stopped DV router")

// Lazily initialize bootTime and prefix table so that any NowFunc
// override (e.g. from simulation) is in effect before pfxSvs is created.
dv.advert.bootTime = max(uint64(dv.NowFunc().Unix()), 1)
dv.createPrefixTable()

// Initialize channels
dv.stop = make(chan bool, 1)

Expand All @@ -153,7 +198,7 @@ func (dv *Router) Start() (err error) {
defer dv.client.Stop()

// Start management thread
go dv.nfdc.Start()
dv.GoFunc(func() { dv.nfdc.Start() })
defer dv.nfdc.Stop()

// Configure face
Expand Down Expand Up @@ -194,6 +239,71 @@ func (dv *Router) Stop() {
dv.stop <- true
}

// Init initializes the DV router without blocking.
// This is the simulation-compatible variant of Start() — it performs all
// setup (faces, handlers, sync groups, initial advertisement) but returns
// immediately instead of entering a ticker-driven select loop.
// The caller is responsible for periodically calling RunHeartbeat() and
// RunDeadcheck() using the simulation clock.
func (dv *Router) Init() error {
log.Info(dv, "Initializing DV router (sim)", "version", utils.NDNdVersion)

// Lazily initialize bootTime and prefix table so that the NowFunc
// override installed by SimDvRouter (clock.Now) is used instead of the
// wall clock. If bootTime were set at NewRouter() time, it would carry
// the real Unix timestamp (~1773989593 in 2026), which SVS on every peer
// would reject as "far future" because the simulation clock starts near
// the Unix epoch.
dv.advert.bootTime = max(uint64(dv.NowFunc().Unix()), 1)
dv.createPrefixTable()

// Start object client
dv.client.Start()

// Register interest handlers
if err := dv.register(); err != nil {
return err
}

// Start sync groups
dv.pfxSvs.Start()

// Add self to the RIB and make initial advertisement
dv.rib.Set(dv.config.RouterName(), dv.config.RouterName(), 0)
dv.advert.generate()

// Initialize prefix table
dv.pfx.Reset()

return nil
}

// RunHeartbeat sends a sync Interest to all neighbors.
// In production this is driven by time.Ticker; in simulation it is
// called by the simulation clock scheduler.
func (dv *Router) RunHeartbeat() {
dv.advert.sendSyncInterest()
}

// RunDeadcheck checks for dead neighbors and prunes routes.
// In production this is driven by time.Ticker; in simulation it is
// called by the simulation clock scheduler.
func (dv *Router) RunDeadcheck() {
dv.checkDeadNeighbors()
}

// Cleanup tears down the DV router (simulation variant of deferred cleanup in Start).
func (dv *Router) Cleanup() {
dv.pfxSvs.Stop()
dv.client.Stop()
log.Info(dv, "Cleaned up DV router (sim)")
}

// Nfdc returns the NFD management thread.
func (dv *Router) Nfdc() *nfdc.NfdMgmtThread {
return dv.nfdc
}

// Configure the face to forwarder.
func (dv *Router) configureFace() (err error) {
// Enable local fields on face. This includes incoming face indication.
Expand All @@ -215,7 +325,7 @@ func (dv *Router) register() (err error) {
// Advertisement Sync (active)
err = dv.engine.AttachHandler(dv.config.AdvertisementSyncActivePrefix(),
func(args ndn.InterestHandlerArgs) {
go dv.advert.OnSyncInterest(args, true)
dv.GoFunc(func() { dv.advert.OnSyncInterest(args, true) })
})
if err != nil {
return err
Expand All @@ -224,7 +334,7 @@ func (dv *Router) register() (err error) {
// Advertisement Sync (passive)
err = dv.engine.AttachHandler(dv.config.AdvertisementSyncPassivePrefix(),
func(args ndn.InterestHandlerArgs) {
go dv.advert.OnSyncInterest(args, false)
dv.GoFunc(func() { dv.advert.OnSyncInterest(args, false) })
})
if err != nil {
return err
Expand All @@ -233,7 +343,7 @@ func (dv *Router) register() (err error) {
// Router management
err = dv.engine.AttachHandler(dv.config.MgmtPrefix(),
func(args ndn.InterestHandlerArgs) {
go dv.mgmtOnInterest(args)
dv.GoFunc(func() { dv.mgmtOnInterest(args) })
})
if err != nil {
return err
Expand Down Expand Up @@ -355,6 +465,9 @@ func (dv *Router) createPrefixTable() {
Client: dv.client,
GroupPrefix: dv.config.PrefixTableGroupPrefix(),
BootTime: dv.advert.bootTime,
GoFunc: func(f func()) { dv.GoFunc(f) },
NowFunc: func() time.Time { return dv.NowFunc() },
AfterFunc: func(d time.Duration, f func()) func() { return dv.AfterFunc(d, f) },
},
Snapshot: &ndn_sync.SnapshotNodeLatest{
Client: dv.client,
Expand All @@ -373,5 +486,7 @@ func (dv *Router) createPrefixTable() {
if _, _, err := dv.pfxSvs.Publish(w); err != nil {
log.Error(dv, "Failed to publish prefix table update", "err", err)
}
}, table.PrefixTableOptions{
NowFunc: dv.NowFunc,
})
}
Loading
Loading