Writing a Postgres Logical Replication System in Golang
We’re building Dolt, the world’s first version-controlled SQL database. Dolt
is MySQL compatible, but many of our potential customers wanted a Postgres-compatible version
instead, so we have been hard at work building DoltgreSQL.
The first potential DoltgreSQL customer to reach out to us wants to continue using Postgres for
their primary database server, but get diffs of all their changes in Doltgres. So we talked it over
with them and decided that Postgres’s logical
replication would be the best fit
for this functionality, and started building it. Several weeks later, DoltgreSQL’s
replicate-from-Postgres feature is almost ready for production use, and you can try it out
in the next release of the product.
This blog discusses what we learned building this feature, and walks you step-by-step through how to
build a replication system that consumes Postgres’s logical replication protocol in Go. We’ll be using
the jackc/pglogrepl library to receive and send messages with
the Postgres primary, but most of the lessons generalize to other clients as well.
Logical replication is one of two modes of replication supported by Postgres. Both consume the
write-ahead log (WAL) produced by the primary database, which is where Postgres serializes all data
it writes to disk. The WAL is very low-level binary data containing information related to physical
blocks on disk that Postgres wrote as a result of write operations. It’s difficult to interpret if
you’re not a Postgres database server yourself. The two kinds of replication Postgres supports are:
- Physical replication copies the bytes of the WAL directly from primary to replica. It’s really
only appropriate for replicating between a primary and replica running the same binary release of
Postgres.
- Logical replication interprets updates to the WAL on the primary database, then sends them to
the replica as a series of tuple messages describing an INSERT, UPDATE, or DELETE
operation. Because it abstracts away the physical serialization of the WAL, it can be used to
replicate between different versions of Postgres, such as different major releases or even
operating systems. Or, as we’ll see, to replicate to an entirely non-Postgres system.
What we found is that, while it’s certainly possible (even easy) to receive replication messages
from Postgres, decoding and applying them correctly in the face of concurrent transactions and
client crashes is quite challenging, and requires a fairly deep understanding of how the protocol
works.
Let’s dig in.
Before we discuss the details of our replication implementation, we should start by defining design
goals for how a reliable replication system should behave. These may seem obvious, but it’s very
easy to write a system that accidentally omits one of these critical properties in some
circumstances. We must always keep them front of mind while writing any replication system,
and have tests thoroughly exercising them in the face of a variety of scenarios.
- The system must not miss any updates. Every tuple change sent by the primary must be durably
recorded by the replica.
- The system must not apply any update more than once. Every tuple update must be applied
exactly once.
- Changes in a transaction must be applied atomically. The primary sends each tuple update in
its own message, but none of them should become visible to clients of the replica until the
corresponding transaction on the replica has been committed.
- The system must be resilient to unexpected crashes. A replication process may die at any point
during execution, and the above properties must still be preserved.
- The system must inform the primary about its progress. Admins on the primary database can run
queries to determine how far behind each replica is. This information doesn’t have to be exactly
up to date, but if it falls too far behind, the primary will be prevented from performing necessary
cleanup on older WAL files that aren’t actually needed any longer.
In addition to these essential requirements, a replication system might also implement some
quality-of-life features. These are nice-to-haves.
- Easy to cold-start. It should be possible to start a new replica with no downtime on the
primary and a minimal amount of effort on the replica.
- Easy to recover from errors. For a variety of reasons, it may not be possible to apply every
change on the primary to the replica. When this happens, it should be easy to find the error and
restart the process to continue replication from where the error first occurred.
Now that we understand how our system must behave, let’s look at the logical replication protocol
messages sent by a Postgres primary and discuss how to handle them.
Like many complex systems, the logical replication system on Postgres is assiduously documented in
the fine details but profoundly lacking in the big picture. That’s to say: it’s easy enough to
learn how many aspects of the system work, but very hard to understand how they should fit
together to achieve some goal. In our case, in addition to reading many (many!) pages of
documentation, it was also necessary to write and test various prototypes to understand the behavior
of the primary in various scenarios.
Logical replication is a two-way process in which the primary and the replica must both behave as
expected to prevent incorrect behavior. We discovered these properties through trial and error, but
hopefully somebody reading this in the future will be able to use our learning process as a shortcut
in their own implementation.
Creating a publication and a slot
Postgres requires all replicas to register themselves ahead of time in a two-step process.
First, create a publication on the primary. A publication tells the primary which tables you
want to replicate. You can also replicate all tables if you like. As a database admin, run the
following SQL on the primary:
CREATE PUBLICATION pub1 FOR ALL TABLES;
CREATE PUBLICATION pub2 FOR TABLE t1;
...
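You can also run these statements from Go over a regular (non-replication) connection if you’d rather script this step. A minimal sketch using pgconn (the connection string and publication name are placeholders):

conn, err := pgconn.Connect(context.Background(), "postgres://postgres:password@127.0.0.1:5432/postgres")
if err != nil {
    return err
}
defer conn.Close(context.Background())

// CREATE PUBLICATION is ordinary SQL, so any connection with sufficient privileges works.
_, err = conn.Exec(context.Background(), "CREATE PUBLICATION pub1 FOR ALL TABLES;").ReadAll()
if err != nil {
    return err
}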
Next, create a replication slot on the primary. Each publication can have multiple independent
subscribers, and each needs a replication slot. The total number of slots is limited by the primary
server and can be changed via configuration. As a database admin, run the following SQL on the
primary:
CREATE_REPLICATION_SLOT slot1 LOGICAL pgoutput;
The pglogrepl
library has a convenience method to do this for you:
pglogrepl.CreateReplicationSlot(context.Background(), conn.PgConn(), "slot1", "pgoutput", pglogrepl.CreateReplicationSlotOptions{})
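In practice you probably want to tolerate the slot already existing, for example when your replica restarts. A sketch of that check (treating duplicate_object as "already created" is our convention, not something pglogrepl requires):

_, err = pglogrepl.CreateReplicationSlot(context.Background(), conn.PgConn(), "slot1", "pgoutput", pglogrepl.CreateReplicationSlotOptions{})
if err != nil {
    var pgErr *pgconn.PgError
    // SQLSTATE 42710 (duplicate_object) means the slot was created on a previous run.
    if errors.As(err, &pgErr) && pgErr.Code == "42710" {
        log.Printf("replication slot already exists, reusing it")
    } else {
        return err
    }
}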
Now that we have a publication and a slot, we can begin replication.
Starting replication
The rest of the replication protocol takes place on a single connection. Messages flow back and
forth between the primary and the replica.
To begin, open a connection to the primary and run the START_REPLICATION
command like so:
START_REPLICATION SLOT slot1 LOGICAL 0/0 (proto_version '2', publication_names 'pub1', messages 'true', streaming 'true');
Again, pglogrepl
has a wrapper you can use to work in an object-oriented style:
conn, err := pgconn.Connect(context.Background(), "postgres://postgres:password@127.0.0.1:5432/postgres?replication=database")
if err != nil {
    return nil, err
}

pluginArguments := []string{
    "proto_version '2'",
    fmt.Sprintf("publication_names '%s'", publicationName),
    "messages 'true'",
    "streaming 'true'",
}

log.Printf("Starting logical replication on slot %s at WAL location %s", slotName, lastFlushLsn+1)
err = pglogrepl.StartReplication(context.Background(), conn, slotName, lastFlushLsn+1, pglogrepl.StartReplicationOptions{
    PluginArgs: pluginArguments,
})
if err != nil {
    return nil, err
}

return conn, nil
We’ll return to details about the lastFlushLsn
in a bit.
Note that the connection you execute this command on must include a replication=database
query
param. If you forget to include this query param, the START_REPLICATION
command will actually
fail with a syntax error. Invoking a separate parser syntax depending on the presence of a query
param is a bizarre design decision on the part of Postgres that makes errors needlessly difficult to
debug, so be aware of it.
If the START_REPLICATION
command succeeds, your connection is in replication streaming mode, and
replication messages will stream to your replica. You’re expected to reply to them appropriately.
Replication message stream
The primary sends one of two top-level
messages over and over:
- Primary keepalive message: This is the idle message that the primary sends whenever there’s
nothing else happening. It contains the current server time and position in the WAL, and a boolean
requesting a reply. The replica should reply as soon as possible when asked, to avoid
disconnection.
- XLogData: This message bundles another logical message inside itself, containing the actual
replication instructions. It also includes some metadata about the message, such as the server
time and WAL position of this message.
Using pglogrepl, we can receive the next replication message like so:
rawMsg, err := primaryConn.ReceiveMessage(ctx)
if err != nil {
    return err
}

msg, ok := rawMsg.(*pgproto3.CopyData)
if !ok {
    log.Printf("Received unexpected message: %T\n", rawMsg)
    return nil
}

switch msg.Data[0] {
case pglogrepl.PrimaryKeepaliveMessageByteID:
    pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:])
    if err != nil {
        log.Fatalln("ParsePrimaryKeepaliveMessage failed:", err)
    }
    return handleKeepalive(pkm)
case pglogrepl.XLogDataByteID:
    xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
    if err != nil {
        return err
    }
    return processXdMessage(xld, state)
}
The replica replies to these messages with a single message in all cases: the standby status update
message. This
message contains three pieces of essential data used to synchronize the replica in case of connection
loss or unexpected restart:
- Last written WAL position
- Last flushed WAL position
- Last applied WAL position
Using pglogrepl
, you can send the status update like this:
err := pglogrepl.SendStandbyStatusUpdate(context.Background(), primaryConn, pglogrepl.StandbyStatusUpdate{
    WALWritePosition: lastWrittenLSN + 1,
    WALFlushPosition: lastWrittenLSN + 1,
    WALApplyPosition: lastReceivedLSN + 1,
})
The primary server tracks these values and will use them to re-establish replication at the correct
WAL position on a restart. They’re also used to inform any admins of how far this replica is behind
the primary (replication lag). See the following sections for details on the meanings of the WAL
parameters in this message.
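The handleKeepalive function referenced earlier is our own helper, not part of pglogrepl. A minimal sketch of what it needs to do (we pass the state and connection explicitly here for clarity; in our real code they live on the replicator):

func handleKeepalive(pkm pglogrepl.PrimaryKeepaliveMessage, state *replicationState, primaryConn *pgconn.PgConn) error {
    // Track how far along the primary's WAL is; useful for computing replication lag.
    state.lastReceivedLSN = pkm.ServerWALEnd

    // If the primary asked for a reply, send a standby status update promptly so it
    // doesn't consider this replica dead and drop the connection.
    if pkm.ReplyRequested {
        return pglogrepl.SendStandbyStatusUpdate(context.Background(), primaryConn, pglogrepl.StandbyStatusUpdate{
            WALWritePosition: state.lastWrittenLSN + 1,
            WALFlushPosition: state.lastWrittenLSN + 1,
            WALApplyPosition: state.lastReceivedLSN + 1,
        })
    }
    return nil
}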
The most important messages are the XLogData
messages, which contain actual tuple data changes.
Processing XLogData messages
Finally, we arrive at the meat of the problem: processing data updates and applying them to our own
replica. Each XLogData
message wraps another logical message for us to decode and interpret. But
here too Postgres adds an additional layer of complexity, sending two kinds of wrapped logical
messages:
- Metadata messages that describe the tables being replicated and their types. The most important
is the Relation
message,
which gives information about the schema of tables being replicated. You may also receive Type
messages,
which identify custom types along with their names and OIDs.
- Data messages that describe the changes to the WAL being replicated. In addition to messages
corresponding to INSERT, UPDATE, and DELETE
statements, the primary also sends Begin
messages,
which mark the start of a set of tuple changes committed in a transaction, and a Commit
message,
which ends the set.
So, after beginning replication, you’ll start receiving XLogData
messages from the primary. These
follow a reliable sequence:
- First, any Relation
messages
for the tables being replicated. Depending on what you’re trying to do, you probably want to
parse these messages and remember the schemas they describe.
- Then, for each committed transaction, you’ll receive a Begin
message,
which includes the WAL position of the eventual commit. Only transactions that were successfully
committed on the primary are sent to logical replicas: you won’t even receive a Begin message
that corresponds to a transaction that was rolled back.
- Next, for every row that changed as a result of this transaction, you’ll receive one of: an
Insert
message;
an Update
message;
or a Delete
message. Each
of these corresponds to its equivalent SQL statement and has enough information to apply it
accordingly. Note that these are not the statements executed on the primary as part of each
transaction: they’re logical updates to a single row that occurred as a result of whatever
statements were executed.
- Finally, you’ll receive a Commit
message
indicating the previously received set of tuple updates should be committed in a
transaction. Unlike the Begin
message,
the Commit message doesn’t include any WAL position metadata.
This flow is best illustrated with an example.
To understand how messages flow from the primary to the replica, it helps to understand how changes
in Postgres’s write-ahead log (WAL) get translated into replication messages. Postgres writes all
data changes from queries into the WAL as they happen. Each record in the WAL contains
physical-data-level changes to tuples, as well as metadata such as the transaction responsible for
that change. Crucially, entries in the WAL are written throughout a transaction’s lifecycle, but do
not become permanent or visible to other transactions until that transaction is successfully
committed. The COMMIT
is itself a message written to the WAL as well, and makes all the data
changes that were part of that transaction permanent.
You can picture the WAL as a series of data updates coming from their various transactions. Under
concurrency, these entries can and will be interleaved. Here we see statements being executed by
three concurrent transactions (color-coded red, blue, and green) and their respective WAL entries.
When Postgres replicates these transactions to logical subscribers, it does so on a per-transaction
basis, in the order that each transaction commits. For each transaction, the subscriber will receive
a Begin
message, followed by one or more tuple change messages, and concluding with a Commit
message. Unlike in the physical WAL, messages for different transactions are not interleaved. For
the above WAL example, the replication messages received by the subscriber will look something like
this:
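Roughly, the subscriber sees each committed transaction as one contiguous block, in commit order:

Begin (TX2) -> tuple change messages for TX2 -> Commit
Begin (TX1) -> tuple change messages for TX1 -> Commit

TX3 never appears in the stream at all.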
There are a few things to note here:
- TX3 (the green one) is not replicated, since it was rolled back.
- TX2 comes first since it committed first, even though TX1 started writing WAL entries first.
- The WAL position of each tuple is behind the eventual COMMIT’s WAL position. The Keepalive
message sends the current WAL position of the primary as a whole, which is usually ahead of the tuple
messages being received.
- Each replication statement corresponds to a single tuple that was modified as a result of an
INSERT, UPDATE, or DELETE
statement. So there may be multiple tuple replication messages
sent for each statement executed on the primary.
Next, let’s look at some example Go code for processing the tuple data we get in XLogData
messages.
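The snippets below keep their bookkeeping in a replicationState struct. Its exact contents depend on your implementation; here’s a sketch of roughly what ours holds, with field names matching their uses below (the type of the replica connection is an assumption for illustration):

type replicationState struct {
    // Schemas of replicated tables, keyed by relation OID, populated from Relation messages.
    relations map[uint32]*pglogrepl.RelationMessageV2
    // Type map used to decode tuple column data into Go values.
    typeMap *pgtype.Map
    // Connection to the replica we're applying changes to.
    replicaConn *pgconn.PgConn

    // Last WAL position reported by the primary, and last position we've durably written locally.
    lastReceivedLSN pglogrepl.LSN
    lastWrittenLSN  pglogrepl.LSN
    // WAL position of the in-flight transaction's eventual commit, taken from its Begin message.
    currentTransactionLSN pglogrepl.LSN

    // Whether we're currently applying messages or skipping stale ones.
    processMessages bool
    // Whether we're inside a streamed in-progress transaction.
    inStream bool
}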
Using pglogrepl
, we can process these messages like this:
func (r *LogicalReplicator) processMessage(
    xld pglogrepl.XLogData,
    state *replicationState,
) (bool, error) {
    walData := xld.WALData
    logicalMsg, err := pglogrepl.ParseV2(walData, state.inStream)
    if err != nil {
        return false, err
    }

    log.Printf("XLogData (%T) => WALStart %s ServerWALEnd %s ServerTime %s", logicalMsg, xld.WALStart, xld.ServerWALEnd, xld.ServerTime)
    state.lastReceivedLSN = xld.ServerWALEnd

    switch logicalMsg := logicalMsg.(type) {
    case *pglogrepl.RelationMessageV2:
        state.relations[logicalMsg.RelationID] = logicalMsg
    case *pglogrepl.BeginMessage:
        if state.lastWrittenLSN > logicalMsg.FinalLSN {
            log.Printf("Received stale message, ignoring. Last written LSN: %s Message LSN: %s", state.lastWrittenLSN, logicalMsg.FinalLSN)
            state.processMessages = false
            return false, nil
        }

        state.processMessages = true
        state.currentTransactionLSN = logicalMsg.FinalLSN

        log.Printf("BeginMessage: %v", logicalMsg)
        err = r.replicateQuery(state.replicaConn, "START TRANSACTION")
        if err != nil {
            return false, err
        }
    case *pglogrepl.CommitMessage:
        log.Printf("CommitMessage: %v", logicalMsg)
        err = r.replicateQuery(state.replicaConn, "COMMIT")
        if err != nil {
            return false, err
        }
        state.processMessages = false
        return true, nil
    case *pglogrepl.InsertMessageV2:
        return r.handleInsert(xld, state, logicalMsg)
    case *pglogrepl.UpdateMessageV2:
        return r.handleUpdate(xld, state, logicalMsg)
    case *pglogrepl.DeleteMessageV2:
        return r.handleDelete(xld, state, logicalMsg)
    case *pglogrepl.TruncateMessageV2:
        log.Printf("truncate for xid %d\n", logicalMsg.Xid)
    case *pglogrepl.TypeMessageV2:
        log.Printf("typeMessage for xid %d\n", logicalMsg.Xid)
    case *pglogrepl.OriginMessage:
        log.Printf("originMessage for origin %s\n", logicalMsg.Name)
    case *pglogrepl.LogicalDecodingMessageV2:
        log.Printf("Logical decoding message: %q, %q, %d", logicalMsg.Prefix, logicalMsg.Content, logicalMsg.Xid)
    case *pglogrepl.StreamStartMessageV2:
        state.inStream = true
        log.Printf("Stream start message: xid %d, first segment? %d", logicalMsg.Xid, logicalMsg.FirstSegment)
    case *pglogrepl.StreamStopMessageV2:
        state.inStream = false
        log.Printf("Stream stop message")
    case *pglogrepl.StreamCommitMessageV2:
        log.Printf("Stream commit message: xid %d", logicalMsg.Xid)
    case *pglogrepl.StreamAbortMessageV2:
        log.Printf("Stream abort message: xid %d", logicalMsg.Xid)
    default:
        log.Printf("Unknown message type in pgoutput stream: %T", logicalMsg)
    }

    return false, nil
}
We don’t handle all the logical replication events possible, just the ones we need to. What you do
with these messages is of course up to you. We’re replicating them into another database system, so
we want to transform the tuple data into equivalent SQL statements. The one for INSERT
looks like
this:
func (r *LogicalReplicator) handleInsert(
    xld pglogrepl.XLogData,
    state *replicationState,
    logicalMsg *pglogrepl.InsertMessageV2,
) (bool, error) {
    if !state.processMessages {
        log.Printf("Received stale message, ignoring. Last written LSN: %s Message LSN: %s", state.lastWrittenLSN, xld.ServerWALEnd)
        return false, nil
    }

    rel, ok := state.relations[logicalMsg.RelationID]
    if !ok {
        log.Fatalf("unknown relation ID %d", logicalMsg.RelationID)
    }

    columnStr := strings.Builder{}
    valuesStr := strings.Builder{}
    for idx, col := range logicalMsg.Tuple.Columns {
        if idx > 0 {
            columnStr.WriteString(", ")
            valuesStr.WriteString(", ")
        }

        colName := rel.Columns[idx].Name
        columnStr.WriteString(colName)

        switch col.DataType {
        case 'n': // null
            valuesStr.WriteString("NULL")
        case 't': // text-format column data
            val, err := decodeTextColumnData(state.typeMap, col.Data, rel.Columns[idx].DataType)
            if err != nil {
                log.Fatalln("error decoding column data:", err)
            }
            colData, err := encodeColumnData(state.typeMap, val, rel.Columns[idx].DataType)
            if err != nil {
                return false, err
            }
            valuesStr.WriteString(colData)
        default:
            log.Printf("unknown column data type: %c", col.DataType)
        }
    }

    err := r.replicateQuery(state.replicaConn, fmt.Sprintf("INSERT INTO %s.%s (%s) VALUES (%s)", rel.Namespace, rel.RelationName, columnStr.String(), valuesStr.String()))
    if err != nil {
        return false, err
    }
    return false, nil
}
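Three helpers appear above that aren’t shown in full. replicateQuery just runs the generated SQL against the replica; decodeTextColumnData follows the pattern from pglogrepl’s own example code; encodeColumnData turns the decoded value back into a SQL literal. The sketches below assume the replica speaks the Postgres wire protocol via pgconn and only handle a few common cases, so treat them as illustrations rather than production code:

func (r *LogicalReplicator) replicateQuery(replicaConn *pgconn.PgConn, query string) error {
    // Run the statement on the replica and drain the result.
    _, err := replicaConn.Exec(context.Background(), query).ReadAll()
    return err
}

// decodeTextColumnData decodes a text-format column value using the column type's OID.
func decodeTextColumnData(mi *pgtype.Map, data []byte, dataType uint32) (interface{}, error) {
    if dt, ok := mi.TypeForOID(dataType); ok {
        return dt.Codec.DecodeValue(mi, dataType, pgtype.TextFormatCode, data)
    }
    return string(data), nil
}

// encodeColumnData renders a decoded value as a SQL literal. Real code needs proper
// quoting and escaping for every type it expects to see.
func encodeColumnData(mi *pgtype.Map, data interface{}, dataType uint32) (string, error) {
    switch v := data.(type) {
    case nil:
        return "NULL", nil
    case string:
        return "'" + strings.ReplaceAll(v, "'", "''") + "'", nil
    case time.Time:
        return "'" + v.Format(time.RFC3339Nano) + "'", nil
    default:
        // Numbers, booleans, etc. print reasonably with %v.
        return fmt.Sprintf("%v", v), nil
    }
}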
There’s one final important detail that you need to be aware of when processing replication messages
from the Postgres primary: you must locally track the last location in the primary’s WAL that you
have flushed to disk. You should persist this somewhere durably, ideally with the same
durability guarantees as wherever you’re replicating the data updates, and atomically with the data
updates themselves. In a pinch, you could use a normal file to track this piece of information, but then
you have a data race. If your process dies between flushing tuple data to disk and flushing the WAL
location to disk, on a restart you’ll either begin processing a duplicate message, or skip a
message, depending on the order of the writes.
In our implementation, the processMessage
method returns a boolean value indicating whether the
logical message was a Commit
, which corresponds to us committing our own transaction. If it was, we
also store the last WAL position of the transaction we just committed, which we originally received
from a Begin message
from the primary.
committed, err := r.processMessage(xld, state)
if err != nil {
    return handleErrWithRetry(err)
}

if committed {
    state.lastWrittenLSN = state.currentTransactionLSN
    log.Printf("Writing LSN %s to file\n", state.lastWrittenLSN.String())
    err := r.writeWALPosition(state.lastWrittenLSN)
    if err != nil {
        return err
    }
}
Somewhat confusingly, Postgres also durably tracks each subscriber’s last confirmed flush location
in the WAL, and it’s tempting to assume you can just let Postgres track this state for you. But
that solution is not workable, for the same reason you can get data races when using a normal file
to track the WAL location. It’s even worse in the case of Postgres, because now the state and
failure modes are split across two nodes in the distributed system.
It’s important to periodically update the Postgres primary on your replication standby’s status,
because it keeps track of which WAL files are safe to recycle. Over time, a primary with subscribers
that don’t update their flushed position in the WAL will suffer file bloat and possibly degraded
performance in some scenarios.
The message to update the
primary
contains three fields for WAL positions:
- Int64: The location of the last WAL byte + 1 received and written to disk in the standby.
- Int64: The location of the last WAL byte + 1 flushed to disk in the standby.
- Int64: The location of the last WAL byte + 1 applied in the standby.
Note the + 1
in all these field descriptions. Postgres expects you to add 1 to every WAL location
you receive or write in order to get the correct behavior. The docs don’t make this 100% clear, but
the most important of these fields is the second one (flushed to disk), since that’s the location
Postgres uses when resuming replication streaming after an interruption. It’s really important to
read the fine print in the START_REPLICATION
command
about how this works:
Instructs server to start streaming WAL for logical replication, starting at either WAL location
XXX/XXX or the slot’s confirmed_flush_lsn (see Section 54.19), whichever is greater. This behavior
makes it easier for clients to avoid updating their local LSN status when there is no data to
process. However, starting at a different LSN than requested might not catch certain kinds of
client errors; so the client may wish to check that confirmed_flush_lsn matches its expectations
before issuing START_REPLICATION.
In other words, it’s possible to skip ahead of the WAL location you last confirmed flushed to the
primary, but impossible to rewind the stream. Once you send a Standby status update
message with a
particular flush location, the primary will never send you another replication event from before then.
You can also (and should) periodically update the primary with the last WAL position you received,
including the one included in a keepalive
message. Just be sure not to send a flushed
WAL
location that you haven’t actually flushed.
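In practice this means sending a standby status update on a timer as well as whenever the primary requests one. A sketch of the receive loop’s timing (the ten-second interval is a reasonable default we chose, not something Postgres mandates):

standbyMessageTimeout := 10 * time.Second
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

for {
    if time.Now().After(nextStandbyMessageDeadline) {
        err := pglogrepl.SendStandbyStatusUpdate(context.Background(), primaryConn, pglogrepl.StandbyStatusUpdate{
            WALWritePosition: state.lastWrittenLSN + 1,
            WALFlushPosition: state.lastWrittenLSN + 1,
            WALApplyPosition: state.lastReceivedLSN + 1,
        })
        if err != nil {
            return err
        }
        nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
    }

    // Wait for the next message, but wake up in time to send the next status update.
    ctx, cancel := context.WithDeadline(context.Background(), nextStandbyMessageDeadline)
    rawMsg, err := primaryConn.ReceiveMessage(ctx)
    cancel()
    if err != nil {
        if pgconn.Timeout(err) {
            continue // no message arrived before the deadline; loop and send the update
        }
        return err
    }

    // ... handle rawMsg as shown earlier ...
    _ = rawMsg
}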
Our implementation loads the last flushed WAL location at startup and uses that in its
START_REPLICATION
message.
lastWrittenLsn, err := r.readWALPosition()
if err != nil {
    return err
}
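readWALPosition and its counterpart writeWALPosition are our own persistence helpers. A deliberately naive, file-based sketch is below (r.walFilePath is a hypothetical field); note that this approach has exactly the data race described earlier, so a production implementation should persist the LSN atomically with the replicated data itself:

func (r *LogicalReplicator) writeWALPosition(lsn pglogrepl.LSN) error {
    // Record the LSN as text in a local file.
    return os.WriteFile(r.walFilePath, []byte(lsn.String()), 0644)
}

func (r *LogicalReplicator) readWALPosition() (pglogrepl.LSN, error) {
    data, err := os.ReadFile(r.walFilePath)
    if os.IsNotExist(err) {
        // No position recorded yet: start from the beginning of the slot's stream.
        return 0, nil
    } else if err != nil {
        return 0, err
    }
    return pglogrepl.ParseLSN(string(data))
}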
This behavior handles the edge case where our replica flushes a transaction to disk but the process
dies before telling Postgres about it. In our testing we also found that even if you successfully
send a Standby status update
message with the last flushed WAL position, Postgres might not durably
persist it if the connection is interrupted after receipt. Our implementation is very conservative
about these edge cases, which is the reason for the processMessages
bool we track in our state
struct. From what we can tell, the replication process on the Postgres primary is largely
asynchronous, and in certain failure modes it’s possible to be sent messages from before the WAL
position you asked for. Our implementation simply skips all such messages until we see a Begin
message past our last known flush point.
DoltgreSQL is free and open source, so go check it out if
you’re interested in replicating your PostgreSQL primary to a data store with built-in diff
capabilities, or if you’re building your own Postgres replication solution and want a working
example.
Have questions about DoltgreSQL or Postgres replication? Join us on
Discord to talk to our engineering team and meet other Dolt users.