|
- '''
- Manage talks scheduling in a semantic way
- '''
- from __future__ import print_function
- import os
- import io
- from functools import wraps
- import logging
- import re
- import datetime
- import shutil
- import time
- from copy import copy
- import locale
- from contextlib import contextmanager
- from babel.dates import format_date, format_datetime, format_time
- import markdown
- from docutils import nodes
- from docutils.parsers.rst import directives, Directive
- from pelican import signals, generators
- import jinja2
- pelican = None # This will be set during register()
- def memoize(function):
- '''decorators to cache'''
- memo = {}
- @wraps(function)
- def wrapper(*args):
- if args in memo:
- return memo[args]
- else:
- rv = function(*args)
- memo[args] = rv
- return rv
- return wrapper
- @contextmanager
- def setlocale(name):
- saved = locale.setlocale(locale.LC_ALL)
- try:
- yield locale.setlocale(locale.LC_ALL, name)
- finally:
- locale.setlocale(locale.LC_ALL, saved)
- @memoize
- def get_talk_names():
- return [name for name in os.listdir(pelican.settings['TALKS_PATH'])
- if not name.startswith('_') and
- get_talk_data(name) is not None
- ]
- def all_talks():
- return [get_talk_data(tn) for tn in get_talk_names()]
- def unique_attr(iterable, attr):
- return {x[attr] for x in iterable
- if attr in x}
- @memoize
- def get_global_data():
- fname = os.path.join(pelican.settings['TALKS_PATH'], 'meta.yaml')
- if not os.path.isfile(fname):
- return None
- with io.open(fname, encoding='utf8') as buf:
- try:
- data = yaml.load(buf)
- except Exception:
- logging.exception("Syntax error reading %s; skipping", fname)
- return None
- if data is None:
- return None
- if 'startdate' not in data:
- logging.error("Missing startdate in global data")
- data['startdate'] = datetime.datetime.now()
- return data
- @memoize
- def get_talk_data(talkname):
- fname = os.path.join(pelican.settings['TALKS_PATH'], talkname, 'meta.yaml')
- if not os.path.isfile(fname):
- return None
- with io.open(fname, encoding='utf8') as buf:
- try:
- data = yaml.load(buf)
- except:
- logging.exception("Syntax error reading %s; skipping", fname)
- return None
- if data is None:
- return None
- try:
- gridstep = pelican.settings['TALKS_GRID_STEP']
- if 'title' not in data:
- logging.warn("Talk <{}> has no `title` field".format(talkname))
- data['title'] = talkname
- if 'text' not in data:
- logging.warn("Talk <{}> has no `text` field".format(talkname))
- data['text'] = ''
- if 'duration' not in data:
- logging.info("Talk <{}> has no `duration` field (50min used)"
- .format(talkname))
- data['duration'] = 50
- data['duration'] = int(data['duration'])
- if data['duration'] < gridstep:
- logging.info("Talk <{}> lasts only {} minutes; changing to {}"
- .format(talkname, data['duration'], gridstep))
- data['duration'] = gridstep
- if 'links' not in data or not data['links']:
- data['links'] = []
- if 'contacts' not in data or not data['contacts']:
- data['contacts'] = []
- if 'needs' not in data or not data['needs']:
- data['needs'] = []
- if 'room' not in data:
- logging.warn("Talk <{}> has no `room` field".format(talkname))
- if 'time' not in data or 'day' not in data:
- logging.warn("Talk <{}> has no `time` or `day`".format(talkname))
- if 'time' in data:
- del data['time']
- if 'day' in data:
- del data['day']
- if 'day' in data:
- data['day'] = get_global_data()['startdate'] + \
- datetime.timedelta(days=data['day'])
- if 'time' in data and 'day' in data:
- timeparts = re.findall(r'\d+', str(data['time']))
- if 4 > len(timeparts) > 0:
- timeparts = [int(p) for p in timeparts]
- data['time'] = datetime.datetime.combine(
- data['day'], datetime.time(*timeparts))
- else:
- logging.error("Talk <%s> has malformed `time`", talkname)
- data['id'] = talkname
- resdir = os.path.join(pelican.settings['TALKS_PATH'], talkname,
- pelican.settings['TALKS_ATTACHMENT_PATH'])
- if os.path.isdir(resdir) and os.listdir(resdir):
- data['resources'] = resdir
- return data
- except:
- logging.exception("Error on talk %s", talkname)
- raise
- @memoize
- def jinja_env():
- env = jinja2.Environment(
- loader=jinja2.FileSystemLoader(os.path.join(pelican.settings['TALKS_PATH'], '_templates')),
- autoescape=True,
- )
- env.filters['markdown'] = lambda text: \
- jinja2.Markup(markdown.Markdown(extensions=['meta']).
- convert(text))
- env.filters['dateformat'] = format_date
- env.filters['datetimeformat'] = format_datetime
- env.filters['timeformat'] = format_time
- return env
- class TalkListDirective(Directive):
- final_argument_whitespace = True
- has_content = True
- option_spec = {
- 'lang': directives.unchanged
- }
- def run(self):
- lang = self.options.get('lang', 'C')
- tmpl = jinja_env().get_template('talk.html')
- def _sort_date(name):
- '''
- This function is a helper to sort talks by start date
- When no date is available, put at the beginning
- '''
- d = get_talk_data(name)
- if 'time' in d:
- return d['time']
- return datetime.datetime(1, 1, 1)
- return [
- nodes.raw('', tmpl.render(lang=lang, **get_talk_data(n)),
- format='html')
- for n in sorted(get_talk_names(),
- key=_sort_date)
- ]
- class TalkDirective(Directive):
- required_arguments = 1
- final_argument_whitespace = True
- has_content = True
- option_spec = {
- 'lang': directives.unchanged
- }
- def run(self):
- lang = self.options.get('lang', 'C')
- tmpl = jinja_env().get_template('talk.html')
- data = get_talk_data(self.arguments[0])
- if data is None:
- return []
- return [
- nodes.raw('', tmpl.render(lang=lang, **data),
- format='html')
- ]
- class TalkGridDirective(Directive):
- '''A complete grid'''
- final_argument_whitespace = True
- has_content = True
- option_spec = {
- 'lang': directives.unchanged
- }
- def run(self):
- lang = self.options.get('lang', 'C')
- tmpl = jinja_env().get_template('grid.html')
- output = []
- days = unique_attr(all_talks(), 'day')
- gridstep = pelican.settings['TALKS_GRID_STEP']
- for day in sorted(days):
- talks = {talk['id'] for talk in all_talks()
- if talk.get('day', None) == day
- and 'time' in talk
- and 'room' in talk}
- if not talks:
- continue
- talks = [get_talk_data(t) for t in talks]
- rooms = set()
- for t in talks:
- if type(t['room']) is list:
- for r in t['room']:
- rooms.add(r)
- else:
- rooms.add(t['room'])
- rooms = list(sorted(rooms))
- # room=* is not a real room.
- # Remove it unless that day only has special rooms
- if '*' in rooms and len(rooms) > 1:
- del rooms[rooms.index('*')]
- mintime = min({talk['time'].hour * 60 +
- talk['time'].minute
- for talk in talks}) // gridstep * gridstep
- maxtime = max({talk['time'].hour * 60 +
- talk['time'].minute +
- talk['duration']
- for talk in talks})
- times = {}
- for t in range(mintime, maxtime, gridstep):
- times[t] = [None] * len(rooms)
- for talk in sorted(talks, key=lambda x: x['time']):
- talktime = talk['time'].hour * 60 + talk['time'].minute
- position = talktime // gridstep * gridstep # round
- assert position in times
- if talk['room'] == '*':
- roomnums = range(len(rooms))
- elif type(talk['room']) is list:
- roomnums = [rooms.index(r) for r in talk['room']]
- else:
- roomnums = [rooms.index(talk['room'])]
- for roomnum in roomnums:
- if times[position][roomnum] is not None:
- logging.error("Talk %s and %s overlap! (room %s)",
- times[position][roomnum]['id'],
- talk['id'],
- rooms[roomnum]
- )
- continue
- times[position][roomnum] = copy(talk)
- times[position][roomnum]['skip'] = False
- for i in range(1, talk['duration'] // gridstep):
- times[position + i*gridstep][roomnum] = copy(talk)
- times[position + i*gridstep][roomnum]['skip'] = True
- render = tmpl.render(times=times,
- rooms=rooms,
- mintime=mintime, maxtime=maxtime,
- timestep=gridstep,
- lang=lang,
- )
- output.append(nodes.raw(
- '', u'<h4>%s</h4>' % format_date(day, format='full',
- locale=lang),
- format='html'))
- output.append(nodes.raw('', render, format='html'))
- return output
- def talks_to_ics():
- content = u'BEGIN:VCALENDAR\nVERSION:2.0\nPRODID:pelican\n'
- for t in all_talks():
- try:
- content += talk_to_ics(t)
- except:
- logging.exception("Error producing calendar for talk %s", t['id'])
- content += 'END:VCALENDAR\n'
- return content
- def talk_to_ics(talk):
- if 'time' not in talk or 'duration' not in talk:
- return ''
- start = talk['time']
- end = start + datetime.timedelta(minutes=talk['duration'])
- content = 'BEGIN:VEVENT\n'
- content += "UID:%s@%d.hackmeeting.org\n" % (talk['id'], talk['day'].year)
- content += "SUMMARY:%s\n" % talk['title']
- content += "DTSTAMP:%s\n" % time.strftime('%Y%m%dT%H%M%SZ',
- time.gmtime(float(
- start.strftime('%s'))))
- content += "DTSTART:%s\n" % time.strftime('%Y%m%dT%H%M%SZ',
- time.gmtime(float(
- start.strftime('%s'))))
- content += "DTEND:%s\n" % time.strftime('%Y%m%dT%H%M%SZ',
- time.gmtime(float(
- end.strftime('%s'))))
-
- content += "LOCATION:%s\n" % (talk['room'] if 'room' in talk else 'todo')
- content += 'END:VEVENT\n'
- return content
- class TalksGenerator(generators.Generator):
- def __init__(self, *args, **kwargs):
- self.talks = []
- super(TalksGenerator, self).__init__(*args, **kwargs)
- def generate_context(self):
- self.talks = {n: get_talk_data(n) for n in get_talk_names()}
- self._update_context(('talks',))
- def generate_output(self, writer=None):
- for talkname in self.talks:
- if 'resources' in self.talks[talkname]:
- outdir = os.path.join(self.output_path,
- pelican.settings['TALKS_PATH'], talkname,
- pelican.settings['TALKS_ATTACHMENT_PATH'])
- if os.path.isdir(outdir):
- shutil.rmtree(outdir)
- shutil.copytree(self.talks[talkname]['resources'], outdir)
- with io.open(os.path.join(self.output_path, pelican.settings.get('TALKS_ICS')),
- 'w',
- encoding='utf8') as buf:
- buf.write(talks_to_ics())
- def add_talks_option_defaults(pelican):
- pelican.settings.setdefault('TALKS_PATH', 'talks')
- pelican.settings.setdefault('TALKS_ATTACHMENT_PATH', 'res')
- pelican.settings.setdefault('TALKS_ICS', 'schedule.ics')
- pelican.settings.setdefault('TALKS_GRID_STEP', 30)
- def get_generators(gen):
- return TalksGenerator
- def pelican_init(pelicanobj):
- global pelican
- pelican = pelicanobj
- try:
- import yaml
- except ImportError:
- print('ERROR: yaml not found. Talks plugins will be disabled')
- def register():
- pass
- else:
- def register():
- signals.initialized.connect(pelican_init)
- signals.get_generators.connect(get_generators)
- signals.initialized.connect(add_talks_option_defaults)
- directives.register_directive('talklist', TalkListDirective)
- directives.register_directive('talk', TalkDirective)
- directives.register_directive('talkgrid', TalkGridDirective)
|