Metadata-Version: 2.4
Name: bugseq-client
Version: 0.1.7
Summary: Python client for BugSeq (https://bugseq.com)
Project-URL: Homepage, https://gitlab.com/bugseq-open/bugseq-client-py
Project-URL: Issues, https://gitlab.com/bugseq-open/bugseq-client-py/-/issues
Author-email: Josh Chorlton <josh@bugseq.com>
License: Apache 2.0
License-File: LICENSE
Requires-Python: >=3.11
Requires-Dist: authlib>=1.6.1
Requires-Dist: keyring>=25.6.0
Requires-Dist: pydantic>=2.11.7
Requires-Dist: python-dateutil>=2.9.0
Requires-Dist: requests>=2.32.4
Provides-Extra: test
Requires-Dist: pytest-cov>=4.1.0; extra == 'test'
Requires-Dist: pytest>=8.0.0; extra == 'test'
Description-Content-Type: text/markdown

# BugSeq Python Client Library

Authentication and client code for interacting with the [BugSeq](https://bugseq.com) API.

## Installation

`pip install bugseq-client`

## Example Usage

```python
import tempfile
from pathlib import Path

import requests
from bugseq_client.auth import (
    DEFAULT_OAUTH_CONFIG,
    Session,
    get_default_credential_storage_provider,
)
from bugseq_client.openapi_client import (
    ApiClient,
    AppModelsJobRunOptionsSampleType,
    Configuration,
    JobRunSubmitRequest,
    JobsApi,
    Kit,
    MoleculeType,
    Platform,
    RunOptionsRequest,
)
from bugseq_client.utils.upload import multipart_upload


# Placeholder lab ID, sent as `lab_id` with the job submission below.
# You should replace this with your own lab ID.
LAB_ID = "lab_123"


def submit_test_job(api_client: ApiClient, file_ids: list[str]):
    """Submit a test-mode analysis job for previously uploaded files.

    Builds a ``JobRunSubmitRequest`` with ``testmode=True`` for the lab
    identified by ``LAB_ID`` and sends it through ``JobsApi``. The response
    of the submit call is not returned.

    Args:
        api_client: API client used to construct the ``JobsApi``.
        file_ids: IDs of the uploaded files to include in the job.
    """
    jobs_api = JobsApi(api_client)

    # Describe how the sample was sequenced and what kind of sample it is.
    options = RunOptionsRequest(
        platform=Platform.NANOPORE,
        kit=Kit.MINION_R10_4_1,
        sample_type=AppModelsJobRunOptionsSampleType.RESPIRATORY_UPPER,
        molecule_type=MoleculeType.DNA,
    )

    submission = JobRunSubmitRequest(
        user_provided_name="My awesome analysis",
        file_ids=file_ids,
        run_options=options,
        lab_id=LAB_ID,
        testmode=True,
    )

    jobs_api.submit_analysis_v1_jobs_post(submission)


def download_url_to_file(
    download_url: str,
    dst: Path,
    timeout: float | tuple[float, float] = 60.0,
):
    """Stream the body of an HTTP GET on ``download_url`` into ``dst``.

    The response is requested with ``stream=True`` and written in 8192-byte
    chunks, so the whole body is never held in memory at once.

    Args:
        download_url: URL to fetch.
        dst: Destination path; opened in binary write mode.
        timeout: Passed to ``requests.get`` as ``timeout``. Without one,
            requests waits forever on a server that stops responding.

    Raises:
        requests.HTTPError: If ``raise_for_status`` rejects the response.
    """
    with requests.get(download_url, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        with open(dst, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk:  # filter out keep-alive chunks
                    f.write(chunk)


def fetch_and_download_job_results(api_client: ApiClient):
    """Print the job listing and download the first job's sample reports.

    Prints one line per job returned by the listing call, then takes the
    first job in that listing, prints each of its outputs, and downloads the
    outputs whose filename starts with ``sample_reports/``. Files are written
    under ``downloaded-results-<job id>/``, keeping each output's relative
    path.

    Args:
        api_client: API client used to construct the ``JobsApi``.

    Raises:
        AssertionError: If the listing call returns no jobs.
    """
    api = JobsApi(api_client)

    # Single source of truth for the filter, so the check and the message
    # printed when an output is skipped always name the same prefix.
    reports_prefix = "sample_reports/"

    print("fetching jobs")
    jobs_resp = api.list_analyses_v1_jobs_get()
    for job in jobs_resp.job_runs:
        print(f"id={job.id} name={job.user_provided_name} status={job.job_status}")

    print()

    assert len(jobs_resp.job_runs) > 0, "No jobs found, no results to pull"

    job_id = jobs_resp.job_runs[0].id

    results_dir = Path(f"downloaded-results-{job_id}")

    print(f"fetching outputs for job {job_id} to {results_dir}")

    results_resp = api.get_analysis_results_v1_jobs_job_id_results_get(job_id)
    for output in results_resp.outputs:
        print(f"  output filename={output.filename} size={output.size}")

        if not output.filename.startswith(reports_prefix):
            print(f"  does not match {reports_prefix} filter, skipping")
            continue

        # The API hands back a URL for the file rather than its contents.
        download_resp = (
            api.download_analysis_result_v1_jobs_job_id_results_download_get(
                job_id, output.filename
            )
        )

        download_dst = results_dir / output.filename
        # Output filenames contain sub-directories (e.g. the filtered prefix),
        # so create the parent directories before writing.
        download_dst.parent.mkdir(exist_ok=True, parents=True)
        download_url_to_file(download_resp.url, download_dst)
        print(f"  downloaded filename={output.filename} dst={download_dst}")


class App:
    """End-to-end example: authenticate, upload a file, submit a job, and
    download results.
    """

    def __init__(self):
        """Create the session from the default OAuth config and the default
        credential storage provider.
        """
        storage = get_default_credential_storage_provider()
        self.session = Session(DEFAULT_OAUTH_CONFIG, storage)

    def run(self):
        """Upload a small test file, submit a test job, then fetch results.

        Obtains a token from the session, builds an ``ApiClient`` against
        ``https://api.bugseq.com``, uploads a throwaway file containing the
        text ``test``, submits a job for that file, and finally downloads
        results via ``fetch_and_download_job_results``.
        """
        token = self.session.get_token()

        configuration = Configuration(
            host="https://api.bugseq.com",
            access_token=token,
        )

        with ApiClient(configuration) as api_client:
            # Use a temporary directory rather than NamedTemporaryFile: the
            # file is re-opened by path (write_text, then the upload), and an
            # open NamedTemporaryFile cannot be re-opened by name on Windows.
            with tempfile.TemporaryDirectory() as tmp_dir:
                tmp_path = Path(tmp_dir) / "bugseq-test-upload"
                tmp_path.write_text("test")
                file_id = multipart_upload(api_client, tmp_path)

            submit_test_job(api_client, file_ids=[file_id])
            fetch_and_download_job_results(api_client)


def main():
    """Script entry point: build the example ``App`` and run it."""
    App().run()


# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
```
