Skip to content

Commit

Permalink
Refactor logs (#130)
Browse files Browse the repository at this point in the history
* refactor logs

* fix log broadcastAtomicOP

* fix clients ip log

* fix typo

* fix lint
  • Loading branch information
agnusmor authored Jul 30, 2024
1 parent 81bd3ec commit 95faf88
Showing 1 changed file with 24 additions and 10 deletions.
34 changes: 24 additions & 10 deletions datastreamer/streamserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,12 +440,11 @@ func (s *StreamServer) addStream(desc string, etype EntryType, data []byte) (uin

// CommitAtomicOp commits the current atomic operation and streams it to the clients
func (s *StreamServer) CommitAtomicOp() error {
start := time.Now().UnixNano()
defer log.Infof("CommitAtomicOp process time: %vns", time.Now().UnixNano()-start)
start := time.Now()

log.Infof("commit datastream atomic operation, startEntry: %d", s.atomicOp.startEntry)
log.Debugf("committing datastream atomic operation, startEntry: %d", s.atomicOp.startEntry)
if s.atomicOp.status != aoStarted {
log.Errorf("Commit not allowed, AtomicOp is not in the started state")
log.Errorf("commit not allowed, atomic operation is not in the started state")
return ErrCommitNotAllowed
}

Expand All @@ -470,6 +469,8 @@ func (s *StreamServer) CommitAtomicOp() error {
// No atomic operation in progress
s.clearAtomicOp()

log.Infof("committed datastream atomic operation, startEntry: %d, time: %v", s.atomicOp.startEntry, time.Since(start))

return nil
}

Expand Down Expand Up @@ -688,21 +689,24 @@ func (s *StreamServer) broadcastAtomicOp() {
for {
// Wait for new atomic operation to broadcast
broadcastOp := <-s.stream
start := time.Now().UnixMilli()
start := time.Now()
var killedClientMap = map[string]struct{}{}
var clientMap = map[string]struct{}{}
s.mutexClients.RLock()
// For each connected and started client
log.Infof("sending %d datastream entries to %d clients", len(broadcastOp.entries), len(s.clients))
log.Debug("sending datastream entries, count: %d, clients: %d", len(broadcastOp.entries), len(s.clients))
for id, cli := range s.clients {
log.Debugf("Client %s status %d[%s]", id, cli.status, StrClientStatus[cli.status])
log.Debugf("client %s status %d (%s)", id, cli.status, StrClientStatus[cli.status])
clientMap[id] = struct{}{}
if cli.status != csSynced {
continue
}

// Send entries
for _, entry := range broadcastOp.entries {
if entry.Number >= cli.fromEntry {
log.Debugf("Sending data entry %d (type %d) to %s", entry.Number, entry.Type, id)
log.Debugf("sending data entry %d (type %d) to %s", entry.Number, entry.Type, id)

binaryEntry := encodeFileEntryToBinary(entry)

// Send the file data entry
Expand All @@ -713,7 +717,7 @@ func (s *StreamServer) broadcastAtomicOp() {
}
if err != nil {
// Kill client connection
log.Warnf("Error sending entry to %s: %v", id, err)
log.Warnf("error sending entry to %s, error: %v", id, err)
killedClientMap[id] = struct{}{}
break // skip rest of entries for this client
}
Expand All @@ -726,7 +730,17 @@ func (s *StreamServer) broadcastAtomicOp() {
s.killClient(k)
}

log.Infof("broadcastAtomicOp process time: %vms", time.Now().UnixMilli()-start)
sClients := ""
for c := range clientMap {
if sClients == "" {
sClients = c
} else {
sClients += ", " + c
}
}

log.Infof("sent datastream entries, count: %d, clients: %d, time: %v, clients-ip: {%s}",
len(broadcastOp.entries), len(s.clients), time.Since(start), sClients)
}
}

Expand Down

0 comments on commit 95faf88

Please sign in to comment.