implement background job for fetching attractions

This commit is contained in:
Michi 2025-09-20 17:47:22 +02:00
parent ff859475a0
commit ebb3191757
4 changed files with 195 additions and 6 deletions

View file

@ -3,7 +3,7 @@ import { integer, text, sqliteTable } from "drizzle-orm/sqlite-core";
export const attraction = sqliteTable('attraction', {
id: integer().primaryKey({ autoIncrement: true }),
name: text().notNull(),
apiCode: integer(),
apiCode: text().notNull().unique(),
themeparkId: integer().notNull().references(() => themepark.id)
})
@ -35,7 +35,7 @@ export const themepark = sqliteTable('themepark', {
name: text().notNull(),
countrycode: text().notNull(),
website: text(),
apiName: text()
apiName: text().notNull().unique()
})
export const user = sqliteTable('user', {

View file

@ -1,5 +1,6 @@
// Cron Router
import { updateThemeparkData } from "./update-themepark-data";
import { batchAttractionImport } from "./update-attraction-list";
export default async function cronRouter(
event: ScheduledEvent,
@ -7,10 +8,13 @@ export default async function cronRouter(
ctx: ExecutionContext,
){
switch (event.cron){
case '* * * * *':
console.log('every minute');
case '*/5 * * * *':
console.log('every 5 minutes');
break;
case '0 4 7,14,21,28 * *':
case '0 1-6 7,14,21,28 * *':
await batchAttractionImport(env, event.scheduledTime, event.cron);
break;
case '0 4 1 * *':
await updateThemeparkData(env);
break;
default:

View file

@ -0,0 +1,184 @@
import { getDbEnv } from '../db/client'
import { attraction, themepark } from '../db/schema'
import { inArray } from 'drizzle-orm'
import fetchData from '../lib/fetch-data'
import asyncBatchJob from '../lib/async-batch-job'
interface AttractionImport {
code: string,
name: string,
}
interface AttractionType {
name: string,
apiCode: string,
themeparkId: number
}
interface Themepark {
apiName: string,
id: number
}
/**
* Fetching the attractions from a specified park
* @param park API Code for request themepark
* @param endpoint Endpoint where to fetch data from (default: https://api.wartezeiten.app/v1/parks)
* @param lang Language used for API request
* @returns Interface with attraction code & name
*/
async function fetchAttractions(
park: string,
endpoint: string = "https://api.wartezeiten.app/v1/parks",
lang: string = 'de'
): Promise<AttractionImport[]>{
try{
const headers = {
'language':lang,
'park':park
};
const result = await fetchData<AttractionImport[]>(endpoint, headers);
return result;
}
catch(e){
throw new Error(`Failed to fetch attractions: ${e}`);
}
}
/**
* Return an object of all themeparks saved in the database
* @param env DB Connection
* @returns Object of themeparks with api name & id from internal DB
*/
async function getThemeparks(env: Env): Promise<Themepark[]>{
try{
const db = getDbEnv(env);
const themeparks: Themepark[] = await db.select({
apiName: themepark.apiName,
id: themepark.id
}).from(themepark);
return themeparks;
}
catch(e){
throw new Error(`Failed to get themeparks from database: ${e}`);
}
}
/**
* Return an object of all attractions from a defined park saved in the database
* @param env DB Connection
* @param parks Object of themeparks to get attractions from
* @returns Object of attractions
*/
async function getAttractionsByParks(env: Env, parks: Themepark[]): Promise<AttractionType[]>{
try{
const db = getDbEnv(env);
const parkIds: number[] = parks.map(p => p.id);
const attractions: AttractionType[] = await db.select({
name: attraction.name,
apiCode: attraction.apiCode,
themeparkId: attraction.themeparkId
}).from(attraction)
.where(inArray(attraction.themeparkId, parkIds));
return attractions;
}
catch(e){
throw new Error(`Failed to get attractions from database: ${e}`);
}
}
/**
* Imports attraction of the specified themeparks into the DB.
* Interacts with the dependant functions.
* @param env DB Connection
* @param parks Object of the themeparks from which the attractions have to be imported
*/
async function importAttractionsByParks(env: Env, parks: Themepark[]): Promise<void>{
try{
const db = getDbEnv(env);
const currentAttractions = await getAttractionsByParks(env, parks);
for (let park of parks){
const attractions = await fetchAttractions(park.apiName);
// get only the new attractions
const importableAttractions = attractions.filter(
attractionIndex => currentAttractions.length == 0 || !currentAttractions.some(id => id.apiCode == attractionIndex.code)
).map(attractionIndex => ({
name: attractionIndex.name,
apiCode: attractionIndex.code,
themeparkId: park.id
}));
// only run the import when new attractions available
if (importableAttractions.length > 0){
await asyncBatchJob(importableAttractions, 16, async (batch) => {
await db.insert(attraction).values(batch);
});
}
}
}
catch(e){
throw new Error(`Failed to import attractions into database: ${e}`);
}
}
/**
* Does create batches of themeparks from a timestamp & cron statement,
* so the imports can be done splitted into multiple batches.
* The function is exported, so that it can be run via background task.
* @param env DB Connection
* @param timestamp Current timestamp when function is run
* @param cron The cron statement specified to run the background jobs; used for batch size calculation
*/
export async function batchAttractionImport(env: Env, timestamp: number, cron: string): Promise<void>{
try{
const themeparks = await getThemeparks(env); // all themeparks
const executionHour = new Date(timestamp).getUTCHours(); // current hour, in which job is executed
const executionTimes = getExecutionCountFromCron(cron, 1); // how often the job is executed
const batchSize = Math.ceil(themeparks.length / executionTimes); // calculate batch size, so that each park gets updated
const hourIndex = executionHour - 1;
// time examples
// 01:00 -> 1757811600000
// 02:00 -> 1757815200000
// 03:00 -> 1757818800000
const batch = themeparks.slice(hourIndex * batchSize, (hourIndex + 1) * batchSize); // slice array into right batch
// import attractions from current time batch
await importAttractionsByParks(env, batch);
}
catch(e){
throw new Error(`Failed to split attraction import by time: ${e}`);
}
}
/**
* Calculates how often cron job gets executed
* @param cron Cron scheme
* @param partNr Which part of cron scheme should be checked
* @returns Number of execution times
*/
function getExecutionCountFromCron(cron: string, partNr: number): number{
const parts = cron.split(" ");
const specifiedPart = parts[partNr];
switch (true){
case specifiedPart.includes("-"):
const [start, end] = specifiedPart.split("-").map(Number);
return end - start + 1;
case specifiedPart.includes(","):
return specifiedPart.split(",").length;
case specifiedPart === "*":
return 24;
default:
return 1;
}
}

View file

@ -32,7 +32,8 @@
],
"triggers": {
"crons": [
"0 4 7,14,21,28 * *", // At 04:00 on day-of-month 7, 14, 21, and 28.
"0 1-6 7,14,21,28 * *", // Each hour from 01:00 to 06:00 on day-of-month 7, 14, 21, and 28. -> update attraction list
"0 4 1 * *", // At 04:00 on the 1th of each month -> update themeparks
"* * * * *" // every minute
]
}