Created
November 26, 2025 15:42
-
-
Save TCotton/4a07040627cd1812daa8d71fd34cf303 to your computer and use it in GitHub Desktop.
Effect PostgreSQL and Drizzle with error handling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { DbSchema } from '@repo/database/schema' | |
| import { type ExtractTablesWithRelations } from 'drizzle-orm' | |
| import { drizzle, type NodePgDatabase, type NodePgQueryResultHKT } from 'drizzle-orm/node-postgres' | |
| import { type PgTransaction } from 'drizzle-orm/pg-core' | |
| import { Cause, Context, Data, Effect, Exit, Layer, Option, Redacted, Runtime } from 'effect' | |
| import * as pg from 'pg' | |
| type TransactionClient = PgTransaction< | |
| NodePgQueryResultHKT, | |
| typeof DbSchema, | |
| ExtractTablesWithRelations<typeof DbSchema> | |
| > | |
| type Client = NodePgDatabase<typeof DbSchema> & { | |
| $client: pg.Pool | |
| } | |
| type TransactionContextShape = <U>( | |
| fn: (client: TransactionClient) => Promise<U> | |
| ) => Effect.Effect<U, DatabaseError> | |
| export class TransactionContext extends Context.Tag('TransactionContext')< | |
| TransactionContext, | |
| TransactionContextShape | |
| >() { | |
| public static readonly provide = ( | |
| transaction: TransactionContextShape | |
| ): <A, E, R>( | |
| self: Effect.Effect<A, E, R> | |
| ) => Effect.Effect<A, E, Exclude<R, TransactionContext>> => Effect.provideService(this, transaction) | |
| } | |
| export class DatabaseError extends Data.TaggedError('DatabaseError')<{ | |
| readonly type: 'unique_violation' | 'foreign_key_violation' | 'connection_error' | |
| readonly cause: pg.DatabaseError | |
| }> { | |
| public override toString() { | |
| return `DatabaseError: ${this.cause.message}` | |
| } | |
| public get message() { | |
| return this.cause.message | |
| } | |
| } | |
| const matchPgError = (error: unknown) => { | |
| if (error instanceof pg.DatabaseError) { | |
| switch (error.code) { | |
| case '23505': | |
| return new DatabaseError({ type: 'unique_violation', cause: error }) | |
| case '23503': | |
| return new DatabaseError({ type: 'foreign_key_violation', cause: error }) | |
| case '08000': | |
| return new DatabaseError({ type: 'connection_error', cause: error }) | |
| } | |
| } | |
| return null | |
| } | |
| export class DatabaseConnectionLostError extends Data.TaggedError('DatabaseConnectionLostError')<{ | |
| cause: unknown | |
| message: string | |
| }> {} | |
| export type Config = { | |
| url: Redacted.Redacted | |
| ssl: boolean | |
| } | |
| const makeService = (config: Config) => | |
| Effect.gen(function*() { | |
| const pool = yield* Effect.acquireRelease( | |
| Effect.sync( | |
| () => | |
| new pg.Pool({ | |
| connectionString: Redacted.value(config.url), | |
| ssl: config.ssl, | |
| idleTimeoutMillis: 0, | |
| connectionTimeoutMillis: 0 | |
| }) | |
| ), | |
| (pool) => Effect.promise(() => pool.end()) | |
| ) | |
| yield* Effect.tryPromise(() => pool.query('SELECT 1')).pipe( | |
| Effect.timeoutFail({ | |
| duration: '10 seconds', | |
| onTimeout: () => | |
| new DatabaseConnectionLostError({ | |
| cause: new Error('[Database] Failed to connect: timeout'), | |
| message: '[Database] Failed to connect: timeout' | |
| }) | |
| }), | |
| Effect.catchTag( | |
| 'UnknownException', | |
| (error) => | |
| new DatabaseConnectionLostError({ | |
| cause: error.cause, | |
| message: '[Database] Failed to connect' | |
| }) | |
| ), | |
| Effect.tap(() => Effect.logInfo('[Database client]: Connection to the database established.')) | |
| ) | |
| const setupConnectionListeners = Effect.zipRight( | |
| Effect.async<void, DatabaseConnectionLostError>((resume) => { | |
| pool.on('error', (error) => { | |
| resume( | |
| Effect.fail( | |
| new DatabaseConnectionLostError({ | |
| cause: error, | |
| message: error.message | |
| }) | |
| ) | |
| ) | |
| }) | |
| return Effect.sync(() => { | |
| pool.removeAllListeners('error') | |
| }) | |
| }), | |
| Effect.logInfo('[Database client]: Connection error listeners initialized.'), | |
| { | |
| concurrent: true | |
| }/!**!/ | |
| ) | |
| const db = drizzle(pool, { schema: DbSchema }) | |
| const execute = Effect.fn(<T>(fn: (client: Client) => Promise<T>) => | |
| Effect.tryPromise({ | |
| try: () => fn(db), | |
| catch: (cause) => { | |
| const error = matchPgError(cause) | |
| if (error !== null) { | |
| return error | |
| } | |
| throw cause | |
| } | |
| }) | |
| ) | |
| const transaction = Effect.fn('Database.transaction')( | |
| <T, E, R>(txExecute: (tx: TransactionContextShape) => Effect.Effect<T, E, R>) => | |
| Effect.runtime<R>().pipe( | |
| Effect.map((runtime) => Runtime.runPromiseExit(runtime)), | |
| Effect.flatMap((runPromiseExit) => | |
| Effect.async<T, DatabaseError | E, R>((resume) => { | |
| db.transaction(async (tx: TransactionClient) => { | |
| const txWrapper = (fn: (client: TransactionClient) => Promise<any>) => | |
| Effect.tryPromise({ | |
| try: () => fn(tx), | |
| catch: (cause) => { | |
| const error = matchPgError(cause) | |
| if (error !== null) { | |
| return error | |
| } | |
| throw cause | |
| } | |
| }) | |
| const result = await runPromiseExit(txExecute(txWrapper)) | |
| Exit.match(result, { | |
| onSuccess: (value) => { | |
| resume(Effect.succeed(value)) | |
| }, | |
| onFailure: (cause) => { | |
| if (Cause.isFailure(cause)) { | |
| resume(Effect.fail(Cause.originalError(cause) as E)) | |
| } else { | |
| resume(Effect.die(cause)) | |
| } | |
| } | |
| }) | |
| }).catch((cause) => { | |
| const error = matchPgError(cause) | |
| resume(error !== null ? Effect.fail(error) : Effect.die(cause)) | |
| }) | |
| }) | |
| ) | |
| ) | |
| ) | |
| type ExecuteFn = <T>( | |
| fn: (client: Client | TransactionClient) => Promise<T> | |
| ) => Effect.Effect<T, DatabaseError> | |
| const makeQuery = <A, E, R, Input = never>( | |
| queryFn: (execute: ExecuteFn, input: Input) => Effect.Effect<A, E, R> | |
| ) => | |
| (...args: [Input] extends [never] ? [] : [input: Input]): Effect.Effect<A, E, R> => { | |
| const input = args[0] as Input | |
| return Effect.serviceOption(TransactionContext).pipe( | |
| Effect.map(Option.getOrNull), | |
| Effect.flatMap((txOrNull) => queryFn(txOrNull ?? execute, input)) | |
| ) | |
| } | |
| return { | |
| execute, | |
| transaction, | |
| setupConnectionListeners, | |
| makeQuery | |
| } as const | |
| }) | |
| type Shape = Effect.Effect.Success<ReturnType<typeof makeService>> | |
| export class Database extends Effect.Tag('Database')<Database, Shape>() {} | |
| export const layer = (config: Config) => Layer.scoped(Database, makeService(config)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment