## Introduction

Modern MLOps is complex because it involves too many components. You need a message bus, a stream processing engine, an API, a model store, a feature store, a monitoring service, etc. Sadly, containerisation software and the unbundling trend have encouraged an appetite for complexity. I believe MLOps shouldn't be this complex. For instance, MLOps can be made simpler by bundling the logic into your database.

In this post, I want to push this idea, and actually implement a machine learning algorithm inside a relational database, using SQL. Some databases allow doing inference with an already trained model. Actually training the model in the database would remove altogether the need for a separate inference/training service.

Being familiar with online machine learning, I picked online gradient descent. My gut feeling is that this should be a straightforward implementation using `WITH RECURSIVE`. I decided to work my way up to it by first implementing simpler online algorithms, starting with a running average.

## Some data

To illustrate, I took some Yahoo! Finance data:

```
import yfinance as yf

# Download two years of daily AAPL prices
figures = yf.download(
    tickers=['AAPL'],
    start='2020-01-01',
    end='2022-01-01'
)
# Scale each column by its standard deviation
figures /= figures.std()
figures.tail()
```

| Date | Open | High | Low | Close | Adj Close | Volume |
|---|---|---|---|---|---|---|
| 2021-12-27 | 2.00543 | 2.06042 | 2.06425 | 2.11303 | 2.11677 | -0.7789 |
| 2021-12-28 | 2.10966 | 2.09118 | 2.11413 | 2.0777 | 2.08174 | -0.712006 |
| 2021-12-29 | 2.08148 | 2.06752 | 2.1008 | 2.08076 | 2.08477 | -0.977945 |
| 2021-12-30 | 2.08623 | 2.06549 | 2.0991 | 2.04068 | 2.04502 | -1.01873 |
| 2021-12-31 | 2.03938 | 2.0202 | 2.07074 | 2.01928 | 2.0238 | -0.950815 |

☝️ *I normalized the data using standard scaling. This puts the figures on a friendlier scale. It will also help the online gradient descent converge. This could very well be done in SQL, but this is fine too.*

## Running average

With SQL, we could obviously just use `AVG` to obtain an average. We could also use a window function if we wanted to calculate the average at every point in time.

I'm not sure how common knowledge this is, but there is a formula that allows updating a running average with a new data point in $\mathcal{O}(1)$ time. This can be applied to a data stream, because the update formula only requires the current data point, as well as the current average.

$$\mu_0 = 0$$

$$\mu_{t+1} = \mu_t + \frac{x - \mu_t}{t + 1}$$
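The update formula can be sketched in a few lines of plain Python. This is only an illustration: the `running_mean` helper and the toy values are made up for the example, and are not part of the SQL implementation that follows.

```python
def running_mean(xs):
    """Yield the running mean after each value, with O(1) work per update."""
    mean = 0.0  # mu_0 = 0
    for t, x in enumerate(xs):
        # mu_{t+1} = mu_t + (x - mu_t) / (t + 1)
        mean += (x - mean) / (t + 1)
        yield mean


means = list(running_mean([2.0, 4.0, 9.0]))
print(means)  # [2.0, 3.0, 5.0]
```

Each update only touches the previous mean and the new value, which is why the formula suits a stream.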

I'll be using DuckDB. A nice feature is that it's aware of any existing pandas dataframe, provided you're running DuckDB using Python. Indeed, we can directly query the `figures` dataframe. Also, DuckDB supports `WITH RECURSIVE`, which is the cornerstone I'll make heavy use of.

There are many good tutorials about how `WITH RECURSIVE` works, so I won't expand on it. The way I'll use it is a bit particular, in that I leverage it to update some current state. The current state points to the current row. At each recursion step, the current state is joined with the next row, which allows updating the state.

*Figure: recursive state update.*

The first idea is to assign a step number to each row. Assuming the rows are pre-sorted, a `ROW_NUMBER` can be used to assign an auto-incrementing integer to each row. This step column is then used to connect each state to the next row.

```
WITH RECURSIVE
    stream AS (
        SELECT
            ROW_NUMBER() OVER () AS step,
            "Adj Close" AS x
        FROM figures
    ),
    state(step, x, avg) AS (
        -- Initialize
        SELECT step, x, x AS avg
        FROM stream
        WHERE step = 1
        UNION ALL
        -- Update
        SELECT
            stream.step,
            stream.x,
            state.avg + (stream.x - state.avg) / stream.step AS avg
        FROM stream
        INNER JOIN state ON state.step + 1 = stream.step
    )
SELECT *
FROM state
ORDER BY step DESC
LIMIT 5
```

```
┌───────┬───────────────────┬────────────────────┐
│ step  │         x         │        avg         │
│ int64 │      double       │       double       │
├───────┼───────────────────┼────────────────────┤
│   505 │ 5.981568542028378 │ 3.9577706471349923 │
│   504 │ 6.002789566151079 │  3.953755175121315 │
│   503 │ 6.042539700173864 │  3.949681548101375 │
│   502 │ 6.039508125299193 │  3.945512507957804 │
│   501 │ 6.074541325571636 │  3.941332875987063 │
└───────┴───────────────────┴────────────────────┘
```

We can verify this is correct by doing a rolling mean in pandas:

```
(
    figures['Adj Close']
    .rolling(len(figures), min_periods=1)
    .mean()
    .tail()[::-1]
)
```

```
Date
2021-12-31 3.957771
2021-12-30 3.953755
2021-12-29 3.949682
2021-12-28 3.945513
2021-12-27 3.941333
```

☝️ *This usage of `WITH RECURSIVE` essentially boils down to a window function. This could be implemented as such, which would avoid the headache of thinking in terms of recursion. For instance, PostgreSQL supports user-defined aggregates, which can be applied over a window. However, the `WITH RECURSIVE` syntax has better support across databases.*
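To make the window function remark concrete, here is a minimal sketch of a running average computed with a window frame instead of recursion. It swaps DuckDB for SQLite through Python's built-in `sqlite3` module, purely so the example is self-contained; the `stream` table and its three toy rows are assumptions made for the illustration. Window functions need SQLite 3.25 or later.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stream (step INTEGER PRIMARY KEY, x REAL)")
conn.executemany(
    "INSERT INTO stream (step, x) VALUES (?, ?)",
    [(1, 2.0), (2, 4.0), (3, 9.0)],
)

# The frame grows from the first row up to the current row,
# so AVG yields the running average at every step.
rows = conn.execute(
    """
    SELECT
        step,
        x,
        AVG(x) OVER (
            ORDER BY step
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS avg
    FROM stream
    ORDER BY step
    """
).fetchall()

for row in rows:
    print(row)
```

The built-in `AVG` covers this simple case. Custom update rules are where the recursive approach, or user-defined aggregates, become necessary.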

## Running covariance

The query above measures the running average for a single variable, namely `Adj Close`. What if we want to compute something that involves more than one variable? The naive way is to just copy/paste the logic for each variable. For instance, to calculate a running covariance, it is necessary to compute the running average of two variables. Check out Welford's algorithm for more information.

```
WITH RECURSIVE
    stream AS (
        SELECT
            ROW_NUMBER() OVER () AS step,
            "Adj Close" AS x,
            "Close" AS y
        FROM figures
    ),
    state(step, x, x_avg, y, y_avg, cov) AS (
        -- Initialize
        SELECT
            step,
            x,
            x AS x_avg,
            y,
            y AS y_avg,
            0::DOUBLE AS cov
        FROM stream
        WHERE step = 1
        UNION ALL
        -- Update
        SELECT
            step,
            x,
            x_new_avg AS x_avg,
            y,
            y_new_avg AS y_avg,
            cov + ((x - x_prev_avg) * (y - y_new_avg) - cov) / step AS cov
        FROM (
            -- Intermediary state: previous and new averages
            SELECT
                stream.step,
                stream.x,
                stream.y,
                state.x_avg AS x_prev_avg,
                state.x_avg + (stream.x - state.x_avg) / stream.step AS x_new_avg,
                state.y_avg AS y_prev_avg,
                state.y_avg + (stream.y - state.y_avg) / stream.step AS y_new_avg,
                state.cov
            FROM stream
            INNER JOIN state ON state.step + 1 = stream.step
        )
    )
SELECT step, cov
FROM state
ORDER BY step DESC
LIMIT 5
```

```
┌───────┬────────────────────┐
│ step  │        cov         │
│ int64 │       double       │
├───────┼────────────────────┤
│   505 │ 0.9979967767965502 │
│   504 │ 0.9918524780369538 │
│   503 │  0.985478504290919 │
│   502 │ 0.9787158318485241 │
│   501 │ 0.9719167545245742 │
└───────┴────────────────────┘
```

Other than handling two variables, the major difference with this query is that a subquery is used to calculate some intermediary state. We will reuse this idea for online gradient descent.

We can also verify the output is correct by comparing to pandas:

```
(
    figures
    .rolling(len(figures), min_periods=1)
    .cov(ddof=0)['Adj Close']
    .loc[:, 'Close']
    .tail()[::-1]
)
```

```
Date
2021-12-31 0.997997
2021-12-30 0.991852
2021-12-29 0.985479
2021-12-28 0.978716
2021-12-27 0.971917
```

## Handling many variables

The downside of the queries above is that the variable names have to be hardcoded. There is no way to handle an arbitrary number of variables. For instance, if we have several variables, how would we calculate the average of each variable, without spelling them out in the query?

As is often the case, converting the data to a tidy representation makes life easier. In this case, tidy data is obtained by melting, i.e. unpivoting, the dataframe.

```
# One row per (date, variable) pair
figures_flat = figures.melt(ignore_index=False).reset_index()
figures_flat.columns = ['date', 'variable', 'value']
figures_flat = figures_flat.sort_values(['date', 'variable'])
figures_flat.head(10)
```

| date | variable | value |
|---|---|---|
| 2020-01-02 | Adj Close | -1.46542 |
| 2020-01-02 | Close | -1.46182 |
| 2020-01-02 | High | -1.49763 |
| 2020-01-02 | Low | -1.46396 |
| 2020-01-02 | Open | -1.49242 |
| 2020-01-02 | Volume | 0.180024 |
| 2020-01-03 | Adj Close | -1.48965 |
| 2020-01-03 | Close | -1.48662 |
| 2020-01-03 | High | -1.4978 |
| 2020-01-03 | Low | -1.45277 |

```
WITH RECURSIVE
    stream AS (
        SELECT DENSE_RANK() OVER (ORDER BY date) AS step, *
        FROM figures_flat
        ORDER BY date
    ),
    state(step, variable, value, avg) AS (
        -- Initialize
        SELECT step, variable, value, value AS avg
        FROM stream
        WHERE step = 1
        UNION ALL
        -- Update
        SELECT
            stream.step,
            stream.variable,
            stream.value,
            state.avg + (stream.value - state.avg) / stream.step AS avg
        FROM stream
        INNER JOIN state ON
            state.step + 1 = stream.step AND
            state.variable = stream.variable
    )
SELECT *
FROM state
WHERE step = (SELECT MAX(step) FROM state)
ORDER BY variable
```

```
┌───────┬───────────┬────────────────────┬────────────────────┐
│ step  │ variable  │       value        │        avg         │
│ int64 │  varchar  │       double       │       double       │
├───────┼───────────┼────────────────────┼────────────────────┤
│   505 │ Adj Close │  5.981568542028378 │ 3.9577706471349923 │
│   505 │ Close     │   6.03165394229666 │  4.012373756823449 │
│   505 │ High      │  6.057853942108038 │   4.03765319364954 │
│   505 │ Low       │   6.05591789308585 │  3.985178489614261 │
│   505 │ Open      │  6.046125216781687 │  4.006746251814558 │
│   505 │ Volume    │ 1.0143664144585565 │ 1.9651814487272024 │
└───────┴───────────┴────────────────────┴────────────────────┘
```

The main difference with the first query is that the join condition in the recursion includes the variable name, as well as the step number. A `DENSE_RANK` statement is also used instead of `ROW_NUMBER` to assign a step number to each group of rows.
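The difference between the two numbering schemes is easy to see on a toy example. The sketch below is plain Python with a hypothetical `dense_rank` helper; it only imitates what the SQL window functions return when ordering by date.

```python
def dense_rank(keys):
    """Map each key to its dense rank: ties share a rank, ranks have no gaps."""
    ranks = {key: rank for rank, key in enumerate(sorted(set(keys)), start=1)}
    return [ranks[key] for key in keys]


# Two variables per date, as in the melted dataframe
dates = ["2020-01-02", "2020-01-02", "2020-01-03", "2020-01-03", "2020-01-06"]

steps = dense_rank(dates)                     # one step per distinct date
row_numbers = list(range(1, len(dates) + 1))  # one number per row

print(steps)        # [1, 1, 2, 2, 3]
print(row_numbers)  # [1, 2, 3, 4, 5]
```

With a dense rank, all the variables observed on the same date share a step, so the join on `state.step + 1 = stream.step` moves every variable forward together.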

Here is the equivalent using pandas:

```
(
    figures_flat
    .groupby('variable')['value']
    .rolling(len(figures_flat), min_periods=1)
    .mean()
    .groupby('variable')
    .tail(1)[::-1].sort_index()
)
```

```
variable
Adj Close 3.957771
Close 4.012374
High 4.037653
Low 3.985178
Open 4.006746
Volume 1.965181
```

## Online gradient descent

Finally, we have enough experience to implement online gradient descent. To keep things simple, we will use a very vanilla version:

- Constant learning rate, as opposed to a schedule.
- Single epoch, we only do one pass on the data.
- Not stochastic: the rows are not shuffled.
- Squared loss, which is the standard loss for regression.
- No gradient clipping.
- No weight regularisation.
- No intercept term.

None of these are impossible to implement using SQL. I just thought I'd keep things simple in order to keep the code digestible. Anyway, these assumptions lead to the following update formulas:

$$p_t = \dot{w}_t \cdot \dot{x}_t$$

$$l_t = p_t - y_t$$

$$\dot{g}_t = l_t \dot{x}_t$$

$$\dot{w}_{t+1} = \dot{w}_t - \eta \dot{g}_t$$

I've added a $\dot{}$ symbol to the vector variables. Therefore $p_t$ is the prediction, defined as the dot product between weights $\dot{w}_t$ and features $\dot{x}_t$. The loss gradient $l_t$ is used to obtain the error gradient $\dot{g}_t$ with respect to the current weights. This all leads to the simple weight update formula $\dot{w}_t - \eta \dot{g}_t$.
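Before turning to SQL, the four formulas can be sketched in plain Python. This is a minimal illustration under the same simplifying assumptions (constant learning rate, squared loss, no intercept, no regularisation). The `online_gradient_descent` function, the feature names and the toy stream are made up for the example.

```python
def online_gradient_descent(stream, lr=0.01):
    """Run one pass of online gradient descent over (features, target) pairs."""
    weights = {}
    for x, y in stream:
        # p_t = w_t . x_t
        p = sum(weights.get(name, 0.0) * value for name, value in x.items())
        # l_t = p_t - y_t
        loss_gradient = p - y
        # w_{t+1} = w_t - eta * l_t * x_t
        for name, value in x.items():
            weights[name] = weights.get(name, 0.0) - lr * loss_gradient * value
    return weights


toy_stream = [
    ({"a": 1.0, "b": 2.0}, 3.0),
    ({"a": 2.0, "b": 1.0}, 3.0),
]
weights = online_gradient_descent(toy_stream, lr=0.1)
print(weights)
```

Each weight is updated from the current row only, which is the property the recursive query needs: the state at one step is enough to compute the state at the next.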

As an example, I decided to predict the `Adj Close` variable using the other variables. I'm not saying this makes a lot of sense, it's just for the sake of example.

```
WITH RECURSIVE
    X AS (
        SELECT
            DENSE_RANK() OVER (ORDER BY date) AS step, *
        FROM figures_flat
        WHERE variable != 'Adj Close'
        ORDER BY date
    ),
    y AS (
        SELECT
            DENSE_RANK() OVER (ORDER BY date) AS step, *
        FROM figures_flat
        WHERE variable = 'Adj Close'
        ORDER BY date
    ),
    stream AS (
        SELECT X.*, y.value AS target
        FROM X
        INNER JOIN y ON X.step = y.step
    ),
    state AS (
        -- Initialize
        SELECT
            step,
            target,
            variable,
            value,
            0::DOUBLE AS weight,
            0::DOUBLE AS prediction
        FROM stream
        WHERE step = 1
        UNION ALL
        -- Update
        SELECT
            step,
            target,
            variable,
            value,
            weight,
            SUM(weight * value) OVER () AS prediction
        FROM (
            SELECT
                stream.step,
                stream.target,
                stream.variable,
                stream.value,
                state.prediction - state.target AS loss_gradient,
                loss_gradient * state.value AS gradient,
                state.weight - 0.01 * gradient AS weight
            FROM stream
            INNER JOIN state ON
                state.step + 1 = stream.step AND
                state.variable = stream.variable
        )
    )
SELECT *
FROM state
WHERE step = (SELECT MAX(step) FROM state)
ORDER BY variable
```

```
┌───────┬──────────┬──────────────────────┬───────────────────┬───────────────────┐
│ step  │ variable │        weight        │      target       │    prediction     │
│ int64 │ varchar  │        double        │      double       │      double       │
├───────┼──────────┼──────────────────────┼───────────────────┼───────────────────┤
│   505 │ Close    │   0.2511547716803354 │ 5.981568542028378 │ 5.938875441702928 │
│   505 │ High     │  0.24043897039853313 │ 5.981568542028378 │ 5.938875441702928 │
│   505 │ Low      │   0.2447191283620627 │ 5.981568542028378 │ 5.938875441702928 │
│   505 │ Open     │  0.23603830762609726 │ 5.981568542028378 │ 5.938875441702928 │
│   505 │ Volume   │ 0.057510279698874206 │ 5.981568542028378 │ 5.938875441702928 │
└───────┴──────────┴──────────────────────┴───────────────────┴───────────────────┘
```

It seems to be working! How can we check it is correct though? Well, we can fit an instance of scikit-learn's `SGDRegressor`. The weights should correspond exactly to what we obtained in SQL, as long as we provide the correct parameters. This is to align with the simplifying assumptions that were made in the SQL implementation.

```
from pprint import pprint
from sklearn import linear_model

# Parameters mirror the simplifying assumptions of the SQL version
model = linear_model.SGDRegressor(
    loss='squared_error',
    penalty=None,
    fit_intercept=False,
    learning_rate='constant',
    eta0=0.01,
    max_iter=1,
    shuffle=False
)
X = figures[:-1].copy()
y = X.pop('Adj Close')
model = model.fit(X, y)
pprint(dict(zip(X.columns, model.coef_)))
```

```
{'Close': 0.2511547716803354,
 'High': 0.2404389703985331,
 'Low': 0.2447191283620624,
 'Open': 0.23603830762609757,
 'Volume': 0.05751027969887417}
```

Spot on! To be even more certain this is correct, we can compare with River's linear regression implementation, which uses online gradient descent under the hood.

```
from river import linear_model
from river import optim

class CustomSquaredLoss:
    # Gradient of the squared loss, without the usual factor of 2
    def gradient(self, y_true, y_pred):
        return y_pred - y_true

model = linear_model.LinearRegression(
    optimizer=optim.SGD(lr=0.01),
    loss=CustomSquaredLoss(),
    intercept_lr=0.0,
    l2=0.0
)
# Learn from one row at a time
for i, x in enumerate(figures[:-1].to_dict(orient='records')):
    y = x.pop('Adj Close')
    model.learn_one(x, y)
pprint(model.weights)
```

```
{'Close': 0.2511547716803356,
 'High': 0.2404389703985331,
 'Low': 0.24471912836206253,
 'Open': 0.2360383076260972,
 'Volume': 0.057510279698874255}
```

✅

## Conclusion

A machine learning algorithm which can be trained using SQL opens a world of possibilities. The model and the data live in the same space. This is as simple as it gets in terms of architecture. Basically, you only need a database which runs SQL.

Of course, the implementation we made is quite basic. Moreover, models using online gradient descent aren't necessarily the strongest ones. However, one could argue that what matters most in a model are the features you feed it with. As such, online gradient descent done in the database can be a great baseline to start from.

The key advantage of online machine learning is that you don't need to revisit past data points to update a model. However, all the queries we've written are stateless, and will run from the top when they are refreshed. This kind of defeats the purpose of doing things online. Luckily, stream processing engines are popping up, and they usually provide an SQL interface. For instance, Materialize is working on offering `WITH RECURSIVE` semantics. Doing online gradient descent on top of Materialize sounds very powerful to me.