Reaching for something like SUM(vals) or AVG(vals) is a common habit when using PostgreSQL. These aggregate functions offer users an easy, efficient way to compute results from a set of inputs.
How do they work? What makes them different from a function? How do we make one? What kinds of other uses exist?
We'll explore creating some basic ones using SQL, then create an extension that defines aggregates in Rust using pgx 0.3.0's new aggregate support.
Aggregates in the world
Aggregates have a number of uses. Beyond the ones already mentioned (sum and average), we can find slightly less conceptually straightforward aggregates like:
- JSON/JSONB/XML 'collectors': json_agg, json_object_agg, xmlagg.
- Bitwise operators: bit_and, bit_xor, etc.
- Some std::iter::Iterator equivalents:
- PostGIS's geospatial aggregates such as ST_3DExtent, which produces bounding boxes over geometry sets.
- PG-Strom's HyperLogLog aggregates such as hll_count.
So, aggregates can collect items, work like a fold, or do complex analytical math... how do we make one?
What makes an Aggregate?
An aggregate is defined with CREATE AGGREGATE and CREATE FUNCTION. Here's a reimplementation of sum for integer only:
CREATE FUNCTION example_sum_state(
    state integer,
    next integer
) RETURNS integer
LANGUAGE SQL
STRICT
AS $$
    SELECT $1 + $2;
$$;
CREATE AGGREGATE example_sum(integer)
(
    SFUNC    = example_sum_state, -- State function
    STYPE    = integer,           -- State type
    INITCOND = '0'                -- Must be a string or null
);
SELECT example_sum(value) FROM UNNEST(ARRAY [1, 2, 3]) as value;
-- example_sum
-- -------------
-- 6
-- (1 row)
Conceptually, the aggregate loops over each item in the input and runs the SFUNC function on the current state as well as each value. That code is analogous to:
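A minimal sketch of that loop in plain Rust, assuming an `i32` state and input. The function names only mirror the SQL above for readability; nothing here is PostgreSQL or pgx API.

```rust
/// Mirrors `example_sum_state`: combines the running state with the next value.
fn example_sum_state(state: i32, next: i32) -> i32 {
    state + next
}

/// Mirrors `example_sum`: start from INITCOND, then call the state function once per row.
fn example_sum(values: &[i32]) -> i32 {
    let mut state = 0; // INITCOND = '0'
    for &next in values {
        state = example_sum_state(state, next);
    }
    state
}
```

Calling `example_sum(&[1, 2, 3])` returns `6`, matching the query above. The same loop could also be written as `values.iter().fold(0, |state, &next| example_sum_state(state, next))`.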
Aggregates are more than just a loop, though. If the aggregate specifies a combinefunc, PostgreSQL can run different instances of the aggregate over subsets of the data, then combine them later. This is called partial aggregation and enables different worker processes to handle data in parallel. Let's make our example_sum aggregate above have a combinefunc:
(Readers may note we could use example_sum_state in this particular case, but not in general, so we're gonna make a new function for demonstration.)
CREATE FUNCTION example_sum_combine(
    first integer,
    second integer
) RETURNS integer
LANGUAGE SQL
STRICT
AS $$
    SELECT $1 + $2;
$$;
DROP AGGREGATE example_sum(integer);
CREATE AGGREGATE example_sum(integer)
(
    SFUNC = example_sum_state,
    STYPE = integer,
    INITCOND = '0',
    combinefunc = example_sum_combine
);
SELECT example_sum(value) FROM generate_series(0, 4000) as value;
-- example_sum
-- -------------
-- 8002000
-- (1 row)
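Partial aggregation can be pictured with a small plain-Rust model. This is only a sketch under stated assumptions: the `partial_sum` helper and its `workers` parameter are made up for illustration, and the chunks are processed one after another here, whereas PostgreSQL may hand them to separate worker processes.

```rust
/// State function: folds one input value into a partial state.
fn example_sum_state(state: i32, next: i32) -> i32 {
    state + next
}

/// Combine function: merges two partial states into one.
fn example_sum_combine(first: i32, second: i32) -> i32 {
    first + second
}

/// Hypothetical driver: splits the input into up to `workers` chunks,
/// aggregates each chunk independently, then merges the partial states.
fn partial_sum(values: &[i32], workers: usize) -> i32 {
    let workers = workers.max(1);
    // Ceiling division, floored at 1 because `chunks(0)` panics.
    let chunk_len = ((values.len() + workers - 1) / workers).max(1);
    values
        .chunks(chunk_len)
        .map(|chunk| chunk.iter().fold(0, |state, &next| example_sum_state(state, next)))
        .fold(0, example_sum_combine)
}
```

For the values 0 through 4000, `partial_sum` gives 8002000 whether it uses one chunk or four, which is the property a combine function has to preserve.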
Here's one using FINALFUNC, which offers a way to compute some final value from the state:
CREATE FUNCTION example_uniq_state(
    state text[],
    next text
) RETURNS text[]
LANGUAGE SQL
STRICT
AS $$
    SELECT array_append($1, $2);
$$;
CREATE FUNCTION example_uniq_final(
    state text[]
) RETURNS integer
LANGUAGE SQL
STRICT
AS $$
    SELECT count(DISTINCT value) FROM UNNEST(state) as value
$$;
CREATE AGGREGATE example_uniq(text)
(
    SFUNC     = example_uniq_state, -- State function
    STYPE     = text[],             -- State type
    INITCOND  = '{}',               -- Must be a string or null
    FINALFUNC = example_uniq_final  -- Final function
);
SELECT example_uniq(value) FROM UNNEST(ARRAY ['a', 'a', 'b']) as value;
-- example_uniq
-- --------------
-- 2
-- (1 row)
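The state/final split can be sketched in plain Rust as well. The functions below are illustrative stand-ins named after the SQL above, assuming a `Vec<String>` in place of `text[]`. The state and the returned value have different types here.

```rust
use std::collections::HashSet;

/// State function: the state is simply every value seen so far.
fn example_uniq_state(mut state: Vec<String>, next: &str) -> Vec<String> {
    state.push(next.to_string());
    state
}

/// Final function: turns the accumulated state into the value that is returned.
fn example_uniq_final(state: Vec<String>) -> usize {
    state.into_iter().collect::<HashSet<String>>().len()
}

/// Drives the aggregate: empty initial state, state function per row, then the final function.
fn example_uniq(values: &[&str]) -> usize {
    let state = values
        .iter()
        .fold(Vec::new(), |state, &next| example_uniq_state(state, next));
    example_uniq_final(state)
}
```

With the inputs `["a", "a", "b"]` this returns `2`, like the query above.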
This is particularly handy as your STYPE doesn't need to be the type you return!
Aggregates can take multiple arguments, too:
CREATE FUNCTION example_concat_state(
    state text[],
    first text,
    second text,
    third text
) RETURNS text[]
LANGUAGE SQL
STRICT
AS $$
    SELECT array_append($1, concat($2, $3, $4));
$$;
CREATE AGGREGATE example_concat(text, text, text)
(
    SFUNC = example_concat_state,
    STYPE = text[],
    INITCOND = '{}'
);
SELECT example_concat(first, second, third) FROM
UNNEST(ARRAY ['a', 'b', 'c']) as first,
UNNEST(ARRAY ['1', '2', '3']) as second,
UNNEST(ARRAY ['!', '@', '#']) as third;
-- example_concat
-- ---------------------------------------------------------------------------------------------------------------
-- {a1!,a2!,a3!,b1!,b2!,b3!,c1!,c2!,c3!,a1@,a2@,a3@,b1@,b2@,b3@,c1@,c2@,c3@,a1#,a2#,a3#,b1#,b2#,b3#,c1#,c2#,c3#}
-- (1 row)
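See how we get a1, b1, and c1? Multiple arguments might not work as you expect! Each UNNEST in the FROM clause is its own row source, so the three are cross joined and the aggregate is called once for every combination of arguments. Unnesting in the SELECT list instead pairs the values up row by row: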
SELECT UNNEST(ARRAY ['a', 'b', 'c']) as first,
UNNEST(ARRAY ['1', '2', '3']) as second,
UNNEST(ARRAY ['!', '@', '#']) as third;
-- first | second | third
-- -------+--------+-------
-- a | 1 | !
-- b | 2 | @
-- c | 3 | #
-- (3 rows)
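The difference between the two queries above can be modeled in plain Rust as nested loops versus a zip. This is a sketch, not a description of the planner. The loop nesting in `cross_joined` is chosen only to mirror the output shown earlier, since PostgreSQL promises no particular order without ORDER BY. Both function names are made up for illustration.

```rust
/// Models three set-returning calls in the FROM clause: every combination is produced.
fn cross_joined(first: &[&str], second: &[&str], third: &[&str]) -> Vec<String> {
    let mut rows = Vec::new();
    for t in third {
        for f in first {
            for s in second {
                rows.push(format!("{f}{s}{t}"));
            }
        }
    }
    rows
}

/// Models the same calls in the SELECT list: values are paired up row by row.
fn zipped(first: &[&str], second: &[&str], third: &[&str]) -> Vec<String> {
    first
        .iter()
        .zip(second)
        .zip(third)
        .map(|((f, s), t)| format!("{f}{s}{t}"))
        .collect()
}
```

With three values per argument, `cross_joined` yields 27 strings while `zipped` yields only `a1!`, `b2@`, and `c3#`.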
Aggregates have several more optional fields, such as PARALLEL. Their signatures are documented in the CREATE AGGREGATE documentation, and this article isn't meant to be comprehensive.
Reminder: You can also create functions with pl/pgsql, c, pl/Python, or even in the experimental pl/Rust.
Extensions can, of course, create aggregates too. Next, let's explore how to do that with Rust using pgx 0.3.0's aggregate support.
Familiarizing with pgx
pgx is a suite of crates that provide everything required to build, test, and package extensions for PostgreSQL versions 10 through 14 using pure Rust.
It includes:
- cargo-pgx: A cargo plugin that provides commands like cargo pgx package and cargo pgx test.
- pgx: A crate providing macros, high level abstractions (such as SPI), and low level generated bindings for PostgreSQL.
- pgx-tests: A crate providing a test framework for running tests inside PostgreSQL.
Note: pgx does not currently offer Windows support, but works great in WSL2.
If a Rust toolchain is not already installed, please follow the instructions on rustup.rs.
You'll also need to make sure you have some development libraries like zlib and libclang, as cargo pgx init will, by default, build its own development PostgreSQL installs. Usually it's possible to figure out if something is missing from error messages and then discover the required package for the system.
Install cargo-pgx then initialize its development PostgreSQL installations (used for cargo pgx test and cargo pgx run):
$ cargo install cargo-pgx
$ cargo pgx init
# ...
We can create a new extension with:
$ cargo pgx new exploring_aggregates
$ cd exploring_aggregates
Then run it:
$ cargo pgx run
# ...
exploring_aggregates=#
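Observing the start of the src/lib.rs file, we can see the pg_module_magic!() and a function hello_exploring_aggregates:
use pgx::*;
pg_module_magic!();
#[pg_extern]
fn hello_exploring_aggregates() -> &'static str {
    "Hello, exploring_aggregates"
}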
Back on our psql prompt, we can load the extension and run the function:
CREATE EXTENSION exploring_aggregates;
-- CREATE EXTENSION
\dx+ exploring_aggregates
-- Objects in extension "exploring_aggregates"
-- Object description
-- ---------------------------------------
-- function hello_exploring_aggregates()
-- (1 row)
SELECT hello_exploring_aggregates();
-- hello_exploring_aggregates
-- -----------------------------
-- Hello, exploring_aggregates
-- (1 row)
Next, let's run the tests:
$ cargo pgx test
# ...
We can also inspect the SQL the extension generates:
$ cargo pgx schema
# ...
This creates sql/exploring_aggregates-0.0.0.sql:
/*
This file is auto generated by pgx.
The ordering of items is not stable, it is driven by a dependency graph.
*/
-- src/lib.rs:5
-- exploring_aggregates::hello_exploring_aggregates
CREATE OR REPLACE FUNCTION "hello_exploring_aggregates"() RETURNS text /* &str */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'hello_exploring_aggregates_wrapper';
Finally, we can create a package for the pg_config version installed on the system. This is done in release mode, so it takes a few minutes:
$ cargo pgx package
# ...
Let's make some aggregates with pgx now!
Aggregates with pgx
While designing the aggregate support for pgx 0.3.0, we wanted to make things feel idiomatic and natural from the Rust side while staying flexible enough for any use.
Aggregates in pgx are defined by creating a type (this doesn't necessarily need to be the state type), then using the #[pg_aggregate] procedural macro on a pgx::Aggregate implementation for that type.
The pgx::Aggregate trait has quite a few items (fns, consts, types) that you can implement, but the procedural macro can fill in stubs for all non-essential items. The state type (the implementation target by default) must have a #[derive(PostgresType)] declaration, or be a type PostgreSQL already knows about.
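To get a feel for the shape of that design without pulling in pgx, here is a plain-Rust model. `MiniAggregate` and `run_aggregate` are hypothetical names invented for this sketch and are much smaller than the real `pgx::Aggregate` trait. Unlike the procedural macro, this model makes you spell out every item yourself.

```rust
/// Hypothetical, simplified stand-in for an aggregate trait (NOT pgx's `Aggregate`).
trait MiniAggregate {
    /// The value carried between rows (STYPE).
    type State;
    /// The type of each input row.
    type Args;
    /// The type handed back to the caller.
    type Finalize;

    /// Starting state (INITCOND).
    fn initial() -> Self::State;
    /// Folds one input into the state (SFUNC).
    fn state(current: Self::State, arg: Self::Args) -> Self::State;
    /// Maps the last state to the result (FINALFUNC).
    fn finalize(current: Self::State) -> Self::Finalize;
}

/// The implementation target is a marker type; it is not the state itself.
struct DemoSum;

impl MiniAggregate for DemoSum {
    type State = i32;
    type Args = i32;
    type Finalize = i32;

    fn initial() -> i32 {
        0
    }
    fn state(current: i32, arg: i32) -> i32 {
        current + arg
    }
    fn finalize(current: i32) -> i32 {
        current
    }
}

/// Hypothetical driver: runs any `MiniAggregate` over an input, the way the database would.
fn run_aggregate<A, I>(inputs: I) -> A::Finalize
where
    A: MiniAggregate,
    I: IntoIterator<Item = A::Args>,
{
    A::finalize(inputs.into_iter().fold(A::initial(), A::state))
}
```

`run_aggregate::<DemoSum, _>(0..=4000)` evaluates to `8002000`. Keeping the implementation target separate from `State` is what lets one marker type describe an aggregate whose state is a plain `i32`.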
Here's the simplest aggregate you can make with pgx:
use pgx::*;
use serde::{Deserialize, Serialize};
pg_module_magic!();
We can review the generated SQL (generated via cargo pgx schema):
/*
This file is auto generated by pgx.
The ordering of items is not stable, it is driven by a dependency graph.
*/
-- src/lib.rs:6
-- exploring_aggregates::DemoSum
CREATE TYPE DemoSum;
-- src/lib.rs:6
-- exploring_aggregates::demosum_in
CREATE OR REPLACE FUNCTION "demosum_in"(
"input" cstring /* &cstr_core::CStr */
) RETURNS DemoSum /* exploring_aggregates::DemoSum */
IMMUTABLE PARALLEL SAFE STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'demosum_in_wrapper';
-- src/lib.rs:6
-- exploring_aggregates::demosum_out
CREATE OR REPLACE FUNCTION "demosum_out"(
"input" DemoSum /* exploring_aggregates::DemoSum */
) RETURNS cstring /* &cstr_core::CStr */
IMMUTABLE PARALLEL SAFE STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'demosum_out_wrapper';
-- src/lib.rs:6
-- exploring_aggregates::DemoSum
CREATE TYPE DemoSum (
INTERNALLENGTH = variable,
INPUT = demosum_in, /* exploring_aggregates::demosum_in */
OUTPUT = demosum_out, /* exploring_aggregates::demosum_out */
STORAGE = extended
);
-- src/lib.rs:11
-- exploring_aggregates::demo_sum_state
CREATE OR REPLACE FUNCTION "demo_sum_state"(
"this" DemoSum, /* exploring_aggregates::DemoSum */
"arg_one" integer /* i32 */
) RETURNS DemoSum /* exploring_aggregates::DemoSum */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'demo_sum_state_wrapper';
-- src/lib.rs:11
-- exploring_aggregates::DemoSum
CREATE AGGREGATE DemoSum (
integer /* i32 */
)
(
SFUNC = "demo_sum_state", /* exploring_aggregates::DemoSum::state */
STYPE = DemoSum, /* exploring_aggregates::DemoSum */
INITCOND = '{ "count": 0 }' /* exploring_aggregates::DemoSum::INITIAL_CONDITION */
);
We can test it out with cargo pgx run:
$ cargo pgx run
# ...
exploring_aggregates=#
Now we're connected via psql:
CREATE EXTENSION exploring_aggregates;
-- CREATE EXTENSION
SELECT DemoSum(value) FROM generate_series(0, 4000) as value;
-- demosum
-- -------------------
-- {"count":8002000}
-- (1 row)
Pretty cool!
...But we don't want that silly {"count": ... } stuff, just the number! We can resolve this by changing the State type, or by adding a finalize (which maps to FINALFUNC) as we saw in the previous section.
Let's change the State this time:
Now when we run it:
SELECT DemoSum(value) FROM generate_series(0, 4000) as value;
-- demosum
-- ---------
-- 8002000
-- (1 row)
This is a fine reimplementation of SUM so far, but as we saw previously, we need a combine (mapping to combinefunc) to support partial aggregation:
We can also change the name of the generated aggregate, or set the PARALLEL settings, for example:
This generates:
-- src/lib.rs:9
-- exploring_aggregates::DemoSum
CREATE AGGREGATE demo_sum (
integer /* i32 */
)
(
SFUNC = "demo_sum_state", /* exploring_aggregates::DemoSum::state */
STYPE = integer, /* i32 */
COMBINEFUNC = "demo_sum_combine", /* exploring_aggregates::DemoSum::combine */
INITCOND = '0', /* exploring_aggregates::DemoSum::INITIAL_CONDITION */
PARALLEL = UNSAFE /* exploring_aggregates::DemoSum::PARALLEL */
);
Rust state types
It's possible to use a non-SQL type (say, HashSet<String>) as a state by using Internal.
When using this strategy, a finalize function must be provided.
Here's a unique string counter aggregate that uses a HashSet:
use pgx::*;
use std::collections::HashSet;
pg_module_magic!();
We can test it:
SELECT DemoUnique(value) FROM UNNEST(ARRAY ['a', 'a', 'b']) as value;
-- demounique
-- ------------
-- 2
-- (1 row)
Using Internal here means that the values it holds get dropped at the end of PgMemoryContexts::CurrentMemoryContext, the aggregate context in this case.
Ordered-Set Aggregates
PostgreSQL also supports what are called Ordered-Set Aggregates. Ordered-Set Aggregates can take a direct argument, and specify a sort ordering for the inputs.
PostgreSQL does not order inputs behind the scenes!
Let's create a simple percentile_disc reimplementation to get an idea of how to make one with pgx. You'll notice we add ORDERED_SET = true and set an (optional) OrderedSetArgs, which determines the direct arguments.
This creates SQL like:
-- src/lib.rs:9
-- exploring_aggregates::DemoPercentileDisc
CREATE AGGREGATE DemoPercentileDisc (
"percentile" double precision /* f64 */
ORDER BY
"input" integer /* i32 */
)
(
SFUNC = "demo_percentile_disc_state", /* exploring_aggregates::DemoPercentileDisc::state */
STYPE = internal, /* pgx::datum::internal::Internal */
FINALFUNC = "demo_percentile_disc_finalize" /* exploring_aggregates::DemoPercentileDisc::final */
);
We can test it like so:
SELECT DemoPercentileDisc(0.5) WITHIN GROUP (ORDER BY income) FROM UNNEST(ARRAY [6000, 70000, 500]) as income;
-- demopercentiledisc
-- --------------------
-- 6000
-- (1 row)
SELECT DemoPercentileDisc(0.05) WITHIN GROUP (ORDER BY income) FROM UNNEST(ARRAY [5, 100000000, 6000, 70000, 500]) as income;
-- demopercentiledisc
-- --------------------
-- 5
-- (1 row)
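The selection rule behind those results can be sketched in plain Rust, assuming the usual discrete-percentile definition: the first value in sorted order whose position reaches the requested fraction. The function name and signature are illustrative and not part of pgx. Because the inputs are not ordered for us, the sketch sorts them explicitly before picking a value.

```rust
/// Illustrative discrete percentile over `i32` inputs.
/// Returns `None` for empty input or a percentile outside 0.0..=1.0.
fn percentile_disc(percentile: f64, inputs: &[i32]) -> Option<i32> {
    if inputs.is_empty() || !(0.0..=1.0).contains(&percentile) {
        return None;
    }
    // The aggregate is responsible for ordering its own inputs.
    let mut sorted = inputs.to_vec();
    sorted.sort_unstable();
    // 1-based rank of the first row whose position reaches the fraction.
    let rank = (percentile * sorted.len() as f64).ceil() as usize;
    // Convert to a 0-based index, clamping so that percentile 0.0 maps to the first row.
    let index = rank.saturating_sub(1).min(sorted.len() - 1);
    Some(sorted[index])
}
```

For `[6000, 70000, 500]` at `0.5` this picks `6000`, and for the five-element input at `0.05` it picks `5`, in line with the two queries above.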
Moving-Aggregate mode
Aggregates can also support moving-aggregate mode, which can remove inputs from the aggregate as well.
This allows for some optimization if you are using aggregates as window functions. The documentation explains that this is because PostgreSQL doesn't need to recalculate the aggregate each time the frame starting point moves.
Moving-aggregate mode has its own moving_state function as well as a moving_state_inverse function for removing inputs. Because moving-aggregate mode may require some additional tracking on the part of the aggregate, there is also a MovingState associated type as well as a moving_state_finalize function for any specialized final computation.
Let's take our sum example above and add moving-aggregate mode support to it:
This generates:
-- src/lib.rs:8
-- exploring_aggregates::DemoSum
CREATE AGGREGATE demo_sum (
integer /* i32 */
)
(
SFUNC = "demo_sum_state", /* exploring_aggregates::DemoSum::state */
STYPE = integer, /* i32 */
COMBINEFUNC = "demo_sum_combine", /* exploring_aggregates::DemoSum::combine */
INITCOND = '0', /* exploring_aggregates::DemoSum::INITIAL_CONDITION */
MSFUNC = "demo_sum_moving_state", /* exploring_aggregates::DemoSum::moving_state */
MINVFUNC = "demo_sum_moving_state_inverse", /* exploring_aggregates::DemoSum::moving_state_inverse */
MINITCOND = '0', /* exploring_aggregates::DemoSum::MOVING_INITIAL_CONDITION */
PARALLEL = UNSAFE, /* exploring_aggregates::DemoSum::PARALLEL */
MSTYPE = integer /* exploring_aggregates::DemoSum::MovingState = i32 */
);
Using it (we'll also turn on logging with SET client_min_messages TO debug; to see what happens):
SET client_min_messages TO debug;
-- SET
SELECT demo_sum(value) OVER (
ROWS CURRENT ROW
) FROM UNNEST(ARRAY [1, 20, 300, 4000]) as value;
-- LOG: moving_state(0, 1)
-- LOG: moving_state(0, 20)
-- LOG: moving_state(0, 300)
-- LOG: moving_state(0, 4000)
-- demo_sum
-- ----------
-- 1
-- 20
-- 300
-- 4000
-- (4 rows)
Inside the OVER (), we can use the syntax for window function calls.
SELECT demo_sum(value) OVER (
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) FROM UNNEST(ARRAY [1, 20, 300, 4000]) as value;
-- LOG: moving_state(0, 1)
-- LOG: moving_state(1, 20)
-- LOG: moving_state(21, 300)
-- LOG: moving_state(321, 4000)
-- demo_sum
-- ----------
-- 4321
-- 4321
-- 4321
-- 4321
-- (4 rows)
SELECT demo_sum(value) OVER (
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
) FROM UNNEST(ARRAY [1, 20, 300, 4000]) as value;
-- LOG: moving_state(0, 1)
-- LOG: moving_state(1, 20)
-- LOG: moving_state_inverse(21, 1)
-- LOG: moving_state(20, 300)
-- LOG: moving_state_inverse(320, 20)
-- LOG: moving_state(300, 4000)
-- demo_sum
-- ----------
-- 1
-- 21
-- 320
-- 4300
-- (4 rows)
SELECT demo_sum(value) OVER (
ORDER BY sorter
ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING
) FROM (
VALUES (1, 10000),
(2, 1)
) AS v (sorter, value);
-- LOG: moving_state(0, 10000)
-- LOG: moving_state(10000, 1)
-- LOG: moving_state_inverse(10001, 10000)
-- demo_sum
-- ----------
-- 10001
-- 1
-- (2 rows)
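The ROWS BETWEEN 1 PRECEDING AND CURRENT ROW query above can be modeled in plain Rust to show why the inverse function saves work. This is a sketch of that one frame shape only; `sliding_sum` and its `preceding` parameter are made up for illustration. PostgreSQL decides for itself when to call the inverse function and when to restart the aggregation, as the ROWS CURRENT ROW log shows.

```rust
/// Adds a value entering the frame.
fn moving_state(state: i32, value: i32) -> i32 {
    state + value
}

/// Removes a value leaving the frame.
fn moving_state_inverse(state: i32, value: i32) -> i32 {
    state - value
}

/// Hypothetical driver for a frame of `preceding` rows plus the current row.
/// Returns the per-row results and a log of the calls that were made.
fn sliding_sum(values: &[i32], preceding: usize) -> (Vec<i32>, Vec<String>) {
    let mut state = 0; // MINITCOND = '0'
    let mut results = Vec::with_capacity(values.len());
    let mut calls = Vec::new();
    for (row, &value) in values.iter().enumerate() {
        // Once the frame start moves past a row, remove that row's value
        // instead of recomputing the whole frame.
        if row > preceding {
            let leaving = values[row - preceding - 1];
            calls.push(format!("moving_state_inverse({state}, {leaving})"));
            state = moving_state_inverse(state, leaving);
        }
        calls.push(format!("moving_state({state}, {value})"));
        state = moving_state(state, value);
        results.push(state);
    }
    (results, calls)
}
```

`sliding_sum(&[1, 20, 300, 4000], 1)` produces the results `[1, 21, 320, 4300]` and the same six calls, in the same order, as the log for that query. Each row costs at most two function calls regardless of how wide the frame is.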
Wrapping up
I had a lot of fun implementing the aggregate support for pgx, and hope you have just as much fun using it! If you have questions, open up an issue.
Moving-aggregate mode is pretty new to me, and I'm still learning about it! If you have any good resources, I'd love to receive them from you!
If you're looking for more materials about aggregates, the TimescaleDB folks wrote about aggregates and how they impacted their hyperfunctions in this article. Also, my pal Tim McNamara wrote about how to implement harmonic and geometric means as aggregates in this article.