Commit cec2929f authored by wangjun's avatar wangjun

初始化 SKILLs 技能集合仓库,包含 Cronicle 定时任务调度管理技能

parents
.env
.DS_Store
__pycache__/
*.pyc
# SKILLs
AI 智能体(Agent)技能集合,为 AI 助手提供专业领域的工作流与工具能力扩展。
## 技能列表
### managing-cronicle-scheduler — Cronicle 定时任务调度管理
通过 REST API 管理 [Cronicle](https://github.com/jhuckaby/Cronicle) 定时任务调度系统,提供 14 项操作覆盖定时事件和作业的全生命周期管理。
**核心能力:**
| 类别 | 操作 |
|------|------|
| **事件管理** | 列出、获取、创建、更新、删除定时事件 |
| **作业执行** | 立即运行事件、查看作业状态、列出活跃作业、修改/中止运行中的作业 |
| **历史记录** | 按事件或全局查看已完成的作业历史 |
| **调度器控制** | 查看或切换调度器启用/禁用状态 |
| **一次性任务** | 创建执行后自动禁用的一次性任务 |
**支持的插件:** HTTP 请求(urlplug)、Shell 脚本(shellplug)
---
## 目录结构
```
SKILLs/
├── README.md # 项目说明(本文件)
└── managing-cronicle-scheduler/ # Cronicle 定时任务调度管理
├── SKILL.md # 技能定义文件
├── README.md # 技能详细说明
├── .env.example # 环境变量模板
├── scripts/ # 实现脚本
└── references/ # API 参考文档
```
## 使用方式
每个技能子目录包含独立的 `SKILL.md` 定义文件,可供 AI 编码助手(如 opencode、Claude Code 等)加载使用。详细配置和用法请参阅各子目录中的 README 文件。
CRONICLE_BASE_URL=https://your-cronicle-server:3012
CRONICLE_API_KEY=your-32-character-hex-api-key
CRONICLE_DEFAULT_CATEGORY=your-default-category-id
CRONICLE_DEFAULT_TARGET=your-default-target
CRONICLE_DEFAULT_PLUGIN=your-default-plugin-id
# Managing Cronicle Scheduler
AI agent skill for managing the [Cronicle](https://github.com/jhuckaby/Cronicle) timed task scheduling system via its REST API. Provides 14 operations covering the full lifecycle of scheduled events and jobs, plus one-shot task support via Chain Reaction.
## Overview
This skill enables AI agents to programmatically manage a Cronicle scheduler — creating cron-like timed events, running jobs on demand, monitoring execution status, viewing history, and controlling the scheduler itself.
### What it can do
| Category | Operations |
|----------|-----------|
| **Event Management** | List, get, create, update, delete scheduled events |
| **Job Execution** | Run events immediately, check job status, list active jobs, modify/abort running jobs |
| **History** | View completed jobs per event or globally |
| **Scheduler Control** | Check or toggle the scheduler enabled/disabled state |
| **One-Shot Tasks** | Create non-recurring tasks that auto-disable after execution |
### Supported Plugins
| Plugin | ID | Description |
|--------|-----|-------------|
| HTTP Request | `urlplug` | Makes HTTP GET/HEAD/POST requests with full header, data, timeout, SSL, and match support |
| Shell Script | `shellplug` | Executes shell scripts on target servers with progress reporting and JSON output |
## File Structure
```
managing-cronicle-scheduler/
├── README.md # This file
├── SKILL.md # Core skill instructions (127 lines)
├── .env.example # Environment variable template
├── scripts/
│ └── cronicle_client.py # Python HTTP client (CLI + importable module)
└── references/
├── event-fields.md # Complete event data format reference (30+ fields)
├── plugin-params.md # Plugin parameter reference (urlplug + shellplug)
└── one-shot-tasks.md # One-shot task workflow with time expression mapping
```
## Quick Start
### 1. Configure
Copy the environment template and fill in your Cronicle server details:
```bash
cp .env.example .env
```
Edit `.env`:
```env
CRONICLE_BASE_URL=https://your-cronicle-server:3012
CRONICLE_API_KEY=your-32-character-hex-api-key
CRONICLE_DEFAULT_CATEGORY=your-default-category-id
CRONICLE_DEFAULT_TARGET=your-default-target
CRONICLE_DEFAULT_PLUGIN=urlplug
```
### 2. Install Dependencies
```bash
pip install httpx python-dotenv
```
### 3. Use via AI Agent
Trigger the skill by mentioning Cronicle, scheduled tasks, cron jobs, timed events, or scheduler management. The agent will read `SKILL.md` and execute operations via the client script.
### 4. Direct CLI Usage
```bash
# List all scheduled events
python scripts/cronicle_client.py get_schedule
# Create an HTTP request event that runs every 5 minutes
python scripts/cronicle_client.py create_event --title "Health Check" \
-f timing='{"minutes":[0,5,10,15,20,25,30,35,40,45,50,55]}' \
-f params='{"method":"GET","url":"https://api.example.com/health","timeout":30}'
# Run it immediately
python scripts/cronicle_client.py run_event --title "Health Check"
# Check scheduler state
python scripts/cronicle_client.py get_master_state
```
## One-Shot Task Example
Create a task that fires once at a specific time and auto-disables itself:
```bash
# "通知我下午3点买菜"
BASE=$(grep CRONICLE_BASE_URL .env | cut -d= -f2-)
KEY=$(grep CRONICLE_API_KEY .env | cut -d= -f2-)
# Step 1: Create the task
TASK_ID=$(python scripts/cronicle_client.py create_event --title "买菜通知" \
-f timing='{"hours":[15],"minutes":[0]}' \
-f params='{"method":"POST","url":"https://ntfy.example.com/groceries","data":"买菜!"}' \
--raw | python -c "import sys,json; print(json.load(sys.stdin)['id'])")
# Step 2: Create cleanup event (calls update_event API to disable task)
CLEANUP_ID=$(python scripts/cronicle_client.py create_event --title "[cleanup] 买菜通知" \
-f params="{\"method\":\"POST\",\"url\":\"${BASE}/api/app/update_event/v1\",\"headers\":\"X-API-Key: ${KEY}\\nContent-Type: application/json\\n\",\"data\":\"{\\\"id\\\":\\\"${TASK_ID}\\\",\\\"enabled\\\":0}\"}" \
--raw | python -c "import sys,json; print(json.load(sys.stdin)['id'])")
# Step 3: Link them
python scripts/cronicle_client.py update_event --id "$TASK_ID" -f chain="$CLEANUP_ID"
echo "One-shot created: task=$TASK_ID, cleanup=$CLEANUP_ID"
```
## Python Import
```python
from cronicle_client import CronicleAPI
api = CronicleAPI()
# List events
schedule = api.get_schedule(limit=10)
print(schedule)
# Create event
result = api.create_event(
title="Daily Report",
extra={
"timing": {"hours": [9], "minutes": [0]},
"params": {"method": "POST", "url": "https://hooks.example.com/report"},
}
)
```
## Environment Variables
| Variable | Required | Description |
|----------|----------|-------------|
| `CRONICLE_BASE_URL` | Yes | Cronicle server URL (e.g., `https://your-server:3012`) |
| `CRONICLE_API_KEY` | Yes | 32-character hex API key |
| `CRONICLE_DEFAULT_CATEGORY` | For create | Default category ID for new events |
| `CRONICLE_DEFAULT_TARGET` | For create | Default target server or group ID |
| `CRONICLE_DEFAULT_PLUGIN` | For create | Default plugin ID (`urlplug` or `shellplug`) |
## MCP Counterpart
This skill mirrors the [bos-crobicle-mcp](https://github.com/anomalyco/opencode) MCP server, which provides the same 14 operations plus a native `create_one_shot_event` tool via structured function calling. The skill approach uses direct CLI/Python invocation while the MCP approach uses typed tool signatures managed by the MCP protocol.
## References
- [Cronicle Official Docs](https://github.com/jhuckaby/Cronicle)
- [Cronicle API Reference](https://github.com/jhuckaby/Cronicle/blob/master/docs/APIReference.md)
- [SKILL.md](SKILL.md) — Core skill instructions
- [Event Fields Reference](references/event-fields.md) — Complete event data format
- [Plugin Params Reference](references/plugin-params.md) — urlplug & shellplug parameters
- [One-Shot Tasks](references/one-shot-tasks.md) — Non-recurring task workflow
---
name: managing-cronicle-scheduler
description: Manages Cronicle timed task scheduler via REST API. Use when the user needs to list/create/update/delete scheduled events, run jobs on demand, monitor job status, view history, abort running jobs, or enable/disable the scheduler. Triggers include mentions of Cronicle, scheduled tasks, cron jobs, timed events, or scheduler management.
---
# Cronicle Scheduler Management
Manages the Cronicle timed task scheduling system via its REST API. Provides 14 operations for full lifecycle management of scheduled events and jobs.
## Setup
1. Copy `.env.example` to `.env` in the skill directory (`managing-cronicle-scheduler/`):
```
CRONICLE_BASE_URL=https://your-cronicle-server:3012
CRONICLE_API_KEY=your-32-character-hex-api-key
CRONICLE_DEFAULT_CATEGORY=your-default-category-id
CRONICLE_DEFAULT_TARGET=your-default-target
CRONICLE_DEFAULT_PLUGIN=your-default-plugin-id
```
2. Ensure `httpx` and `python-dotenv` are available:
```bash
pip install httpx python-dotenv
```
## Using the Client Script
All operations use `scripts/cronicle_client.py`. The script auto-loads `.env` from the skill directory.
**CLI usage** — quick single calls:
```bash
python scripts/cronicle_client.py <action> [--key value ...]
```
**Python import** — for multi-step workflows:
```python
from cronicle_client import CronicleAPI
c = CronicleAPI()
print(c.get_schedule(limit=10))
```
## Operations
### Event Management
| Operation | CLI Example | Description |
|-----------|------------|-------------|
| **get_schedule** | `python scripts/cronicle_client.py get_schedule` | List all events (paged). Supports `--offset` and `--limit`. |
| **get_event** | `python scripts/cronicle_client.py get_event --id evt123` | Get event by `--id` or `--title`. |
| **create_event** | `python scripts/cronicle_client.py create_event --title "My Job" -f params='{"url":"https://..."}'` | Create event. Add `--enabled 0/1`, timing, plugin params (see [plugin-params](references/plugin-params.md)). |
| **update_event** | `python scripts/cronicle_client.py update_event --id evt123 --enabled 0` | Update event fields by `--id`. Pass any event data as JSON via `--field key=value`. |
| **delete_event** | `python scripts/cronicle_client.py delete_event --id evt123` | Delete event by `--id`. Fails if jobs are running. |
### Job Execution
| Operation | CLI Example | Description |
|-----------|------------|-------------|
| **run_event** | `python scripts/cronicle_client.py run_event --title "My Job"` | Trigger immediate execution. Returns launched job IDs. |
| **get_job_status** | `python scripts/cronicle_client.py get_job_status --id job123` | Get job status, progress, CPU/mem usage. |
| **get_active_jobs** | `python scripts/cronicle_client.py get_active_jobs` | List all currently running jobs. |
| **update_job** | `python scripts/cronicle_client.py update_job --id job123 --timeout 600` | Modify running job: timeout, retries, resource limits, notifications. |
| **abort_job** | `python scripts/cronicle_client.py abort_job --id job123` | Force-kill a running job. |
### History
| Operation | CLI Example | Description |
|-----------|------------|-------------|
| **get_event_history** | `python scripts/cronicle_client.py get_event_history --id evt123` | Completed jobs for one event. Supports `--offset` and `--limit`. |
| **get_history** | `python scripts/cronicle_client.py get_history` | Global completed jobs. Supports `--offset` and `--limit`. |
### Scheduler Control
| Operation | CLI Example | Description |
|-----------|------------|-------------|
| **get_master_state** | `python scripts/cronicle_client.py get_master_state` | Check if scheduler is enabled (`enabled: 1` or `0`). |
| **update_master_state** | `python scripts/cronicle_client.py update_master_state --enabled 1` | Enable (`--enabled 1`) or disable (`--enabled 0`) the scheduler. |
## One-Shot Tasks
Cronicle has no native one-shot task support. For non-recurring tasks like "notify me at 3pm to buy groceries", use **Chain Reaction self-cleanup**:
1. Create the task event with exact `timing`
2. Create a cleanup event (`urlplug` calling `update_event` against the task)
3. Link them: `update_event(task_id, chain=cleanup_id)`
At the scheduled time: task runs → completes → chain triggers cleanup → cleanup disables task via API.
Full workflow with time expression mapping: [one-shot-tasks.md](references/one-shot-tasks.md)
## Event Data Fields
When creating/updating events, pass additional fields via `-f key=value` (CLI) or the `extra` dict (Python). Key fields:
- `plugin` — **must use plugin ID**: `urlplug` (HTTP Request) or `shellplug` (Shell Script)
- `params` — plugin-specific parameters (see [plugin-params.md](references/plugin-params.md)):
- **urlplug**: `method`, `url`, `headers`, `data`, `timeout`, `follow`, `ssl_cert_bypass`, `success_match`, `error_match`
- **shellplug**: `script`, `json`, `annotate`
- `timing` — cron-like schedule: `{"minutes":[5]}`, `{"hours":[4,16],"minutes":[30]}`
- `timeout` — max runtime in seconds
- `notify_success` / `notify_fail` — email notification recipients
- `web_hook` — URL to ping on job completion
Full event field reference: [event-fields.md](references/event-fields.md)
Full plugin param reference: [plugin-params.md](references/plugin-params.md)
## Workflow Example
```
1. Check active jobs: python scripts/cronicle_client.py get_active_jobs
2. List scheduled events: python scripts/cronicle_client.py get_schedule
3. Create a new event: python scripts/cronicle_client.py create_event --title "Daily Backup" \\
-f timing='{"hours":[2],"minutes":[0]}' \\
-f params='{"method":"POST","url":"https://backup.example.com/run","headers":"Content-Type: application/json\\nAuthorization: Bearer token\\n","data":"{\\"env\\":\\"prod\\"}","timeout":120}'
4. Run it immediately: python scripts/cronicle_client.py run_event --title "Daily Backup"
5. Check job status: python scripts/cronicle_client.py get_job_status --id <job_id_from_step_4>
```
## Environment Variables
Loaded from the skill directory's `.env` file (or system environment):
| Variable | Required | Description |
|----------|----------|-------------|
| `CRONICLE_BASE_URL` | Yes | Cronicle server URL (e.g., `https://your-server:3012`) |
| `CRONICLE_API_KEY` | Yes | 32-character hex API key |
| `CRONICLE_DEFAULT_CATEGORY` | Only for create | Default category ID |
| `CRONICLE_DEFAULT_TARGET` | Only for create | Default target server/group |
| `CRONICLE_DEFAULT_PLUGIN` | Only for create | Default plugin ID |
# Cronicle Event Data Format
Complete reference of all properties in the Cronicle event object, from the [official API docs](https://github.com/jhuckaby/Cronicle/blob/master/docs/APIReference.md#event-data-format). Pass these as `-f key=value` in CLI or include in the `extra` dict when using the Python API.
## Identity & Status
| Field | Type | Description |
|-------|------|-------------|
| `id` | string | Unique ID assigned when the event was first created |
| `title` | string | Display name shown on Schedule Tab, reports, and emails (required for create) |
| `enabled` | 0/1 | Whether the event is active in the scheduler |
| `category` | string | Category ID the event is assigned to |
| `plugin` | string | Plugin ID that runs the job: `urlplug` (HTTP Request), `shellplug` (Shell Script) |
| `target` | string | Server Group ID or individual server hostname |
| `algo` | string | Algorithm for picking a server from the target group: `round_robin`, `random`, `least_cpu`, `least_mem` |
| `created` | number | Date/time of initial creation, in Epoch seconds |
| `modified` | number | Date/time of last modification, in Epoch seconds |
| `username` | string | Username of user who created the event (if created in UI) |
| `api_key` | string | API Key of the application that created the event (if created via API) |
## Scheduling
| Field | Type | Description |
|-------|------|-------------|
| `timing` | object | Cron-like schedule. Omit for on-demand, empty `{}` for every minute. See [Timing Object](#timing-object-format) below. |
| `timezone` | string | IANA timezone string for interpreting timing (e.g., `"America/New_York"`, `"Asia/Shanghai"`) |
| `catch_up` | 0/1 | Run All Mode — run missed jobs when the scheduler restarts |
## Job Execution Control
| Field | Type | Description |
|-------|------|-------------|
| `timeout` | number | Max runtime in seconds before the job is aborted (default: 3600) |
| `max_children` | number | Max concurrent jobs allowed for this event (default: 1) |
| `multiplex` | 0/1 | Run a job on every server in the target group (default: 0) |
| `stagger` | number | Seconds to wait between multiplexed job launches |
| `retries` | number | Number of retries on failure before reporting error (default: 0) |
| `retry_delay` | number | Seconds between retries |
| `detached` | 0/1 | Detached mode — run job without waiting for result |
## Queuing
| Field | Type | Description |
|-------|------|-------------|
| `queue` | 0/1 | Allow jobs to queue when they can't run immediately |
| `queue_max` | number | Maximum queue length, when `queue` is enabled |
## Resource Limits
| Field | Type | Description |
|-------|------|-------------|
| `cpu_limit` | number | Max CPU % allowed (100 = 1 core), abort if exceeded. 0 = disabled |
| `cpu_sustain` | number | Seconds to allow cpu_limit to be exceeded before abort |
| `memory_limit` | number | Max memory in bytes, abort if exceeded. 0 = disabled |
| `memory_sustain` | number | Seconds to allow memory_limit to be exceeded before abort |
| `log_max_size` | number | Max job log file size in bytes, abort if exceeded. 0 = disabled |
## Notification & Chain
| Field | Type | Description |
|-------|------|-------------|
| `notify_success` | string | CSV email recipients notified on job success |
| `notify_fail` | string | CSV email recipients notified on job failure |
| `web_hook` | string | URL to ping on job start and completion |
| `chain` | string | Event ID to launch when job completes successfully |
| `chain_error` | string | Event ID to launch when job fails |
| `notes` | string | Free-text notes, included in email notifications |
## Plugin Parameters
| Field | Type | Description |
|-------|------|-------------|
| `params` | object | Plugin-specific parameters. See [plugin-params.md](plugin-params.md) for full details per plugin. |
## Timing Object Format
Cron-like schedule where each property is an array of numbers. Omitted properties = all values (like `*` in cron). Empty object `{}` means every minute.
| Property | Range | Example | Description |
|----------|-------|---------|-------------|
| `years` | YYYY | `[2026]` | One or more years |
| `months` | 1–12 | `[1, 4, 7, 10]` | January is 1, December is 12 |
| `days` | 1–31 | `[1, 15]` | Month days |
| `weekdays` | 0–6 | `[1,2,3,4,5]` | Sunday=0, Saturday=6 |
| `hours` | 0–23 | `[9, 18]` | 24-hour time |
| `minutes` | 0–59 | `[0, 30]` | Minute of the hour |
**Examples:**
- Hourly on the hour: `{"minutes":[0]}`
- Twice daily at 4:30 AM/PM: `{"hours":[4,16],"minutes":[30]}`
- Weekdays at 9:00 AM: `{"weekdays":[1,2,3,4,5],"hours":[9],"minutes":[0]}`
- 1st and 15th at 3 AM: `{"days":[1,15],"hours":[3],"minutes":[0]}`
- Summer quarter (Apr–Jun), weekdays, hourly: `{"months":[4,5,6],"weekdays":[1,2,3,4,5],"minutes":[0]}`
- On-demand only: omit `timing` entirely
- Every minute: `{}`
# One-Shot Tasks
Cronicle has no native one-shot task support. Use **Chain Reaction** to simulate it: the task event links a cleanup event that disables the task after one execution.
## Chain Reaction Self-Cleanup Workflow
```
Step 1: Create task event → get task_id
Step 2: Create cleanup event (urlplug that calls update_event API on task_id)
Step 3: Link cleanup via chain: update_event(task_id, chain=cleanup_id)
At scheduled time:
⏰ Task runs → completes → 🔗 triggers cleanup event → cleanup POSTs update_event(enabled=0) → task disabled
```
## Step-by-Step (CLI)
### Step 1: Create the task event
```bash
# Create the event that does the actual work
RESULT=$(python scripts/cronicle_client.py create_event --title "买菜通知" \
-f timing='{"hours":[15],"minutes":[0]}' \
-f params='{"method":"POST","url":"https://ntfy.example.com/mychannel","data":"买菜了!赶紧去!"}' \
--raw)
# Extract the event ID
TASK_ID=$(echo "$RESULT" | python -c "import sys,json; print(json.load(sys.stdin)['id'])")
echo "Task event: $TASK_ID"
```
### Step 2: Create the cleanup event
The cleanup event uses `urlplug` to call Cronicle's own `update_event` API, disabling the task event.
```bash
# Build the cleanup payload — hardcodes the task ID into the HTTP call
BASE=$(grep CRONICLE_BASE_URL .env | cut -d= -f2-)
KEY=$(grep CRONICLE_API_KEY .env | cut -d= -f2-)
# Since headers use \n as separator, the JSON data must also disable the task
# Important: cleanup event should target the SAME server group as the task
RESULT=$(python scripts/cronicle_client.py create_event --title "[cleanup] 买菜通知" \
-f params="{\"method\":\"POST\",\"url\":\"${BASE}/api/app/update_event/v1\",\"headers\":\"X-API-Key: ${KEY}\\nContent-Type: application/json\\n\",\"data\":\"{\\\"id\\\":\\\"${TASK_ID}\\\",\\\"enabled\\\":0}\"}" \
--raw)
CLEANUP_ID=$(echo "$RESULT" | python -c "import sys,json; print(json.load(sys.stdin)['id'])")
echo "Cleanup event: $CLEANUP_ID"
```
### Step 3: Link the cleanup to the task
```bash
python scripts/cronicle_client.py update_event --id "$TASK_ID" -f chain="$CLEANUP_ID"
```
### Verify
```bash
# Check the task event — it should now have chain set to the cleanup ID
python scripts/cronicle_client.py get_event --id "$TASK_ID"
```
### After execution
The cleanup event will auto-disable the task event (sets `enabled=0`). The task and cleanup events remain in the schedule (disabled), preserving history. To fully remove, run:
```bash
python scripts/cronicle_client.py delete_event --id "$CLEANUP_ID"
python scripts/cronicle_client.py delete_event --id "$TASK_ID"
```
## Time Expression Mapping
Convert natural-language time descriptions to Cronicle `timing` objects.
| Expression | Timing Object | Notes |
|------------|---------------|-------|
| 下午3点 | `{"hours":[15],"minutes":[0]}` | 24-hour format |
| 明天上午9点半 | `{"hours":[9],"minutes":[30]}` | 日期需在当天计算(一个月内靠 days 约束) |
| 5月20日晚上8点 | `{"months":[5],"days":[20],"hours":[20],"minutes":[0]}` | 每年5月20日触发 |
| 5分钟后 | 不适用 timing | 使用立即执行方案(见下) |
| 今天下午6点 | `{"hours":[18],"minutes":[0]}` | 当天触发 |
| 2026年6月1日上午10点 | `{"years":[2026],"months":[6],"days":[1],"hours":[10],"minutes":[0]}` | 绝对日期 |
### Timing Calculation Rules
- **今天某时**:直接用小时+分钟,不加日期约束
- **明天某时**:需在创建事件时计算明天的 day-of-month 值,加上 `days` 约束
- **指定日期**:用 `months` + `days`(和 `years` 如果跨年)
- **几分钟后**:不适合 timing,直接用 `run_event` 立即执行
## Shell Script Version
For shell users who prefer piping:
```bash
# All-in-one: create task, create cleanup, link them
TASK_ID=$(python scripts/cronicle_client.py create_event --title "$TITLE" \
-f timing="$TIMING" -f params="$PARAMS" --raw | \
python -c "import sys,json; print(json.load(sys.stdin)['id'])")
CLEANUP_ID=$(python scripts/cronicle_client.py create_event --title "[cleanup] $TITLE" \
-f params="{\"method\":\"POST\",\"url\":\"${BASE}/api/app/update_event/v1\",\"headers\":\"X-API-Key: ${KEY}\\nContent-Type: application/json\\n\",\"data\":\"{\\\"id\\\":\\\"${TASK_ID}\\\",\\\"enabled\\\":0}\"}" \
--raw | python -c "import sys,json; print(json.load(sys.stdin)['id'])")
python scripts/cronicle_client.py update_event --id "$TASK_ID" -f chain="$CLEANUP_ID"
echo "One-shot created: task=$TASK_ID cleanup=$CLEANUP_ID"
```
## Important Notes
- **API Key permissions**: cleanup event uses the same API key — must have `edit_events` privilege
- **No timing on cleanup**: the cleanup event has no `timing` (on-demand), only runs via chain reaction
- **Cleanup stays**: cleanup event runs once (triggered by chain), then remains as a disabled on-demand event. Delete it manually if needed
- **Concurrent tasks**: each one-shot task needs its own cleanup event, since the cleanup data hardcodes the task ID
# Plugin Parameter Reference
Detailed parameter definitions for Cronicle plugins. Pass these inside the `params` object when creating or updating events.
---
## HTTP Request (`urlplug`)
Plugin ID: `urlplug`. Makes HTTP requests to a specified URL.
### All Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `method` | string | `"GET"` | HTTP method: `GET`, `HEAD`, or `POST` |
| `url` | string | (required) | Target URL. Must match `/^https?:\/\/\S+$/i`. Supports `[placeholders]` substitution (e.g., `[event_title]`, `[id]`). |
| `headers` | string | — | Request headers in format `"Header-Name: value\n..."`. One header per line, separated by `\n`. Supports `[placeholders]`. |
| `data` | string | — | POST request body (only used when `method` is `POST`). Supports `[placeholders]`. |
| `timeout` | number | 0 | Total request timeout in seconds. 0 = no timeout. |
| `connect_timeout` | number | 10 | Connection timeout in seconds |
| `idle_timeout` | number | same as `timeout` | Idle timeout in seconds (defaults to `timeout` if not set) |
| `follow` | any | — | If set, follow HTTP redirects (up to 32 hops) |
| `ssl_cert_bypass` | any | — | If set, skip SSL certificate verification |
| `success_match` | string | `".*"` | Regex — response body must match this for success |
| `error_match` | string | `"(?!)"` | Regex — if response matches this, job is marked as error |
### Examples
**Simple GET request:**
```json
{
"params": {
"method": "GET",
"url": "https://api.example.com/health"
}
}
```
**POST with JSON body and headers:**
```json
{
"params": {
"method": "POST",
"url": "https://api.example.com/trigger",
"headers": "Content-Type: application/json\nAuthorization: Bearer my-token\n",
"data": "{\"action\":\"deploy\",\"env\":\"production\"}",
"timeout": 60,
"follow": 1,
"success_match": "\"status\":\"ok\""
}
}
```
**GET with SSL bypass and error matching:**
```json
{
"params": {
"method": "GET",
"url": "https://internal-api.local/status",
"ssl_cert_bypass": 1,
"error_match": "error|failure|500"
}
}
```
**Using placeholders in URL and data:**
```json
{
"params": {
"method": "POST",
"url": "https://hooks.example.com/webhook",
"data": "event=[event_title]&job_id=[id]&time=[now]"
}
}
```
Available placeholders include: `[id]`, `[event]`, `[event_title]`, `[category]`, `[category_title]`, `[plugin]`, `[plugin_title]`, `[target]`, `[now]`, `[hostname]`, `[chain_description]`, `[escape_chain_description]`.
---
## Shell Script (`shellplug`)
Plugin ID: `shellplug`. Executes a shell script on the target server.
### Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `script` | string | `"#!/bin/sh\n\n# Enter your shell script code here\n"` | Shell script content. Must start with a shebang line (e.g., `#!/bin/sh`, `#!/bin/bash`). |
| `json` | any | — | If set, child process JSON output is passed to Cronicle as real-time job updates (progress, HTML, etc.). |
| `annotate` | any | — | If set, non-JSON output lines get timestamp annotations like `[YYYY-MM-DD HH:MI:SS]`. |
**Progress reporting:** Output a line matching `\d+%` (e.g., `50%`) to report progress. The number is mapped to 0.0–1.0.
**Child JSON updates:** When `json` is enabled, the child process can emit JSON to stdout to update the job. Supported keys include:
- `progress` — number 0.0–1.0
- `html``{title: "...", content: "<pre>...</pre>"}` for custom job report
- `complete`, `code`, `description` — to signal job completion
**Placeholders:** The full job object is sent to the child process on stdin, accessible via standard shell input. Events set with `run_event` pass custom params via `event_data`.
Additional generic parameters (timeout, resource limits, etc.) are set at the event level, not inside `params`.
### Examples
**Simple shell command:**
```json
{
"params": {
"script": "#!/bin/sh\n\necho \"Hello from Cronicle at $(date)\"\n"
}
}
```
**Database maintenance script:**
```json
{
"params": {
"script": "#!/bin/sh\n\n/usr/local/bin/db-reindex.pl\n"
}
}
```
**Multi-step backup with error handling:**
```json
{
"params": {
"script": "#!/bin/bash\n\nset -e\n\nBACKUP_DIR=\"/backups/$(date +%Y%m%d)\"\nmkdir -p \"$BACKUP_DIR\"\n/usr/local/bin/backup-tool --output \"$BACKUP_DIR\"\ngzip \"$BACKUP_DIR\"/*.sql\necho \"Backup complete: $BACKUP_DIR\"\n"
}
}
```
#!/usr/bin/env python3
"""Cronicle REST API client — CLI and importable module.
CLI usage:
python cronicle_client.py <action> [--key value ...] [-f key=value]...
python cronicle_client.py get_schedule
python cronicle_client.py get_event --id evt123
python cronicle_client.py create_event --title "My Job" -f timing='{"minutes":[5]}'
python cronicle_client.py update_event --id evt123 --enabled 0
python cronicle_client.py delete_event --id evt123
python cronicle_client.py run_event --title "My Job"
python cronicle_client.py get_job_status --id job123
python cronicle_client.py get_active_jobs
python cronicle_client.py update_job --id job123 --timeout 600
python cronicle_client.py abort_job --id job123
python cronicle_client.py get_event_history --id evt123
python cronicle_client.py get_history
python cronicle_client.py get_master_state
python cronicle_client.py update_master_state --enabled 1
Python import:
from cronicle_client import CronicleAPI
c = CronicleAPI()
result = c.get_schedule(limit=10)
print(result)
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
from typing import Any
_MISSING_DEPS_MSG = "Missing dependencies. Install with: pip install httpx python-dotenv"
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def _env_path() -> Path:
"""Path to .env file in the same directory as this script."""
return Path(__file__).resolve().parent.parent / ".env"
class CronicleAPIError(Exception):
pass
class CronicleAPI:
"""Synchronous HTTP client for the Cronicle REST API.
Loads configuration from .env on init.
"""
def __init__(self) -> None:
# Lazy-load dependencies and config
try:
import httpx # noqa: F811
from dotenv import load_dotenv as _load_dotenv
except ImportError:
print(_MISSING_DEPS_MSG, file=sys.stderr)
sys.exit(1)
env_file = _env_path()
if env_file.exists():
_load_dotenv(env_file)
else:
_load_dotenv()
self._httpx = httpx
self._base_url = os.environ.get("CRONICLE_BASE_URL", "").rstrip("/")
self._api_key = os.environ.get("CRONICLE_API_KEY", "")
if not self._base_url or not self._api_key:
raise CronicleAPIError(
"CRONICLE_BASE_URL and CRONICLE_API_KEY must be set in .env or environment"
)
def _post(self, path: str, data: dict[str, Any] | None = None) -> dict[str, Any]:
url = f"{self._base_url}/api/app/{path}/v1"
with self._httpx.Client(
headers={"X-API-Key": self._api_key},
follow_redirects=True,
timeout=30.0,
) as client:
resp = client.post(url, json=data or {})
resp.raise_for_status()
result: dict[str, Any] = resp.json()
if result.get("code") != 0:
raise CronicleAPIError(
f"{result.get('description', 'Unknown error')} (code: {result.get('code')})"
)
return result
# ------------------------------------------------------------------
# Event Management
# ------------------------------------------------------------------
def get_schedule(self, offset: int = 0, limit: int = 50) -> dict[str, Any]:
return self._post("get_schedule", {"offset": offset, "limit": limit})
def get_event(self, id: str | None = None, title: str | None = None) -> dict[str, Any]:
data: dict[str, Any] = {}
if id:
data["id"] = id
if title:
data["title"] = title
return self._post("get_event", data)
def create_event(
self,
title: str,
enabled: int = 1,
category: str = "",
plugin: str = "",
target: str = "",
extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
data: dict[str, Any] = {
"title": title,
"enabled": enabled,
"category": category or os.environ.get("CRONICLE_DEFAULT_CATEGORY", "cmp6jqxkx01"),
"plugin": plugin or os.environ.get("CRONICLE_DEFAULT_PLUGIN", "urlplug"),
"target": target or os.environ.get("CRONICLE_DEFAULT_TARGET", "allgrp"),
}
if extra:
data.update(extra)
return self._post("create_event", data)
def update_event(self, id: str, extra: dict[str, Any] | None = None) -> dict[str, Any]:
data: dict[str, Any] = {"id": id}
if extra:
data.update(extra)
return self._post("update_event", data)
def delete_event(self, id: str) -> dict[str, Any]:
return self._post("delete_event", {"id": id})
# ------------------------------------------------------------------
# Job Execution
# ------------------------------------------------------------------
def run_event(
self,
id: str | None = None,
title: str | None = None,
extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
data: dict[str, Any] = {}
if id:
data["id"] = id
if title:
data["title"] = title
if extra:
data.update(extra)
return self._post("run_event", data)
def get_job_status(self, id: str) -> dict[str, Any]:
return self._post("get_job_status", {"id": id})
def get_active_jobs(self) -> dict[str, Any]:
return self._post("get_active_jobs")
def update_job(self, id: str, extra: dict[str, Any]) -> dict[str, Any]:
return self._post("update_job", {"id": id, **extra})
def abort_job(self, id: str) -> dict[str, Any]:
return self._post("abort_job", {"id": id})
# ------------------------------------------------------------------
# History
# ------------------------------------------------------------------
def get_event_history(
self, id: str, offset: int = 0, limit: int = 50
) -> dict[str, Any]:
return self._post("get_event_history", {"id": id, "offset": offset, "limit": limit})
def get_history(self, offset: int = 0, limit: int = 50) -> dict[str, Any]:
return self._post("get_history", {"offset": offset, "limit": limit})
# ------------------------------------------------------------------
# Scheduler Control
# ------------------------------------------------------------------
def get_master_state(self) -> dict[str, Any]:
return self._post("get_master_state")
def update_master_state(self, enabled: int) -> dict[str, Any]:
return self._post("update_master_state", {"enabled": enabled})
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _parse_json(val: str) -> Any:
"""Parse a JSON string value, falling back to raw string on failure."""
try:
return json.loads(val)
except (json.JSONDecodeError, ValueError):
return val
def _parse_fields(raw: list[str]) -> dict[str, Any]:
"""Parse -f key=value pairs where value may be JSON."""
result: dict[str, Any] = {}
for item in raw:
if "=" not in item:
print(f"Warning: skipping invalid field '{item}' (expected key=value)", file=sys.stderr)
continue
key, _, val = item.partition("=")
result[key] = _parse_json(val)
return result
def cli_main() -> None:
parser = argparse.ArgumentParser(description="Cronicle REST API client")
parser.add_argument(
"action",
choices=[
"get_schedule", "get_event", "create_event", "update_event", "delete_event",
"run_event", "get_job_status", "get_active_jobs", "update_job", "abort_job",
"get_event_history", "get_history", "get_master_state", "update_master_state",
],
help="API action to perform",
)
parser.add_argument("--id", help="Event or Job ID")
parser.add_argument("--title", help="Event title (case-sensitive)")
parser.add_argument("--offset", type=int, default=0, help="Pagination offset")
parser.add_argument("--limit", type=int, default=50, help="Pagination limit")
parser.add_argument("--enabled", type=int, help="Enabled state: 1 or 0")
parser.add_argument("--category", default="", help="Category ID")
parser.add_argument("--plugin", default="", help="Plugin ID")
parser.add_argument("--target", default="", help="Target server or group ID")
parser.add_argument("--timeout", type=int, help="Job timeout in seconds")
parser.add_argument(
"-f", "--field", action="append", dest="fields", default=[],
metavar="key=value",
help="Extra field as key=value (value parsed as JSON if possible). Repeatable.",
)
parser.add_argument("--raw", action="store_true", help="Output raw JSON string")
args = parser.parse_args()
extra = _parse_fields(args.fields)
# Merge common CLI args into extra (only non-empty)
for param in ("enabled", "timeout", "category", "plugin", "target"):
val = getattr(args, param, None)
if val not in (None, ""):
extra[param] = val
try:
api = CronicleAPI()
except CronicleAPIError as e:
print(f"Config error: {e}", file=sys.stderr)
sys.exit(1)
try:
action = args.action
if action == "get_schedule":
result = api.get_schedule(offset=args.offset, limit=args.limit)
elif action == "get_event":
result = api.get_event(id=args.id, title=args.title)
elif action == "create_event":
if not args.title:
print("Error: --title is required for create_event", file=sys.stderr)
sys.exit(1)
result = api.create_event(
title=args.title,
category=args.category,
plugin=args.plugin,
target=args.target,
extra=extra or None,
)
elif action == "update_event":
if not args.id:
print("Error: --id is required for update_event", file=sys.stderr)
sys.exit(1)
result = api.update_event(id=args.id, extra=extra or None)
elif action == "delete_event":
if not args.id:
print("Error: --id is required for delete_event", file=sys.stderr)
sys.exit(1)
result = api.delete_event(id=args.id)
elif action == "run_event":
if not args.id and not args.title:
print("Error: --id or --title is required for run_event", file=sys.stderr)
sys.exit(1)
result = api.run_event(id=args.id, title=args.title, extra=extra or None)
elif action == "get_job_status":
if not args.id:
print("Error: --id is required for get_job_status", file=sys.stderr)
sys.exit(1)
result = api.get_job_status(id=args.id)
elif action == "get_active_jobs":
result = api.get_active_jobs()
elif action == "update_job":
if not args.id:
print("Error: --id is required for update_job", file=sys.stderr)
sys.exit(1)
result = api.update_job(id=args.id, extra=extra)
elif action == "abort_job":
if not args.id:
print("Error: --id is required for abort_job", file=sys.stderr)
sys.exit(1)
result = api.abort_job(id=args.id)
elif action == "get_event_history":
if not args.id:
print("Error: --id is required for get_event_history", file=sys.stderr)
sys.exit(1)
result = api.get_event_history(id=args.id, offset=args.offset, limit=args.limit)
elif action == "get_history":
result = api.get_history(offset=args.offset, limit=args.limit)
elif action == "get_master_state":
result = api.get_master_state()
elif action == "update_master_state":
if args.enabled is None:
print("Error: --enabled (0 or 1) is required for update_master_state", file=sys.stderr)
sys.exit(1)
result = api.update_master_state(enabled=args.enabled)
else:
print(f"Unknown action: {action}", file=sys.stderr)
sys.exit(1)
if args.raw:
print(json.dumps(result))
else:
print(json.dumps(result, indent=2))
except CronicleAPIError as e:
print(f"API error: {e}", file=sys.stderr)
sys.exit(1)
except Exception as e:
if "HTTP" in type(e).__name__:
print(f"HTTP error: {e}", file=sys.stderr)
else:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
cli_main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment