Skip to content

Commit

Permalink
Merge pull request #12492 from RasaHQ/ATO-668-configure-pii-anonymiza…
Browse files Browse the repository at this point in the history
…tion-for-logs

Configure PII Anonymization for logs
  • Loading branch information
Tawakalt authored Jun 14, 2023
2 parents f157831 + 2a5b47f commit 641700d
Show file tree
Hide file tree
Showing 26 changed files with 355 additions and 177 deletions.
2 changes: 2 additions & 0 deletions changelog/12492.misc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Update hook specifications for running the Rasa Pro anonymization pipeline.
Use `structlogs` for logs leaking PII.
37 changes: 36 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ pluggy = "^1.0.0"
slack-sdk = "^3.19.2"
confluent-kafka = ">=1.9.2,<3.0.0"
portalocker = "^2.7.0"
structlog = "^23.1.0"
structlog-sentry = "^2.0.2"

[[tool.poetry.dependencies.dask]]
version = "2022.2.0"
python = "~=3.7.0"
Expand Down
6 changes: 6 additions & 0 deletions rasa/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from rasa_sdk import __version__ as rasa_sdk_version
from rasa.constants import MINIMUM_COMPATIBLE_VERSION
from rasa.utils.log_utils import configure_structlog

import rasa.telemetry
import rasa.utils.io
Expand Down Expand Up @@ -123,6 +124,11 @@ def main() -> None:
rasa.telemetry.initialize_error_reporting()
plugin_manager().hook.init_telemetry(endpoints_file=endpoints_file)
plugin_manager().hook.init_managers(endpoints_file=endpoints_file)
plugin_manager().hook.init_anonymization_pipeline(
endpoints_file=endpoints_file
)
# configure structlog
configure_structlog(log_level)

cmdline_arguments.func(cmdline_arguments)
elif hasattr(cmdline_arguments, "version"):
Expand Down
11 changes: 8 additions & 3 deletions rasa/core/actions/forms.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Text, List, Optional, Union, Any, Dict, Set
import itertools
import logging
import structlog
import json

from rasa.core.actions import action
Expand Down Expand Up @@ -31,6 +32,7 @@
from rasa.utils.endpoints import EndpointConfig

logger = logging.getLogger(__name__)
structlogger = structlog.get_logger()


class FormAction(LoopAction):
Expand Down Expand Up @@ -254,7 +256,7 @@ async def validate_slots(
Otherwise, returns empty list since the extracted slots already have
corresponding `SlotSet` events in the tracker.
"""
logger.debug(f"Validating extracted slots: {slot_candidates}")
structlogger.debug("forms.slots.validate", slot_candidates=slot_candidates)
events: List[Union[SlotSet, Event]] = [
SlotSet(slot_name, value) for slot_name, value in slot_candidates.items()
]
Expand Down Expand Up @@ -538,7 +540,10 @@ async def _validate_if_required(
)

if needs_validation:
logger.debug(f"Validating user input '{tracker.latest_message}'.")
structlogger.debug(
"forms.validation.required",
tracker_latest_message=tracker.latest_message,
)
return await self.validate(tracker, domain, output_channel, nlg)
else:
# Needed to determine which slots to request although there are no slots
Expand Down Expand Up @@ -606,7 +611,7 @@ async def activate(
if not prefilled_slots:
logger.debug("No pre-filled required slots to validate.")
else:
logger.debug(f"Validating pre-filled required slots: {prefilled_slots}.")
structlogger.debug("forms.activate.form", prefilled_slots=prefilled_slots)

validate_name = f"validate_{self.name()}"

Expand Down
11 changes: 0 additions & 11 deletions rasa/core/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from rasa.core.channels.channel import OutputChannel, UserMessage
from rasa.core.constants import DEFAULT_REQUEST_TIMEOUT
from rasa.core.http_interpreter import RasaNLUHttpInterpreter
from rasa.plugin import plugin_manager
from rasa.shared.core.domain import Domain
from rasa.core.exceptions import AgentNotReady
from rasa.shared.constants import DEFAULT_SENDER_ID
Expand Down Expand Up @@ -220,7 +219,6 @@ async def load_agent(
generator = None
action_endpoint = None
http_interpreter = None
anonymization_pipeline = None

if endpoints:
broker = await EventBroker.create(endpoints.event_broker, loop=loop)
Expand All @@ -234,11 +232,6 @@ async def load_agent(
if endpoints.nlu:
http_interpreter = RasaNLUHttpInterpreter(endpoints.nlu)

anonymization_pipeline = plugin_manager().hook.create_anonymization_pipeline(
anonymization_rules=endpoints.anonymization_rules,
event_broker_config=endpoints.event_broker,
)

agent = Agent(
generator=generator,
tracker_store=tracker_store,
Expand All @@ -247,7 +240,6 @@ async def load_agent(
model_server=model_server,
remote_storage=remote_storage,
http_interpreter=http_interpreter,
anonymization_pipeline=anonymization_pipeline,
)

try:
Expand Down Expand Up @@ -309,7 +301,6 @@ def __init__(
model_server: Optional[EndpointConfig] = None,
remote_storage: Optional[Text] = None,
http_interpreter: Optional[RasaNLUHttpInterpreter] = None,
anonymization_pipeline: Optional[Any] = None,
):
"""Initializes an `Agent`."""
self.domain = domain
Expand All @@ -324,7 +315,6 @@ def __init__(
self._set_fingerprint(fingerprint)
self.model_server = model_server
self.remote_storage = remote_storage
self.anonymization_pipeline = anonymization_pipeline

@classmethod
def load(
Expand Down Expand Up @@ -366,7 +356,6 @@ def load_model(
action_endpoint=self.action_endpoint,
generator=self.nlg,
http_interpreter=self.http_interpreter,
anonymization_pipeline=self.anonymization_pipeline,
)
self.domain = self.processor.domain

Expand Down
11 changes: 8 additions & 3 deletions rasa/core/brokers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import json
import logging
import structlog
import threading
from asyncio import AbstractEventLoop
from typing import Any, Text, List, Optional, Union, Dict, TYPE_CHECKING
Expand All @@ -17,6 +18,7 @@
from confluent_kafka import KafkaError, Producer, Message

logger = logging.getLogger(__name__)
structlogger = structlog.get_logger()


class KafkaEventBroker(EventBroker):
Expand Down Expand Up @@ -237,9 +239,12 @@ def _publish(self, event: Dict[Text, Any]) -> None:
)
]

logger.debug(
f"Calling kafka send({self.topic}, value={event},"
f" key={partition_key!s}, headers={headers})"
structlogger.debug(
"kafka.publish.event",
topic=self.topic,
rasa_event=event,
partition_key=partition_key,
headers=headers,
)

serialized_event = json.dumps(event).encode(DEFAULT_ENCODING)
Expand Down
17 changes: 11 additions & 6 deletions rasa/core/brokers/pika.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
import logging
import structlog
import os
import ssl
from asyncio import AbstractEventLoop
Expand All @@ -19,6 +20,7 @@
import rasa.shared.utils.common

logger = logging.getLogger(__name__)
structlogger = structlog.get_logger()

RABBITMQ_EXCHANGE = "rasa-exchange"
DEFAULT_QUEUE_NAME = "rasa_core_events"
Expand Down Expand Up @@ -298,14 +300,17 @@ async def _publish(
try:
await self._exchange.publish(self._message(event, headers), "")

logger.debug(
f"Published Pika events to exchange '{RABBITMQ_EXCHANGE}' on host "
f"'{self.host}':\n{event}"
structlogger.debug(
"pika.events.publish",
rabbitmq_exchange=RABBITMQ_EXCHANGE,
host=self.host,
rasa_event=event,
)
except Exception as e:
logger.error(
f"Failed to publish Pika event on host '{self.host}' due to "
f"error '{e}'. The message was: \n{event}"
structlogger.error(
"pika.events.publish.failed",
host=self.host,
rasa_event=event,
)
if self.should_keep_unpublished_messages:
self._unpublished_events.append(event)
Expand Down
23 changes: 5 additions & 18 deletions rasa/core/channels/facebook.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import hashlib
import hmac
import logging
import structlog
from fbmessenger import MessengerClient
from fbmessenger.attachments import Image
from fbmessenger.elements import Text as FBText
Expand All @@ -16,6 +17,7 @@
from sanic.response import HTTPResponse

logger = logging.getLogger(__name__)
structlogger = structlog.get_logger()


class Messenger:
Expand Down Expand Up @@ -76,7 +78,7 @@ def _is_file_message(message: Dict[Text, Any]) -> bool:

@staticmethod
def _is_user_message(message: Dict[Text, Any]) -> bool:
"""Check if the message is a message from the user"""
"""Check if the message is a message from the user."""
return (
"message" in message
and "text" in message["message"]
Expand Down Expand Up @@ -105,7 +107,6 @@ async def message(
self, message: Dict[Text, Any], metadata: Optional[Dict[Text, Any]]
) -> None:
"""Handle an incoming event from the fb webhook."""

# quick reply and user message both share 'text' attribute
# so quick reply should be checked first
if self._is_quick_reply_message(message):
Expand All @@ -125,10 +126,7 @@ async def message(
attachment = message["message"]["attachments"][0]
text = attachment["payload"]["url"]
else:
logger.warning(
"Received a message from facebook that we can not "
f"handle. Message: {message}"
)
structlogger.warning("facebook.message.handle", message=message)
return

await self._handle_user_message(text, self.get_user_id(), metadata)
Expand All @@ -137,15 +135,13 @@ async def postback(
self, message: Dict[Text, Any], metadata: Optional[Dict[Text, Any]]
) -> None:
"""Handle a postback (e.g. quick reply button)."""

text = message["postback"]["payload"]
await self._handle_user_message(text, self.get_user_id(), metadata)

async def _handle_user_message(
self, text: Text, sender_id: Text, metadata: Optional[Dict[Text, Any]]
) -> None:
"""Pass on the text to the dialogue engine for processing."""

out_channel = MessengerBot(self.client)
await out_channel.send_action(sender_id, sender_action="mark_seen")

Expand Down Expand Up @@ -179,7 +175,6 @@ def __init__(self, messenger_client: MessengerClient) -> None:

def send(self, recipient_id: Text, element: Any) -> None:
"""Sends a message to the recipient using the messenger client."""

# this is a bit hacky, but the client doesn't have a proper API to
# send messages but instead expects the incoming sender to be present
# which we don't have as it is stored in the input channel.
Expand All @@ -189,15 +184,13 @@ async def send_text_message(
self, recipient_id: Text, text: Text, **kwargs: Any
) -> None:
"""Send a message through this channel."""

for message_part in text.strip().split("\n\n"):
self.send(recipient_id, FBText(text=message_part))

async def send_image_url(
self, recipient_id: Text, image: Text, **kwargs: Any
) -> None:
"""Sends an image. Default will just post the url as a string."""

self.send(recipient_id, Image(url=image))

async def send_action(self, recipient_id: Text, sender_action: Text) -> None:
Expand All @@ -207,7 +200,6 @@ async def send_action(self, recipient_id: Text, sender_action: Text) -> None:
recipient_id: recipient
sender_action: action to send, e.g. "typing_on" or "mark_seen"
"""

self.messenger_client.send_action(
SenderAction(sender_action).to_dict(), recipient_id
)
Expand All @@ -220,7 +212,6 @@ async def send_text_with_buttons(
**kwargs: Any,
) -> None:
"""Sends buttons to the output."""

# buttons is a list of tuples: [(option_name,payload)]
if len(buttons) > 3:
rasa.shared.utils.io.raise_warning(
Expand Down Expand Up @@ -254,15 +245,13 @@ async def send_quick_replies(
**kwargs: Any,
) -> None:
"""Sends quick replies to the output."""

quick_replies = self._convert_to_quick_reply(quick_replies)
self.send(recipient_id, FBText(text=text, quick_replies=quick_replies))

async def send_elements(
self, recipient_id: Text, elements: Iterable[Dict[Text, Any]], **kwargs: Any
) -> None:
"""Sends elements to the output."""

for element in elements:
if "buttons" in element:
self._add_postback_info(element["buttons"])
Expand Down Expand Up @@ -301,8 +290,7 @@ def _add_postback_info(buttons: List[Dict[Text, Any]]) -> None:

@staticmethod
def _convert_to_quick_reply(quick_replies: List[Dict[Text, Any]]) -> QuickReplies:
"""Convert quick reply dictionary to FB QuickReplies object"""

"""Convert quick reply dictionary to FB QuickReplies object."""
fb_quick_replies = []
for quick_reply in quick_replies:
try:
Expand Down Expand Up @@ -411,7 +399,6 @@ def validate_hub_signature(
Returns:
bool: indicated that hub signature is validated
"""

# noinspection PyBroadException
try:
hash_method, hub_signature = hub_signature_header.split("=")
Expand Down
Loading

0 comments on commit 641700d

Please sign in to comment.