-
Notifications
You must be signed in to change notification settings - Fork 20
/
distributionreinsurance.py
76 lines (64 loc) · 2.81 KB
/
distributionreinsurance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import scipy.stats
import numpy as np
from math import ceil
import scipy
import pdb
class ReinsuranceDistWrapper():
def __init__(self, dist, lower_bound=None, upper_bound=None):
assert lower_bound is not None or upper_bound is not None
self.dist = dist
self.lower_bound = lower_bound
self.upper_bound = upper_bound
if lower_bound is None:
self.lower_bound = -np.inf
elif upper_bound is None:
self.upper_bound = np.inf
assert self.upper_bound > self.lower_bound
self.redistributed_share = dist.cdf(upper_bound) - dist.cdf(lower_bound)
def pdf(self, x):
x = np.array(x, ndmin=1)
r = map(lambda Y: self.dist.pdf(Y) if Y < self.lower_bound \
else np.inf if Y==self.lower_bound \
else self.dist.pdf(Y + self.upper_bound - self.lower_bound), x)
r = np.array(list(r))
if len(r.flatten()) == 1:
r = float(r)
return r
def cdf(self, x):
x = np.array(x, ndmin=1)
r = map(lambda Y: self.dist.cdf(Y) if Y < self.lower_bound \
else self.dist.cdf(Y + self.upper_bound - self.lower_bound), x)
r = np.array(list(r))
if len(r.flatten()) == 1:
r = float(r)
return r
def ppf(self, x):
x = np.array(x, ndmin=1)
assert (x >= 0).all() and (x <= 1).all()
_lower_cdf = self.dist.cdf(self.lower_bound)
_lower_ppf = self.dist.ppf(_lower_cdf)
_upper_cdf = self.dist.cdf(self.upper_bound)
r = map(lambda Y: self.dist.ppf(Y) if Y <= _lower_cdf \
else _lower_ppf if Y <= _upper_cdf \
else self.dist.ppf(Y) - self.upper_bound + self.lower_bound, x)
r = np.array(list(r))
if len(r.flatten()) == 1:
r = float(r)
return r
def rvs(self, size=1):
sample = self.dist.rvs(size=size)
sample1 = sample[sample<=self.lower_bound]
sample2 = sample[sample>self.lower_bound]
sample3 = sample2[sample2>=self.upper_bound]
sample2 = sample2[sample2<self.upper_bound]
sample2 = np.ones(len(sample2)) * self.lower_bound
sample3 = sample3 -self.upper_bound + self.lower_bound
sample = np.append(np.append(sample1,sample2),sample3)
return sample[:size]
if __name__ == "__main__":
non_truncated = scipy.stats.pareto(b=2, loc=0, scale=0.5)
#truncated = ReinsuranceDistWrapper(lower_bound=0, upper_bound=1, dist=non_truncated)
truncated = ReinsuranceDistWrapper(lower_bound=0.9, upper_bound=1.1, dist=non_truncated)
x = np.linspace(non_truncated.ppf(0.01), non_truncated.ppf(0.99), 100)
x2 = np.linspace(truncated.ppf(0.01), truncated.ppf(0.99), 100)
#pdb.set_trace()