The flamepy package provides a synchronous client for Flame sessions and tasks, a service base class for host-shim applications, object-cache helpers, and the Runner API for packaging Python workloads.
By default the SDK reads ~/.flame/flame.yaml:

    ---
    current-context: flame
    contexts:
      - name: flame
        cluster:
          endpoint: "http://127.0.0.1:8080"
        cache:
          endpoint: "grpc://127.0.0.1:9090"
        package:
          excludes:
            - "*.log"
            - "*.pkl"

Environment variables override the file:

- FLAME_ENDPOINT
- FLAME_CACHE_ENDPOINT
- FLAME_CACHE_STORAGE
- FLAME_CA_FILE

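
The override order can be pictured with a small sketch. This is not the SDK's loader: `resolve_endpoints` and the dictionary layout are hypothetical, and only the variable names FLAME_ENDPOINT and FLAME_CACHE_ENDPOINT come from the list above. Which file field each variable maps to is also an assumption based on the names.

```python
import os


def resolve_endpoints(file_context, env=None):
    """Return effective endpoints, letting environment variables win.

    `file_context` mirrors one entry of `contexts` from flame.yaml as a
    plain dict. Hypothetical helper, not part of flamepy.
    """
    env = os.environ if env is None else env
    return {
        # Assumed mapping: FLAME_ENDPOINT -> cluster.endpoint
        "cluster": env.get("FLAME_ENDPOINT", file_context["cluster"]["endpoint"]),
        # Assumed mapping: FLAME_CACHE_ENDPOINT -> cache.endpoint
        "cache": env.get("FLAME_CACHE_ENDPOINT", file_context["cache"]["endpoint"]),
    }


file_context = {
    "cluster": {"endpoint": "http://127.0.0.1:8080"},
    "cache": {"endpoint": "grpc://127.0.0.1:9090"},
}

# Only the cluster endpoint is overridden; the cache endpoint falls back to the file.
effective = resolve_endpoints(file_context, env={"FLAME_ENDPOINT": "http://10.0.0.5:8080"})
print(effective)
```

Passing `env` explicitly keeps the sketch testable without touching the real process environment.
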
Top-level helpers use the configured default connection:

    import flamepy

    session = flamepy.create_session(
        "flmping",
        min_instances=1,
        resreq=flamepy.ResourceRequirement.from_string("cpu=1,mem=1g"),
    )
    output = session.invoke(b"hello")
    session.close()

Public helpers:

- connect(addr, tls_config=None) -> Connection
- create_session(application, common_data=None, session_id=None, min_instances=0, max_instances=None, batch_size=1, resreq=None) -> Session
- open_session(session_id, spec=None) -> Session
- register_application(name, app_attrs) -> None
- unregister_application(name) -> None
- list_applications() -> list[Application]
- get_application(name) -> Application | None
- list_sessions() -> list[Session]
- get_session(session_id) -> Session
- close_session(session_id) -> Session

Session represents an open or closed Flame session.
Methods:

- create_task(input_data: bytes) -> Task
- get_task(task_id) -> Task
- list_tasks() -> Iterator[Task]
- watch_task(task_id, timeout=None) -> TaskWatcher
- invoke(input_data) -> bytes | None
- run(input_data) -> concurrent.futures.Future
- close() -> None
- common_data() -> bytes | None

create_task() expects bytes. run() creates a task, watches it in the background, and resolves the returned Future with the task output.
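
Because run() returns a standard concurrent.futures.Future, several inputs can be submitted at once and collected with the usual standard-library tools. The sketch below uses a stand-in class instead of a real Session so that it runs without a Flame cluster; `StandInSession` and its uppercase "work" are purely illustrative and only mimic the shape of run().

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed


class StandInSession:
    """Stand-in that mimics only the shape of Session.run(); not flamepy code."""

    def __init__(self):
        self._pool = ThreadPoolExecutor(max_workers=4)

    def run(self, input_data: bytes) -> Future:
        if not isinstance(input_data, bytes):
            raise TypeError("input_data must be bytes")
        # A real session would create and watch a task; here we just uppercase.
        return self._pool.submit(bytes.upper, input_data)

    def close(self) -> None:
        self._pool.shutdown(wait=True)


session = StandInSession()
futures = [session.run(payload) for payload in (b"a", b"b", b"c")]

# as_completed yields futures in completion order, so sort for a stable result.
outputs = sorted(f.result(timeout=10) for f in as_completed(futures))
session.close()
print(outputs)
```

With a real session the same collection pattern should apply, since only the Future interface is used after submission.
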
SessionAttributes:

- application: str
- id: str | None
- common_data: bytes | None
- min_instances: int
- max_instances: int | None
- batch_size: int
- resreq: ResourceRequirement | None

ApplicationAttributes:

- shim: Shim | None
- image: str | None
- description: str | None
- labels: list[str] | None
- command: str | None
- arguments: list[str] | None
- environments: dict[str, str] | None
- working_directory: str | None
- max_instances: int | None
- delay_release: int | None
- schema: ApplicationSchema | None
- url: str | None
- installer: str | None

ResourceRequirement.from_string("cpu=1,mem=1g,gpu=0") parses user-friendly resource strings into CPU, memory bytes, and GPU counts.
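
To make the string format concrete, here is a minimal parser sketch for the same kind of input. It is not the flamepy implementation: `parse_resources`, `ParsedResources`, and the choice of binary multipliers (1g = 1024**3 bytes) are assumptions made for illustration, and the units flamepy actually accepts may differ.

```python
from dataclasses import dataclass

# Assumption: single-letter suffixes are binary multiples (1g = 1024**3 bytes).
UNIT_BYTES = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}


@dataclass
class ParsedResources:
    cpu: int = 0
    memory: int = 0  # bytes
    gpu: int = 0


def parse_memory(value: str) -> int:
    """Convert '512m' or '1g' to bytes; a bare number is taken as bytes."""
    value = value.strip().lower()
    if value and value[-1] in UNIT_BYTES:
        return int(value[:-1]) * UNIT_BYTES[value[-1]]
    return int(value)


def parse_resources(spec: str) -> ParsedResources:
    """Parse a 'cpu=1,mem=1g,gpu=0' style string (illustrative only)."""
    parsed = ParsedResources()
    for item in spec.split(","):
        key, sep, value = item.partition("=")
        key = key.strip().lower()
        if not sep:
            raise ValueError(f"expected key=value, got {item!r}")
        if key == "cpu":
            parsed.cpu = int(value)
        elif key == "mem":
            parsed.memory = parse_memory(value)
        elif key == "gpu":
            parsed.gpu = int(value)
        else:
            raise ValueError(f"unknown resource {key!r}")
    return parsed


parsed = parse_resources("cpu=1,mem=1g,gpu=0")
print(parsed)
```
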
Top-level flamepy exports:

- ObjectRef
- put_object(key_prefix, obj)
- get_object(ref, deserializer=None)
- update_object(ref, new_obj)

flamepy.core also exports:

- ObjectKey
- WILDCARD_SESSION
- patch_object(ref, delta)
- upload_object(key_or_prefix, file_path)
- download_object(ref, dest_path)

flamepy.cache exports delete_objects(key_prefix) in addition to the basic cache helpers.
Object references are versioned:

- version=0 forces a fresh download.
- version>=1 lets the client reuse a cached base object and fetch only newer patches when the server can provide them.
- Without a custom deserializer, get_object() returns only the base object for backward compatibility.
- With a deserializer, get_object(ref, deserializer) calls deserializer(base, deltas).

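
A deserializer is therefore a callable that takes the base object and a sequence of deltas and returns the reconstructed value. The sketch below shows one possible shape for dictionary state. `merge_deltas` is a hypothetical name, and it assumes the base and deltas arrive as already-decoded dicts in oldest-first order, which the text above does not specify.

```python
def merge_deltas(base, deltas):
    """Hypothetical deserializer: apply dict deltas to a dict base, in order."""
    merged = dict(base)  # copy so the cached base object is not mutated
    for delta in deltas:
        merged.update(delta)  # later deltas overwrite earlier keys
    return merged


# Simulated inputs; a real call would receive these from the cache client.
base = {"step": 0, "loss": 1.0}
deltas = [{"step": 1, "loss": 0.7}, {"step": 2}]

state = merge_deltas(base, deltas)
print(state)
```

Under those assumptions the function could be passed as the second argument of get_object(ref, deserializer). Copying the base first avoids corrupting a locally cached object that later calls may reuse.
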
Host-shim Python services subclass flamepy.FlameService and run with flamepy.run():

    import flamepy


    class Echo(flamepy.FlameService):
        def on_session_enter(self, context):
            self.session_id = context.session_id

        def on_task_invoke(self, context):
            return context.input

        def on_session_leave(self):
            pass


    if __name__ == "__main__":
        flamepy.run(Echo())

Service contexts expose bytes-oriented APIs:

- SessionContext.session_id
- SessionContext.application
- SessionContext.common_data()
- TaskContext.task_id
- TaskContext.session_id
- TaskContext.input

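
Since task input and output are bytes, structured payloads need an explicit encoding step inside the handler. The following sketch shows one way to do that with JSON. `handle_task` is a hypothetical function standing in for the body of on_task_invoke, and the context is faked with SimpleNamespace so the example runs without flamepy; the attribute names follow the list above, while their value types are assumed.

```python
import json
from types import SimpleNamespace


def handle_task(context) -> bytes:
    """Decode JSON from context.input, compute a result, and return bytes."""
    request = json.loads(context.input.decode("utf-8"))
    response = {"task_id": context.task_id, "total": sum(request["values"])}
    return json.dumps(response).encode("utf-8")


# Fake TaskContext carrying only the attributes used by the handler.
fake_context = SimpleNamespace(
    task_id="1",
    session_id="s-1",
    input=b'{"values": [1, 2, 3]}',
)

result = handle_task(fake_context)
print(result)
```

Keeping the encoding at the edge of the handler means the rest of the service logic can work with ordinary Python values.
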
Runner lives under flamepy.runner:

    from flamepy.runner import Runner


    def add(a, b):
        return a + b


    with Runner("add-app") as runner:
        svc = runner.service(add)
        print(svc(1, 2).get())

Key classes and helpers:

- Runner(name, fail_if_exists=False)
- Runner.service(execution_object, autoscale=None, warmup=0, resreq=None)
- Runner.get(futures), Runner.ref(futures), Runner.wait(futures), Runner.select(futures)
- ObjectFuture.get(), ObjectFuture.ref(), ObjectFuture.wait()
- get_data(data) for decoding Runner task input/output payloads

State, shim, and error-code constants:

- SessionState.OPEN, SessionState.CLOSED
- TaskState.PENDING, TaskState.RUNNING, TaskState.SUCCEED, TaskState.FAILED
- ApplicationState.ENABLED, ApplicationState.DISABLED
- Shim.HOST, Shim.WASM
- FlameErrorCode.INVALID_CONFIG, INVALID_STATE, INVALID_ARGUMENT, INTERNAL, ALREADY_EXISTS, NOT_FOUND

SDK operations raise FlameError with a code and message:

    import flamepy

    try:
        flamepy.connect("invalid://address")
    except flamepy.FlameError as exc:
        print(exc.code, exc.message)