Skip to content

Commit

Permalink
added entity implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mohitpubnub committed Jun 4, 2024
1 parent 4295001 commit 7cdc839
Show file tree
Hide file tree
Showing 13 changed files with 588 additions and 268 deletions.
4 changes: 2 additions & 2 deletions src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class SubscribeEndpoint<T>: ISubscribeOperation<T>
private SubscribeEventEngineFactory subscribeEventEngineFactory;
private PresenceOperation<T> presenceOperation;
private string instanceId { get; set; }
public EventEmitter EventEmitter { get; set; }
public List<SubscribeCallback> SubscribeListenerList
{
get;
Expand Down Expand Up @@ -126,8 +127,7 @@ private void Subscribe(string[] channels, string[] channelGroups, SubscriptionCu
subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId);
} else {
var subscribeManager = new SubscribeManager2(config, jsonLibrary, unit, pubnubLog, pubnubTelemetryMgr, pubnubTokenMgr, PubnubInstance);
var eventEmitter = new EventEmitter(config, SubscribeListenerList, jsonLibrary, pubnubTokenMgr, pubnubLog, PubnubInstance);
subscribeEventEngine = subscribeEventEngineFactory.InitializeEventEngine(instanceId, PubnubInstance, config, subscribeManager, eventEmitter, jsonLibrary, StatusEmitter);
subscribeEventEngine = subscribeEventEngineFactory.InitializeEventEngine(instanceId, PubnubInstance, config, subscribeManager, this.EventEmitter, jsonLibrary, StatusEmitter);
subscribeEventEngine.OnStateTransition += SubscribeEventEngine_OnStateTransition;
subscribeEventEngine.OnEventQueued += SubscribeEventEngine_OnEventQueued;
subscribeEventEngine.OnEffectDispatch += SubscribeEventEngine_OnEffectDispatch;
Expand Down
72 changes: 52 additions & 20 deletions src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading;

namespace PubnubApi.EndPoint
{
Expand Down Expand Up @@ -78,28 +76,62 @@ private void Unsubscribe(string[] channels, string[] channelGroups)

if (this.subscribeEventEngineFactory.HasEventEngine(instanceId)) {
subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId);
subscribeEventEngine.Unsubscribe(channels, channelGroups);
} else {
LoggingMethod.WriteToLog(pubnubLog, string.Format(CultureInfo.InvariantCulture, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, Attempted Unsubscribe without EventEngine subscribe."), config.LogVerbosity);
}
if (config.PresenceInterval > 0 && presenceEventEngineFactory.HasEventEngine(instanceId)) {
PresenceEventEngine presenceEventEngine = presenceEventEngineFactory.GetEventEngine(instanceId);
presenceEventEngine.EventQueue.Enqueue(new EventEngine.Presence.Events.LeftEvent() { Input = new EventEngine.Presence.Common.PresenceInput() { Channels = channels, ChannelGroups = channelGroups } });
}
if (config.MaintainPresenceState) {
if (ChannelLocalUserState.TryGetValue(PubnubInstance.InstanceId, out
var userState)) {
foreach (var channelName in channels ?? new string[0]) {
userState.TryRemove(channelName, out _);
}
channels = channels ?? new string[] { };
channelGroups = channelGroups ?? new string[] { };
var channelsWithPresence = channels.Concat(channels.Select((c) => $"{c}-pnpres")).ToList();
var filteredChannelNames = new List<string>(subscribeEventEngine.Channels);
foreach (var c in channelsWithPresence) {
filteredChannelNames.Remove(c);
}
if (ChannelGroupLocalUserState.TryGetValue(PubnubInstance.InstanceId, out
var channelGroupUserState)) {
foreach (var channelGroupName in channelGroups ?? new string[0]) {
channelGroupUserState.TryRemove(channelGroupName, out _);
var channelGroupsWithPresence = channelGroups.Concat(channelGroups.Select((cg) => $"{cg}-pnpres")).ToList();
var filteredChannelGroupNames = new List<string>(subscribeEventEngine.ChannelGroups);
foreach (var g in channelGroupsWithPresence) {
filteredChannelGroupNames.Remove(g);
}
if (subscribeEventEngine.Channels.Distinct().Count() != filteredChannelNames.Distinct().Count() ||
subscribeEventEngine.ChannelGroups.Distinct().Count() != filteredChannelGroupNames.Distinct().Count()) {

var channelsToRemove = FindUniqueCommonElements(subscribeEventEngine.Channels, channels.ToList());
var channelGroupsToRemove = FindUniqueCommonElements(subscribeEventEngine.ChannelGroups, channelGroups.ToList());

subscribeEventEngine.Unsubscribe(channelsToRemove.ToArray(), channelGroupsToRemove.ToArray());

if (config.PresenceInterval > 0 && presenceEventEngineFactory.HasEventEngine(instanceId)) {
PresenceEventEngine presenceEventEngine = presenceEventEngineFactory.GetEventEngine(instanceId);
presenceEventEngine.EventQueue.Enqueue(new EventEngine.Presence.Events.LeftEvent() { Input = new EventEngine.Presence.Common.PresenceInput() { Channels = channelsToRemove.ToArray(), ChannelGroups = channelGroupsToRemove.ToArray() } });
}
if (config.MaintainPresenceState) {
if (ChannelLocalUserState.TryGetValue(PubnubInstance.InstanceId, out
var userState)) {
foreach (var channelName in channels ?? new string[0]) {
userState.TryRemove(channelName, out _);
}
}
if (ChannelGroupLocalUserState.TryGetValue(PubnubInstance.InstanceId, out
var channelGroupUserState)) {
foreach (var channelGroupName in channelGroups ?? new string[0]) {
channelGroupUserState.TryRemove(channelGroupName, out _);
}
}
}
} else {
subscribeEventEngine.Channels = filteredChannelNames;
subscribeEventEngine.ChannelGroups = filteredChannelGroupNames;
}

} else {
LoggingMethod.WriteToLog(pubnubLog, string.Format(CultureInfo.InvariantCulture, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, Attempted Unsubscribe without EventEngine subscribe."), config.LogVerbosity);
}

}

private IEnumerable<string> FindUniqueCommonElements(List<string> a, List<string> b)
{
return a
.Where(value =>
b.Contains(value) &&
a.IndexOf(value) == a.LastIndexOf(value) &&
b.IndexOf(value) == b.LastIndexOf(value));
}
}
}
30 changes: 16 additions & 14 deletions src/Api/PubnubApi/Entity/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@
using System.Linq;
using PubnubApi.EventEngine.Common;

namespace PubnubApi {
public class Channel
namespace PubnubApi
{
public string Name { get; set; }
private Pubnub Pubnub { get; set; }
private EventEmitter EventEmitter { get; set; }
public Channel(string name, Pubnub pubnub, EventEmitter eventEmitter)
public class Channel
{
Name = name;
this.Pubnub = pubnub;
this.EventEmitter = eventEmitter;
}
public string Name { get; set; }
private Pubnub Pubnub { get; set; }
private EventEmitter EventEmitter { get; set; }

Subscription Subscription(SubscriptionOptions options = SubscriptionOptions.None)
{
return new Subscription(this.Name, options, this.Pubnub, this.EventEmitter);
public Channel(string name, Pubnub pubnub, EventEmitter eventEmitter)
{
Name = name;
this.Pubnub = pubnub;
this.EventEmitter = eventEmitter;
}

public Subscription Subscription(SubscriptionOptions options = SubscriptionOptions.None)
{
return new Subscription(options == SubscriptionOptions.ReceivePresenceEvents ? new string[] { Name, $"{Name}-pnpres" } : new string[] { Name }, new string[] { }, options, this.Pubnub, this.EventEmitter);
}
}
}
}
4 changes: 2 additions & 2 deletions src/Api/PubnubApi/Entity/ChannelGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ public ChannelGroup(string name, Pubnub pubnub, EventEmitter eventEmitter)
this.EventEmitter = eventEmitter;
}

Subscription Subscription(SubscriptionOptions options = SubscriptionOptions.None)
public Subscription Subscription(SubscriptionOptions options = SubscriptionOptions.None)
{
return new Subscription(this.Name, options, this.Pubnub, this.EventEmitter);
return new Subscription(new string[] { }, options == SubscriptionOptions.ReceivePresenceEvents ? new string[] { Name, $"{Name}-pnpres" } : new string[] { Name }, options, this.Pubnub, this.EventEmitter);
}
}
}
76 changes: 76 additions & 0 deletions src/Api/PubnubApi/Entity/SubscribeCapable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System;
using System.Collections.Generic;
using PubnubApi.EventEngine.Common;
using PubnubApi.EventEngine.Subscribe.Common;

namespace PubnubApi
{
public abstract class SubscribeCapable
{
public abstract List<string> ChannelNames { get; set; }
public abstract List<string> ChannelGroupNames { get; set; }
public abstract Pubnub Pubnub { get; set; }
public abstract EventEmitter EventEmitter { get; set; }
public abstract SubscriptionOptions Options { get; set; }
public abstract SubscribeCallbackExt Listener { get; set; }

public Action<Pubnub, PNPresenceEventResult> OnPresence {
set {
Listener.presenceAction = value;
}
}
public Action<Pubnub, PNObjectEventResult> OnObjects {
set {
Listener.objectEventAction = value;
}
}
public Action<Pubnub, PNFileEventResult> OnFile {
set {
Listener.fileAction = value;
}
}
public Action<Pubnub, PNMessageActionEventResult> OnMessageAction {
set {
Listener.messageAction = value;
}
}
public Action<Pubnub, PNMessageResult<object>> OnMessage {
set {
Listener.subscribeAction = value;
}
}
public Action<Pubnub, PNSignalResult<object>> OnSignal {
set {
Listener.signalAction = value;
}
}

public void Subscribe<T>(SubscriptionCursor cursor = null)
{
var subscription = this.Pubnub.Subscribe<T>().Channels(this.ChannelNames.ToArray()).ChannelGroups(this.ChannelGroupNames.ToArray());
if (cursor is not null && cursor.Timetoken != 0) {
var timetoken = cursor.Timetoken ?? 0;
subscription.WithTimetoken(timetoken).Execute();

} else {
subscription.Execute();
}
}

public void AddListener(SubscribeCallbackExt listener)
{
this.EventEmitter.AddListener(listener, this.ChannelNames.ToArray(), this.ChannelGroupNames.ToArray());
}

public void RemoveListener(SubscribeCallbackExt listener)
{
this.EventEmitter.RemoveListener(listener, this.ChannelNames.ToArray(), this.ChannelGroupNames.ToArray());
}

public void UnSubscribe<T>()
{
this.Pubnub.Unsubscribe<T>().Channels(ChannelNames.ToArray()).ChannelGroups(ChannelGroupNames.ToArray()).Execute();
}
}
}

28 changes: 17 additions & 11 deletions src/Api/PubnubApi/Entity/Subscription.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
using System;
using System.Collections.Generic;
using System.Linq;
using PubnubApi.EventEngine.Common;

namespace PubnubApi
{
public class Subscription
public class Subscription : SubscribeCapable
{
public string[] Names { get; set; } = new string[] { };
private Pubnub Pubnub { get; set; }
private EventEmitter EventEmitter { get; set; }
public override List<string> ChannelNames { get; set; } = new List<string>();
public override List<string> ChannelGroupNames { get; set; } = new List<string>();
public override Pubnub Pubnub { get; set; }
public override EventEmitter EventEmitter { get; set; }
public override SubscriptionOptions Options { get; set; }
public override SubscribeCallbackExt Listener { get; set; } = new SubscribeCallbackExt();

public Subscription(string name, SubscriptionOptions options, Pubnub pubnub, EventEmitter eventEmitter)
public Subscription(string[] channels, string[] channelGroups, SubscriptionOptions options, Pubnub pubnub, EventEmitter eventEmitter)
{
Names.ToList().Add(name);
if (options == SubscriptionOptions.ReceivePresenceEvents) {
Names.ToList().Add($"{name}-pnpres");
}
this.ChannelNames = channels.ToList();
this.ChannelGroupNames = channelGroups.ToList();
this.Pubnub = pubnub;
this.EventEmitter = eventEmitter;
this.Options = options;
this.EventEmitter.AddListener(Listener, channels, channelGroups);
}

SubscriptionSet AddSubscription(Subscription subscription)
public SubscriptionSet Add(Subscription subscription)
{
return new SubscriptionSet();
this.ChannelNames.AddRange(subscription.ChannelNames);
this.ChannelGroupNames.AddRange(subscription.ChannelGroupNames);
return new SubscriptionSet(this.ChannelNames.ToArray(),this.ChannelGroupNames.ToArray() , this.Options, this.Pubnub, this.EventEmitter);
}
}
}
87 changes: 80 additions & 7 deletions src/Api/PubnubApi/Entity/SubscriptionSet.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,99 @@
using System;
using System.Linq;
using System.Collections.Generic;
using PubnubApi.EventEngine.Common;

namespace PubnubApi
{

class SubscriptionSet
public class SubscriptionSet : SubscribeCapable
{
public SubscriptionSet()
public override List<string> ChannelNames { get; set; } = new List<string>();
public override List<string> ChannelGroupNames { get; set; } = new List<string>();
public override Pubnub Pubnub { get; set; }
public override EventEmitter EventEmitter { get; set; }
public override SubscriptionOptions Options { get; set; }
List<Subscription> SubscriptionList { get; set; } = new List<Subscription>();
public override SubscribeCallbackExt Listener { get; set; } = new SubscribeCallbackExt();

public SubscriptionSet(string[] channels, string[] channelGroups, SubscriptionOptions options, Pubnub pubnub, EventEmitter eventEmitter)
{
this.Pubnub = pubnub;
this.EventEmitter = eventEmitter;
this.Options = options;

foreach (var c in channels
.Where(c => !c.EndsWith("-pnpres"))) {
var subscription = this.Pubnub.Channel(c).Subscription(this.Options);
this.ChannelNames.AddRange(subscription.ChannelNames);
this.SubscriptionList.Add(subscription);
}

foreach (var cg in channelGroups
.Where(cg => !cg.EndsWith("-pnpres"))) {
var subscription = this.Pubnub.ChannelGroup(cg).Subscription(this.Options);
this.ChannelGroupNames.AddRange(subscription.ChannelGroupNames);
this.SubscriptionList.Add(subscription);
}
}

public SubscriptionSet Add(Subscription subscription)
{
// Ctor with Channels and channelGroups
this.SubscriptionList.ToList().Add(subscription);
if (subscription.ChannelNames.Count > 0) {
this.ChannelNames.AddRange(subscription.ChannelNames);
}
if (subscription.ChannelGroupNames.Count > 0) {
this.ChannelGroupNames.AddRange(subscription.ChannelGroupNames);
}
this.EventEmitter.AddListener(this.Listener, subscription.ChannelNames.ToArray(), subscription.ChannelGroupNames.ToArray());
return this;
}

public SubscriptionSet Remove(Subscription subscription)
{
this.SubscriptionList.Remove(subscription);
if (subscription.ChannelNames.Count > 0) {
foreach (var c in subscription.ChannelNames) {
this.ChannelNames.Remove(c);
}
}
if (subscription.ChannelGroupNames.Count > 0) {
foreach (var g in subscription.ChannelGroupNames) {
this.ChannelGroupNames.Remove(g);
}
}
this.EventEmitter.RemoveListener(this.Listener, subscription.ChannelNames.ToArray(), subscription.ChannelGroupNames.ToArray());
return this;
}

SubscriptionSet AddSubscription(Subscription subscription)
public SubscriptionSet Add(SubscriptionSet subscriptionSet)
{
// CRUD
this.SubscriptionList.AddRange(subscriptionSet.SubscriptionList);
if (subscriptionSet.ChannelNames.Count > 0) {
this.ChannelNames.AddRange(subscriptionSet.ChannelNames);
}
if (subscriptionSet.ChannelGroupNames.Count > 0) {
this.ChannelGroupNames.AddRange(subscriptionSet.ChannelGroupNames);
}
this.EventEmitter.AddListener(this.Listener, subscriptionSet.ChannelNames.ToArray(), subscriptionSet.ChannelGroupNames.ToArray());
return this;
}

SubscriptionSet RemoveSubscription(Subscription subscription)
public SubscriptionSet Remove(SubscriptionSet subscriptionSet)
{
// CRUD
SubscriptionList = this.SubscriptionList.Except(subscriptionSet.SubscriptionList).ToList();
if (subscriptionSet.ChannelNames.Count > 0) {
foreach (var c in subscriptionSet.ChannelNames) {
this.ChannelNames.Remove(c);
}
}
if (subscriptionSet.ChannelGroupNames.Count > 0) {
foreach (var g in subscriptionSet.ChannelGroupNames) {
this.ChannelGroupNames.Remove(g);
}
}
this.EventEmitter.RemoveListener(this.Listener, subscriptionSet.ChannelNames.ToArray(), subscriptionSet.ChannelGroupNames.ToArray());
return this;
}
}
Expand Down
Loading

0 comments on commit 7cdc839

Please sign in to comment.