From c342b52ba2d8c6182304a20a0db21e83130799ef Mon Sep 17 00:00:00 2001 From: simaki Date: Tue, 8 Feb 2022 09:28:56 +0900 Subject: [PATCH] Release/0.16.0 (#482) * ENH: Add SVI model: `svi_variance` and `SVIVariance` (#406) (#410) * ENH: Add Sobol quasirandom engine (#430) (#431) (#478) * ENH: Support `BSEuropeanBinary` for put (#434) (#438) * ENH: Enabling customizing tqdm progress bar (#446) * ENH: Add antithetic sampling of randn (close #449) (#450) * DOC: Add an example of hedging variance swap using options (#426) (#435) * DOC: Add `autogreek.theta` to documentation (#429) * DOC: Add citation to Heston model (#447) * DOC: Add an example of sticky strike and sticky delta (#479) * TEST: Add tests for BSEuropean put (#439) * TEST: Add tests for identity between vega and gamma (#441) * MAINT: Update codecov action (#442) --- Makefile | 2 +- docs/source/autogreek.rst | 5 + docs/source/nn.functional.rst | 1 + docs/source/nn.rst | 1 + docs/source/stochastic.rst | 10 + examples/example_hedging_variance_swap.py | 35 +++ examples/example_svi.py | 45 ++++ pfhedge/instruments/primary/brownian.py | 2 +- pfhedge/nn/__init__.py | 1 + pfhedge/nn/functional.py | 28 +++ pfhedge/nn/modules/bs/american_binary.py | 2 - pfhedge/nn/modules/bs/european.py | 1 - pfhedge/nn/modules/bs/european_binary.py | 9 +- pfhedge/nn/modules/hedger.py | 5 +- pfhedge/nn/modules/svi.py | 57 +++++ pfhedge/stochastic/__init__.py | 2 + pfhedge/stochastic/brownian.py | 33 ++- pfhedge/stochastic/cir.py | 2 +- pfhedge/stochastic/engine.py | 63 +++++ pfhedge/stochastic/heston.py | 7 +- pfhedge/stochastic/random.py | 91 +++++++ pyproject.toml | 2 +- tests/features/test_features.py | 8 +- tests/instruments/primary/test_brownian.py | 2 +- tests/nn/modules/bs/test_american_binary.py | 13 +- tests/nn/modules/bs/test_european.py | 237 ++++++++++-------- tests/nn/modules/bs/test_european_binary.py | 252 ++++++++++---------- tests/nn/modules/bs/test_lookback.py | 11 + tests/nn/modules/test_svi.py | 28 +++ tests/stochastic/test_brownian.py | 18 +- tests/stochastic/test_cir.py | 8 +- tests/stochastic/test_randn.py | 15 ++ tests/test_examples.py | 4 + 33 files changed, 741 insertions(+), 259 deletions(-) create mode 100644 examples/example_hedging_variance_swap.py create mode 100644 examples/example_svi.py create mode 100644 pfhedge/nn/modules/svi.py create mode 100644 pfhedge/stochastic/engine.py create mode 100644 pfhedge/stochastic/random.py create mode 100644 tests/nn/modules/test_svi.py create mode 100644 tests/stochastic/test_randn.py diff --git a/Makefile b/Makefile index b54e2fd7..a083d1bf 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ pytest: .PHONY: test-cov test-cov: - $(RUN) pytest --cov=$(PROJECT_NAME) --cov-report=html + $(RUN) pytest --cov=$(PROJECT_NAME) --cov-report=xml .PHONY: lint lint: lint-black lint-isort diff --git a/docs/source/autogreek.rst b/docs/source/autogreek.rst index dd617c07..33f1fee1 100644 --- a/docs/source/autogreek.rst +++ b/docs/source/autogreek.rst @@ -22,3 +22,8 @@ Vega ---- .. autofunction:: vega + +Theta +----- + +.. autofunction:: theta diff --git a/docs/source/nn.functional.rst b/docs/source/nn.functional.rst index cb81cdaf..6dc2f560 100644 --- a/docs/source/nn.functional.rst +++ b/docs/source/nn.functional.rst @@ -48,3 +48,4 @@ Other Functions npdf d1 d2 + svi_variance diff --git a/docs/source/nn.rst b/docs/source/nn.rst index 20b4e612..8a310122 100644 --- a/docs/source/nn.rst +++ b/docs/source/nn.rst @@ -89,3 +89,4 @@ Other Modules :template: classtemplate.rst nn.Naked + nn.SVIVariance diff --git a/docs/source/stochastic.rst b/docs/source/stochastic.rst index cdab31e6..bac86b2d 100644 --- a/docs/source/stochastic.rst +++ b/docs/source/stochastic.rst @@ -33,3 +33,13 @@ Heston Process :toctree: generated generate_heston + +Generators +---------- + +.. autosummary:: + :nosignatures: + :toctree: generated + + randn_antithetic + randn_sobol_boxmuller diff --git a/examples/example_hedging_variance_swap.py b/examples/example_hedging_variance_swap.py new file mode 100644 index 00000000..55cc6e8b --- /dev/null +++ b/examples/example_hedging_variance_swap.py @@ -0,0 +1,35 @@ +import sys + +import matplotlib.pyplot as plt +import torch + +sys.path.append("..") + +from pfhedge.instruments import BrownianStock +from pfhedge.instruments import EuropeanOption +from pfhedge.nn import BlackScholes + +if __name__ == "__main__": + options_list = [] + strikes_list = [] + for call in (True, False): + for strike in torch.arange(70, 180, 10): + option = EuropeanOption(BrownianStock(), call=call, strike=strike) + options_list.append(option) + strikes_list.append(strike) + + spot = torch.linspace(50, 200, 100) + t = options_list[0].maturity + v = options_list[0].ul().sigma + + plt.figure() + total_vega = torch.zeros_like(spot) + for option, strike in zip(options_list, strikes_list): + lm = (spot / strike).log() + vega = BlackScholes(option).vega(lm, t, v) / (strike**2) + total_vega += vega + if option.call: + # 2 is for call and put + plt.plot(spot.numpy(), 2 * vega.numpy()) + plt.plot(spot.numpy(), total_vega.numpy(), color="k", lw=2) + plt.savefig("./output/options-vega.png") diff --git a/examples/example_svi.py b/examples/example_svi.py new file mode 100644 index 00000000..fd452055 --- /dev/null +++ b/examples/example_svi.py @@ -0,0 +1,45 @@ +import sys + +import matplotlib.pyplot as plt +import torch + +sys.path.append("..") + +import pfhedge.autogreek as autogreek +from pfhedge.nn import BSEuropeanOption +from pfhedge.nn.functional import svi_variance + +if __name__ == "__main__": + a, b, rho, m, sigma = 0.02, 0.10, -0.40, 0.00, 0.20 + k = torch.linspace(-0.10, 0.10, 100) + v = svi_variance(k, a=a, b=b, rho=rho, m=m, sigma=sigma) + + plt.figure() + plt.plot(k.numpy(), v.numpy()) + plt.xlabel("Log strike") + plt.xlabel("Variance") + plt.savefig("output/svi_variance.pdf") + print("Saved figure to output/svi_variance.pdf") + + bs = BSEuropeanOption() + t = torch.full_like(k, 0.1) + delta_sticky_strike = bs.delta(-k, t, v.sqrt()) + + def price_sticky_delta(log_moneyness): + v = svi_variance(-log_moneyness, a=a, b=b, rho=rho, m=m, sigma=sigma) + return bs.price(log_moneyness, t, v.sqrt()) + + log_moneyness = -k.requires_grad_() + delta_sticky_delta = autogreek.delta( + price_sticky_delta, log_moneyness=log_moneyness + ) + k.detach_() + + plt.figure() + plt.plot(-k.numpy(), delta_sticky_strike.numpy(), label="Delta for sticky strike") + plt.plot(-k.numpy(), delta_sticky_delta.numpy(), label="Delta for sticky delta") + plt.xlabel("Log strike") + plt.xlabel("Delta") + plt.legend() + plt.savefig("output/svi_delta.pdf") + print("Saved figure to output/svi_delta.pdf") diff --git a/pfhedge/instruments/primary/brownian.py b/pfhedge/instruments/primary/brownian.py index ee58f6cd..7519efbb 100644 --- a/pfhedge/instruments/primary/brownian.py +++ b/pfhedge/instruments/primary/brownian.py @@ -94,7 +94,7 @@ def variance(self) -> Tensor: It is a tensor filled with the square of ``self.sigma``. """ - return torch.full_like(self.spot, self.sigma ** 2) + return torch.full_like(self.spot, self.sigma**2) def simulate( self, diff --git a/pfhedge/nn/__init__.py b/pfhedge/nn/__init__.py index d8f8eb18..7207a80c 100644 --- a/pfhedge/nn/__init__.py +++ b/pfhedge/nn/__init__.py @@ -13,4 +13,5 @@ from .modules.loss import IsoelasticLoss from .modules.mlp import MultiLayerPerceptron from .modules.naked import Naked +from .modules.svi import SVIVariance from .modules.ww import WhalleyWilmott diff --git a/pfhedge/nn/functional.py b/pfhedge/nn/functional.py index 7adec74a..3e055112 100644 --- a/pfhedge/nn/functional.py +++ b/pfhedge/nn/functional.py @@ -565,3 +565,31 @@ def ww_width( torch.Tensor """ return (cost * (3 / 2) * gamma.square() * spot / a).pow(1 / 3) + + +def svi_variance( + input: TensorOrScalar, + a: TensorOrScalar, + b: TensorOrScalar, + rho: TensorOrScalar, + m: TensorOrScalar, + sigma: TensorOrScalar, +) -> Tensor: + r"""Returns variance in the SVI model. + + See :class:`pfhedge.nn.SVIVariance` for details. + + Args: + input (torch.Tensor or float): Log strike of the underlying asset. + That is, :math:`k = \log(K / S)` for spot :math:`S` and strike :math:`K`. + a (torch.Tensor or float): The parameter :math:`a`. + b (torch.Tensor or float): The parameter :math:`b`. + rho (torch.Tensor or float): The parameter :math:`\rho`. + m (torch.Tensor or float): The parameter :math:`m`. + sigma (torch.Tensor or float): The parameter :math:`s`. + + Returns: + torch.Tensor + """ + k_m = torch.as_tensor(input - m) # k - m + return a + b * (rho * k_m + (k_m.square() + sigma**2).sqrt()) diff --git a/pfhedge/nn/modules/bs/american_binary.py b/pfhedge/nn/modules/bs/american_binary.py index 3919a9a4..1b6d92a7 100644 --- a/pfhedge/nn/modules/bs/american_binary.py +++ b/pfhedge/nn/modules/bs/american_binary.py @@ -43,7 +43,6 @@ class BSAmericanBinaryOption(BSModuleMixin): Continuous-time models (Vol. 11). Springer Science & Business Media. Examples: - >>> from pfhedge.nn import BSAmericanBinaryOption >>> >>> m = BSAmericanBinaryOption(strike=1.0) @@ -81,7 +80,6 @@ def from_derivative(cls, derivative): BSAmericanBinaryOption Examples: - >>> from pfhedge.instruments import BrownianStock >>> from pfhedge.instruments import AmericanBinaryOption >>> diff --git a/pfhedge/nn/modules/bs/european.py b/pfhedge/nn/modules/bs/european.py index 835887bf..72c3e99f 100644 --- a/pfhedge/nn/modules/bs/european.py +++ b/pfhedge/nn/modules/bs/european.py @@ -72,7 +72,6 @@ def from_derivative(cls, derivative): BSEuropeanOption Examples: - >>> from pfhedge.instruments import BrownianStock >>> from pfhedge.instruments import EuropeanOption >>> diff --git a/pfhedge/nn/modules/bs/european_binary.py b/pfhedge/nn/modules/bs/european_binary.py index ef0d1408..a23ea453 100644 --- a/pfhedge/nn/modules/bs/european_binary.py +++ b/pfhedge/nn/modules/bs/european_binary.py @@ -59,11 +59,6 @@ class BSEuropeanBinaryOption(BSModuleMixin): """ def __init__(self, call: bool = True, strike: float = 1.0): - if not call: - raise ValueError( - f"{self.__class__.__name__} for a put option is not yet supported." - ) - super().__init__() self.call = call self.strike = strike @@ -92,6 +87,8 @@ def from_derivative(cls, derivative): def extra_repr(self) -> str: params = [] + if not self.call: + params.append("call=" + str(self.call)) params.append("strike=" + _format_float(self.strike)) return ", ".join(params) @@ -148,6 +145,8 @@ def delta( spot = s.exp() * self.strike delta = npdf(d2(s, t, v)) / (spot * v * t.sqrt()) + delta = -delta if not self.call else delta # put-call parity + return delta def gamma( diff --git a/pfhedge/nn/modules/hedger.py b/pfhedge/nn/modules/hedger.py index ba6f80c7..c11c1884 100644 --- a/pfhedge/nn/modules/hedger.py +++ b/pfhedge/nn/modules/hedger.py @@ -461,6 +461,7 @@ def fit( init_state: Optional[Tuple[TensorOrScalar, ...]] = None, verbose: bool = True, validation: bool = True, + tqdm_kwargs: dict = {}, ) -> Optional[List[float]]: """Fit the hedging model to hedge a given derivative. @@ -489,6 +490,8 @@ def fit( standard output. validation (bool, default=True): If ``False``, skip the computation of the validation loss and returns ``None``. + tqdm_kwargs (dict, default={}): Keyword argument passed to ``tqdm.__init__`` + to customize the progress bar. Returns: list[float] @@ -546,7 +549,7 @@ def compute_loss(**kwargs) -> Tensor: ) history = [] - progress = tqdm(range(n_epochs), disable=not verbose) + progress = tqdm(range(n_epochs), disable=not verbose, **tqdm_kwargs) for _ in progress: # Compute training loss and backpropagate self.train() diff --git a/pfhedge/nn/modules/svi.py b/pfhedge/nn/modules/svi.py new file mode 100644 index 00000000..594b699f --- /dev/null +++ b/pfhedge/nn/modules/svi.py @@ -0,0 +1,57 @@ +from torch import Tensor +from torch.nn import Module + +from pfhedge._utils.typing import TensorOrScalar +from pfhedge.nn.functional import svi_variance + + +class SVIVariance(Module): + r"""Returns total variance in the SVI model. + + The total variance for log strike :math:`k = \log(K / S)`, + where :math:`K` and :math:`S` are strike and spot, reads: + + .. math:: + w = a + b \left[ \rho (k - m) + \sqrt{(k - m)^2 + \sigma^2} \right] . + + References: + - Jim Gatheral and Antoine Jacquier, + Arbitrage-free SVI volatility surfaces. + [arXiv:`1204.0646 `_ [q-fin.PR]] + + Args: + a (torch.Tensor or float): The parameter :math:`a`. + b (torch.Tensor or float): The parameter :math:`b`. + rho (torch.Tensor or float): The parameter :math:`\rho`. + m (torch.Tensor or float): The parameter :math:`m`. + sigma (torch.Tensor or float): The parameter :math:`\sigma`. + + Examples: + >>> import torch + >>> + >>> a, b, rho, m, sigma = 0.03, 0.10, 0.10, 0.00, 0.10 + >>> module = SVIVariance(a, b, rho, m, sigma) + >>> input = torch.tensor([-0.10, -0.01, 0.00, 0.01, 0.10]) + >>> module(input) + tensor([0.0431, 0.0399, 0.0400, 0.0401, 0.0451]) + """ + + def __init__( + self, + a: TensorOrScalar, + b: TensorOrScalar, + rho: TensorOrScalar, + m: TensorOrScalar, + sigma: TensorOrScalar, + ) -> None: + super().__init__() + self.a = a + self.b = b + self.rho = rho + self.m = m + self.sigma = sigma + + def forward(self, input: Tensor) -> Tensor: + return svi_variance( + input, a=self.a, b=self.b, rho=self.rho, m=self.m, sigma=self.sigma + ) diff --git a/pfhedge/stochastic/__init__.py b/pfhedge/stochastic/__init__.py index ea753f7e..564f4c31 100644 --- a/pfhedge/stochastic/__init__.py +++ b/pfhedge/stochastic/__init__.py @@ -2,3 +2,5 @@ from .brownian import generate_geometric_brownian from .cir import generate_cir from .heston import generate_heston +from .random import randn_antithetic +from .random import randn_sobol_boxmuller diff --git a/pfhedge/stochastic/brownian.py b/pfhedge/stochastic/brownian.py index c1360f68..a1859151 100644 --- a/pfhedge/stochastic/brownian.py +++ b/pfhedge/stochastic/brownian.py @@ -1,3 +1,5 @@ +from typing import Any +from typing import Callable from typing import Optional from typing import Tuple from typing import Union @@ -17,14 +19,14 @@ def generate_brownian( dt: float = 1 / 250, dtype: Optional[torch.dtype] = None, device: Optional[torch.device] = None, + engine: Callable[..., Tensor] = torch.randn, ) -> Tensor: - """Returns time series following the Brownian motion. + r"""Returns time series following the Brownian motion. The time evolution of the process is given by: .. math:: - - dS(t) = \\sigma dW(t) \\,. + dS(t) = \sigma dW(t) \,. Args: n_paths (int): The number of simulated paths. @@ -33,7 +35,7 @@ def generate_brownian( the time series. This is specified by a tuple :math:`(S(0),)`. It also accepts a :class:`torch.Tensor` or a :class:`float`. - sigma (float, default=0.2): The parameter :math:`\\sigma`, + sigma (float, default=0.2): The parameter :math:`\sigma`, which stands for the volatility of the time series. dt (float, default=1/250): The intervals of the time steps. dtype (torch.dtype, optional): The desired data type of returned tensor. @@ -44,6 +46,11 @@ def generate_brownian( (see :func:`torch.set_default_tensor_type()`). ``device`` will be the CPU for CPU tensor types and the current CUDA device for CUDA tensor types. + engine (callable, default=torch.randn): The desired generator of random numbers + from a standard normal distribution. + A function call ``engine(size, dtype=None, device=None)`` + should return a tensor filled with random numbers + from a standard normal distribution. Shape: - Output: :math:`(N, T)` where @@ -54,7 +61,6 @@ def generate_brownian( torch.Tensor Examples: - >>> from pfhedge.stochastic import generate_brownian >>> >>> _ = torch.manual_seed(42) @@ -72,7 +78,8 @@ def generate_brownian( init_state = tuple(map(lambda t: t.to(dtype=dtype, device=device), init_state)) init_value = init_state[0] - randn = torch.randn((n_paths, n_steps), dtype=dtype, device=device) + # randn = torch.randn((n_paths, n_steps), dtype=dtype, device=device) + randn = engine((n_paths, n_steps), dtype=dtype, device=device) randn[:, 0] = 0.0 return sigma * randn.new_tensor(dt).sqrt() * randn.cumsum(1) + init_value @@ -85,14 +92,15 @@ def generate_geometric_brownian( dt: float = 1 / 250, dtype: Optional[torch.dtype] = None, device: Optional[torch.device] = None, + engine: Callable[..., Tensor] = torch.randn, ) -> Tensor: - """Returns time series following the geometric Brownian motion. + r"""Returns time series following the geometric Brownian motion. The time evolution of the process is given by: .. math:: - dS(t) = \\sigma S(t) dW(t) \\,. + dS(t) = \sigma S(t) dW(t) \,. Args: n_paths (int): The number of simulated paths. @@ -112,6 +120,11 @@ def generate_geometric_brownian( (see :func:`torch.set_default_tensor_type()`). ``device`` will be the CPU for CPU tensor types and the current CUDA device for CUDA tensor types. + engine (callable, default=torch.randn): The desired generator of random numbers + from a standard normal distribution. + A function call ``engine(size, dtype=None, device=None)`` + should return a tensor filled with random numbers + from a standard normal distribution. Shape: - Output: :math:`(N, T)` where @@ -122,7 +135,6 @@ def generate_geometric_brownian( torch.Tensor Examples: - >>> from pfhedge.stochastic import generate_brownian >>> >>> _ = torch.manual_seed(42) @@ -147,6 +159,7 @@ def generate_geometric_brownian( dt=dt, dtype=dtype, device=device, + engine=engine, ) t = dt * torch.arange(n_steps).to(brownian).unsqueeze(0) - return init_state[0] * (brownian - (sigma ** 2) * t / 2).exp() + return init_state[0] * (brownian - (sigma**2) * t / 2).exp() diff --git a/pfhedge/stochastic/cir.py b/pfhedge/stochastic/cir.py index 38023aa7..a93b8407 100644 --- a/pfhedge/stochastic/cir.py +++ b/pfhedge/stochastic/cir.py @@ -105,7 +105,7 @@ def generate_cir( # Compute m, s, psi: Eq(17,18) exp = (-kappa * dt).exp() m = theta + (v - theta) * exp - s2 = v * (sigma ** 2) * exp * (1 - exp) / kappa + theta * (sigma ** 2) * ( + s2 = v * (sigma**2) * exp * (1 - exp) / kappa + theta * (sigma**2) * ( (1 - exp).square() ) / (2 * kappa) psi = s2 / m.square().clamp(min=EPSILON) diff --git a/pfhedge/stochastic/engine.py b/pfhedge/stochastic/engine.py new file mode 100644 index 00000000..112f2a9f --- /dev/null +++ b/pfhedge/stochastic/engine.py @@ -0,0 +1,63 @@ +import math +from typing import Optional +from typing import Tuple + +import torch +from torch import Tensor +from torch.quasirandom import SobolEngine + + +def _box_muller(input0: Tensor, input1: Tensor) -> Tuple[Tensor, Tensor]: + EPSILON = 1e-10 + radius = (-2 * input0.clamp(min=EPSILON).log()).sqrt() + z0 = radius * (2 * math.pi * input1).cos() + z1 = radius * (2 * math.pi * input1).sin() + return z0, z1 + + +def _get_numel(size: Tuple[int, ...]): + out = 1 + for dim in size: + out *= dim + return out + + +class RandnSobolBoxMuller: + """Generator of random numbers from a standard normal distribution + using Sobol sequence and Box-Muller transformation. + + Args: + scramble (bool, optional): Setting this to ``True`` will produce + scrambled Sobol sequences. Scrambling is capable of producing + better Sobol sequences. Default: ``False``. + seed (int, optional): This is the seed for the scrambling. + The seed of the random number generator is set to this, + if specified. Otherwise, it uses a random seed. + Default: ``None``. + """ + + def __init__(self, scramble: bool = False, seed: Optional[int] = None): + self.scramble = scramble + self.seed = seed + + def __call__( + self, + *size: Tuple[int, ...], + dtype: Optional[torch.dtype] = None, + device: Optional[torch.device] = None, + ) -> Tensor: + numel = _get_numel(*size) + output = self._generate_1d(numel, dtype=dtype, device=device) + output.resize_(*size) + return output + + def _generate_1d( + self, + n: int, + dtype: Optional[torch.dtype] = None, + device: Optional[torch.device] = None, + ) -> Tensor: + engine = SobolEngine(2, scramble=self.scramble, seed=self.seed) + rand = engine.draw(n // 2 + 1).to(dtype=dtype, device=device) + z0, z1 = _box_muller(rand[:, 0], rand[:, 1]) + return torch.cat((z0, z1), dim=0)[:n] diff --git a/pfhedge/stochastic/heston.py b/pfhedge/stochastic/heston.py index 3e54ee6f..b6492e03 100644 --- a/pfhedge/stochastic/heston.py +++ b/pfhedge/stochastic/heston.py @@ -55,6 +55,9 @@ def generate_heston( Time-series is generated by Andersen's QE-M method (See Reference for details). References: + - Heston, S.L., 1993. A closed-form solution for options with stochastic volatility + with applications to bond and currency options. + The review of financial studies, 6(2), pp.327-343. - Andersen, Leif B.G., Efficient Simulation of the Heston Stochastic Volatility Model (January 23, 2007). Available at SSRN: https://ssrn.com/abstract=946405 or http://dx.doi.org/10.2139/ssrn.946404 @@ -132,8 +135,8 @@ def generate_heston( k0 = -rho * kappa * theta * dt / sigma k1 = GAMMA1 * dt * (kappa * rho / sigma - 0.5) - rho / sigma k2 = GAMMA2 * dt * (kappa * rho / sigma - 0.5) + rho / sigma - k3 = GAMMA1 * dt * (1 - rho ** 2) - k4 = GAMMA2 * dt * (1 - rho ** 2) + k3 = GAMMA1 * dt * (1 - rho**2) + k4 = GAMMA2 * dt * (1 - rho**2) v0 = variance[:, i_step] v1 = variance[:, i_step + 1] log_spot[:, i_step + 1] = ( diff --git a/pfhedge/stochastic/random.py b/pfhedge/stochastic/random.py new file mode 100644 index 00000000..3bee1fbc --- /dev/null +++ b/pfhedge/stochastic/random.py @@ -0,0 +1,91 @@ +import torch + +from .engine import RandnSobolBoxMuller + + +def randn_antithetic(*size, dtype=None, device=None, dim=0, shuffle=True): + """Returns a tensor filled with random numbers obtained by an antithetic sampling. + + The output should be a normal distribution with mean 0 and variance 1 + (also called the standard normal distribution). + + Parameters: + size (``int``...): a sequence of integers defining the shape of the output tensor. + Can be a variable number of arguments or a collection like a list or tuple. + dtype (torch.dtype, optional): The desired data type of returned tensor. + Default: If ``None``, uses a global default + (see :func:`torch.set_default_tensor_type()`). + device (torch.device, optional): The desired device of returned tensor. + Default: If ``None``, uses the current device for the default tensor type + (see :func:`torch.set_default_tensor_type()`). + ``device`` will be the CPU for CPU tensor types and the current CUDA device + for CUDA tensor types. + + Returns: + torch.Tensor + + Examples: + >>> from pfhedge.stochastic import randn_antithetic + >>> + >>> _ = torch.manual_seed(42) + >>> output = randn_antithetic(4, 3) + >>> output + tensor([[-0.3367, -0.1288, -0.2345], + [ 0.2303, -1.1229, -0.1863], + [-0.2303, 1.1229, 0.1863], + [ 0.3367, 0.1288, 0.2345]]) + >>> output.mean(dim=0).allclose(torch.zeros(3), atol=1e-07, rtol=0.0) + True + """ + if dim != 0: + raise ValueError("dim != 0 is not supported.") + + size = list(size) + size_half = [-(-size[0] // 2)] + size[1:] + randn = torch.randn(*size_half, dtype=dtype, device=device) + + output = torch.cat((randn, -randn), dim=0) + + if shuffle: + output = output[torch.randperm(output.size(dim))] + + output = output[: size[0]] + + return output + + +def randn_sobol_boxmuller(*size, dtype=None, device=None, scramble=True, seed=None): + """Returns a tensor filled with random numbers obtained by a Sobol sequence + applied with the Box-Muller transformation. + + The outputs should be normal distribution with mean 0 and variance 1 + (also called the standard normal distribution). + + Parameters: + size (``int``...): a sequence of integers defining the shape of the output tensor. + Can be a variable number of arguments or a collection like a list or tuple. + dtype (torch.dtype, optional): The desired data type of returned tensor. + Default: If ``None``, uses a global default + (see :func:`torch.set_default_tensor_type()`). + device (torch.device, optional): The desired device of returned tensor. + Default: If ``None``, uses the current device for the default tensor type + (see :func:`torch.set_default_tensor_type()`). + ``device`` will be the CPU for CPU tensor types and the current CUDA device + for CUDA tensor types. + + Returns: + torch.Tensor + + Examples: + >>> from pfhedge.stochastic import randn_sobol_boxmuller + >>> + >>> _ = torch.manual_seed(42) + >>> output = randn_sobol_boxmuller((4, 3)) + >>> output + tensor([[ 0.0559, 0.4954, -0.8578], + [-0.7492, -1.0370, -0.4778], + [ 0.1651, 0.0430, -2.0368], + [ 1.1309, -0.1779, 0.0796]]) + """ + engine = RandnSobolBoxMuller(scramble=scramble, seed=seed) + return engine(*size, dtype=dtype, device=device) diff --git a/pyproject.toml b/pyproject.toml index b4e657a3..38d7b2e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pfhedge" -version = "0.15.0" +version = "0.16.0" description = "Deep Hedging in PyTorch" authors = ["Shota Imaki "] license = "MIT" diff --git a/tests/features/test_features.py b/tests/features/test_features.py index f0559859..bce70467 100644 --- a/tests/features/test_features.py +++ b/tests/features/test_features.py @@ -281,22 +281,22 @@ def test_constant_volatility(self, sigma): f = Variance().of(derivative) result = f[0] - expect = torch.full((2, 1), sigma ** 2) + expect = torch.full((2, 1), sigma**2) expect = expect.unsqueeze(-1) assert_close(result, expect, check_stride=False) result = f[1] - expect = torch.full((2, 1), sigma ** 2) + expect = torch.full((2, 1), sigma**2) expect = expect.unsqueeze(-1) assert_close(result, expect, check_stride=False) result = f[2] - expect = torch.full((2, 1), sigma ** 2) + expect = torch.full((2, 1), sigma**2) expect = expect.unsqueeze(-1) assert_close(result, expect, check_stride=False) result = f[None] - expect = torch.full((2, 3), sigma ** 2) + expect = torch.full((2, 3), sigma**2) expect = expect.unsqueeze(-1) assert_close(result, expect, check_stride=False) diff --git a/tests/instruments/primary/test_brownian.py b/tests/instruments/primary/test_brownian.py index 53fca02a..167b2318 100644 --- a/tests/instruments/primary/test_brownian.py +++ b/tests/instruments/primary/test_brownian.py @@ -182,7 +182,7 @@ def test_volatility(self, sigma): assert_close(result, expect) result = s.variance - expect = torch.full_like(s.spot, sigma ** 2) + expect = torch.full_like(s.spot, sigma**2) assert_close(result, expect) def test_init_device(self): diff --git a/tests/nn/modules/bs/test_american_binary.py b/tests/nn/modules/bs/test_american_binary.py index 941fc307..ad25e78c 100644 --- a/tests/nn/modules/bs/test_american_binary.py +++ b/tests/nn/modules/bs/test_american_binary.py @@ -177,7 +177,7 @@ def test_check_price_monte_carlo(self): k = 1.01 d = AmericanBinaryOption(BrownianStock(), strike=k) m = BSAmericanBinaryOption.from_derivative(d) - d.simulate(n_paths=int(1e6), init_state=(1.0,)) + d.simulate(n_paths=int(1e5), init_state=(1.0,)) k_shift = k * torch.tensor(beta * d.ul().sigma * sqrt(d.ul().dt)).exp() @@ -187,6 +187,17 @@ def test_check_price_monte_carlo(self): expect = d.payoff().mean(0, keepdim=True) assert_close(result, expect, rtol=1e-3, atol=0.0) + def test_vega_and_gamma(self): + m = BSAmericanBinaryOption() + # vega = spot^2 * sigma * (T - t) * gamma + # See Chapter 5 Appendix A, Bergomi "Stochastic volatility modeling" + spot = torch.tensor([0.90, 0.94, 0.98]) + t = torch.tensor([0.1, 0.2, 0.3]) + v = torch.tensor([0.1, 0.2, 0.3]) + vega = m.vega(spot.log(), spot.log(), t, v) + gamma = m.gamma(spot.log(), spot.log(), t, v) + assert_close(vega, spot.square() * v * t * gamma, atol=1e-3, rtol=0) + def test_features(self): m = BSAmericanBinaryOption() assert m.inputs() == [ diff --git a/tests/nn/modules/bs/test_european.py b/tests/nn/modules/bs/test_european.py index 5023d1ea..26575c47 100644 --- a/tests/nn/modules/bs/test_european.py +++ b/tests/nn/modules/bs/test_european.py @@ -28,116 +28,150 @@ def test_features(self): assert m.inputs() == ["log_moneyness", "time_to_maturity", "volatility"] _ = [get_feature(f) for f in m.inputs()] - def test_check_delta(self): - # TODO(simaki): Check for put option - - m = BSEuropeanOption() - - # delta = 0 for spot --> +0 - result = compute_delta(m, torch.tensor([[-10.0, 1.0, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) - - # delta = 1 for spot --> +inf - result = compute_delta(m, torch.tensor([[10.0, 1.0, 0.2]])) - expect = torch.tensor([1.0]) - assert_close(result, expect) - - # delta = 0 for spot / k < 1 and time --> +0 - result = compute_delta(m, torch.tensor([[-0.01, 1e-10, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) - - # delta = 1 for spot / k > 1 and time --> +0 - result = compute_delta(m, torch.tensor([[0.01, 1e-10, 0.2]])) - expect = torch.tensor([1.0]) - assert_close(result, expect) - - # delta = 0 for spot / k < 1 and volatility --> +0 - result = compute_delta(m, torch.tensor([[-0.01, 1.0, 1e-10]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) - - # delta = 0 for spot / k > 1 and volatility --> +0 - result = compute_delta(m, torch.tensor([[0.01, 1.0, 1e-10]])) - expect = torch.tensor([1.0]) - assert_close(result, expect) - - def test_check_gamma(self): - # TODO(simaki): Check for put option - - m = BSEuropeanOption() + def test_delta_limit(self): + EPSILON = 1e-10 + c = BSEuropeanOption() + p = BSEuropeanOption(call=False) + + # delta = 0 (call), -1 (put) for spot --> +0 + result = compute_delta(c, torch.tensor([[-10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_delta(p, torch.tensor([[-10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([-1.0])) + + # delta = 1 (call), 0 (put) for spot --> +inf + result = compute_delta(c, torch.tensor([[10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([1.0])) + result = compute_delta(p, torch.tensor([[10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) + + # delta = 0 (call), -1 (put) for spot < k and time --> +0 + result = compute_delta(c, torch.tensor([[-0.01, EPSILON, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_delta(p, torch.tensor([[-0.01, EPSILON, 0.2]])) + assert_close(result, torch.tensor([-1.0])) + + # delta = 1 (call), 0 (put) for spot > k and time --> +0 + result = compute_delta(c, torch.tensor([[0.01, EPSILON, 0.2]])) + assert_close(result, torch.tensor([1.0])) + result = compute_delta(p, torch.tensor([[0.01, EPSILON, 0.2]])) + assert_close(result, torch.tensor([0.0])) + + # delta = 0 (call), -1 (put) for spot < k and volatility --> +0 + result = compute_delta(c, torch.tensor([[-0.01, 1.0, EPSILON]])) + assert_close(result, torch.tensor([0.0])) + result = compute_delta(p, torch.tensor([[-0.01, 1.0, EPSILON]])) + assert_close(result, torch.tensor([-1.0])) + + # delta = 1 (call), 0 (put) for spot > k and volatility --> +0 + result = compute_delta(c, torch.tensor([[0.01, 1.0, EPSILON]])) + assert_close(result, torch.tensor([1.0])) + result = compute_delta(p, torch.tensor([[0.01, 1.0, EPSILON]])) + assert_close(result, torch.tensor([0.0])) + + def test_gamma_limit(self): + EPSILON = 1e-10 + c = BSEuropeanOption() + p = BSEuropeanOption(call=False) # gamma = 0 for spot --> +0 - result = compute_gamma(m, torch.tensor([[-10.0, 1.0, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_gamma(c, torch.tensor([[-10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_gamma(p, torch.tensor([[-10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) - # gamma = 1 for spot --> +inf - result = compute_gamma(m, torch.tensor([[10.0, 1.0, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + # gamma = 0 for spot --> +inf + result = compute_gamma(c, torch.tensor([[10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_gamma(p, torch.tensor([[10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) # gamma = 0 for spot / k < 1 and time --> +0 - result = compute_gamma(m, torch.tensor([[-0.01, 1e-10, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_gamma(c, torch.tensor([[-0.01, EPSILON, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_gamma(p, torch.tensor([[-0.01, EPSILON, 0.2]])) + assert_close(result, torch.tensor([0.0])) # gamma = 0 for spot / k > 1 and time --> +0 - result = compute_gamma(m, torch.tensor([[0.01, 1e-10, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_gamma(c, torch.tensor([[0.01, EPSILON, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_gamma(p, torch.tensor([[0.01, EPSILON, 0.2]])) + assert_close(result, torch.tensor([0.0])) # gamma = 0 for spot / k < 1 and volatility --> +0 - result = compute_gamma(m, torch.tensor([[-0.01, 1.0, 1e-10]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_gamma(c, torch.tensor([[-0.01, 1.0, EPSILON]])) + assert_close(result, torch.tensor([0.0])) + result = compute_gamma(p, torch.tensor([[-0.01, 1.0, EPSILON]])) + assert_close(result, torch.tensor([0.0])) # gamma = 0 for spot / k > 1 and volatility --> +0 - result = compute_gamma(m, torch.tensor([[0.01, 1.0, 1e-10]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) - - def test_check_price(self): - # TODO(simaki): Check for put option - - m = BSEuropeanOption() - - # price = 0 for spot --> +0 - result = compute_price(m, torch.tensor([[-10.0, 1.0, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) - - # price = spot - k for spot --> +inf - result = compute_price(m, torch.tensor([[10.0, 1.0, 0.2]])) - expect = torch.tensor([torch.tensor([10.0]).exp() - 1.0]) - assert_close(result, expect) - - # price = 0 for spot / k < 1 and time --> +0 - result = compute_price(m, torch.tensor([[-0.01, 1e-10, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) - - # price = spot - k for spot / k > 1 and time --> +0 - result = compute_price(m, torch.tensor([[0.01, 1e-10, 0.2]])) - expect = torch.tensor([torch.tensor(0.01).exp() - 1.0]) - assert_close(result, expect) - - # price = 0 for spot / k < 1 and volatility --> +0 - result = compute_price(m, torch.tensor([[-0.01, 1.0, 1e-10]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) - - # price = spot - k for spot / k > 1 and volatility --> +0 - result = compute_price(m, torch.tensor([[0.01, 1.0, 1e-10]])) - expect = torch.tensor([torch.tensor(0.01).exp() - 1.0]) - assert_close(result, expect) - - def test_check_price_monte_carlo(self): + result = compute_gamma(c, torch.tensor([[0.01, 1.0, EPSILON]])) + assert_close(result, torch.tensor([0.0])) + result = compute_gamma(p, torch.tensor([[0.01, 1.0, EPSILON]])) + assert_close(result, torch.tensor([0.0])) + + def test_price_limit(self): + EPSILON = 1e-10 + c = BSEuropeanOption() + p = BSEuropeanOption(call=False) + + # price = 0 (call), k - spot (put) for spot --> +0 + s = torch.tensor([-10.0]).exp() + result = compute_price(c, torch.tensor([[s.log(), 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_price(p, torch.tensor([[s.log(), 1.0, 0.2]])) + assert_close(result, torch.tensor([1.0 - s])) + + # price = spot - k (call), 0 (put) for spot --> +inf + s = torch.tensor([10.0]).exp() + result = compute_price(c, torch.tensor([[s.log(), 1.0, 0.2]])) + assert_close(result, torch.tensor([s - 1.0])) + result = compute_price(p, torch.tensor([[s.log(), 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) + + # price = 0 (call), k - s (put) for spot < k and time --> +0 + s = torch.tensor([-0.01]).exp() + result = compute_price(c, torch.tensor([[s.log(), EPSILON, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_price(p, torch.tensor([[s.log(), EPSILON, 0.2]])) + assert_close(result, torch.tensor([1.0 - s])) + + # price = spot - k (call), 0 (put) for spot > k and time --> +0 + s = torch.tensor([0.01]).exp() + result = compute_price(c, torch.tensor([[s.log(), EPSILON, 0.2]])) + assert_close(result, torch.tensor([s - 1.0])) + result = compute_price(p, torch.tensor([[s.log(), EPSILON, 0.2]])) + assert_close(result, torch.tensor([0.0])) + + # price = 0 (call), k - spot (put) for spot < k and volatility --> +0 + s = torch.tensor([-0.01]).exp() + result = compute_price(c, torch.tensor([[s.log(), 1.0, EPSILON]])) + assert_close(result, torch.tensor([0.0])) + result = compute_price(p, torch.tensor([[s.log(), 1.0, EPSILON]])) + assert_close(result, torch.tensor([1.0 - s])) + + # price = spot - k (call), 0 (put) for spot > k and volatility --> +0 + s = torch.tensor([0.01]).exp() + result = compute_price(c, torch.tensor([[s.log(), 1.0, EPSILON]])) + assert_close(result, torch.tensor([s - 1.0])) + result = compute_price(p, torch.tensor([[s.log(), 1.0, EPSILON]])) + assert_close(result, torch.tensor([0.0])) + + def test_price_monte_carlo(self): + d = EuropeanOption(BrownianStock()) + m = BSEuropeanOption.from_derivative(d) torch.manual_seed(42) + d.simulate(n_paths=int(1e6)) - d = EuropeanOption(BrownianStock()) + input = torch.tensor([[0.0, d.maturity, d.ul().sigma]]) + result = compute_price(m, input) + expect = d.payoff().mean(0, keepdim=True) + print(result, expect) + assert_close(result, expect, rtol=1e-2, atol=0.0) + + d = EuropeanOption(BrownianStock(), call=False) m = BSEuropeanOption.from_derivative(d) + torch.manual_seed(42) d.simulate(n_paths=int(1e6)) input = torch.tensor([[0.0, d.maturity, d.ul().sigma]]) @@ -217,6 +251,17 @@ def test_vega(self): expect = torch.tensor([0.1261, 0.1782, 0.2182]) assert_close(result, expect, atol=1e-3, rtol=0) + def test_vega_and_gamma(self): + m = BSEuropeanOption() + # vega = spot^2 * sigma * (T - t) * gamma + # See Chapter 5 Appendix A, Bergomi "Stochastic volatility modeling" + spot = torch.tensor([0.9, 1.0, 1.1]) + t = torch.tensor([0.1, 0.2, 0.3]) + v = torch.tensor([0.1, 0.2, 0.3]) + vega = m.vega(spot.log(), t, v) + gamma = m.gamma(spot.log(), t, v) + assert_close(vega, spot.square() * v * t * gamma, atol=1e-3, rtol=0) + def test_theta(self): input = torch.tensor([[0.0, 0.1, 0.2], [0.0, 0.2, 0.2], [0.0, 0.3, 0.2]]) m = BSEuropeanOption(strike=100) diff --git a/tests/nn/modules/bs/test_european_binary.py b/tests/nn/modules/bs/test_european_binary.py index 6440d202..f822195d 100644 --- a/tests/nn/modules/bs/test_european_binary.py +++ b/tests/nn/modules/bs/test_european_binary.py @@ -26,123 +26,145 @@ def test_repr(self): m = BSEuropeanBinaryOption.from_derivative(derivative) assert repr(m) == "BSEuropeanBinaryOption(strike=1.1000)" - with pytest.raises(ValueError): - # not yet supported - derivative = EuropeanBinaryOption(BrownianStock(), strike=1.1, call=False) - m = BSEuropeanBinaryOption.from_derivative(derivative) - assert repr(m) == "BSEuropeanBinaryOption(call=False, strike=1.1)" - - def test_error_put(self): - with pytest.raises(ValueError): - # not yet supported - derivative = EuropeanBinaryOption(BrownianStock(), call=False) - BSEuropeanBinaryOption.from_derivative(derivative) + derivative = EuropeanBinaryOption(BrownianStock(), strike=1.1, call=False) + m = BSEuropeanBinaryOption.from_derivative(derivative) + assert repr(m) == "BSEuropeanBinaryOption(call=False, strike=1.1000)" def test_features(self): m = BSEuropeanBinaryOption() assert m.inputs() == ["log_moneyness", "time_to_maturity", "volatility"] _ = [get_feature(f) for f in m.inputs()] - def test_check_delta(self): - m = BSEuropeanBinaryOption() + def test_delta_limit(self): + c = BSEuropeanBinaryOption() + p = BSEuropeanBinaryOption(call=False) # delta = 0 for spot --> +0 - result = compute_delta(m, torch.tensor([[-10.0, 1.0, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_delta(c, torch.tensor([[-10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_delta(p, torch.tensor([[-10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) # delta = 0 for spot --> +inf - result = compute_delta(m, torch.tensor([[10.0, 1.0, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_delta(c, torch.tensor([[10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_delta(p, torch.tensor([[10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) # delta = 0 for time --> +0 - result = compute_delta(m, torch.tensor([[-0.01, 1e-10, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_delta(c, torch.tensor([[-0.01, 1e-10, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_delta(p, torch.tensor([[-0.01, 1e-10, 0.2]])) + assert_close(result, torch.tensor([0.0])) - result = compute_delta(m, torch.tensor([[0.01, 1e-10, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_delta(c, torch.tensor([[0.01, 1e-10, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_delta(p, torch.tensor([[0.01, 1e-10, 0.2]])) + assert_close(result, torch.tensor([0.0])) # delta = 0 for volatility --> +0 - result = compute_delta(m, torch.tensor([[-0.01, 1.0, 1e-10]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_delta(c, torch.tensor([[-0.01, 1.0, 1e-10]])) + assert_close(result, torch.tensor([0.0])) + result = compute_delta(p, torch.tensor([[-0.01, 1.0, 1e-10]])) + assert_close(result, torch.tensor([0.0])) - result = compute_delta(m, torch.tensor([[0.01, 1.0, 1e-10]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_delta(c, torch.tensor([[0.01, 1.0, 1e-10]])) + assert_close(result, torch.tensor([0.0])) + result = compute_delta(p, torch.tensor([[0.01, 1.0, 1e-10]])) + assert_close(result, torch.tensor([0.0])) - def test_check_gamma(self): - m = BSEuropeanBinaryOption() + def test_gamma_limit(self): + c = BSEuropeanBinaryOption() + p = BSEuropeanBinaryOption(call=False) # gamma = 0 for spot --> +0 - result = compute_gamma(m, torch.tensor([[-10.0, 1.0, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_gamma(c, torch.tensor([[-10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_gamma(p, torch.tensor([[-10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) # gamma = 0 for spot --> +inf - result = compute_gamma(m, torch.tensor([[10.0, 1.0, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_gamma(c, torch.tensor([[10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_gamma(p, torch.tensor([[10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) # gamma = 0 for time --> +0 - result = compute_gamma(m, torch.tensor([[-0.01, 1e-10, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_gamma(c, torch.tensor([[-0.01, 1e-10, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_gamma(p, torch.tensor([[-0.01, 1e-10, 0.2]])) + assert_close(result, torch.tensor([0.0])) - result = compute_gamma(m, torch.tensor([[0.01, 1e-10, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) + result = compute_gamma(c, torch.tensor([[0.01, 1e-10, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_gamma(p, torch.tensor([[0.01, 1e-10, 0.2]])) + assert_close(result, torch.tensor([0.0])) # gamma = 0 for volatility --> +0 - result = compute_gamma(m, torch.tensor([[-0.01, 1.0, 1e-10]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) - - result = compute_gamma(m, torch.tensor([[0.01, 1.0, 1e-10]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) - - def test_check_price(self): - m = BSEuropeanBinaryOption() - - # price = 0 for spot --> +0 - result = compute_price(m, torch.tensor([[-10.0, 1.0, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) - - # price = 1 for spot --> +inf - result = compute_price(m, torch.tensor([[10.0, 1.0, 0.2]])) - expect = torch.tensor([1.0]) - assert_close(result, expect) - - # price = 0 for spot < strike and time --> +0 - result = compute_price(m, torch.tensor([[-0.01, 1e-10, 0.2]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) - - # price = 1 for spot > strike and time --> +0 - result = compute_price(m, torch.tensor([[0.01, 1e-10, 0.2]])) - expect = torch.tensor([1.0]) - assert_close(result, expect) - - # price = 0 for spot < strike and volatility --> +0 - result = compute_price(m, torch.tensor([[-0.01, 1.0, 1e-10]])) - expect = torch.tensor([0.0]) - assert_close(result, expect) - - # price = 0 for spot > strike and volatility --> +0 - result = compute_price(m, torch.tensor([[0.01, 1.0, 1e-10]])) - expect = torch.tensor([1.0]) - assert_close(result, expect) - - def test_check_price_monte_carlo(self): + result = compute_gamma(c, torch.tensor([[-0.01, 1.0, 1e-10]])) + assert_close(result, torch.tensor([0.0])) + result = compute_gamma(p, torch.tensor([[-0.01, 1.0, 1e-10]])) + assert_close(result, torch.tensor([0.0])) + + result = compute_gamma(c, torch.tensor([[0.01, 1.0, 1e-10]])) + assert_close(result, torch.tensor([0.0])) + result = compute_gamma(p, torch.tensor([[0.01, 1.0, 1e-10]])) + assert_close(result, torch.tensor([0.0])) + + def test_price_limit(self): + c = BSEuropeanBinaryOption() + p = BSEuropeanBinaryOption(call=False) + + # price = 0 (call), 1 (put) for spot --> +0 + result = compute_price(c, torch.tensor([[-10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_price(p, torch.tensor([[-10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([1.0])) + + # price = 1 (call), 1 (put) for spot --> +inf + result = compute_price(c, torch.tensor([[10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([1.0])) + result = compute_price(p, torch.tensor([[10.0, 1.0, 0.2]])) + assert_close(result, torch.tensor([0.0])) + + # price = 0 (call), 1 (put) for spot < strike and time --> +0 + result = compute_price(c, torch.tensor([[-0.01, 1e-10, 0.2]])) + assert_close(result, torch.tensor([0.0])) + result = compute_price(p, torch.tensor([[-0.01, 1e-10, 0.2]])) + assert_close(result, torch.tensor([1.0])) + + # price = 1 (call), 0 (put) for spot > strike and time --> +0 + result = compute_price(c, torch.tensor([[0.01, 1e-10, 0.2]])) + assert_close(result, torch.tensor([1.0])) + result = compute_price(p, torch.tensor([[0.01, 1e-10, 0.2]])) + assert_close(result, torch.tensor([0.0])) + + # price = 0 (call), 1 (put) for spot < strike and volatility --> +0 + result = compute_price(c, torch.tensor([[-0.01, 1.0, 1e-10]])) + assert_close(result, torch.tensor([0.0])) + result = compute_price(p, torch.tensor([[-0.01, 1.0, 1e-10]])) + assert_close(result, torch.tensor([1.0])) + + # price = 0 (call), 1 (put) for spot > strike and volatility --> +0 + result = compute_price(c, torch.tensor([[0.01, 1.0, 1e-10]])) + assert_close(result, torch.tensor([1.0])) + result = compute_price(p, torch.tensor([[0.01, 1.0, 1e-10]])) + assert_close(result, torch.tensor([0.0])) + + def test_price_monte_carlo(self): + d = EuropeanBinaryOption(BrownianStock()) + m = BSEuropeanBinaryOption.from_derivative(d) torch.manual_seed(42) + d.simulate(n_paths=int(1e6)) - d = EuropeanBinaryOption(BrownianStock()) + input = torch.tensor([[0.0, d.maturity, d.ul().sigma]]) + result = compute_price(m, input) + expect = d.payoff().mean(0, keepdim=True) + assert_close(result, expect, rtol=1e-2, atol=0.0) + + d = EuropeanBinaryOption(BrownianStock(), call=False) m = BSEuropeanBinaryOption.from_derivative(d) + torch.manual_seed(42) d.simulate(n_paths=int(1e6)) input = torch.tensor([[0.0, d.maturity, d.ul().sigma]]) @@ -163,38 +185,28 @@ def test_delta(self): expect = torch.tensor(6.3047) assert_close(result, expect, atol=1e-4, rtol=1e-4) - with pytest.raises(ValueError): - # not yet supported - m = BSEuropeanBinaryOption(call=False) - result = m.gamma(torch.tensor(0.0), torch.tensor(0.1), torch.tensor(0.2)) - # expect = torch.tensor(-6.30468) - # assert_close(result, expect, atol=1e-4, rtol=1e-4) - def test_gamma(self): m = BSEuropeanBinaryOption() result = m.gamma(torch.tensor(0.01), torch.tensor(1.0), torch.tensor(0.2)) expect = torch.tensor(-1.4645787477493286) assert_close(result, expect) - with pytest.raises(ValueError): - # not yet supported - m = BSEuropeanBinaryOption(call=False) - result = m.gamma(torch.tensor(0.0), torch.tensor(1.0), torch.tensor(0.2)) - # expect = torch.tensor(-6.30468) - # assert_close(result, expect, atol=1e-4, rtol=1e-4) - def test_vega(self): m = BSEuropeanBinaryOption() result = m.vega(torch.tensor(0.0), torch.tensor(0.1), torch.tensor(0.2)) expect = torch.tensor(-0.06305) assert_close(result, expect, atol=1e-4, rtol=1e-4) - with pytest.raises(ValueError): - # not yet supported - m = BSEuropeanBinaryOption(call=False) - result = m.vega(torch.tensor(0.0), torch.tensor(0.1), torch.tensor(0.2)) - # expect = torch.tensor(0.06305) - # assert_close(result, expect, atol=1e-4, rtol=1e-4) + def test_vega_and_gamma(self): + m = BSEuropeanBinaryOption() + # vega = spot^2 * sigma * (T - t) * gamma + # See Chapter 5 Appendix A, Bergomi "Stochastic volatility modeling" + spot = torch.tensor([0.9, 1.0, 1.1]) + t = torch.tensor([0.1, 0.2, 0.3]) + v = torch.tensor([0.1, 0.2, 0.3]) + vega = m.vega(spot.log(), t, v) + gamma = m.gamma(spot.log(), t, v) + assert_close(vega, spot.square() * v * t * gamma, atol=1e-3, rtol=0) def test_theta(self): m = BSEuropeanBinaryOption() @@ -202,17 +214,10 @@ def test_theta(self): expect = torch.tensor(0.0630) assert_close(result, expect, atol=1e-4, rtol=1e-4) - with pytest.raises(ValueError): - # not yet supported - m = BSEuropeanBinaryOption(call=False) - result = m.theta(torch.tensor(0.0), torch.tensor(0.1), torch.tensor(0.2)) - # expect = torch.tensor(-0.0630) - # assert_close(result, expect, atol=1e-4, rtol=1e-4) - def test_price(self): m = BSEuropeanBinaryOption() - result = m.price(0.0, 0.1, 0.2) + result = m.price(torch.tensor(0.0), 0.1, 0.2) expect = torch.tensor(0.4874) assert_close(result, expect, atol=1e-4, rtol=1e-4) @@ -221,16 +226,15 @@ def test_price(self): assert_close(result, expect, atol=1e-4, rtol=1e-4) def test_implied_volatility(self): - # log_moneyness, time_to_maturity, price - input = torch.tensor( - [[-0.01, 0.1, 0.40], [-0.01, 0.1, 0.41], [-0.01, 0.1, 0.42]] - ) + lm = torch.full((3,), -0.01) + t = torch.full((3,), 0.1) + price = torch.tensor([0.40, 0.41, 0.42]) + m = BSEuropeanBinaryOption() - iv = m.implied_volatility(input[:, 0], input[:, 1], input[:, 2]) + iv = m.implied_volatility(lm, t, price) - result = BSEuropeanBinaryOption().price(input[:, 0], input[:, 1], iv) - expect = input[:, -1] - assert_close(result, expect, check_stride=False) + result = BSEuropeanBinaryOption().price(lm, t, iv) + assert_close(result, price, check_stride=False) def test_example(self): from pfhedge.instruments import BrownianStock diff --git a/tests/nn/modules/bs/test_lookback.py b/tests/nn/modules/bs/test_lookback.py index 95c1bf5f..61690824 100644 --- a/tests/nn/modules/bs/test_lookback.py +++ b/tests/nn/modules/bs/test_lookback.py @@ -251,6 +251,17 @@ def test_implied_volatility(self): expect = input[:, -1] assert_close(result, expect, atol=1e-4, rtol=1e-4, check_stride=False) + def test_vega_and_gamma(self): + m = BSLookbackOption() + # vega = spot^2 * sigma * (T - t) * gamma + # See Chapter 5 Appendix A, Bergomi "Stochastic volatility modeling" + spot = torch.tensor([0.90, 0.94, 0.98]) + t = torch.tensor([0.1, 0.2, 0.3]) + v = torch.tensor([0.1, 0.2, 0.3]) + vega = m.vega(spot.log(), spot.log(), t, v) + gamma = m.gamma(spot.log(), spot.log(), t, v) + assert_close(vega, spot.square() * v * t * gamma, atol=1e-3, rtol=0) + def test_put_notimplemented(self): with pytest.raises(ValueError): # not yet supported diff --git a/tests/nn/modules/test_svi.py b/tests/nn/modules/test_svi.py new file mode 100644 index 00000000..43a472d3 --- /dev/null +++ b/tests/nn/modules/test_svi.py @@ -0,0 +1,28 @@ +import torch +from torch.nn.functional import relu +from torch.testing import assert_allclose + +from pfhedge.nn import SVIVariance + + +def test_svi(): + input = torch.linspace(-1.0, 1.0, 10) + + # for b = 0, output = a + m = SVIVariance(a=1.0, b=0, rho=0.1, m=0.2, sigma=0.3) + result = m(input) + expect = torch.full_like(result, m.a) + assert_allclose(result, expect) + + # for sigma = 0 and rho = -1, output = a + 2 b relu(x) + m = SVIVariance(a=1.0, b=0, rho=-1, m=0.0, sigma=0.3) + result = m(input) + expect = m.a + 2 * m.b * relu(input - m.m) + assert_allclose(result, expect) + + # m translates + m0 = SVIVariance(a=0.1, b=0.2, rho=0.3, m=0.0, sigma=0.5) + m1 = SVIVariance(a=0.1, b=0.2, rho=0.3, m=0.4, sigma=0.5) + result = m0(input) + expect = m1(input + m1.m) + assert_allclose(result, expect) diff --git a/tests/stochastic/test_brownian.py b/tests/stochastic/test_brownian.py index 6fb3b694..7f312227 100644 --- a/tests/stochastic/test_brownian.py +++ b/tests/stochastic/test_brownian.py @@ -5,6 +5,7 @@ from pfhedge.stochastic import generate_brownian from pfhedge.stochastic import generate_geometric_brownian +from pfhedge.stochastic.engine import RandnSobolBoxMuller def test_generate_brownian_mean(): @@ -52,6 +53,19 @@ def test_generate_brownian_mean(): assert_close(result, expect, atol=3 * std, rtol=0) +def test_generate_brownian_sobol_mean(): + n_paths = 10000 + n_steps = 250 + + engine = RandnSobolBoxMuller(seed=42, scramble=True) + output = generate_brownian(n_paths, n_steps, engine=engine) + assert output.size() == torch.Size((n_paths, n_steps)) + result = output[:, -1].mean() + expect = torch.zeros_like(result) + std = 0.2 * sqrt(1 / n_paths) + assert_close(result, expect, atol=3 * std, rtol=0) + + def test_generate_brownian_dtype(): torch.manual_seed(42) @@ -90,7 +104,3 @@ def test_generate_geometric_brownian_dtype(): output = generate_geometric_brownian(1, 1, dtype=torch.float64) assert output.dtype == torch.float64 - - -def test_generate_geometric_brownian_init_state(): - torch.manual_seed(42) diff --git a/tests/stochastic/test_cir.py b/tests/stochastic/test_cir.py index 1227b23e..12dd96b7 100644 --- a/tests/stochastic/test_cir.py +++ b/tests/stochastic/test_cir.py @@ -18,8 +18,8 @@ def test_generate_cir_mean_1(): t = generate_cir(n_paths, 250, kappa=kappa, theta=theta, sigma=sigma) result = t[:, -1].mean() # Asymptotic distribution is gamma distribution - alpha = 2 * kappa * theta / sigma ** 2 - beta = 2 * kappa / sigma ** 2 + alpha = 2 * kappa * theta / sigma**2 + beta = 2 * kappa / sigma**2 d = Gamma(alpha, beta) expect = torch.full_like(result, d.mean) @@ -41,8 +41,8 @@ def test_generate_cir_mean_2(): ) result = t[:, -1].mean() # Asymptotic distribution is gamma distribution - alpha = 2 * kappa * theta / sigma ** 2 - beta = 2 * kappa / sigma ** 2 + alpha = 2 * kappa * theta / sigma**2 + beta = 2 * kappa / sigma**2 d = Gamma(alpha, beta) expect = torch.full_like(result, d.mean) diff --git a/tests/stochastic/test_randn.py b/tests/stochastic/test_randn.py new file mode 100644 index 00000000..34a68a9a --- /dev/null +++ b/tests/stochastic/test_randn.py @@ -0,0 +1,15 @@ +import pytest +import torch +from torch.testing import assert_close + +from pfhedge.stochastic import randn_antithetic + + +def test_randn_antithetic(): + torch.manual_seed(42) + output = randn_antithetic(200, 100) + assert_close(output.mean(0), torch.zeros_like(output[0])) + + with pytest.raises(ValueError): + # not supported + output = randn_antithetic((200, 100), dim=1) diff --git a/tests/test_examples.py b/tests/test_examples.py index ef2d30b8..1cda77e2 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -15,6 +15,10 @@ def test_net(): _ = hedger.fit(derivative, n_paths=100, n_epochs=10) _ = hedger.price(derivative) + _ = hedger.fit( + derivative, n_paths=100, n_epochs=10, tqdm_kwargs={"desc": "description"} + ) + def test_bs(): derivative = EuropeanOption(BrownianStock(cost=1e-4))