import { type ReadableStream } from 'node:stream/web' import knex from './knex.ts' import split, { type Decoder } from './split.ts' import { parseDim, parseIB, parseKonto, parseObjekt, parseRAR, parseSRU, parseTrans, parseUB, parseVer, } from './parse_line.ts' const defaultDecoder = { decode(chunk: Uint8Array) { return Array.from(chunk, (uint) => { switch (uint) { case 132: return 'ä' case 134: return 'å' case 148: return 'ö' case 142: return 'Ä' case 143: return 'Å' case 153: return 'Ö' default: return String.fromCharCode(uint) } }).join('') }, } export default async function parseStream(stream: ReadableStream, decoder: Decoder = defaultDecoder) { const journals = new Map() let currentEntryId: number const details: Record = {} let currentYear = null const trx = await knex.transaction() async function getJournalId(identifier: string) { if (journals.has(identifier)) { return journals.get(identifier) } let journal = await trx('journal').first('*').where('identifier', identifier) if (!journal) { journal = (await trx('journal').insert({ identifier }).returning('*'))[0] } journals.set(identifier, journal.id) return journal.id } for await (let line of stream.pipeThrough(split(null, { decoder }))) { line = line.trim() if (line[0] !== '#') { continue } const splitLine = line.split(/\s+/) const lineType = splitLine[0] switch (lineType) { case '#DIM': { const { number, name } = parseDim(line) const existingDimension = await trx('dimension').first('*').where('number', number) if (!existingDimension) { await trx('dimension').insert({ number, name }) } else if (existingDimension.name !== name) { await trx('dimension').update({ name }).where('number', number) } break } case '#IB': { const { yearNumber, accountNumber, balance, quantity } = parseIB(line) if (yearNumber !== 0) continue const existingAccountBalance = await trx('accountBalance') .first('*') .where({ financialYearId: currentYear.id, accountNumber }) if (!existingAccountBalance) { await trx('accountBalance').insert({ financialYearId: currentYear.id, accountNumber, in: balance, inQuantity: quantity, }) } else { await trx('accountBalance') .update({ in: balance, inQuantity: quantity, }) .where({ financialYearId: currentYear.id, accountNumber, }) } break } case '#KONTO': { const { number, description } = parseKonto(line) const existingAccount = await trx('account') .first('*') .where('number', number) .orderBy('financialYearId', 'desc') if (!existingAccount) { await trx('account').insert({ financialYearId: currentYear!.id, number, description, }) } else if (existingAccount.description !== description) { await trx('account').update({ description }).where('id', existingAccount.id) } break } case '#OBJEKT': { const { dimensionNumber, number, name } = parseObjekt(line) const dimension = await trx('dimension').first('*').where('number', dimensionNumber) if (!dimension) throw new Error(`Dimension "${dimensionNumber}" does not exist`) const existingObject = await trx('object').first('*').where({ dimensionId: dimension.id, number }) if (!existingObject) { await trx('object').insert({ dimensionId: dimension.id, number, name }) } else if (existingObject.name !== name) { await trx('object').update({ name }).where({ dimensionId: dimension.id, number }) } break } case '#RAR': { const { yearNumber, startDate, endDate } = parseRAR(line) if (yearNumber !== 0) continue currentYear = (await trx('financial_year').insert({ startDate, endDate }).returning('*'))[0] break } case '#SRU': { const { number, sru } = parseSRU(line) const existingAccount = await trx('account') .first('*') .where('number', number) .orderBy('financialYearId', 'desc') if (existingAccount) { if (existingAccount.sru !== sru) { await trx('account').update({ sru: sru }).where('id', existingAccount.id) } } else { await trx('account').insert({ financialYearId: currentYear!.id, number, description: existingAccount.description, sru, }) } break } case '#TRANS': { const { objectList, ...transaction } = parseTrans(line) let objectId: number const transactionId = ( await trx('transaction') .insert({ entryId: currentEntryId, objectId, ...transaction, }) .returning('id') )[0].id if (objectList) { for (const [dimensionNumber, objectNumber] of objectList) { const objectId = ( await trx('object') .first('object.id') .innerJoin('dimension', 'object.dimension_id', 'dimension.id') .where({ 'object.number': objectNumber, 'dimension.number': dimensionNumber, }) )?.id if (!objectId) { throw new Error(`Object {${dimensionNumber} ${objectNumber}} does not exist!`) } await trx('transactionsToObjects').insert({ transactionId, objectId, }) } } break } case '#UB': { const { yearNumber, accountNumber, balance, quantity } = parseUB(line) if (yearNumber !== 0) continue const existingAccountBalance = await trx('accountBalance') .first('*') .where({ financialYearId: currentYear.id, accountNumber }) if (!existingAccountBalance) { await trx('accountBalance').insert({ financialYearId: currentYear.id, accountNumber, out: balance, outQuantity: quantity, }) } else { await trx('accountBalance') .update({ out: balance, outQuantity: quantity, }) .where({ financialYearId: currentYear.id, accountNumber, }) } break } case '#VER': { const { journal, ...rest } = parseVer(line) let journalId = await getJournalId(journal) currentEntryId = ( await trx('entry') .insert({ journalId, financialYearId: currentYear!.id, ...rest, }) .returning('id') )[0].id break } default: details[lineType] = splitLine.slice(1).join(' ') } } await trx.commit() console.dir(details) console.info(`DONE!: ${currentYear.startDate} - ${currentYear.endDate}`) }