Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed missing ReceiveReady event wire-up for PPP worker #10

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 69 additions & 64 deletions src/Pirate Pattern/Paranoid Pirate/ParanoidPirate.Worker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ namespace ParanoidPirate.Worker
{
internal static class Program
{
// if liveliness == 0 -> queue is considered dead/disconnected
private static int liveliness = Commons.HeartbeatLiveliness;

private static int cycles = 0;
private static int interval = Commons.IntervalInit;
private static bool crash = false;
private static bool verbose;
private static Random rnd;

/// <summary>
/// ParanoidPirate.Worker [-v]
///
Expand All @@ -20,71 +29,12 @@ private static void Main(string[] args)
{
Console.Title = "NetMQ ParanoidPirate Worker";

var verbose = args.Length > 0 && args[0] == "-v";

var rnd = new Random();
verbose = args.Length > 0 && args[0] == "-v";
var heartbeatAt = DateTime.UtcNow + TimeSpan.FromMilliseconds(Commons.HeartbeatInterval);
rnd = new Random();
var workerId = rnd.Next(100, 500);
var worker = GetWorkerSocket(verbose, workerId);

// if liveliness == 0 -> queue is considered dead/disconnected
var liveliness = Commons.HeartbeatLiveliness;
var interval = Commons.IntervalInit;
var heartbeatAt = DateTime.UtcNow + TimeSpan.FromMilliseconds(Commons.HeartbeatInterval);
var cycles = 0;
var crash = false;

// upon receiving a message this event handler is called
// read the message, randomly simulate failure/problem
// process message or heartbeat or crash
worker.ReceiveReady += (s, e) =>
{
var msg = e.Socket.ReceiveMultipartMessage();

// message is a NetMQMessage (!)
// - 3 part envelope + content -> request
// - 1 part HEARTBEAT -> heartbeat
if (msg.FrameCount > 3)
{
// in order to test the robustness we simulate a couple of typical problems
// e.g. worker crushing or running very slow
// that is initiated after multiple cycles to give everything time to stabilize first
cycles++;

if (cycles > 3 && rnd.Next(5) == 0)
{
Console.WriteLine("[WORKER] simulating crashing!");
crash = true;
return;
}

if (cycles > 3 && rnd.Next(3) == 0)
{
Console.WriteLine("[WORKER] Simulating CPU overload!");
Thread.Sleep(500);
}

if (verbose)
Console.Write("[WORKER] working ...!");

// simulate high workload
Thread.Sleep(10);

if (verbose)
Console.WriteLine("[WORKER] sending {0}", msg.ToString());

// answer
e.Socket.SendMultipartMessage(msg);
// reset liveliness
liveliness = Commons.HeartbeatLiveliness;
}
else if (IsHeartbeatMessage(msg))
liveliness = Commons.HeartbeatLiveliness;
else
Console.WriteLine("[WORKER] Received invalid message!");

interval = Commons.IntervalInit;
};

while (!crash)
{
// wait for incoming request for specified milliseconds
Expand Down Expand Up @@ -132,6 +82,62 @@ private static void Main(string[] args)
Console.ReadKey();
}

private static void OnWorkerReceiveReady(object sender, NetMQSocketEventArgs e)
{
// upon receiving a message this event handler is called
// read the message, randomly simulate failure/problem
// process message or heartbeat or crash

var msg = e.Socket.ReceiveMultipartMessage();

// message is a NetMQMessage (!)
// - 3 part envelope + content -> request
// - 1 part HEARTBEAT -> heartbeat
if (msg.FrameCount > 3)
{
// in order to test the robustness we simulate a couple of typical problems
// e.g. worker crushing or running very slow
// that is initiated after multiple cycles to give everything time to stabilize first
cycles++;

if (cycles > 3 && rnd.Next(5) == 0)
{
Console.WriteLine("[WORKER] simulating crashing!");
crash = true;
return;
}

if (cycles > 3 && rnd.Next(3) == 0)
{
Console.WriteLine("[WORKER] Simulating CPU overload!");
Thread.Sleep(500);
}

if (verbose)
Console.Write("[WORKER] working ...!");

// simulate high workload
Thread.Sleep(10);

if (verbose)
Console.WriteLine("[WORKER] sending {0}", msg.ToString());

// answer
e.Socket.SendMultipartMessage(msg);
// reset liveliness
liveliness = Commons.HeartbeatLiveliness;
}
else if (IsHeartbeatMessage(msg))
liveliness = Commons.HeartbeatLiveliness;
else
Console.WriteLine("[WORKER] Received invalid message!");

interval = Commons.IntervalInit;


}


/// <summary>
/// Create the DEALER socket and connect it to QUEUE backend.
/// Set the identity.
Expand All @@ -140,8 +146,7 @@ private static void Main(string[] args)
private static DealerSocket GetWorkerSocket(bool verbose, int id)
{
var worker = new DealerSocket { Options = { Identity = Encoding.UTF8.GetBytes("Worker_" + id) } };


worker.ReceiveReady += OnWorkerReceiveReady;
worker.Connect(Commons.QueueBackend);

if (verbose)
Expand Down