ckanapi_harvesters.policies package
Submodules
ckanapi_harvesters.policies.data_format_policy module
Data format policy representation and enforcement
- class ckanapi_harvesters.policies.data_format_policy.CkanPackageDataFormatPolicy(label: str = None, description: str = None, package_tags: TagGroupsListPolicy = None, package_custom_fields: CustomFieldsPolicy = None, package_mandatory_attributes: Set[str] = None, resource_mandatory_attributes: Set[str] = None, datastore_fields_mandatory_attributes: Set[str] = None, resource_format: SingleValueListPolicy = None)
Bases:
DataPolicyABC
Main class to define data format policy for package metadata
- copy() CkanPackageDataFormatPolicy
- enforce(values: CkanPackageInfo, *, context: dict = None, verbose: bool = True, buffer: List[DataPolicyError] = None) bool
- static from_dict(d: dict) CkanPackageDataFormatPolicy
- static from_json(policy_file: str, *, base_dir: str = None, headers: dict = None, proxies: dict = None, auth: AuthBase | Tuple[str, str] = None, verify: bool | str | None = None, error_not_found: bool = True, load_error: bool = True) CkanPackageDataFormatPolicy | None
- static from_jsons(stream: str, *, source_file: str = None, load_error: bool = True) CkanPackageDataFormatPolicy | None
- package_update_scores(ckan: CkanApiManage, package_info: CkanPackageInfo, package_buffer: List[DataPolicyError], raise_error: bool = True) bool
Update the package scores on the CKAN server in package custom fields.
- Returns:
True if a package update is required. If the ckan argument is given, the update is applied to the package.
- policy_check_package(package_info: CkanPackageInfo, *, package_buffer: List[DataPolicyError] = None, display_message: bool = True, raise_error: bool = False) bool
Main entry-point to check the policy rules against the package.
- Parameters:
package_info – package and resources metadata
package_buffer – optional list object in which the detailed error messages are collected, so that the caller can read them after the call. The keys of this dictionary are the package names.
display_message – whether to display the messages on the command line
raise_error – option to raise an exception if any rule with a high error level is encountered
- Returns:
True if no error was encountered
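The buffer and package_buffer arguments follow a common pattern: the caller passes a list, the check appends its findings to it, and the return value stays a single boolean. The sketch below illustrates only that pattern. The function check_title and its rule are hypothetical stand-ins and are not part of ckanapi_harvesters.

```python
from typing import List, Optional


def check_title(package: dict, *, buffer: Optional[List[str]] = None,
                raise_error: bool = False) -> bool:
    """Hypothetical rule: a package must have a non-empty 'title'.

    Returns True if no problem was found. Problems are appended to
    `buffer` when the caller provides one.
    """
    messages: List[str] = []
    if not package.get("title"):
        messages.append(f"package {package.get('name')!r}: missing title")
    if buffer is not None:
        buffer.extend(messages)
    if messages and raise_error:
        raise ValueError("; ".join(messages))
    return not messages


# The caller owns the list and inspects it after the call.
report: List[str] = []
ok = check_title({"name": "demo", "title": ""}, buffer=report)
print(ok, report)
```

Keeping the return value a plain boolean lets simple callers ignore the details, while callers that need a report pass their own list.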
ckanapi_harvesters.policies.data_format_policy_abc module
Data format policy representation and enforcement
- class ckanapi_harvesters.policies.data_format_policy_abc.DataPolicyABC(error_level: ErrorLevel = ErrorLevel.Information)
Bases:
ABC
- abstractmethod enforce(values: Any, *, context: dict = None, verbose: bool = True, buffer: List[DataPolicyError] = None) bool
- error_level: ErrorLevel
- abstractmethod static from_dict(d: dict)
- abstractmethod to_dict() dict
- class ckanapi_harvesters.policies.data_format_policy_abc.DataPolicyElementABC(mandatory: bool = False, error_level: ErrorLevel = ErrorLevel.Information)
Bases:
DataPolicyABC, ABC
- abstractmethod static from_dict(d: dict)
- match_mode: StringMatchMode
- abstractmethod to_dict() dict
ckanapi_harvesters.policies.data_format_policy_custom_fields module
Data format policy representation and enforcement
- class ckanapi_harvesters.policies.data_format_policy_custom_fields.CustomFieldSpecification(key: str = None, values: List[str] = None, data_type: DataType = None, match_mode: StringMatchMode = StringMatchMode.Any, help: str = None, mandatory: bool = False, error_level: ErrorLevel = ErrorLevel.Information)
Bases:
DataPolicyElementABC
- enforce(values: str, *, context: dict = None, verbose: bool = True, buffer: List[DataPolicyError] = None) bool
- static from_df_row(row: Series) CustomFieldSpecification
- static from_dict(d: dict) CustomFieldSpecification
- to_dict() dict
- class ckanapi_harvesters.policies.data_format_policy_custom_fields.CustomFieldsPolicy(custom_fields_spec: List[CustomFieldSpecification] = None, restrict_to_list: ErrorLevel = ErrorLevel.Information, keys_case_sensitive: bool = True, mandatory: bool = False, error_level: ErrorLevel = ErrorLevel.Information)
Bases:
DataPolicyElementABC
- enforce(values: Dict[str, str], *, context: dict = None, verbose: bool = True, buffer: List[DataPolicyError] = None) bool
- static from_dict(d: dict) CustomFieldsPolicy
- to_dict() dict
ckanapi_harvesters.policies.data_format_policy_defs module
Data format policy representation and enforcement
- class ckanapi_harvesters.policies.data_format_policy_defs.DataType(*values)
Bases:
IntEnum
- Bool = 4
- Numeric = 2
- Text = 1
- TimeStamp = 3
- static from_str(s)
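The listing gives the member values of DataType but not the behavior of from_str. As a minimal sketch, assuming that from_str looks a member up by name without regard to case, the enumeration could be reproduced as follows. The lookup rule and the ValueError are assumptions, not documented behavior.

```python
from enum import IntEnum


class DataType(IntEnum):
    # Values copied from the listing above.
    Text = 1
    Numeric = 2
    TimeStamp = 3
    Bool = 4

    @staticmethod
    def from_str(s: str) -> "DataType":
        # Assumption: names are matched case-insensitively,
        # ignoring surrounding whitespace.
        wanted = s.strip().lower()
        for member in DataType:
            if member.name.lower() == wanted:
                return member
        raise ValueError(f"unknown data type: {s!r}")


print(DataType.from_str("timestamp").name)
print(int(DataType.Bool))
```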
- class ckanapi_harvesters.policies.data_format_policy_defs.ListChoiceMode(*values)
Bases:
IntEnum
- Any = 0
- MandatoryMulti = 3
- MandatoryOne = 2
- MaxOne = 1
- NoExtra = 4
- static from_str(s)
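The member names of ListChoiceMode suggest constraints on how many values may be picked from a list of allowed values. The sketch below spells out one possible reading. The semantics are inferred from the names only, and selection_ok is a hypothetical helper, not a function of the package.

```python
from enum import IntEnum
from typing import Iterable, List


class ListChoiceMode(IntEnum):
    # Values copied from the listing above.
    Any = 0
    MaxOne = 1
    MandatoryOne = 2
    MandatoryMulti = 3
    NoExtra = 4


def selection_ok(values: Iterable[str], allowed: List[str],
                 mode: ListChoiceMode) -> bool:
    """Check a selection against an allowed list (assumed semantics)."""
    values = list(values)
    hits = [v for v in values if v in allowed]
    if mode is ListChoiceMode.MaxOne:
        return len(hits) <= 1          # zero or one allowed value
    if mode is ListChoiceMode.MandatoryOne:
        return len(hits) == 1          # exactly one allowed value
    if mode is ListChoiceMode.MandatoryMulti:
        return len(hits) >= 1          # at least one allowed value
    if mode is ListChoiceMode.NoExtra:
        return len(hits) == len(values)  # nothing outside the list
    return True                        # ListChoiceMode.Any


formats = ["CSV", "JSON", "XLSX"]
print(selection_ok(["CSV"], formats, ListChoiceMode.MandatoryOne))
print(selection_ok(["CSV", "PDF"], formats, ListChoiceMode.NoExtra))
```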
- class ckanapi_harvesters.policies.data_format_policy_defs.StringMatchMode(*values)
Bases:
IntEnum
- Any = 0
- Match = 2
- MatchCaseSensitive = 3
- NotEmpty = 1
- Regex = 4
- RegexCaseSensitive = 5
- Wildcard = 6
- WildcardCaseSensitive = 7
- static from_str(s)
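The StringMatchMode members name three matching techniques (exact match, regular expression, wildcard), each with a case-sensitive variant. As a rough sketch of how such modes could be evaluated with the standard library, assuming that the variants without the CaseSensitive suffix ignore case and that regular expressions must match the whole value, consider the following. The helper matches is hypothetical.

```python
import re
from enum import IntEnum
from fnmatch import fnmatchcase


class StringMatchMode(IntEnum):
    # Values copied from the listing above.
    Any = 0
    NotEmpty = 1
    Match = 2
    MatchCaseSensitive = 3
    Regex = 4
    RegexCaseSensitive = 5
    Wildcard = 6
    WildcardCaseSensitive = 7


def matches(value: str, pattern: str, mode: StringMatchMode) -> bool:
    """Evaluate one value against one pattern (assumed semantics)."""
    m = StringMatchMode
    if mode is m.Any:
        return True
    if mode is m.NotEmpty:
        return value != ""
    if mode is m.Match:
        return value.lower() == pattern.lower()
    if mode is m.MatchCaseSensitive:
        return value == pattern
    if mode is m.Regex:
        return re.fullmatch(pattern, value, flags=re.IGNORECASE) is not None
    if mode is m.RegexCaseSensitive:
        return re.fullmatch(pattern, value) is not None
    if mode is m.Wildcard:
        # fnmatchcase never normalizes case, so lower both sides first.
        return fnmatchcase(value.lower(), pattern.lower())
    return fnmatchcase(value, pattern)  # WildcardCaseSensitive


print(matches("data_2024.csv", "DATA_*.csv", StringMatchMode.Wildcard))
```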
- class ckanapi_harvesters.policies.data_format_policy_defs.StringValueSpecification(value: str, help: str = None)
Bases:
object
- static from_dict(values: dict) StringValueSpecification
- static from_tuple(values: str | Tuple[str, str]) StringValueSpecification
- help: str
- to_dict() dict
- to_tuple() Tuple[str, str]
- value: str
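The signatures of from_tuple and to_tuple indicate that a value specification can be written either as a bare string or as a (value, help) pair. The stand-in class below, named ValueSpec to make clear it is hypothetical, sketches that round trip. The dictionary keys used in to_dict are an assumption.

```python
from dataclasses import dataclass
from typing import Optional, Tuple, Union


@dataclass
class ValueSpec:
    """Hypothetical stand-in for StringValueSpecification."""
    value: str
    help: Optional[str] = None

    @staticmethod
    def from_tuple(values: Union[str, Tuple[str, str]]) -> "ValueSpec":
        # A bare string is treated as a value without help text.
        if isinstance(values, str):
            return ValueSpec(values)
        value, help_text = values
        return ValueSpec(value, help_text)

    def to_tuple(self) -> Tuple[str, Optional[str]]:
        return (self.value, self.help)

    def to_dict(self) -> dict:
        # Assumption: the dictionary keys mirror the attribute names.
        return {"value": self.value, "help": self.help}


spec = ValueSpec.from_tuple(("CSV", "Comma-separated values"))
print(spec.to_tuple())
print(ValueSpec.from_tuple("JSON").to_dict())
```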
ckanapi_harvesters.policies.data_format_policy_errors module
Data format policy representation and enforcement
- exception ckanapi_harvesters.policies.data_format_policy_errors.DataPolicyError(context: dict, error_level: ErrorLevel, policy_message: str)
Bases:
ErrorLevelMessage
- to_dict() dict
- class ckanapi_harvesters.policies.data_format_policy_errors.ErrorCount(messages_list: List[DataPolicyError])
Bases:
object
- error_count_message() str
- exception ckanapi_harvesters.policies.data_format_policy_errors.UnsupportedPolicyVersionError(file_version)
Bases:
Exception
- exception ckanapi_harvesters.policies.data_format_policy_errors.UrlPolicyLockedError(url)
Bases:
Exception
ckanapi_harvesters.policies.data_format_policy_lists module
Data format policy representation and enforcement for lists of values such as tags
- class ckanapi_harvesters.policies.data_format_policy_lists.ExtraValueListPolicy(list_specs: List[StringValueSpecification] = None, value_select: ListChoiceMode = ListChoiceMode.Any, mandatory: bool = False, error_level: ErrorLevel = ErrorLevel.Information)
Bases:
ValueListPolicy
- enforce(values: str | List[str], *, context: dict = None, verbose: bool = True, buffer: List[DataPolicyError] = None, extra_spec_rm: Set[str] = None) bool
- static from_ValueListPolicy(value: ValueListPolicy) ExtraValueListPolicy
- static from_dict(d: dict) ExtraValueListPolicy
- class ckanapi_harvesters.policies.data_format_policy_lists.GroupedValueListPolicy(value_group_specs: List[ValueListPolicy] = None, extra_values: ExtraValueListPolicy = None, mandatory: bool = False, error_level: ErrorLevel = ErrorLevel.Information)
Bases:
DataPolicyElementABC
- enforce(values: str | List[str], *, context: dict = None, verbose: bool = True, buffer: List[DataPolicyError] = None) bool
- static from_dict(d: dict) GroupedValueListPolicy
- to_dict() dict
- class ckanapi_harvesters.policies.data_format_policy_lists.SingleValueListPolicy(base_list: ValueListPolicy = None, extra_values: ListChoiceMode = ListChoiceMode.Any, mandatory: bool = False)
Bases:
DataPolicyElementABC
- enforce(values: str | List[str], *, context: dict = None, verbose: bool = True, buffer: List[DataPolicyError] = None) bool
- static from_dict(d: dict) SingleValueListPolicy
- to_dict() dict
- update_base_list(base_list: ValueListPolicy)
- class ckanapi_harvesters.policies.data_format_policy_lists.ValueListPolicy(list_specs: List[StringValueSpecification] = None, group_name: str = None, value_select: ListChoiceMode = ListChoiceMode.Any, mandatory: bool = False, error_level: ErrorLevel = ErrorLevel.Information)
Bases:
DataPolicyElementABC
- enforce(values: str | List[str], *, context: dict = None, verbose: bool = True, buffer: List[DataPolicyError] = None) bool
- static from_dict(d: dict) ValueListPolicy
- list_specs_str() List[str]
- to_dict() dict
ckanapi_harvesters.policies.data_format_policy_output_config module
Data format policy representation and enforcement
- class ckanapi_harvesters.policies.data_format_policy_output_config.DataFormatPolicyOutputCustomFields
Bases:
object
- default_package_datastore_rowcount_custom_field: str | None = 'DataStore Rows'
- default_package_datastore_size_custom_field: str | None = 'DataStore Size'
- default_package_external_size_custom_field: str | None = 'External Size'
- default_package_filestore_size_custom_field: str | None = 'FileStore Size'
- default_report_custom_field: str | None = 'Data Policy Report'
- default_report_timestamp_field: str | None = 'Last Report Timestamp'
- default_score_custom_field: str | None = 'Data Policy Score'
- disable_all()
- set_metadata_policy_fields(*, score: str = None, report: str = None)
- to_dict() dict
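Each default above is typed str | None, which suggests that None switches the corresponding custom field off. The following sketch shows how such a configuration object might behave. OutputFields is a hypothetical stand-in that carries only two of the listed fields, and the effects of disable_all and set_metadata_policy_fields are assumptions drawn from their names and signatures.

```python
from typing import Optional


class OutputFields:
    """Hypothetical stand-in for DataFormatPolicyOutputCustomFields."""

    def __init__(self) -> None:
        # Two of the defaults listed above; the others would work alike.
        self.default_score_custom_field: Optional[str] = "Data Policy Score"
        self.default_report_custom_field: Optional[str] = "Data Policy Report"

    def set_metadata_policy_fields(self, *, score: Optional[str] = None,
                                   report: Optional[str] = None) -> None:
        # Assumption: an argument left at None keeps the current name.
        if score is not None:
            self.default_score_custom_field = score
        if report is not None:
            self.default_report_custom_field = report

    def disable_all(self) -> None:
        # Assumption: None means "do not write this custom field".
        self.default_score_custom_field = None
        self.default_report_custom_field = None

    def to_dict(self) -> dict:
        return dict(vars(self))


cfg = OutputFields()
cfg.set_metadata_policy_fields(score="Quality Score")
print(cfg.to_dict())
```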
ckanapi_harvesters.policies.data_format_policy_tag_groups module
Data format policy representation and enforcement for lists of tags grouped in vocabularies
- class ckanapi_harvesters.policies.data_format_policy_tag_groups.TagGroupsListPolicy(value_group_specs: List[ValueListPolicy] = None, extra_values: ExtraValueListPolicy = None, mandatory: bool = False, error_level: ErrorLevel = ErrorLevel.Information)
Bases:
GroupedValueListPolicy
- class ckanapi_harvesters.policies.data_format_policy_tag_groups.TagListPolicy(list_specs: List[StringValueSpecification] = None, group_name: str = None, value_select: ListChoiceMode = ListChoiceMode.Any, mandatory: bool = False, error_level: ErrorLevel = ErrorLevel.Information)
Bases:
ValueListPolicy
- get_tags_list_dict(vocabulary_id: str = None) List[Dict[str, str]]
Generate the list of tag dictionaries used to initialize a vocabulary through the CKAN API.
- Parameters:
vocabulary_id
- Returns:
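The return type of get_tags_list_dict, List[Dict[str, str]], suggests one dictionary per tag. The CKAN action API describes tags as dictionaries with a name key and, for vocabulary tags, a vocabulary_id key. The sketch below builds such a list. The function tags_list_dict is a hypothetical illustration, and the exact keys produced by the package are not documented here.

```python
from typing import Dict, List, Optional


def tags_list_dict(tag_names: List[str],
                   vocabulary_id: Optional[str] = None) -> List[Dict[str, str]]:
    """Build one CKAN-style tag dictionary per tag name (assumed layout)."""
    tags: List[Dict[str, str]] = []
    for name in tag_names:
        tag = {"name": name}
        if vocabulary_id is not None:
            # Attach the tag to an existing vocabulary when an id is known.
            tag["vocabulary_id"] = vocabulary_id
        tags.append(tag)
    return tags


print(tags_list_dict(["hourly", "daily"]))
print(tags_list_dict(["hourly"], vocabulary_id="vocab-id-placeholder"))
```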
Module contents
Package to enforce CKAN data policies.