
Faster PostgreSQL To BigQuery Transfers

2023-01-10 05:33:53

For much of the past year, I've been working with Hexvarium. Based in Atherton, California, the company builds and manages fiber-optic networks. At present, they have a handful of networks in the Bay Area but have plans to expand across the US.

Planning how a fiber-optic network will be deployed across a city involves many constraints. The process is capital-intensive, so you want the network to achieve uptake as soon as possible. My team builds optimal network roll-out plans. This is done by blending over 70 datasets together and weighing their metrics against a number of business goals. The following is an illustrative example of a roll-out plan.

Illustrative order of deployment

The above plans are produced by LocalSolver, which is fed data from BigQuery. The majority of the datasets we use arrive as Shapefiles, which cannot be loaded into BigQuery directly. If a dataset is small enough or needs a lot of geospatial enrichment, we will load it into PostgreSQL with the PostGIS extension installed.

Below is an illustrative example of a Shapefile being imported into PostgreSQL.

$ ogr2ogr --config PG_USE_COPY YES \
          -f PGDump \
          /vsistdout/ \
          a00000001.gdbtable \
          -lco SRID=32631 \
          -lco SCHEMA=schema_name \
    | psql geodata

If a dataset has 100M+ records and needs to be joined to another dataset with at least 10M+ records, then we'll load it into ClickHouse instead. ClickHouse has taken some of our JOINs that would take 2-3 days in PostgreSQL and completed them in under an hour. It can also scale up individual queries by simply adding more CPU cores. That said, ClickHouse lacks much of the geospatial functionality found in PostGIS, so we cannot use it for all workloads.

Below is an illustrative example of a Shapefile being imported into ClickHouse using GeoJSON as an intermediary format. To avoid timeouts, the JSONL file is produced first and then imported into ClickHouse rather than being streamed directly in a single BASH command.

$ ogr2ogr -f GeoJSONSeq \
          /vsistdout/ \
          -s_srs EPSG:4326 \
          -t_srs EPSG:4326 \
          a00000001.gdbtable \
    | jq -S '.properties * {geom: .geometry}
                | with_entries(.key |= ascii_downcase)' \
    > out.jsonl

$ cat out.jsonl \
    | clickhouse client \
        --multiquery \
        --query='SET input_format_skip_unknown_fields=1;
                 INSERT INTO table_name
                 FORMAT JSONEachRow'

In the past, I would dump PostgreSQL tables out to CSV files, compress them and then load them into BigQuery. But recently I've begun using ClickHouse to export PostgreSQL tables to Parquet and load that into BigQuery instead. This has resulted in faster load times.

In this post, I'll walk through one such example.

Installing Prerequisites

The VM used in this post is an e2-highcpu-32 with 32 vCPUs and 32 GB of RAM running Ubuntu 20.04 LTS. It is running in Google Cloud's us-west2-a zone in Los Angeles. With a 400 GB balanced persistent disk, this system costs $1.02 / hour to run.

$ wget -qO- \
    https://www.postgresql.org/media/keys/ACCC4CF8.asc \
        | sudo apt-key add -
$ echo "deb http://apt.postgresql.org/pub/repos/apt/ focal-pgdg main 15" \
    | sudo tee /etc/apt/sources.list.d/pgdg.list

$ sudo apt update
$ sudo apt install \
    gdal-bin \
    jq \
    pigz \
    postgresql-15 \
    python3-pip \
    python3-virtualenv

The commands used in this post are run on Ubuntu but they can also run on macOS. To install them via Homebrew, run the following:

$ brew install \
    gdal \
    jq \
    pigz \
    postgresql \
    virtualenv

I'll use PyArrow to examine a Parquet file at the end of this post.

$ virtualenv ~/.pq
$ source ~/.pq/bin/activate
$ python -m pip install \
    pyarrow \
    fastparquet \
    shpyx

ClickHouse can be installed via the following:

$ curl https://clickhouse.com/ | sh

This post uses PostgreSQL 15 and ClickHouse v22.9.7.34. Both of these tools release new versions frequently.

I'll use Rustup to install Rust version 1.66.0.

$ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

Google maintains installation notes for BigQuery's Python-based client.
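As a quick sanity check that the client is installed and authenticated, a minimal sketch like the following can run a trivial query. The project ID is a placeholder and application-default credentials are assumed.

# Minimal connectivity check for the BigQuery Python client.
# Assumes `pip install google-cloud-bigquery` and that
# `gcloud auth application-default login` has been run.
from google.cloud import bigquery

client = bigquery.Client(project='my-project')  # placeholder project ID

# Run a trivial query to confirm credentials and connectivity.
row = list(client.query('SELECT 1 AS ok').result())[0]
print(row.ok)  # prints 1 if everything is wired up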

The Buildings Dataset

Below is an example record from Microsoft's Buildings dataset. I've loaded it into PostgreSQL and enriched it with H3 hexagon values. It has 130,099,920 records, is 28 GB in PostgreSQL's internal format and takes up 61.6 GB of logical space once loaded into BigQuery.

\x on
SELECT *
FROM microsoft_buildings
LIMIT 1;
revision  | 2
from_date | 2019-07-17
to_date   | 2019-09-25
geom      | 010200000005000000F148BC3C9DAB57C07BBE66B96C844340E6E8F17B9BAB57C0AB4203B16C844340FE2AC0779BAB57C02B85402E71844340098B8A389DAB57C0FC00A43671844340F148BC3C9DAB57C07BBE66B96C844340
h3_7      | 872656731ffffff
h3_8      | 8826567311fffff
h3_9      | 8926567311bffff
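The H3 columns above come from an enrichment step that isn't covered in this post. As a rough illustration, the sketch below computes cells at resolutions 7 to 9 with the h3 Python package (v3 API), using the first vertex of the example geometry (shown in GeoJSON a little further down); the resulting cells may differ from the stored values depending on which point the real enrichment used.

# Sketch: compute H3 cells at resolutions 7, 8 and 9 for a point.
# Assumes the `h3` Python package (v3 API). The coordinates are the first
# vertex of the example building's footprint.
import h3

lat, lng = 39.034568, -94.681472

for res in (7, 8, 9):
    print('h3_%d' % res, h3.geo_to_h3(lat, lng, res))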

The geom column is in Well-Known Binary (WKB) format. The above value can be expressed in GeoJSON as the following:

{"kind": "LineString",
 "coordinates":[[-94.681472, 39.034568],
                [-94.681365, 39.034567],
                [-94.681364, 39.034704],
                [-94.681471, 39.034705],
                [-94.681472, 39.034568]]}
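That WKB-to-GeoJSON conversion can be reproduced with Shapely. Below is a minimal sketch, assuming the shapely package is installed.

# Sketch: decode the WKB hex value shown above and print it as GeoJSON.
# Assumes the `shapely` package is installed.
import json

from shapely import wkb
from shapely.geometry import mapping

wkb_hex = ('010200000005000000F148BC3C9DAB57C07BBE66B96C844340'
           'E6E8F17B9BAB57C0AB4203B16C844340FE2AC0779BAB57C02B85402E71844340'
           '098B8A389DAB57C0FC00A43671844340F148BC3C9DAB57C07BBE66B96C844340')

geom = wkb.loads(bytes.fromhex(wkb_hex))
print(json.dumps(mapping(geom)))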

The following shows a red rendering of the above geometry on an OpenStreetMap basemap in QGIS.

QGIS Rendering

Transporting with CSV

The following will ship the above table from PostgreSQL to BigQuery using GZIP-compressed CSVs.

-- The following is saved as dump.sql
COPY (SELECT revision,
             from_date,
             to_date,
             geom,
             h3_7,
             h3_8,
             h3_9
        FROM microsoft_buildings)
  TO PROGRAM 'split --line-bytes=4000000000
                    --filter="pigz -1 > $FILE.csv.gz"'
    WITH CSV;
$ tr '\n' ' ' < dump.sql | psql geodata

The largest uncompressed CSV file size BigQuery will accept in a single load command is 4 GB. The above will split the CSV data into separate files, with no one file exceeding this limit. The CSV files are then compressed with the fastest setting GZIP supports. Higher compression settings have never resulted in faster end-to-end load times in my experience.

These are the resulting 12 GB of GZIP-compressed CSVs and their individual file sizes.

xaa.csv.gz 1.3 GB
xab.csv.gz 1.3 GB
xac.csv.gz 1.3 GB
xad.csv.gz 1.3 GB
xae.csv.gz 1.3 GB
xaf.csv.gz 1.3 GB
xag.csv.gz 1.3 GB
xah.csv.gz 1.3 GB
xai.csv.gz 1.3 GB
xaj.csv.gz 481 MB
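If you want to double-check that none of these files will trip BigQuery's 4 GB uncompressed limit, a quick sketch like the following can stream each one through gzip and count the bytes.

# Sketch: confirm each GZIP-compressed CSV stays under BigQuery's 4 GB
# uncompressed limit by streaming through gzip and counting bytes.
import glob
import gzip

LIMIT = 4_000_000_000  # matches the --line-bytes value passed to split

for path in sorted(glob.glob('xa*.csv.gz')):
    size = 0
    with gzip.open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(1 << 20), b''):
            size += len(chunk)
    print(path, size, 'OK' if size <= LIMIT else 'TOO BIG')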

The following will load the CSVs into BigQuery in parallel.

$ ls *.csv.gz \
    | xargs -n1 \
            -P10 \
            -I% \
            bash -c "bq load dataset.table_name ./% revision:INT64,from_date:DATE,to_date:DATE,geom:GEOGRAPHY,h3_7:STRING,h3_8:STRING,h3_9:STRING"

The above psql and bq load commands together took 18 minutes and 11 seconds to run.

Transporting with Parquet

The following will set up a table in ClickHouse that sources its data from the microsoft_buildings table in PostgreSQL.

CREATE TABLE pg_ch_bq_microsoft_buildings (
    revision  Nullable(Int32),
    from_date Nullable(DATE),
    to_date   Nullable(DATE),
    geom      Nullable(String),
    h3_7      Nullable(String),
    h3_8      Nullable(String),
    h3_9      Nullable(String))
ENGINE = PostgreSQL('localhost:5432',
                    'airflow',
                    'microsoft_buildings',
                    'username',
                    'password');

I'll create a table that is native to ClickHouse. This avoids any further single-threaded fetches back to PostgreSQL later on.

CREATE TABLE microsoft_buildings ENGINE = Log() AS
SELECT *
FROM pg_ch_bq_microsoft_buildings;

I'll then create a script to dump the contents of the ClickHouse table across 14 Snappy-compressed Parquet files.

from   multiprocessing import Pool

from   shpyx import run as _exec


def ch(manifest):
    offset, filename = manifest

    sql = '''SELECT *
             FROM   microsoft_buildings
             LIMIT  10000000
             OFFSET %d
             FORMAT Parquet''' % offset
    cmd = "clickhouse consumer --query='%s' > %s" % 
                (sql.change('n', ' '), filename)
    print(cmd)
    _exec(cmd)


payload = [(x * 10000000, 'out%s.pq' % str(x + 1))
           for x in range(0, 14)]

pool = Pool(14)
pool.map(ch, payload)

I'll then run that script and load the resulting 16 GB of Parquet files into BigQuery.

$ python run.py
$ ls out*.pq \
    | xargs -n1 \
            -P14 \
            -I% \
            bash -c "bq load --source_format=PARQUET geodata.pq_test2 ./% revision:INT64,from_date:DATE,to_date:DATE,geom:GEOGRAPHY,h3_7:STRING,h3_8:STRING,h3_9:STRING"

The above took 14 minutes and 31 seconds to import, dump and transport from my VM to BigQuery. That is a 1.25x speed-up over using GZIP-compressed CSVs for transport.

The 16 GB of Parquet is 4 GB larger than the GZIP-compressed CSV files, but the CSVs' disk space and bandwidth savings didn't have a positive impact on the end-to-end transfer time. On top of this, ClickHouse took better advantage of the available cores during the 14-thread dump to Parquet than pigz could.

The Parquet files use Snappy compression. Snappy was designed to compress quickly while accepting the trade-off of lower compression ratios compared to other codecs. Storage and network throughput rates have increased faster than single-core compute throughput over the past 20 years. When a file will be consumed by millions of people, the compression ratio is important and it is worth taking longer to compress. But when a file will only ever be consumed once, it is more important to get to the point where the file has finished decompressing on the consumer's side as quickly as possible.
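To get a feel for that trade-off, here is a rough sketch comparing Snappy and GZIP on a small, repetitive payload. It assumes the python-snappy package is installed and is not a rigorous benchmark.

# Rough sketch: compare Snappy and GZIP on compression time and ratio.
# Assumes the `python-snappy` package; not a rigorous benchmark.
import gzip
import time

import snappy

payload = b'{"h3_9": "8926567311bffff", "revision": 2}\n' * 200_000

for name, fn in (('snappy', snappy.compress),
                 ('gzip-1', lambda d: gzip.compress(d, compresslevel=1))):
    start = time.perf_counter()
    out = fn(payload)
    elapsed = time.perf_counter() - start
    print('%s: %.3fs, ratio %.2f:1' % (name, elapsed, len(payload) / len(out)))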

ClickHouse’s Snappy Compression

I'll dump a single Parquet file of the dataset and then pick it apart below.

$ clickhouse client \
    --query='SELECT revision,
                    from_date,
                    to_date,
                    geom,
                    h3_7,
                    h3_8,
                    h3_9
             FROM   pg_ch_bq_microsoft_buildings
             FORMAT Parquet' \
    > out.pq

$ python

The 130,099,920 records have been broken up into 1,987 row groups.

from collections import Counter
from operator    import itemgetter
from pprint      import pprint

import pyarrow.parquet as pq


pf = pq.ParquetFile('out.pq')
print(pf.metadata)
<pyarrow._parquet.FileMetaData object at 0x7fe07f37ab80>
  created_by: parquet-cpp version 1.5.1-SNAPSHOT
  num_columns: 7
  num_rows: 130099920
  num_row_groups: 1987
  format_version: 1.0
  serialized_size: 3839172

Each row group has 65,505 rows or fewer.

print(Counter(pf.metadata.row_group(rg).num_rows
              for rg in range(0, pf.metadata.num_row_groups)))
Counter({65505: 1986, 6990: 1})

Every column has been compressed using Snappy.

print(set(pf.metadata.row_group(rg).column(col).compression
          for col in range(0, pf.metadata.num_columns)
          for rg in range(0, pf.metadata.num_row_groups)))

2.6:1 was the highest compression ratio achieved among any of the columns in any of the row groups. Below are the details of the column that achieved this ratio.

lowest_val, lowest_rg, lowest_col = None, None, None

for rg in range(0, pf.metadata.num_row_groups):
    for col in range(0, pf.metadata.num_columns):
        x = pf.metadata.row_group(rg).column(col)

        ratio = x.total_compressed_size / x.total_uncompressed_size

        if not lowest_val or lowest_val > ratio:
            lowest_val = ratio
            lowest_rg, lowest_col = rg, col

print(pf.metadata.row_group(lowest_rg).column(lowest_col))
<pyarrow._parquet.ColumnChunkMetaData object at 0x7fe063224810>
  file_offset: 12888987187
  file_path:
  physical_type: BYTE_ARRAY
  num_values: 65505
  path_in_schema: h3_9
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7fe063211d10>
      has_min_max: True
      min: b'890c732020fffff'
      max: b'8944db2db5bffff'
      null_count: 0
      distinct_count: 0
      num_values: 65505
      physical_type: BYTE_ARRAY
      logical_type: None
      converted_type (legacy): NONE
  compression: SNAPPY
  encodings: ('PLAIN_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 12888564836
  data_page_offset: 12888855960
  total_compressed_size: 422351
  total_uncompressed_size: 1101585

But of the 7 columns across the 1,987 row groups, over 30% have a compression ratio of 1:0.9 or worse. It is questionable whether it was worth compressing these fields in the first place.

ratios = []

for rg in range(0, pf.metadata.num_row_groups):
    for col in range(0, pf.metadata.num_columns):
        x = pf.metadata.row_group(rg).column(col)
        ratio = x.total_compressed_size / x.total_uncompressed_size
        ratios.append('%.1f' % ratio)

pprint(sorted(Counter(ratios).items(),
              key=itemgetter(0)))
[('0.4', 2345),
 ('0.5', 4020),
 ('0.6', 1201),
 ('0.7', 841),
 ('0.8', 1236),
 ('0.9', 1376),
 ('1.0', 2890)]

In 2016, Snappy had its heuristics tuned to be more aggressive at avoiding compressing data that is unlikely to yield strong results. But with ratios like 1:0.9, it would be interesting to see if this could be taken further by turning off compression on a per-column basis.
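PyArrow already allows this choice when writing Parquet yourself: write_table accepts a per-column compression mapping. Below is a minimal sketch using the column names from this dataset; which columns to leave uncompressed would be guided by the ratios measured above, so the split shown here is purely illustrative.

# Sketch: rewrite a Parquet file with compression disabled for some columns
# while keeping Snappy for the rest. The column split is illustrative only.
import pyarrow.parquet as pq

table = pq.read_table('out.pq')  # note: reads the whole file into memory

pq.write_table(
    table,
    'out_mixed.pq',
    compression={'geom': 'NONE',
                 'revision': 'NONE',
                 'from_date': 'SNAPPY',
                 'to_date': 'SNAPPY',
                 'h3_7': 'SNAPPY',
                 'h3_8': 'SNAPPY',
                 'h3_9': 'SNAPPY'})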

Google also has a drop-in S2 extension for Snappy but support has yet to find its way into BigQuery. When I reviewed a Go and Assembler implementation of S2 in 2021, I found it could speed up compression by 4.2x and decompression by 1.7x over regular Snappy. Klaus Post, the author of the S2 utility, noted the speed-up should have been even greater had it not been for the I/O limitations of the laptop I ran the benchmark on.

Klaus has since pointed me to a benchmarks sheet where he managed to reduce a ~6 GB JSON file by 83% at a rate of 15 GB/s on a 16-core AMD Ryzen 3950X. LZ4 took 10x longer to achieve a similar compression ratio on the same workload and GZIP took 74x longer. Snappy only ever managed a 75% reduction, and even that was at a rate 16.5x slower than S2's 82% reduction.

Can Rust Do It Faster?

BigQuery also supports LZO and ZSTD compression in Parquet files, so there is potential for further optimisation. I found json2parquet, a Rust-based utility that can convert JSONL into Parquet files. It doesn't support LZO at present, but Snappy and ZStandard are both supported.

Shapefiles can be converted into JSONL quickly with ogr2ogr, so ClickHouse could potentially be removed from the pipeline if json2parquet is fast enough.


I produced a JSONL extract of the California dataset. It has 11M records and is 3 GB in uncompressed JSONL format.

$ ogr2ogr -f GeoJSONSeq /vsistdout/ California.geojson \
    | jq -c '.properties * {geom: (.geometry | tostring)}' \
    > California.jsonl
$ head -n1 California.jsonl | jq .
{
  "release": 1,
  "capture_dates_range": "",
  "geom": "{\"type\":\"Polygon\",\"coordinates\":[[[-114.127454,34.265674],[-114.127476,34.265839],[-114.127588,34.265829],[-114.127565,34.265663],[-114.127454,34.265674]]]}"
}

I then converted that file into Parquet with ClickHouse. It took 32 seconds on my 2020 MacBook Pro. The resulting file is 793 MB in size.

$ cat California.jsonl \
    | clickhouse local \
          --input-format JSONEachRow \
          -q "SELECT *
              FROM table
              FORMAT Parquet" \
    > cali.snappy.pq

I then did the same with json2parquet. The following was compiled with rustc 1.66.0 (69f9c33d7 2022-12-12).

$ git clone https://github.com/domoritz/json2parquet/
$ cd json2parquet
$ RUSTFLAGS='-Ctarget-cpu=native' \
    cargo build --release

The following took 43.8 seconds to convert the JSONL into Parquet with a resulting file size of 815 MB.

$ target/release/json2parquet \
    -c snappy \
    California.jsonl \
    California.snappy.pq

The one major difference I found was that json2parquet produced 12 row groups in its Parquet file while ClickHouse produced 306.
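Those row group counts can be read straight out of the Parquet footers with PyArrow; a small sketch:

# Sketch: compare row counts and row group counts between the two files.
import pyarrow.parquet as pq

for path in ('cali.snappy.pq', 'California.snappy.pq'):
    md = pq.ParquetFile(path).metadata
    print(path, md.num_rows, 'rows in', md.num_row_groups, 'row groups')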

I also produced a ZStandard-compressed Parquet file with json2parquet, and although the resulting file was 531 MB, it took 48 seconds to produce. Neither of these changes will reduce the compression overhead in ClickHouse.

I'd like to see if ClickHouse could produce a ZStandard-compressed Parquet file faster than its Snappy-compressed files, but there are no CLI flags to change the Parquet compression codec at present.

Digging into the Rust Executable

I did verify that the Rust binary is using vectorised instructions. The following was run on an Ubuntu 20 machine.

$ RUSTFLAGS='--emit asm -Ctarget-cpu=native' \
    cargo build --release
$ grep -P '\tv[a-ilmopr-uxz][a-il-vx]{2}[a-z0-9]{0,10}' \
      target/release/deps/parquet-9a152318b60fbda6.s \
      | head
vpxor   %xmm0, %xmm0, %xmm0
vpcmpeqd    %xmm0, %xmm0, %xmm0
vmovdqa %xmm0, (%rsp)
vpxor   %xmm0, %xmm0, %xmm0
vpcmpeqd    %xmm0, %xmm0, %xmm0
vmovdqa %xmm0, (%rsp)
vmovdqu 8(%rax), %xmm0
vmovdqu 8(%rax), %xmm1
vmovq   %xmm0, %rsi
vmovdqa %xmm1, (%rsp)

I then ran 10 lines of JSON through both ClickHouse and json2parquet to see what strace reports.

$ sudo strace -wc \
    target/release/json2parquet \
    -c snappy \
    ../cali10.jsonl \
    test.snappy.pq
% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
 61.30    0.007141          16       434           write
 21.66    0.002523         280         9           openat
  4.33    0.000504          19        26           mmap
  2.73    0.000318         317         1           execve
  2.09    0.000244          16        15           read
...

There were 434 calls to write made by json2parquet, which is where it spent 61% of its time. ClickHouse only made 2 calls to write with the same workload.

$ sudo strace -wc \
    clickhouse local \
          --input-format JSONEachRow \
          -q "SELECT *
              FROM table
              FORMAT Parquet" \
    < ../cali10.jsonl \
    > cali.snappy.pq
% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
 29.52    0.019018        1584        12           futex
 21.15    0.013625          63       214           gettid
 11.19    0.007209         514        14           mprotect
 11.06    0.007123         791         9         4 stat
  8.72    0.005617         108        52           close
  5.16    0.003327        1109         3           poll
  2.19    0.001412          23        60           mmap
  2.09    0.001344          39        34         1 openat
  1.27    0.000816          18        44           read
...
  0.15    0.000098          48         2           write

I assumed that there might be an excessive number of context switches or page faults slowing json2parquet down, but both of these occurred an order of magnitude more often with ClickHouse.

$ sudo apt-get install \
    linux-tools-common \
    linux-tools-generic \
    linux-tools-`uname -r`
$ sudo perf stat -dd \
    target/release/json2parquet \
    -c snappy \
    ../cali10.jsonl \
    test.snappy.pq
  1      context-switches          #  201.206 /sec
198      page-faults               #   39.839 K/sec
$ sudo perf stat -dd \
    clickhouse local \
          --input-format JSONEachRow \
          -q "SELECT *
              FROM table
              FORMAT Parquet" \
    < ../cali10.jsonl \
    > cali.snappy.pq
  44      context-switches          #  372.955 /sec
4997      page-faults               #   42.356 K/sec

Dominik Moritz, a researcher at Apple and one of the co-authors of Altair, is the developer behind json2parquet. I raised a ticket asking him if there were any compilation settings I could adjust.

But given his project is made up of 193 lines of Rust and is largely dependent on the Rust implementation of Apache Arrow, I suspect that the performance bottleneck is either in Apache Arrow or that more compression settings need to be exposed, if there are any performance improvements to be found.

A day after I originally looked into this, I ran the benchmark on a fresh 16-core VM while collecting telemetry for the Apache Arrow team. The performance difference ended up being 4x between json2parquet and ClickHouse. During the re-run, I saw json2parquet maxing out a single core while ClickHouse was better able to take advantage of multiple cores.

How's Python's Performance?

I found that PyArrow was 1.38x slower than ClickHouse, Awkward Array was 1.5x slower and fastparquet was 3.3x slower.
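For reference, the PyArrow version of that conversion looks roughly like the following sketch. It assumes the same California.jsonl file and relies on pyarrow.json reading newline-delimited JSON; it is not necessarily the exact code used for the timings above.

# Sketch: convert the JSONL extract to Snappy-compressed Parquet with PyArrow.
# pyarrow.json.read_json handles newline-delimited JSON.
import pyarrow.json as pj
import pyarrow.parquet as pq

table = pj.read_json('California.jsonl')
pq.write_table(table, 'cali.pyarrow.snappy.pq', compression='SNAPPY')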

After raising the above performance tickets, I decided to be fair to the ClickHouse project and see if there were any obvious performance improvements I could make with their offering as well.

Fixing GIS Data Delivery

It would be great to see data vendors ship data straight into their customers' cloud databases. It would save a lot of customer time that is currently spent converting and importing files. The Switch from Shapefile campaign notes there are at least 80 vector-based GIS file formats in use, and working out the pros and cons of each of these formats for your use case is a lengthy learning exercise.

Much of the GIS world is focused on city and state-level problems, so it is common to see Shapefiles partitioned to at least the state level. When your problems span the whole of the USA or larger, you are often dealing with datasets that are at least 100M rows in size. 100M-row datasets in PostgreSQL can be really slow to work with.

It would also be good to see GEOS, GDAL and PROJ integrated into ClickHouse. This would give users much of the functionality of PostGIS in one of the most performant open-source databases. 100M-row datasets are small by ClickHouse's standards, but the lack of GIS functionality is holding it back.

Formats like GeoParquet should see strong adoption as the preferred file-based transport for USA-wide GIS datasets if there is strong compatibility with a GIS-enhanced ClickHouse. I've seen a 400 GB PostgreSQL table compress down to 16 GB of GeoParquet. Dragging files of that size onto QGIS on a laptop will render it unresponsive for some time, but pointing QGIS at that sort of dataset in a GIS-enhanced ClickHouse would be a different matter altogether.
