Faster PostgreSQL To BigQuery Transfers
For much of the past year, I've been working with Hexvarium. Based in Atherton, California, the company builds and manages fiber-optic networks. At the moment they have a handful of networks in the Bay Area but have plans to expand across the US.
Planning how a fiber-optic network will be deployed across a city involves many constraints. The process is capital-intensive so you want the network to achieve uptake as soon as possible. My team builds optimal network roll-out plans. This is done by blending over 70 datasets together and weighing their metrics against a number of business objectives. The following is an illustrative example of a roll-out plan.
The above plans are produced by LocalSolver, which is fed data from BigQuery. The majority of datasets we use arrive as Shapefiles, which cannot be loaded into BigQuery directly. If a dataset is small enough or needs a lot of geospatial enrichment, we will load it into PostgreSQL with the PostGIS extension installed.
Below is a figurative example of a Shapefile being imported into PostgreSQL.
$ ogr2ogr --config PG_USE_COPY YES \
          -f PGDump \
          /vsistdout/ \
          a00000001.gdbtable \
          -lco SRID=32631 \
          -lco SCHEMA=schema_name \
    | psql geodata
If a dataset has 100M+ records and needs to be joined to another dataset with at least 10M+ records, then we'll load it into ClickHouse instead. ClickHouse has taken some of our JOINs that would take 2-3 days in PostgreSQL and completed them in under an hour. It can also scale up individual queries by simply adding more CPU cores. That said, ClickHouse lacks much of the geospatial functionality found in PostGIS so we cannot use it for all workloads.
Below is a figurative example of a Shapefile being imported into ClickHouse using GeoJSON as an intermediary format. To avoid timeouts, the JSONL file is produced first and then imported into ClickHouse rather than being streamed directly with a single BASH command.
$ ogr2ogr -f GeoJSONSeq \
          /vsistdout/ \
          -s_srs EPSG:4326 \
          -t_srs EPSG:4326 \
          a00000001.gdbtable \
    | jq -S '.properties * {geom: .geometry}
             | with_entries(.key |= ascii_downcase)' \
    > out.jsonl

$ cat out.jsonl \
    | clickhouse client \
        --multiquery \
        --query='SET input_format_skip_unknown_fields=1;
                 INSERT INTO table_name
                 FORMAT JSONEachRow'
In the past, I would dump PostgreSQL tables out to CSV files, compress them and then load them into BigQuery. But recently I've begun using ClickHouse to export PostgreSQL tables to Parquet and loading that into BigQuery instead. This has resulted in faster load times.
In this post, I'll walk through one such example.
Installing Prerequisites
The VM used in this post is an e2-highcpu-32 with 32 vCPUs and 32 GB of RAM running Ubuntu 20.04 LTS. It is running in Google Cloud's us-west2-a zone in Los Angeles. With a 400 GB balanced persistent disk, this system costs $1.02 / hour to run.
$ wget -qO- \
    https://www.postgresql.org/media/keys/ACCC4CF8.asc \
    | sudo apt-key add -

$ echo "deb http://apt.postgresql.org/pub/repos/apt/ focal-pgdg main 15" \
    | sudo tee /etc/apt/sources.list.d/pgdg.list

$ sudo apt update
$ sudo apt install \
    gdal-bin \
    jq \
    pigz \
    postgresql-15 \
    python3-pip \
    python3-virtualenv
The commands used in this post are run on Ubuntu but they can also run on macOS. To install them via Homebrew run the following:
$ brew install \
    gdal \
    jq \
    pigz \
    postgresql \
    virtualenv
I'll use PyArrow to examine a Parquet file at the end of this post.
$ virtualenv ~/.pq
$ source ~/.pq/bin/activate

$ python -m pip install \
    pyarrow \
    fastparquet \
    shpyx
ClickHouse can be installed via the following:
$ curl https://clickhouse.com/ | sh
This post uses PostgreSQL 15 and ClickHouse v22.9.7.34. Both of these tools release new versions frequently.
I'll use Rustup to install Rust version 1.66.0.
$ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
Google maintains installation notes for BigQuery's Python-based client.
The Buildings Dataset
Below is an example record from Microsoft's Buildings dataset. I've loaded it into PostgreSQL and enriched it with H3 hexagon values. It has 130,099,920 records, is 28 GB in PostgreSQL's internal format and takes up 61.6 GB of logical space once loaded into BigQuery.
\x on
SELECT *
FROM microsoft_buildings
LIMIT 1;
revision | 2
from_date | 2019-07-17
to_date | 2019-09-25
geom | 010200000005000000F148BC3C9DAB57C07BBE66B96C844340E6E8F17B9BAB57C0AB4203B16C844340FE2AC0779BAB57C02B85402E71844340098B8A389DAB57C0FC00A43671844340F148BC3C9DAB57C07BBE66B96C844340
h3_7 | 872656731ffffff
h3_8 | 8826567311fffff
h3_9 | 8926567311bffff
The geom column is in Well-Known Binary (WKB) format. The above value can be expressed in GeoJSON as the following:
{"kind": "LineString",
"coordinates":[[-94.681472, 39.034568],
[-94.681365, 39.034567],
[-94.681364, 39.034704],
[-94.681471, 39.034705],
[-94.681472, 39.034568]]}
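That conversion can be reproduced in Python with Shapely; below is a minimal sketch, assuming Shapely is installed (it isn't part of the setup above):
import json

# Assumes Shapely is installed; it isn't part of the setup above.
from shapely import wkb
from shapely.geometry import mapping

hex_wkb = ('010200000005000000F148BC3C9DAB57C07BBE66B96C844340'
           'E6E8F17B9BAB57C0AB4203B16C844340FE2AC0779BAB57C02B85402E71844340'
           '098B8A389DAB57C0FC00A43671844340F148BC3C9DAB57C07BBE66B96C844340')

geom = wkb.loads(hex_wkb, hex=True)  # parse the hex-encoded WKB
print(json.dumps(mapping(geom)))     # GeoJSON representation of the geometry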
The following shows a rendering of the above geometry on top of an OpenStreetMap basemap in QGIS.
Transporting with CSV
The following will ship the above table from PostgreSQL to BigQuery using GZIP-compressed CSVs.
\copy (SELECT revision,
              from_date,
              to_date,
              geom,
              h3_7,
              h3_8,
              h3_9
       FROM microsoft_buildings)
      TO PROGRAM 'split --line-bytes=4000000000
                        --filter="pigz -1 > $FILE.csv.gz"'
      WITH CSV;
$ tr '\n' ' ' < dump.sql | psql geodata
The largest uncompressed CSV file size BigQuery will accept in a single load command is 4 GB. The above will split the CSV data into separate files with no one file exceeding this limit. The CSV files are then compressed with the fastest setting GZIP supports. Higher compression settings have never resulted in faster end-to-end load times in my experience.
These are the resulting 12 GB of GZIP-compressed CSVs and their individual file sizes.
xaa.csv.gz 1.3 GB
xab.csv.gz 1.3 GB
xac.csv.gz 1.3 GB
xad.csv.gz 1.3 GB
xae.csv.gz 1.3 GB
xaf.csv.gz 1.3 GB
xag.csv.gz 1.3 GB
xah.csv.gz 1.3 GB
xai.csv.gz 1.3 GB
xaj.csv.gz 481 MB
The following will load the CSVs into BigQuery in parallel.
$ ls *.csv.gz \
    | xargs -n1 \
            -P10 \
            -I% \
            bash -c "bq load dataset.table_name ./% revision:INT64,from_date:DATE,to_date:DATE,geom:GEOGRAPHY,h3_7:STRING,h3_8:STRING,h3_9:STRING"
The above psql and bq load commands together took 18 minutes and 11 seconds to run.
Transporting with Parquet
The following will set up a table in ClickHouse that sources its data from the microsoft_buildings table in PostgreSQL.
CREATE TABLE pg_ch_bq_microsoft_buildings (
revision Nullable(Int32),
from_date Nullable(DATE),
to_date Nullable(DATE),
geom Nullable(String),
h3_7 Nullable(String),
h3_8 Nullable(String),
h3_9 Nullable(String))
ENGINE = PostgreSQL('localhost:5432',
'airflow',
'microsoft_buildings',
'username',
'password');
I'll create a table that is local to ClickHouse. This avoids any additional single-threaded fetches back to PostgreSQL later on.
CREATE TABLE microsoft_buildings ENGINE = Log() AS
SELECT *
FROM pg_ch_bq_microsoft_buildings;
I'll then create a script to dump the contents of the ClickHouse table across 14 Snappy-compressed Parquet files.
from multiprocessing import Pool

from shpyx import run as _exec


def ch(manifest):
    offset, filename = manifest

    sql = '''SELECT *
             FROM microsoft_buildings
             LIMIT 10000000
             OFFSET %d
             FORMAT Parquet''' % offset

    cmd = "clickhouse client --query='%s' > %s" % \
              (sql.replace('\n', ' '), filename)
    print(cmd)
    _exec(cmd)


payload = [(x * 10000000, 'out%s.pq' % str(x + 1))
           for x in range(0, 14)]

pool = Pool(14)
pool.map(ch, payload)
I'll then run that script and load the resulting 16 GB of Parquet files into BigQuery.
$ python run.py

$ ls out*.pq \
    | xargs -n1 \
            -P14 \
            -I% \
            bash -c "bq load --source_format=PARQUET geodata.pq_test2 ./% revision:INT64,from_date:DATE,to_date:DATE,geom:GEOGRAPHY,h3_7:STRING,h3_8:STRING,h3_9:STRING"
The above took 14 minutes and 31 seconds to import, dump and transport from my VM to BigQuery. That is a 1.25x speed-up over using GZIP-compressed CSVs for transport.
The 16 GB of Parquet is 4 GB larger than the GZIP-compressed CSV files but the CSVs' disk space and bandwidth savings didn't translate into a faster end-to-end transfer time. To add to this, ClickHouse took better advantage of the available cores during the 14-thread dump to Parquet versus what pigz could do.
The Parquet files use Snappy compression. Snappy was designed to compress quickly while accepting the trade-off of lower compression ratios compared to other codecs. Storage and network throughput rates have increased faster than single-core compute throughput has over the past 20 years. When a file will be consumed by millions of people, the compression ratio is important and it is worth taking longer to compress. But when a file is only ever consumed once, it is more important to get to the point where the file has finished decompressing on the consumer's side as quickly as possible.
ClickHouse’s Snappy Compression
I'll dump a single Parquet file of the dataset and then pick it apart below.
$ clickhouse client \
    --query='SELECT revision,
                    from_date,
                    to_date,
                    geom,
                    h3_7,
                    h3_8,
                    h3_9
             FROM pg_ch_bq_microsoft_buildings
             FORMAT Parquet' \
    > out.pq
$ python
The 130,099,920 records were broken up into 1,987 row groups.
from collections import Counter
from operator import itemgetter
from pprint import pprint
import pyarrow.parquet as pq
pf = pq.ParquetFile('out.pq')
print(pf.metadata)
<pyarrow._parquet.FileMetaData object at 0x7fe07f37ab80>
created_by: parquet-cpp version 1.5.1-SNAPSHOT
num_columns: 7
num_rows: 130099920
num_row_groups: 1987
format_version: 1.0
serialized_size: 3839172
Each row group has 65,505 rows or fewer.
print(Counter(pf.metadata.row_group(rg).num_rows
              for rg in range(0, pf.metadata.num_row_groups)))
Counter({65505: 1986, 6990: 1})
Every column has been compressed using Snappy.
print(set(pf.metadata.row_group(rg).column(col).compression
          for col in range(0, pf.metadata.num_columns)
          for rg in range(0, pf.metadata.num_row_groups)))
2.6:1 was the highest compression ratio achieved among any of the columns in any of the row groups. Below are the details of the column that achieved this ratio.
lowest_val, lowest_rg, lowest_col = None, None, None

for rg in range(0, pf.metadata.num_row_groups):
    for col in range(0, pf.metadata.num_columns):
        x = pf.metadata.row_group(rg).column(col)
        ratio = x.total_compressed_size / x.total_uncompressed_size

        if not lowest_val or lowest_val > ratio:
            lowest_val = ratio
            lowest_rg, lowest_col = rg, col

print(pf.metadata.row_group(lowest_rg).column(lowest_col))
<pyarrow._parquet.ColumnChunkMetaData object at 0x7fe063224810>
file_offset: 12888987187
file_path:
physical_type: BYTE_ARRAY
num_values: 65505
path_in_schema: h3_9
is_stats_set: True
statistics:
<pyarrow._parquet.Statistics object at 0x7fe063211d10>
has_min_max: True
min: b'890c732020fffff'
max: b'8944db2db5bffff'
null_count: 0
distinct_count: 0
num_values: 65505
physical_type: BYTE_ARRAY
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY
encodings: ('PLAIN_DICTIONARY', 'PLAIN', 'RLE')
has_dictionary_page: True
dictionary_page_offset: 12888564836
data_page_offset: 12888855960
total_compressed_size: 422351
total_uncompressed_size: 1101585
But of the 7 columns across the 1,987 row groups, over 30% have a compression ratio of 0.9 or worse. It is questionable whether it was worth compressing these fields in the first place.
ratios = []

for rg in range(0, pf.metadata.num_row_groups):
    for col in range(0, pf.metadata.num_columns):
        x = pf.metadata.row_group(rg).column(col)
        ratio = x.total_compressed_size / x.total_uncompressed_size
        ratios.append('%.1f' % ratio)

pprint(sorted(Counter(ratios).items(),
              key=itemgetter(0)))
[('0.4', 2345),
('0.5', 4020),
('0.6', 1201),
('0.7', 841),
('0.8', 1236),
('0.9', 1376),
('1.0', 2890)]
In 2016, Snappy had its heuristics made more aggressive at avoiding compressing data that is unlikely to yield strong results. But with ratios like 0.9, it would be interesting to see if this could be taken further by turning off compression on a per-column basis.
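PyArrow can already do this when writing Parquet: write_table accepts a per-column compression mapping. Below is a hedged sketch; which columns are worth leaving uncompressed is hypothetical and would need to be driven by ratios like the ones above.
# A sketch of per-column compression with PyArrow. The column choices
# below are hypothetical; they would need to be driven by measured ratios.
import pyarrow.parquet as pq

table = pq.read_table('out.pq')

pq.write_table(table,
               'out.mixed.pq',
               compression={'revision':  'NONE',
                            'from_date': 'NONE',
                            'to_date':   'NONE',
                            'geom':      'SNAPPY',
                            'h3_7':      'SNAPPY',
                            'h3_8':      'SNAPPY',
                            'h3_9':      'SNAPPY'})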
Google also has a drop-in S2 extension for Snappy but support has yet to find its way into BigQuery. When I reviewed a Go and Assembler implementation of S2 in 2021, I found it could speed up compression by 4.2x and decompression by 1.7x over regular Snappy. Klaus Post, the author of the S2 utility, noted the speed-up should have been even greater had it not been for the I/O limitations of the laptop I ran the benchmark on.
Klaus has since pointed me to a benchmarks sheet where he managed to reduce a ~6 GB JSON file by 83% at a rate of 15 GB/s on a 16-core AMD Ryzen 3950X. LZ4 took 10x longer to achieve a similar compression ratio on the same workload and GZIP took 74x longer. Snappy only ever managed a 75% reduction but even that was at a rate 16.5x slower than S2's 82% reduction rate.
Can Rust Do It Faster?
BigQuery also supports LZO and ZSTD compression in Parquet files so there is potential for further optimisation. I found json2parquet, a Rust-based utility that can convert JSONL into Parquet files. It doesn't support LZO at the moment but Snappy and ZStandard are both supported.
Shapefiles can be converted into JSONL quickly with ogr2ogr, so ClickHouse could potentially be removed from the pipeline if json2parquet is fast enough.
I produced a JSONL extract of the California dataset. It has 11M records and is 3 GB in uncompressed JSONL format.
$ ogr2ogr -f GeoJSONSeq /vsistdout/ California.geojson \
    | jq -c '.properties * {geom: .geometry|tostring}' \
    > California.jsonl
$ head -n1 California.jsonl | jq .
{
  "release": 1,
  "capture_dates_range": "",
  "geom": "{\"type\":\"Polygon\",\"coordinates\":[[[-114.127454,34.265674],[-114.127476,34.265839],[-114.127588,34.265829],[-114.127565,34.265663],[-114.127454,34.265674]]]}"
}
I then converted that file into Parquet with ClickHouse. It took 32 seconds on my 2020 MacBook Pro. The resulting file is 793 MB in size.
$ cat California.jsonl \
    | clickhouse local \
        --input-format JSONEachRow \
        -q "SELECT *
            FROM table
            FORMAT Parquet" \
    > cali.snappy.pq
I then did the same with json2parquet. The following was compiled with rustc 1.66.0 (69f9c33d7 2022-12-12).
$ git clone https://github.com/domoritz/json2parquet/
$ cd json2parquet

$ RUSTFLAGS='-Ctarget-cpu=native' \
    cargo build --release
The following took 43.8 seconds to convert the JSONL into Parquet with a resulting file size of 815 MB.
$ target/release/json2parquet \
    -c snappy \
    California.jsonl \
    California.snappy.pq
The only major difference I found was that json2parquet produced 12 row groups in its Parquet file while ClickHouse produced 306.
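The row group counts can be confirmed with PyArrow; a quick sketch using the two files produced above:
# Compare how the two tools chunked the same data into row groups.
import pyarrow.parquet as pq

for path in ('cali.snappy.pq', 'California.snappy.pq'):
    metadata = pq.ParquetFile(path).metadata
    print(path, metadata.num_row_groups, 'row groups,',
          metadata.num_rows, 'rows')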
I did produce a ZStandard-compressed Parquet file with json2parquet as well and although the resulting file was 531 MB, it took 48 seconds to produce. Neither of these settings will reduce the compression overhead seen with ClickHouse.
I'd like to see if ClickHouse could produce a ZStandard-compressed Parquet file faster than its Snappy-compressed files but there aren't any CLI flags for changing the Parquet compression codec at the moment.
Digging into the Rust Executable
I did confirm that the Rust binary is using vectorised instructions. The following was run on an Ubuntu 20 machine.
$ RUSTFLAGS='--emit asm -Ctarget-cpu=native' \
    cargo build --release

$ grep -P '\tv[a-ilmopr-uxz][a-il-vx]{2}[a-z0-9]{0,10}' \
    target/release/deps/parquet-9a152318b60fbda6.s \
    | head
vpxor %xmm0, %xmm0, %xmm0
vpcmpeqd %xmm0, %xmm0, %xmm0
vmovdqa %xmm0, (%rsp)
vpxor %xmm0, %xmm0, %xmm0
vpcmpeqd %xmm0, %xmm0, %xmm0
vmovdqa %xmm0, (%rsp)
vmovdqu 8(%rax), %xmm0
vmovdqu 8(%rax), %xmm1
vmovq %xmm0, %rsi
vmovdqa %xmm1, (%rsp)
I then ran 10 lines of JSON through both ClickHouse and json2parquet to see what strace reports.
$ sudo strace -wc \
    target/release/json2parquet \
        -c snappy \
        ../cali10.jsonl \
        test.snappy.pq
% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
 61.30    0.007141          16       434           write
 21.66    0.002523         280         9           openat
  4.33    0.000504          19        26           mmap
  2.73    0.000318         317         1           execve
  2.09    0.000244          16        15           read
...
There were 434 calls to write made by json2parquet, which is where it spent 61% of its time. ClickHouse only made 2 calls to write with the same workload.
$ sudo strace -wc \
    clickhouse local \
        --input-format JSONEachRow \
        -q "SELECT *
            FROM table
            FORMAT Parquet" \
    < ../cali10.jsonl \
    > cali.snappy.pq
% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
 29.52    0.019018        1584        12           futex
 21.15    0.013625          63       214           gettid
 11.19    0.007209         514        14           mprotect
 11.06    0.007123         791         9         4 stat
  8.72    0.005617         108        52           close
  5.16    0.003327        1109         3           poll
  2.19    0.001412          23        60           mmap
  2.09    0.001344          39        34         1 openat
  1.27    0.000816          18        44           read
...
  0.15    0.000098          48         2           write
I assumed that there could be an excessive number of context switches or page faults slowing json2parquet down but both of these occurred an order of magnitude more often with ClickHouse.
$ sudo apt-get install \
    linux-tools-common \
    linux-tools-generic \
    linux-tools-`uname -r`
$ sudo perf stat -dd \
    target/release/json2parquet \
        -c snappy \
        ../cali10.jsonl \
        test.snappy.pq

     1      context-switches          #  201.206 /sec
   198      page-faults               #   39.839 K/sec
$ sudo perf stat -dd \
    clickhouse local \
        --input-format JSONEachRow \
        -q "SELECT *
            FROM table
            FORMAT Parquet" \
    < ../cali10.jsonl \
    > cali.snappy.pq

    44      context-switches          #  372.955 /sec
  4997      page-faults               #   42.356 K/sec
Dominik Moritz, a researcher at Apple and one of the co-authors of Altair, is the developer behind json2parquet. I raised a ticket asking him if there were any compilation settings I could adjust.
But given his project is made up of 193 lines of Rust and largely depends on the Rust implementation of Apache Arrow, I suspect that the performance bottleneck is either in Apache Arrow or that additional compression settings need to be exposed, if there are any performance improvements to be found at all.
A day after I initially looked into this I re-ran the benchmark on a fresh 16-core VM while collecting telemetry for the Apache Arrow team. The performance difference ended up being 4x between json2parquet and ClickHouse. During the re-run I could see json2parquet maxing out a single core while ClickHouse was better able to take advantage of multiple cores.
How's Python's Performance?
I found that PyArrow was 1.38x slower than ClickHouse, Awkward Array was 1.5x slower and fastparquet was 3.3x slower.
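A minimal sketch of the PyArrow approach, assuming its newline-delimited JSON reader is pointed at the same extract; this is not necessarily the exact script that was benchmarked:
# A sketch of a PyArrow-based JSONL-to-Parquet conversion.
import pyarrow.json as pj
import pyarrow.parquet as pq

table = pj.read_json('California.jsonl')  # reads newline-delimited JSON
pq.write_table(table,
               'California.pyarrow.snappy.pq',
               compression='SNAPPY')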
After raising the above performance tickets I decided to be fair to the ClickHouse project and see if there were any obvious performance improvements I could make with their offering as well.
Fixing GIS Data Delivery
It would be great to see data vendors deliver data straight into their customers' cloud databases. It would save a lot of the client time that is spent converting and importing files. The Switch from Shapefile campaign notes there are at least 80 vector-based GIS file formats in use, and working out the pros and cons of each of these formats for your use case is a lengthy learning exercise.
Much of the GIS world is focused on city- and state-level problems so it is common to see Shapefiles partitioned to at least the state level. When your problems cover the whole of the USA or larger, you are often dealing with datasets that are at least 100M rows in size. 100M-row datasets in PostgreSQL can be really slow to work with.
It would also be nice to see GEOS, GDAL and PROJ integrated into ClickHouse. This would give users much of the functionality of PostGIS in one of the most performant open source databases. 100M-row datasets are small by ClickHouse's standards but the lack of GIS functionality is holding it back.
Formats like GeoParquet should see strong adoption as the preferred file-based transport for USA-wide GIS datasets if there is strong compatibility with a GIS-enhanced ClickHouse. I've seen a 400 GB PostgreSQL table compress down into 16 GB of GeoParquet. Dragging files of that size onto QGIS on a laptop will render it unresponsive for some time, but pointing QGIS at that sort of dataset in a GIS-enhanced ClickHouse would be a different matter altogether.