Launch a job

POST /job-launcher/launcher
import requests

# Replace {FUSION_HOST} with your Fusion host before running. Authentication
# (for example, HTTP basic auth or an API key) is typically required but is
# omitted from this example.
url = "https://{FUSION_HOST}/job-launcher/launcher"

payload = {
    "type": "aggregation",
    "id": "api-test-app_click_signals_aggregation",
    "inputCollection": "api-test-app_signals",
    "sourceCatchup": True,
    "sourceRemove": False,
    "sql": "WITH sigs_with_filters AS ( SELECT c.query as query, c.doc_id, q.filters_s as filters, c.type, c.ref_time, coalesce(c.count_i,1) as count_i, c.timestamp_tdt, greatest(coalesce(c.weight_d,0.1),0.0) as weight_d FROM api-test-app_signals c LEFT JOIN (SELECT id, filters_s FROM api-test-app_signals WHERE type='response') q ON q.id = c.fusion_query_id WHERE c.type IN ('click','cart','purchase') AND c.timestamp_tdt >= c.catchup_timestamp_tdt ), signal_type_groups AS ( SELECT SUM(count_i) AS typed_aggr_count_i, query, doc_id, type, filters, time_decay(count_i, timestamp_tdt, \"30 days\", ref_time, weight_d) AS typed_weight_d FROM sigs_with_filters GROUP BY doc_id, query, filters, type ) SELECT concat_ws('|', query, doc_id, filters) as id, SUM(typed_aggr_count_i) AS aggr_count_i, query AS query_s, query AS query_t, doc_id AS doc_id_s, filters AS filters_s, SPLIT(filters, ' \\$ ') AS filters_ss, weighted_sum(typed_weight_d, type, 'click:1.0,cart:10.0,purchase:25.0') AS weight_d FROM signal_type_groups GROUP BY query, doc_id, filters",
    "rollupSql": "SELECT concat_ws('|', query_s, doc_id_s, filters_s) as id,
  query_s,
  query_s as query_t,
  doc_id_s,
  filters_s,
  first(aggr_type_s) AS aggr_type_s,
  SPLIT(filters_s, ' \\$ ') AS filters_ss,
  SUM(weight_d) AS weight_d,
  SUM(aggr_count_i) AS aggr_count_i
  FROM api-test-app_signals_aggr
  GROUP BY query_s, doc_id_s, filters_s",
    "referenceTime": "2025-10-17T18:56:14.660Z",
    "skipCheckEnabled": True,
    "readOptions": [
        {
            "key": "splits_per_shard",
            "value": "4"
        }
    ],
    "skipJobIfSignalsEmpty": True,
    "parameters": [
        {
            "key": "signalTypeWeights",
            "value": "click:1.0,cart:10.0,purchase:25.0"
        },
        {
            "key": "signalTypes",
            # A regex rewrite of signalTypeWeights that yields the quoted type
            # names, e.g. 'click','cart','purchase'. A raw string keeps the
            # backslashes intact.
            "value": r"_regex/signalTypeWeights/([\w\-\.]*):([\d\.\-]*)(,|$)/'$1'$3/g"
        }
    ],
    "selectQuery": "*:*",
    "outputCollection": "api-test-app_signals_aggr",
    "useNaturalKey": True,
    "optimizeSegments": 0,
    "dataFormat": "solr",
    "sparkSQL": "SELECT * from spark_input",
    "sparkPartitions": 200
}
headers = {"Content-Type": "application/json"}

response = requests.post(url, json=payload, headers=headers)

print(response.json())
Example response:

{
  "state": "finished",
  "jobId": "hhzitrwancwv",
  "jobConfig": {
    "type": "aggregation",
    "id": "api-test-app_click_signals_aggregation",
    "inputCollection": "api-test-app_signals",
    "sourceCatchup": true,
    "sourceRemove": false,
    "sql": "WITH sigs_with_filters AS ( SELECT c.query as query, c.doc_id, q.filters_s as filters, c.type, c.ref_time, coalesce(c.count_i,1) as count_i, c.timestamp_tdt, greatest(coalesce(c.weight_d,0.1),0.0) as weight_d FROM api-test-app_signals c LEFT JOIN (SELECT id, filters_s FROM api-test-app_signals WHERE type='response') q ON q.id = c.fusion_query_id WHERE c.type IN ('click','cart','purchase') AND c.timestamp_tdt >= c.catchup_timestamp_tdt ), signal_type_groups AS ( SELECT SUM(count_i) AS typed_aggr_count_i, query, doc_id, type, filters, time_decay(count_i, timestamp_tdt, \"30 days\", ref_time, weight_d) AS typed_weight_d FROM sigs_with_filters GROUP BY doc_id, query, filters, type ) SELECT concat_ws('|', query, doc_id, filters) as id, SUM(typed_aggr_count_i) AS aggr_count_i, query AS query_s, query AS query_t, doc_id AS doc_id_s, filters AS filters_s, SPLIT(filters, ' \\\\$ ') AS filters_ss, weighted_sum(typed_weight_d, type, 'click:1.0,cart:10.0,purchase:25.0') AS weight_d FROM signal_type_groups GROUP BY query, doc_id, filters",
    "rollupSql": "SELECT concat_ws('|', query_s, doc_id_s, filters_s) as id,\n  query_s,\n  query_s as query_t,\n  doc_id_s,\n  filters_s,\n  first(aggr_type_s) AS aggr_type_s,\n  SPLIT(filters_s, ' \\\\$ ') AS filters_ss,\n  SUM(weight_d) AS weight_d,\n  SUM(aggr_count_i) AS aggr_count_i\n  FROM api-test-app_signals_aggr\n  GROUP BY query_s, doc_id_s, filters_s",
    "referenceTime": "2025-10-17T18:56:14.660Z",
    "skipCheckEnabled": true,
    "readOptions": [
      {
        "key": "splits_per_shard",
        "value": "4"
      }
    ],
    "skipJobIfSignalsEmpty": true,
    "parameters": [
      {
        "key": "signalTypeWeights",
        "value": "click:1.0,cart:10.0,purchase:25.0"
      },
      {
        "key": "signalTypes",
        "value": "_regex/signalTypeWeights/([\\w\\-\\.]*):([\\d\\.\\-]*)(,|$)/'$1'$3/g"
      }
    ],
    "selectQuery": "*:*",
    "outputCollection": "api-test-app_signals_aggr",
    "useNaturalKey": true,
    "optimizeSegments": 0,
    "dataFormat": "solr",
    "sparkSQL": "SELECT * from spark_input",
    "sparkPartitions": 200
  },
  "hostname": "driver-api-test-app-click-signal-sjnamxwxsyhq",
  "result": {
    "jobConfigId": "api-test-app_click_signals_aggregation",
    "jobRunId": "sjnamxwxsyhq",
    "aggrClass": "SQL",
    "query": "WITH sigs_with_filters AS (SELECT c.query as query, c.doc_id, q.filters_s as filters, c.type, c.ref_time, coalesce(c.count_i,1) as count_i, c.timestamp_tdt, greatest(coalesce(c.weight_d,0.1),0.0) as weight_d FROM api_test_app_signals c LEFT JOIN (SELECT id, filters_s FROM api_test_app_signals WHERE type='response') q ON q.id = c.fusion_query_id WHERE c.type IN ('click','cart','purchase') AND c.timestamp_tdt >= c.catchup_timestamp_tdt ), signal_type_groups AS (SELECT SUM(count_i) AS typed_aggr_count_i, query, doc_id, type, filters, time_decay(count_i, timestamp_tdt, \"30 days\", ref_time, weight_d) AS typed_weight_d FROM sigs_with_filters GROUP BY doc_id, query, filters, type) SELECT concat_ws('|', query, doc_id, filters) as id, SUM(typed_aggr_count_i) AS aggr_count_i, query AS query_s, query AS query_t, doc_id AS doc_id_s, filters AS filters_s, SPLIT(filters, ' \\\\$ ') AS filters_ss, weighted_sum(typed_weight_d, type, 'click:1.0,cart:10.0,purchase:25.0') AS weight_d FROM signal_type_groups GROUP BY query, doc_id, filters",
    "state": "finished",
    "aggregated": 0,
    "applicationId": "spark-0424dda9215441c0b232210af1d7cb67",
    "podId": "driver-api-test-app-click-signal-sjnamxwxsyhq",
    "aggr_type_s": "click@doc_id,filters,query"
  },
  "startTime": "2025-10-16T19:49:15.106Z",
  "endTime": "2025-10-16T19:49:42.276Z",
  "duration": 27170
}
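
The signalTypes parameter in the request above derives its value from signalTypeWeights through a regex substitution (the _regex/source/pattern/replacement/flags form). Its effect can be reproduced with Python's re module, using \1 and \3 where the substitution writes $1 and $3 (a sketch for illustration, not part of the API):

import re

weights = "click:1.0,cart:10.0,purchase:25.0"
# Same pattern as the signalTypes parameter above.
pattern = r"([\w\-\.]*):([\d\.\-]*)(,|$)"
print(re.sub(pattern, r"'\1'\3", weights))  # 'click','cart','purchase'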

Body

application/json
id
string

The name of the job configuration. This is distinct from the job run ID (found in job status responses), which identifies a specific job run.

Example:

"api-test-app_click_signals_aggregation"

sparkConfig
object[]

The configuration for this Spark job. The available keys depend on the job type. You can use the /spark/schema endpoint to fetch the configuration schemas for all Spark job types.
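For example, a minimal sketch of fetching those schemas, reusing the host placeholder from the launch example (the URL prefix and the absence of authentication are assumptions; adjust for your deployment):

import requests

schema_url = "https://{FUSION_HOST}/spark/schema"  # replace the placeholder
resp = requests.get(schema_url)
print(resp.json())  # the exact response shape depends on your Fusion version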

type
enum<string>

The Spark job type.

Available options:
aggregation,
als_recommender,
argo-classification,
argo-data-augmentation,
argo-delete-model,
argo-delete-ray-model,
argo-deploy-model,
argo-deploy-ray-model,
argo-item-recommender-content,
argo-item-recommender-user,
argo-milvus-create-collections,
argo-milvus-create-indexes,
argo-milvus-delete-collections,
argo-milvus-delete-indexes,
argo-qna-coldstart,
argo-qna-evaluate,
argo-qna-supervised,
argo-upload-model,
build-training,
cluster_labeling,
custom_python_job,
custom_spark_scala_job,
doc_clustering,
experiment_sql,
ground_truth,
headTailAnalysis,
logistic_regression_classifier_trainer,
outlier_detection,
parallel-bulk-loader,
query_similarity,
random_forests_classifier,
ranking_metrics,
script,
similar_queries,
sip,
sql_template,
synonymDetection,
tokenPhraseSpellCorrection,
transfer,
trending-recommender,
word2vec

Response

200 - application/hal+json

OK

state
enum<string>

The state of the job run.

Available options:
unknown,
idle,
starting,
running,
finishing,
cancelling,
finished,
cancelled,
error,
skipped
Example:

"finished"

jobId
string

The job run ID. This is distinct from id, which is the name of the job configuration.

Example:

"hhzitrwancwv"

jobConfig
object
Example:
{
  "type": "aggregation",
  "id": "api-test-app_click_signals_aggregation",
  "inputCollection": "api-test-app_signals",
  "sourceCatchup": true,
  "sourceRemove": false,
  "sql": "WITH sigs_with_filters AS ( SELECT c.query as query, c.doc_id, q.filters_s as filters, c.type, c.ref_time, coalesce(c.count_i,1) as count_i, c.timestamp_tdt, greatest(coalesce(c.weight_d,0.1),0.0) as weight_d FROM api-test-app_signals c LEFT JOIN (SELECT id, filters_s FROM api-test-app_signals WHERE type='response') q ON q.id = c.fusion_query_id WHERE c.type IN ('click','cart','purchase') AND c.timestamp_tdt >= c.catchup_timestamp_tdt ), signal_type_groups AS ( SELECT SUM(count_i) AS typed_aggr_count_i, query, doc_id, type, filters, time_decay(count_i, timestamp_tdt, \"30 days\", ref_time, weight_d) AS typed_weight_d FROM sigs_with_filters GROUP BY doc_id, query, filters, type ) SELECT concat_ws('|', query, doc_id, filters) as id, SUM(typed_aggr_count_i) AS aggr_count_i, query AS query_s, query AS query_t, doc_id AS doc_id_s, filters AS filters_s, SPLIT(filters, ' \\\\$ ') AS filters_ss, weighted_sum(typed_weight_d, type, 'click:1.0,cart:10.0,purchase:25.0') AS weight_d FROM signal_type_groups GROUP BY query, doc_id, filters",
  "rollupSql": "SELECT concat_ws('|', query_s, doc_id_s, filters_s) as id,\n query_s,\n query_s as query_t,\n doc_id_s,\n filters_s,\n first(aggr_type_s) AS aggr_type_s,\n SPLIT(filters_s, ' \\\\$ ') AS filters_ss,\n SUM(weight_d) AS weight_d,\n SUM(aggr_count_i) AS aggr_count_i\n FROM api-test-app_signals_aggr\n GROUP BY query_s, doc_id_s, filters_s",
  "referenceTime": "2025-10-17T18:56:14.660Z",
  "skipCheckEnabled": true,
  "readOptions": [{ "key": "splits_per_shard", "value": "4" }],
  "skipJobIfSignalsEmpty": true,
  "parameters": [
    {
      "key": "signalTypeWeights",
      "value": "click:1.0,cart:10.0,purchase:25.0"
    },
    {
      "key": "signalTypes",
      "value": "_regex/signalTypeWeights/([\\w\\-\\.]*):([\\d\\.\\-]*)(,|$)/'$1'$3/g"
    }
  ],
  "selectQuery": "*:*",
  "outputCollection": "api-test-app_signals_aggr",
  "useNaturalKey": true,
  "optimizeSegments": 0,
  "dataFormat": "solr",
  "sparkSQL": "SELECT * from spark_input",
  "sparkPartitions": 200
}
hostname
string

The hostname of the Spark driver pod that ran the job.

Example:

"driver-api-test-app-click-signal-sjnamxwxsyhq"

result
object
Example:
{
  "jobConfigId": "api-test-app_click_signals_aggregation",
  "jobRunId": "sjnamxwxsyhq",
  "aggrClass": "SQL",
  "query": "WITH sigs_with_filters AS (SELECT c.query as query, c.doc_id, q.filters_s as filters, c.type, c.ref_time, coalesce(c.count_i,1) as count_i, c.timestamp_tdt, greatest(coalesce(c.weight_d,0.1),0.0) as weight_d FROM api_test_app_signals c LEFT JOIN (SELECT id, filters_s FROM api_test_app_signals WHERE type='response') q ON q.id = c.fusion_query_id WHERE c.type IN ('click','cart','purchase') AND c.timestamp_tdt >= c.catchup_timestamp_tdt ), signal_type_groups AS (SELECT SUM(count_i) AS typed_aggr_count_i, query, doc_id, type, filters, time_decay(count_i, timestamp_tdt, \"30 days\", ref_time, weight_d) AS typed_weight_d FROM sigs_with_filters GROUP BY doc_id, query, filters, type) SELECT concat_ws('|', query, doc_id, filters) as id, SUM(typed_aggr_count_i) AS aggr_count_i, query AS query_s, query AS query_t, doc_id AS doc_id_s, filters AS filters_s, SPLIT(filters, ' \\\\$ ') AS filters_ss, weighted_sum(typed_weight_d, type, 'click:1.0,cart:10.0,purchase:25.0') AS weight_d FROM signal_type_groups GROUP BY query, doc_id, filters",
  "state": "finished",
  "aggregated": 0,
  "applicationId": "spark-0424dda9215441c0b232210af1d7cb67",
  "podId": "driver-api-test-app-click-signal-sjnamxwxsyhq",
  "aggr_type_s": "click@doc_id,filters,query"
}
startTime
string

The time the job run started, in ISO 8601 format (UTC).

Example:

"2025-10-16T19:49:15.106Z"

endTime
string

The time the job run ended, in ISO 8601 format (UTC).

Example:

"2025-10-16T19:49:42.276Z"

duration
integer

The job run duration in milliseconds.

Example:

27170
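
This is consistent with the startTime and endTime examples above: 19:49:42.276 minus 19:49:15.106 is 27.170 seconds, i.e. 27170 milliseconds. A quick check in Python:

from datetime import datetime

def parse_ts(ts: str) -> datetime:
    # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))

start = parse_ts("2025-10-16T19:49:15.106Z")
end = parse_ts("2025-10-16T19:49:42.276Z")
print(int((end - start).total_seconds() * 1000))  # 27170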