# -*- coding: utf-8 -*-
"""Classes in this module define the wire format of the data sent from the REST API server.
All the classes in this module have a **json** method and a **from_json**
static method. The **json** method converts the class instance to a python
dictionary that can be encoded into a json string. The **from_json** method
decodes a json string into a class instance.
"""
from .common import _JsonEncodable, _JsonDecodable, String
[docs]class Bits(String):
"""String representation of the access mode's bits.
Args:
name (str): The string representation of the access mode.
Examples:
The unix mode bits *wrx* can be represented as :data:`BITS_ALL`.
Existing instances are:
* :data:`BITS_NONE`
* :data:`BITS_EXECUTE`
* :data:`BITS_WRITE`
* :data:`BITS_WRITE_EXECUTE`
* :data:`BITS_READ`
* :data:`BITS_READ_EXECUTE`
* :data:`BITS_READ_WRITE`
* :data:`BITS_ALL`
"""
#: No access.
BITS_NONE = Bits('NONE')
#: Execute access.
BITS_EXECUTE = Bits('EXECUTE')
#: Write access.
BITS_WRITE = Bits('WRITE')
#: Write and execute access.
BITS_WRITE_EXECUTE = Bits('WRITE_EXECUTE')
#: Read access.
BITS_READ = Bits('READ')
#: Read and execute access.
BITS_READ_EXECUTE = Bits('READ_EXECUTE')
#: Read and write access.
BITS_READ_WRITE = Bits('READ_WRITE')
#: Read, write, and execute access
BITS_ALL = Bits('ALL')
[docs]class BlockInfo(_JsonEncodable, _JsonDecodable):
"""A block's information.
Args:
block_id (int): Block ID.
length (int): Block size in bytes.
locations (list of :obj:`alluxio.wire.BlockLocation`): List of file
block locations.
"""
def __init__(self, block_id=0, length=0, locations=[]):
self.block_id = block_id
self.length = length
self.locations = locations
def json(self):
return {
'blockId': self.block_id,
'length': self.length,
'locations': [location.json() for location in self.locations],
}
@classmethod
def from_json(cls, obj):
block_id = obj['blockId']
length = obj['length']
locations = obj['locations']
locations = [BlockLocation.from_json(
location) for location in locations]
return cls(block_id, length, locations)
[docs]class WorkerNetAddress(_JsonEncodable, _JsonDecodable):
"""Worker network address.
Args:
host (str): Worker's hostname.
rpc_port (int): Port of the worker's RPC server.
data_port (int): Port of the worker's data server.
web_port (int): Port of the worker's web server.
"""
def __init__(self, host='', rpc_port=0, data_port=0, web_port=0):
self.host = host
self.rpc_port = rpc_port
self.data_port = data_port
self.web_port = web_port
def json(self):
return {
'host': self.host,
'rpcPort': self.rpc_port,
'dataPort': self.data_port,
'webPort': self.web_port,
}
@classmethod
def from_json(cls, obj):
addr = cls()
addr.host = obj['host']
addr.rpc_port = obj['rpcPort']
addr.data_port = obj['dataPort']
addr.web_port = obj['webPort']
return addr
[docs]class BlockLocation(_JsonEncodable, _JsonDecodable):
"""A block's location.
Args:
worker_id (int): ID of the worker that contains the block.
worker_address (:obj:`alluxio.wire.WorkerNetAddress`): Address of the
worker that contains the block.
tier_alias (str): Alias of the Alluxio storage tier that contains the
block, for example, MEM, SSD, or HDD.
"""
def __init__(self, worker_id=0, worker_address=WorkerNetAddress(), tier_alias=''):
self.worker_id = worker_id
self.worker_address = worker_address
self.tier_alias = tier_alias
def json(self):
return {
'workerId': self.worker_id,
'workerAddress': self.worker_address.json(),
'tierAlias': self.tier_alias,
}
@classmethod
def from_json(cls, obj):
worker_id = obj['workerId']
worker_address = WorkerNetAddress.from_json(obj['workerAddress'])
tier_alias = obj['tierAlias']
return cls(worker_id, worker_address, tier_alias)
[docs]class FileBlockInfo(_JsonEncodable, _JsonDecodable):
"""A file block's information.
Args:
block_info (:obj:`alluxio.wire.BlockInfo`): The block's information.
offset (int): The block's offset in the file.
ufs_locations (list of str): The under storage locations that contain this block.
"""
def __init__(self, block_info=BlockInfo(), offset=0, ufs_locations=[]):
self.block_info = block_info
self.offset = offset
self.ufs_locations = ufs_locations
def json(self):
return {
'blockInfo': self.block_info.json(),
'offset': self.offset,
'ufsLocations': self.ufs_locations,
}
@classmethod
def from_json(cls, obj):
block_info = BlockInfo.from_json(obj['blockInfo'])
offset = obj['offset']
ufs_locations = obj['ufsLocations']
return cls(block_info, offset, ufs_locations)
[docs]class FileInfo(_JsonEncodable, _JsonDecodable):
"""A file or directory's information.
Two :obj:`FileInfo` are comparable based on the attribute **name**. So a
list of :obj:`FileInfo` can be sorted by python's built-in **sort** function.
Args:
block_ids (list of int): List of block IDs.
block_size_bytes (int): Block size in bytes.
cacheable (bool): Whether the file can be cached in Alluxio.
completed (bool): Whether the file has been marked as completed.
creation_time_ms (int): The epoch time the file was created.
last_modification_time_ms (int): The epoch time the file was last modified.
file_block_infos (list of :obj:`alluxio.wire.FileBlockInfo`): List of file block information.
file_id (int): File ID.
folder (bool): Whether this is a directory.
owner (str): Owner of this file or directory.
group (str): Group of this file or directory.
in_memory_percentage (int): Percentage of the in memory data.
length (int): File size in bytes.
name (str): File name.
path (str): Absolute file path.
ufs_path (str): Under storage path of this file.
pinned (bool): Whether the file is pinned.
persisted (bool): Whether the file is persisted.
persistence_state (:obj:`alluxio.wire.PersistenceState`): Persistence state.
mode (int): Access mode of the file or directory.
mount_point (bool): Whether this is a mount point.
ttl (int): The TTL (time to live) value. It identifies duration
(in milliseconds) the created file should be kept around before it
is automatically deleted. -1 means no TTL value is set.
ttl_action (:obj:`alluxio.wire.TTLAction`): The file action to take when
its TTL expires.
"""
def __init__(self,
block_ids=[],
block_size_bytes=0,
cacheable=False,
completed=False,
creation_time_ms=0,
last_modification_time_ms=0,
file_block_infos=[],
file_id=0,
folder=False,
owner='',
group='',
in_memory_percentage=0,
length=0,
name='',
path='',
ufs_path='',
pinned=False,
persisted=False,
persistence_state='',
mode=0,
mount_point=False,
ttl=0,
ttl_action='',
):
self.block_ids = block_ids
self.block_size_bytes = block_size_bytes
self.cacheable = cacheable
self.completed = completed
self.creation_time_ms = creation_time_ms
self.last_modification_time_ms = last_modification_time_ms
self.file_block_infos = file_block_infos
self.file_id = file_id
self.folder = folder
self.owner = owner
self.group = group
self.in_memory_percentage = in_memory_percentage
self.length = length
self.name = name
self.path = path
self.ufs_path = ufs_path
self.pinned = pinned
self.persisted = persisted
self.persistence_state = persistence_state
self.mode = mode
self.mount_point = mount_point
self.ttl = ttl
self.ttl_action = ttl_action
def __lt__(self, other):
return self.name < other.name
def __eq__(self, other):
return self.name == other.name
def __hash__(self):
return hash(self.name)
def json(self):
return {
'blockIds': self.block_ids,
'blockSizeBytes': self.block_size_bytes,
'cacheable': self.cacheable,
'completed': self.completed,
'creationTimeMs': self.creation_time_ms,
'lastModificationTimeMs': self.last_modification_time_ms,
'fileBlockInfos': [info.json() for info in self.file_block_infos],
'fileId': self.file_id,
'folder': self.folder,
'owner': self.owner,
'group': self.group,
'inMemoryPercentage': self.in_memory_percentage,
'length': self.length,
'name': self.name,
'path': self.path,
'ufsPath': self.ufs_path,
'pinned': self.pinned,
'persisted': self.persisted,
'persistenceState': self.persistence_state.json(),
'mode': self.mode,
'mountPoint': self.mount_point,
'ttl': self.ttl,
'ttlAction': self.ttl_action.json(),
}
@classmethod
def from_json(cls, obj):
info = cls()
info.block_ids = obj['blockIds']
info.block_size_bytes = obj['blockSizeBytes']
info.cacheable = obj['cacheable']
info.completed = obj['completed']
info.creation_time_ms = obj['creationTimeMs']
info.last_modification_time_ms = obj['lastModificationTimeMs']
info.file_block_infos = [FileBlockInfo.from_json(
block) for block in obj['fileBlockInfos']]
info.file_id = obj['fileId']
info.folder = obj['folder']
info.owner = obj['owner']
info.group = obj['group']
info.in_memory_percentage = obj['inMemoryPercentage']
info.length = obj['length']
info.name = obj['name']
info.path = obj['path']
info.ufs_path = obj['ufsPath']
info.pinned = obj['pinned']
info.persisted = obj['persisted']
info.persistence_state = PersistenceState.from_json(obj['persistenceState'])
info.mode = obj['mode']
info.mount_point = obj['mountPoint']
info.ttl = obj['ttl']
info.ttl_action = TTLAction.from_json(obj['ttlAction'])
return info
#: Never load metadata.
LOAD_METADATA_TYPE_NEVER = LoadMetadataType('Never')
#: Load metadata only at the first time of listing status on a directory.
LOAD_METADATA_TYPE_ONCE = LoadMetadataType('Once')
#: Always load metadata when listing status on a directory.
LOAD_METADATA_TYPE_ALWAYS = LoadMetadataType('Always')
[docs]class Mode(_JsonEncodable, _JsonDecodable):
"""A file's access mode.
Args:
owner_bits (:obj:`alluxio.wire.Bits`): Access mode of the file's owner.
group_bits (:obj:`alluxio.wire.Bits`): Access mode of the users in the file's group.
other_bits (:obj:`alluxio.wire.Bits`): Access mode of others who are neither the owner nor in the group.
"""
def __init__(self, owner_bits=Bits(), group_bits=Bits(), other_bits=Bits()):
# owner_bits represents the owner access mode
self.owner_bits = owner_bits
# group_bits represents the group access mode
self.group_bits = group_bits
# other_bits represents the other access mode
self.other_bits = other_bits
def json(self):
return {
'ownerBits': self.owner_bits.json(),
'groupBits': self.group_bits.json(),
'otherBits': self.other_bits.json(),
}
@classmethod
def from_json(cls, obj):
owner = Bits.from_json(obj['ownerBits'])
group = Bits.from_json(obj['groupBits'])
other = Bits.from_json(obj['otherBits'])
return cls(owner_bits=owner, group_bits=group, other_bits=other)
[docs]class ReadType(String):
"""Convenience modes for commonly used read types.
This can be one of the following, see their documentation for details:
* :data:`READ_TYPE_NO_CACHE`
* :data:`READ_TYPE_CACHE`
* :data:`READ_TYPE_CACHE_PROMOTE`
Args:
name (str): The string representation of the read type.
"""
#: Read the file and skip Alluxio storage. This read type will not cause any
#: data migration or eviction in Alluxio storage.
READ_TYPE_NO_CACHE = ReadType('NO_CACHE')
#: Read the file and cache it in the highest tier of a local worker.
#: This read type will not move data between tiers of Alluxio Storage.
#: Users should use :data:`READ_TYPE_CACHE_PROMOTE` for more optimized
#: performance with tiered storage.
READ_TYPE_CACHE = ReadType('CACHE')
#: Read the file and cache it in a local worker. Additionally, if the file was
#: in Alluxio storage, it will be promoted to the top storage layer.
READ_TYPE_CACHE_PROMOTE = ReadType('CACHE_PROMOTE')
[docs]class TTLAction(String):
"""Represent the file action to take when its TTL expires.
This can be one of the following, see their documentation for details:
* :data:`TTL_ACTION_DELETE`
* :data:`TTL_ACTION_FREE`
Args:
name (str): The string representation of the read type.
"""
#: Represents the action of deleting a path.
TTL_ACTION_DELETE = TTLAction("DELETE")
#: Represents the action of freeing a path.
TTL_ACTION_FREE = TTLAction("FREE")
[docs]class WriteType(String):
"""Write types for creating a file.
This can be one of the following, see their documentation for details:
* :data:`WRITE_TYPE_MUST_CACHE`
* :data:`WRITE_TYPE_CACHE_THROUGH`
* :data:`WRITE_TYPE_THROUGH`
* :data:`WRITE_TYPE_ASYNC_THROUGH`
Args:
name (str): The string representation of the write type.
"""
#: Write the file, guaranteeing the data is written to Alluxio storage or
#: failing the operation. The data will be written to the highest tier in a
#: worker's storage. Data will not be persisted to the under storage.
WRITE_TYPE_MUST_CACHE = WriteType('MUST_CACHE')
#: Write the file synchronously to the under storage, and also try to write to
#: the highest tier in a worker's Alluxio storage.
WRITE_TYPE_CACHE_THROUGH = WriteType('CACHE_THROUGH')
#: Write the file synchronously to the under storage, skipping Alluxio storage.
WRITE_TYPE_THROUGH = WriteType('THROUGH')
#: Write the file asynchronously to the under storage.
WRITE_TYPE_ASYNC_THROUGH = WriteType('ASYNC_THROUGH')
class PersistenceState(String):
"""Persistence state of a file.
This can be one of the following, see their documentation for details:
* :data:`PERSISTENCE_STATE_NOT_PERSISTED`
* :data:`PERSISTENCE_STATE_TO_BE_PERSISTED`
* :data:`PERSISTENCE_STATE_PERSISTED`
* :data:`PERSISTENCE_STATE_LOST`
Args:
name (str): The string representation of the persistence state.
"""
#: File not persisted in the under FS.
PERSISTENCE_STATE_NOT_PERSISTED = PersistenceState('NOT_PERSISTED')
#: File is to be persisted in the under FS.
PERSISTENCE_STATE_TO_BE_PERSISTED = PersistenceState('TO_BE_PERSISTED')
#: File is persisted in the under FS.
PERSISTENCE_STATE_PERSISTED = PersistenceState('PERSISTED')
#: File is lost but not persisted in the under FS.
PERSISTENCE_STATE_LOST = PersistenceState('LOST')