feat(ui): separate kapacitor flux tasks API
parent
fd90be694c
commit
72378f7e9a
|
@ -6,12 +6,15 @@ import {
|
|||
getRule as getRuleAJAX,
|
||||
deleteRule as deleteRuleAPI,
|
||||
updateRuleStatus as updateRuleStatusAPI,
|
||||
updateFluxTaskStatus as updateFluxTaskStatusAPI,
|
||||
deleteFluxTask as deleteFluxTaskAPI,
|
||||
createTask as createTaskAJAX,
|
||||
updateTask as updateTaskAJAX,
|
||||
getFluxTasks,
|
||||
} from 'src/kapacitor/apis'
|
||||
import {
|
||||
updateFluxTaskStatus as updateFluxTaskStatusAPI,
|
||||
deleteFluxTask as deleteFluxTaskAPI,
|
||||
getFluxTasks,
|
||||
} from 'src/kapacitor/apis/fluxTasks'
|
||||
|
||||
import {errorThrown} from 'shared/actions/errors'
|
||||
|
||||
import {
|
||||
|
|
|
@ -0,0 +1,117 @@
|
|||
import AJAX from 'src/utils/ajax'
|
||||
import _, {values} from 'lodash'
|
||||
import {FluxTask, Kapacitor} from 'src/types'
|
||||
|
||||
const tasksBatchLimit = 500
|
||||
export const getFluxTasks = async (
|
||||
kapacitor: Kapacitor
|
||||
): Promise<FluxTask[]> => {
|
||||
const taskIds: Record<string, FluxTask> = {}
|
||||
let lastID = ''
|
||||
for (;;) {
|
||||
const {
|
||||
data: {tasks},
|
||||
} = await AJAX<{tasks: FluxTask[]}>({
|
||||
method: 'GET',
|
||||
url:
|
||||
kapacitor.links.proxy +
|
||||
'?path=' +
|
||||
encodeURIComponent(
|
||||
`/kapacitor/v1/api/v2/tasks?limit=${tasksBatchLimit}&after=${lastID}`
|
||||
),
|
||||
})
|
||||
if (!tasks || !tasks.length) {
|
||||
break
|
||||
}
|
||||
lastID = tasks[tasks.length - 1].id
|
||||
let noNewData = true
|
||||
tasks.forEach(x => {
|
||||
if (taskIds[x.id]) {
|
||||
return
|
||||
}
|
||||
noNewData = false
|
||||
taskIds[x.id] = x
|
||||
})
|
||||
if (noNewData) {
|
||||
break
|
||||
}
|
||||
if (tasks.length < tasksBatchLimit) {
|
||||
// less data returned, last chunk
|
||||
break
|
||||
}
|
||||
}
|
||||
return values(taskIds).sort((a, b) => a.name.localeCompare(b.name))
|
||||
}
|
||||
|
||||
export const getFluxTask = async (
|
||||
kapacitor: Kapacitor,
|
||||
taskID: string
|
||||
): Promise<FluxTask> => {
|
||||
const {data} = await AJAX({
|
||||
method: 'GET',
|
||||
url: kapacitor.links.proxy + `?path=/kapacitor/v1/api/v2/tasks/${taskID}`,
|
||||
})
|
||||
return data
|
||||
}
|
||||
|
||||
function friendlyID(id: number): string {
|
||||
if (id > 25) {
|
||||
return friendlyID(Math.trunc(id / 25)) + String.fromCharCode(id % 25)
|
||||
}
|
||||
return String.fromCharCode(65 + id)
|
||||
}
|
||||
export const getFluxTaskLogs = async (
|
||||
kapacitor: Kapacitor,
|
||||
taskID: string,
|
||||
maxItems: number
|
||||
) => {
|
||||
const {data} = await AJAX({
|
||||
method: 'GET',
|
||||
url:
|
||||
kapacitor.links.proxy + `?path=/kapacitor/v1/api/v2/tasks/${taskID}/runs`,
|
||||
})
|
||||
const logs = []
|
||||
let nextClusterId = 0
|
||||
const runsById = {}
|
||||
_.each(_.get(data, ['runs'], []), run => {
|
||||
runsById[run.id] = {
|
||||
name: friendlyID(nextClusterId++),
|
||||
lvl: run.status === 'failed' ? 'error' : 'info',
|
||||
}
|
||||
_.each(run.log, l => logs.push(l))
|
||||
})
|
||||
|
||||
logs.sort((a, b) => b.time.localeCompare(a.time))
|
||||
return logs.slice(0, maxItems).map(x => {
|
||||
const runDetail = runsById[x.runID]
|
||||
return {
|
||||
id: `${x.runID}-${x.time}`,
|
||||
key: `${x.runID}-${x.time}`,
|
||||
service: 'flux_task',
|
||||
lvl: runDetail.lvl,
|
||||
ts: x.time,
|
||||
msg: x.message,
|
||||
tags: x.runID,
|
||||
cluster: runDetail.name,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
export const updateFluxTaskStatus = (
|
||||
kapacitor: Kapacitor,
|
||||
task: FluxTask,
|
||||
status: string
|
||||
) => {
|
||||
return AJAX({
|
||||
method: 'PATCH',
|
||||
url: kapacitor.links.proxy + '?path=' + task.links.self,
|
||||
data: {status},
|
||||
})
|
||||
}
|
||||
|
||||
export const deleteFluxTask = (kapacitor: Kapacitor, task: FluxTask) => {
|
||||
return AJAX({
|
||||
method: 'DELETE',
|
||||
url: kapacitor.links.proxy + '?path=' + task.links.self,
|
||||
})
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
import AJAX from 'utils/ajax'
|
||||
import _, {cloneDeep, get, values} from 'lodash'
|
||||
import {cloneDeep, get} from 'lodash'
|
||||
|
||||
const outRule = rule => {
|
||||
// fit into range
|
||||
|
@ -66,93 +66,6 @@ export const getRules = kapacitor => {
|
|||
})
|
||||
}
|
||||
|
||||
const tasksBatchLimit = 500
|
||||
|
||||
export const getFluxTasks = async kapacitor => {
|
||||
const taskIds = {}
|
||||
let lastID = ''
|
||||
for (;;) {
|
||||
const {
|
||||
data: {tasks},
|
||||
} = await AJAX({
|
||||
method: 'GET',
|
||||
url:
|
||||
kapacitor.links.proxy +
|
||||
'?path=' +
|
||||
encodeURIComponent(
|
||||
`/kapacitor/v1/api/v2/tasks?limit=${tasksBatchLimit}&after=${lastID}`
|
||||
),
|
||||
})
|
||||
if (!tasks || !tasks.length) {
|
||||
break
|
||||
}
|
||||
lastID = tasks[tasks.length - 1].id
|
||||
let noNewData = true
|
||||
tasks.forEach(x => {
|
||||
if (taskIds[x.id]) {
|
||||
return
|
||||
}
|
||||
noNewData = false
|
||||
taskIds[x.id] = x
|
||||
})
|
||||
if (noNewData) {
|
||||
break
|
||||
}
|
||||
if (tasks.length < tasksBatchLimit) {
|
||||
// less data returned, last chunk
|
||||
break
|
||||
}
|
||||
}
|
||||
return values(taskIds).sort((a, b) => a.name.localeCompare(b.name))
|
||||
}
|
||||
|
||||
export const getFluxTask = async (kapacitor, taskID) => {
|
||||
const {data} = await AJAX({
|
||||
method: 'GET',
|
||||
url: kapacitor.links.proxy + `?path=/kapacitor/v1/api/v2/tasks/${taskID}`,
|
||||
})
|
||||
return data
|
||||
}
|
||||
|
||||
function friendlyID(id) {
|
||||
if (id > 25) {
|
||||
return friendlyID(Math.trunc(id / 25)) + String.fromCharCode(id % 25)
|
||||
}
|
||||
return String.fromCharCode(65 + id)
|
||||
}
|
||||
export const getFluxTaskLogs = async (kapacitor, taskID, maxItems) => {
|
||||
const {data} = await AJAX({
|
||||
method: 'GET',
|
||||
url:
|
||||
kapacitor.links.proxy + `?path=/kapacitor/v1/api/v2/tasks/${taskID}/runs`,
|
||||
})
|
||||
const logs = []
|
||||
let nextClusterId = 0
|
||||
const runsById = {}
|
||||
_.each(_.get(data, ['runs'], []), run => {
|
||||
runsById[run.id] = {
|
||||
name: friendlyID(nextClusterId++),
|
||||
lvl: run.status === 'failed' ? 'error' : 'info',
|
||||
}
|
||||
_.each(run.log, l => logs.push(l))
|
||||
})
|
||||
|
||||
logs.sort((a, b) => b.time.localeCompare(a.time))
|
||||
return logs.slice(0, maxItems).map(x => {
|
||||
const runDetail = runsById[x.runID]
|
||||
return {
|
||||
id: `${x.runID}-${x.time}`,
|
||||
key: `${x.runID}-${x.time}`,
|
||||
service: 'flux_task',
|
||||
lvl: runDetail.lvl,
|
||||
ts: x.time,
|
||||
msg: x.message,
|
||||
tags: x.runID,
|
||||
cluster: runDetail.name,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
export const getRule = async (kapacitor, ruleID) => {
|
||||
try {
|
||||
const response = await AJAX({
|
||||
|
@ -194,21 +107,6 @@ export const updateRuleStatus = (rule, status) => {
|
|||
})
|
||||
}
|
||||
|
||||
export const updateFluxTaskStatus = (kapacitor, task, status) => {
|
||||
return AJAX({
|
||||
method: 'PATCH',
|
||||
url: kapacitor.links.proxy + '?path=' + task.links.self,
|
||||
data: {status},
|
||||
})
|
||||
}
|
||||
|
||||
export const deleteFluxTask = (kapacitor, task) => {
|
||||
return AJAX({
|
||||
method: 'DELETE',
|
||||
url: kapacitor.links.proxy + '?path=' + task.links.self,
|
||||
})
|
||||
}
|
||||
|
||||
export const createTask = async (kapacitor, {id, dbrps, tickscript, type}) => {
|
||||
try {
|
||||
return await AJAX({
|
||||
|
|
|
@ -16,7 +16,7 @@ import FancyScrollbar from 'src/shared/components/FancyScrollbar'
|
|||
import PageSpinner from 'src/shared/components/PageSpinner'
|
||||
|
||||
import {Source, Kapacitor, FluxTask, LogItem} from 'src/types'
|
||||
import {getFluxTask, getFluxTaskLogs} from '../apis'
|
||||
import {getFluxTask, getFluxTaskLogs} from '../apis/fluxTasks'
|
||||
import LogsTableRow from '../components/LogsTableRow'
|
||||
import {useDispatch} from 'react-redux'
|
||||
import {notify} from 'src/shared/actions/notifications'
|
||||
|
|
|
@ -2,7 +2,11 @@ import React, {useEffect, useMemo, useState} from 'react'
|
|||
import {FluxTask, Kapacitor, Source} from 'src/types'
|
||||
import KapacitorScopedPage from './KapacitorScopedPage'
|
||||
import {useDispatch} from 'react-redux'
|
||||
import {deleteFluxTask, getFluxTasks, updateFluxTaskStatus} from '../apis'
|
||||
import {
|
||||
deleteFluxTask,
|
||||
getFluxTasks,
|
||||
updateFluxTaskStatus,
|
||||
} from '../apis/fluxTasks'
|
||||
import errorMessage from '../utils/errorMessage'
|
||||
import PageSpinner from 'src/shared/components/PageSpinner'
|
||||
import FluxTasksTable from '../components/FluxTasksTable'
|
||||
|
@ -32,7 +36,7 @@ const Contents = ({
|
|||
setLoading(true)
|
||||
const fetchData = async () => {
|
||||
try {
|
||||
const data = (await getFluxTasks(kapacitor)) as FluxTask[]
|
||||
const data = await getFluxTasks(kapacitor)
|
||||
setAllList(data)
|
||||
} catch (e) {
|
||||
if (e.status === 404) {
|
||||
|
|
Loading…
Reference in New Issue