Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions cmd/injectived/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (

"github.com/InjectiveLabs/injective-core/cmd/injectived/config"

injwebsocket "github.com/InjectiveLabs/injective-core/injective-chain/websocket"
"gopkg.in/DataDog/dd-trace-go.v1/profiler"
)

Expand Down Expand Up @@ -267,6 +268,9 @@ func addStartNodeFlags(cmd *cobra.Command, opts server.StartCmdOptions) {
cmd.Flags().Uint64(stream.FlagStreamServerPingInterval, 60, "Amount of time in seconds after which the server will send a keepalive ping to the client on an idle connection")
cmd.Flags().Uint64(stream.FlagStreamServerPingResponseTimeout, 40, "Amount of time in seconds the server waits for the client to respond to a ping message before forcing a disconnection")

// add websocket server flag
cmd.Flags().String(injwebsocket.FlagWebsocketServer, "", "Configure websocket server listen addr")

// add store commit sync flag
cmd.Flags().Bool(FlagMultiStoreCommitSync, false, "Define if commit multistore should use sync mode (false|true)")

Expand Down Expand Up @@ -584,17 +588,38 @@ func startInProcess(svrCtx *server.Context, svrCfg serverconfig.Config, clientCt
if pubBuffCap == 0 {
return fmt.Errorf("invalid publisher buffer capacity %d. Please set a positive value greater than 0", pubBuffCap)
}

eventPublisherStarted := false
injApp.EventPublisher.WithBufferCapacity(pubBuffCap)
if chainStreamServeAddr != "" {
// events are forwarded to StreamEvents channel in cosmos-sdk
injApp.EnableStreamer = true
if err = injApp.EventPublisher.Run(context.Background()); err != nil {
svrCtx.Logger.Error("failed to start event publisher", "error", err)
} else {
eventPublisherStarted = true
}
if err = injApp.ChainStreamServer.Serve(chainStreamServeAddr); err != nil {
svrCtx.Logger.Error("failed to start chainstream server", "error", err)
}
}

websocketServerAddr := cast.ToString(svrCtx.Viper.Get(injwebsocket.FlagWebsocketServer))
if websocketServerAddr != "" {
// don't start streamer server but still need part of its implement for websocket streams
injApp.EnableStreamer = true
if !eventPublisherStarted {
if err = injApp.EventPublisher.Run(context.Background()); err != nil {
svrCtx.Logger.Error("failed to start event publisher", "error", err)
} else {
eventPublisherStarted = true
}
}

if err := injApp.WebsocketServer.Serve(websocketServerAddr); err != nil {
svrCtx.Logger.Error("failed to start websocket server", "error", err)
}
}
}

closer.Bind(func() {
Expand Down
3 changes: 3 additions & 0 deletions injective-chain/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ import (
wasmxtypes "github.com/InjectiveLabs/injective-core/injective-chain/modules/wasmx/types"
"github.com/InjectiveLabs/injective-core/injective-chain/stream"
chaintypes "github.com/InjectiveLabs/injective-core/injective-chain/types"
injwebsocket "github.com/InjectiveLabs/injective-core/injective-chain/websocket"

// unnamed import of statik for swagger UI support
_ "github.com/InjectiveLabs/injective-core/client/docs/statik"
Expand Down Expand Up @@ -340,6 +341,7 @@ type InjectiveApp struct {
// stream server
ChainStreamServer *stream.StreamServer
EventPublisher *stream.Publisher
WebsocketServer *injwebsocket.WebsocketServer
}

// NewInjectiveApp returns a reference to a new initialized Injective application.
Expand Down Expand Up @@ -419,6 +421,7 @@ func NewInjectiveApp(
bus := pubsub.NewServer()
app.EventPublisher = stream.NewPublisher(app.StreamEvents, bus)
app.ChainStreamServer = stream.NewChainStreamServer(bus, appOpts)
app.WebsocketServer = injwebsocket.NewWebsocketServer(app.ChainStreamServer, logger)

authzcdc.GlobalCdc = codec.NewProtoCodec(app.interfaceRegistry)
ante.GlobalCdc = codec.NewProtoCodec(app.interfaceRegistry)
Expand Down
21 changes: 18 additions & 3 deletions injective-chain/stream/stream_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,11 @@ func (s *StreamServer) Stream(req *types.StreamRequest, server types.Stream_Stre
return status.Error(codes.Internal, err.Error())
}
}
err = server.Send(outResp)
if err != nil {
return status.Error(codes.Internal, err.Error())
if !req.OmitEmptyResponse || !isEmptyResponse(outResp) {
err = server.Send(outResp)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
}
height += 1
case <-server.Context().Done():
Expand All @@ -203,3 +205,16 @@ func (s *StreamServer) GetCurrentServerPort() int {
}
return s.listener.Addr().(*net.TCPAddr).Port
}

func isEmptyResponse(sr *types.StreamResponse) bool {
return len(sr.BankBalances) == 0 &&
len(sr.SubaccountDeposits) == 0 &&
len(sr.SpotTrades) == 0 &&
len(sr.DerivativeTrades) == 0 &&
len(sr.SpotOrders) == 0 &&
len(sr.DerivativeOrders) == 0 &&
len(sr.SpotOrderbookUpdates) == 0 &&
len(sr.DerivativeOrderbookUpdates) == 0 &&
len(sr.Positions) == 0 &&
len(sr.OraclePrices) == 0
}
Loading