Time Series Simulations: Signature Transformation Method in Python. Part 2

Pavel Zapolskii
Published in Towards AI · 12 min read · Apr 29, 2024


Introduction

In this article, we continue to explore a powerful tool for compressing Time Series information known as the Signature.

Please read the first part if you want to learn more about the math behind Signatures and how they are calculated.

The purpose of this second part of our journey is to practice data augmentation. We will enrich the dataset with a large number of vectors distributed in the same way as the original ones. For this, I will be using a method that places no restrictions on the dimensionality of the original dataset: the evolutionary method of recovery.

Following the plan above, we are now moving to the left part, which includes two separate tasks: inversion and sampling.

The results of this work can be used for:

  • Calculating sample statistics across a dataset (confidence intervals, probabilities)
  • Enriching a small dataset for further training of complex ML models
  • Anonymizing data while preserving the general properties of time series

I will show you the implementation using financial market data: Gold ETF with ticker GLD and mining company Barrick Gold with ticker GOLD.

We are going to:

  • Recover a path from its Signature
  • Simulate new Time Series samples in order to increase the robustness of the outputs
  • Calculate some stats on the final extended dataset

Data collection

As usual, let’s start with imports.

import yfinance as yf
import datetime
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm
import copy
import tensorflow as tf
import iisignature
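
The helper functions add_time, leadlag, and stream_to_logsig come from Part 1 and are used throughout this article. For completeness, here is a minimal sketch of what they may look like, assuming the iisignature API (iisignature.prepare / iisignature.logsig) and a standard lead-lag embedding; the exact definitions from Part 1 may differ.

def add_time(path):
    # append a normalised time coordinate as the last dimension
    path = np.atleast_2d(path)
    if path.shape[0] == 1:
        path = path.T
    t = np.linspace(0, 1, len(path)).reshape(-1, 1)
    return np.c_[path, t]

def leadlag(path, use_time=True):
    # standard lead-lag embedding: interleave "lead" and "lag" copies of the path
    path = add_time(path) if use_time else np.atleast_2d(path)
    repeated = np.repeat(path, 2, axis=0)
    return np.c_[repeated[1:], repeated[:-1]]  # shape (2n - 1, 2d)

def stream_to_logsig(path, order):
    # log-signature of a d-dimensional stream, truncated at the given order
    prepared = iisignature.prepare(path.shape[1], order)
    return iisignature.logsig(path, prepared)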

For the sake of convenience, let us assume that we are faced with the task of evaluating a monthly option on the simultaneous growth of two assets at the end of the month.

Reformulating this task in ML language, we need to find the probability that this event will occur; the more accurate the probability estimate, the better the option price we can offer.
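
In symbols (my notation, not from the original formula images): with $S^{GLD}_t$ and $S^{GOLD}_t$ denoting the prices normalised to their start-of-month values and $T$ the end of the month, the quantity we want to estimate is

$$p = \mathbb{P}\left(S^{GLD}_T > S^{GLD}_0 \;\text{ and }\; S^{GOLD}_T > S^{GOLD}_0\right).$$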

We will upload the data using yfinance:

ticker = "GlD"

start = datetime.date(2022, 1, 1)
end = datetime.date(2024, 1, 1)

data_gld = yf.download(ticker, start, end)["Close"]

plt.plot(data_gld / data_gld[0])
plt.xticks([datetime.date(2022, 1, 1), datetime.date(2023, 1, 1), datetime.date(2024, 1, 1)])

plt.title('GLD')

Normalise and check the Time Series:

Gold ETF: 2-year dynamics

Same for the second asset.

ticker = "GOLD"

start = datetime.date(2022, 1, 1)
end = datetime.date(2024, 1, 1)

data_gold = yf.download(ticker, start, end)["Close"]

plt.plot(data_gold / data_gold[0])
plt.xticks([datetime.date(2022, 1, 1), datetime.date(2023, 1, 1), datetime.date(2024, 1, 1)])
plt.title('GOLD')
Barrick Gold: 2-year dynamics

For convenience, let’s write a class for data extraction and processing. In the future, we will add a couple of functions here responsible for replication.

class MarketGenerator:
    def __init__(self, n_dims, tickers_array, start, end, freq, signature_order, data_transform):
        self.n_dims = n_dims
        self.tickers_array = tickers_array
        self.start = start
        self.end = end
        self.freq = freq
        self.order = signature_order
        self.data_transform = data_transform

        self.load_data()
        self.build_dataset()
        self.transform_dataset()
        self.calculate_signatures()
        self.scale_dataset()

    def load_data(self):
        self.data_array = []
        for i in range(self.n_dims):
            data = yf.download(self.tickers_array[i], self.start, self.end)["Close"]  # .resample('M').last()
            self.data_array.append(data)

        # split each series into windows of the chosen frequency (e.g. one window per month)
        self.windows_array = []
        for i in range(self.n_dims):
            windows = []
            for _, window in self.data_array[i].resample(self.freq):
                windows.append(window)
            self.windows_array.append(windows)

    def build_dataset(self):
        self.paths = []

        for i in range(len(self.windows_array[0])):
            if self.n_dims > 1:
                # align all assets on common dates, then normalise each path by its first value
                df = pd.merge(self.windows_array[0][i], self.windows_array[1][i], left_index=True, right_index=True)
                for n in range(2, self.n_dims):
                    df = pd.merge(df, self.windows_array[n][i], left_index=True, right_index=True)

                path = np.c_[df.iloc[:, [0]].values, df.iloc[:, [1]].values]
                for n in range(2, self.n_dims):
                    path = np.c_[path, df.iloc[:, [n]].values]
                self.paths.append(path / path[0])
            else:
                self.paths.append(self.windows_array[0][i].values / self.windows_array[0][i].values[0])

    def transform_dataset(self):
        self.transformed_paths = []
        for path in self.paths:
            self.transformed_paths.append(self.data_transform(path))

    def calculate_signatures(self):
        self.orig_logsig = np.array([stream_to_logsig(path, self.order) for path in tqdm(self.transformed_paths)])

    def scale_dataset(self):
        self.scaler = MinMaxScaler(feature_range=(0.00001, 0.99999))
        scaled_logsig = self.scaler.fit_transform(self.orig_logsig)

        self.logsigs = scaled_logsig[1:]
        self.conditions = scaled_logsig[:-1]  # needed later for CVAE

In this class, we also define the Signature calculation for the paths of our Time Series (assets).

Don’t forget to check out Part 1 of my article to recall how they are calculated 🤓.
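
As a quick illustrative check (not from the original article), you can compute the log-signature of a toy two-dimensional path and look at its size: for a fixed dimension and order, the log-signature is a vector of fixed length, regardless of how many points the path has.

# Illustrative only: log-signature of a toy 2D path at order 5
toy_path = np.c_[np.linspace(0, 1, 10), np.sin(np.linspace(0, 1, 10))]
toy_logsig = stream_to_logsig(toy_path, 5)
print(toy_logsig.shape)  # length depends only on the dimension and the order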

n_dims = 2
tickers_array = ["GLD", "GOLD"]

start = datetime.date(2022, 1, 1)
end = datetime.date(2024, 1, 1)

freq = "M"
order = 5

def data_transform(data):
    leadlag_path = leadlag(data, use_time=True)
    return leadlag_path


MG = MarketGenerator(n_dims, tickers_array, start, end, freq, order, data_transform)

paths = MG.paths

We have prepared the data for cooking and are about to perform the first magic trick.

Recovering Time Series from Signature

Algorithm for recovering a path from its Signature, in 4 steps:

  • We generate a random walk with increments drawn from a given distribution. (Organism)
  • We do this many times. (Population)
  • We crossbreed the current organisms to create a new generation. (self.evolve)
  • We compare errors to see how much the Signature of each random Time Series differs from the original one. (self.loss)

In short, we “intelligently” iterate over the values until our time series matches in terms of Signatures.

A valid question: why does this even work? It sounds like some kind of kindergarten game. Where are the tough inverse functions, differentials, etc.?

In fact, all the magic is in one mathematical theorem, which can be formulated as follows:

Return of the King. Generated by Supermeme

It is necessary to find a piecewise linear path that best approximates the initial path (Time Series) X in terms of minimizing the metric:

Approximation X with X hat

Note that the convergence of the approximations depends on the step width: the wider the step, the faster the convergence and the worse the quality; the narrower the step, the slower the convergence and the better the quality.
The main statement here is:

“Approximation of piecewise paths” theorem

In other words, if we run enough iterations of generating increments for each coordinate of the path from an arithmetic distribution whose values lie on the corresponding grid of steps, then with probability 1 the optimal path will be obtained at some point.
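
To make the "grid of steps" idea concrete, here is a small illustrative sketch (the 0.001 step width is an arbitrary choice, the same one used in the distributions further below): every increment is an integer multiple of the step, so any candidate path lives on a discrete lattice.

# Illustrative only: increments drawn from an arithmetic grid of steps
step = 0.001       # grid step width
max_steps = 25     # increments range over [-0.025, 0.025]

def grid_increment():
    return step * np.random.randint(-max_steps, max_steps)

candidate_path = np.cumsum([grid_increment() for _ in range(21)])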

With that in mind, we implement the ‘Organism’ class: it represents a random path which, through evolution, may become a candidate for the original Time Series.

The main method here is self.mutate, which allows an instance of Organism to keep walking through the distribution.

class Organism:
    def __init__(self, n_points, n_dims, data_transform, distribution_array):
        self.n_points = n_points
        self.n_dims = n_dims
        self.data_transform = data_transform

        self.distribution_array = distribution_array

        self.init_path()

    def __add__(self, other):
        # crossover: each increment is taken from one of the two parents at random
        increments = []
        for increment1, increment2 in zip(self.increments, other.increments):
            if np.random.random() < 0.5:
                increments.append(increment1)
            else:
                increments.append(increment2)

        path = np.cumsum(increments, axis=0)
        transformed_path = self.data_transform(path)

        child = Organism(self.n_points, self.n_dims, self.data_transform, self.distribution_array)
        child.increments = increments
        child.path = path
        child.transformed_path = transformed_path

        return child

    def random_increment(self):
        r = []
        for dim in range(self.n_dims):
            r.append(self.distribution_array[dim]())
        return np.array(r)

    def init_path(self):
        self.increments = np.array([self.random_increment() for _ in range(self.n_points)])

        path = np.cumsum(self.increments, axis=0)
        transformed_path = self.data_transform(path)

        self.path = path
        self.transformed_path = transformed_path

    def mutate(self, prob=0.1):
        # resample each increment with probability `prob`
        for i in range(len(self.increments)):
            if np.random.random() < prob:
                self.increments[i] = self.random_increment()

        path = np.cumsum(self.increments, axis=0)
        transformed_path = self.data_transform(path)

        self.path = path
        self.transformed_path = transformed_path

    def logsignature(self, order):
        return stream_to_logsig(self.transformed_path, order)

    def loss(self, logsig, order):
        # relative error, down-weighting higher-order log-signature terms
        diff = np.abs((logsig - self.logsignature(order)) / logsig)
        diff /= 1 + np.arange(len(logsig))
        return np.mean(diff)
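
Before evolving a whole population, it can help to sanity-check a single Organism against a target log-signature. Here is a small illustrative usage sketch (it reuses paths, data_transform, and order defined above; the grid parameters mirror the distributions used later on):

# Illustrative only: one random Organism evaluated against a target log-signature
target_logsig = stream_to_logsig(data_transform(paths[1]), order)

organism = Organism(n_points=21, n_dims=2,
                    data_transform=data_transform,
                    distribution_array=[lambda: 0.001 * np.random.randint(-25, 25)] * 2)

print(organism.loss(target_logsig, order))  # typically far from 0 before any evolution
organism.mutate(prob=0.3)                   # resample roughly 30% of the increments
print(organism.loss(target_logsig, order))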

On the other hand, we have the ‘Population’ class that manages the Organisms and their breeding and is responsible for finding the best organism based on error minimization.

The main idea: to improve the convergence of the Organisms’ random paths to the original Time Series, we crossbreed organisms (self.evolve).

class Population:
    def __init__(self, n_organisms, n_points, n_dims, data_transform, distribution_array):
        self.n_organisms = n_organisms
        self.n_points = n_points
        self.n_dims = n_dims
        self.data_transform = data_transform

        self.distribution_array = distribution_array

        self.organisms = [Organism(n_points, n_dims, data_transform, distribution_array) for _ in range(n_organisms)]

    def fittest(self, logsig, p, order):
        # keep the top p fraction of organisms by loss
        n = int(len(self.organisms) * p)
        return sorted(self.organisms, key=lambda o: o.loss(logsig, order))[:n]

    def evolve(self, logsig, p, order, mutation_prob=0.1):
        parents = self.fittest(logsig, p, order)
        new_generation = copy.deepcopy(parents)

        while len(new_generation) != self.n_organisms:
            i = j = 0
            while i == j:
                i, j = np.random.choice(range(len(parents)), size=2)
            parent1, parent2 = parents[i], parents[j]

            child = parent1 + parent2  # crossover via Organism.__add__
            child.mutate(prob=mutation_prob)

            new_generation.append(copy.deepcopy(child))

        self.organisms = new_generation

        # new_generation[0] is the best parent, so this is the current best loss
        return new_generation[0].loss(logsig, order)

def train(logsig, order, n_iterations, n_organisms, n_points, n_dims, data_transform, distribution_array, top_p=0.1, mutation_prob=0.1):
    population = Population(n_organisms, n_points, n_dims, data_transform, distribution_array)
    pbar = tqdm(range(n_iterations))

    for _ in pbar:
        loss = population.evolve(logsig, p=top_p, order=order, mutation_prob=mutation_prob)
        pbar.set_description(f"Loss: {loss}")
        pbar.refresh()

        if loss == 0.:
            break

    return population.fittest(logsig, p=top_p, order=order)[0].path, loss

I won’t keep you waiting: let’s quickly reconstruct 1 month of trajectories (n = 21 points, roughly the number of trading days in a month) and visualize our ‘twin’ series.

n_dims = 2
n_points = 21

def distribution_x():
    pip = 0.001
    n_pips = 1 / pip * 0.025
    return pip * np.random.randint(-int(n_pips), int(n_pips))

def distribution_y():
    pip = 0.001
    n_pips = 1 / pip * 0.025
    return pip * np.random.randint(-int(n_pips), int(n_pips))

distribution_array = [distribution_x, distribution_y]

n_organisms = 250
n_iterations = 80

transformed_path = data_transform(paths[1])
logsig = stream_to_logsig(transformed_path, order)
recovered_path, loss = train(logsig, order, n_iterations,
                             n_organisms, n_points, n_dims, data_transform, distribution_array)

Next, we will use three projections of our three-dimensional plots to make it easier to analyze the outcome. For the article, a two-dimensional time series was chosen due to the simplicity of interpretation, but it’s important to remember that this method is not limited in dimensionality!

label_x = "GLD"
label_y = "GOLD"

def plotAllProjections3D(filename, path, label_x, label_y, recovered_path=None):
    plot3D(filename + "_" + label_x + ".png", [True, False], path, 0, -91, label_x, label_y, recovered_path)
    plot3D(filename + "_" + label_y + ".png", [False, True], path, 0, 0, label_x, label_y, recovered_path)
    plot3D(filename + "_" + label_x + "_" + label_y + ".png", [True, True], path, 10, -45, label_x, label_y, recovered_path)

def plot3D(filename, draw_tics, path, elev, azim, label_x, label_y, recovered_path=None):
    fig = plt.figure(figsize=(8, 8))
    ax = fig.add_subplot(111, projection='3d')
    ax.view_init(elev, azim)

    draw_path = path - path[0]
    ax.plot(draw_path[:, 0], draw_path[:, 1], draw_path[:, 2], label='original')

    if recovered_path is not None:
        draw_recovered_path = recovered_path - recovered_path[0]
        ax.plot(draw_recovered_path[:, 0], draw_recovered_path[:, 1], draw_recovered_path[:, 2], label='recovered')

    if draw_tics[0]:
        ax.set_xlabel(label_x)
    else:
        ax.set_xticks([])

    if draw_tics[1]:
        ax.set_ylabel(label_y)
    else:
        ax.set_yticks([])

    ax.set_zlabel("time")

    ax.legend()
    fig.savefig(filename, bbox_inches='tight')

plotAllProjections3D("recover_path", add_time(paths[1]), label_x, label_y, add_time(recovered_path))

GLD & GOLD: 1-month Time Series recovery

Signatures sampling

To replicate our Signatures, we are going to use a conditional variational autoencoder (CVAE). We’ll add several more key functions to our market simulation class:

# fitting previous signatures as conditions
def train(self, n_epochs=100):
    self.cvae = CVAE(n_latent=8, input_dim=self.logsigs.shape[1], cond_dim=self.conditions.shape[1], n_hidden=100, alpha=0.4)
    self.cvae.compile(optimizer='adam')
    self.cvae.fit(x=[self.logsigs, self.conditions], epochs=n_epochs)

def generate_logsig(self, cond):
    generated = self.cvae.generate(cond)
    return self.scaler.inverse_transform(generated.reshape(1, -1))[0]

The first function explains to the autoencoder what Signatures are (it trains the model), and the second generates similar instances.

The idea of generating signatures is similar to generating images: once the neural network has learned the definition of a signature, it can easily recreate it. However, just like in life, in ML, you can’t gain something without giving something in return.

Therefore, we feed white noise into the neural network, and the autoencoder figures out how to make it resemble our Signature.

White noise: the ultimate diet for overthinking AI brains — feeds them nothing but keeps them full!

(Mr. Zapolskii)

Implementations of autoencoders can vary: you can experiment with this on your own and see how the cloning of Signatures will differ!

class CVAE(tf.keras.Model):
    def __init__(self, n_latent, input_dim, cond_dim, n_hidden=50, alpha=0.2):
        super(CVAE, self).__init__()
        self.n_latent = n_latent
        self.input_dim = input_dim
        self.cond_dim = cond_dim
        self.n_hidden = n_hidden
        self.alpha = alpha
        self.encoder_net = self.build_encoder()
        self.decoder_net = self.build_decoder()

    def build_encoder(self):
        inputs = tf.keras.Input(shape=(self.input_dim,))
        cond = tf.keras.Input(shape=(self.cond_dim,))
        x = tf.keras.layers.Concatenate()([inputs, cond])
        x = tf.keras.layers.Dense(self.n_hidden, activation='relu')(x)
        mn = tf.keras.layers.Dense(self.n_latent, activation='relu')(x)
        sd = tf.keras.layers.Dense(self.n_latent, activation='relu')(x)
        return tf.keras.Model(inputs=[inputs, cond], outputs=[mn, sd])

    def build_decoder(self):
        latent = tf.keras.Input(shape=(self.n_latent,))
        cond = tf.keras.Input(shape=(self.cond_dim,))
        x = tf.keras.layers.Concatenate()([latent, cond])
        x = tf.keras.layers.Dense(self.n_hidden, activation='relu')(x)
        outputs = tf.keras.layers.Dense(self.input_dim, activation='sigmoid')(x)
        return tf.keras.Model(inputs=[latent, cond], outputs=outputs)

    def encode(self, x, cond):
        # reparameterisation trick: z = mu + eps * exp(log_var / 2)
        mn, sd = self.encoder_net([x, cond])
        batch_size = tf.shape(x)[0]
        epsilon = tf.random.normal(shape=(batch_size, self.n_latent))
        z = mn + epsilon * tf.exp(sd * 0.5)
        return z

    def decode(self, z, cond):
        return self.decoder_net([z, cond])

    def call(self, inputs):
        x, cond = inputs
        z = self.encode(x, cond)
        reconstructed = self.decode(z, cond)
        return reconstructed

    def compute_loss(self, x, reconstructed, mn, sd):
        # reconstruction term plus weighted KL divergence
        cross_ent = tf.keras.losses.binary_crossentropy(x, reconstructed)
        kl_div = -0.5 * tf.reduce_sum(1. + sd - tf.square(mn) - tf.exp(sd), axis=-1)
        return tf.reduce_mean(cross_ent + self.alpha * kl_div)

    def train_step(self, data):
        x, cond = data[0]
        with tf.GradientTape() as tape:
            z = self.encode(x, cond)
            reconstructed = self.decode(z, cond)
            mn, sd = self.encoder_net([x, cond])
            loss = self.compute_loss(x, reconstructed, mn, sd)
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        return {'loss': loss}

    def generate(self, cond, n_samples=None):
        cond = np.asarray(cond, dtype=np.float32)

        if n_samples is not None:
            randoms = np.random.normal(0, 1, size=(n_samples, self.n_latent))
            cond = np.repeat(cond[np.newaxis, :], n_samples, axis=0)
        else:
            randoms = np.random.normal(0, 1, size=(1, self.n_latent))
            cond = cond[np.newaxis, :]

        z = self.decode(randoms, cond)
        return z.numpy()

In theory, we’re good, but we still need a sanity check: do the generated Signatures have statistics similar to the original ones?

conditions = MG.conditions

MG.train(1000)

array = []
for cond in conditions:
    array.append(MG.generate_logsig(cond))

array = np.array(array)

# compare basic statistics of the generated vs. original log-signatures
print(np.mean(array), np.mean(MG.orig_logsig))
print(np.max(array), np.max(MG.orig_logsig))
print(np.min(array), np.min(MG.orig_logsig))
print(np.median(array), np.median(MG.orig_logsig))
print(np.std(array), np.std(MG.orig_logsig))
print(np.var(array), np.var(MG.orig_logsig))

Now everything is fine and we can start the experiments. 🚀

Results: sampling and recovering

To begin with, we will simulate data for one randomly selected month. Here, we choose the 4th month, corresponding to the values in paths[4] and conditions[3].

generated_paths = []

for _ in range(10):
    logsig = MG.generate_logsig(conditions[3])
    recovered_path, loss = train(logsig, order, n_iterations, n_organisms, n_points, n_dims, data_transform, distribution_array)

    generated_paths.append(recovered_path)

Let’s look at different projections to visually validate the results.

Blue — original 4th month. Red — recovered 4th month 10 times

Let’s recall the initial goal: we want to estimate the probability that by the end of the month, both assets will exhibit either a positive or a negative change.
Also, assume that we are not interested in looking back further than 6 months due to events that have changed the configuration.

generate_count = 6

generated_paths = []
for cond in tqdm(conditions[:generate_count]):
    for _ in range(10):
        logsig = MG.generate_logsig(cond)
        recovered_path, loss = train(logsig, order, n_iterations, n_organisms, n_points, n_dims, data_transform, distribution_array)

        generated_paths.append(recovered_path)

Ultimately, we turn 6 points into 66 to estimate the probability.

Blue — the original 6 paths. Red — the recovered 60 paths

We only have to do a couple of simple arithmetic operations to calculate the probability now:

cnt_pos = 0
cnt_neg = 0

for i in range(len(generated_paths)):
    if (generated_paths[i][-1][0] > 0) and (generated_paths[i][-1][1] > 0):
        cnt_pos += 1
    if (generated_paths[i][-1][0] < 0) and (generated_paths[i][-1][1] < 0):
        cnt_neg += 1

print(cnt_neg / len(generated_paths), cnt_pos / len(generated_paths))

Both positive probability: 47.6%
Both negative probability: 28.6%

We can also estimate confidence intervals, variances, and not only at the end of the month, but for any date.
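
For example, here is a minimal sketch of such an estimate, using the generated_paths from the previous step (the 95% level is an arbitrary choice):

# Illustrative only: 95% intervals for the end-of-month change of each asset,
# estimated from the simulated paths
terminal = np.array([p[-1] for p in generated_paths])  # shape: (n_paths, n_dims)

for dim, name in enumerate(["GLD", "GOLD"]):
    low, high = np.percentile(terminal[:, dim], [2.5, 97.5])
    print(f"{name}: 95% interval for end-of-month change = [{low:.4f}, {high:.4f}]")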

You may rightfully ask: why is this considered a good approximation? The answer to this question is quite extensive; we can improve our understanding of a correctly chosen trajectory generator using different metrics, such as entropy, and boost its performance with hyperparameter selection, autoencoders, and data preprocessing. The code presented in this article is a baseline, and you can and should improve it to fit your specific use cases.
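
As one concrete example of such a check (a Kolmogorov-Smirnov comparison rather than entropy; purely a sketch, not from the original article), you could compare the marginal distributions of the original and generated log-signatures coordinate by coordinate:

# Illustrative only: per-coordinate two-sample KS statistics between original and
# generated log-signatures (requires scipy)
from scipy.stats import ks_2samp

ks_stats = [ks_2samp(MG.orig_logsig[:, j], array[:, j]).statistic
            for j in range(MG.orig_logsig.shape[1])]
print(f"mean KS statistic across log-signature coordinates: {np.mean(ks_stats):.3f}")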

Conclusion

First of all, thank you very much for taking the time to read my first tough paper on interpreting complex mathematical constructions through empirical data.

The goal of these articles was to showcase interesting math and to explore data analysis from various angles, be it Machine Learning or Applied Statistics. Often, the research and study of new methods in probability theory and stochastic processes can bring many benefits.

The code in this article is not the end game: you can play around with the data, the recovery process, and the autoencoder yourself to achieve the best results for your applications.

Please leave comments and share your impressions.
Glad to see everyone, all the best 🚀

[1] H. Buhler, B. Horvath, T. Lyons, I. P. Arribas, B. Wood (2020). A data-driven market simulator for small data environments. arXiv:2006.14498.

[2] I. Chevyrev, A. Kormilitzin (2016). A Primer on the Signature Method in Machine Learning. arXiv:1603.03788.

buymeacoffee.com/pavel.zapolskii

