import type { ReadableStream } from 'stream/web'
import type { Selectable } from 'kysely'
import db from './kysely.ts'
import split, { type Decoder } from './split.ts'
import {
  parseDim,
  parseIB,
  parseKonto,
  parseObjekt,
  parseRAR,
  parseSRU,
  parseTrans,
  parseUB,
  parseVer,
} from './parse_line.ts'
import type { Entry, FinancialYear } from '../../shared/types.db.ts'

/**
 * Byte values that are decoded to Swedish letters instead of being passed
 * through `String.fromCharCode`. The values match the code page 437 positions
 * of these letters (presumably the encoding of the imported files - confirm
 * against the producer of the data).
 */
const SWEDISH_BYTE_TO_CHAR: ReadonlyMap<number, string> = new Map([
  [132, 'ä'],
  [134, 'å'],
  [148, 'ö'],
  [142, 'Ä'],
  [143, 'Å'],
  [153, 'Ö'],
])

/**
 * Default decoder: turns a chunk of bytes into a string, one character per
 * byte, substituting the Swedish letters listed in `SWEDISH_BYTE_TO_CHAR`.
 */
const defaultDecoder = {
  decode(chunk: Uint8Array) {
    return Array.from(chunk, (byte) => SWEDISH_BYTE_TO_CHAR.get(byte) ?? String.fromCharCode(byte)).join('')
  },
}

// Transaction descriptions that reference an invoice, e.g.
//   Faktura 6364710056 172-2 - SBAB Bank AB
//   Faktura 2312 172-6 - Bredablick Frvaltning AB
// Capture group 1 is the number after "172-", group 2 the supplier name.
const FISKEN_INVOICE_RE = /Faktura.*172-(\d*)\s+-\s+(.+)/

// Entry descriptions that reference an invoice by the number in parentheses, e.g.
//   Levfakt EURO FINANS AB (5) / Levfakt Per Holmberg (6)
//   Levbet EURO FINANS AB (5) / Levbet Per Holmberg (6)
// Descriptions such as "Fisken - Fakturajournal 1248" do not match.
const PHM_ENTRY_RE = /^Lev(?:fakt|bet)\s+[^(]*\((\d+)\)$/

/**
 * Imports a stream of `#`-tagged records into the database.
 *
 * The stream is split into lines via `split`; every line that starts with `#`
 * is dispatched on its first whitespace-separated token (`#DIM`, `#IB`,
 * `#KONTO`, `#OBJEKT`, `#RAR`, `#SRU`, `#TRANS`, `#UB`, `#VER`). Any other
 * `#` record is collected and printed when the import finishes. All database
 * writes happen inside a single transaction which is committed at the end and
 * rolled back if anything fails.
 *
 * @param stream - Source stream, piped through `split` to obtain lines.
 * @param decoder - Decoder handed to `split`; defaults to `defaultDecoder`.
 * @throws Error when a record refers to something that does not exist yet
 *   (financial year, entry, dimension, object, account) or when a transaction
 *   resolves to two different invoices. The database transaction is rolled
 *   back before the error is rethrown.
 */
export default async function parseStream(stream: ReadableStream, decoder: Decoder = defaultDecoder): Promise<void> {
  // Cache of journal identifier -> journal id, filled lazily by getJournalId.
  // NOTE(review): assumes journal ids are numeric like the invoice ids - confirm.
  const journals = new Map<string, number>()
  // The entry created by the most recent #VER; #TRANS rows attach to it.
  let currentEntry: Pick<Selectable<Entry>, 'id' | 'description'> | undefined
  // Invoice derived from the most recent #VER description, if any.
  let currentInvoiceId: number | null | undefined
  // The financial year created by the "#RAR 0" record.
  let currentYear: Selectable<FinancialYear> | undefined
  // Records with tags not handled below, keyed by tag.
  const details: Record<string, string> = {}

  const trx = await db.startTransaction().execute()

  /** Returns the current financial year or throws if no "#RAR 0" record has been processed yet. */
  function requireYear(recordType: string) {
    if (!currentYear) {
      throw new Error(`${recordType} record encountered before a "#RAR 0" record defined the financial year`)
    }
    return currentYear
  }

  /** Returns the current entry or throws if no #VER record has been processed yet. */
  function requireEntry(recordType: string) {
    if (!currentEntry) {
      throw new Error(`${recordType} record encountered before any #VER record`)
    }
    return currentEntry
  }

  /** Looks up (or creates) the journal with the given identifier and returns its id. */
  async function getJournalId(identifier: string) {
    const cachedId = journals.get(identifier)
    if (cachedId !== undefined) {
      return cachedId
    }

    const journal =
      (await trx.selectFrom('journal').selectAll().where('identifier', '=', identifier).executeTakeFirst()) ??
      (await trx.insertInto('journal').values({ identifier }).returningAll().executeTakeFirstOrThrow())

    journals.set(identifier, journal.id)
    return journal.id
  }

  try {
    for await (const rawLine of stream.pipeThrough(split(null, { decoder }))) {
      const line = rawLine.trim()

      // Only '#'-prefixed lines are records.
      if (line[0] !== '#') {
        continue
      }

      const splitLine = line.split(/\s+/)
      const lineType = splitLine[0]

      switch (lineType) {
        case '#DIM': {
          // Upsert a dimension by number.
          const { number, name } = parseDim(line)

          // Read through the transaction so rows written earlier in this import are visible.
          const existingDimension = await trx
            .selectFrom('dimension')
            .selectAll()
            .where('number', '=', number)
            .executeTakeFirst()

          if (!existingDimension) {
            await trx.insertInto('dimension').values({ number, name }).execute()
          } else if (existingDimension.name !== name) {
            await trx.updateTable('dimension').set({ name }).where('number', '=', number).execute()
          }
          break
        }

        case '#IB': {
          // Incoming balance: upsert the `in` columns of the account balance for the current year.
          const { yearNumber, accountNumber, balance, quantity } = parseIB(line)

          // Only year 0 is imported; other year numbers are skipped.
          if (yearNumber !== 0) continue

          const financialYearId = requireYear('#IB').id

          const existingAccountBalance = await trx
            .selectFrom('accountBalance')
            .selectAll()
            .where((eb) => eb.and({ financialYearId, accountNumber }))
            .executeTakeFirst()

          if (!existingAccountBalance) {
            await trx
              .insertInto('accountBalance')
              .values({
                financialYearId,
                accountNumber,
                in: balance,
                inQuantity: quantity,
              })
              .execute()
          } else {
            await trx
              .updateTable('accountBalance')
              .set({
                in: balance,
                inQuantity: quantity,
              })
              .where((eb) => eb.and({ financialYearId, accountNumber }))
              .execute()
          }
          break
        }

        case '#KONTO': {
          // Upsert an account; an existing account (the one with the highest
          // financialYearId) only has its description refreshed.
          const { number, description } = parseKonto(line)

          const existingAccount = await trx
            .selectFrom('account')
            .selectAll()
            .where('number', '=', number)
            .orderBy('financialYearId', 'desc')
            .executeTakeFirst()

          if (!existingAccount) {
            await trx
              .insertInto('account')
              .values({
                financialYearId: requireYear('#KONTO').id,
                number,
                description,
              })
              .execute()
          } else if (existingAccount.description !== description) {
            await trx.updateTable('account').set({ description }).where('id', '=', existingAccount.id).execute()
          }
          break
        }

        case '#OBJEKT': {
          // Upsert an object within an already existing dimension.
          const { dimensionNumber, number, name } = parseObjekt(line)

          const dimension = await trx
            .selectFrom('dimension')
            .selectAll()
            .where('number', '=', dimensionNumber)
            .executeTakeFirst()

          if (!dimension) throw new Error(`Dimension "${dimensionNumber}" does not exist`)

          const existingObject = await trx
            .selectFrom('object')
            .selectAll()
            .where((eb) => eb.and({ dimensionId: dimension.id, number }))
            .executeTakeFirst()

          if (!existingObject) {
            await trx.insertInto('object').values({ dimensionId: dimension.id, number, name }).execute()
          } else if (existingObject.name !== name) {
            await trx
              .updateTable('object')
              .set({ name })
              .where((eb) => eb.and({ dimensionId: dimension.id, number }))
              .execute()
          }
          break
        }

        case '#RAR': {
          // Creates the financial year all following records are attached to.
          const { yearNumber, startDate, endDate } = parseRAR(line)

          // Only year 0 is imported; other year numbers are skipped.
          if (yearNumber !== 0) continue

          currentYear = await trx
            .insertInto('financialYear')
            .values({ year: parseInt(startDate.slice(0, 4), 10), startDate, endDate })
            .returningAll()
            .executeTakeFirstOrThrow()
          break
        }

        case '#SRU': {
          // Sets the SRU code on an existing account (the one with the highest financialYearId).
          const { number, sru } = parseSRU(line)

          const existingAccount = await trx
            .selectFrom('account')
            .selectAll()
            .where('number', '=', number)
            .orderBy('financialYearId', 'desc')
            .executeTakeFirst()

          // There is no description to create an account from here, so a
          // missing account is reported explicitly instead of dereferencing
          // an undefined row.
          if (!existingAccount) {
            throw new Error(`Account "${number}" referenced by #SRU does not exist`)
          }

          if (existingAccount.sru !== sru) {
            await trx.updateTable('account').set({ sru }).where('id', '=', existingAccount.id).execute()
          }
          break
        }

        case '#TRANS': {
          // Inserts a transaction for the current entry, linking it to an
          // invoice and to objects where the record refers to them.
          const { objectList, ...transaction } = parseTrans(line)
          const entry = requireEntry('#TRANS')

          const invoiceMatch = transaction.description?.match(FISKEN_INVOICE_RE)
          let invoiceId: number | null | undefined

          if (invoiceMatch) {
            const [, fiskenNumberText, supplierName] = invoiceMatch
            const fiskenNumber = parseInt(fiskenNumberText, 10)

            invoiceId = (
              await trx.selectFrom('invoice').select('id').where('fiskenNumber', '=', fiskenNumber).executeTakeFirst()
            )?.id

            if (!invoiceId) {
              // Unknown invoice: create it, together with its supplier when needed.
              let supplierId = (
                await trx.selectFrom('supplier').select('id').where('name', '=', supplierName).executeTakeFirst()
              )?.id

              if (!supplierId) {
                supplierId = (
                  await trx
                    .insertInto('supplier')
                    .values({ name: supplierName, supplierTypeId: 1 })
                    .returning('id')
                    .executeTakeFirstOrThrow()
                ).id
              }

              invoiceId = (
                await trx
                  .insertInto('invoice')
                  .values({ financialYearId: requireYear('#TRANS').id, fiskenNumber, supplierId })
                  .returning('id')
                  .executeTakeFirstOrThrow()
              ).id
            }

            // For account 2441 in a "Fakturajournal" entry, the absolute
            // transaction amount is stored as the invoice amount.
            if (transaction.accountNumber === 2441 && entry.description?.includes('Fakturajournal')) {
              await trx
                .updateTable('invoice')
                .set('amount', Math.abs(transaction.amount))
                .where('id', '=', invoiceId!)
                .execute()
            }
          }

          // An invoice may come from the transaction or from the entry, never both.
          if (invoiceId && currentInvoiceId) {
            throw new Error('invoiceId and currentInvoiceId')
          }

          const transactionId = (
            await trx
              .insertInto('transaction')
              .values({
                entryId: entry.id,
                ...transaction,
                invoiceId: invoiceId! || currentInvoiceId!,
              })
              .returning('id')
              .executeTakeFirstOrThrow()
          ).id

          if (objectList) {
            for (const [dimensionNumber, objectNumber] of objectList) {
              const objectId = (
                await trx
                  .selectFrom('object')
                  .innerJoin('dimension', 'object.dimensionId', 'dimension.id')
                  .select('object.id')
                  .where((eb) =>
                    eb.and({
                      'object.number': objectNumber,
                      'dimension.number': dimensionNumber,
                    }),
                  )
                  .executeTakeFirst()
              )?.id

              if (!objectId) {
                throw new Error(`Object {${dimensionNumber} ${objectNumber}} does not exist!`)
              }

              await trx
                .insertInto('transactionsToObjects')
                .values({
                  transactionId,
                  objectId,
                })
                .execute()
            }
          }
          break
        }

        case '#UB': {
          // Outgoing balance: upsert the `out` columns of the account balance for the current year.
          const { yearNumber, accountNumber, balance, quantity } = parseUB(line)

          // Only year 0 is imported; other year numbers are skipped.
          if (yearNumber !== 0) continue

          const financialYearId = requireYear('#UB').id

          const existingAccountBalance = await trx
            .selectFrom('accountBalance')
            .selectAll()
            .where((eb) => eb.and({ financialYearId, accountNumber }))
            .executeTakeFirst()

          if (!existingAccountBalance) {
            await trx
              .insertInto('accountBalance')
              .values({
                financialYearId,
                accountNumber,
                out: balance,
                outQuantity: quantity,
              })
              .execute()
          } else {
            await trx
              .updateTable('accountBalance')
              .set({
                out: balance,
                outQuantity: quantity,
              })
              .where((eb) => eb.and({ financialYearId, accountNumber }))
              .execute()
          }
          break
        }

        case '#VER': {
          // Starts a new entry; following #TRANS records belong to it.
          const { journal, ...rest } = parseVer(line)
          const journalId = await getJournalId(journal)

          const phmMatch = rest.description.match(PHM_ENTRY_RE)

          if (phmMatch) {
            // The parenthesised number identifies the invoice by its phmNumber.
            currentInvoiceId = (
              await trx
                .selectFrom('invoice')
                .innerJoin('supplier', 'invoice.supplierId', 'supplier.id')
                .select('invoice.id')
                .where('phmNumber', '=', parseInt(phmMatch[1], 10))
                .executeTakeFirst()
            )?.id
          } else {
            currentInvoiceId = null
          }

          currentEntry = await trx
            .insertInto('entry')
            .values({
              journalId,
              financialYearId: requireYear('#VER').id,
              ...rest,
            })
            .returning(['id', 'description'])
            .executeTakeFirstOrThrow()
          break
        }

        default:
          // Unhandled record types are kept for the summary printed at the end.
          details[lineType] = splitLine.slice(1).join(' ')
      }
    }

    await trx.commit().execute()
  } catch (error) {
    // Do not leave the transaction (and its connection) open on failure.
    await trx.rollback().execute()
    throw error
  }

  // oxlint-disable-next-line no-console
  console.dir(details)

  const period = currentYear ? `${currentYear.startDate} - ${currentYear.endDate}` : 'no "#RAR 0" record found'
  console.info(`DONE!: ${period}`)
}