Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions .ci/clusters/values-oxia.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,28 @@
components:
zookeeper: false
oxia: true
# disable functions for oxia tests since there's no support for Oxia in
# BookKeeperPackagesStorage which requires Zookeeper
functions: false
# Functions are enabled on Oxia together with broker.packageManagement (FileSystemPackagesStorage).
# The default BookKeeper package storage requires ZooKeeper, but FileSystemPackagesStorage does not, so
# this validates Oxia + FileSystemPackagesStorage end to end: the function smoke test
# (ci::test_pulsar_function) creates a function from a JAR, which uploads the package via the broker's
# FileSystem-backed Packages Management Service.
functions: true

# Host the Packages Management Service on the broker with FileSystemPackagesStorage so functions work on
# Oxia. broker.replicaCount is 1 in CI (.ci/values-common.yaml), so the default ReadWriteOnce PVC on the
# kind default StorageClass is sufficient (no shared filesystem needed).
broker:
packageManagement:
enabled: true
fileSystemStorage:
enabled: true
# Use a NON-default storage path so the test actually exercises PULSAR_PREFIX_STORAGE_PATH being
# applied to the broker. The FileSystemPackagesStorage default ("packages-storage") would resolve to
# /pulsar/packages-storage (the cwd), which happens to match the chart's default mount path, so a
# broken STORAGE_PATH wiring would still pass. With a custom path the PVC is mounted here AND
# STORAGE_PATH points here, so the function-package upload only lands on the volume if both are wired
# correctly. ci::verify_package_storage_files asserts the uploaded package files exist under this path.
storagePath: /pulsar/test-packages-storage

oxia:
initialShardCount: 3
Expand Down
29 changes: 29 additions & 0 deletions .ci/helm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -416,10 +416,39 @@ function ci::wait_message_processed() {
done
}

function ci::verify_package_storage_files() {
# When the broker hosts FileSystemPackagesStorage, the function package uploaded by `functions create`
# must land on the broker's shared package-storage volume (at STORAGE_PATH). Verify the files are
# actually there - this catches a broken STORAGE_PATH wiring (package written to the wrong directory,
# off the volume) or a volume the broker cannot write to (permissions / missing fsGroup).
if [[ "$(ci::helm_values_for_deployment | yq '.broker.packageManagement.fileSystemStorage.enabled')" != "true" ]]; then
return 0
fi
local storage_path
storage_path=$(ci::helm_values_for_deployment | yq '.broker.packageManagement.fileSystemStorage.storagePath')
if [[ -z "${storage_path}" || "${storage_path}" == "null" ]]; then
storage_path="/pulsar/packages-storage"
fi
echo "Verifying function package files exist under broker FileSystemPackagesStorage path: ${storage_path}"
${KUBECTL} exec -n "${NAMESPACE}" "${CLUSTER}"-broker-0 -- bash -c "ls -laR '${storage_path}' || true"
local file_count
file_count=$(${KUBECTL} exec -n "${NAMESPACE}" "${CLUSTER}"-broker-0 -- bash -c "find '${storage_path}' -type f 2>/dev/null | wc -l" | tr -d '[:space:]')
echo "FileSystemPackagesStorage file count under ${storage_path}: ${file_count}"
if [[ -z "${file_count}" || "${file_count}" -lt 1 ]]; then
echo >&2 "ERROR: no files found under FileSystemPackagesStorage path ${storage_path} on ${CLUSTER}-broker-0."
echo >&2 "The function package was not persisted to the package-storage volume (check STORAGE_PATH wiring, the PVC mount, and volume write permissions / fsGroup)."
return 1
fi
echo "OK: function package persisted to FileSystemPackagesStorage (${file_count} file(s) under ${storage_path})"
}

function ci::test_pulsar_function() {
echo "Testing functions"
echo "Creating function"
${KUBECTL} exec -n "${NAMESPACE}" "${CLUSTER}"-toolset-0 -- bin/pulsar-admin functions create --tenant pulsar-ci --namespace test --name test-function --inputs "pulsar-ci/test/test_input" --output "pulsar-ci/test/test_output" --parallelism 1 --classname org.apache.pulsar.functions.api.examples.ExclamationFunction --jar /pulsar/examples/api-examples.jar
# The package upload happens at create time; verify it landed on the broker's FileSystemPackagesStorage
# volume (no-op unless fileSystemStorage is enabled).
ci::verify_package_storage_files
echo "Creating subscription for output topic"
${KUBECTL} exec -n "${NAMESPACE}" "${CLUSTER}"-toolset-0 -- bin/pulsar-admin topics create-subscription -s test pulsar-ci/test/test_output
echo "Waiting for function to be ready"
Expand Down
39 changes: 37 additions & 2 deletions .ci/templates-all-values-patch1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,47 @@ bookkeeper:
storageClass:

# -----------------------------------------------------------------------------
# Broker: flip statefulsetUpgrade off
# Exercises: the path where broker-statefulset-upgrade.yaml renders nothing.
# Broker:
# - flip statefulsetUpgrade off (exercises the path where
# broker-statefulset-upgrade.yaml renders nothing).
# - enable FileSystemPackagesStorage with a created StorageClass + PersistentVolume + PVC
# (exercises all three branches of broker-package-storage.yaml plus the
# broker.packageManagement volume mount in broker-statefulset.yaml and the
# enablePackagesManagement keys in broker-configmap.yaml).
# -----------------------------------------------------------------------------
broker:
statefulsetUpgrade:
enabled: false
packageManagement:
enabled: true
fileSystemStorage:
enabled: true
storageClass:
metadata:
name: pulsar-pkg-sc
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer
persistentVolume:
metadata:
name: pulsar-pkg-pv
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
storageClassName: pulsar-pkg-sc
hostPath:
path: /tmp/pulsar-packages
persistentVolumeClaim:
metadata:
name: pulsar-broker-package-storage
spec:
accessModes:
- ReadWriteMany
storageClassName: pulsar-pkg-sc
resources:
requests:
storage: 10Gi

# -----------------------------------------------------------------------------
# Cert-manager internal issuer: selfsigning -> ca
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pulsar-helm-chart-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ jobs:
- name: Pulsar Manager
values_file: .ci/clusters/values-pulsar-manager.yaml
shortname: pulsar-manager
- name: Oxia
- name: Oxia + FileSystemPackagesStorage
values_file: .ci/clusters/values-oxia.yaml
shortname: oxia
- name: OpenID
Expand Down
64 changes: 64 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,70 @@ The default user is `pulsar` and you can find out the password with this command
kubectl get secret -l component=pulsar-manager -o=jsonpath="{.items[0].data.UI_PASSWORD}" | base64 --decode
```

## Pulsar Functions package storage (required for Oxia)

The Pulsar **Packages Management Service** — which stores uploaded function packages
(`pulsar-admin functions create --jar ...`) — runs on the **broker**. Its default storage provider,
`BookKeeperPackagesStorage`, relies on DistributedLog metadata in **ZooKeeper**, so it does **not** work
when [Oxia](https://github.com/streamnative/oxia) is used as the metadata store (`components.oxia: true`).

To run Pulsar Functions on Oxia you must enable `FileSystemPackagesStorage` on the broker. The Packages
Management Service is configured in two levels: `broker.packageManagement.enabled` turns the service on, and
`broker.packageManagement.fileSystemStorage.enabled` selects the FileSystem provider:

```yaml
components:
oxia: true
functions: true
broker:
packageManagement:
enabled: true
fileSystemStorage:
enabled: true
```

This configures the broker with `enablePackagesManagement=true` and
`packagesManagementStorageProvider=FileSystemPackagesStorageProvider`, and mounts a **shared
`PersistentVolumeClaim`** on every broker pod as the package storage directory. If `components.functions`
is enabled without ZooKeeper (using Oxia) but FileSystemPackagesStorage is not enabled, the chart **fails the
Helm install** with an explanatory error (the default BookKeeper provider would not work without ZooKeeper).

### Choosing a volume

`FileSystemPackagesStorage` is a directory on disk, so the volume backing it determines how many broker
replicas can use it (all keys below are under `broker.packageManagement.fileSystemStorage`):

- **Single broker / single-node dev clusters (e.g. minikube):** the default `persistentVolumeClaim` is a
`ReadWriteOnce` claim on the cluster's default `StorageClass` — no extra configuration is required.
- **Multiple broker replicas:** the package directory must be on a **`ReadWriteMany` shared filesystem** — a
managed file service, **not** block storage (Persistent Disk / EBS / Azure Disk are `ReadWriteOnce` and
cannot be shared across replicas). Provision one with the matching cloud CSI driver, set
`persistentVolumeClaim: {}` so the chart does not create a claim, and point `claimName` at the pre-created
PVC:

| Cloud | Shared file service to use | CSI driver | Reference |
| ----- | -------------------------- | ---------- | --------- |
| GCP / GKE | Filestore (managed NFS) | `filestore.csi.storage.gke.io` | [Filestore CSI](https://docs.cloud.google.com/filestore/docs/csi-driver) |
| AWS / EKS | Amazon EFS (managed NFS) | `efs.csi.aws.com` | [EFS CSI on EKS](https://docs.aws.amazon.com/eks/latest/userguide/efs-csi.html) |
| Azure / AKS | Azure Files | `file.csi.azure.com` | [Azure Files on AKS](https://learn.microsoft.com/azure/aks/create-volume-azure-files) |

**Volume permissions.** Pulsar container images run as **uid `10000`, gid `0`** by default, so package files
are written by that user/group. The chart sets `broker.securityContext.fsGroup: 0`
(`fsGroupChangePolicy: OnRootMismatch`), which tells Kubernetes to set the volume's group to `0` and make it
group-writable — enough for the broker to read/write the package directory, and this works on most volume
types (block-storage CSI drivers, `hostPath`). Some shared filesystems — notably the NFS/SMB-backed
`ReadWriteMany` volumes above (EFS, Filestore, Azure Files) — **ignore `fsGroup`**. If package writes then
fail with permission errors, grant `uid 10000` / `gid 0` read-write-execute on the share itself: make the
directory group-`0`-owned and group-writable (e.g. `chown :0 <dir> && chmod 2770 <dir>` — `rwxrwx---` plus the
setgid bit so new entries inherit gid `0`), or set it via the CSI driver's mount options (for Azure Files SMB,
for example, `mountOptions: [uid=10000, gid=0, file_mode=0770, dir_mode=0770]`).

`broker.packageManagement.fileSystemStorage` can also create the `StorageClass`, `PersistentVolume`, and
`PersistentVolumeClaim` directly from raw YAML — only `apiVersion`/`kind` are fixed by the chart, and a
value of `{}` creates nothing. See the `broker.packageManagement` section in
[`values.yaml`](charts/pulsar/values.yaml) and the
[`examples/values-functions-fs-storage.yaml`](examples/values-functions-fs-storage.yaml) example.

## Grafana Dashboards

The Apache Pulsar Helm Chart uses the `victoria-metrics-k8s-stack` Helm Chart to deploy Grafana.
Expand Down
23 changes: 23 additions & 0 deletions charts/pulsar/templates/broker-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,29 @@ data:
{{- end }}
{{- end }}

# Package Management Service
# The broker hosts the Packages Management Service (packageManagement.enabled). With
# fileSystemStorage.enabled it uses FileSystemPackagesStorage, which lets Functions store packages without
# ZooKeeper (works with Oxia); the storage path is a shared volume mounted on every broker pod (see
# broker-statefulset.yaml / templates/broker-package-storage.yaml). Otherwise the broker keeps its default
# BookKeeperPackagesStorage provider.
{{- if .Values.broker.packageManagement.enabled }}
enablePackagesManagement: "true"
{{- if .Values.components.functions }}
# Route the broker-embedded function worker's package storage through the broker's Packages Management
# Service instead of BookKeeper/DLog, so Functions work without ZooKeeper (e.g. with Oxia). This is a
# broker.conf key (ServiceConfiguration): for the embedded worker, PulsarService overrides the worker's
# functionsWorkerEnablePackageManagement with the broker config value, so setting it via PF_
# (functions_worker.yml) has no effect.
functionsWorkerEnablePackageManagement: "true"
{{- end }}
{{- if .Values.broker.packageManagement.fileSystemStorage.enabled }}
packagesManagementStorageProvider: "org.apache.pulsar.packages.management.storage.filesystem.FileSystemPackagesStorageProvider"
# STORAGE_PATH is not a broker.conf key, so add it to the properties map via PULSAR_PREFIX_
PULSAR_PREFIX_STORAGE_PATH: "{{ .Values.broker.packageManagement.fileSystemStorage.storagePath }}"
{{- end }}
{{- end }}

# prometheus needs to access /metrics endpoint
webServicePort: "{{ .Values.broker.ports.http }}"
{{- if or (not .Values.tls.enabled) (not .Values.tls.broker.enabled) }}
Expand Down
31 changes: 31 additions & 0 deletions charts/pulsar/templates/broker-package-storage-validation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

{{- /*
Functions on Oxia require FileSystemPackagesStorage.
The function worker stores function packages in BookKeeper/DLog by default, which requires ZooKeeper and
therefore does not work with Oxia. The fix is to host the Packages Management Service with
FileSystemPackagesStorage on the broker (broker.packageManagement.enabled AND
broker.packageManagement.fileSystemStorage.enabled). Fail fast when functions run on Oxia without it.

This check lives in a rendered template (not a `_`-prefixed partial) so that the `fail` is executed.
*/ -}}
{{- if (and .Values.components.functions .Values.components.oxia (not (and .Values.broker.packageManagement.enabled .Values.broker.packageManagement.fileSystemStorage.enabled))) }}
{{- fail "ERROR: Pulsar Functions on Oxia require FileSystemPackagesStorage. The default BookKeeper package storage requires ZooKeeper and does not work with Oxia (components.oxia=true). Set broker.packageManagement.enabled=true and broker.packageManagement.fileSystemStorage.enabled=true to host FileSystemPackagesStorage on the broker, or use ZooKeeper as the metadata store. See the README and examples/README.md Functions section." }}
{{- end }}
55 changes: 55 additions & 0 deletions charts/pulsar/templates/broker-package-storage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Storage objects for the broker's FileSystemPackagesStorage (Packages Management Service).
# Each of broker.packageManagement.fileSystemStorage.{storageClass,persistentVolume,persistentVolumeClaim} is
# rendered verbatim from values with only apiVersion and kind fixed by the chart; an empty value ({}) renders
# nothing. This lets you either create the objects here (default: a minikube-friendly PVC on the default
# StorageClass) or set them to {} and reference a pre-created PVC (e.g. a ReadWriteMany cloud filesystem) via
# broker.packageManagement.fileSystemStorage.claimName.
{{- if and .Values.components.broker (not .Values.standalone.enabled) .Values.broker.packageManagement.enabled .Values.broker.packageManagement.fileSystemStorage.enabled }}
{{- $pm := .Values.broker.packageManagement.fileSystemStorage }}
{{- if $pm.storageClass }}
apiVersion: storage.k8s.io/v1
kind: StorageClass
{{ toYaml (omit $pm.storageClass "apiVersion" "kind") | trim }}
---
{{- end }}
{{- if $pm.persistentVolume }}
apiVersion: v1
kind: PersistentVolume
{{ toYaml (omit $pm.persistentVolume "apiVersion" "kind") | trim }}
---
{{- end }}
{{- if $pm.persistentVolumeClaim }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
{{- if not (dig "metadata" "namespace" "" $pm.persistentVolumeClaim) }}
namespace: {{ template "pulsar.namespace" . }}
{{- end }}
{{- with $pm.persistentVolumeClaim.metadata }}
{{- toYaml . | nindent 2 }}
{{- end }}
{{- with $pm.persistentVolumeClaim.spec }}
spec:
{{- toYaml . | nindent 2 }}
{{- end }}
{{- end }}
{{- end }}
15 changes: 15 additions & 0 deletions charts/pulsar/templates/broker-statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ spec:
{{- end }}
spec:
serviceAccountName: "{{ template "pulsar.fullname" . }}-{{ .Values.broker.component }}-acct"
{{- if .Values.broker.securityContext }}
securityContext:
{{ toYaml .Values.broker.securityContext | indent 8 }}
{{- end }}
{{- if .Values.broker.nodeSelector }}
nodeSelector:
{{ toYaml .Values.broker.nodeSelector | indent 8 }}
Expand Down Expand Up @@ -314,6 +318,10 @@ spec:
{{- if .Values.broker.extraVolumeMounts }}
{{ toYaml .Values.broker.extraVolumeMounts | indent 10 }}
{{- end }}
{{- if and .Values.broker.packageManagement.enabled .Values.broker.packageManagement.fileSystemStorage.enabled }}
- name: broker-package-storage
mountPath: {{ .Values.broker.packageManagement.fileSystemStorage.storagePath }}
{{- end }}
{{- include "pulsar.broker.certs.volumeMounts" . | nindent 10 }}
env:
{{- if and (and .Values.broker.storageOffload (eq .Values.broker.storageOffload.driver "aws-s3")) .Values.broker.storageOffload.secret }}
Expand Down Expand Up @@ -380,6 +388,13 @@ spec:
secretName: {{ .Values.broker.storageOffload.gcsServiceAccountSecret }}
{{- end }}
{{- end }}
{{- if and .Values.broker.packageManagement.enabled .Values.broker.packageManagement.fileSystemStorage.enabled }}
# Shared package-storage volume mounted on every broker pod (FileSystemPackagesStorage).
# For more than one broker replica this PVC must be ReadWriteMany (a shared filesystem).
- name: broker-package-storage
persistentVolumeClaim:
claimName: {{ .Values.broker.packageManagement.fileSystemStorage.claimName }}
{{- end }}
{{- include "pulsar.broker.certs.volumes" . | nindent 6 }}
{{- include "pulsar.imagePullSecrets" . | nindent 6}}
{{- end }}
Loading