
OpenStack Nova (8): Instance Creation (Pipeline)

Source: collected by IT165  Published: 2015-04-28 22:52:00

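The previous chapters covered how Nova creates its WSGI servers and the basics of routing. Now let's look at the pipeline processing in Paste Deploy.

In api-paste.ini, the Nova API defines the following pipelines.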

[composite:openstack_compute_api_v2]
use = call:nova.api.auth:pipeline_factory
noauth = faultwrap sizelimit noauth ratelimit osapi_compute_app_v2
keystone = faultwrap sizelimit authtoken keystonecontext ratelimit osapi_compute_app_v2
keystone_nolimit = faultwrap sizelimit authtoken keystonecontext osapi_compute_app_v2

As the earlier chapters showed, the keystone pipeline is the one in use here. Let's go through its stages one by one.
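
To make the ordering concrete, here is a minimal sketch of how a pipeline line such as the keystone one can be turned into a single callable: the filters are applied in reverse, so the first name listed ends up outermost and sees the request first. The helpers build_pipeline, tagging_filter and final_app are hypothetical names for illustration, not Nova's pipeline_factory.

```python
def build_pipeline(filters, app):
    """Wrap app with the filters so the first listed filter is outermost."""
    for filter_factory in reversed(filters):
        app = filter_factory(app)
    return app


def tagging_filter(name):
    """Hypothetical filter factory that records the order of execution."""
    def factory(app):
        def middleware(request):
            request["trace"].append(name)
            return app(request)
        return middleware
    return factory


def final_app(request):
    # Stands in for osapi_compute_app_v2, the last element of the pipeline.
    request["trace"].append("osapi_compute_app_v2")
    return request["trace"]


names = ["faultwrap", "sizelimit", "authtoken", "keystonecontext", "ratelimit"]
pipeline = build_pipeline([tagging_filter(n) for n in names], final_app)
trace = pipeline({"trace": []})
print(trace)
```

The printed trace lists the stages in the same left-to-right order as the ini line, which is why faultwrap can catch errors raised by every later stage.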

faultwrap

[filter:faultwrap]
paste.filter_factory = nova.api.openstack:FaultWrapper.factory
class FaultWrapper(wsgi.Middleware):
    '''Calls the middleware stack, captures any exceptions into faults.'''

    @webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):
        try:
            return req.get_response(self.application)
        except Exception as ex:
            LOG.exception(_('FaultWrapper: %s'), unicode(ex))
            return faults.Fault(webob.exc.HTTPInternalServerError())

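As the code shows, this filter is simple: it passes the request down the stack to get a response, and if anything below raises an unhandled exception, it logs the error and returns an HTTP 500 fault instead.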
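
The same idea can be sketched as plain WSGI without webob. This is only an illustration under that assumption; SimpleFaultWrapper, broken_app and call are hypothetical names, and the sketch catches only exceptions raised while the wrapped application is being called, not while its response is iterated.

```python
import logging

LOG = logging.getLogger(__name__)


class SimpleFaultWrapper(object):
    """Call the wrapped app and turn any uncaught exception into a 500."""

    def __init__(self, application):
        self.application = application

    def __call__(self, environ, start_response):
        try:
            return self.application(environ, start_response)
        except Exception:
            LOG.exception("Unhandled error in the WSGI stack")
            body = b"Internal Server Error"
            start_response("500 Internal Server Error",
                           [("Content-Type", "text/plain"),
                            ("Content-Length", str(len(body)))])
            return [body]


def broken_app(environ, start_response):
    # Hypothetical downstream stage that always fails.
    raise RuntimeError("boom")


def call(app):
    """Drive a WSGI app once and return (status, body)."""
    captured = {}

    def start_response(status, headers):
        captured["status"] = status

    body = b"".join(app({}, start_response))
    return captured["status"], body


status, body = call(SimpleFaultWrapper(broken_app))
print(status)
```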

sizelimit

[filter:sizelimit]
paste.filter_factory = nova.api.sizelimit:RequestBodySizeLimiter.factory
class RequestBodySizeLimiter(wsgi.Middleware):
    '''Limit the size of incoming requests.'''

    def __init__(self, *args, **kwargs):
        super(RequestBodySizeLimiter, self).__init__(*args, **kwargs)

    @webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):
        if req.content_length > CONF.osapi_max_request_body_size:
            msg = _('Request is too large.')
            raise webob.exc.HTTPRequestEntityTooLarge(explanation=msg)
        if req.content_length is None and req.is_body_readable:
            limiter = LimitingReader(req.body_file,
                                     CONF.osapi_max_request_body_size)
            req.body_file = limiter
        return self.application

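As its name suggests, this filter only checks the size of the REST request body: it rejects the request when Content-Length exceeds osapi_max_request_body_size, and when no Content-Length is given, it wraps the body in a LimitingReader that enforces the same limit while reading.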
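
The LimitingReader class itself is not shown above. A minimal sketch of what such a wrapper could look like follows; SketchLimitingReader and RequestTooLarge are hypothetical names, and the exception is a stand-in for webob.exc.HTTPRequestEntityTooLarge.

```python
import io


class RequestTooLarge(Exception):
    """Hypothetical stand-in for webob.exc.HTTPRequestEntityTooLarge."""


class SketchLimitingReader(object):
    """File-like wrapper that fails once more than `limit` bytes are read."""

    def __init__(self, data, limit):
        self.data = data
        self.limit = limit
        self.bytes_read = 0

    def read(self, size=-1):
        chunk = self.data.read(size)
        self.bytes_read += len(chunk)
        if self.bytes_read > self.limit:
            raise RequestTooLarge("Request is too large.")
        return chunk


small = SketchLimitingReader(io.BytesIO(b"x" * 10), limit=16)
small_body = small.read()

big = SketchLimitingReader(io.BytesIO(b"x" * 32), limit=16)
try:
    big.read()
    rejected = False
except RequestTooLarge:
    rejected = True
print(len(small_body), rejected)
```

Checking while reading matters for requests without a Content-Length header, since their size is not known up front.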

authtoken

[filter:authtoken]
paste.filter_factory = keystoneclient.middleware.auth_token:filter_factory
auth_host = 127.0.0.1
auth_port = 35357
auth_protocol = http
admin_tenant_name = %SERVICE_TENANT_NAME%
admin_user = %SERVICE_USER%
admin_password = %SERVICE_PASSWORD%
# signing_dir is configurable, but the default behavior of the authtoken
# middleware should be sufficient.  It will create a temporary directory
# in the home directory for the user the nova process is running as.
#signing_dir = /var/lib/nova/keystone-signing
# Workaround for https://bugs.launchpad.net/nova/+bug/1154809
auth_version = v2.0

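I have not downloaded the keystoneclient code, so I cannot analyze what it does from the source. If you have read the earlier Keystone articles, though, the behavior is clear: the filter takes the token from the current REST request, sends it to the Keystone service, and gets back the validation result.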
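
Since the keystoneclient source is not examined here, the following is only a rough sketch of the flow just described, not the real auth_token middleware. TokenCheckMiddleware, fake_keystone and the token table are hypothetical; validate_token stands in for the round trip to Keystone.

```python
class TokenCheckMiddleware(object):
    """Reject requests without a valid token; pass identity on otherwise."""

    def __init__(self, application, validate_token):
        self.application = application
        # Hypothetical callable standing in for the request to Keystone.
        self.validate_token = validate_token

    def __call__(self, environ, start_response):
        token = environ.get("HTTP_X_AUTH_TOKEN")
        identity = self.validate_token(token) if token else None
        if identity is None:
            body = b"Authentication required"
            start_response("401 Unauthorized",
                           [("Content-Type", "text/plain"),
                            ("Content-Length", str(len(body)))])
            return [body]
        # Hand the verified identity to later stages as request headers.
        environ["HTTP_X_USER_ID"] = identity["user_id"]
        environ["HTTP_X_TENANT_ID"] = identity["tenant_id"]
        return self.application(environ, start_response)


KNOWN_TOKENS = {"secret-token": {"user_id": "u1", "tenant_id": "t1"}}


def fake_keystone(token):
    return KNOWN_TOKENS.get(token)


def echo_user(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [environ["HTTP_X_USER_ID"].encode("utf-8")]


def call(app, environ):
    captured = {}

    def start_response(status, headers):
        captured["status"] = status

    body = b"".join(app(environ, start_response))
    return captured["status"], body


app = TokenCheckMiddleware(echo_user, fake_keystone)
ok = call(app, {"HTTP_X_AUTH_TOKEN": "secret-token"})
denied = call(app, {})
print(ok, denied)
```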

keystonecontext

[filter:keystonecontext]
paste.filter_factory = nova.api.auth:NovaKeystoneContext.factory
class NovaKeystoneContext(wsgi.Middleware):
    '''Make a request context from keystone headers.'''

    @webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):
        user_id = req.headers.get('X_USER')
        user_id = req.headers.get('X_USER_ID', user_id)
        if user_id is None:
            LOG.debug('Neither X_USER_ID nor X_USER found in request')
            return webob.exc.HTTPUnauthorized()

        roles = self._get_roles(req)

        if 'X_TENANT_ID' in req.headers:
            # This is the new header since Keystone went to ID/Name
            project_id = req.headers['X_TENANT_ID']
        else:
            # This is for legacy compatibility
            project_id = req.headers['X_TENANT']
        project_name = req.headers.get('X_TENANT_NAME')
        user_name = req.headers.get('X_USER_NAME')

        # Get the auth token
        auth_token = req.headers.get('X_AUTH_TOKEN',
                                     req.headers.get('X_STORAGE_TOKEN'))

        # Build a context, including the auth_token...
        remote_address = req.remote_addr
        if CONF.use_forwarded_for:
            remote_address = req.headers.get('X-Forwarded-For', remote_address)

        service_catalog = None
        if req.headers.get('X_SERVICE_CATALOG') is not None:
            try:
                catalog_header = req.headers.get('X_SERVICE_CATALOG')
                service_catalog = jsonutils.loads(catalog_header)
            except ValueError:
                raise webob.exc.HTTPInternalServerError(
                          _('Invalid service catalog json.'))

        ctx = context.RequestContext(user_id,
                                     project_id,
                                     user_name=user_name,
                                     project_name=project_name,
                                     roles=roles,
                                     auth_token=auth_token,
                                     remote_address=remote_address,
                                     service_catalog=service_catalog)

        req.environ['nova.context'] = ctx
        return self.application

    def _get_roles(self, req):
        '''Get the list of roles.'''

        if 'X_ROLES' in req.headers:
            roles = req.headers.get('X_ROLES', '')
        else:
            # Fallback to deprecated role header:
            roles = req.headers.get('X_ROLE', '')
            if roles:
                LOG.warn(_('Sourcing roles from deprecated X-Role HTTP '
                           'header'))
        return [r.strip() for r in roles.split(',')]

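As the code shows, this filter builds the request context from the HTTP headers of the current request (user, project, roles, token, remote address, service catalog) and stores it in req.environ['nova.context'] for later stages to use.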
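
The header handling can be tried out in isolation with a plain dict in place of req.headers. This is a simplified sketch: parse_roles mirrors _get_roles above, while build_context is a hypothetical helper that returns a dict rather than a RequestContext and uses .get() where the original indexes X_TENANT directly.

```python
def parse_roles(headers):
    """Split the comma-separated role header into a list of role names."""
    if "X_ROLES" in headers:
        roles = headers.get("X_ROLES", "")
    else:
        # Fall back to the deprecated header name.
        roles = headers.get("X_ROLE", "")
    return [r.strip() for r in roles.split(",")]


def build_context(headers):
    """Return a context dict, or None when no user header is present."""
    user_id = headers.get("X_USER_ID", headers.get("X_USER"))
    if user_id is None:
        return None
    return {
        "user_id": user_id,
        "project_id": headers.get("X_TENANT_ID", headers.get("X_TENANT")),
        "roles": parse_roles(headers),
        "auth_token": headers.get("X_AUTH_TOKEN"),
    }


ctx = build_context({"X_USER_ID": "u1", "X_TENANT_ID": "t1",
                     "X_ROLES": "admin, member", "X_AUTH_TOKEN": "tok"})
no_user = build_context({"X_TENANT_ID": "t1"})
empty_roles = parse_roles({})
print(ctx["roles"], no_user, empty_roles)
```

Note that an absent role header yields a list containing one empty string, because splitting an empty string on a comma returns one empty element.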

ratelimit

[filter:ratelimit]
paste.filter_factory = nova.api.openstack.compute.limits:RateLimitingMiddleware.factory

This is a rate-limiting model based on the leaky-bucket algorithm.

class RateLimitingMiddleware(base_wsgi.Middleware):
    '''
    Rate-limits requests passing through this middleware. All limit information
    is stored in memory for this implementation.
    '''

    def __init__(self, application, limits=None, limiter=None, **kwargs):
        '''
        Initialize new `RateLimitingMiddleware`, which wraps the given WSGI
        application and sets up the given limits.

        @param application: WSGI application to wrap
        @param limits: String describing limits
        @param limiter: String identifying class for representing limits

        Other parameters are passed to the constructor for the limiter.
        '''
        base_wsgi.Middleware.__init__(self, application)

        # The filter is created by the paste factory and [filter:ratelimit]
        # sets no options, so limiter and limits are both None here.
        # Select the limiter class
        if limiter is None:
            limiter = Limiter
        else:
            limiter = importutils.import_class(limiter)

        # Parse the limits, if any are provided
        if limits is not None:
            limits = limiter.parse_limits(limits)
        # With limits left as None, DEFAULT_LIMITS is used here
        self._limiter = limiter(limits or DEFAULT_LIMITS, **kwargs)

    @webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):
        '''
        Represents a single call through this middleware. We should record the
        request if we have a limit relevant to it. If no limit is relevant to
        the request, ignore it.

        If the request should be rate limited, return a fault telling the user
        they are over the limit and need to retry later.
        '''
        verb = req.method
        url = req.url
        context = req.environ.get('nova.context')

        if context:
            username = context.user_id
        else:
            username = None
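        # Check the rate for the current user
        delay, error = self._limiter.check_for_delay(verb, url, username)
        # A non-empty delay means the rate has been exceeded. The delay is the
        # number of seconds to wait before this kind of request is allowed again.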
        if delay:
            msg = _('This request was rate-limited.')
            retry = time.time() + delay
            return wsgi.RateLimitFault(msg, error, retry)

        req.environ['nova.limits'] = self._limiter.get_limits(username)

        return self.application

Now look at DEFAULT_LIMITS and the implementation of Limit:

DEFAULT_LIMITS = [
    Limit('POST', '*', '.*', 120, utils.TIME_UNITS['MINUTE']),
    Limit('POST', '*/servers', '^/servers', 120, utils.TIME_UNITS['MINUTE']),
    Limit('PUT', '*', '.*', 120, utils.TIME_UNITS['MINUTE']),
    Limit('GET', '*changes-since*', '.*changes-since.*', 120,
          utils.TIME_UNITS['MINUTE']),
    Limit('DELETE', '*', '.*', 120, utils.TIME_UNITS['MINUTE']),
    Limit('GET', '*/os-fping', '^/os-fping', 12, utils.TIME_UNITS['MINUTE']),
]

class Limit(object):
    '''
    Stores information about a limit for HTTP requests.
    '''

    UNITS = dict([(v, k) for k, v in utils.TIME_UNITS.items()])

    def __init__(self, verb, uri, regex, value, unit):
        '''
        Initialize a new `Limit`.

        @param verb: HTTP verb (POST, PUT, etc.)
        @param uri: Human-readable URI
        @param regex: Regular expression format for this limit
        @param value: Integer number of requests which can be made
        @param unit: Unit of measure for the value parameter
        '''

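        # The parameters are self-explanatory: value is the number of requests
        # allowed per unit of time, and unit is that time span in seconds,
        # e.g. 120 per minute becomes 120 per 60 seconds.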
        self.verb = verb
        self.uri = uri
        self.regex = regex
        self.value = int(value)
        self.unit = unit
        self.unit_string = self.display_unit().lower()
        self.remaining = int(value)

        if value <= 0:
            raise ValueError('Limit value must be > 0')

        self.last_request = None
        self.next_request = None
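        # water_level is how much of the bucket is currently used
        self.water_level = 0
        # capacity is the maximum level; here it simply equals the time unit
        self.capacity = self.unit
        # request_value is how much of the capacity one request consumes.
        # For 120 requests per 60 seconds the capacity is 60, so each
        # request adds 60 / 120 = 0.5.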
        self.request_value = float(self.capacity) / float(self.value)
        msg = _('Only %(value)s %(verb)s request(s) can be '
                'made to %(uri)s every %(unit_string)s.')
        self.error_message = msg % self.__dict__

    def __call__(self, verb, url):
        '''
        Represents a call to this limit from a relevant request.

        @param verb: string http verb (POST, GET, etc.)
        @param url: string URL
        '''
        # Match on the HTTP verb and on the regular expression for the URL
        if self.verb != verb or not re.match(self.regex, url):
            return

        now = self._get_time()

        if self.last_request is None:
            self.last_request = now

        leak_value = now - self.last_request
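        # leak_value is the time elapsed since the last call; the bucket
        # drains by that amount, which gives the current water level
        self.water_level -= leak_value
        self.water_level = max(self.water_level, 0)
        # Every call raises the water level
        self.water_level += self.request_value
        # Difference between the water level and the capacity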
        difference = self.water_level - self.capacity

        self.last_request = now
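        # If the difference is positive, the caller has to wait that many
        # seconds before there is room again. This is where the delay value
        # used by the middleware comes from.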
        if difference > 0:
            self.water_level -= self.request_value
            self.next_request = now + difference
            return difference

        cap = self.capacity
        water = self.water_level
        val = self.value

        self.remaining = math.floor(((cap - water) / cap) * val)
        self.next_request = now

    def _get_time(self):
        '''Retrieve the current time. Broken out for testability.'''
        return time.time()

    def display_unit(self):
        '''Display the string name of the unit.'''
        return self.UNITS.get(self.unit, 'UNKNOWN')

    def display(self):
        '''Return a useful representation of this class.'''
        return {
            'verb': self.verb,
            'URI': self.uri,
            'regex': self.regex,
            'value': self.value,
            'remaining': int(self.remaining),
            'unit': self.display_unit(),
            'resetTime': int(self.next_request or self._get_time()),
        }
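
The arithmetic in Limit.__call__ is easier to follow with the clock passed in explicitly. The sketch below keeps only the water-level logic; LeakyBucket and its request method are hypothetical names, and the regex matching and bookkeeping fields of the real class are left out.

```python
class LeakyBucket(object):
    """Water-level bookkeeping only; time is supplied by the caller."""

    def __init__(self, value, unit_seconds):
        self.capacity = float(unit_seconds)
        # Each request adds capacity / value units of water.
        self.request_value = self.capacity / value
        self.water_level = 0.0
        self.last_request = None

    def request(self, now):
        """Return None if the request is allowed, else seconds to wait."""
        if self.last_request is None:
            self.last_request = now
        # The bucket leaks one unit of water per second since the last call.
        leaked = now - self.last_request
        self.water_level = max(self.water_level - leaked, 0.0)
        self.water_level += self.request_value
        self.last_request = now
        difference = self.water_level - self.capacity
        if difference > 0:
            # Rejected requests do not stay in the bucket.
            self.water_level -= self.request_value
            return difference
        return None


bucket = LeakyBucket(value=120, unit_seconds=60)
burst = [bucket.request(now=0.0) for _ in range(120)]
delay = bucket.request(now=0.0)   # request 121 at the same instant
later = bucket.request(now=1.0)   # one second later, one unit has leaked
print(delay, later)
```

With 120 requests per 60 seconds, a burst of 120 fills the bucket exactly, the next request is told to wait 0.5 seconds, and after one second of leaking there is room for two more requests.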

There is also a class that acts as the manager for the limits:

class Limiter(object):
    '''
    Rate-limit checking class which handles limits in memory.
    '''

    def __init__(self, limits, **kwargs):
        '''
        Initialize the new `Limiter`.

        @param limits: List of `Limit` objects
        '''
        self.limits = copy.deepcopy(limits)
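        # Key point: check_for_delay takes a username, but nothing in the
        # configuration mentions one. The defaultdict gives every user seen
        # for the first time a separate deep copy of the limits.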
        self.levels = collections.defaultdict(lambda: copy.deepcopy(limits))

        # Pick up any per-user limit information
        for key, value in kwargs.items():
            if key.startswith(LIMITS_PREFIX):
                username = key[len(LIMITS_PREFIX):]
                self.levels[username] = self.parse_limits(value)

    def get_limits(self, username=None):
        '''
        Return the limits for a given user.
        '''
        return [limit.display() for limit in self.levels[username]]

    def check_for_delay(self, verb, url, username=None):
        '''
        Check the given verb/user/user triplet for limit.

        @return: Tuple of delay (in seconds) and error message (or None, None)
        '''
        delays = []
        # Fetch the limits for the current user. No per-user limits are
        # configured here, so this is a copy of the defaults (DEFAULT_LIMITS).
        for limit in self.levels[username]:
            delay = limit(verb, url)
            if delay:
                delays.append((delay, limit.error_message))

        if delays:
            delays.sort()
            return delays[0]

        return None, None

    # Note: This method gets called before the class is instantiated,
    # so this must be either a static method or a class method.  It is
    # used to develop a list of limits to feed to the constructor.  We
    # put this in the class so that subclasses can override the
    # default limit parsing.
    @staticmethod
    def parse_limits(limits):
        '''
        Convert a string into a list of Limit instances.  This
        implementation expects a semicolon-separated sequence of
        parenthesized groups, where each group contains a
        comma-separated sequence consisting of HTTP method,
        user-readable URI, a URI reg-exp, an integer number of
        requests which can be made, and a unit of measure.  Valid
        values for the latter are 'SECOND', 'MINUTE', 'HOUR', and
        'DAY'.

        @return: List of Limit instances.
        '''

        # Handle empty limit strings
        limits = limits.strip()
        if not limits:
            return []

        # Split up the limits by semicolon
        result = []
        for group in limits.split(';'):
            group = group.strip()
            if group[:1] != '(' or group[-1:] != ')':
                raise ValueError('Limit rules must be surrounded by '
                                 'parentheses')
            group = group[1:-1]

            # Extract the Limit arguments
            args = [a.strip() for a in group.split(',')]
            if len(args) != 5:
                raise ValueError('Limit rules must contain the following '
                                 'arguments: verb, uri, regex, value, unit')

            # Pull out the arguments
            verb, uri, regex, value, unit = args

            # Upper-case the verb
            verb = verb.upper()

            # Convert value--raises ValueError if it's not integer
            value = int(value)

            # Convert unit
            unit = unit.upper()
            if unit not in utils.TIME_UNITS:
                raise ValueError('Invalid units specified')
            unit = utils.TIME_UNITS[unit]

            # Build a limit
            result.append(Limit(verb, uri, regex, value, unit))

        return result

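That is most of the rate-limiting code. There is a fair amount of it, but the principle is simple: match each request by HTTP verb and by a regular expression on the URL, then apply the leaky-bucket algorithm to the matching limits.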
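
The per-user behavior of Limiter depends on collections.defaultdict combined with copy.deepcopy. A small sketch shows the effect, assuming a deliberately simplified limit: CountingLimit is hypothetical, allows a fixed number of matching requests, and returns a made-up delay of one second instead of doing leaky-bucket arithmetic.

```python
import collections
import copy
import re


class CountingLimit(object):
    """Hypothetical limit: allow `value` matching requests, then delay."""

    def __init__(self, verb, regex, value):
        self.verb = verb
        self.regex = regex
        self.value = value
        self.used = 0

    def __call__(self, verb, url):
        # Requests that do not match this limit are ignored.
        if self.verb != verb or not re.match(self.regex, url):
            return None
        if self.used >= self.value:
            return 1.0  # made-up delay in seconds
        self.used += 1
        return None


defaults = [CountingLimit("POST", "^/servers", 2)]
# Each user seen for the first time gets an independent copy of the defaults.
levels = collections.defaultdict(lambda: copy.deepcopy(defaults))


def check(username, verb, url):
    """Return the shortest delay among the user's limits, or None."""
    delays = [d for d in (limit(verb, url) for limit in levels[username]) if d]
    return min(delays) if delays else None


alice = [check("alice", "POST", "/servers") for _ in range(3)]
bob = check("bob", "POST", "/servers")
alice_get = check("alice", "GET", "/servers")
print(alice, bob, alice_get)
```

Because the state lives in per-user copies, alice running out of POST requests has no effect on bob, and her GET requests are not counted by the POST limit at all.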

osapi_compute_app_v2

[app:osapi_compute_app_v2]
paste.app_factory = nova.api.openstack.compute:APIRouter.factory

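This is the last stage of the pipeline, and it is the actual application. As shown here, it just sets up URL routing.
One thing differs from before: mapper.resource is used to generate RESTful-style routes.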

class APIRouter(nova.api.openstack.APIRouter):
    '''
    Routes requests on the OpenStack API to the appropriate controller
    and method.
    '''
    ExtensionManager = extensions.ExtensionManager

    def _setup_routes(self, mapper, ext_mgr, init_only):
        if init_only is None or 'versions' in init_only:
            self.resources['versions'] = versions.create_resource()
            mapper.connect('versions', '/',
                        controller=self.resources['versions'],
                        action='show',
                        conditions={'method': ['GET']})

        mapper.redirect('', '/')

        if init_only is None or 'consoles' in init_only:
            self.resources['consoles'] = consoles.create_resource()
            mapper.resource('console', 'consoles',
                        controller=self.resources['consoles'],
                        parent_resource=dict(member_name='server',
                        collection_name='servers'))

        if init_only is None or 'consoles' in init_only or \
                'servers' in init_only or 'ips' in init_only:
            self.resources['servers'] = servers.create_resource(ext_mgr)
            # Generate the servers routes, which are the ones used to create an Instance
            mapper.resource('server', 'servers',
                            controller=self.resources['servers'],
                            collection={'detail': 'GET'},
                            member={'action': 'POST'})

        if init_only is None or 'ips' in init_only:
            self.resources['ips'] = ips.create_resource()
            mapper.resource('ip', 'ips', controller=self.resources['ips'],
                            parent_resource=dict(member_name='server',
                                                 collection_name='servers'))

        if init_only is None or 'images' in init_only:
            self.resources['images'] = images.create_resource()
            mapper.resource('image', 'images',
                            controller=self.resources['images'],
                            collection={'detail': 'GET'})

        if init_only is None or 'limits' in init_only:
            self.resources['limits'] = limits.create_resource()
            mapper.resource('limit', 'limits',
                            controller=self.resources['limits'])

        if init_only is None or 'flavors' in init_only:
            self.resources['flavors'] = flavors.create_resource()
            mapper.resource('flavor', 'flavors',
                            controller=self.resources['flavors'],
                            collection={'detail': 'GET'},
                            member={'action': 'POST'})

        if init_only is None or 'image_metadata' in init_only:
            self.resources['image_metadata'] = image_metadata.create_resource()
            image_metadata_controller = self.resources['image_metadata']

            mapper.resource('image_meta', 'metadata',
                            controller=image_metadata_controller,
                            parent_resource=dict(member_name='image',
                            collection_name='images'))

            mapper.connect('metadata',
                           '/{project_id}/images/{image_id}/metadata',
                           controller=image_metadata_controller,
                           action='update_all',
                           conditions={'method': ['PUT']})

        if init_only is None or 'server_metadata' in init_only:
            self.resources['server_metadata'] = \
                server_metadata.create_resource()
            server_metadata_controller = self.resources['server_metadata']

            mapper.resource('server_meta', 'metadata',
                            controller=server_metadata_controller,
                            parent_resource=dict(member_name='server',
                            collection_name='servers'))

            mapper.connect('metadata',
                           '/{project_id}/servers/{server_id}/metadata',
                           controller=server_metadata_controller,
                           action='update_all',
                           conditions={'method': ['PUT']})
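
As a rough model of what a mapper.resource call produces, the sketch below lists the main method, path and action combinations for one resource. restful_routes is a hypothetical helper, not part of the routes library, and it leaves out the new, edit and format-suffixed variants that the library also generates.

```python
def restful_routes(member_name, collection_name, path_prefix="",
                   collection=None, member=None):
    """Return (method, path, action) tuples for one RESTful resource."""
    base = "/%s%s" % (path_prefix, collection_name)
    item = "%s/{id}" % base
    table = [
        ("GET", base, "index"),
        ("POST", base, "create"),
        ("GET", item, "show"),
        ("PUT", item, "update"),
        ("DELETE", item, "delete"),
    ]
    # Extra actions on the whole collection, e.g. {'detail': 'GET'}.
    for action, method in sorted((collection or {}).items()):
        table.append((method, "%s/%s" % (base, action), action))
    # Extra actions on a single member, e.g. {'action': 'POST'}.
    for action, method in sorted((member or {}).items()):
        table.append((method, "%s/%s" % (item, action), action))
    return table


server_routes = restful_routes("server", "servers",
                               path_prefix="{project_id}/",
                               collection={"detail": "GET"},
                               member={"action": "POST"})
for method, path, action in server_routes:
    print("%-6s %-36s %s" % (method, path, action))
```

In this model, the POST route on the collection path is the one that maps to the create action of the servers controller.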

One question remains, though. The URL for creating an Instance looks like this:
/v2/{tenant_id}/servers
Nothing shown so far parses the tenant_id part.
Go back to nova.api.openstack.APIRouter, the parent class of APIRouter:

    def __init__(self, ext_mgr=None, init_only=None):
        if ext_mgr is None:
            if self.ExtensionManager:
                ext_mgr = self.ExtensionManager()
            else:
                raise Exception(_('Must specify an ExtensionManager class'))
        # mapper is a ProjectMapper instance, and ProjectMapper is a custom class.
        mapper = ProjectMapper()
        self.resources = {}
        self._setup_routes(mapper, ext_mgr, init_only)
        self._setup_ext_routes(mapper, ext_mgr, init_only)
        self._setup_extensions(ext_mgr)
        super(APIRouter, self).__init__(mapper)

Now look at ProjectMapper:

class ProjectMapper(APIMapper):
    def resource(self, member_name, collection_name, **kwargs):
        if 'parent_resource' not in kwargs:
            # Add the {project_id} prefix to the routed URL
            kwargs['path_prefix'] = '{project_id}/'
        else:
            parent_resource = kwargs['parent_resource']
            p_collection = parent_resource['collection_name']
            p_member = parent_resource['member_name']
            kwargs['path_prefix'] = '{project_id}/%s/:%s_id' % (p_collection,
                                                                p_member)
        routes.Mapper.resource(self, member_name,
                                     collection_name,
                                     **kwargs)

With that, the routing picture is mostly complete. There are also some extension routes, which you can read through on your own.
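
To see how a {project_id} prefix lets the router pick the tenant out of the URL, here is a small sketch that turns a path template into a regular expression with named groups. compile_template is a hypothetical helper rather than how the routes library works internally, and it assumes the /v2 part of the URL has already been consumed by an earlier dispatch step.

```python
import re


def compile_template(template):
    """Turn '/{project_id}/servers' into a regex with named groups."""
    # Simplification: the rest of the template is not regex-escaped.
    pattern = re.sub(r"\{(\w+)\}", r"(?P<\1>[^/]+)", template)
    return re.compile("^" + pattern + "$")


servers = compile_template("/{project_id}/servers")
match = servers.match("/abc123/servers")
project_id = match.group("project_id")
no_prefix = servers.match("/servers")

nested = compile_template("/{project_id}/servers/{server_id}/ips")
nested_args = nested.match("/abc123/servers/42/ips").groupdict()
print(project_id, no_prefix, nested_args)
```

The nested template corresponds to the parent_resource branch of ProjectMapper.resource, where the prefix also carries the parent collection and the parent member id.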

 
