infrahub_sdk.client

Functions

handle_relogin

handle_relogin(func: Callable[..., Coroutine[Any, Any, httpx.Response]]) -> Callable[..., Coroutine[Any, Any, httpx.Response]]

handle_relogin_sync

handle_relogin_sync(func: Callable[..., httpx.Response]) -> Callable[..., httpx.Response]

raise_for_error_deprecation_warning

raise_for_error_deprecation_warning(value: bool | None) -> None

Classes

ProcessRelationsNode

ProxyConfig

ProxyConfigSync

ProcessRelationsNodeSync

BaseClient

Base class for InfrahubClient and InfrahubClientSync

Methods:

request_context

request_context(self) -> RequestContext | None

request_context

request_context(self, request_context: RequestContext) -> None

start_tracking

start_tracking(self, identifier: str | None = None, params: dict[str, Any] | None = None, delete_unused_nodes: bool = False, group_type: str | None = None, group_params: dict[str, Any] | None = None, branch: str | None = None) -> Self

set_context_properties

set_context_properties(self, identifier: str, params: dict[str, str] | None = None, delete_unused_nodes: bool = True, reset: bool = True, group_type: str | None = None, group_params: dict[str, Any] | None = None, branch: str | None = None) -> None
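
A minimal sketch of starting a tracking context with start_tracking before creating or updating nodes; the identifier, params, and client construction below are illustrative assumptions, not values taken from this reference.

```python
from infrahub_sdk import InfrahubClient

async def main() -> None:
    # Client construction shown for illustration only; adjust address/authentication to your setup.
    client = InfrahubClient(address="http://localhost:8000")

    # Track every node touched by this run under a group identified by "nightly-sync".
    # delete_unused_nodes=True asks Infrahub to clean up nodes the group no longer references.
    client.start_tracking(
        identifier="nightly-sync",
        params={"site": "dc1"},
        delete_unused_nodes=True,
    )
```

Since start_tracking returns the client itself (Self), it can be chained with subsequent calls.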

InfrahubClient

GraphQL Client to interact with Infrahub.

Methods:

get_version

get_version(self) -> str

Return the Infrahub version.

get_user

get_user(self) -> dict

Return user information

get_user_permissions

get_user_permissions(self) -> dict

Return user permissions

create

create(self, kind: str, data: dict | None = ..., branch: str | None = ..., **kwargs: Any) -> InfrahubNode

create

create(self, kind: type[SchemaType], data: dict | None = ..., branch: str | None = ..., **kwargs: Any) -> SchemaType

create

create(self, kind: str | type[SchemaType], data: dict | None = None, branch: str | None = None, timeout: int | None = None, **kwargs: Any) -> InfrahubNode | SchemaType
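
A short, hedged example of the create() overloads above; the kind InfraDevice and its attributes are placeholders for whatever your schema defines.

```python
from infrahub_sdk import InfrahubClient

async def create_device() -> None:
    client = InfrahubClient(address="http://localhost:8000")  # construction details are assumptions

    # create() builds the node locally; persisting it is a separate step on the returned InfrahubNode.
    device = await client.create(
        kind="InfraDevice",            # placeholder kind from a hypothetical schema
        data={"name": "atl1-edge1"},
        branch="main",
    )
    await device.save()
```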

delete

delete(self, kind: str | type[SchemaType], id: str, branch: str | None = None) -> None

get

get(self, kind: type[SchemaType], raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> SchemaType | None

get

get(self, kind: type[SchemaType], raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> SchemaType

get

get(self, kind: type[SchemaType], raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> SchemaType

get

get(self, kind: str, raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> InfrahubNode | None

get

get(self, kind: str, raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> InfrahubNode

get

get(self, kind: str, raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> InfrahubNode

get

get(self, kind: str | type[SchemaType], raise_when_missing: bool = True, at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, id: str | None = None, hfid: list[str] | None = None, include: list[str] | None = None, exclude: list[str] | None = None, populate_store: bool = True, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, include_metadata: bool = False, **kwargs: Any) -> InfrahubNode | SchemaType | None
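
The overloads above boil down to two lookup styles, by id or by hfid; the sketch below assumes a placeholder kind and shows how raise_when_missing changes the return type.

```python
async def lookup(client, node_id: str) -> None:
    # Lookup by internal id; raises when no node matches (raise_when_missing defaults to True).
    device = await client.get(kind="InfraDevice", id=node_id)

    # Lookup by human-friendly id; returns None instead of raising when nothing matches.
    maybe = await client.get(
        kind="InfraDevice",
        hfid=["atl1-edge1"],
        raise_when_missing=False,
    )
    if maybe is None:
        print("no such device")
```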

count

count(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, partial_match: bool = False, **kwargs: Any) -> int

Return the number of nodes of a given kind.

all

all(self, kind: type[SchemaType], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ...) -> list[SchemaType]

all

all(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ...) -> list[InfrahubNode]

all

all(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False) -> list[InfrahubNode] | list[SchemaType]

Retrieve all nodes of a given kind

Args:

  • kind: kind of the nodes to query
  • at: Time of the query. Defaults to Now.
  • branch: Name of the branch to query from. Defaults to default_branch.
  • populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • offset: The offset for pagination.
  • limit: The limit for pagination.
  • include: List of attributes or relationships to include in the query.
  • exclude: List of attributes or relationships to exclude from the query.
  • fragment: Flag to use GraphQL fragments for generic schemas.
  • prefetch_relationships: Flag to indicate whether to prefetch related node data.
  • parallel: Whether to use parallel processing for the query.
  • order: Ordering-related options. Setting disable=True improves performance.
  • include_metadata: If True, includes node_metadata and relationship_metadata in the query.

Returns:

  • list[InfrahubNode]: List of Nodes
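
A hedged example of all() using a few of the options documented above; InfraDevice and the interfaces relationship are placeholders.

```python
# Inside an async function, with `client` an InfrahubClient instance.
devices = await client.all(
    kind="InfraDevice",            # placeholder kind
    branch="main",
    limit=50,                      # page size for pagination
    include=["interfaces"],        # placeholder relationship to include in the query
    prefetch_relationships=True,   # also fetch the peers of included relationships
)
for device in devices:
    print(device.id)
```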

filters

filters(self, kind: type[SchemaType], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., **kwargs: Any) -> list[SchemaType]

filters

filters(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., **kwargs: Any) -> list[InfrahubNode]

filters

filters(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, partial_match: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False, **kwargs: Any) -> list[InfrahubNode] | list[SchemaType]

Retrieve nodes of a given kind based on provided filters.

Args:

  • kind: kind of the nodes to query
  • at: Time of the query. Defaults to Now.
  • branch: Name of the branch to query from. Defaults to default_branch.
  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
  • offset: The offset for pagination.
  • limit: The limit for pagination.
  • include: List of attributes or relationships to include in the query.
  • exclude: List of attributes or relationships to exclude from the query.
  • fragment: Flag to use GraphQL fragments for generic schemas.
  • prefetch_relationships: Flag to indicate whether to prefetch related node data.
  • partial_match: Allow partial match of filter criteria for the query.
  • parallel: Whether to use parallel processing for the query.
  • order: Ordering-related options. Setting disable=True improves performance.
  • include_metadata: If True, includes node_metadata and relationship_metadata in the query.
  • **kwargs: Additional filter criteria for the query.

Returns:

  • list[InfrahubNode]: List of Nodes that match the given filters.
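
A hedged example of filters(): extra keyword arguments are passed through as filter criteria, here using the attribute__value naming convention with placeholder values.

```python
# Inside an async function, with `client` an InfrahubClient instance.
edge_devices = await client.filters(
    kind="InfraDevice",
    partial_match=True,      # allow partial matching of the criteria below
    name__value="edge",      # attribute filters typically follow the <attribute>__value form
)
```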

clone

clone(self, branch: str | None = None) -> InfrahubClient

Return a cloned version of the client using the same configuration

execute_graphql

execute_graphql(self, query: str, variables: dict | None = None, branch_name: str | None = None, at: str | Timestamp | None = None, timeout: int | None = None, raise_for_error: bool | None = None, tracker: str | None = None) -> dict

Execute a GraphQL query (or mutation). If retry_on_failure is True, the query will retry until the server becomes reachable.

Args:

  • query: GraphQL query or mutation to execute.
  • variables: Variables to pass along with the GraphQL query. Defaults to None.
  • branch_name: Name of the branch on which the query will be executed. Defaults to None.
  • at: Time when the query should be executed. Defaults to None.
  • timeout: Timeout in seconds for the query. Defaults to None.
  • raise_for_error: Deprecated. Controls only HTTP status handling. Defaults to None.
      • None (default) or True: HTTP errors raise via resp.raise_for_status().
      • False: HTTP errors are not automatically raised. GraphQL errors always raise GraphQLError.

Raises:

  • GraphQLError: When the GraphQL response contains errors.

Returns:

  • The GraphQL data payload (response["data"]).
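
A minimal sketch of a raw GraphQL call; the query targets a placeholder kind and only illustrates the variables and branch_name parameters described above.

```python
QUERY = """
query Devices($limit: Int) {
  InfraDevice(limit: $limit) {
    edges { node { id name { value } } }
  }
}
"""

# Inside an async function, with `client` an InfrahubClient instance.
# Returns the "data" payload; a GraphQLError is raised if the response contains errors.
data = await client.execute_graphql(
    query=QUERY,
    variables={"limit": 10},
    branch_name="main",
)
```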

refresh_login

refresh_login(self) -> None

login

login(self, refresh: bool = False) -> None

query_gql_query

query_gql_query(self, name: str, variables: dict | None = None, update_group: bool = False, subscribers: list[str] | None = None, params: dict | None = None, branch_name: str | None = None, at: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> dict

create_diff

create_diff(self, branch: str, name: str, from_time: datetime, to_time: datetime, wait_until_completion: bool = True) -> bool | str

get_diff_summary

get_diff_summary(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> list[NodeDiff]

get_diff_tree

get_diff_tree(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None) -> DiffTreeData | None

Get complete diff tree with metadata and nodes.

Returns None if no diff exists.
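
A hedged sketch tying the three diff methods together; the branch and diff name are placeholders.

```python
from datetime import datetime, timedelta, timezone

# Inside an async function, with `client` an InfrahubClient instance.
now = datetime.now(timezone.utc)

# Request a diff covering the last 24 hours and wait for it to complete.
await client.create_diff(
    branch="feature-1",
    name="review",
    from_time=now - timedelta(days=1),
    to_time=now,
)

summary = await client.get_diff_summary(branch="feature-1", name="review")
tree = await client.get_diff_tree(branch="feature-1", name="review")
if tree is None:
    print("no diff exists for this branch")
```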

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> SchemaType

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> SchemaType | None

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> SchemaType

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> CoreNode

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> CoreNode | None

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> CoreNode | None

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType] | None = None, identifier: str | None = None, prefix_length: int | None = None, address_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> CoreNode | SchemaType | None

Allocate a new IP address by using the provided resource pool.

Args:

  • resource_pool: Node corresponding to the pool to allocate resources from.
  • identifier: Value used to perform idempotent allocation; the same resource is returned for a given identifier.
  • prefix_length: Length of the prefix to set on the address to allocate.
  • address_type: Kind of the address to allocate.
  • data: A key/value map used to set attribute values on the allocated address.
  • branch: Name of the branch to allocate from. Defaults to default_branch.
  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • tracker: Optional string used to identify the query for tracking purposes.
  • raise_for_error: Deprecated, raise an error if the HTTP status is not 2XX.

Returns:

  • InfrahubNode: Node corresponding to the allocated resource.
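
A hedged example of an idempotent address allocation; the pool lookup (kind CoreIPAddressPool and its name) and the identifier are placeholders.

```python
# Inside an async function, with `client` an InfrahubClient instance.
# Fetch the pool node first; how the pool is identified depends on your data.
pool = await client.get(kind="CoreIPAddressPool", hfid=["Internal pool"])

# Re-using the same identifier returns the same address on subsequent calls.
address = await client.allocate_next_ip_address(
    resource_pool=pool,
    identifier="atl1-edge1-loopback0",
    prefix_length=32,
    data={"description": "Loopback0 for atl1-edge1"},
)
print(address.id)
```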

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> SchemaType

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> SchemaType | None

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> SchemaType

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> CoreNode

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> CoreNode | None

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> CoreNode | None

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType] | None = None, identifier: str | None = None, prefix_length: int | None = None, member_type: str | None = None, prefix_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> CoreNode | SchemaType | None

Allocate a new IP prefix by using the provided resource pool.

Args:

  • resource_pool: Node corresponding to the pool to allocate resources from.
  • identifier: Value used to perform idempotent allocation; the same resource is returned for a given identifier.
  • prefix_length: Length of the prefix to allocate.
  • member_type: Member type of the prefix to allocate.
  • prefix_type: Kind of the prefix to allocate.
  • data: A key/value map used to set attribute values on the allocated prefix.
  • branch: Name of the branch to allocate from. Defaults to default_branch.
  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • tracker: Optional string used to identify the query for tracking purposes.
  • raise_for_error: Deprecated, raise an error if the HTTP status is not 2XX.

Returns:

  • InfrahubNode: Node corresponding to the allocated resource.
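
Prefix allocation follows the same pattern; CoreIPPrefixPool, the pool name, and the member_type value below are illustrative.

```python
# Inside an async function, with `client` an InfrahubClient instance.
pool = await client.get(kind="CoreIPPrefixPool", hfid=["Site prefixes"])  # placeholder pool

prefix = await client.allocate_next_ip_prefix(
    resource_pool=pool,
    identifier="dc1-servers",    # keeps the allocation idempotent per identifier
    prefix_length=24,
    member_type="address",       # whether the prefix will hold addresses or child prefixes
)
print(prefix.id)
```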

create_batch

create_batch(self, return_exceptions: bool = False) -> InfrahubBatch

get_list_repositories

get_list_repositories(self, branches: dict[str, BranchData] | None = None, kind: str = 'CoreGenericRepository') -> dict[str, RepositoryData]

repository_update_commit

repository_update_commit(self, branch_name: str, repository_id: str, commit: str, is_read_only: bool = False) -> bool

convert_object_type

convert_object_type(self, node_id: str, target_kind: str, branch: str | None = None, fields_mapping: dict[str, ConversionFieldInput] | None = None) -> InfrahubNode

Convert a given node to another kind on a given branch. fields_mapping keys are target field names and their values indicate how to fill in these fields. Any mandatory field without an equivalent field in the source kind must be specified in this mapping. See https://docs.infrahub.app/guides/object-convert-type for more information.
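
A minimal sketch of a conversion call; the node id, target kind, and branch are placeholders, and fields_mapping is omitted here since its entries depend entirely on the target schema.

```python
# Inside an async function, with `client` an InfrahubClient instance.
converted = await client.convert_object_type(
    node_id="<existing-node-id>",   # placeholder id
    target_kind="InfraFirewall",    # placeholder target kind
    branch="main",
)
print(converted.id)
```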

InfrahubClientSync

Methods:

get_version

get_version(self) -> str

Return the Infrahub version.

get_user

get_user(self) -> dict

Return user information

get_user_permissions

get_user_permissions(self) -> dict

Return user permissions

create

create(self, kind: str, data: dict | None = ..., branch: str | None = ..., **kwargs: Any) -> InfrahubNodeSync

create

create(self, kind: type[SchemaTypeSync], data: dict | None = ..., branch: str | None = ..., **kwargs: Any) -> SchemaTypeSync

create

create(self, kind: str | type[SchemaTypeSync], data: dict | None = None, branch: str | None = None, timeout: int | None = None, **kwargs: Any) -> InfrahubNodeSync | SchemaTypeSync

delete

delete(self, kind: str | type[SchemaTypeSync], id: str, branch: str | None = None) -> None

clone

clone(self, branch: str | None = None) -> InfrahubClientSync

Return a cloned version of the client using the same configuration

execute_graphql

execute_graphql(self, query: str, variables: dict | None = None, branch_name: str | None = None, at: str | Timestamp | None = None, timeout: int | None = None, raise_for_error: bool | None = None, tracker: str | None = None) -> dict

Execute a GraphQL query (or mutation). If retry_on_failure is True, the query will retry until the server becomes reachable.

Args:

  • query: GraphQL query or mutation to execute.
  • variables: Variables to pass along with the GraphQL query. Defaults to None.
  • branch_name: Name of the branch on which the query will be executed. Defaults to None.
  • at: Time when the query should be executed. Defaults to None.
  • timeout: Timeout in seconds for the query. Defaults to None.
  • raise_for_error: Deprecated. Controls only HTTP status handling. Defaults to None.
      • None (default) or True: HTTP errors raise via resp.raise_for_status().
      • False: HTTP errors are not automatically raised. GraphQL errors always raise GraphQLError.

Raises:

  • GraphQLError: When the GraphQL response contains errors.

Returns:

  • The GraphQL data payload (response["data"]).

count

count(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, partial_match: bool = False, **kwargs: Any) -> int

Return the number of nodes of a given kind.

all

all(self, kind: type[SchemaTypeSync], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ...) -> list[SchemaTypeSync]

all

all(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ...) -> list[InfrahubNodeSync]

all

all(self, kind: str | type[SchemaTypeSync], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False) -> list[InfrahubNodeSync] | list[SchemaTypeSync]

Retrieve all nodes of a given kind

Args:

  • kind: kind of the nodes to query
  • at: Time of the query. Defaults to Now.
  • branch: Name of the branch to query from. Defaults to default_branch.
  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
  • offset: The offset for pagination.
  • limit: The limit for pagination.
  • include: List of attributes or relationships to include in the query.
  • exclude: List of attributes or relationships to exclude from the query.
  • fragment: Flag to use GraphQL fragments for generic schemas.
  • prefetch_relationships: Flag to indicate whether to prefetch related node data.
  • parallel: Whether to use parallel processing for the query.
  • order: Ordering-related options. Setting disable=True improves performance.
  • include_metadata: If True, includes node_metadata and relationship_metadata in the query.

Returns:

  • list[InfrahubNodeSync]: List of Nodes
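
The sync client mirrors the async API without await; a short sketch with a placeholder kind, where the client construction details are assumptions:

```python
from infrahub_sdk import InfrahubClientSync

client = InfrahubClientSync(address="http://localhost:8000")  # adjust address/auth to your setup

devices = client.all(kind="InfraDevice", limit=25, populate_store=True)
print(len(devices))
```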

filters

filters(self, kind: type[SchemaTypeSync], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., **kwargs: Any) -> list[SchemaTypeSync]

filters

filters(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., **kwargs: Any) -> list[InfrahubNodeSync]

filters

filters(self, kind: str | type[SchemaTypeSync], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, partial_match: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False, **kwargs: Any) -> list[InfrahubNodeSync] | list[SchemaTypeSync]

Retrieve nodes of a given kind based on provided filters.

Args:

  • kind: kind of the nodes to query
  • at: Time of the query. Defaults to Now.
  • branch: Name of the branch to query from. Defaults to default_branch.
  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
  • offset: The offset for pagination.
  • limit: The limit for pagination.
  • include: List of attributes or relationships to include in the query.
  • exclude: List of attributes or relationships to exclude from the query.
  • fragment: Flag to use GraphQL fragments for generic schemas.
  • prefetch_relationships: Flag to indicate whether to prefetch related node data.
  • partial_match: Allow partial match of filter criteria for the query.
  • parallel: Whether to use parallel processing for the query.
  • order: Ordering-related options. Setting disable=True improves performance.
  • include_metadata: If True, includes node_metadata and relationship_metadata in the query.
  • **kwargs: Additional filter criteria for the query.

Returns:

  • list[InfrahubNodeSync]: List of Nodes that match the given filters.

get

get(self, kind: type[SchemaTypeSync], raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> SchemaTypeSync | None

get

get(self, kind: type[SchemaTypeSync], raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> SchemaTypeSync

get

get(self, kind: type[SchemaTypeSync], raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> SchemaTypeSync

get

get(self, kind: str, raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> InfrahubNodeSync | None

get

get(self, kind: str, raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> InfrahubNodeSync

get

get(self, kind: str, raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> InfrahubNodeSync

get

get(self, kind: str | type[SchemaTypeSync], raise_when_missing: bool = True, at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, id: str | None = None, hfid: list[str] | None = None, include: list[str] | None = None, exclude: list[str] | None = None, populate_store: bool = True, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, include_metadata: bool = False, **kwargs: Any) -> InfrahubNodeSync | SchemaTypeSync | None

create_batch

create_batch(self, return_exceptions: bool = False) -> InfrahubBatchSync

Create a batch to execute multiple queries concurrently.

The batch is executed using a thread pool, so the execution order cannot be guaranteed. It is not recommended to use such a batch to manipulate objects that depend on each other.
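
A hedged sketch of batching independent lookups with the sync client from the example above; the add()/execute() usage reflects the SDK's batch helpers, but the kinds and names are placeholders and results come back in no guaranteed order.

```python
batch = client.create_batch(return_exceptions=True)

# Queue independent queries; each entry runs in the thread pool when execute() is called.
for name in ["atl1-edge1", "atl1-edge2"]:
    batch.add(task=client.get, kind="InfraDevice", hfid=[name])

# execute() yields (node, result) pairs; with return_exceptions=True a failure is yielded
# as the exception instead of aborting the whole batch.
for _, result in batch.execute():
    print(result)
```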

get_list_repositories

get_list_repositories(self, branches: dict[str, BranchData] | None = None, kind: str = 'CoreGenericRepository') -> dict[str, RepositoryData]

query_gql_query

query_gql_query(self, name: str, variables: dict | None = None, update_group: bool = False, subscribers: list[str] | None = None, params: dict | None = None, branch_name: str | None = None, at: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> dict

create_diff

create_diff(self, branch: str, name: str, from_time: datetime, to_time: datetime, wait_until_completion: bool = True) -> bool | str

get_diff_summary

get_diff_summary(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> list[NodeDiff]

get_diff_tree

get_diff_tree(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None) -> DiffTreeData | None

Get complete diff tree with metadata and nodes.

Returns None if no diff exists.

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> SchemaTypeSync

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> SchemaTypeSync | None

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> SchemaTypeSync

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> CoreNodeSync

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> CoreNodeSync | None

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> CoreNodeSync | None

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync] | None = None, identifier: str | None = None, prefix_length: int | None = None, address_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> CoreNodeSync | SchemaTypeSync | None

Allocate a new IP address by using the provided resource pool.

Args:

  • resource_pool: Node corresponding to the pool to allocate resources from.
  • identifier: Value used to perform idempotent allocation; the same resource is returned for a given identifier.
  • prefix_length: Length of the prefix to set on the address to allocate.
  • address_type: Kind of the address to allocate.
  • data: A key/value map used to set attribute values on the allocated address.
  • branch: Name of the branch to allocate from. Defaults to default_branch.
  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • tracker: Optional string used to identify the query for tracking purposes.
  • raise_for_error: Deprecated, raise an error if the HTTP status is not 2XX.

Returns:

  • InfrahubNodeSync: Node corresponding to the allocated resource.

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> SchemaTypeSync

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> SchemaTypeSync | None

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> SchemaTypeSync

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> CoreNodeSync

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> CoreNodeSync | None

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> CoreNodeSync | None

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync] | None = None, identifier: str | None = None, prefix_length: int | None = None, member_type: str | None = None, prefix_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> CoreNodeSync | SchemaTypeSync | None

Allocate a new IP prefix by using the provided resource pool.

Args:

  • resource_pool: Node corresponding to the pool to allocate resources from.
  • identifier: Value used to perform idempotent allocation; the same resource is returned for a given identifier.
  • prefix_length: Length of the prefix to allocate.
  • member_type: Member type of the prefix to allocate.
  • prefix_type: Kind of the prefix to allocate.
  • data: A key/value map used to set attribute values on the allocated prefix.
  • branch: Name of the branch to allocate from. Defaults to default_branch.
  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • tracker: Optional string used to identify the query for tracking purposes.
  • raise_for_error: Deprecated, raise an error if the HTTP status is not 2XX.

Returns:

  • InfrahubNodeSync: Node corresponding to the allocated resource.

repository_update_commit

repository_update_commit(self, branch_name: str, repository_id: str, commit: str, is_read_only: bool = False) -> bool

refresh_login

refresh_login(self) -> None

login

login(self, refresh: bool = False) -> None

convert_object_type

convert_object_type(self, node_id: str, target_kind: str, branch: str | None = None, fields_mapping: dict[str, ConversionFieldInput] | None = None) -> InfrahubNodeSync

Convert a given node to another kind on a given branch. fields_mapping keys are target field names and their values indicate how to fill in these fields. Any mandatory field without an equivalent field in the source kind must be specified in this mapping. See https://docs.infrahub.app/guides/object-convert-type for more information.