Alex's Slip-box

These are my org-mode notes, in a sort of Zettelkasten style.

Large SQL data migrations

:ID: F544CDDD-34B9-4475-B265-1139F18D9090

# Use PL/pgSQL and batches

See SQL Procedural Language. PL/pgSQL is useful for running a migration in batches (see below).

# Transaction Management

One important caveat here is that we can't COMMIT inside a plain PL/pgSQL function. This becomes an issue when looping over batched operations, where it would be ideal to commit on each iteration of the loop. For this, we need PostgreSQL stored procedures; see the plpgsql transaction management docs.

See also https://newbedev.com/postgresql-cannot-begin-end-transactions-in-pl-pgsql

# Stored Procedures

Available in PostgreSQL 11+. Procedures can:

  • Provide progress feedback via RAISE NOTICE
  • COMMIT at the conclusion of each loop iteration

CREATE OR REPLACE PROCEDURE update_things_foo_to_bar()
  LANGUAGE plpgsql
  AS $$

  DECLARE
    current_id INT := (select min(id) from things);
    max_id INT := (select max(id) from things);
    batch_size INT := 1000;
    rows_updated INT;

  BEGIN
    WHILE current_id <= max_id LOOP
      UPDATE things
      SET foo = 'bar'
      WHERE id >= current_id
      AND id < current_id + batch_size;

      GET DIAGNOSTICS rows_updated = ROW_COUNT;

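      -- Committing here ends this batch's transaction, so completed batches
      -- are preserved even if a later batch fails.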
      COMMIT;

      RAISE NOTICE 'current_id: %; rows_updated: %', current_id, rows_updated;

      current_id := current_id + batch_size;
    END LOOP;
  END $$;
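
To run it (procedures are invoked with CALL, not SELECT):

CALL update_things_foo_to_bar();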

# dblink extension

For older versions of Postgres, one hack is to use the dblink extension: build a SQL string, execute it over the dblink connection, and COMMIT. Note the use of format to interpolate any variables into the SQL string; variables declared in the script's scope won't be available for evaluation in the dblink connection's context.
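
For instance, format() fills the positional parameters %1$s and %2$s with its first and second arguments; the UPDATE below is just a hypothetical illustration of the interpolation:

SELECT format(
  'UPDATE things SET foo = ''bar'' WHERE id >= %1$s AND id < %2$s;',
  1, 5001
);
-- => UPDATE things SET foo = 'bar' WHERE id >= 1 AND id < 5001;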

The overall structure here is:

  1. An outer function that handles looping.
  2. An inner function, called on each loop iteration, that performs the data operation (i.e., inserts and/or updates).
  3. A call to the outer function to kick it off.

For example:

CREATE OR REPLACE FUNCTION f_migrate_things_batch(batch_start integer, batch_end integer)
RETURNS void AS
$$
DECLARE
    sql_string text;
BEGIN
    PERFORM dblink_connect('my_dblink', 'dbname=my_database port=5432 user=username');

    sql_string := $query$
      -- Some long SQL string that references batch_start/batch_end as `%1$s` and `%2$s`
    $query$;

    -- Interpolate dynamic variables since dblink won't have access to them.
    sql_string := format(sql_string, batch_start, batch_end);

    -- dblink_exec (rather than dblink) runs a command on the named connection and
    -- can be PERFORMed directly, since it returns a status string instead of rows.
    PERFORM dblink_exec('my_dblink', sql_string);
    PERFORM dblink_exec('my_dblink', 'COMMIT;');
    PERFORM dblink_disconnect('my_dblink');
END;
$$
LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION f_migrate_things_in_batches()
RETURNS void AS
$$

    DECLARE
        row_count   integer := 0;
        batch_size  integer := 5000;
        batch_start integer := 1;
        batch_end   integer := batch_size;

    BEGIN

        CREATE EXTENSION IF NOT EXISTS dblink;

        row_count := (SELECT count(*) FROM things);
        RAISE NOTICE '% things to update', row_count;

        WHILE row_count > 0
            LOOP
                PERFORM f_migrate_things_batch(batch_start, batch_end);

                batch_start := batch_start + batch_size;
                batch_end := batch_end + batch_size;
                row_count := row_count - batch_size;

                RAISE NOTICE '% things remaining', row_count;

            END LOOP;

    END;
$$
LANGUAGE plpgsql;

select f_migrate_things_in_batches();

# Drop and recreate indexes

Dropping indexes on the affected table before the migration and recreating them afterwards can speed things up, since the indexes no longer have to be maintained for every updated row.
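
A minimal sketch, assuming a hypothetical index things_foo_idx on the column being rewritten:

DROP INDEX IF EXISTS things_foo_idx;

-- ...run the migration...

CREATE INDEX things_foo_idx ON things (foo);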

# Use batches

Here I'm just incrementing a counter by a fixed batch size. This is boilerplate that can serve as a template for batched migrations.

Also note the use of GET DIAGNOSTICS, which is a handy way to get information about the previously executed statement. ROW_COUNT is one of the items GET DIAGNOSTICS can report; it is not the same as the row_count variable in the example.

DO $$
DECLARE
    row_count   integer := 0;
    batch_size  integer := 5000;
    batch_start integer := 1;
    batch_end   integer := batch_size;
    affected    integer;

BEGIN
    row_count := (SELECT count(*) FROM things);
    RAISE NOTICE '% things to update', row_count;

    WHILE row_count > 0
        LOOP

            -- DO STUFF LIKE INSERT OR UPDATE records
            -- UPDATE things...

            GET DIAGNOSTICS affected = ROW_COUNT;
            RAISE NOTICE '% things migrated', affected;

            batch_start := batch_start + batch_size;
            batch_end := batch_end + batch_size;
            row_count := row_count - batch_size;

            RAISE NOTICE '% things remaining', row_count;
        END LOOP;
END $$;

# Dealing with constraints

A nice feature is ON CONFLICT (UPSERT), which lets you take a specific action when a row would violate a unique constraint, so a single conflict doesn't blow up the entire migration.

Note that the DO UPDATE action cannot affect the same row more than once within a single statement.
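
A minimal sketch, assuming a hypothetical unique constraint on things.external_id:

INSERT INTO things (external_id, foo)
VALUES (123, 'bar')
ON CONFLICT (external_id)
DO UPDATE SET foo = EXCLUDED.foo;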
