Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Oban plugin handler and fix error handling #195

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,16 @@ defmodule OpentelemetryOban.JobHandler do
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end

def handle_job_exception(
_event,
_measurements,
%{stacktrace: stacktrace, error: error} = metadata,
_config
) do
def handle_job_exception(_event, _measurements, metadata, _config) do
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)

# Record exception and mark the span as errored
Span.record_exception(ctx, error, stacktrace)
Span.set_status(ctx, OpenTelemetry.status(:error, ""))
exception = Exception.normalize(metadata.kind, metadata.reason, metadata.stacktrace)
Span.record_exception(ctx, exception, metadata.stacktrace)
Span.set_status(ctx, OpenTelemetry.status(:error, format_error(exception)))

OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end

defp format_error(%{__exception__: true} = exception), do: Exception.message(exception)
defp format_error(error), do: inspect(error)
end
Original file line number Diff line number Diff line change
Expand Up @@ -3,64 +3,127 @@ defmodule OpentelemetryOban.PluginHandler do

@tracer_id __MODULE__

def attach() do
attach_plugin_start_handler()
attach_plugin_stop_handler()
attach_plugin_exception_handler()
end
@plugin_start [:oban, :plugin, :start]
@plugin_stop [:oban, :plugin, :stop]
@plugin_exception [:oban, :plugin, :exception]

defp attach_plugin_start_handler() do
:telemetry.attach(
"#{__MODULE__}.plugin_start",
[:oban, :plugin, :start],
&__MODULE__.handle_plugin_start/4,
[]
def attach do
:telemetry.attach_many(
[__MODULE__, :plugin],
[
@plugin_start,
@plugin_stop,
@plugin_exception
],
&__MODULE__.handle_event/4,
_config = %{}
)
end

defp attach_plugin_stop_handler() do
:telemetry.attach(
"#{__MODULE__}.plugin_stop",
[:oban, :plugin, :stop],
&__MODULE__.handle_plugin_stop/4,
[]
)
end
@doc false
def handle_event(event, measurements, metadata, config)

defp attach_plugin_exception_handler() do
:telemetry.attach(
"#{__MODULE__}.plugin_exception",
[:oban, :plugin, :exception],
&__MODULE__.handle_plugin_exception/4,
[]
)
end
def handle_event(@plugin_start, _measurements, %{plugin: plugin} = metadata, _config) do
span_name = "#{inspect(plugin)} process"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the inspect intentional? It will add quotes around the plugin name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually no. This is an atom right, so there wouldn't be extra quotes.


def handle_plugin_start(_event, _measurements, %{plugin: plugin} = metadata, _config) do
OpentelemetryTelemetry.start_telemetry_span(
@tracer_id,
"#{plugin} process",
metadata,
%{}
)
attributes = %{
"messaging.oban.plugin": inspect(plugin)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why do we need inspect here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. This is an atom right, so there wouldn't be extra quotes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe Atom.to_string() or "#{plugin}" would be more intuitive though

}

OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{
attributes: attributes
})
end

def handle_plugin_stop(_event, _measurements, metadata, _config) do
def handle_event(@plugin_stop, _measurements, metadata, _config) do
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)

Span.set_attributes(ctx, plugin_work_attributes(metadata))
maybe_record_scaler_error(ctx, metadata)

OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end

def handle_plugin_exception(
_event,
_measurements,
%{stacktrace: stacktrace, error: error} = metadata,
_config
) do
def handle_event(@plugin_exception, _measurements, metadata, _config) do
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)

# Record exception and mark the span as errored
Span.record_exception(ctx, error, stacktrace)
Span.set_status(ctx, OpenTelemetry.status(:error, ""))
exception = Exception.normalize(metadata.kind, metadata.reason, metadata.stacktrace)
Span.record_exception(ctx, exception, metadata.stacktrace)
Span.set_status(ctx, OpenTelemetry.status(:error, format_error(exception)))

OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end

defp plugin_work_attributes(%{plugin: Oban.Stager} = metadata) do
%{
"messaging.oban.staged_count": metadata.staged_count
}
end

defp plugin_work_attributes(%{plugin: Oban.Plugins.Gossip} = metadata) do
%{
"messaging.oban.gossip_count": metadata.gossip_count
}
end

defp plugin_work_attributes(%{plugin: Oban.Plugins.Reindexer}), do: %{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could also remove this line and let it go to the default case


defp plugin_work_attributes(%{plugin: plugin, jobs: jobs})
when plugin in [Oban.Plugins.Cron, Oban.Pro.Plugins.DynamicCron] do
%{
"messaging.oban.jobs_count": length(jobs)
}
end

defp plugin_work_attributes(%{plugin: plugin} = metadata)
when plugin in [Oban.Plugins.Lifeline, Oban.Pro.Plugins.DynamicLifeline] do
%{
"messaging.oban.rescued_jobs_count": length(metadata.rescued_jobs),
"messaging.oban.discarded_jobs_count": length(metadata.discarded_jobs)
}
end

defp plugin_work_attributes(%{plugin: plugin} = metadata)
when plugin in [Oban.Plugins.Pruner, Oban.Pro.Plugins.DynamicPruner] do
pruned_count_by_state =
Enum.reduce(metadata.pruned_jobs, %{}, fn %{state: state}, count ->
Map.update(count, state, 1, &(&1 + 1))
end)

%{
"messaging.oban.pruned_jobs_count": length(metadata.pruned_jobs),
"messaging.oban.pruned_completed_jobs_count": Map.get(pruned_count_by_state, :completed, 0),
"messaging.oban.pruned_cancelled_jobs_count": Map.get(pruned_count_by_state, :cancelled, 0),
"messaging.oban.pruned_discarded_jobs_count": Map.get(pruned_count_by_state, :discarded, 0)
}
end

defp plugin_work_attributes(%{plugin: Oban.Pro.Plugins.DynamicPrioritizer} = metadata) do
%{
"messaging.oban.reprioritized_jobs_count": metadata.reprioritized_count
}
end

defp plugin_work_attributes(%{plugin: Oban.Pro.Plugins.DynamicQueues}), do: %{}

defp plugin_work_attributes(%{plugin: Oban.Pro.Plugins.DynamicScaler} = metadata) do
{module, _opts} = metadata.cloud

%{
"messaging.oban.scaler.cloud": inspect(module),
"messaging.oban.scaler.last_rate": metadata.scaler.last_rate,
"messaging.oban.scaler.last_scaled_to": metadata.scaler.last_scaled_to
}
end

defp plugin_work_attributes(_metadata), do: %{}

defp maybe_record_scaler_error(ctx, %{error: error}) do
Span.set_status(ctx, OpenTelemetry.status(:error, format_error(error)))
end

defp maybe_record_scaler_error(_ctx, _metadata), do: :ok

defp format_error(%{__exception__: true} = exception), do: Exception.message(exception)
defp format_error(error), do: inspect(error)
end
Original file line number Diff line number Diff line change
Expand Up @@ -36,77 +36,67 @@ defmodule OpentelemetryOban.PluginHandlerTest do
:telemetry.execute(
[:oban, :plugin, :start],
%{system_time: System.system_time()},
%{plugin: Elixir.Oban.Plugins.Stager}
%{plugin: Oban.Plugins.Stager}
)

:telemetry.execute(
[:oban, :plugin, :stop],
%{duration: 444},
%{plugin: Elixir.Oban.Plugins.Stager}
%{plugin: Oban.Plugins.Stager}
)

refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")}
refute_receive {:span, span(name: "Oban.Plugins.Stager process")}
end

test "records span on plugin execution" do
:telemetry.execute(
[:oban, :plugin, :start],
%{system_time: System.system_time()},
%{plugin: Elixir.Oban.Plugins.Stager}
%{plugin: Oban.Plugins.Stager}
)

:telemetry.execute(
[:oban, :plugin, :stop],
%{duration: 444},
%{plugin: Elixir.Oban.Plugins.Stager}
%{plugin: Oban.Plugins.Stager}
)

assert_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")}
assert_receive {:span, span(name: "Oban.Plugins.Stager process")}
end

test "records span on plugin error" do
:telemetry.execute(
[:oban, :plugin, :start],
%{system_time: System.system_time()},
%{plugin: Elixir.Oban.Plugins.Stager}
)

:telemetry.execute(
[:oban, :plugin, :exception],
%{duration: 444},
%{
plugin: Elixir.Oban.Plugins.Stager,
kind: :error,
stacktrace: [
{Some, :error, [], []}
],
error: %UndefinedFunctionError{
arity: 0,
function: :error,
message: nil,
module: Some,
reason: nil
}
}
)
try do
:telemetry.span(
[:oban, :plugin],
%{plugin: Oban.Plugins.Stager},
fn ->
raise "some error"
end
)
rescue
RuntimeError -> :ok
end

expected_status = OpenTelemetry.status(:error, "")
expected_status = OpenTelemetry.status(:error, "some error")

assert_receive {:span,
span(
name: "Elixir.Oban.Plugins.Stager process",
name: "Oban.Plugins.Stager process",
events: events,
status: ^expected_status
)}

[
event(
name: "exception",
attributes: event_attributes
)
] = :otel_events.list(events)

assert ["exception.message", "exception.stacktrace", "exception.type"] ==
Map.keys(:otel_attributes.map(event_attributes))
assert [
event(
name: "exception",
attributes: event_attributes
)
] = :otel_events.list(events)

assert %{
"exception.type" => "Elixir.RuntimeError",
"exception.message" => "some error",
"exception.stacktrace" => _
} = :otel_attributes.map(event_attributes)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ defmodule OpentelemetryObanTest do
OpentelemetryOban.insert(TestJobThatReturnsError.new(%{}))
assert %{success: 0, discard: 1} = Oban.drain_queue(queue: :events)

expected_status = OpenTelemetry.status(:error, "")
expected_status =
OpenTelemetry.status(:error, "TestJobThatReturnsError failed with {:error, :something}")

assert_receive {:span,
span(
Expand Down Expand Up @@ -195,8 +196,11 @@ defmodule OpentelemetryObanTest do
)
] = :otel_events.list(events)

assert ["exception.message", "exception.stacktrace", "exception.type"] ==
Map.keys(:otel_attributes.map(event_attributes))
assert %{
"exception.type" => "Elixir.Oban.PerformError",
"exception.message" => "TestJobThatReturnsError failed with {:error, :something}",
"exception.stacktrace" => _
} = :otel_attributes.map(event_attributes)
end

test "records spans for each retry" do
Expand All @@ -205,7 +209,8 @@ defmodule OpentelemetryObanTest do
assert %{success: 0, failure: 1, discard: 1} =
Oban.drain_queue(queue: :events, with_scheduled: true, with_recursion: true)

expected_status = OpenTelemetry.status(:error, "")
expected_status =
OpenTelemetry.status(:error, "TestJobThatReturnsError failed with {:error, :something}")

assert_receive {:span,
span(
Expand Down Expand Up @@ -241,7 +246,11 @@ defmodule OpentelemetryObanTest do
OpentelemetryOban.insert(TestJobThatThrowsException.new(%{}))
assert %{success: 0, discard: 1} = Oban.drain_queue(queue: :events)

expected_status = OpenTelemetry.status(:error, "")
expected_status =
OpenTelemetry.status(
:error,
"function Some.error/0 is undefined (module Some is not available)"
)

assert_receive {:span,
span(
Expand Down Expand Up @@ -273,8 +282,12 @@ defmodule OpentelemetryObanTest do
)
] = :otel_events.list(events)

assert ["exception.message", "exception.stacktrace", "exception.type"] ==
Map.keys(:otel_attributes.map(event_attributes))
assert %{
"exception.type" => "Elixir.UndefinedFunctionError",
"exception.message" =>
"function Some.error/0 is undefined (module Some is not available)",
"exception.stacktrace" => _
} = :otel_attributes.map(event_attributes)
end

test "spans inside the job are associated with the job trace" do
Expand Down