Source code for infoblox.record

Base Record Object

import logging

from infoblox import exceptions
from infoblox import mapping

LOGGER = logging.getLogger(__name__)

class Record(mapping.Mapping):
    """This object is extended by specific Infoblox record types and implements
    the core API behavior of a record class. Attributes that map to other
    infoblox records will be instances of those record types.

    :param infoblox.Session session: The infoblox session object
    :param str reference_id: The infoblox _ref value for the record
    :param dict kwargs: Key-value pairs that when passed in, if the a key
        matches an attribute of the record, the value will be assigned.

    view = 'default'

    _ref = None
    _repr_keys = ['_ref']
    _return_ignore = ['view']
    _save_ignore = []
    _search_by = []
    _session = None
    _supports = []
    _wapi_type = 'record'

    def __init__(self, session, reference_id=None, **kwargs):
        """Create a new instance of the Record passing in the Infoblox
        session object and the reference id for the record.

        super(Record, self).__init__(**kwargs)
        self._session = session
        self._ref = reference_id
        self._search_values = self._build_search_values(kwargs)
        if self._ref or self._search_values:

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__,
                            ' '.join(['%s=%s' % (key, getattr(self, key))
                                      for key in self._repr_keys]))
    def delete(self):
        """Remove the item from the infoblox server.

        :rtype: bool
        :raises: AssertionError
        :raises: ValueError
        :raises: infoblox.exceptions.ProtocolError

        if not self._ref:
            raise ValueError('Object has no reference id for deletion')
        if 'save' not in self._supports:
            raise AssertionError('Can not save this object type')
        response = self._session.delete(self._path)
        if response.status_code == 200:
            self._ref = None
            return True
            error = response.json()
            raise exceptions.ProtocolError(error['text'])
        except ValueError:
            raise exceptions.ProtocolError(response.content)

    def fetch(self):
        """Attempt to fetch the object from the Infoblox device. If successful
        the object will be updated and the method will return True.

        :rtype: bool
        :raises: infoblox.exceptions.ProtocolError

        LOGGER.debug('Fetching %s, %s', self._path, self._search_values)
        response = self._session.get(self._path, self._search_values,
                                     {'_return_fields': self._return_fields})
        if response.status_code == 200:
            values = response.json()
            return bool(values)
        elif response.status_code >= 400:
                error = response.json()
                raise exceptions.ProtocolError(error['text'])
            except ValueError:
                raise exceptions.ProtocolError(response.content)
        return False

    def reference_id(self):
        """Return a read-only handle for the reference_id of this object.

        return str(self._ref)

    def save(self):
        """Update the infoblox with new values for the specified object, or add
        the values if it's a new object all together.

        :raises: AssertionError
        :raises: infoblox.exceptions.ProtocolError

        if 'save' not in self._supports:
            raise AssertionError('Can not save this object type')

        values = {}
        for key in [key for key in self.keys() if key not in self._save_ignore]:
            if not getattr(self, key) and getattr(self, key) != False:

            if isinstance(getattr(self, key, None), list):
                value = list()
                for item in getattr(self, key):
                    if isinstance(item, dict):
                    elif hasattr(item, '_save_as'):
                    elif hasattr(item, '_ref') and getattr(item, '_ref'):
                        value.append(getattr(item, '_ref'))
                        LOGGER.warning('Cant assign %r', item)
                values[key] = value
            elif getattr(self, key, None):
                values[key] = getattr(self, key)
        if not self._ref:
            response =, values)
            values['_ref'] = self._ref
            response = self._session.put(self._path, values)
        LOGGER.debug('Response: %r, %r', response.status_code, response.content)
        if 200 <= response.status_code <= 201:
            return True
                error = response.json()
                raise exceptions.ProtocolError(error['text'])
            except ValueError:
                raise exceptions.ProtocolError(response.content)

    def _assign(self, values):
        """Assign the values passed as either a dict or list to the object if
        the key for each value matches an available attribute on the object.

        :param dict values: The values to assign

        LOGGER.debug('Assigning values: %r', values)
        if not values:
        keys = self.keys()
        if not self._ref:
        if isinstance(values, dict):
            for key in keys:
                if values.get(key):
                    if isinstance(values.get(key), list):
                        items = list()
                        for item in values[key]:
                            if isinstance(item, dict):
                                if '_ref' in item:
                                    obj_class = get_class(item['_ref'])
                                    if obj_class:
                        setattr(self, key, items)
                        setattr(self, key, values[key])
        elif isinstance(values, list):
            LOGGER.critical('Unhandled return type: %r', values)

    def _build_search_values(self, kwargs):
        """Build the search criteria dictionary. It will first try and build
        the values from already set attributes on the object, falling back
        to the passed in kwargs.

        :param dict kwargs: Values to build the dict from
        :rtype: dict

        criteria = {}
        for key in self._search_by:
            if getattr(self, key, None):
                criteria[key] = getattr(self, key)
            elif key in kwargs and kwargs.get(key):
                criteria[key] = kwargs.get(key)
        return criteria

    def _path(self):
        return self._ref if self._ref else self._wapi_type

    def _return_fields(self):
        return ','.join([key for key in self.keys()
                         if key not in self._return_ignore])

[docs]class Host(Record): """Implements the host record type. Example:: session = infoblox.Session(infoblox_host, infoblox_user, infoblox_password) host = infoblox.Host(session, name='') """ aliases = [] comment = None configure_for_dns = True disable = False dns_aliases = [] dns_name = None extattrs = None ipv4addrs = [] ipv6addrs = [] name = None rrset_order = 'cyclic' ttl = None use_ttl = False zone = None _repr_keys = ['name', 'ipv4addrs', 'ipv6addrs'] _save_ignore = ['dns_name', 'host', 'zone'] _search_by = ['name', 'ipv4addr', 'ipv6addr', 'mac'] _supports = ['delete', 'save'] _wapi_type = 'record:host' def __init__(self, session, reference_id=None, name=None, **kwargs): """Create a new instance of a Host object. If a reference_id or valid search criteria are passed in, the object will attempt to load the values for the host from the Infoblox device. When creating a new host or adding an ip address, use the Host.add_ipv4_address and Host.add_ipv6_address methods:: host.add_ipv4addr('') Valid search criteria: name, ipv4addr, ipv6addr, mac :param infobox.Session session: The established session object :param str reference_id: The Infoblox reference id for the host :param str host: The host's FQDN :param dict kwargs: Optional keyword arguments """ = name super(Host, self).__init__(session, reference_id, **kwargs)
[docs] def add_ipv4addr(self, ipv4addr): """Add an IPv4 address to the host. :param str ipv4addr: The IP address to add. :raises: ValueError """ for addr in self.ipv4addrs: if ((isinstance(addr, dict) and addr['ipv4addr'] == ipv4addr) or (isinstance(addr, HostIPv4) and addr.ipv4addr == ipv4addr)): raise ValueError('Already exists') self.ipv4addrs.append({'ipv4addr': ipv4addr})
[docs] def remove_ipv4addr(self, ipv4addr): """Remove an IPv4 address from the host. :param str ipv4addr: The IP address to remove """ for addr in self.ipv4addrs: if ((isinstance(addr, dict) and addr['ipv4addr'] == ipv4addr) or (isinstance(addr, HostIPv4) and addr.ipv4addr == ipv4addr)): self.ipv4addrs.remove(addr) break
[docs] def add_ipv6addr(self, ipv6addr): """Add an IPv6 address to the host. :param str ipv6addr: The IP address to add. :raises: ValueError """ for addr in self.ipv6addrs: if ((isinstance(addr, dict) and addr['ipv6addr'] == ipv6addr) or (isinstance(addr, HostIPv4) and addr.ipv6addr == ipv6addr)): raise ValueError('Already exists') self.ipv6addrs.append({'ipv6addr': ipv6addr})
[docs] def remove_ipv6addr(self, ipv6addr): """Remove an IPv6 address from the host. :param str ipv6addr: The IP address to remove """ for addr in self.ipv6addrs: if ((isinstance(addr, dict) and addr['ipv6addr'] == ipv6addr) or (isinstance(addr, HostIPv4) and addr.ipv6addr == ipv6addr)): self.ipv6addrs.remove(addr) break
[docs]class HostIPv4(Record): """Implements the host_ipv4addr record type. """ bootfile = None bootserver = None configure_for_dhcp = None deny_bootp = None discovered_data = None enable_pxe_lease_time = None host = None ignore_client_requested_options = None ipv4addr = None last_queried = None mac = None match_client = None network = None nextserver = None options = None pxe_lease_time = None use_bootfile = None use_bootserver = None use_deny_bootp = None use_for_ea_inheritance = None use_ignore_client_requested_options = None use_nextserver = None use_options = None use_pxe_lease_time = None _repr_keys = ['ipv4addr'] _search_by = ['ipv4addr'] _wapi_type = 'record:host_ipv4addr' def __init__(self, session, reference_id=None, ipv4addr=None, **kwargs): """Create a new instance of a HostIPv4 object. If a reference_id or valid search criteria are passed in, the object will attempt to load the values for the host_ipv4addr from the Infoblox device. Valid search criteria: ipv4addr :param infobox.Session session: The established session object :param str reference_id: The Infoblox reference id for the host :param str ipv4addr: The ipv4 address :param dict kwargs: Optional keyword arguments """ self.ipv4addr = str(ipv4addr) super(HostIPv4, self).__init__(session, reference_id, **kwargs) def _save_as(self): return {'ipv4addr': self.ipv4addr}
[docs]class HostIPv6(Record): """Implements the host_ipv6addr record type. """ address_type = None configure_for_dhcp = True discovered_data = None domain_name = None domain_name_servers = [] duid = None host = None ipv6addr = None ipv6bits = None ipv6prefix_bits = None match_client = None options = None preferred_lifetime = 27000 use_domain_name = False use_domain_name_servers = False use_for_ea_inheritance = False use_options = False use_valid_lifetime = False valid_lifetime = 43200 _repr_keys = ['ipv6addr', 'ipv6bits', 'ipv6prefix_bits'] _save_ignore = ['host'] _search_by = ['ipv6addr'] _wapi_type = 'record:host_ipv6addr' def __init__(self, session, reference_id=None, ipv6addr=None, ipv6bits=None, ipv6prefix_bits=None, **kwargs): """Create a new instance of a HostIPv6 object. If a reference_id or valid search criteria are passed in, the object will attempt to load the values for the host_ipv6addr from the Infoblox device. Valid search criteria: ipv6addr :param infobox.Session session: The established session object :param str reference_id: The Infoblox reference id for the host :param str ipv6addr: The ipv6 address :param str ipv6bits: The ipv6 address bit count :param str ipv6prefix_bits: The ipv6 address prefix bit count :param dict kwargs: Optional keyword arguments """ self.ipv6addr = str(ipv6addr) self.ipv6bits = str(ipv6bits) self.ipv6prefix_bits = str(ipv6prefix_bits) super(HostIPv6, self).__init__(session, reference_id, **kwargs) def _save_as(self): return {'ipv6addr': self.ipv6addr, 'ipv6bits': self.ipv6bits, 'ipv6prefix_bits': self.ipv6prefix_bits}
class IPv4Address(Record): """Implements the ipv4address record type. """ dhcp_client_identifier = None extattrs = None fingerprint = None ip_address = None is_conflict = None lease_state = None mac_address = None names = None network = None network_view = None objects = None status = None types = None usage = None username = None _repr_keys = ['ip_address'] _search_by = ['ip_address'] _supports = ['fetch', 'put'] _wapi_type = 'record:host_ipv4addr' def __init__(self, session, reference_id=None, ipv4addr=None, **kwargs): """Create a new instance of a HostIPv4 object. If a reference_id or valid search criteria are passed in, the object will attempt to load the values for the host_ipv4addr from the Infoblox device. Valid search criteria: ipv4addr :param infobox.Session session: The established session object :param str reference_id: The Infoblox reference id for the host :param str ipv4addr: The ipv4 address :param dict kwargs: Optional keyword arguments """ self.ipv4addr = str(ipv4addr) super(IPv4Address, self).__init__(session, reference_id, **kwargs) def get_class(reference): class_name = reference.split('/')[0].split(':')[1] LOGGER.debug('Class: %s', class_name) return CLASS_MAP.get(class_name) CLASS_MAP = {'host': Host, 'host_ipv4addr': HostIPv4, 'host_ipv6addr': HostIPv6, 'ipv4address': IPv4Address}