Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions lib/db/handlers/group/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,19 @@ class GroupChangeHandler {
doUpdateDeviceOriginGroup = async(group) => {
this.scheduler?.scheduleAllGroupsTasks()

await dbapi.updateDeviceOriginGroup(group.ticket.serial, group)
await dbapi.updateDevicesOriginGroup(group.ticket.serial, group)
this.push.send([
wireutil.global
, wireutil.envelope(new wire.DeviceOriginGroupMessage(group.ticket.signature))
])
}

doUpdateDevicesCurrentGroup = (group, devices = []) =>
Promise.all(devices.map(serial => dbapi.updateDeviceCurrentGroup(serial, group)))
dbapi.updateDevicesCurrentGroup(devices, group)
.then(() => this.scheduler?.scheduleAllGroupsTasks())

doUpdateDevicesCurrentGroupFromOrigin = (devices = []) =>
Promise.all(devices.map(serial => dbapi.updateDeviceCurrentGroupFromOrigin(serial)))
dbapi.updateDevicesCurrentGroupFromOrigin(devices)
.then(() => this.scheduler?.scheduleAllGroupsTasks())

doUpdateDevicesGroupName = (group) =>
Expand Down Expand Up @@ -201,8 +201,8 @@ class GroupChangeHandler {
if (!apiutil.isOriginGroup(group.class)) {
return this.sendGroupUsersChange(group, group.users, [], false, 'GroupDeleted')
}
const rootGroup = await dbapi.getRootGroup()
await Promise.all(group.devices?.map(serial => dbapi.updateDeviceOriginGroup(serial, rootGroup)))

dbapi.updateDevicesCurrentGroupFromOrigin(group.devices)
return this.sendGroupUsersChange(group, group.users, [], false, 'GroupDeletedLater')
}
}
Expand Down
229 changes: 124 additions & 105 deletions lib/db/handlers/group/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@ export default class GroupsScheduler {
// @ts-ignore
groupsClient

setScheduledTask(groupId, {type, time, job}) {
this.scheduledTasks.set(groupId, {type, time, job})
setScheduledTask({groupId, type, time = 0, job}) {
if (Date.now() >= time) {
this.processTask({groupId, type, time, job})
}

const key = Symbol()
this.scheduledTasks.set(key, {groupId, type, time, job})
for (const i in this.taskQueue) {
if (this.scheduledTasks.get(this.taskQueue[i])?.time > time) {
// @ts-ignore
this.taskQueue.push(groupId, ...this.taskQueue.splice(i, this.taskQueue.length - i))
this.taskQueue.push(key, ...this.taskQueue.splice(i, this.taskQueue.length - i))
return
}
}
this.taskQueue.push(groupId)
this.taskQueue.push(key)
}

scheduleNextTask() {
Expand All @@ -48,166 +53,124 @@ export default class GroupsScheduler {
return
}

const [groupId] = this.taskQueue.splice(0, 1)
const task = this.scheduledTasks.get(groupId)
const [key] = this.taskQueue.splice(0, 1)
const task = this.scheduledTasks.get(key)

const now = Date.now()
const delay = Math.max(0, task.time - now)
if (delay >= apiutil.ONE_YEAR) {
this.scheduledTasks.delete(groupId)
this.scheduledTasks.delete(key)
return this.scheduleNextTask()
}

log.info(
`Scheduling next task ${
task.type
} for group ${
groupId
task.groupId
} in ${
Math.floor(delay / 1000)
}s`
)

// @ts-ignore
this.currentTimeout = setTimeout(async() => {
this.scheduledTasks.delete(groupId)
this.scheduledTasks.delete(key)

await this.processTask(groupId, task)
await this.processTask(task)

this.scheduleNextTask()
}, delay)
}

async processTask(groupId, task) {
async processTask(task) {
try {
const group = await dbapi.getGroup(groupId)
const group = await dbapi.getGroup(task.groupId)
if (!group) {
return
}

log.info(`Processing task ${task.type} for group ${groupId} \n${JSON.stringify(group, null, 2)}!\n\n`)
log.info(`Processing task ${task.type} for group ${task.groupId} \n${JSON.stringify(group, null, 2)}!\n\n`)

return await task.job(group)
}
catch (err) {
Sentry.captureException(err)
// @ts-ignore
log.error(`Error processing task ${task.type} for group ${groupId}:`, err?.stack || err)
log.error(`Error processing task ${task.type} for group ${task.groupId}:`, err?.stack || err)
}
}

processGroup(group) {
const now = Date.now()
const groupId = group.id

if (!group.dates?.length) {
log.warn(`Dates list is empty in group with id "${groupId}"`)
log.warn(`Dates list is empty in group with id "${group.id}"`)
return
}

const stopTime = group.dates[0].stop.getTime()

if (apiutil.isOriginGroup(group.class)) {
this.setScheduledTask(groupId, {
type: 'UPD_ORIGIN_GROUP_LIFETIME'
, time: stopTime
, job: (group) => this.updateOriginGroupLifetime(group)
})
}
else if (group.isActive || group.state === apiutil.WAITING) {
this.setScheduledTask(groupId, {
type: 'HANDLE_EXPIRED_ACTIVE_GROUP'
, time: stopTime
, job: (group) => (
group.dates.length === 1 ||
(group.class === apiutil.ONCE && group.devices.length === 0)
) ? this.deleteUserGroup(group) : this.doBecomeUnactiveGroup(group)
const now = Date.now()
const groupId = group.id
const isOrigin = apiutil.isOriginGroup(group.class)
const lastWindow = group.dates[group.dates.length - 1]

if (now >= lastWindow.stop) {
this.setScheduledTask({
groupId, type: 'HANDLE_EXPIRED_GROUP_WINDOW'
, time: lastWindow.stop.getTime()
, job: (group) => isOrigin ?
this.updateOriginGroupLifetime(group) :
this.deleteUserGroup(group)
})
return
}
else if (!group.isActive) {
for (const i in group.dates) {
if (now >= group.dates[i].stop.getTime()) {
if (group.dates[i].stop === group.dates[group.dates.length - 1].stop) {
return this.deleteUserGroup(group)
}
}
else if (now < group.dates[i].start.getTime()) {
return this.doCleanElapsedGroupDates(group, i)
}

// TODO: for-loop is useless
return this.doBecomeActiveGroup(group, i)
}
if (isOrigin) {
return
}
}

scheduleAllGroupsTasks = debounce(() => Sentry.startSpan(
{name: 'groups-engine scheduler'},
async() => {
try {
const groups = await dbapi.getReadyGroupsOrderByIndex('startTime')
this.scheduledTasks.clear()
this.taskQueue = []

await Promise.all(groups?.map((group) => this.processGroup(group)) || [])

this.scheduleNextTask()
if (group.class !== apiutil.ONCE) {
const windowIndex = group.dates.findIndex((w, i) => i > group.repetitions - 1 && now < w.stop)
if (windowIndex > 0) { // only immediately update length of group.dates
this.setScheduledTask({
groupId, type: 'HANDLE_EXPIRED_GROUP_WINDOW'
, time: group.dates[windowIndex - 1].stop.getTime()
, job: (group) => this.updateGroupDates(group, windowIndex, group.isActive)
})
group.dates.slice(windowIndex)
}
catch (err) {
Sentry.captureException(err)
// @ts-ignore
log.error('Error loading groups and scheduling tasks:', err?.stack || err)
}
}), 3000)
}

async setupChangeStream() {
try {
this.changeStream = this.groupsClient.watch()
const window = group.dates[0]
const isLastRepetition = group.dates.length === 1

this.changeStream.on('change', async(change) => {
log.info('Detected change in groups collection:', change.operationType)

// Reload all groups and reschedule
await this.scheduleAllGroupsTasks()
if (now < window.stop && !group.isActive) { // if not, then the next task onExpire will be executed immediately
this.setScheduledTask({
groupId, type: 'ACTIVATE_GROUP'
, time: window.start.getTime()
, job: this.doBecomeActiveGroup
})

this.changeStream.on('error', (error) => {
Sentry.captureException(error)
log.error('Error in change stream:', error)
})

this.changeStream.on('close', () => {
log.warn('Change stream closed, attempting to reconnect')
setTimeout(this.setupChangeStream, 5000)
})

log.info('MongoDB change stream set up successfully')
}
catch (err) {
Sentry.captureException(err)
// @ts-ignore
log.error('Error setting up change stream:', err?.stack || err)

setTimeout(this.setupChangeStream, 5000)
}
this.setScheduledTask({
groupId, type: 'HANDLE_EXPIRED_GROUP_WINDOW'
, time: window.stop.getTime()
, job: (group) => (
group.class === apiutil.ONCE || isLastRepetition
) ? this.deleteUserGroup(group) : this.doBecomeUnactiveGroup(group)
})
}

genIndexesLoop = () => setTimeout(() => {
dbapi.generateIndexes()
this.genIndexesLoop()
}, apiutil.ONE_HOUR * 8)

doBecomeUnactiveGroup(group) {
return this.updateGroupDates(group, 1, false)
}

doCleanElapsedGroupDates(group, incr) {
return this.updateGroupDates(group, incr, false)
}
doBecomeActiveGroup = (group) => Promise.all([
db.groups.updateOne({id: group.id}, {$set: {isActive: true}})
, dbapi.updateDevicesCurrentGroup(group.devices, group)
])

doBecomeActiveGroup(group, incr) {
return this.updateGroupDates(group, incr, true)
}
doBecomeUnactiveGroup = (group, windowIndex = 0) =>
this.updateGroupDates(group, windowIndex, true)

updateOriginGroupLifetime(group) {
const now = Date.now()
Expand All @@ -228,9 +191,9 @@ export default class GroupsScheduler {
return dbapi.deleteUserGroup(group.id)
}

async updateGroupDates(group, incr, isActive) {
const repetitions = group.repetitions - incr
const dates = group.dates.slice(incr)
async updateGroupDates(group, windowIndex, isActive) {
const repetitions = group.repetitions - (windowIndex)
const dates = group.dates.slice(windowIndex)
const duration = group.devices.length * (dates[0].stop - dates[0].start) * (repetitions + 1)

try {
Expand All @@ -243,6 +206,11 @@ export default class GroupsScheduler {
, state: apiutil.READY
}
})

if (!isActive && group.devices?.length) {
await dbapi.updateDevicesCurrentGroupFromOrigin(group.devices)
}

await dbapi.updateUserGroupDuration(group.owner.email, group.duration, duration)
await this.scheduleAllGroupsTasks()
}
Expand All @@ -251,6 +219,25 @@ export default class GroupsScheduler {
}
}

scheduleAllGroupsTasks = debounce(() => Sentry.startSpan(
{name: 'groups-engine scheduler'},
async() => {
try {
const groups = await dbapi.getReadyGroupsOrderByIndex('startTime')
this.scheduledTasks.clear()
this.taskQueue = []

await Promise.all(groups?.map((group) => this.processGroup(group)) || [])

this.scheduleNextTask()
}
catch (err) {
Sentry.captureException(err)
// @ts-ignore
log.error('Error loading groups and scheduling tasks:', err?.stack || err)
}
}), 3000)

async init() {
try {
await db.connect()
Expand All @@ -277,4 +264,36 @@ export default class GroupsScheduler {
throw err
}
}

async setupChangeStream() {
try {
this.changeStream = this.groupsClient.watch()

this.changeStream.on('change', async(change) => {
log.info('Detected change in groups collection:', change.operationType)

// Reload all groups and reschedule
await this.scheduleAllGroupsTasks()
})

this.changeStream.on('error', (error) => {
Sentry.captureException(error)
log.error('Error in change stream:', error)
})

this.changeStream.on('close', () => {
log.warn('Change stream closed, attempting to reconnect')
setTimeout(this.setupChangeStream, 5000)
})

log.info('MongoDB change stream set up successfully')
}
catch (err) {
Sentry.captureException(err)
// @ts-ignore
log.error('Error setting up change stream:', err?.stack || err)

setTimeout(this.setupChangeStream, 5000)
}
}
}
Loading