> ## Documentation Index
> Fetch the complete documentation index at: https://doc.lucidworks.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Custom Python Jobs

export const schema = {
  // JSON-schema-style description of the `custom_python_job` configuration.
  // Property order is significant: consumers that iterate `properties`
  // render the fields in the order they are declared here.
  type: "object",
  title: "Custom Python Job",
  description: "Use this job when you want to run a python/pyspark job",
  required: ["id", "type"],
  properties: {
    // --- Identification ---
    id: {
      type: "string",
      title: "Spark Job ID",
      description: "The ID for this Spark job. Used in the API to reference this job. Allowed characters: a-z, A-Z, dash (-) and underscore (_). Maximum length: 63 characters.",
      maxLength: 63,
      pattern: "[a-zA-Z][_\\-a-zA-Z0-9]*[a-zA-Z0-9]?"
    },
    // --- Spark settings: list of key/value objects ---
    sparkConfig: {
      type: "array",
      title: "Spark Settings",
      description: "Spark configuration settings.",
      hints: ["advanced"],
      items: {
        type: "object",
        required: ["key"],
        properties: {
          key: { type: "string", title: "Parameter Name" },
          value: { type: "string", title: "Parameter Value" }
        }
      }
    },
    // --- Script source: inline code or a blob reference ---
    script: {
      type: "string",
      title: "Python Script",
      description: "Custom python/pyspark script to be submitted as a Fusion job",
      hints: ["code/python", "lengthy"],
      minLength: 1
    },
    resourceName: {
      type: "string",
      title: "Blob Resource (python file)",
      description: "Name of the resource uploaded to Blob store. This should match with the Blob name",
      minLength: 1,
      reference: "blob",
      blobType: "file:spark"
    },
    pythonFiles: {
      type: "array",
      title: "Python Files",
      description: "Blob resource (.zip, .egg, .py files) to place on the PYTHONPATH for Python apps",
      items: {
        type: "string",
        minLength: 1,
        reference: "blob",
        blobType: "file:spark"
      }
    },
    // --- Advanced submit options ---
    submitArgs: {
      type: "array",
      title: "Spark args",
      description: "Additional options to pass to the Spark Submit when running this job.",
      hints: ["advanced"],
      items: { type: "string" }
    },
    javaOptions: {
      type: "array",
      title: "Java options",
      description: "Java options to pass to Spark driver/executor",
      hints: ["advanced"],
      items: {
        type: "object",
        required: ["key"],
        properties: {
          key: { type: "string", title: "Parameter Name" },
          value: { type: "string", title: "Parameter Value" }
        }
      }
    },
    verboseReporting: {
      type: "boolean",
      title: "Verbose reporting",
      description: "Enables verbose reporting for SparkSubmit",
      default: true,
      hints: ["advanced"]
    },
    envOptions: {
      type: "array",
      title: "ENV properties",
      description: "Set environment variables for driver",
      hints: ["advanced"],
      items: {
        type: "object",
        required: ["key"],
        properties: {
          key: { type: "string", title: "Parameter Name" },
          value: { type: "string", title: "Parameter Value" }
        }
      }
    },
    // --- Job type discriminator: single allowed value, marked read-only ---
    type: {
      type: "string",
      title: "Spark Job Type",
      enum: ["custom_python_job"],
      default: "custom_python_job",
      hints: ["readonly"]
    }
  },
  additionalProperties: true,
  category: "Other",
  categoryPriority: 1
};

export const SchemaParamFields = ({schema}) => {
  const sanitize = str => {
    if (typeof str !== "string") return str;
    return str.replace(/^"(.*)"$/s, "$1").replace(/\\/g, "").replace(/"/g, "'");
  };
  const formatDescription = str => {
    const s = sanitize(str);
    return (/[.!?]\)*$/).test(s) ? s : `${s}.`;
  };
  const {description, properties = {}, required: requiredProps = []} = schema;
  const visibleProps = useMemo(() => Object.entries(properties).filter(([, prop]) => !prop.hints?.includes("hidden")), [properties]);
  return <div>
      {description && <p>{formatDescription(description)}</p>}

      {visibleProps.map(([name, prop]) => {
    const isRequired = requiredProps.includes(name);
    const hasDefault = prop.default !== undefined;
    const rawDefault = prop.default;
    const isComplexDefault = hasDefault && (typeof rawDefault === "object" || typeof rawDefault === "string" && (rawDefault.length > 20 || rawDefault.includes('"')));
    const fieldProps = {
      key: name,
      body: prop.title || name,
      type: prop.type,
      ...prop.title && ({
        post: [<><span className="text-stone-400 dark:text-stone-500">API property: </span>{name}</>]
      }),
      ...isRequired && ({
        required: true
      }),
      ...!isComplexDefault && hasDefault ? {
        default: sanitize(String(rawDefault))
      } : {}
    };
    const isObject = prop.type === "object" && prop.properties;
    const isArrayOfObjects = prop.type === "array" && prop.items?.type === "object" && prop.items.properties;
    return <ParamField {...fieldProps}>
            {prop.description && <p>{formatDescription(prop.description)}</p>}

            {isComplexDefault && <div className="flex">
                <p>
                  <strong>Default:</strong>
                </p>
                <pre className="!my-0">
                  <code>
                    {JSON.stringify(rawDefault, null, 2)}
                  </code>
                </pre>
              </div>}

            {isArrayOfObjects && <div className="flex">
              <p>
                <strong>Object attributes:</strong>
              </p>
              <pre className="!my-0">
                <code>
                  {'{\n'}
                  {Object.entries(prop.items.properties).map(([iname, iprop]) => <>
                      {`  ${iname}`}
                      {prop.items?.required?.includes(iname) && <span style={{
      color: 'red'
    }}> required</span>}
                      {`: {\n    display name: ${sanitize(iprop.title || '')}\n    type: ${iprop.type}\n  }\n`}
                    </>)}
                  {'}'}
                </code>
              </pre>
              </div>}

            {isObject && <Expandable title="properties">
                <SchemaParamFields schema={{
      properties: prop.properties,
      required: prop.required
    }} />
              </Expandable>}
          </ParamField>;
  })}
    </div>;
};

export const LwTemplate = ({title = "Key questions to get you started", icon = "sparkles", cta = "Powered by Agent Studio", linkHref = "https://lucidworks.com/demo/?utm_source=docs&utm_medium=referral&utm_campaign=docs_cta_ai"}) => {
  const [isLoaded, setIsLoaded] = useState(false);
  useEffect(() => {
    const timer = setTimeout(() => {
      setIsLoaded(true);
    }, 500);
    return () => clearTimeout(timer);
  }, []);
  return <div className="lw-template-container">
      <Card title={title} icon={icon}>
        {isLoaded && <span dangerouslySetInnerHTML={{
    __html: `<lw-template id="a029c1a9-28be-427e-b0e1-5d918920246a"></lw-template
            >`
  }} />}
        <Link href={linkHref} className="agent-studio-link text-left text-gray-600 gap-2 dark:text-gray-400 text-sm font-medium flex flex-row items-center hover:text-primary dark:hover:text-primary-light group-hover:text-primary group-hover:dark:text-primary-light">Powered by Lucidworks Agent Studio</Link>
      </Card>
    </div>;
};

[localhost link]: http://localhost:3000/docs/5/fusion/reference/config-ref/jobs/custom-python

[mintlify link]: https://doc.lucidworks.com/docs/5/fusion/reference/config-ref/jobs/custom-python

[old doc.lw link]: https://doc.lucidworks.com/fusion/5.9/569

The Custom Python job lets users run Python code via Fusion.

<Note>
  The Python version required depends on your Spark version:

  * **Spark 3.4.1** (default in Fusion 5.9.10 and later): Requires Python 3.10
  * **Spark 3.2.2** (Fusion 5.9.9 and configurable in 5.9.12 and later): Supports Python 3.7.3

  See [Configure the Spark version](/docs/5/fusion/operations/survival-guide/spark-kubernetes-overview#configure-the-spark-version) for details about switching between Spark versions.
</Note>

<LwTemplate />

## Usage

Python code can be entered directly in the job configuration editor or you can reference a script that has been uploaded to the blob store. Additional Python libraries or files can be supplied via a Python files configuration.

## Examples

Example Python script that indexes data from parquet to Solr via a Fusion index pipeline:

```py wrap  expandable  theme={"dark"}
from pyspark.sql import SparkSession

import sys

"""
Python script that indexes data from parquet to Fusion via index pipeline

zkhost of the cluster is always passed as the first argument
"""
if __name__ == "__main__":
  # sys.argv holds the script name, the zkhost, and the three values
  # supplied through the job's submit args, so five entries are expected.
  if len(sys.argv) != 5:
    print("Program requires 3 arguments. Args passed {}. Add <parquet_file> COLLECTION_NAME <index-pipeline> via submit args in the job config".format(sys.argv), file=sys.stderr)
    sys.exit(-1)

  zkhost = sys.argv[1]
  parquet_file = sys.argv[2]
  collection = sys.argv[3]
  index_pipeline = sys.argv[4]

  # The builder chain is wrapped in parentheses so it can span several lines.
  sparkSession = (
    SparkSession.builder
    .appName("load_data_to_index_pipeline")
    .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    .getOrCreate()
  )

  # Read at most 1000 rows from the parquet file and send them to the
  # given index pipeline and collection.
  df=sparkSession.read.parquet(parquet_file).limit(1000)
  df.write.format("lucidworks.fusion.index").option(
      "zkhost", zkhost).option(
      "collection", collection).option(
      "pipeline", index_pipeline).save()
```

The above script is wrapped in the `script` variable of the job config and several arguments are passed via the `submitArgs` configuration key:

```json wrap  theme={"dark"}
{
  "type": "custom_python_job",
  "id": "test_python_script",
  "script": "\nfrom pyspark.sql import SparkSession\n\nimport sys\n\n\"\"\"\nPython script that indexes data from parquet to Fusion via index pipeline\n \nzkhost of the cluster is always passed as the first argument\n\"\"\"\nif __name__ == \"__main__\":\n  if len(sys.argv) != 5:\n    print(\"Program requires 3 arguments. Args passed {}. Add <parquet_file> COLLECTION_NAME <index-pipeline> via submit args in the job config\".format(sys.argv), file=sys.stderr)\n    sys.exit(-1)\n\n  zkhost = sys.argv[1]\n  parquet_file = sys.argv[2]\n  collection = sys.argv[3]\n  index_pipeline = sys.argv[4]  \n\n  sparkSession = SparkSession.builder    .appName(\"load_data_to_index_pipeline\")    .config(\"fs.gs.impl\", \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem\")    .config(\"fs.AbstractFileSystem.gs.impl\", \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS\")    .getOrCreate()\n\n  df=sparkSession.read.parquet(parquet_file).limit(1000)\n  df.write.format(\"lucidworks.fusion.index\").option(\n      \"zkhost\", zkhost).option(\n      \"collection\", collection).option(\n      \"pipeline\", index_pipeline).save()\n",
  "submitArgs": [
    "gs://smartdata-datasets/best_buy_product_catalog.snappy.parquet",
    "demo",
    "demo"
  ],
  "verboseReporting": true
}
```

For more PySpark script examples, see [https://github.com/apache/spark/blob/v2.4.4/examples/src/main/python](https://github.com/apache/spark/blob/v2.4.4/examples/src/main/python).

## Configuration

Apache Arrow is installed in the image and the two settings below are enabled by default. To disable Arrow optimization, set these properties to `false` in the job configuration or in the job-launcher config map:

```js theme={"dark"}
spark.sql.execution.arrow.enabled true
spark.sql.execution.arrow.fallback.enabled true
```

See also [https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html).

## Available libraries

These libraries are available in the Fusion Spark image:

* `numpy`
* `scipy`
* `matplotlib`
* `pandas`
* `scikit-learn`

## Adding libraries

If you need to add extra libraries to run your code, you can upload the Python egg files to the blob store and reference their blob IDs in the job configuration.

However, machine learning libraries (like `tensorflow`, `keras`, and `pytorch`) are not easy to install with that approach. To install those libraries, follow this approach instead:

1. Use this example `Dockerfile` to extend from the base image:

   ```dockerfile theme={"dark"}
   FROM lucidworks/fusion-spark:5.0.2
   # PyTorch is distributed on PyPI under the package name `torch`.
   RUN pip3 install tensorflow keras torch
   ```

2. Build the Docker image and publish it to your own Docker registry.

3. Once the image is built, the custom image can be specified in the Spark settings via `spark.kubernetes.driver.container.image` and `spark.kubernetes.executor.container.image`.

<Tip>
  **Important**

  If you upload `.zip` files to add libraries, use the `Other` blob type for binary files instead of the `File` blob type. If the `File` blob type is used, the custom Python job fails.
</Tip>

## Configuration properties

<SchemaParamFields schema={schema} />
