Skip to content

Commit

Permalink
Polish
Browse files Browse the repository at this point in the history
  • Loading branch information
altafan committed Nov 7, 2024
1 parent 916e00b commit 37254f8
Showing 1 changed file with 6 additions and 20 deletions.
26 changes: 6 additions & 20 deletions server/internal/interface/grpc/handlers/arkservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,25 +227,19 @@ func (h *handler) SubmitSignedForfeitTxs(
func (h *handler) GetEventStream(
_ *arkv1.GetEventStreamRequest, stream arkv1.ArkService_GetEventStreamServer,
) error {
doneCh := make(chan struct{})

listener := &listener[*arkv1.GetEventStreamResponse]{
id: uuid.NewString(),
done: doneCh,
ch: make(chan *arkv1.GetEventStreamResponse),
id: uuid.NewString(),
ch: make(chan *arkv1.GetEventStreamResponse),
}

h.eventsListenerHandler.pushListener(listener)
defer h.eventsListenerHandler.removeListener(listener.id)
defer close(listener.ch)
defer close(doneCh)

for {
select {
case <-stream.Context().Done():
return nil
case <-doneCh:
return nil
case ev := <-listener.ch:
if err := stream.Send(ev); err != nil {
return err
Expand Down Expand Up @@ -416,9 +410,8 @@ func (h *handler) GetTransactionsStream(
stream arkv1.ArkService_GetTransactionsStreamServer,
) error {
listener := &listener[*arkv1.GetTransactionsStreamResponse]{
id: uuid.NewString(),
done: make(chan struct{}),
ch: make(chan *arkv1.GetTransactionsStreamResponse),
id: uuid.NewString(),
ch: make(chan *arkv1.GetTransactionsStreamResponse),
}

h.transactionsListenerHandler.pushListener(listener)
Expand All @@ -445,7 +438,6 @@ func (h *handler) listenToEvents() {
channel := h.svc.GetEventsChannel(context.Background())
for event := range channel {
var ev *arkv1.GetEventStreamResponse
shouldClose := false

switch e := event.(type) {
case domain.RoundFinalizationStarted:
Expand All @@ -461,7 +453,6 @@ func (h *handler) listenToEvents() {
},
}
case domain.RoundFinalized:
// shouldClose = true
ev = &arkv1.GetEventStreamResponse{
Event: &arkv1.GetEventStreamResponse_RoundFinalized{
RoundFinalized: &arkv1.RoundFinalizedEvent{
Expand All @@ -471,7 +462,6 @@ func (h *handler) listenToEvents() {
},
}
case domain.RoundFailed:
// shouldClose = true
ev = &arkv1.GetEventStreamResponse{
Event: &arkv1.GetEventStreamResponse_RoundFailed{
RoundFailed: &arkv1.RoundFailed{
Expand Down Expand Up @@ -519,9 +509,6 @@ func (h *handler) listenToEvents() {
for _, l := range h.eventsListenerHandler.listeners {
go func(l *listener[*arkv1.GetEventStreamResponse]) {
l.ch <- ev
if shouldClose {
l.done <- struct{}{}
}
}(l)
}
}
Expand Down Expand Up @@ -577,9 +564,8 @@ func convertAsyncPaymentEvent(e application.RedeemTransactionEvent) *arkv1.Redee
}

type listener[T any] struct {
id string
done chan struct{}
ch chan T
id string
ch chan T
}

type listenerHanlder[T any] struct {
Expand Down

0 comments on commit 37254f8

Please sign in to comment.