From ed72a61b342bc834b1b2482d055ee3300e865099 Mon Sep 17 00:00:00 2001 From: Alonso Date: Tue, 12 Nov 2024 09:08:04 +0100 Subject: [PATCH] maxConnections config --- cmd/main.go | 3 ++- datastreamer/datastreamer_test.go | 2 +- datastreamer/streamrelay.go | 2 +- datastreamer/streamserver.go | 13 +++++++------ 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index e0b9c55..7e40a7a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -269,7 +269,7 @@ func runServer(ctx *cli.Context) error { // Create stream server s, err := datastreamer.NewServer(uint16(port), streamerVersion, streamerSystemID, StSequencer, file, time.Duration(writeTimeout)*time.Millisecond, time.Duration(inactivityTimeout)*time.Second, - 5*time.Second, nil) //nolint:mnd + 5*time.Second, nil, 0) //nolint:mnd if err != nil { return err } @@ -750,6 +750,7 @@ func checkEntryBlockSanity( return err } blockNum := l2Block.Number + log.Debug("L2BlockNum: ", blockNum) //Check previous End Block if sanityBlockEnd != blockNum { log.Warnf(`(X) SANITY CHECK failed (%d): BlockStart but the previous one is not closed yet? diff --git a/datastreamer/datastreamer_test.go b/datastreamer/datastreamer_test.go index 4f2cb1a..2a7e0f7 100644 --- a/datastreamer/datastreamer_test.go +++ b/datastreamer/datastreamer_test.go @@ -202,7 +202,7 @@ func TestServer(t *testing.T) { panic(err) } streamServer, err = datastreamer.NewServer(config.Port, 1, 137, streamType, - config.Filename, config.WriteTimeout, config.InactivityTimeout, 5*time.Second, &config.Log) + config.Filename, config.WriteTimeout, config.InactivityTimeout, 5*time.Second, &config.Log, 100) if err != nil { panic(err) } diff --git a/datastreamer/streamrelay.go b/datastreamer/streamrelay.go index 253bc0b..0dcf6ca 100644 --- a/datastreamer/streamrelay.go +++ b/datastreamer/streamrelay.go @@ -28,7 +28,7 @@ func NewRelay(server string, port uint16, version uint8, systemID uint64, // Create server side r.server, err = NewServer(port, version, systemID, streamType, fileName, writeTimeout, - inactivityTimeout, inactivityCheckInterval, cfg) + inactivityTimeout, inactivityCheckInterval, cfg, 100) if err != nil { log.Errorf("Error creating relay server side: %v", err) return nil, err diff --git a/datastreamer/streamserver.go b/datastreamer/streamserver.go index 6816fb8..0c140ba 100644 --- a/datastreamer/streamserver.go +++ b/datastreamer/streamserver.go @@ -38,7 +38,6 @@ type CommandError uint32 const EntryTypeNotFound = math.MaxUint32 const ( - maxConnections = 100 // Maximum number of connected clients streamBuffer = 256 // Buffers for the stream channel maxBookmarkLength = 16 // Maximum number of bytes for a bookmark ) @@ -116,6 +115,7 @@ type StreamServer struct { // Time interval to check for client connections that have reached // the inactivity timeout and kill them inactivityCheckInterval time.Duration + maxConnections uint32 started bool // Flag server started version uint8 @@ -165,7 +165,7 @@ type ResultEntry struct { // NewServer creates a new data stream server func NewServer(port uint16, version uint8, systemID uint64, streamType StreamType, fileName string, writeTimeout time.Duration, inactivityTimeout time.Duration, inactivityCheckInterval time.Duration, - cfg *log.Config) (*StreamServer, error) { + cfg *log.Config, maxConnections uint32) (*StreamServer, error) { // Create the server data stream s := StreamServer{ port: port, @@ -174,6 +174,7 @@ func NewServer(port uint16, version uint8, systemID uint64, streamType StreamTyp inactivityTimeout: inactivityTimeout, inactivityCheckInterval: inactivityCheckInterval, started: false, + maxConnections: maxConnections, version: version, systemID: systemID, @@ -284,8 +285,8 @@ func (s *StreamServer) waitConnections() { } // Check max connections allowed - if s.getSafeClientsLen() >= maxConnections { - log.Warnf("Unable to accept client connection, maximum number of connections reached (%d)", maxConnections) + if s.maxConnections != 0 && s.getSafeClientsLen() >= s.maxConnections { + log.Warnf("Unable to accept client connection, maximum number of connections reached (%d)", s.maxConnections) conn.Close() time.Sleep(timeout) continue @@ -1165,10 +1166,10 @@ func (s *StreamServer) getSafeClient(clientID string) *client { return s.clients[clientID] } -func (s *StreamServer) getSafeClientsLen() int { +func (s *StreamServer) getSafeClientsLen() uint32 { s.mutexClients.RLock() defer s.mutexClients.RUnlock() - return len(s.clients) + return uint32(len(s.clients)) } // BookmarkPrintDump prints all bookmarks